NetsanetGeb edited a comment on issue #544: Improve HiveSyncTool handling of empty commit timeline URL: https://github.com/apache/incubator-hudi/issues/544#issuecomment-474949396 I published a JSON file to Kafka and ran the Hoodie delta streamer as a Spark job with Kafka as the main data source. I set the Kafka properties as follows:
```
include=base.properties
hoodie.datasource.write.recordkey.field=key
hoodie.datasource.write.partitionpath.field=date
hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs:///Projects/Hudi/Resources/source.avsc
hoodie.deltastreamer.source.kafka.topic=trial
metadata.broker.list=10.0.2.15:9091
auto.offset.reset=smallest
```
The problem is that I am getting an EOFException. It is related to the KafkaCluster class imported from org.apache.spark, and it does not even seem to read the kafkaParams that I specified in kafka-source.properties. I am attaching the log so you can share some insights:
```
2019-03-20 16:21:49,755 INFO hudi,Hudi_Kafka_Source_Sync_Job,3,application_1552567740249_0039 SimpleConsumer: Reconnect due to error:
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:147)
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:89)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:86)
    at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:114)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:134)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:133)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:366)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:362)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
    at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:362)
    at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:133)
    at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:120)
    at com.uber.hoodie.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:196)
    at com.uber.hoodie.utilities.sources.JsonKafkaSource.fetchNewData(JsonKafkaSource.java:53)
    at com.uber.hoodie.utilities.sources.Source.fetchNext(Source.java:72)
    at com.uber.hoodie.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:64)
    at com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:229)
    at com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:442)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:679)

2019-03-20 16:21:50,411 ERROR hudi,Hudi_Kafka_Source_Sync_Job,3,application_1552567740249_0039 ApplicationMaster: User class threw exception: com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException: Error obtaining partition metadata
com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException: Error obtaining partition metadata
    at com.uber.hoodie.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:201)
    at com.uber.hoodie.utilities.sources.JsonKafkaSource.fetchNewData(JsonKafkaSource.java:53)
    at com.uber.hoodie.utilities.sources.Source.fetchNext(Source.java:72)
    at com.uber.hoodie.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:64)
    at com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:229)
    at com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:442)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:679)
Caused by: java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:147)
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:102)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:86)
    at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:114)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:134)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:133)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:366)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:362)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
    at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:362)
    at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:133)
    at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:120)
    at com.uber.hoodie.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:196)
    ... 10 more
```
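For context, the delta streamer was launched roughly as in the sketch below. This is only an illustrative shape of the `spark-submit` invocation, not the exact command from the job: the bundle jar name, target base path, and target table name are placeholders I assumed, and the command is assembled and echoed rather than executed:

```shell
# Sketch of a HoodieDeltaStreamer launch with a JSON Kafka source.
# Jar name, target path, and table name below are assumed placeholders.
CMD="spark-submit \
  --class com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer \
  hoodie-utilities-bundle.jar \
  --storage-type COPY_ON_WRITE \
  --source-class com.uber.hoodie.utilities.sources.JsonKafkaSource \
  --source-ordering-field key \
  --props kafka-source.properties \
  --target-base-path hdfs:///Projects/Hudi/table \
  --target-table trial_table"
# Echo the assembled command instead of running it, so the shape is
# visible without a Spark/Kafka cluster.
echo "$CMD"
```

The `--props` file here is the kafka-source.properties shown above; `metadata.broker.list` and `auto.offset.reset` are read from it by the Kafka offset generator.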