Sam-Serpoosh commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1542619896

   As detailed in [this comment](https://github.com/apache/hudi/issues/8519#issuecomment-1540961546), Apicurio's Avro serialization, and more importantly its **inferred schema**, was inaccurate. So I ended up using **Confluent's Schema Registry (SR)** instead, especially since Hudi's source code mentions in multiple places that it is closely tied to Confluent-style SR APIs/responses.
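
   In case it helps anyone reproduce the verification step, here's a minimal sketch of how a registered schema can be inspected. The helper names are hypothetical; the only assumption is Confluent SR's standard `GET /subjects/{subject}/versions/{version}` endpoint, which wraps the Avro schema as a JSON string under the `"schema"` key:

   ```python
import json
from urllib.request import urlopen

def fetch_schema(registry_url, subject, version="latest"):
    # Hypothetical helper: pull one schema version from a Confluent SR.
    # The registry returns JSON whose "schema" field is itself a
    # JSON-encoded Avro schema, so it needs a second json.loads.
    url = f"{registry_url}/subjects/{subject}/versions/{version}"
    with urlopen(url) as resp:
        return json.loads(json.load(resp)["schema"])

def field_type(record_schema, name):
    # Return the declared type of a top-level field in an Avro record schema,
    # or None if the field is absent.
    for field in record_schema.get("fields", []):
        if field["name"] == name:
            return field["type"]
    return None
   ```

   With a check like `field_type(schema, "source")`, a correctly inferred schema should report a record type for `source` rather than a plain string.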
   
   Confluent's SR did the right thing: I verified that the `source` field is now serialized properly and that its schema/type is inferred accurately. However, when I run the `DeltaStreamer` job, I still get the **same** error:
   
   ```
   Exception in thread "main" org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:191)
           at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:186)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:553)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
           at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
           at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
           at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
           at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
           at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
           at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
           at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException
           at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
           at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
           at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:189)
           ... 15 more
   Caused by: org.apache.hudi.exception.HoodieException
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:713)
           at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.NullPointerException
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:301)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:679)
           ... 4 more
   ```
   
   The command I'm running is:
   
   ```
   spark-submit \
   --jars "opt/spark/jars/hudi-utilities-bundle.jar,<other_jars>" \
   --master spark://<SPARK_MASTER_URL>:<PORT> \
   --total-executor-cores 1 \
   --executor-memory 4g \
   --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
   --conf spark.hadoop.fs.s3a.connection.maximum=10000 \
   --conf spark.scheduler.mode=FAIR \
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer opt/spark/jars/hudi-utilities-bundle.jar \
   --table-type COPY_ON_WRITE \
   --target-base-path s3a://path/to/directory \
   --target-table <TABLE_NAME> \
   --min-sync-interval-seconds 30 \
   --source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \
   --payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload \
   --source-ordering-field _event_lsn \
   --op UPSERT \
   --continuous \
   --source-limit 5000 \
   --hoodie-conf bootstrap.servers=<KAFKA_BOOTSTRAP_SERVER>:9092 \
   --hoodie-conf schema.registry.url=http://<SCHEMA_REGISTRY_URL>:8081 \
   --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://<SCHEMA_REGISTRY_URL>:8081/subjects/<KAFKA_TOPIC>-value/versions/<VERSION_NO> \
   --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer \
   --hoodie-conf hoodie.deltastreamer.source.kafka.topic=<KAFKA_TOPIC> \
   --hoodie-conf auto.offset.reset=earliest \
   --hoodie-conf hoodie.datasource.write.recordkey.field=<UPSTREAM_DB_PKEY_FIELD> \
   --hoodie-conf hoodie.datasource.write.partitionpath.field=<PARTITION_FIELD_NAME> \
   --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
   --hoodie-conf hoodie.datasource.write.precombine.field=_event_lsn \
   --hoodie-conf hoodie.metadata.enable=true \
   --hoodie-conf hoodie.metadata.index.column.stats.enable=true \
   --hoodie-conf hoodie.parquet.small.file.limit=134217728
   ```
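
   As a side note, one sanity check that can rule out deserializer/framing mismatches is confirming that the topic's messages actually carry the Confluent wire format that an Avro deserializer expects. A sketch (not Hudi code; it only assumes the standard framing of one `0x00` magic byte plus a 4-byte big-endian schema id before the Avro body):

   ```python
import struct

def parse_confluent_header(raw: bytes):
    # Confluent wire format: byte 0 is the 0x00 magic byte, bytes 1-4 hold
    # the schema id as a big-endian unsigned int; the Avro body follows.
    if len(raw) < 5 or raw[0] != 0:
        raise ValueError("not Confluent wire format")
    (schema_id,) = struct.unpack(">I", raw[1:5])
    return schema_id, raw[5:]

# Fabricated example record: schema id 7 followed by an opaque body.
schema_id, body = parse_confluent_header(b"\x00\x00\x00\x00\x07payload")
print(schema_id)  # -> 7
   ```

   Running this over a few raw messages pulled from the topic tells you whether the schema id embedded in each record matches the version registered in SR.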
   
   ## What About Vanilla Kafka Consumption with DeltaStreamer?
   
   This is most likely a [**Debezium <> DeltaStreamer**](https://hudi.apache.org/blog/2022/01/14/change-data-capture-with-debezium-and-apache-hudi/) specific issue: when I ran a DeltaStreamer job that simply consumes the **same Kafka topic**, the run got further along and failed only for a sensible, expected reason. The `UPSTREAM_DB_PKEY_FIELD` is NULL because, without `PostgresDebeziumSource`, my Kafka events no longer go through [this processing](https://github.com/apache/hudi/blob/release-0.11.1/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/PostgresDebeziumSource.java#L50-L85), so the `recordkey.field` I provide no longer exists under that post-processed name. Here's the command I used to run a vanilla Kafka ingestion DeltaStreamer:
   
   ```
   spark-submit \
   --jars "opt/spark/jars/hudi-utilities-bundle.jar,<other_jars>" \
   --master spark://<SPARK_MASTER_URL>:<PORT> \
   --total-executor-cores 1 \
   --executor-memory 4g \
   --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
   --conf spark.hadoop.fs.s3a.connection.maximum=10000 \
   --conf spark.scheduler.mode=FAIR \
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer opt/spark/jars/hudi-utilities-bundle.jar \
   --table-type COPY_ON_WRITE \
   --target-base-path s3a://path/to/directory \
   --target-table <TABLE_NAME> \
   --min-sync-interval-seconds 30 \
   --source-ordering-field _event_lsn \
   --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
   --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
   --op BULK_INSERT \
   --continuous \
   --source-limit 5000 \
   --hoodie-conf bootstrap.servers=<KAFKA_BOOTSTRAP_SERVER>:9092 \
   --hoodie-conf schema.registry.url=http://<SCHEMA_REGISTRY_URL>:8081 \
   --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://<SCHEMA_REGISTRY_URL>:8081/subjects/<KAFKA_TOPIC>-value/versions/<VERSION_NO> \
   --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer \
   --hoodie-conf hoodie.deltastreamer.source.kafka.topic=<KAFKA_TOPIC> \
   --hoodie-conf auto.offset.reset=earliest \
   --hoodie-conf hoodie.datasource.write.recordkey.field=<UPSTREAM_DB_PKEY_FIELD> \
   --hoodie-conf hoodie.datasource.write.partitionpath.field=<PARTITION_FIELD_NAME> \
   --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
   --hoodie-conf hoodie.datasource.write.precombine.field=_event_lsn \
   --hoodie-conf hoodie.metadata.enable=true \
   --hoodie-conf hoodie.metadata.index.column.stats.enable=true \
   --hoodie-conf hoodie.parquet.small.file.limit=134217728
   ```
   
   The NULL-key error I mentioned above is:
   
   ```
   Caused by: org.apache.hudi.exception.HoodieKeyException: recordKey value: "null" for field: "<UPSTREAM_DB_PKEY_FIELD>" cannot be null or empty.
           at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:141)
           at org.apache.hudi.keygen.SimpleAvroKeyGenerator.getRecordKey(SimpleAvroKeyGenerator.java:50)
           at org.apache.hudi.keygen.SimpleKeyGenerator.getRecordKey(SimpleKeyGenerator.java:58)
           at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:65)
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$fetchFromSource$b741bfe4$1(DeltaSync.java:495)
           at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
           at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
           at scala.collection.Iterator$SliceIterator.next(Iterator.scala:271)
           at scala.collection.Iterator.foreach(Iterator.scala:941)
           at scala.collection.Iterator.foreach$(Iterator.scala:941)
           at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
           at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
           at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
           at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
           at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
           at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
           at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
           at scala.collection.AbstractIterator.to(Iterator.scala:1429)
           at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
           at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
           at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
           at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
           at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
           at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
           at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449)
           at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2236)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
           at org.apache.spark.scheduler.Task.run(Task.scala:131)
           at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
           ... 3 more
   ```
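
   For context on why the key resolves to null in the vanilla run, here is a toy illustration of the flattening that the linked `PostgresDebeziumSource` processing performs. The event and field names are made up, and `_event_lsn` is assumed to come from the Debezium `source.lsn` metadata:

   ```python
# Debezium change events nest the actual row under "after" (and "before"),
# alongside "source" metadata and the operation type "op".
debezium_event = {
    "before": None,
    "after": {"id": 42, "name": "example"},   # the actual row lives here
    "source": {"lsn": 12345, "table": "t"},
    "op": "c",
}

def flatten_after(event):
    # Roughly what the Debezium source's post-processing achieves: promote
    # the row fields to the top level, alongside metadata like _event_lsn.
    flat = dict(event.get("after") or {})
    flat["_event_lsn"] = event["source"]["lsn"]
    return flat

# Consumed raw, the key field "id" is absent at the top level -> null key:
print(debezium_event.get("id"))                # -> None
# After flattening, the key is present where the key generator looks:
print(flatten_after(debezium_event)["id"])     # -> 42
   ```

   So with `AvroKafkaSource` the record key lookup lands on the raw envelope, not the flattened row, which matches the `HoodieKeyException` above.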
   
   Any thoughts/help would be highly appreciated :smile:

