bkosuru opened a new issue, #5824:
URL: https://github.com/apache/hudi/issues/5824

   Hello,
   
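   HoodieDeltaStreamer does not work with AvroKafkaSource and FilebasedSchemaProvider. Even though the source and target schemas are supplied from a file, the Kafka consumer still requires `schema.registry.url` and fails to start without it.
   
   Hudi version: 0.10.1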
   
   **To Reproduce**
   kafka-source.properties:
   ```properties
   # Key fields, for kafka example
   hoodie.datasource.write.recordkey.field=s
   hoodie.datasource.write.partitionpath.field=g
   # Kafka Source
   hoodie.deltastreamer.source.kafka.topic=recall-test
   # Kafka props
   bootstrap.servers=x.corp.com:3207
   auto.offset.reset=earliest
   # schema provider configs
   hoodie.deltastreamer.schemaprovider.source.schema.file=/user/bgk/schema.avsc
   hoodie.deltastreamer.schemaprovider.target.schema.file=/user/bgk/schema.avsc
   ```
    
   ```sh
   spark-submit \
     --master yarn \
     --packages org.apache.hudi:hudi-spark-bundle_2.11:0.10.1,org.apache.spark:spark-avro_2.11:2.4.0 \
     --deploy-mode cluster \
     --name triple-ingestion \
     --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
     hudi-utilities-bundle_2.11-0.10.1.jar \
     --props kafka-source.properties \
     --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \
     --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
     --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
     --source-ordering-field s \
     --target-base-path /data/recall-kafka \
     --target-table spog \
     --table-type COPY_ON_WRITE \
     --op BULK_INSERT
   ```
   
   schema.avsc:
   ```json
   {
     "type": "record",
     "name": "topLevelRecord",
     "fields": [
       {"name": "s", "type": ["string", "null"]},
       {"name": "p", "type": ["string", "null"]},
       {"name": "o", "type": ["string", "null"]},
       {"name": "g", "type": ["string", "null"]},
       {"name": "isDeleted", "type": "int"}
     ]
   }
   ```
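To rule out a simple schema/config mismatch, the sketch below parses the schema.avsc shown in this report and checks that every field named in kafka-source.properties and on the command line (record key `s`, partition path `g`, ordering field `s`) is actually declared in the schema. `missing_fields` and `REFERENCED_FIELDS` are hypothetical names for a local sanity check, not part of Hudi. The sketch checks field names only. It says nothing about how DeltaStreamer validates schemas and does not exercise the `schema.registry.url` error.

```python
import json

# Schema copied verbatim from schema.avsc in this report.
SCHEMA_AVSC = (
    '{"type":"record","name":"topLevelRecord","fields":['
    '{"name":"s","type":["string","null"]},'
    '{"name":"p","type":["string","null"]},'
    '{"name":"o","type":["string","null"]},'
    '{"name":"g","type":["string","null"]},'
    '{"name":"isDeleted","type":"int"}]}'
)

# Hypothetical mapping: config key / CLI flag -> field name it refers to.
REFERENCED_FIELDS = {
    "hoodie.datasource.write.recordkey.field": "s",
    "hoodie.datasource.write.partitionpath.field": "g",
    "--source-ordering-field": "s",
}


def missing_fields(schema_json: str, referenced: dict) -> list:
    """Return (sorted) the config keys whose field is not declared in the record schema.

    Hypothetical helper for a local check; not part of Hudi.
    """
    schema = json.loads(schema_json)
    declared = {field["name"] for field in schema["fields"]}
    return sorted(key for key, name in referenced.items() if name not in declared)


if __name__ == "__main__":
    # An empty list means every referenced field exists in the schema.
    print(missing_fields(SCHEMA_AVSC, REFERENCED_FIELDS))
```

If the list is empty, the configured field names match the schema, which points the failure back at the Kafka deserializer setup shown in the stacktrace.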
   
   **Stacktrace**
   ```
   org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
           at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:811)
           at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)
           at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:605)
           at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:229)
           at org.apache.hudi.utilities.sources.AvroKafkaSource.fetchNewData(AvroKafkaSource.java:92)
           at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76)
           at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:61)
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:425)
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:290)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:193)
           at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:191)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:514)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:673)
   Caused by: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
           at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:251)
           at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
           at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:136)
           at io.confluent.kafka.serializers.KafkaAvroDeserializerConfig.<init>(KafkaAvroDeserializerConfig.java:42)
           at io.confluent.kafka.serializers.KafkaAvroDeserializer.configure(KafkaAvroDeserializer.java:50)
           at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:713)
   ```
   
   
   

