[ https://issues.apache.org/jira/browse/HUDI-105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17077475#comment-17077475 ]
Hoang Ngo commented on HUDI-105:
--------------------------------

Hi [~vinoth],

Do you know if this fix is applied in Hudi 0.5.0? I have the same problem here.

This is my spark-submit command:

spark-submit --conf 'spark.jars=/usr/lib/hudi/hudi-hadoop-mr-bundle-0.5.0-incubating.jar,/usr/lib/hudi/hudi-spark-bundle-0.5.0-incubating.jar,/usr/lib/hudi/hudi-utilities-bundle-0.5.0-incubating.jar' \
  --num-executors 1 \
  --executor-memory 2g \
  --driver-memory 2g \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /usr/lib/hudi/hudi-utilities-bundle-*.jar` \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --storage-type COPY_ON_WRITE \
  --target-base-path s3://mybucket/tmp/hudidata-delta2/ \
  --target-table hudidata-delta2 \
  --props s3://mybucket/tmp/emr/hudipoc/a.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider

This is my stack trace:

Exception in thread "main" org.apache.spark.SparkException: Offsets not available on leader: OffsetRange(topic: 'mytopic', partition: 1, range: [0 -> 25447]), OffsetRange(topic: 'mytopic', partition: 2, range: [0 -> 115661]), OffsetRange(topic: 'mytopic', partition: 3, range: [0 -> 42775]), OffsetRange(topic: 'mytopic', partition: 7, range: [0 -> 115661]), OffsetRange(topic: 'mytopic', partition: 8, range: [0 -> 115661]), OffsetRange(topic: 'mytopic', partition: 13, range: [0 -> 115661]), OffsetRange(topic: 'mytopic', partition: 16, range: [0 -> 115661]), OffsetRange(topic: 'mytopic', partition: 17, range: [0 -> 115661]), OffsetRange(topic: 'mytopic', partition: 18, range: [0 -> 6494]), OffsetRange(topic: 'mytopic', partition: 20, range: [0 -> 115661]), OffsetRange(topic: 'mytopic', partition: 21, range: [0 -> 115657])
	at org.apache.spark.streaming.kafka.KafkaUtils$.org$apache$spark$streaming$kafka$KafkaUtils$$checkOffsets(KafkaUtils.scala:201)
	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createRDD$1.apply(KafkaUtils.scala:254)
	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createRDD$1.apply(KafkaUtils.scala:250)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.SparkContext.withScope(SparkContext.scala:699)
	at org.apache.spark.streaming.kafka.KafkaUtils$.createRDD(KafkaUtils.scala:250)
	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createRDD$3.apply(KafkaUtils.scala:339)
	at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createRDD$3.apply(KafkaUtils.scala:334)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.SparkContext.withScope(SparkContext.scala:699)
	at org.apache.spark.streaming.kafka.KafkaUtils$.createRDD(KafkaUtils.scala:334)
	at org.apache.spark.streaming.kafka.KafkaUtils.createRDD(KafkaUtils.scala)
	at org.apache.hudi.utilities.sources.AvroKafkaSource.toRDD(AvroKafkaSource.java:67)
	at org.apache.hudi.utilities.sources.AvroKafkaSource.fetchNewData(AvroKafkaSource.java:61)
	at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:71)
	at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:62)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:292)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:214)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:120)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:292)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
20/04/07 17:45:12 INFO ShutdownHookManager: Shutdown hook called

> DeltaStreamer Kafka Ingestion does not handle invalid offsets
> -------------------------------------------------------------
>
>                 Key: HUDI-105
>                 URL: https://issues.apache.org/jira/browse/HUDI-105
>             Project: Apache Hudi (incubating)
>          Issue Type: Bug
>          Components: Usability, Utilities
>            Reporter: Vinoth Chandar
>            Assignee: Vinoth Chandar
>            Priority: Major
>              Labels: pull-request-available
>            Fix For: 0.5.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Reported here [https://github.com/apache/incubator-hudi/issues/643]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
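For context on the failure mode: "Offsets not available on leader" means the offset ranges the job tries to read fall outside the range the Kafka brokers still retain, which is the invalid-offset situation this issue tracks. Below is a hypothetical sketch of the kind of `--props` file involved; the key names assume the Kafka 0.8-style consumer that the `org.apache.spark.streaming.kafka.KafkaUtils` frames in the trace imply, and the broker host names are placeholders — verify everything against your Hudi and Kafka versions:

```properties
# Hypothetical DeltaStreamer source properties (not the commenter's actual a.properties).
hoodie.deltastreamer.source.kafka.topic=mytopic

# Broker list, placeholder hosts; the 0.8-style consumer uses metadata.broker.list
# rather than bootstrap.servers.
metadata.broker.list=broker1:9092,broker2:9092

# Where to restart when saved offsets are out of range:
# 'smallest'/'largest' in the 0.8 consumer ('earliest'/'latest' in newer consumers).
auto.offset.reset=smallest
```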