bkosuru opened a new issue, #5824:
URL: https://github.com/apache/hudi/issues/5824
Hello,

HoodieDeltaStreamer does not work with `AvroKafkaSource` and `FilebasedSchemaProvider`: it still expects `schema.registry.url` even though the schema is supplied from files.

Hudi version: 0.10.1
**To Reproduce**
kafka-source.properties:

```properties
# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=s
hoodie.datasource.write.partitionpath.field=g

# Kafka source
hoodie.deltastreamer.source.kafka.topic=recall-test

# Kafka props
bootstrap.servers=x.corp.com:3207
auto.offset.reset=earliest

# Schema provider configs
hoodie.deltastreamer.schemaprovider.source.schema.file=/user/bgk/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=/user/bgk/schema.avsc
```
```sh
spark-submit \
  --master yarn \
  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.10.1,org.apache.spark:spark-avro_2.11:2.4.0 \
  --deploy-mode cluster \
  --name triple-ingestion \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  hudi-utilities-bundle_2.11-0.10.1.jar \
  --props kafka-source.properties \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --source-ordering-field s \
  --target-base-path /data/recall-kafka \
  --target-table spog \
  --table-type COPY_ON_WRITE \
  --op BULK_INSERT
```
schema.avsc:

```json
{
  "type": "record",
  "name": "topLevelRecord",
  "fields": [
    {"name": "s", "type": ["string", "null"]},
    {"name": "p", "type": ["string", "null"]},
    {"name": "o", "type": ["string", "null"]},
    {"name": "g", "type": ["string", "null"]},
    {"name": "isDeleted", "type": "int"}
  ]
}
```
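For reference, the schema declares four nullable string fields and one required int. (Note the union order `["string","null"]`: Avro resolves a field's default value against the first branch of a union, so schemas that want a `null` default usually write `["null","string"]` instead.) A quick stdlib check of the nullability, with the schema inlined verbatim from above:

```python
import json

# Schema copied verbatim from schema.avsc in the report.
schema = json.loads(
    '{"type":"record","name":"topLevelRecord","fields":'
    '[{"name":"s","type":["string","null"]},{"name":"p","type":["string","null"]},'
    '{"name":"o","type":["string","null"]},{"name":"g","type":["string","null"]},'
    '{"name":"isDeleted","type":"int"}]}'
)

# A field is nullable when its type is a union that includes "null".
nullable = {f["name"]: isinstance(f["type"], list) and "null" in f["type"]
            for f in schema["fields"]}
print(nullable)
```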
**Stacktrace**

```
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:811)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:605)
    at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:229)
    at org.apache.hudi.utilities.sources.AvroKafkaSource.fetchNewData(AvroKafkaSource.java:92)
    at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76)
    at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:61)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:425)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:290)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:193)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:191)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:514)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:673)
Caused by: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
    at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:251)
    at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:136)
    at io.confluent.kafka.serializers.KafkaAvroDeserializerConfig.<init>(KafkaAvroDeserializerConfig.java:42)
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.configure(KafkaAvroDeserializer.java:50)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:713)
```
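As the trace shows, the failure happens at consumer construction, before any record is read: `KafkaAvroDeserializer.configure` validates its config and `schema.registry.url` has no default, because that deserializer expects values in Confluent's registry wire format (one `0x00` magic byte plus a 4-byte big-endian schema ID before the Avro payload). A stdlib-only illustration of that framing (the function name here is illustrative, not a Hudi or Confluent API):

```python
import struct

MAGIC_BYTE = 0  # Confluent wire format marker

def parse_confluent_frame(payload: bytes):
    """Split a Confluent-framed Kafka value into (schema_id, avro_bytes).

    Raises ValueError when the payload lacks the registry framing, which is
    the case for plain Avro bytes produced without a schema registry.
    """
    if len(payload) < 5 or payload[0] != MAGIC_BYTE:
        raise ValueError("not Confluent wire format; plain Avro bytes?")
    (schema_id,) = struct.unpack(">I", payload[1:5])
    return schema_id, payload[5:]

# A registry-framed value: magic byte 0x00, schema id 42, then Avro data.
framed = bytes([MAGIC_BYTE]) + struct.pack(">I", 42) + b"\x02a"
print(parse_confluent_frame(framed))  # (42, b'\x02a')
```

This is why a file-based schema alone does not help with the default setup: the deserializer is chosen independently of the schema provider, and it both demands a registry URL at configure time and assumes registry framing on the wire.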