YousifS7 commented on issue #11635:
URL: https://github.com/apache/hudi/issues/11635#issuecomment-2232340343

   Following up on this: I was able to run the Spark app without errors. However, even when selecting columns from the 'before' and 'after' struct types extracted from Kafka, only the following columns were written to the Hudi table:
   ```
   ts_ms, op, before, after, source
   ```
   
   > Transformer File Query
   ```
   CACHE TABLE dbz_filtered AS
   SELECT a.ts_ms, a.op, a.before, a.after, a.source
   FROM <SRC> a
   WHERE a.op IN ('d', 'u', 'c', 'r');

   SELECT b.before.col1 AS before_col1, b.after.col1 AS after_col1,
          b.ts_ms, b.op, b.before, b.after, b.source
   FROM dbz_filtered b;
   ```
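   To rule out the SQL itself, I put together a minimal local sanity check (placeholder data that mimics the Debezium envelope; `col1` and the values are made up, not from the real topic):

   ```python
   # Minimal local dry-run of the transformer SQL against a hand-built record.
   from pyspark.sql import Row, SparkSession

   spark = SparkSession.builder.master("local[1]").appName("dbz-sql-check").getOrCreate()

   # Placeholder record shaped like the Debezium envelope ('col1' is hypothetical).
   rows = [Row(ts_ms=1721000000000, op="u",
               before=Row(col1="old"), after=Row(col1="new"),
               source=Row(table="some_table"))]
   spark.createDataFrame(rows).createOrReplaceTempView("dbz_filtered")

   out = spark.sql("""
       SELECT b.before.col1 AS before_col1, b.after.col1 AS after_col1,
              b.ts_ms, b.op, b.before, b.after, b.source
       FROM dbz_filtered b
   """)
   out.printSchema()  # before_col1 and after_col1 are present in the projection
   ```

   Run locally like this, the query does project `before_col1` and `after_col1`, which makes me suspect the issue is on the write/schema side rather than in the transformer SQL.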
   
   It seems the additional columns extracted from the envelope's 'before' and 'after' structs are being ignored for some reason. I even tried adding a static column, e.g. 'test' AS test_columns, and it was not written to the Hudi table either.
   
   > Properties File
   ```
   
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
   hoodie.datasource.write.recordkey.field=op
   hoodie.datasource.write.partitionpath.field=op
   hoodie.datasource.write.precombine.field=ts_ms
   hoodie.streamer.transformer.sql.file=s3://some_bucket/configs/some_table.sql
   
hoodie.streamer.schemaprovider.registry.url=http://localhost:8081/subjects/test.dbo.some_table-value/versions/latest
   schema.registry.url=http://localhost:8081
   hoodie.streamer.source.kafka.topic=test.dbo.some_table
   bootstrap.servers=localhost:9092
   auto.offset.reset=earliest
   ```
   
   > Spark Submit
   ```
   spark-submit \
     --packages org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0,org.apache.hudi:hudi-aws-bundle:0.15.0 \
     --conf spark.streaming.kafka.allowNonConsecutiveOffsets=true \
     --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
     --class org.apache.hudi.utilities.streamer.HoodieStreamer \
     s3://some_bucket/jars/hudi-utilities-slim-bundle.jar \
     --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
     --props s3://some_bucket/configs/some_table.properties \
     --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
     --enable-sync \
     --source-ordering-field ts_ms \
     --target-base-path s3://some_bucket/some_folder/some_table \
     --target-table dbo.some_table \
     --table-type COPY_ON_WRITE \
     --sync-tool-classes org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool \
     --op BULK_INSERT \
     --transformer-class org.apache.hudi.utilities.transform.SqlFileBasedTransformer
   ```
   
   Is there something else that I might be missing?
   
   Thank you

