eigakow opened a new issue #1375: [SUPPORT] HoodieDeltaStreamer offset not handled correctly

URL: https://github.com/apache/incubator-hudi/issues/1375

**Describe the problem you faced**

I am trying to run a continuous DeltaStreamer (`hudi-0.5.1-incubating`) against a Kafka source with 3 partitions. The topic has been live for a while, so the earliest offsets are no longer available on the Kafka brokers. When starting with a new target-base-path:

- the first batch is processed correctly
- the second one fails with `org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions`

Every subsequent submit raises the same OffsetOutOfRangeException, unless I provide a new target-base-path.

I added some logging to KafkaOffsetGen, as suggested in [#1335](https://github.com/apache/incubator-hudi/issues/1335), and I see that:

- for the first batch there is no checkpoint present, so it starts from `LATEST` as expected; the batch is processed and the commit succeeds
- for the next batch the previous checkpoint is detected, but the job tries to pull all earlier messages from the Kafka topic because the `fromOffsets` value is empty: `numEvents: 5000000, fromOffsets: {}, toOffsets: {fr-bru-0=338362, fr-bru-1=142427, fr-bru-2=142401}`

**To Reproduce**

Steps to reproduce the behavior:

1. Create a custom properties file.
```
hoodie.datasource.write.recordkey.field=full_count
hoodie.datasource.write.partitionpath.field=full_count
hoodie.deltastreamer.schemaprovider.source.schema.file=file:///home/director/me/hudi-0.5.1-incubating/schema.avro
hoodie.deltastreamer.schemaprovider.target.schema.file=file:///home/director/me/hudi-0.5.1-incubating/schema.avro
source-class=FR24JsonKafkaSource
bootstrap.servers=streaming-kafka-broker-1:9092,streaming-kafka-broker-2:9092,streaming-kafka-broker-3:9092
group.id=hudi_testing
hoodie.deltastreamer.source.kafka.topic=fr-bru
enable.auto.commit=false
schemaprovider-class=org.apache.hudi.utilities.schema.FilebasedSchemaProvider
auto.offset.reset=latest
```

2. Launch spark-submit with HoodieDeltaStreamer:

```
spark-submit --master local \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  --jars $(pwd)/../my-app-1-jar-with-dependencies.jar \
  $(pwd)/../hudi-0.5.1-incubating_latest/incubator-hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.5.1-incubating.jar \
  --props file:///$(pwd)/hudi-fr24.properties \
  --target-base-path file:///tmp/test-hudi_new \
  --table-type MERGE_ON_READ \
  --target-table test_hudi_new \
  --source-class FR24JsonKafkaSource \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --continuous
```

**Expected behavior**

The job handles the offsets correctly and continues to read Kafka messages.

**Environment Description**

* Hudi version : hudi-0.5.1-incubating
* Spark version : 2.4.0-cdh6.1.0
* Hive version : 2.1.1-cdh6.1.0
* Hadoop version : 3.0.0-cdh6.1.0
* Storage (HDFS/S3/GCS..) : tried both HDFS and local
* Running on Docker? (yes/no) : no

**Additional context**

I am using a custom class for the source.
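To make the failure mode concrete: the stack trace further down shows the executors being asked to fetch `fr-bru-0` from offset 0, which retention has long since expired. Below is a minimal, hypothetical sketch (not Hudi's actual KafkaOffsetGen code; `clampToEarliest` and the offset values are illustrative) of the kind of guard that would avoid this: clamp each resumed checkpoint offset to the broker's earliest still-available offset before building fetch ranges.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Hudi's actual KafkaOffsetGen: a checkpoint offset
// recorded before Kafka's retention window expired (e.g. fr-bru-0=0) is no
// longer fetchable. Since the executors run with auto.offset.reset=none,
// asking for it fails hard; clamping avoids handing out stale positions.
public class OffsetClamp {

  public static Map<String, Long> clampToEarliest(Map<String, Long> checkpoint,
                                                  Map<String, Long> earliest) {
    Map<String, Long> fromOffsets = new HashMap<>();
    for (Map.Entry<String, Long> e : earliest.entrySet()) {
      // Partitions absent from the checkpoint start at the earliest offset.
      long resumed = checkpoint.getOrDefault(e.getKey(), e.getValue());
      // Never start below what the broker still holds.
      fromOffsets.put(e.getKey(), Math.max(resumed, e.getValue()));
    }
    return fromOffsets;
  }

  public static void main(String[] args) {
    Map<String, Long> checkpoint = new HashMap<>();
    checkpoint.put("fr-bru-0", 0L); // stale: retention moved the log start
    Map<String, Long> earliest = new HashMap<>();
    earliest.put("fr-bru-0", 200000L); // illustrative earliest offset
    System.out.println(clampToEarliest(checkpoint, earliest)); // {fr-bru-0=200000}
  }
}
```

Whether the fix belongs in Hudi or in the user's setup is exactly what this issue is asking; the sketch only names the invariant being violated.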
**Stacktrace**

```
20/03/04 10:21:02 INFO util.FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs://nameservice1], Config:[Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, __spark_hadoop_conf__.xml], FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_2059723432_1, [email protected] (auth:KERBEROS)]]]
20/03/04 10:21:02 INFO table.HoodieTableConfig: Loading table properties from /tmp/test-hudi_new/.hoodie/hoodie.properties
20/03/04 10:21:02 INFO table.HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1) from /tmp/test-hudi_new
20/03/04 10:21:02 INFO table.HoodieTableMetaClient: Loading Active commit timeline for /tmp/test-hudi_new
20/03/04 10:21:02 INFO timeline.HoodieActiveTimeline: Loaded instants [[20200304102036__commit__COMPLETED]]
20/03/04 10:21:02 INFO table.HoodieCopyOnWriteTable: Nothing to clean here. It is already clean
20/03/04 10:21:02 INFO hudi.AbstractHoodieWriteClient: Committed 20200304102036
20/03/04 10:21:02 INFO deltastreamer.DeltaSync: Commit 20200304102036 successful!
20/03/04 10:21:02 INFO rdd.MapPartitionsRDD: Removing RDD 31 from persistence list
20/03/04 10:21:02 INFO storage.BlockManager: Removing RDD 31
20/03/04 10:21:02 INFO rdd.MapPartitionsRDD: Removing RDD 39 from persistence list
20/03/04 10:21:02 INFO storage.BlockManager: Removing RDD 39
20/03/04 10:21:02 INFO table.HoodieTableMetaClient: Loading HoodieTableMetaClient from /tmp/test-hudi_new
20/03/04 10:21:02 INFO util.FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs://nameservice1], Config:[Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml], FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_2059723432_1, [email protected] (auth:KERBEROS)]]]
20/03/04 10:21:02 INFO table.HoodieTableConfig: Loading table properties from /tmp/test-hudi_new/.hoodie/hoodie.properties
20/03/04 10:21:02 INFO table.HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1) from /tmp/test-hudi_new
20/03/04 10:21:02 INFO timeline.HoodieActiveTimeline: Loaded instants [[20200304102036__commit__COMPLETED]]
20/03/04 10:21:02 INFO deltastreamer.DeltaSync: Checkpoint to resume from : Option{val=}
20/03/04 10:21:02 INFO consumer.ConsumerConfig: ConsumerConfig values:
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	bootstrap.servers = [streaming-kafka-broker-1:9092, streaming-kafka-broker-2:9092, streaming-kafka-broker-3:9092]
	check.crcs = true
	client.id =
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = hudi_testing
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class fr24.ingest.ZlibDeserializerString
20/03/04 10:21:02 WARN consumer.ConsumerConfig: The configuration 'schemaprovider-class' was supplied but isn't a known config.
20/03/04 10:21:02 WARN consumer.ConsumerConfig: The configuration 'hoodie.deltastreamer.schemaprovider.source.schema.file' was supplied but isn't a known config.
20/03/04 10:21:02 WARN consumer.ConsumerConfig: The configuration 'hoodie.deltastreamer.schemaprovider.target.schema.file' was supplied but isn't a known config.
20/03/04 10:21:02 WARN consumer.ConsumerConfig: The configuration 'hoodie.datasource.write.partitionpath.field' was supplied but isn't a known config.
20/03/04 10:21:02 WARN consumer.ConsumerConfig: The configuration 'hoodie.datasource.write.recordkey.field' was supplied but isn't a known config.
20/03/04 10:21:02 WARN consumer.ConsumerConfig: The configuration 'hoodie.deltastreamer.source.kafka.topic' was supplied but isn't a known config.
20/03/04 10:21:02 WARN consumer.ConsumerConfig: The configuration 'source-class' was supplied but isn't a known config.
20/03/04 10:21:02 INFO utils.AppInfoParser: Kafka version : 2.0.0-cdh6.1.0
20/03/04 10:21:02 INFO utils.AppInfoParser: Kafka commitId : null
20/03/04 10:21:02 INFO clients.Metadata: Cluster ID: h7B3MAm8TLumIZQHVT802A
20/03/04 10:21:02 INFO sources.JsonKafkaSource: About to read 619753 from Kafka for topic :fr-bru
20/03/04 10:21:02 WARN kafka010.KafkaUtils: overriding enable.auto.commit to false for executor
20/03/04 10:21:02 WARN kafka010.KafkaUtils: overriding auto.offset.reset to none for executor
20/03/04 10:21:02 WARN kafka010.KafkaUtils: overriding executor group.id to spark-executor-hudi_testing
20/03/04 10:21:02 WARN kafka010.KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
20/03/04 10:21:02 INFO spark.SparkContext: Starting job: isEmpty at DeltaSync.java:329
20/03/04 10:21:02 INFO scheduler.DAGScheduler: Got job 8 (isEmpty at DeltaSync.java:329) with 1 output partitions
20/03/04 10:21:02 INFO scheduler.DAGScheduler: Final stage: ResultStage 14 (isEmpty at DeltaSync.java:329)
20/03/04 10:21:02 INFO scheduler.DAGScheduler: Parents of final stage: List()
20/03/04 10:21:02 INFO scheduler.DAGScheduler: Missing parents: List()
20/03/04 10:21:02 INFO scheduler.DAGScheduler: Submitting ResultStage 14 (MapPartitionsRDD[47] at map at SourceFormatAdapter.java:62), which has no missing parents
20/03/04 10:21:02 INFO memory.MemoryStore: Block broadcast_9 stored as values in memory (estimated size 5.5 KB, free 366.2 MB)
20/03/04 10:21:02 INFO memory.MemoryStore: Block broadcast_9_piece0 stored as bytes in memory (estimated size 3.3 KB, free 366.2 MB)
20/03/04 10:21:02 INFO storage.BlockManagerInfo: Added broadcast_9_piece0 in memory on biedva-worker-1.bigdatapoc.local:34716 (size: 3.3 KB, free: 366.3 MB)
20/03/04 10:21:02 INFO spark.SparkContext: Created broadcast 9 from broadcast at DAGScheduler.scala:1164
20/03/04 10:21:02 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 14 (MapPartitionsRDD[47] at map at SourceFormatAdapter.java:62) (first 15 tasks are for partitions Vector(0))
20/03/04 10:21:02 INFO cluster.YarnScheduler: Adding task set 14.0 with 1 tasks
20/03/04 10:21:02 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 14.0 (TID 10502, biedva-worker-6.bigdatapoc.local, executor 7, partition 0, PROCESS_LOCAL, 7765 bytes)
20/03/04 10:21:02 INFO storage.BlockManagerInfo: Added broadcast_9_piece0 in memory on biedva-worker-6.bigdatapoc.local:34194 (size: 3.3 KB, free: 366.3 MB)
20/03/04 10:21:02 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 14.0 (TID 10502, biedva-worker-6.bigdatapoc.local, executor 7): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {fr-bru-0=0}
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:1002)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:508)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1261)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1189)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
	at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.poll(KafkaDataConsumer.scala:200)
	at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:129)
	at org.apache.spark.streaming.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:36)
	at org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:212)
	at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:261)
	at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:229)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:394)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at scala.collection.AbstractIterator.to(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2113)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2113)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```
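One detail worth noting in the log above: Spark's kafka010 integration overrides `auto.offset.reset` to `none` on the executors (see the `KafkaUtils` WARN lines), so the `latest` policy from the properties file only takes effect on the driver. With `none`, an out-of-range fetch is surfaced as an exception instead of being silently reset. A simplified, hypothetical model of that decision follows; the real logic lives in Kafka's Fetcher/SubscriptionState, and `resolveFetchPosition` is an illustrative name, not a Kafka API.

```java
// Hypothetical, simplified model of the consumer's offset-reset decision.
// It shows why a fetch that would succeed under auto.offset.reset=latest
// fails with OffsetOutOfRangeException once Spark forces the policy to
// "none" on the executors.
public class ResetPolicyModel {

  public static long resolveFetchPosition(long requested, long earliest,
                                          long latest, String resetPolicy) {
    if (requested >= earliest && requested <= latest) {
      return requested; // in range: fetch from the requested position
    }
    switch (resetPolicy) {
      case "earliest":
        return earliest; // silently reset to the log start
      case "latest":
        return latest;   // silently reset to the log end
      default:           // "none": surface the error to the caller
        throw new IllegalStateException(
            "Offsets out of range with no configured reset policy");
    }
  }
}
```

This is why the checkpoint driving a fetch from offset 0 (below the retained log start) is fatal here rather than self-healing: the silent-reset paths are deliberately disabled on the executors so data can never be skipped or replayed behind the driver's back.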
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: [email protected]

With regards,
Apache Git Services
