YousifS7 opened a new issue, #11635:
URL: https://github.com/apache/hudi/issues/11635

   Hello,
   We are using the `org.apache.hudi.utilities.streamer.HoodieStreamer` class to 
extract data from Kafka and write it to a Hudi table. The Kafka topic is populated 
by Debezium from a SQL Server table. We are using EMR 7.1.0 to run spark-submit.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Sync SQL Server table to Kafka via Debezium (AvroConverter)
   2. Provision EMR 7.1.0
   3. Run below Spark-Submit:
   
   > -- Spark-Submit
   ```
   spark-submit 
   --packages org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0 
   --conf spark.streaming.kafka.allowNonConsecutiveOffsets=true 
   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer 
   --class org.apache.hudi.utilities.streamer.HoodieStreamer 
s3://some_bucket/jars/hudi-utilities-slim-bundle.jar 
   --schemaprovider-class 
org.apache.hudi.utilities.schema.SchemaRegistryProvider 
   --props s3://some_bucket/configs/some_table.properties 
   --source-class org.apache.hudi.utilities.sources.AvroKafkaSource 
   --enable-sync 
   --source-ordering-field ts_ms 
   --target-base-path s3://some_bucket/some_folder/some_table 
   --target-table dbo.some_table 
   --table-type COPY_ON_WRITE 
   --sync-tool-classes org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool 
   --op BULK_INSERT 
   --transformer-class 
org.apache.hudi.utilities.transform.SqlFileBasedTransformer
   ```
   
   > -- Properties File
   ```
   
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
   hoodie.datasource.write.recordkey.field=hudi_key
   hoodie.datasource.write.partitionpath.field=partition_path
   hoodie.datasource.write.precombine.field=ts_ms
   hoodie.streamer.transformer.sql.file=s3://some_bucket/configs/some_table.sql
   
hoodie.streamer.schemaprovider.registry.url=http://localhost:8081/subjects/test.dbo.some_table-value/versions/latest
   schema.registry.url=http://localhost:8081
   hoodie.streamer.source.kafka.topic=test.dbo.some_table
   bootstrap.servers=localhost:9092
   auto.offset.reset=earliest
   ```
   
   > -- SQL Transformer File
   ```
   CACHE TABLE dbz_filtered AS
   SELECT ts_ms, op, before, after FROM <SRC> WHERE op IN ('d', 'u', 'c', 'r');
   
   CACHE TABLE dbz_events AS
   SELECT ts_ms, CASE WHEN op = 'd' THEN before ELSE after END AS 
source_fields, CASE WHEN op = 'd' THEN true ELSE false END AS is_deleted FROM 
dbz_filtered;
   
   CACHE TABLE dbz_fields AS
   SELECT ts_ms, source_fields.* FROM dbz_events;
   
   SELECT s.*, Concat(s.col1, s.col2) AS hudi_key, YEAR(FROM_UNIXTIME(s.col2 / 
1000)) AS partition_path FROM dbz_fields s;
   ```
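   
   One thing worth checking in the final SELECT above: Spark SQL's `concat` returns NULL when any argument is NULL, so even a single row where `col1` or `col2` is NULL (for example, a delete event whose image carries nulls) yields a NULL `hudi_key`. A minimal Python sketch of that semantics (toy data, not from the actual table):
   
   ```python
   def spark_concat(*args):
       """Mimics Spark SQL's concat(): returns None if any argument is None."""
       if any(a is None for a in args):
           return None
       return "".join(str(a) for a in args)

   rows = [
       {"col1": "a", "col2": 10},   # normal row -> key "a10"
       {"col1": None, "col2": 20},  # null source field -> key is None
   ]
   keys = [spark_concat(r["col1"], r["col2"]) for r in rows]
   # A None key is exactly what makes Hudi raise HoodieKeyException.
   ```
   
   Filtering such rows out (or coalescing the inputs) in the transformer SQL would rule this cause out.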
   
   **Environment Description**
   
   * Hudi version : 0.15
   
   * Spark version : 3.5.0
   
   * Hive version : N/A
   
   * Hadoop version : N/A
   
   * Storage : S3
   
   * Running on Docker? : No
   
   **Error Message**
   
   ```
    org.apache.hudi.exception.HoodieKeyException: recordKey value: "null" for 
field: "hudi_key" cannot be null or empty.
   ```
   
   I have confirmed that the transformer SQL file extracts the data 
correctly. Could you please verify that the transformer runs before the Hudi 
writer and supplies the writer with the final transformed data? If so, could 
you help determine why the `hudi_key` field is not being passed through 
correctly? I can confirm that `hudi_key` is generated by concatenating two 
non-null fields.
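   
   One hypothesis worth checking (an assumption on my part, not verified against the Hudi code): since the schema comes from `SchemaRegistryProvider`, the target schema may default to the registry (Debezium) schema, which does not contain the transformer-derived `hudi_key` and `partition_path` columns; if the writer projects the transformed rows onto that schema, the derived fields would be dropped before key generation. A toy sketch of that projection effect (field names are hypothetical):
   
   ```python
   # Hypothetical field list standing in for the Debezium value schema
   # fetched from the registry; it lacks the transformer-added columns.
   registry_schema = ["ts_ms", "col1", "col2"]

   transformed_row = {
       "ts_ms": 1721000000000,
       "col1": "a",
       "col2": 10,
       "hudi_key": "a10",         # added by the SQL transformer
       "partition_path": "1970",  # added by the SQL transformer
   }

   # Projecting onto the registry schema silently drops the derived fields...
   projected = {f: transformed_row.get(f) for f in registry_schema}

   # ...so the key generator would resolve hudi_key to None.
   record_key = projected.get("hudi_key")
   ```
   
   If that is what is happening, pointing the target schema at a schema that includes the derived columns (or letting the target schema be inferred from the transformed data) might resolve it.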
   
   Thank you
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
