[ https://issues.apache.org/jira/browse/HUDI-1007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17134634#comment-17134634 ]

liujinhui commented on HUDI-1007:
---------------------------------

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Aborting TaskSet 60.0 because task 8 (partition 8)
cannot run anywhere due to node and executor blacklist.
Most recent failure:
Lost task 7.2 in stage 60.0 (TID 240, prod-t3-data-lake-007, executor 1): 
java.lang.IllegalArgumentException: requirement failed: Got wrong record for 
spark-executor-null t3_ops_elk_gis_feature_log-7 even after seeking to offset 
4479769508 got offset 4479777628 instead. If this is a compacted topic, 
consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets
 at scala.Predef$.require(Predef.scala:224)
 at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:146)
 at org.apache.spark.streaming.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:36)
 at org.apache.spark.streaming.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.get(KafkaDataConsumer.scala:218)
 at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:261)
 at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:229)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
 at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
 at org.apache.hudi.utilities.sources.JsonKafkaSource$1.call(JsonKafkaSource.java:90)
 at org.apache.hudi.utilities.sources.JsonKafkaSource$1.call(JsonKafkaSource.java:84)
 at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:153)
 at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:153)
 at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
 at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
 at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
 at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1193)
 at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1167)
 at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1102)
 at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1167)
 at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:893)
 at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
 at org.apache.spark.scheduler.Task.run(Task.scala:121)
 at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)


Blacklisting behavior can be configured via spark.blacklist.*.

 at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
 at scala.Option.foreach(Option.scala:257)
 at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:929)
 at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
 at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
 at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
 at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:2102)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:2121)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:2146)
 at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
 at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
 at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
 at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
 at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
 at org.apache.spark.rdd.PairRDDFunctions$$anonfun$countByKey$1.apply(PairRDDFunctions.scala:370)
 at org.apache.spark.rdd.PairRDDFunctions$$anonfun$countByKey$1.apply(PairRDDFunctions.scala:370)
 at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
 at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
 at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
 at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:369)
 at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:312)
 at org.apache.hudi.table.WorkloadProfile.buildProfile(WorkloadProfile.java:67)
 at org.apache.hudi.table.WorkloadProfile.<init>(WorkloadProfile.java:59)
 at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.execute(BaseCommitActionExecutor.java:92)
 at org.apache.hudi.table.action.commit.WriteHelper.write(WriteHelper.java:55)
 ... 10 more
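
For reference, the IllegalArgumentException above is raised by spark-streaming-kafka-0-10 when the record returned after a seek does not carry the requested offset; the message itself points at spark.streaming.kafka.allowNonConsecutiveOffsets, which is intended for compacted topics. In this case the gap comes from retention expiry rather than compaction, so the flag would likely only mask the failure, but as a hedged sketch it can be set on the SparkConf the job runs with (class and app name below are illustrative, not from this issue):

import org.apache.spark.SparkConf;

public class AllowNonConsecutiveOffsetsExample {
  public static void main(String[] args) {
    // Property suggested by the Spark error message above; meant for compacted
    // topics. It does not restore records already deleted by retention, so it is
    // unlikely to break the retry loop described in this issue.
    SparkConf conf = new SparkConf()
        .setAppName("hudi-delta-streamer")
        .set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
    System.out.println(conf.get("spark.streaming.kafka.allowNonConsecutiveOffsets"));
  }
}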
@[~vinoth] 

> When earliestOffsets is greater than checkpoint, Hudi will not be able to 
> successfully consume data
> ---------------------------------------------------------------------------------------------------
>
>                 Key: HUDI-1007
>                 URL: https://issues.apache.org/jira/browse/HUDI-1007
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: DeltaStreamer
>            Reporter: liujinhui
>            Assignee: liujinhui
>            Priority: Major
>             Fix For: 0.6.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When using DeltaStreamer to consume from Kafka, Hudi cannot consume data
> successfully once earliestOffsets is greater than the checkpointed offsets.
> In org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen#checkupValidOffsets:
> boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
>  .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
> return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
> Kafka data is produced continuously, which also means older data keeps
> expiring under the retention policy. When earliestOffsets is greater than the
> checkpoint, earliestOffsets is used as the starting point. But by the time the
> batch actually reads, some of that data has already expired, so consumption
> fails again, and the process repeats as an endless cycle. I understand this
> design is probably meant to avoid losing data, but it leads to this situation.
> I would like to fix this problem and would like to hear your opinion.
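
To make the quoted behaviour concrete: the stream().anyMatch(...) check resets every partition to earliestOffsets as soon as a single checkpointed offset has fallen behind retention, and if more data expires before the batch completes, the next run triggers the same reset again. Below is a minimal sketch of one possible adjustment, assuming the goal is to fast-forward only the stale partitions to their earliest retained offset instead of discarding the whole checkpoint; the class and method names are illustrative, and this is not the committed fix:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

public class OffsetClampSketch {
  // Hypothetical variant of KafkaOffsetGen#checkupValidOffsets: per partition,
  // keep the checkpointed offset unless it is older than the earliest retained
  // offset, in which case advance just that partition.
  public static Map<TopicPartition, Long> clampToEarliest(
      Map<TopicPartition, Long> checkpointOffsets,
      Map<TopicPartition, Long> earliestOffsets) {
    Map<TopicPartition, Long> valid = new HashMap<>();
    for (Map.Entry<TopicPartition, Long> e : checkpointOffsets.entrySet()) {
      Long earliest = earliestOffsets.get(e.getKey());
      valid.put(e.getKey(), earliest == null ? e.getValue() : Math.max(e.getValue(), earliest));
    }
    return valid;
  }
}

Whether silently fast-forwarding (and thereby skipping the expired records) is acceptable, or should sit behind a configuration flag, is exactly the data-loss trade-off the reporter raises above.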



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
