sydneyhoran opened a new issue, #8521:
URL: https://github.com/apache/hudi/issues/8521
**_Tips before filing an issue_**

- Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
- Join the mailing list to engage in conversations and get faster support at [email protected].
- If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.

**Describe the problem you faced**

Hi there, I am curious about the compatibility of `KafkaAvroSchemaDeserializer` with `PostgresDebeziumSource`. Do these two options work together?

I am trying to follow [this article](https://hudi.apache.org/blog/2021/08/16/kafka-custom-deserializer/) to use Deltastreamer with Kafka topics whose schema is evolving. For some reason, I am unable to pass the config:

```
hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer
```

I am also providing the schema provider class:

```
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider
```

The job fails to start due to:

```
Caused by: org.apache.hudi.exception.HoodieException: java.lang.IllegalArgumentException: Property hoodie.deltastreamer.source.kafka.value.deserializer.schema not found
```

It appears that this line of code in [AvroKafkaSource.java](https://github.com/apache/hudi/blob/6082e9c9c46ee8da6aa779c2f009fc50f83467b7/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java#L53) rewrites the `.class` config key to a `.schema` key, and the job is subsequently unable to find that property in the configs (see the sketch under Additional context below).

The job at least starts up with the default deserializer (`KafkaAvroDeserializer`), but on certain topics I eventually get an ArrayIndexOutOfBounds exception (`Caused by: java.lang.ArrayIndexOutOfBoundsException: 10`), which we believe is caused by schema evolution mid-batch, so I am trying to use the custom `KafkaAvroSchemaDeserializer` instead.

We are using Deltastreamer with a `PostgresDebeziumSource`, consuming data from Confluent Kafka and using Confluent Schema Registry as the schema provider. The Kafka consumer fails to construct when we pass the config `hoodie.deltastreamer.source.kafka.value.deserializer.class`. Without this config, it runs fine for a while and then suddenly fails with the ArrayIndexOutOfBoundsException.

Thanks for any input!

**To Reproduce**

Steps to reproduce the behavior:

1. Run a Deltastreamer job with `PostgresDebeziumSource` using the following params:

   ```
   --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider
   --source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource
   --payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload
   <...>
   --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer
   ```

2. The Kafka consumer fails to construct.

**Expected behavior**

The job should accept this config so that schema changes coming from Debezium -> Kafka can be handled gracefully.

**Environment Description**

* Hudi version : 0.13.0
* Spark version : 3.1
* Hive version : N/A
* Hadoop version : N/A
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : both
**Additional context**

**hoodie configs:**

```
--target-base-path s3a://{{ bucket }}/{{ table_path }}
--target-table {{ table_name }}
--continuous
--props gs://path/to/tablename.properties
--min-sync-interval-seconds 15
--source-ordering-field updated_at
--source-limit 5000
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider
--table-type COPY_ON_WRITE
--op UPSERT
--source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource
--payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload
```

**tablename.properties**

```
hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer
hoodie.deltastreamer.schemaprovider.registry.url={{ schema_url }}.confluent.cloud/subjects/{{ topic }}-value/versions/latest
hoodie.deltastreamer.source.kafka.topic=some.topic
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=inserted_at
hoodie.datasource.write.precombine.field=updated_at
schema.registry.url={{ schema_url }}
schema.registry.basic.auth.user.info={{ schema_user }}:{{ schema_key }}
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ kafka_user }}' password='{{ kafka_key }}';
bootstrap.servers={{ bootstrap_server }}
hoodie.embed.timeline.server=false
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
group.id=hudi-deltastreamer
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
basic.auth.credentials.source=USER_INFO
heartbeat.interval.ms=5000
session.timeout.ms=120000
request.timeout.ms=900000
retry.backoff.ms=500
hoodie.datasource.write.schema.allow.auto.evolution.column.drop=true
max.rounds.without.new.data.to.shutdown=5
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.client.heartbeat.interval_in_ms=120000
hoodie.client.heartbeat.tolerable.misses=10
hoodie.write.lock.client.wait_time_ms_between_retry=1000
hoodie.write.lock.max_wait_time_ms_between_retry=1000
hoodie.write.lock.wait_time_ms_between_retry=500
hoodie.write.lock.wait_time_ms=5000
hoodie.write.lock.client.num_retries=10
hoodie.metadata.enable=false
```
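To make the failure mode concrete, here is a minimal, self-contained sketch of the configure-time behaviour the stack trace below points at. This is my own illustration, **not** the Hudi source: the value deserializer expects the reader schema to already be present in the Kafka consumer config under the `.schema` key and fails fast when it is missing. The class name `SchemaRequiringDeserializerSketch` is made up for illustration.

```java
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.kafka.common.serialization.Deserializer;

/**
 * Illustration only -- NOT the Hudi implementation. It mirrors the
 * configure-time behaviour visible in the stack trace below.
 */
public class SchemaRequiringDeserializerSketch implements Deserializer<Object> {

  // The key from the exception message; AvroKafkaSource is what would normally populate it.
  private static final String SCHEMA_PROP =
      "hoodie.deltastreamer.source.kafka.value.deserializer.schema";

  private Schema readerSchema;

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    Object schemaJson = configs.get(SCHEMA_PROP);
    if (schemaJson == null) {
      // Effectively the error in the stack trace:
      // "Property hoodie.deltastreamer.source.kafka.value.deserializer.schema not found"
      throw new IllegalArgumentException("Property " + SCHEMA_PROP + " not found");
    }
    this.readerSchema = new Schema.Parser().parse(schemaJson.toString());
  }

  @Override
  public Object deserialize(String topic, byte[] data) {
    // The real KafkaAvroSchemaDeserializer would decode Confluent-framed Avro
    // against readerSchema here; decoding is out of scope for this sketch.
    throw new UnsupportedOperationException("illustration only");
  }
}
```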
**Stacktrace**

```
Exception in thread "main" org.apache.hudi.utilities.ingestion.HoodieIngestionException: Ingestion service was shut down with exception.
	at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:66)
	at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:212)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:575)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: Failed to construct kafka consumer
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
	at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
	at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:64)
	... 15 more
Caused by: org.apache.hudi.exception.HoodieException: Failed to construct kafka consumer
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$1(HoodieDeltaStreamer.java:757)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
	at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:193)
	at org.apache.hudi.utilities.sources.debezium.DebeziumSource.fetchNextBatch(DebeziumSource.java:113)
	at org.apache.hudi.utilities.sources.RowSource.fetchNewData(RowSource.java:43)
	at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76)
	at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:176)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.fetchFromSource(DeltaSync.java:585)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:493)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:401)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$1(HoodieDeltaStreamer.java:717)
	... 4 more
Caused by: org.apache.hudi.exception.HoodieException: java.lang.IllegalArgumentException: Property hoodie.deltastreamer.source.kafka.value.deserializer.schema not found
	at org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer.configure(KafkaAvroSchemaDeserializer.java:53)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:721)
	... 15 more
Caused by: java.lang.IllegalArgumentException: Property hoodie.deltastreamer.source.kafka.value.deserializer.schema not found
	at org.apache.hudi.common.config.TypedProperties.checkKey(TypedProperties.java:67)
	at org.apache.hudi.common.config.TypedProperties.getString(TypedProperties.java:72)
	at org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer.configure(KafkaAvroSchemaDeserializer.java:51)
	... 16 more
```
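One more observation: the bottom of the trace goes through `DebeziumSource.fetchNextBatch` -> `KafkaOffsetGen.getNextOffsetRanges` -> `KafkaConsumer`, which, as far as I can tell, never passes through the `AvroKafkaSource` line linked above, so nothing on this path seems to populate the `.schema` property. Below is a hypothetical, untested helper (not Hudi code; `DeserializerSchemaPropSketch` and `latestValueSchemaProp` are made-up names) showing how the schema string the deserializer appears to expect could be fetched from the same Schema Registry subject configured in `tablename.properties` and handed to the consumer config.

```java
import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;

/**
 * Hypothetical helper (not part of Hudi, untested): looks up the latest value
 * schema for a topic in Confluent Schema Registry and returns it under the
 * property key that KafkaAvroSchemaDeserializer appears to require, so it
 * could be merged into the Kafka consumer config before the consumer is built.
 */
public class DeserializerSchemaPropSketch {

  private static final String SCHEMA_PROP =
      "hoodie.deltastreamer.source.kafka.value.deserializer.schema";

  public static Map<String, String> latestValueSchemaProp(
      String registryUrl, String registryUserInfo, String topic) throws Exception {
    // Same auth style as the basic.auth.* / schema.registry.* settings above.
    Map<String, Object> clientConfig = new HashMap<>();
    clientConfig.put("basic.auth.credentials.source", "USER_INFO");
    clientConfig.put("basic.auth.user.info", registryUserInfo);

    CachedSchemaRegistryClient client =
        new CachedSchemaRegistryClient(registryUrl, 10, clientConfig);
    SchemaMetadata latest = client.getLatestSchemaMetadata(topic + "-value");

    Map<String, String> props = new HashMap<>();
    props.put(SCHEMA_PROP, latest.getSchema());
    return props;
  }
}
```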
