amitsingh-10 opened a new issue #1335: [SUPPORT] HoodieDeltaStreamer offset reset not working
URL: https://github.com/apache/incubator-hudi/issues/1335
 
 
   **Describe the problem you faced**
   I am trying to create a Hudi table from a Kafka Avro topic using `HoodieDeltaStreamer`. Setting `auto.offset.reset` to `earliest` should make it read from the first events available in the Kafka topic; however, the property is being overridden by `org.apache.spark.streaming.kafka010.KafkaUtils`.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Use an EMR cluster running release emr-5.23.0.
   2. Create a custom properties file.
   ```
   # General properties
   hoodie.datasource.write.table.type=MERGE_ON_READ
   hoodie.datasource.write.recordkey.field=id
   hoodie.datasource.write.partitionpath.field=timestamp
   hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.keygen.TimestampBasedKeyGenerator
   hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
   hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd

   # Kafka properties
   enable.auto.commit=false
   auto.offset.reset=earliest
   group.id=hudi_test_group
   schema.registry.url=http://confluent*:8081/subjects/test_table/versions/latest
   bootstrap.servers=confluent.****:9092
   hoodie.deltastreamer.source.kafka.topic=test-topic
   hoodie.deltastreamer.schemaprovider.registry.url=http://confluent***:8081/subjects/test_table/versions/latest
   hoodie.deltastreamer.kafka.source.maxEvents=10000

   # Hive metastore properties
   hoodie.datasource.hive_sync.database=hudi_test
   hoodie.datasource.hive_sync.table=test_table
   hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver.***:10000
   hoodie.datasource.hive_sync.assume_date_partitioning=true
   hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
   ```
   3. Run `spark-submit` with `HoodieDeltaStreamer`.
   ```
   spark-submit --queue ingestion --deploy-mode cluster --master yarn \
     --conf 'spark.jars=/home/hadoop/hudi*.jar' \
     --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
     --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls hudi-utilities*.jar` \
     --op UPSERT --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
     --table-type MERGE_ON_READ --target-base-path s3://hudi-bucket/test_table \
     --target-table hudi_test.test_table --source-limit 200 \
     --enable-hive-sync \
     --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
     --props s3://hudi-bucket/hudi_custom.properties
   ```
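
   As a sanity check on what `earliest` should resolve to, the broker can be queried directly with a plain Kafka consumer, independent of Spark and Hudi. A minimal sketch; the bootstrap address is a placeholder for the redacted `confluent.****:9092` host:
   ```
   import java.util.Properties
   import org.apache.kafka.clients.consumer.KafkaConsumer
   import org.apache.kafka.common.TopicPartition
   import scala.collection.JavaConverters._

   // Sanity-check sketch: ask the broker for the earliest/latest retained
   // offsets of the topic, which is what auto.offset.reset=earliest should
   // resolve to for a fresh consumer group.
   object CheckOffsets extends App {
     val props = new Properties()
     props.put("bootstrap.servers", "localhost:9092") // placeholder broker address
     props.put("group.id", "offset-check")
     props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

     val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
     try {
       val partitions = consumer.partitionsFor("test-topic").asScala
         .map(pi => new TopicPartition(pi.topic, pi.partition)).asJava
       // beginningOffsets/endOffsets query the broker metadata; nothing is consumed.
       val earliest = consumer.beginningOffsets(partitions).asScala
       val latest = consumer.endOffsets(partitions).asScala
       earliest.foreach { case (tp, off) => println(s"$tp earliest=$off latest=${latest(tp)}") }
     } finally consumer.close()
   }
   ```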
   
   **Expected behavior**
   The data should be read from the earliest (or latest) available offset in Kafka, as per the config.
   
   **Environment Description**
   
   * EMR version : emr-5.23.0
   
   * Hudi version : 0.5.1-incubating
   
   * Spark version : 2.4.0
   
   * Hive version : 2.3.4
   
   * Hadoop version : 2.8.5
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : No
   
   
   **Additional context**
   The issue of Spark's Kafka integration overriding `auto.offset.reset` is tracked in Kafka JIRA ([KAFKA-4396](https://issues.apache.org/jira/browse/KAFKA-4396)) and Spark JIRA ([SPARK-19680](https://issues.apache.org/jira/browse/SPARK-19680)).
   
   From what I can see in the logs, the provided config is being read correctly:
   ```
   auto.commit.interval.ms = 5000
   auto.offset.reset = earliest
   bootstrap.servers = [confluent.*:9092]
   connections.max.idle.ms = 540000
   default.api.timeout.ms = 60000
   enable.auto.commit = false
   ...
   ```
   
   However, `KafkaUtils.fixKafkaParams` is overriding the `auto.offset.reset` property:
   ```
   2020-02-14 07:16:18,664 WARN [Driver] org.apache.spark.streaming.kafka010.KafkaUtils:overriding enable.auto.commit to false for executor
   2020-02-14 07:16:18,665 WARN [Driver] org.apache.spark.streaming.kafka010.KafkaUtils:overriding auto.offset.reset to none for executor
   2020-02-14 07:16:18,666 WARN [Driver] org.apache.spark.streaming.kafka010.KafkaUtils:overriding executor group.id to spark-executor-hudi_test_group
   2020-02-14 07:16:18,666 WARN [Driver] org.apache.spark.streaming.kafka010.KafkaUtils:overriding receive.buffer.bytes to 65536 see KAFKA-3135
   ```
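
   For reference, here is a condensed paraphrase (my reading of Spark 2.4's `org.apache.spark.streaming.kafka010.KafkaUtils`, not the verbatim source) of what `fixKafkaParams` does to the executor-side consumer params, matching the four warnings above:
   ```
   import java.{util => ju}
   import org.apache.kafka.clients.consumer.ConsumerConfig

   // Condensed paraphrase of KafkaUtils.fixKafkaParams (Spark 2.4), not verbatim.
   def fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit = {
     // Executors must never auto-commit; the driver owns offset tracking.
     kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)
     // Executors fetch exact offset ranges computed on the driver, so a reset
     // would silently skip data; the user's "earliest" is forced to "none".
     kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
     // Keep executors in a different consumer group from the driver.
     val groupId = "spark-executor-" + kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
     kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
     // Workaround for KAFKA-3135 (see the last warning above).
     kafkaParams.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
   }
   ```
   With the executor policy forced to `none`, a fetch for an offset the broker no longer retains fails outright instead of resetting, which is what the stacktrace below shows.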
   
   
   **Stacktrace**
   
   ```
   Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2039)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2027)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2026)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2026)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2260)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2209)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2198)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
        at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1364)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1337)
        at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1472)
        at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1472)
        at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1472)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1471)
        at org.apache.spark.api.java.JavaRDDLike$class.isEmpty(JavaRDDLike.scala:544)
        at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:329)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:226)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:122)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:295)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
   Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {hudi-test-topic-0=0}
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:970)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:490)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
        at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.poll(KafkaDataConsumer.scala:200)
        at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:129)
        at org.apache.spark.streaming.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:36)
        at org.apache.spark.streaming.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.get(KafkaDataConsumer.scala:218)
        at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:261)
        at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:229)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:394)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at scala.collection.AbstractIterator.to(Iterator.scala:1334)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   )
   ```
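
   The `Caused by` above follows directly from the forced `none` policy: the executor asks for an offset the broker no longer retains, and with no reset policy the consumer throws instead of falling back to `earliest`. A minimal standalone sketch of that semantics (placeholder broker and topic; assumes offset 0 has already been removed by retention):
   ```
   import java.time.Duration
   import java.util.{Collections, Properties}
   import org.apache.kafka.clients.consumer.KafkaConsumer
   import org.apache.kafka.common.TopicPartition

   object ResetNoneDemo extends App {
     val props = new Properties()
     props.put("bootstrap.servers", "localhost:9092") // placeholder broker address
     props.put("group.id", "reset-none-demo")
     props.put("auto.offset.reset", "none") // what KafkaUtils forces on executors
     props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

     val tp = new TopicPartition("test-topic", 0)
     val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
     consumer.assign(Collections.singletonList(tp))
     // Assumes offset 0 has been deleted by retention, so the fetch position is
     // out of range; with reset policy "none" the poll throws
     // OffsetOutOfRangeException instead of resetting to the earliest offset.
     consumer.seek(tp, 0L)
     consumer.poll(Duration.ofSeconds(5))
     consumer.close()
   }
   ```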
   
   
