[https://issues.apache.org/jira/browse/HUDI-1007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17134634#comment-17134634]
Vinoth Chandar edited comment on HUDI-1007 at 6/16/20, 1:16 PM:
----------------------------------------------------------------
{code}
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure:
Aborting TaskSet 60.0 because task 8 (partition 8)
cannot run anywhere due to node and executor blacklist.
Most recent failure:
Lost task 7.2 in stage 60.0 (TID 240, prod-t3-data-lake-007, executor 1):
java.lang.IllegalArgumentException: requirement failed: Got wrong record for
spark-executor-null t3_ops_elk_gis_feature_log-7 even after seeking to offset
4479769508 got offset 4479777628 instead. If this is a compacted topic,
consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:146)
    at org.apache.spark.streaming.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:36)
    at org.apache.spark.streaming.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.get(KafkaDataConsumer.scala:218)
    at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:261)
    at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:229)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
    at org.apache.hudi.utilities.sources.JsonKafkaSource$1.call(JsonKafkaSource.java:90)
    at org.apache.hudi.utilities.sources.JsonKafkaSource$1.call(JsonKafkaSource.java:84)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:153)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:153)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1193)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1167)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1102)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1167)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:893)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Blacklisting behavior can be configured via spark.blacklist.*.
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:929)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2102)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2121)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2146)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$countByKey$1.apply(PairRDDFunctions.scala:370)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$countByKey$1.apply(PairRDDFunctions.scala:370)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:369)
    at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:312)
    at org.apache.hudi.table.WorkloadProfile.buildProfile(WorkloadProfile.java:67)
    at org.apache.hudi.table.WorkloadProfile.<init>(WorkloadProfile.java:59)
    at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.execute(BaseCommitActionExecutor.java:92)
    at org.apache.hudi.table.action.commit.WriteHelper.write(WriteHelper.java:55)
    ... 10 more
{code}
@[~vinoth]
> When earliestOffsets is greater than checkpoint, Hudi will not be able to successfully consume data
> ---------------------------------------------------------------------------------------------------
>
> Key: HUDI-1007
> URL: https://issues.apache.org/jira/browse/HUDI-1007
> Project: Apache Hudi
> Issue Type: Bug
> Components: DeltaStreamer
> Reporter: liujinhui
> Assignee: liujinhui
> Priority: Major
> Fix For: 0.6.0
>
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> We use DeltaStreamer to consume from Kafka. When earliestOffsets is greater
> than the checkpointed offsets, Hudi is unable to consume data successfully.
> The relevant logic is in
> org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen#checkupValidOffsets:
> {code}
> boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
>     .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
> return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
> {code}
> Kafka data is generated continuously, which means some of it is continuously
> expiring under the topic's retention policy. When earliestOffsets is greater
> than the checkpoint, earliestOffsets is taken instead. But by that moment
> some of that data has already expired, so consumption fails again, and the
> process becomes an endless cycle. I understand this design is probably meant
> to avoid losing data, but it leads to this situation. I want to fix this
> problem and would like to hear your opinion; see the sketch after this
> quoted description.
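To make the endless-cycle failure above concrete, here is a minimal, self-contained Java sketch. It mirrors the quoted checkupValidOffsets logic (simplified to plain String keys instead of Kafka's TopicPartition) and plugs in the two offsets visible in the stack trace; the clampToEarliest variant is only one possible per-partition fix, sketched here as an assumption rather than an actual patch.
{code}
import java.util.HashMap;
import java.util.Map;

public class OffsetResetSketch {

    // Mirrors the quoted KafkaOffsetGen#checkupValidOffsets: if any
    // checkpointed offset has fallen below the broker's earliest offset,
    // discard the whole checkpoint and restart from earliestOffsets.
    static Map<String, Long> checkupValidOffsets(Map<String, Long> checkpointOffsets,
                                                 Map<String, Long> earliestOffsets) {
        boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
            .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
        return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
    }

    // One possible fix (an assumption, not the merged change): clamp each
    // partition's checkpoint to that partition's earliest offset, so only
    // partitions whose data expired are moved forward, and each resumes at
    // the oldest record the broker still retains.
    static Map<String, Long> clampToEarliest(Map<String, Long> checkpointOffsets,
                                             Map<String, Long> earliestOffsets) {
        Map<String, Long> resolved = new HashMap<>();
        checkpointOffsets.forEach((partition, offset) ->
            resolved.put(partition, Math.max(offset, earliestOffsets.get(partition))));
        return resolved;
    }

    public static void main(String[] args) {
        // Offsets taken from the trace: the checkpoint (4479769508) is already
        // behind the oldest retained record (4479777628) for partition 7.
        Map<String, Long> checkpoint = new HashMap<>();
        checkpoint.put("t3_ops_elk_gis_feature_log-7", 4479769508L);
        Map<String, Long> earliest = new HashMap<>();
        earliest.put("t3_ops_elk_gis_feature_log-7", 4479777628L);

        System.out.println(checkupValidOffsets(checkpoint, earliest));
        System.out.println(clampToEarliest(checkpoint, earliest));
    }
}
{code}
Note that even the clamped variant can still race with retention: earliestOffsets is a snapshot taken when the batch is planned, so if records keep expiring before the fetch actually runs, the seek can fail again. A robust fix would likely need to re-validate offsets against the broker at consumption time.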
--
This message was sent by Atlassian Jira
(v8.3.4#803005)