[ https://issues.apache.org/jira/browse/SPARK-11693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15001877#comment-15001877 ]

xiaoxiaoluo edited comment on SPARK-11693 at 11/12/15 9:48 AM:
---------------------------------------------------------------

Should we catch this exception in
[https://github.com/apache/spark/blob/v1.5.1/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala]?

{noformat}
override def getNext(): R = {
  if (iter == null || !iter.hasNext) {
    // Proposed change: catch the fetch error, mark the iterator finished,
    // and return instead of letting the task fail.
    try {
      iter = fetchBatch
    } catch {
      case ex: Throwable =>
        logError(ex.getMessage)
        finished = true
        return null.asInstanceOf[R]
    }
  }
  if (!iter.hasNext) {
    assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
    finished = true
    null.asInstanceOf[R]
  } else {
    val item = iter.next()
    if (item.offset >= part.untilOffset) {
      assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
      finished = true
      null.asInstanceOf[R]
    } else {
      requestOffset = item.nextOffset
      messageHandler(new MessageAndMetadata(
        part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
    }
  }
}
{noformat}
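
The stack trace below shows the failure is {{kafka.common.OffsetOutOfRangeException}}, so a narrower variant would catch only that error instead of every {{Throwable}} (which would also hide genuine bugs). A sketch of just the fetch step, assuming the same surrounding {{KafkaRDDIterator}} members:

{noformat}
if (iter == null || !iter.hasNext) {
  try {
    iter = fetchBatch
  } catch {
    // Swallow only the offset error from the reported stack trace;
    // any other failure still fails the task as before.
    case ex: kafka.common.OffsetOutOfRangeException =>
      logError(s"Requested offsets for $part are no longer on the broker", ex)
      finished = true
      return null.asInstanceOf[R]
  }
}
{noformat}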


was (Author: luotao):
Should we catch this exception in
[https://github.com/apache/spark/blob/v1.5.1/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala]?

{noformat}
override def getNext(): R = {
  if (iter == null || !iter.hasNext) {
    try {
      iter = fetchBatch
    } catch {
      case ex: Throwable =>
        logError(ex.toString)
        finished = true
        return null.asInstanceOf[R]
    }
  }
  if (!iter.hasNext) {
    assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
    finished = true
    null.asInstanceOf[R]
  } else {
    val item = iter.next()
    if (item.offset >= part.untilOffset) {
      assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
      finished = true
      null.asInstanceOf[R]
    } else {
      requestOffset = item.nextOffset
      messageHandler(new MessageAndMetadata(
        part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
    }
  }
}
{noformat}

> spark kafka direct streaming exception
> --------------------------------------
>
>                 Key: SPARK-11693
>                 URL: https://issues.apache.org/jira/browse/SPARK-11693
>             Project: Spark
>          Issue Type: Question
>          Components: Streaming
>    Affects Versions: 1.5.1
>            Reporter: xiaoxiaoluo
>            Priority: Minor
>
> We are using Spark Kafka direct streaming in our test environment. We have
> capped the size of each Kafka partition to avoid exhausting disk space, so
> when data is written to Kafka faster than Spark Streaming reads it, the
> broker deletes segments the stream has not consumed yet. Spark Streaming
> then hits an exception and the application shuts down.
> {noformat}
> 15/11/11 10:17:35 ERROR Executor: Exception in task 0.3 in stage 1626659.0 (TID 1134180)
> kafka.common.OffsetOutOfRangeException
>       at sun.reflect.GeneratedConstructorAccessor32.newInstance(Unknown Source)
>       at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>       at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>       at java.lang.Class.newInstance(Class.java:442)
>       at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>       at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:184)
>       at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193)
>       at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
>       at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
>       at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>       at org.apache.spark.scheduler.Task.run(Task.scala:88)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 15/11/11 10:17:42 ERROR CoarseGrainedExecutorBackend: Driver 10.1.92.44:49939 disassociated! Shutting down.
> {noformat}
> Could streaming fetch the current smallest available offset for the
> partition and continue processing the stream from there?
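
For reference, the broker can be asked for a partition's smallest available offset via the Kafka 0.8 SimpleConsumer API that the direct stream already uses internally (Spark's {{KafkaCluster.getEarliestLeaderOffsets}} wraps the same request). A minimal standalone sketch; the {{earliestOffset}} helper name and the connection details are illustrative only:

{noformat}
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

// Ask the partition leader for its earliest (smallest) retained offset.
// After an OffsetOutOfRangeException a reader could clamp its next request
// offset to this value and continue, accepting the skipped data as lost.
def earliestOffset(consumer: SimpleConsumer, topic: String, partition: Int): Long = {
  val tp = TopicAndPartition(topic, partition)
  val request = OffsetRequest(
    Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, maxNumOffsets = 1)))
  consumer.getOffsetsBefore(request).partitionErrorAndOffsets(tp).offsets.head
}

// Example usage: a SimpleConsumer pointed at the partition's leader broker.
val consumer = new SimpleConsumer("broker-host", 9092, 10000, 64 * 1024, "offset-check")
val smallest = earliestOffset(consumer, "test-topic", 0)
{noformat}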


