amitsingh-10 opened a new issue #1335: [SUPPORT] HoodieDeltaStreamer offset reset not working
URL: https://github.com/apache/incubator-hudi/issues/1335

**Describe the problem you faced**

I am trying to create a Hoodie table using `HoodieDeltaStreamer` from a Kafka Avro topic. Setting `auto.offset.reset` to `earliest` should read from the first events available in the Kafka topic; however, it is being overridden by `org.apache.spark.streaming.kafka010.KafkaUtils`.

**To Reproduce**

Steps to reproduce the behavior:

1. Use an EMR emr-5.23.0 cluster.
2. Create a custom properties file.
```
# General properties
hoodie.datasource.write.table.type=MERGE_ON_READ
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=timestamp
hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.keygen.TimestampBasedKeyGenerator
hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd

# Kafka properties
enable.auto.commit=false
auto.offset.reset=earliest
group.id=hudi_test_group
schema.registry.url=http://confluent*:8081/subjects/test_table/versions/latest
bootstrap.servers=confluent.****:9092
hoodie.deltastreamer.source.kafka.topic=test-topic
hoodie.deltastreamer.schemaprovider.registry.url=http://confluent***:8081/subjects/test_table/versions/latest
hoodie.deltastreamer.kafka.source.maxEvents=10000

# Hive metastore properties
hoodie.datasource.hive_sync.database=hudi_test
hoodie.datasource.hive_sync.table=test_table
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver.***:10000
hoodie.datasource.hive_sync.assume_date_partitioning=true
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
```
3. Run `spark-submit` with `HoodieDeltaStreamer`.
```
spark-submit --queue ingestion --deploy-mode cluster --master yarn \
  --conf 'spark.jars=/home/hadoop/hudi*.jar' \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls hudi-utilities*.jar` \
  --op UPSERT --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --table-type MERGE_ON_READ --target-base-path s3://hudi-bucket/test_table \
  --target-table hudi_test.test_table --source-limit 200 \
  --enable-hive-sync \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --props s3://hudi-bucket/hudi_custom.properties
```

**Expected behavior**

The data should be read from the earliest/latest available offset in Kafka as per the config.

**Environment Description**

* EMR version : emr-5.23.0
* Hudi version : 0.5.1-incubating
* Spark version : 2.4.0
* Hive version : 2.3.4
* Hadoop version : 2.8.5
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : No

**Additional context**

The issue about Spark Kafka overriding `auto.offset.reset` is being tracked by Kafka JIRA issue ([KAFKA-4396](https://issues.apache.org/jira/browse/KAFKA-4396)) and Spark issue ([SPARK-19680](https://issues.apache.org/jira/browse/SPARK-19680)). From what I can see from the logs, the provided config is being read correctly.
```
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [confluent.*:9092]
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
...
```
However, `fixKafkaParams` is resetting the `auto.offset.reset` property.
```
2020-02-14 07:16:18,664 WARN [Driver] org.apache.spark.streaming.kafka010.KafkaUtils:overriding enable.auto.commit to false for executor
2020-02-14 07:16:18,665 WARN [Driver] org.apache.spark.streaming.kafka010.KafkaUtils:overriding auto.offset.reset to none for executor
2020-02-14 07:16:18,666 WARN [Driver] org.apache.spark.streaming.kafka010.KafkaUtils:overriding executor group.id to spark-executor-hudi_test_group
2020-02-14 07:16:18,666 WARN [Driver] org.apache.spark.streaming.kafka010.KafkaUtils:overriding receive.buffer.bytes to 65536 see KAFKA-3135
```

**Stacktrace**

```
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2039)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2027)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2026)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2260)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2209)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2198)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1364)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1337)
    at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1472)
    at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1472)
    at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1472)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1471)
    at org.apache.spark.api.java.JavaRDDLike$class.isEmpty(JavaRDDLike.scala:544)
    at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:329)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:226)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:122)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:295)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {hudi-test-topic-0=0}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:970)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:490)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
    at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.poll(KafkaDataConsumer.scala:200)
    at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:129)
    at org.apache.spark.streaming.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:36)
    at org.apache.spark.streaming.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.get(KafkaDataConsumer.scala:218)
    at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:261)
    at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:229)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:394)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
)
```
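
To illustrate the sequence that the `KafkaUtils` warnings and the `OffsetOutOfRangeException` above suggest, here is a minimal Python sketch. It is a simplified model, not Spark's or Kafka's actual code: `executor_params` and `resolve_offset` are hypothetical helpers, the offset range 500..900 is invented for the example, and the condition under which `receive.buffer.bytes` is raised is an assumption.

```python
# Simplified model of the behaviour seen in the logs; NOT Spark or Kafka code.


class OffsetOutOfRange(Exception):
    """Stand-in for Kafka's OffsetOutOfRangeException."""


def executor_params(driver_params):
    """Return a copy of the consumer config with the four overrides
    reported by the KafkaUtils warnings applied."""
    params = dict(driver_params)
    params["enable.auto.commit"] = False
    params["auto.offset.reset"] = "none"
    params["group.id"] = "spark-executor-" + str(driver_params.get("group.id"))
    # Assumption: the buffer is only raised when unset or below 65536.
    if int(params.get("receive.buffer.bytes", 0)) < 65536:
        params["receive.buffer.bytes"] = 65536
    return params


def resolve_offset(requested, earliest, latest, reset_policy):
    """Return the offset a consumer would start from, or raise when the
    requested offset is unavailable and the reset policy is 'none'."""
    if earliest <= requested <= latest:
        return requested
    if reset_policy == "earliest":
        return earliest
    if reset_policy == "latest":
        return latest
    raise OffsetOutOfRange(
        "offset %d out of range with no configured reset policy" % requested
    )


if __name__ == "__main__":
    driver = {
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
        "group.id": "hudi_test_group",
    }
    executor = executor_params(driver)

    # Hypothetical partition state: offset 0 is no longer available.
    earliest, latest = 500, 900

    # With the configured policy, the read would fall back to `earliest`.
    print(resolve_offset(0, earliest, latest, driver["auto.offset.reset"]))

    # With the executor-side override, the same request fails.
    try:
        resolve_offset(0, earliest, latest, executor["auto.offset.reset"])
    except OffsetOutOfRange as exc:
        print("executor read failed:", exc)
```

Under these assumptions, the configured `earliest` policy never reaches the consumer that fetches the records, which would match the exception being raised for offset 0 of partition `hudi-test-topic-0`.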
