[ https://issues.apache.org/jira/browse/SPARK-16417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15367486#comment-15367486 ]

ren xing commented on SPARK-16417:
----------------------------------

I referred to the 1.6.2 source code and found the same problem. Here is a code 
snippet I picked up from the Scala class 
org.apache.spark.streaming.receiver.BlockGenerator:
  /** Keep pushing blocks to the BlockManager. */
  private def keepPushingBlocks() {
    logInfo("Started block pushing thread")

    def areBlocksBeingGenerated: Boolean = synchronized {
      state != StoppedGeneratingBlocks
    }

    try {
      // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
      while (areBlocksBeingGenerated) {
        Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
          case Some(block) => pushBlock(block)
          case None =>
        }
      }

      // At this point, state is StoppedGeneratingBlocks. So drain the queue of to-be-pushed blocks.
      logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
      while (!blocksForPushing.isEmpty) {
        val block = blocksForPushing.take()
        logDebug(s"Pushing block $block")
        pushBlock(block)
        logInfo("Blocks left to push " + blocksForPushing.size())
      }
      logInfo("Stopped block pushing thread")
    } catch {
      case ie: InterruptedException =>
        logInfo("Block pushing thread was interrupted")
      case e: Exception =>
        reportError("Error in block pushing thread", e)
    }
  }


This function 'keepPushingBlocks' shows that if an exception is thrown while 
calling pushBlock(block), the thread just reports the error message and exits 
without crashing the executor. This means the receiver keeps putting single 
records into memory until the executor OOMs, and only at OOM time can I spot 
the issue online (from the Spark UI), which is really too late.
This case only happens when using receiver.store(single-record). In my use 
case, the thrown exception is listed below:

WARN [org.apache.spark.streaming.scheduler.ReceiverTracker:logWarning:71] - Error reported by receiver for stream 3: Futures timed out after [30 seconds] - java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:202)
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:156)
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:127)
    at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:124)
    at com.jd.aof.spark.streaming.receiver.BlockProducer.pushBlockWithRetry(BlockProducer.scala:157)
    at com.jd.aof.spark.streaming.receiver.BlockProducer.keepPushingBlocks(BlockProducer.scala:134)
    at com.jd.aof.spark.streaming.receiver.BlockProducer$$anon$2.run(BlockProducer.scala:42)

I don't know how to fix this timeout exception (it happens occasionally, maybe 
once every two days); I would really appreciate any pointers on how to fix it. 
But turning back to Spark, in my understanding this thread should crash the 
executor instead of eating the exception.
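
To illustrate what I mean, here is only a minimal, self-contained sketch, not 
the real BlockGenerator code; pushBlock, reportError and the uncaught-exception 
handler below are hypothetical stand-ins. The pushing thread could report the 
error and then rethrow it, so the failure surfaces instead of being swallowed:

import scala.util.control.NonFatal

object FailFastPushThread {
  // Hypothetical stand-ins for the real BlockGenerator collaborators.
  def pushBlock(block: String): Unit =
    throw new RuntimeException(s"simulated WAL timeout while pushing $block")

  def reportError(msg: String, t: Throwable): Unit =
    System.err.println(s"$msg: ${t.getMessage}")

  def main(args: Array[String]): Unit = {
    val pusher = new Thread(new Runnable {
      override def run(): Unit = {
        try {
          pushBlock("block-1")
        } catch {
          case NonFatal(e) =>
            // Report the error as the current code does, but rethrow instead of
            // swallowing it, so the thread's death is visible to whoever should
            // stop the receiver or fail the executor.
            reportError("Error in block pushing thread", e)
            throw e
        }
      }
    }, "block-pushing-thread")
    // Hypothetical handler standing in for whatever should actually stop the
    // receiver / fail the executor when the pushing thread dies.
    pusher.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit =
        System.err.println(s"Thread ${t.getName} died, failing fast: ${e.getMessage}")
    })
    pusher.start()
    pusher.join()
  }
}

With something like this, the error would at least kill the pushing thread 
loudly instead of leaving the receiver buffering records forever.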

To work around this issue, I generate the block myself, use the 
store(ArrayBuffer) interface, and handle the uncertain exception myself.
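
Roughly, the workaround looks like the sketch below. This is only an 
illustration assuming the standard Receiver API; readRecord(), the batch size 
and the restart policy are hypothetical placeholders, not my production code:

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Illustrative receiver that batches records itself and stores whole blocks,
// so any storage failure is seen and handled right here.
class BatchingReceiver(batchSize: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  override def onStart(): Unit = {
    new Thread("batching-receiver-thread") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = { /* nothing to clean up in this sketch */ }

  // Hypothetical placeholder for reading one record from the real source.
  private def readRecord(): String = "record"

  private def receive(): Unit = {
    var buffer = new ArrayBuffer[String](batchSize)
    while (!isStopped()) {
      buffer += readRecord()
      if (buffer.size >= batchSize) {
        try {
          // store(ArrayBuffer) blocks until the block (and its WAL entry, if
          // enabled) has been written, so a failure shows up here instead of
          // in BlockGenerator's pushing thread.
          store(buffer)
        } catch {
          case e: Exception =>
            // Handle the failure explicitly: retry, or restart the receiver,
            // rather than letting records pile up in memory.
            restart("Failed to store block, restarting receiver", e)
        }
        buffer = new ArrayBuffer[String](batchSize)
      }
    }
  }
}

The point is that the exception from the write-ahead log surfaces in my own 
code, where I can decide whether to retry or restart, instead of being silently 
eaten by the pushing thread.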

> spark 1.5.2 receiver store(single-record) with ahead log enabled makes 
> executor crash if there is an exception when BlockGenerator storing block
> ------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-16417
>                 URL: https://issues.apache.org/jira/browse/SPARK-16417
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.5.2
>         Environment: spark streaming version 1.5.2.
>            Reporter: ren xing
>
> The receiver has the store(single-record) function, which actually puts the record 
> into a buffer. One backend thread periodically scans this buffer, generates a 
> block, and stores the block to Spark. If the write-ahead log is enabled, there 
> is sometimes an exception when writing the write-ahead log. This exception is 
> caught by the backend thread. However, the backend thread just prints some 
> message AND EXITS! This means there is no consumer for the receiver's inner 
> buffered records. As time goes on, the executor will OOM.


