[https://issues.apache.org/jira/browse/SPARK-27529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16826594#comment-16826594]
Hyukjin Kwon commented on SPARK-27529:
--------------------------------------
It's difficult to diagnose whether or not this was correct behaviour. It's been
almost 5 years - that's around when I first started contributing to Spark.
If this is a question, please ask it on the mailing list before filing it as
an issue.
If this is a bug, please file it against a more recent version of Spark rather
than an EOL Spark version.
> Spark Streaming consumer dies with kafka.common.OffsetOutOfRangeException
> -------------------------------------------------------------------------
>
> Key: SPARK-27529
> URL: https://issues.apache.org/jira/browse/SPARK-27529
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 1.5.0
> Reporter: Dmitry Goldenberg
> Priority: Major
>
> We have a Spark Streaming consumer which at a certain point started
> consistently failing upon a restart with the below error.
> Some details:
> * Spark version is 1.5.0.
> * Kafka version is 0.8.2.1 (2.10-0.8.2.1).
> * The topic is configured with: retention.ms=1471228928,
> max.message.bytes=100000000.
> * The consumer runs with auto.offset.reset=smallest.
> * No checkpointing is currently enabled.
> I don't see anything in the Spark or Kafka docs that explains why this is
> happening. From googling around:
> {noformat}
> https://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/
> Finally, I’ll repeat that any semantics beyond at-most-once require that you
> have sufficient log retention in Kafka. If you’re seeing things like
> OffsetOutOfRangeException, it’s probably because you underprovisioned Kafka
> storage, not because something’s wrong with Spark or Kafka.{noformat}
> Also looking at SPARK-12693 and SPARK-11693, I don't understand the possible
> causes.
> {noformat}
> You've under-provisioned Kafka storage and / or Spark compute capacity.
> The result is that data is being deleted before it has been
> processed.{noformat}
> All we're trying to do is start the consumer and consume from the topic from
> the earliest available offset. Why would we not be able to do that? How can
> the offsets be out of range if we're saying, just read from the earliest
> available?
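The direct-stream behaviour behind this question can be sketched as follows (illustrative pseudologic only, not actual Spark or Kafka code; the function and offset values are made up): offset ranges are pinned when a batch is planned, and auto.offset.reset=smallest only influences where the *first* range starts, so a fetch of an already-planned range can still fall below the broker's log start offset if retention deletes segments in between.

```python
# Illustrative sketch: why a batch planned from "earliest" can still be
# out of range by the time the task actually fetches it.

def fetch(log_start_offset, log_end_offset, from_offset):
    """Simulate a broker fetch: fail if the requested offset is outside
    the currently available log range (i.e. the segment was deleted)."""
    if from_offset < log_start_offset or from_offset > log_end_offset:
        raise RuntimeError("OffsetOutOfRangeException")
    return f"records from {from_offset}"

# The batch is planned while offset 100 is still the earliest available...
planned_from = 100

# ...but retention deletes old segments before the task runs, advancing
# the log start offset past the planned starting offset.
log_start_after_deletion = 250

try:
    fetch(log_start_after_deletion, 1000, planned_from)
except RuntimeError as e:
    print(e)  # OffsetOutOfRangeException
```

In this model, the reset setting never gets consulted for the failed fetch, which would be consistent with the quoted Cloudera explanation above.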
> Since we have the retention.ms set to 1 year and we created the topic just a
> few weeks ago, I'd not expect any deletion being done by Kafka as we're
> consuming.
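One quick arithmetic check (illustrative; it assumes the retention.ms value shown in the topic configuration above is what is actually in effect on the brokers): that value converts to days like this.

```python
# Convert the topic's configured retention.ms (from the config above)
# into days, to compare against the intended 1-year retention.
RETENTION_MS = 1_471_228_928  # retention.ms as listed in the topic config

MS_PER_DAY = 1000 * 60 * 60 * 24
retention_days = RETENTION_MS / MS_PER_DAY
print(f"{retention_days:.1f} days")  # about 17 days, well short of 365
```

If that value is really what the topic carries, segments older than roughly 17 days would be eligible for deletion.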
> I'd like to understand the actual cause of this error. Any recommendations on
> a workaround would be appreciated.
> Stack traces:
> {noformat}
> 2019-04-19 11:35:17,147 ERROR org.apache.spark.scheduler.TaskSetManager: Task 10 in stage 147.0 failed 4 times; aborting job
> 2019-04-19 11:35:17,160 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error running job streaming job 1555682554000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 147.0 failed 4 times, most recent failure: Lost task 10.3 in stage 147.0 (TID 2368, 10.150.0.58): kafka.common.OffsetOutOfRangeException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at java.lang.Class.newInstance(Class.java:442)
> at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:184)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
> at com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:69)
> at com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:24)
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222)
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at scala.Option.foreach(Option.scala:236) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) [spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:898) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:896) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:896) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:222) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:47) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver$3.call(KafkaSparkStreamingDriver.java:218) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver$3.call(KafkaSparkStreamingDriver.java:207) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:315) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:315) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at scala.util.Try$.apply(Try.scala:161) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:207) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:206) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_201]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_201]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_201]
> Caused by: kafka.common.OffsetOutOfRangeException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_201]
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_201]
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_201]
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_201]
> at java.lang.Class.newInstance(Class.java:442) ~[?:1.8.0_201]
> at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:184) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:69) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:24) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.Task.run(Task.scala:88) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> ... 3 more{noformat}
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)