sydneyhoran opened a new issue, #8521:
URL: https://github.com/apache/hudi/issues/8521

   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
   
   - Join the mailing list to engage in conversations and get faster support at 
[email protected].
   
   - If you have triaged this as a bug, then file an 
[issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   
   **Describe the problem you faced**
   
   Hi there, I am curious about the compatibility of KafkaAvroSchemaDeserializer 
with PostgresDebeziumSource. Do these two options work together?
   
   I am trying to follow [this 
article](https://hudi.apache.org/blog/2021/08/16/kafka-custom-deserializer/) to 
use Deltastreamer with Kafka topics whose schema is evolving, but I am unable 
to pass the config:
   
   ```
   
hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer
   ```
   
   I am also providing the schema provider class:
   ```
   --schemaprovider-class 
org.apache.hudi.utilities.schema.SchemaRegistryProvider
   ```
   
   The job fails to start due to:
   ```
   Caused by: org.apache.hudi.exception.HoodieException: 
java.lang.IllegalArgumentException: Property 
hoodie.deltastreamer.source.kafka.value.deserializer.schema not found
   ```
   
   It appears that this line of code in 
[AvroKafkaSource.java](https://github.com/apache/hudi/blob/6082e9c9c46ee8da6aa779c2f009fc50f83467b7/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java#L53)
 expects a `.schema` property rather than the `.class` one I am passing, and 
the job is subsequently unable to find that property in the configs.
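   For illustration, the failure mode in the stack trace can be sketched as follows. This is a simplified, hypothetical class (`DeserializerSchemaCheck` and `requireSchema` are invented names, not actual Hudi code): the deserializer's configure step requires a `...deserializer.schema` property, which appears to be injected only by AvroKafkaSource, so constructing the consumer with only the `.class` config set fails fast:

   ```java
   import java.util.Properties;

   // Hypothetical sketch of the reported failure: the custom deserializer's
   // configure() step needs the ".schema" property, which the Debezium source
   // path never injects, so the Kafka consumer cannot be constructed.
   public class DeserializerSchemaCheck {
       static final String SCHEMA_KEY =
           "hoodie.deltastreamer.source.kafka.value.deserializer.schema";

       // Mimics the TypedProperties.checkKey() behavior seen in the stack
       // trace: fail fast when the schema property is absent.
       static String requireSchema(Properties props) {
           String schema = props.getProperty(SCHEMA_KEY);
           if (schema == null) {
               throw new IllegalArgumentException(
                   "Property " + SCHEMA_KEY + " not found");
           }
           return schema;
       }

       public static void main(String[] args) {
           Properties props = new Properties();
           // Only the deserializer *class* is set, as in the reported config...
           props.setProperty(
               "hoodie.deltastreamer.source.kafka.value.deserializer.class",
               "org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer");
           try {
               requireSchema(props); // ...so the ".schema" lookup fails
           } catch (IllegalArgumentException e) {
               System.out.println(e.getMessage());
           }
       }
   }
   ```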
   
   The job at least starts up with the default deserializer 
(KafkaAvroDeserializer), but on certain topics it eventually fails with an 
`ArrayIndexOutOfBoundsException` (`Caused by: 
java.lang.ArrayIndexOutOfBoundsException: 10`), which we believe is caused by 
schema evolution mid-batch, so I am trying to use the custom 
KafkaAvroSchemaDeserializer instead.
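   For context on why a mid-batch schema change could surface as an index error rather than a schema error: Avro encodes enum and union values by ordinal, so a hypothetical sketch of the suspected mismatch (invented names, not Hudi or Avro code) might look like:

   ```java
   // Hypothetical illustration: a reader holding a stale schema resolves an
   // enum by ordinal, so bytes written with a newer schema can carry an index
   // that overruns the reader's symbol table, surfacing as
   // ArrayIndexOutOfBoundsException instead of a schema-mismatch error.
   public class StaleSchemaSketch {
       // The reader's cached (stale) schema knows two symbols...
       static final String[] READER_SYMBOLS = {"CREATED", "UPDATED"};

       static String decode(int writtenOrdinal) {
           // ...but the writer's evolved schema may emit ordinal 2.
           return READER_SYMBOLS[writtenOrdinal];
       }

       public static void main(String[] args) {
           System.out.println(decode(1)); // works while schemas agree
           try {
               decode(2); // a value written under the evolved schema
           } catch (ArrayIndexOutOfBoundsException e) {
               System.out.println("mid-batch failure: " + e);
           }
       }
   }
   ```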
   
   We are using Deltastreamer with a PostgresDebeziumSource, consuming data 
from Confluent Kafka and using Confluent Schema Registry as the schema 
provider. The Kafka consumer fails to construct when we pass the config 
`hoodie.deltastreamer.source.kafka.value.deserializer.class`. Without this 
config, the job runs fine for a while and then suddenly fails with an 
`ArrayIndexOutOfBoundsException`.
   
   Thanks for any input!
   
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Run Deltastreamer job with PostgresDebeziumSource using the following 
params:
   ``` 
   --schemaprovider-class 
org.apache.hudi.utilities.schema.SchemaRegistryProvider
   --source-class 
org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource
   --payload-class 
org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload
   <...>
   --hoodie-conf 
hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer
   ```
   2. Kafka consumer fails to construct
   
   
   **Expected behavior**
   
   The job should accept this config so that schema changes flowing from 
Debezium -> Kafka are handled gracefully.
   
   
   **Environment Description**
   
   * Hudi version : 0.13.0
   
   * Spark version : 3.1
   
   * Hive version : N/A
   
   * Hadoop version : N/A
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : both
   
   
   **Additional context**
   
   **hoodie configs:**
   ```
   --target-base-path s3a://{{ bucket }}/{{ table_path }}
   --target-table {{ table_name }}
   --continuous
   --props gs://path/to/tablename.properties
   --min-sync-interval-seconds 15
   --source-ordering-field updated_at
   --source-limit 5000
   --schemaprovider-class 
org.apache.hudi.utilities.schema.SchemaRegistryProvider
   --table-type COPY_ON_WRITE
   --op UPSERT
   --source-class 
org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource
   --payload-class 
org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload
   ```
   
   **tablename.properties**
   ```
   
hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer
   hoodie.deltastreamer.schemaprovider.registry.url={{ schema_url 
}}.confluent.cloud/subjects/{{ topic }}-value/versions/latest
   hoodie.deltastreamer.source.kafka.topic=some.topic
   hoodie.datasource.write.recordkey.field=id
   hoodie.datasource.write.partitionpath.field=inserted_at
   hoodie.datasource.write.precombine.field=updated_at
   schema.registry.url={{ schema_url }}
   schema.registry.basic.auth.user.info={{ schema_user }}:{{ schema_key }}
   sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule 
required username='{{ kafka_user }}' password='{{ kafka_key }}';
   bootstrap.servers={{ bootstrap_server }}
   hoodie.embed.timeline.server=false
   
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
   hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
   hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
   group.id=hudi-deltastreamer
   security.protocol=SASL_SSL
   sasl.mechanism=PLAIN
   basic.auth.credentials.source=USER_INFO
   heartbeat.interval.ms=5000
   session.timeout.ms=120000
   request.timeout.ms=900000
   retry.backoff.ms=500
   hoodie.datasource.write.schema.allow.auto.evolution.column.drop=true
   max.rounds.without.new.data.to.shutdown=5
   hoodie.write.concurrency.mode=optimistic_concurrency_control
   
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider
   hoodie.cleaner.policy.failed.writes=LAZY
   hoodie.client.heartbeat.interval_in_ms=120000
   hoodie.client.heartbeat.tolerable.misses=10
   hoodie.write.lock.client.wait_time_ms_between_retry=1000
   hoodie.write.lock.max_wait_time_ms_between_retry=1000
   hoodie.write.lock.wait_time_ms_between_retry=500
   hoodie.write.lock.wait_time_ms=5000
   hoodie.write.lock.client.num_retries=10
   hoodie.metadata.enable=false
   ```
   
   
   **Stacktrace**
   
   ```
   Exception in thread "main" 
org.apache.hudi.utilities.ingestion.HoodieIngestionException: Ingestion service 
was shut down with exception.
        at 
org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:66)
        at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:212)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:575)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
        at 
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   Caused by: java.util.concurrent.ExecutionException: 
org.apache.hudi.exception.HoodieException: Failed to construct kafka consumer
        at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at 
org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
        at 
org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:64)
        ... 15 more
   Caused by: org.apache.hudi.exception.HoodieException: Failed to construct 
kafka consumer
        at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$1(HoodieDeltaStreamer.java:757)
        at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka 
consumer
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
        at 
org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:193)
        at 
org.apache.hudi.utilities.sources.debezium.DebeziumSource.fetchNextBatch(DebeziumSource.java:113)
        at 
org.apache.hudi.utilities.sources.RowSource.fetchNewData(RowSource.java:43)
        at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76)
        at 
org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:176)
        at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.fetchFromSource(DeltaSync.java:585)
        at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:493)
        at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:401)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$1(HoodieDeltaStreamer.java:717)
        ... 4 more
   Caused by: org.apache.hudi.exception.HoodieException: 
java.lang.IllegalArgumentException: Property 
hoodie.deltastreamer.source.kafka.value.deserializer.schema not found
        at 
org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer.configure(KafkaAvroSchemaDeserializer.java:53)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:721)
        ... 15 more
   Caused by: java.lang.IllegalArgumentException: Property 
hoodie.deltastreamer.source.kafka.value.deserializer.schema not found
        at 
org.apache.hudi.common.config.TypedProperties.checkKey(TypedProperties.java:67)
        at 
org.apache.hudi.common.config.TypedProperties.getString(TypedProperties.java:72)
        at 
org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer.configure(KafkaAvroSchemaDeserializer.java:51)
        ... 16 more
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
