[ https://issues.apache.org/jira/browse/HUDI-1007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17134634#comment-17134634 ]
liujinhui commented on HUDI-1007: --------------------------------- Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Aborting TaskSet 60.0 because task 8 (partition 8) cannot run anywhere due to node and executor blacklist. Most recent failure: Lost task 7.2 in stage 60.0 (TID 240, prod-t3-data-lake-007, executor 1): java.lang.IllegalArgumentException: requirement failed: Got wrong record for spark-executor-null t3_ops_elk_gis_feature_log-7 even after seeking to offset 4479769508 got offset 4479777628 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets at scala.Predef$.require(Predef.scala:224) at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:146) at org.apache.spark.streaming.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:36) at org.apache.spark.streaming.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.get(KafkaDataConsumer.scala:218) at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:261) at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:229) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31) at org.apache.hudi.utilities.sources.JsonKafkaSource$1.call(JsonKafkaSource.java:90) at org.apache.hudi.utilities.sources.JsonKafkaSource$1.call(JsonKafkaSource.java:84) at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:153) at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:153) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337) at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1193) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1167) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1102) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1167) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:893) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335) at org.apache.spark.rdd.RDD.iterator(RDD.scala:286) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Blacklisting behavior can be configured via spark.blacklist.*. at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:929) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2102) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2121) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2146) at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) at org.apache.spark.rdd.RDD.collect(RDD.scala:944) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$countByKey$1.apply(PairRDDFunctions.scala:370) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$countByKey$1.apply(PairRDDFunctions.scala:370) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:369) at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:312) at org.apache.hudi.table.WorkloadProfile.buildProfile(WorkloadProfile.java:67) at org.apache.hudi.table.WorkloadProfile.<init>(WorkloadProfile.java:59) at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.execute(BaseCommitActionExecutor.java:92) at org.apache.hudi.table.action.commit.WriteHelper.write(WriteHelper.java:55) ... 10 more @[~vinoth] > When earliestOffsets is greater than checkpoint, Hudi will not be able to > successfully consume data > --------------------------------------------------------------------------------------------------- > > Key: HUDI-1007 > URL: https://issues.apache.org/jira/browse/HUDI-1007 > Project: Apache Hudi > Issue Type: Bug > Components: DeltaStreamer > Reporter: liujinhui > Assignee: liujinhui > Priority: Major > Fix For: 0.6.0 > > Original Estimate: 24h > Remaining Estimate: 24h > > Use deltastreamer to consume kafka, > When earliestOffsets is greater than checkpoint, Hudi will not be able to > successfully consume data > org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen#checkupValidOffsets > boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream() > .anyMatch(offset -> offset.getValue() < > earliestOffsets.get(offset.getKey())); > return checkpointOffsetReseter ? 
earliestOffsets : checkpointOffsets; > Kafka data is generated continuously, which means that older data keeps > expiring. > When earliestOffsets is greater than the checkpoint, earliestOffsets is > used instead. But by the time those offsets are actually read, more data has already expired, so the consumer cannot seek to them and consumption fails. > This repeats in an endless cycle. I understand that this design may be intended to > avoid data loss, but it leads to this situation. I want to fix > this problem and would like to hear your opinion. -- This message was sent by Atlassian Jira (v8.3.4#803005)