When executing a HoodieDeltaStreamer job, we run into the exception below. I 
see that hudi-utilities-bundle is packaged with the Kafka 0.8 client libs, but I 
believe it should be compatible with the MSK version of Kafka. Any pointers on 
what the issue could be?



Spark - 2.2.1
Kafka - MSK 2.1.0
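
For context, the Kafka source config in the DeltaStreamer properties file is 
roughly along these lines (the broker endpoints are placeholders, and apart from 
the topic name the exact keys are approximate):

    # Kafka topic to ingest (same topic as in the log below)
    hoodie.deltastreamer.source.kafka.topic=KAFKA_TEST
    # MSK bootstrap brokers (placeholder endpoints)
    metadata.broker.list=b-1.msk-cluster.example.amazonaws.com:9092,b-2.msk-cluster.example.amazonaws.com:9092
    # Start from the earliest available offsets (Kafka 0.8-style consumer config)
    auto.offset.reset=smallest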



19/09/30 19:10:56 main INFO SimpleConsumer: Reconnect due to error:
java.io.EOFException
       at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:99)
       at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
       at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
       at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86)
       at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
       at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:111)
       at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:133)
       at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:132)
       at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:365)
       at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:361)
       at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
       at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
       at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:361)
       at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:132)
       at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:119)
       at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:196)
       at org.apache.hudi.utilities.sources.AvroKafkaSource.fetchNewData(AvroKafkaSource.java:55)
       at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:72)
       at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:62)
       at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:299)
       at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:218)
       at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:123)
       at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:294)
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.lang.reflect.Method.invoke(Method.java:498)
       at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:926)
       at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:204)
       at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:229)
       at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:127)
       at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

19/09/30 19:10:57 main INFO KafkaOffsetGen: HUDI -- getNextOffsetRanges -- topic name is KAFKA_TEST



Thanks,

Gautam
