Sam-Serpoosh commented on issue #8519: URL: https://github.com/apache/hudi/issues/8519#issuecomment-1542619896
Apicurio's Avro serialization, and more importantly its **inferred schema**, was indeed inaccurate, as detailed in [this comment](https://github.com/apache/hudi/issues/8519#issuecomment-1540961546). So I switched to **Confluent's Schema Registry (SR)** instead, especially since Hudi's source code mentions in multiple places that it is tightly coupled to Confluent-style SR APIs/responses. Confluent's SR did the right thing: I verified that the `source` field is now properly serialized and its schema/type is inferred accurately. However, when running the `DeltaStreamer` job, I still get the **same** error:

```
Exception in thread "main" org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:191)
	at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:186)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:553)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
	at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:189)
	... 15 more
Caused by: org.apache.hudi.exception.HoodieException
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:713)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:301)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:679)
	... 4 more
```

The command I'm running is:

```
spark-submit \
  --jars "opt/spark/jars/hudi-utilities-bundle.jar,<other_jars>" \
  --master spark://<SPARK_MASTER_URL>:<PORT> \
  --total-executor-cores 1 \
  --executor-memory 4g \
  --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
  --conf spark.hadoop.fs.s3a.connection.maximum=10000 \
  --conf spark.scheduler.mode=FAIR \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer opt/spark/jars/hudi-utilities-bundle.jar \
  --table-type COPY_ON_WRITE \
  --target-base-path s3a://path/to/directory \
  --target-table <TABLE_NAME> \
  --min-sync-interval-seconds 30 \
  --source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \
  --payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload \
  --source-ordering-field _event_lsn \
  --op UPSERT \
  --continuous \
  --source-limit 5000 \
  --hoodie-conf bootstrap.servers=<KAFKA_BOOTSTRAP_SERVER>:9092 \
  --hoodie-conf schema.registry.url=http://<SCHEMA_REGISTRY_URL>:8081 \
  --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://<SCHEMA_REGISTRY_URL>:8081/subjects/<KAFKA_TOPIC>-value/versions/<VERSION_NO> \
  --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer \
  --hoodie-conf hoodie.deltastreamer.source.kafka.topic=<KAFKA_TOPIC> \
  --hoodie-conf auto.offset.reset=earliest \
  --hoodie-conf hoodie.datasource.write.recordkey.field=<UPSTREAM_DB_PKEY_FIELD> \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=<PARTITION_FIELD_NAME> \
  --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
  --hoodie-conf hoodie.datasource.write.precombine.field=_event_lsn \
  --hoodie-conf hoodie.metadata.enable=true \
  --hoodie-conf hoodie.metadata.index.column.stats.enable=true \
  --hoodie-conf hoodie.parquet.small.file.limit=134217728
```

## What About Vanilla Kafka Consumption with DeltaStreamer

This is most likely a [**Debezium <> DeltaStreamer**](https://hudi.apache.org/blog/2022/01/14/change-data-capture-with-debezium-and-apache-hudi/)-specific issue. When I run a DeltaStreamer job that just consumes the **same Kafka topic**, the run gets further along and only fails for a sensible, expected reason: `UPSTREAM_DB_PKEY_FIELD` is NULL because I'm no longer using `PostgresDebeziumSource`, so my Kafka events don't go through [this processing](https://github.com/apache/hudi/blob/release-0.11.1/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/PostgresDebeziumSource.java#L50-L85) and the `recordkey.field` I provide no longer exists under that post-processed name. Here's the command I used to run the vanilla Kafka-ingestion DeltaStreamer:

```
spark-submit \
  --jars "opt/spark/jars/hudi-utilities-bundle.jar,<other_jars>" \
  --master spark://<SPARK_MASTER_URL>:<PORT> \
  --total-executor-cores 1 \
  --executor-memory 4g \
  --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
  --conf spark.hadoop.fs.s3a.connection.maximum=10000 \
  --conf spark.scheduler.mode=FAIR \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer opt/spark/jars/hudi-utilities-bundle.jar \
  --table-type COPY_ON_WRITE \
  --target-base-path s3a://path/to/directory \
  --target-table <TABLE_NAME> \
  --min-sync-interval-seconds 30 \
  --source-ordering-field _event_lsn \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --op BULK_INSERT \
  --continuous \
  --source-limit 5000 \
  --hoodie-conf bootstrap.servers=<KAFKA_BOOTSTRAP_SERVER>:9092 \
  --hoodie-conf schema.registry.url=http://<SCHEMA_REGISTRY_URL>:8081 \
  --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://<SCHEMA_REGISTRY_URL>:8081/subjects/<KAFKA_TOPIC>-value/versions/<VERSION_NO> \
  --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer \
  --hoodie-conf hoodie.deltastreamer.source.kafka.topic=<KAFKA_TOPIC> \
  --hoodie-conf auto.offset.reset=earliest \
  --hoodie-conf hoodie.datasource.write.recordkey.field=<UPSTREAM_DB_PKEY_FIELD> \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=<PARTITION_FIELD_NAME> \
  --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
  --hoodie-conf hoodie.datasource.write.precombine.field=_event_lsn \
  --hoodie-conf hoodie.metadata.enable=true \
  --hoodie-conf hoodie.metadata.index.column.stats.enable=true \
  --hoodie-conf hoodie.parquet.small.file.limit=134217728
```

And the NULL-key error I mentioned is:

```
Caused by: org.apache.hudi.exception.HoodieKeyException: recordKey value: "null" for field: "<UPSTREAM_DB_PKEY_FIELD>" cannot be null or empty.
	at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:141)
	at org.apache.hudi.keygen.SimpleAvroKeyGenerator.getRecordKey(SimpleAvroKeyGenerator.java:50)
	at org.apache.hudi.keygen.SimpleKeyGenerator.getRecordKey(SimpleKeyGenerator.java:58)
	at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:65)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$fetchFromSource$b741bfe4$1(DeltaSync.java:495)
	at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator$SliceIterator.next(Iterator.scala:271)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at scala.collection.AbstractIterator.to(Iterator.scala:1429)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
	at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2236)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	... 3 more
```

Any thoughts/help would be highly appreciated :smile:
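Addendum, for anyone triaging this: the NULL-key error in the vanilla `AvroKafkaSource` run is expected, because a raw Debezium envelope keeps the row's columns nested under `after`, while `PostgresDebeziumSource` flattens them to the top level before key generation. A rough Python sketch of that flattening, not Hudi's actual code — the underscore-prefixed field names other than `_event_lsn` (the `--source-ordering-field` above) are illustrative:

```python
def flatten_debezium_event(event: dict) -> dict:
    """Flatten a Debezium envelope roughly the way PostgresDebeziumSource does:
    promote the columns under `after` to the top level and keep selected
    source/op metadata under underscore-prefixed names."""
    after = event.get("after") or {}
    flattened = dict(after)  # the upstream pkey becomes a top-level field here
    flattened["_event_lsn"] = event["source"]["lsn"]       # used as precombine field
    flattened["_change_operation_type"] = event["op"]      # illustrative name
    return flattened


# A raw envelope: note the pkey ("id") is nested, so a SimpleKeyGenerator
# looking for a top-level "id" on the raw record would find null.
event = {
    "before": None,
    "after": {"id": 42, "name": "alice"},   # <UPSTREAM_DB_PKEY_FIELD> = "id" here
    "source": {"lsn": 123456789, "db": "app"},
    "op": "c",
}

row = flatten_debezium_event(event)
assert row["id"] == 42                # key is only top-level after flattening
assert row["_event_lsn"] == 123456789
```

This is also why the vanilla run's fix would be pointing `recordkey.field` at a path that actually exists in the raw envelope, rather than at the post-processed name.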
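For anyone wanting to repeat the registry-side check I did on the `source` field: Confluent SR serves each subject version over REST (e.g. `GET http://<SCHEMA_REGISTRY_URL>:8081/subjects/<KAFKA_TOPIC>-value/versions/latest`), and the top-level field types can be inspected from the response. A minimal sketch against a hypothetical response body (the subject name and schema shown are made up for illustration):

```python
import json

# Hypothetical registry response, trimmed to what matters here. The registry
# returns the Avro schema as a JSON string inside the "schema" key.
sample_response = json.dumps({
    "subject": "my-topic-value",
    "version": 3,
    "schema": json.dumps({
        "type": "record",
        "name": "Envelope",
        "fields": [
            {"name": "before", "type": ["null", "Value"]},
            {"name": "after", "type": ["null", "Value"]},
            {"name": "source", "type": "Source"},  # a record reference, not "string"
            {"name": "op", "type": "string"},
        ],
    }),
})


def field_types(registry_response: str) -> dict:
    """Map top-level Avro field names to their declared types."""
    schema = json.loads(json.loads(registry_response)["schema"])
    return {f["name"]: f["type"] for f in schema["fields"]}


types = field_types(sample_response)
# The Apicurio symptom was `source` being inferred as a plain string;
# with Confluent SR it should resolve to a record type.
assert types["source"] != "string"
```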
