YousifS7 opened a new issue, #11635: URL: https://github.com/apache/hudi/issues/11635
Hello, we are using the `org.apache.hudi.utilities.streamer.HoodieStreamer` class to extract data from Kafka and write it to a Hudi table. The Kafka topic is populated by Debezium from a SQL Server table. We are using EMR 7.1.0 to run Spark-Submit.

**To Reproduce**

Steps to reproduce the behavior:

1. Sync the SQL Server table to Kafka via Debezium (AvroConverter)
2. Provision EMR 7.1.0
3. Run the Spark-Submit command below

**Spark-Submit**

```
spark-submit \
  --packages org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0 \
  --conf spark.streaming.kafka.allowNonConsecutiveOffsets=true \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer \
  s3://some_bucket/jars/hudi-utilities-slim-bundle.jar \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --props s3://some_bucket/configs/some_table.properties \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --enable-sync \
  --source-ordering-field ts_ms \
  --target-base-path s3://some_bucket/some_folder/some_table \
  --target-table dbo.some_table \
  --table-type COPY_ON_WRITE \
  --sync-tool-classes org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool \
  --op BULK_INSERT \
  --transformer-class org.apache.hudi.utilities.transform.SqlFileBasedTransformer
```

**Properties File**

```
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.datasource.write.recordkey.field=hudi_key
hoodie.datasource.write.partitionpath.field=partition_path
hoodie.datasource.write.precombine.field=ts_ms
hoodie.streamer.transformer.sql.file=s3://some_bucket/configs/some_table.sql
hoodie.streamer.schemaprovider.registry.url=http://localhost:8081/subjects/test.dbo.some_table-value/versions/latest
schema.registry.url=http://localhost:8081
hoodie.streamer.source.kafka.topic=test.dbo.some_table
bootstrap.servers=localhost:9092
auto.offset.reset=earliest
```

**SQL Transformer File**

```
CACHE TABLE dbz_filtered AS
SELECT ts_ms, op, before, after
FROM <SRC>
WHERE op IN ('d', 'u', 'c', 'r');

CACHE TABLE dbz_events AS
SELECT ts_ms,
       CASE WHEN op = 'd' THEN before ELSE after END AS source_fields,
       CASE WHEN op = 'd' THEN true ELSE false END AS is_deleted
FROM dbz_filtered;

CACHE TABLE dbz_fields AS
SELECT ts_ms, source_fields.*
FROM dbz_events;

SELECT s.*,
       CONCAT(s.col1, s.col2) AS hudi_key,
       YEAR(FROM_UNIXTIME(s.col2 / 1000)) AS partition_path
FROM dbz_fields s;
```

**Environment Description**

* Hudi version : 0.15
* Spark version : 3.5.0
* Hive version : N/A
* Hadoop version : N/A
* Storage : S3
* Running on Docker? : No

**Error Message**

```
org.apache.hudi.exception.HoodieKeyException: recordKey value: "null" for field: "hudi_key" cannot be null or empty.
```

I have confirmed that the transformer SQL file extracts the data correctly. Could you please verify whether the transformer runs before the Hudi writer and supplies the writer with the final transformed data? If so, could you help determine why the `hudi_key` field is not being passed through correctly? I can confirm that `hudi_key` is generated by concatenating two non-null fields. Thank you.
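One detail worth checking when debugging this: in Spark SQL, `CONCAT` returns NULL if any of its arguments is NULL, so even a few rows with a NULL `col1` or `col2` (for example, columns absent from the Debezium `before` image on delete events) would produce a NULL `hudi_key` and trigger exactly this `HoodieKeyException`. A quick diagnostic and a null-safe variant of the final SELECT might look like the sketch below (assuming `col1`/`col2` are the actual key columns; `CONCAT_WS` skips NULL arguments instead of propagating them):

```
-- Diagnostic: count rows that would yield a NULL record key under CONCAT semantics
SELECT COUNT(*) AS null_key_rows
FROM dbz_fields
WHERE col1 IS NULL OR col2 IS NULL;

-- Null-safe variant: CONCAT_WS ignores NULL arguments,
-- and COALESCE guards the partition-path source column
SELECT s.*,
       CONCAT_WS('', s.col1, s.col2) AS hudi_key,
       YEAR(FROM_UNIXTIME(COALESCE(s.col2, 0) / 1000)) AS partition_path
FROM dbz_fields s;
```

Note that if both columns are NULL, `CONCAT_WS` yields an empty string, which Hudi also rejects ("cannot be null or empty"), so such rows would still need to be filtered out or given an explicit fallback key.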
