[
https://issues.apache.org/jira/browse/HUDI-105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17077475#comment-17077475
]
Hoang Ngo commented on HUDI-105:
--------------------------------
Hi [~vinoth],
Do you know if this fix is included in Hudi 0.5.0? I am hitting the same problem. This is my spark-submit command:
spark-submit \
--conf 'spark.jars=/usr/lib/hudi/hudi-hadoop-mr-bundle-0.5.0-incubating.jar,/usr/lib/hudi/hudi-spark-bundle-0.5.0-incubating.jar,/usr/lib/hudi/hudi-utilities-bundle-0.5.0-incubating.jar' \
--num-executors 1 \
--executor-memory 2g \
--driver-memory 2g \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /usr/lib/hudi/hudi-utilities-bundle-*.jar` \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--storage-type COPY_ON_WRITE \
--target-base-path s3://mybucket/tmp/hudidata-delta2/ \
--target-table hudidata-delta2 \
--props s3://mybucket/tmp/emr/hudipoc/a.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
This is my trace:
Exception in thread "main" org.apache.spark.SparkException: Offsets not available on leader: OffsetRange(topic: 'mytopic', partition: 1, range: [0 -> 25447]), OffsetRange(topic: 'mytopic', partition: 2, range: [0 -> 115661]), OffsetRange(topic: 'mytopic', partition: 3, range: [0 -> 42775]), OffsetRange(topic: 'mytopic', partition: 7, range: [0 -> 115661]), OffsetRange(topic: 'mytopic', partition: 8, range: [0 -> 115661]), OffsetRange(topic: 'mytopic', partition: 13, range: [0 -> 115661]), OffsetRange(topic: 'mytopic', partition: 16, range: [0 -> 115661]), OffsetRange(topic: 'mytopic', partition: 17, range: [0 -> 115661]), OffsetRange(topic: 'mytopic', partition: 18, range: [0 -> 6494]), OffsetRange(topic: 'mytopic', partition: 20, range: [0 -> 115661]), OffsetRange(topic: 'mytopic', partition: 21, range: [0 -> 115657])
    at org.apache.spark.streaming.kafka.KafkaUtils$.org$apache$spark$streaming$kafka$KafkaUtils$$checkOffsets(KafkaUtils.scala:201)
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createRDD$1.apply(KafkaUtils.scala:254)
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createRDD$1.apply(KafkaUtils.scala:250)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:699)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createRDD(KafkaUtils.scala:250)
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createRDD$3.apply(KafkaUtils.scala:339)
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createRDD$3.apply(KafkaUtils.scala:334)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:699)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createRDD(KafkaUtils.scala:334)
    at org.apache.spark.streaming.kafka.KafkaUtils.createRDD(KafkaUtils.scala)
    at org.apache.hudi.utilities.sources.AvroKafkaSource.toRDD(AvroKafkaSource.java:67)
    at org.apache.hudi.utilities.sources.AvroKafkaSource.fetchNewData(AvroKafkaSource.java:61)
    at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:71)
    at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:62)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:292)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:214)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:120)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:292)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
20/04/07 17:45:12 INFO ShutdownHookManager: Shutdown hook called
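For what it's worth, Spark's Kafka 0.8 direct API raises "Offsets not available on leader" when a requested offset range no longer exists on the broker, e.g. when a previously committed offset points at messages that log retention has already deleted. A sketch of the Kafka-related entries in the properties file passed via --props, in case it helps reproduce; the broker address is a placeholder and the key names are my assumption from the Hudi utilities docs, not verified against 0.5.0:

```properties
# Hypothetical a.properties fragment -- key names assumed, broker is a placeholder
hoodie.deltastreamer.source.kafka.topic=mytopic
metadata.broker.list=my-broker:9092
# With the Kafka 0.8 direct API, this controls where consumption falls back to
# when a stored offset is out of range ('smallest' or 'largest').
auto.offset.reset=smallest
```

Note that auto.offset.reset alone may not help once offsets are already checkpointed, which I understand is what this issue's fix addresses.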
> DeltaStreamer Kafka Ingestion does not handle invalid offsets
> -------------------------------------------------------------
>
> Key: HUDI-105
> URL: https://issues.apache.org/jira/browse/HUDI-105
> Project: Apache Hudi (incubating)
> Issue Type: Bug
> Components: Usability, Utilities
> Reporter: Vinoth Chandar
> Assignee: Vinoth Chandar
> Priority: Major
> Labels: pull-request-available
> Fix For: 0.5.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Reported here [https://github.com/apache/incubator-hudi/issues/643]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)