samserpoosh commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1542618638

   Apicurio's Avro serialization, and more importantly its **inferred schema**, turned out to be inaccurate, as detailed in [this comment](https://github.com/apache/hudi/issues/8519#issuecomment-1540961546). So I switched to **Confluent's Schema Registry (SR)** instead, especially since Hudi's source code mentions in multiple places that it is tightly coupled to Confluent's SR-style APIs/responses.
   
   Their SR did the right thing: I verified that the `source` field is now properly serialized and that its schema/type is inferred accurately. However, when I run the `DeltaStreamer` job, I still get the **same** error:
   
   ```
   Exception in thread "main" org.apache.hudi.exception.HoodieException: 
org.apache.hudi.exception.HoodieException
           at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:191)
           at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
           at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:186)
           at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:553)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
           at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
           at 
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
           at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
           at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
           at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
           at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
           at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   Caused by: java.util.concurrent.ExecutionException: 
org.apache.hudi.exception.HoodieException
           at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
           at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
           at 
org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
           at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:189)
           ... 15 more
   Caused by: org.apache.hudi.exception.HoodieException
           at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:713)
           at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.NullPointerException
           at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:301)
           at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:679)
           ... 4 more
   ```
   
   The command I'm running is:
   
   ```
   spark-submit \
   --jars "opt/spark/jars/hudi-utilities-bundle.jar,<other_jars>" \
   --master spark://<SPARK_MASTER_URL>:<PORT> \
   --total-executor-cores 1 \
   --executor-memory 4g \
   --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
   --conf spark.hadoop.fs.s3a.connection.maximum=10000 \
   --conf spark.scheduler.mode=FAIR \
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer opt/spark/jars/hudi-utilities-bundle.jar \
   --table-type COPY_ON_WRITE \
   --target-base-path s3a://path/to/directory \
   --target-table <TABLE_NAME> \
   --min-sync-interval-seconds 30 \
   --source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \
   --payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload \
   --source-ordering-field _event_lsn \
   --op UPSERT \
   --continuous \
   --source-limit 5000 \
   --hoodie-conf bootstrap.servers=<KAFKA_BOOTSTRAP_SERVER>:9092 \
   --hoodie-conf schema.registry.url=http://<SCHEMA_REGISTRY_URL>:8081 \
   --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://<SCHEMA_REGISTRY_URL>:8081/subjects/<KAFKA_TOPIC>-value/versions/<VERSION_NO> \
   --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer \
   --hoodie-conf hoodie.deltastreamer.source.kafka.topic=<KAFKA_TOPIC> \
   --hoodie-conf auto.offset.reset=earliest \
   --hoodie-conf hoodie.datasource.write.recordkey.field=<UPSTREAM_DB_PKEY_FIELD> \
   --hoodie-conf hoodie.datasource.write.partitionpath.field=<PARTITION_FIELD_NAME> \
   --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
   --hoodie-conf hoodie.datasource.write.precombine.field=_event_lsn \
   --hoodie-conf hoodie.metadata.enable=true \
   --hoodie-conf hoodie.metadata.index.column.stats.enable=true \
   --hoodie-conf hoodie.parquet.small.file.limit=134217728
   ```
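   For what it's worth, the registry endpoint that `hoodie.deltastreamer.schemaprovider.registry.url` points at can be sanity-checked directly against Confluent SR's REST API (a sketch using the same `<SCHEMA_REGISTRY_URL>`/`<KAFKA_TOPIC>`/`<VERSION_NO>` placeholders as the command above; the `jq` piping is just for readability):
   
   ```
   # List the registered versions for the subject, to confirm <VERSION_NO> exists:
   curl -s http://<SCHEMA_REGISTRY_URL>:8081/subjects/<KAFKA_TOPIC>-value/versions
   
   # Fetch the exact schema version the schema provider will resolve, and print
   # its top-level field names (the "schema" attribute is a JSON-encoded string):
   curl -s http://<SCHEMA_REGISTRY_URL>:8081/subjects/<KAFKA_TOPIC>-value/versions/<VERSION_NO> \
     | jq '.schema | fromjson | .fields[].name'
   ```
   
   If that second call returns the expected Debezium envelope fields, the registry side is at least serving the right schema to the job.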
   
   ## What About Vanilla Kafka Consumption with DeltaStreamer?
   
   This is most likely a [**Debezium <> DeltaStreamer**](https://hudi.apache.org/blog/2022/01/14/change-data-capture-with-debezium-and-apache-hudi/)-specific issue: when I run a DeltaStreamer job that simply consumes the **same Kafka topic**, the run gets further along and fails only for a sensible, expected reason. The `UPSTREAM_DB_PKEY_FIELD` is NULL because I'm no longer using `PostgresDebeziumSource`, so my Kafka events don't go through [this processing](https://github.com/apache/hudi/blob/release-0.11.1/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/PostgresDebeziumSource.java#L50-L85), and my configured `recordkey.field` no longer exists under that post-processed name. Here's the command I used for the vanilla Kafka-ingestion DeltaStreamer:
   
   ```
   spark-submit \
   --jars "opt/spark/jars/hudi-utilities-bundle.jar,<other_jars>" \
   --master spark://<SPARK_MASTER_URL>:<PORT> \
   --total-executor-cores 1 \
   --executor-memory 4g \
   --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
   --conf spark.hadoop.fs.s3a.connection.maximum=10000 \
   --conf spark.scheduler.mode=FAIR \
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer opt/spark/jars/hudi-utilities-bundle.jar \
   --table-type COPY_ON_WRITE \
   --target-base-path s3a://path/to/directory \
   --target-table <TABLE_NAME> \
   --min-sync-interval-seconds 30 \
   --source-ordering-field _event_lsn \
   --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
   --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
   --op BULK_INSERT \
   --continuous \
   --source-limit 5000 \
   --hoodie-conf bootstrap.servers=<KAFKA_BOOTSTRAP_SERVER>:9092 \
   --hoodie-conf schema.registry.url=http://<SCHEMA_REGISTRY_URL>:8081 \
   --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://<SCHEMA_REGISTRY_URL>:8081/subjects/<KAFKA_TOPIC>-value/versions/<VERSION_NO> \
   --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer \
   --hoodie-conf hoodie.deltastreamer.source.kafka.topic=<KAFKA_TOPIC> \
   --hoodie-conf auto.offset.reset=earliest \
   --hoodie-conf hoodie.datasource.write.recordkey.field=<UPSTREAM_DB_PKEY_FIELD> \
   --hoodie-conf hoodie.datasource.write.partitionpath.field=<PARTITION_FIELD_NAME> \
   --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
   --hoodie-conf hoodie.datasource.write.precombine.field=_event_lsn \
   --hoodie-conf hoodie.metadata.enable=true \
   --hoodie-conf hoodie.metadata.index.column.stats.enable=true \
   --hoodie-conf hoodie.parquet.small.file.limit=134217728
   ```
   
   And the NULL KEY error I mentioned is:
   
   ```
   Caused by: org.apache.hudi.exception.HoodieKeyException: recordKey value: 
"null" for field: "<UPSTREAM_DB_PKEY_FIELD>" cannot be null or empty.
           at 
org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:141)
           at 
org.apache.hudi.keygen.SimpleAvroKeyGenerator.getRecordKey(SimpleAvroKeyGenerator.java:50)
           at 
org.apache.hudi.keygen.SimpleKeyGenerator.getRecordKey(SimpleKeyGenerator.java:58)
           at 
org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:65)
           at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$fetchFromSource$b741bfe4$1(DeltaSync.java:495)
           at 
org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
           at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
           at scala.collection.Iterator$SliceIterator.next(Iterator.scala:271)
           at scala.collection.Iterator.foreach(Iterator.scala:941)
           at scala.collection.Iterator.foreach$(Iterator.scala:941)
           at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
           at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
           at 
scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
           at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
           at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
           at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
           at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
           at scala.collection.AbstractIterator.to(Iterator.scala:1429)
           at 
scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
           at 
scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
           at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
           at 
scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
           at 
scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
           at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
           at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449)
           at 
org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2236)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
           at org.apache.spark.scheduler.Task.run(Task.scala:131)
           at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
           at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
           ... 3 more
   ```
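   To see exactly which field names the vanilla `AvroKafkaSource` path actually receives (versus the post-processed names `PostgresDebeziumSource` would have produced), the raw topic records can be inspected with Confluent's Avro console consumer. This is a sketch with the same placeholders as above, assuming the standard Confluent CLI is on the path:
   
   ```
   # Decode one record from the topic using the registry, and eyeball the
   # top-level field names against the configured recordkey.field:
   kafka-avro-console-consumer \
     --bootstrap-server <KAFKA_BOOTSTRAP_SERVER>:9092 \
     --topic <KAFKA_TOPIC> \
     --from-beginning \
     --max-messages 1 \
     --property schema.registry.url=http://<SCHEMA_REGISTRY_URL>:8081
   ```
   
   In the vanilla run the record still carries the full Debezium envelope (`before`/`after`/`source`, etc.), which is consistent with `<UPSTREAM_DB_PKEY_FIELD>` not existing as a top-level field and the key generator seeing null.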
   
   Any thoughts/help would be highly appreciated :smile: 

