YousifS7 commented on issue #11635:
URL: https://github.com/apache/hudi/issues/11635#issuecomment-2232340343
Following up on this: I was able to run the Spark app without errors. However, even when I specify columns drawn from the 'before' and 'after' struct types extracted from the Kafka payload, only the following columns are written to the Hudi table:
```
ts_ms, op, before, after, source
```
> Transformer File Query
```
CACHE TABLE dbz_filtered AS
SELECT a.ts_ms, a.op, a.before, a.after, a.source
FROM <SRC> a
WHERE a.op IN ('d', 'u', 'c', 'r');

SELECT b.before.col1 AS before_col1, b.after.col1 AS after_col1,
       b.ts_ms, b.op, b.before, b.after, b.source
FROM dbz_filtered b;
```
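As an aside, if the goal is eventually to flatten every nested field rather than listing them one by one, Spark SQL supports star expansion on struct columns. A minimal sketch against the same cached `dbz_filtered` table (illustrative only; it would not change the outcome if the written schema is being forced elsewhere):

```
-- Expand all fields of the 'after' struct into top-level columns,
-- alongside the envelope metadata.
SELECT b.ts_ms, b.op, b.after.* FROM dbz_filtered b;
```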
It seems the additional columns extracted from the 'before' and 'after' envelope structs are being ignored for some reason. I even tried specifying a static column, e.g. 'test' AS test_column, and it was not written to the Hudi table either.
> Properties File
```
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.datasource.write.recordkey.field=op
hoodie.datasource.write.partitionpath.field=op
hoodie.datasource.write.precombine.field=ts_ms
hoodie.streamer.transformer.sql.file=s3://some_bucket/configs/some_table.sql
hoodie.streamer.schemaprovider.registry.url=http://localhost:8081/subjects/test.dbo.some_table-value/versions/latest
schema.registry.url=http://localhost:8081
hoodie.streamer.source.kafka.topic=test.dbo.some_table
bootstrap.servers=localhost:9092
auto.offset.reset=earliest
```
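One possibility (an assumption on my part, not confirmed): since `hoodie.streamer.schemaprovider.registry.url` is supplying the target schema as well as the source schema, the writer may be coercing the transformed rows back to the registered Debezium-envelope schema, which would drop any columns the transformer adds. If that is the cause, pointing the target schema at a registry subject that already contains the flattened columns might help. The subject name below is hypothetical, and the exact property key should be verified against your Hudi version:

```
# Hypothetical subject: register a schema that includes the flattened
# columns, then point the target schema at it (verify this key for
# 0.15.x; the older hoodie.deltastreamer.* alias may also be accepted).
hoodie.streamer.schemaprovider.registry.targetUrl=http://localhost:8081/subjects/test.dbo.some_table-flat-value/versions/latest
```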
> Spark Submit
```
spark-submit \
  --packages org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0,org.apache.hudi:hudi-aws-bundle:0.15.0 \
  --conf spark.streaming.kafka.allowNonConsecutiveOffsets=true \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer \
  s3://some_bucket/jars/hudi-utilities-slim-bundle.jar \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --props s3://some_bucket/configs/some_table.properties \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --enable-sync \
  --source-ordering-field ts_ms \
  --target-base-path s3://some_bucket/some_folder/some_table \
  --target-table dbo.some_table \
  --table-type COPY_ON_WRITE \
  --sync-tool-classes org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool \
  --op BULK_INSERT \
  --transformer-class org.apache.hudi.utilities.transform.SqlFileBasedTransformer
```
Is there something else that I might be missing?
Thank you