[ https://issues.apache.org/jira/browse/SPARK-11693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15001877#comment-15001877 ]
xiaoxiaoluo commented on SPARK-11693:
-------------------------------------

Should we catch this exception in this file [https://github.com/apache/spark/blob/v1.5.1/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala]?

{noformat}
override def getNext(): R = {
  if (iter == null || !iter.hasNext) {
    try {
      iter = fetchBatch
    } catch {
      case ex: Throwable =>
        logError(ex.toString)
        finished = true
        return null.asInstanceOf[R]
    }
  }
  if (!iter.hasNext) {
    assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
    finished = true
    null.asInstanceOf[R]
  } else {
    val item = iter.next()
    if (item.offset >= part.untilOffset) {
      assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
      finished = true
      null.asInstanceOf[R]
    } else {
      requestOffset = item.nextOffset
      messageHandler(new MessageAndMetadata(
        part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
    }
  }
}
{noformat}


> spark kafka direct streaming exception
> --------------------------------------
>
>                 Key: SPARK-11693
>                 URL: https://issues.apache.org/jira/browse/SPARK-11693
>             Project: Spark
>          Issue Type: Question
>          Components: Streaming
>    Affects Versions: 1.5.1
>            Reporter: xiaoxiaoluo
>            Priority: Minor
>
> We are using Spark Kafka direct streaming in our test environment. We have
> limited the Kafka partition size to avoid exhausting the disk space, so when
> data is written to Kafka faster than Spark Streaming can read it, an exception
> is raised in Spark Streaming and the application is shut down.
> {noformat}
> 15/11/11 10:17:35 ERROR Executor: Exception in task 0.3 in stage 1626659.0 (TID 1134180)
> kafka.common.OffsetOutOfRangeException
>         at sun.reflect.GeneratedConstructorAccessor32.newInstance(Unknown Source)
>         at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>         at java.lang.Class.newInstance(Class.java:442)
>         at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>         at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:184)
>         at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193)
>         at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:88)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> 15/11/11 10:17:42 ERROR CoarseGrainedExecutorBackend: Driver 10.1.92.44:49939 disassociated! Shutting down.
> {noformat}
> Could streaming get the current smallest offset from this partition and go on
> to process streaming data?
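
As a side note on that last question: the smallest offset still retained by the broker can be queried with Kafka's low-level SimpleConsumer API. The snippet below is only a rough sketch of that lookup, not a proposed fix for this issue; the broker host, port, topic, partition and client id are hypothetical placeholders.

{noformat}
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

// Hypothetical broker/topic/partition values -- replace with real ones.
val consumer = new SimpleConsumer("broker1", 9092, 10000, 64 * 1024, "earliest-offset-lookup")
val tp = TopicAndPartition("test-topic", 0)

// Ask the partition leader for the earliest (smallest) offset it still retains.
val request = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
val earliestOffset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(tp).offsets.head
consumer.close()
{noformat}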
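
For completeness: when the job is restarted without saved offsets, the direct stream can at least be told to begin from the smallest retained offset via the auto.offset.reset setting in kafkaParams, which is documented for the direct approach. This only controls where a new stream starts; it does not rescue a batch whose offset range has already been deleted from the log. A minimal sketch, with a hypothetical broker list, topic and batch interval:

{noformat}
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical application name, broker list, topic and batch interval.
val conf = new SparkConf().setAppName("kafka-direct-example")
val ssc = new StreamingContext(conf, Seconds(5))

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker1:9092,broker2:9092",
  // Start from the smallest (earliest) retained offset instead of the latest
  // when no explicit fromOffsets are supplied at stream creation.
  "auto.offset.reset" -> "smallest")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("test-topic"))
{noformat}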