NetsanetGeb edited a comment on issue #544: Improve HiveSyncTool handling of empty commit timeline
URL: https://github.com/apache/incubator-hudi/issues/544#issuecomment-474949396
 
 
   I published a JSON file to Kafka and ran the Hoodie DeltaStreamer as a Spark job with Kafka as the main data source.
   I set the Kafka properties as follows:
   ```
   include=base.properties
   hoodie.datasource.write.recordkey.field=key
   hoodie.datasource.write.partitionpath.field=date
   hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs:///Projects/Hudi/Resources/source.avsc
   hoodie.deltastreamer.source.kafka.topic=trial
   metadata.broker.list=10.0.2.15:9091
   auto.offset.reset=smallest
   ```
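   For context, the job is submitted roughly as sketched below (a sketch only: the jar path, target path, and table name are placeholders, and the exact flag names depend on the Hudi/Hoodie version, so please check `--help` on your build):

   ```shell
   # Hypothetical spark-submit invocation for the com.uber.hoodie-era
   # DeltaStreamer; paths and table names below are placeholders.
   spark-submit \
     --class com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer \
     hoodie-utilities-bundle.jar \
     --props hdfs:///Projects/Hudi/Resources/kafka-source.properties \
     --schemaprovider-class com.uber.hoodie.utilities.schema.FilebasedSchemaProvider \
     --source-class com.uber.hoodie.utilities.sources.JsonKafkaSource \
     --source-ordering-field date \
     --target-base-path /tmp/hudi/trial \
     --target-table trial \
     --storage-type COPY_ON_WRITE
   ```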
   
   The problem is that I am getting an EOFException related to the KafkaCluster class imported from org.apache.spark, and it does not even appear to read the kafkaParams I specified in kafka-source.properties. I am attaching the log file in the hope of getting some insight:
   
   ```
   2019-03-20 16:21:49,755 INFO hudi,Hudi_Kafka_Source_Sync_Job,3,application_1552567740249_0039 SimpleConsumer: Reconnect due to error:
   java.io.EOFException
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:147)
        at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:89)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:86)
        at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:114)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:134)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:133)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:362)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
        at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:362)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:133)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:120)
        at com.uber.hoodie.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:196)
        at com.uber.hoodie.utilities.sources.JsonKafkaSource.fetchNewData(JsonKafkaSource.java:53)
        at com.uber.hoodie.utilities.sources.Source.fetchNext(Source.java:72)
        at com.uber.hoodie.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:64)
        at com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:229)
        at com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:442)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:679)
   2019-03-20 16:21:50,411 ERROR hudi,Hudi_Kafka_Source_Sync_Job,3,application_1552567740249_0039 ApplicationMaster: User class threw exception: com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException: Error obtaining partition metadata
   com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException: Error obtaining partition metadata
        at com.uber.hoodie.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:201)
        at com.uber.hoodie.utilities.sources.JsonKafkaSource.fetchNewData(JsonKafkaSource.java:53)
        at com.uber.hoodie.utilities.sources.Source.fetchNext(Source.java:72)
        at com.uber.hoodie.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:64)
        at com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:229)
        at com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:442)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:679)
   Caused by: java.io.EOFException
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:147)
        at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:102)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:86)
        at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:114)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:134)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:133)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:362)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
        at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:362)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:133)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:120)
        at com.uber.hoodie.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:196)
        ... 10 more
   ```
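
   For anyone reproducing this: an EOFException from SimpleConsumer often means the socket connected but the peer closed it mid-response, for example when the port is not a PLAINTEXT Kafka listener (here 9091 rather than the default 9092) or when the old consumer protocol does not match the broker. A minimal TCP-level check of the broker endpoint (a hypothetical helper; note that a successful connect still does not prove the listener speaks PLAINTEXT Kafka):

   ```shell
   # Check whether a host:port accepts a TCP connection at all,
   # using bash's /dev/tcp pseudo-device with a 3-second timeout.
   check_port() {
     local host=$1 port=$2
     timeout 3 bash -c "exec 3<>/dev/tcp/${host}/${port}" 2>/dev/null \
       && echo "open" || echo "closed"
   }

   check_port 10.0.2.15 9091
   ```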

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
