[
https://issues.apache.org/jira/browse/SPARK-16417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15367486#comment-15367486
]
ren xing commented on SPARK-16417:
----------------------------------
I checked the 1.6.2 source code and found the same problem. Here is a code snippet
from the Scala class org.apache.spark.streaming.receiver.BlockGenerator:
/** Keep pushing blocks to the BlockManager. */
private def keepPushingBlocks() {
  logInfo("Started block pushing thread")

  def areBlocksBeingGenerated: Boolean = synchronized {
    state != StoppedGeneratingBlocks
  }

  try {
    // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
    while (areBlocksBeingGenerated) {
      Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
        case Some(block) => pushBlock(block)
        case None =>
      }
    }

    // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
    logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
    while (!blocksForPushing.isEmpty) {
      val block = blocksForPushing.take()
      logDebug(s"Pushing block $block")
      pushBlock(block)
      logInfo("Blocks left to push " + blocksForPushing.size())
    }
    logInfo("Stopped block pushing thread")
  } catch {
    case ie: InterruptedException =>
      logInfo("Block pushing thread was interrupted")
    case e: Exception =>
      reportError("Error in block pushing thread", e)
  }
}
This function, keepPushingBlocks, shows that if pushBlock(block) throws an exception,
the thread just reports the error and exits without crashing the executor. Because the
try/catch wraps both loops, a single exception is enough to end the thread. The receiver
can then keep storing single records in memory until the executor runs out of memory
(OOM). Only at OOM time can I see the production issue (in the Spark UI), and by then it
is far too late.
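A Spark-free sketch of why one failure is enough. The names are hypothetical, and the code only mirrors the control flow of keepPushingBlocks. Because the try/catch wraps the loop, the first exception from push ends draining. Everything still queued is then left without a consumer.

```scala
import java.util.concurrent.LinkedBlockingQueue

object SwallowedErrorDemo {
  /** Same control-flow shape as the drain loop in keepPushingBlocks: the
    * try/catch wraps the whole loop, so the first failed push ends draining. */
  def drainLikeOriginal[T](queue: LinkedBlockingQueue[T], push: T => Unit): Unit = {
    try {
      while (!queue.isEmpty) {
        push(queue.take())
      }
    } catch {
      case e: Exception =>
        // Report and return: nothing drains the queue after this point.
        println(s"Error in block pushing thread: ${e.getMessage}")
    }
  }

  /** Fill a queue with 1..n, make the push of `failOn` throw, and return how many
    * elements are left behind with no consumer. */
  def remainingAfterFailure(n: Int, failOn: Int): Int = {
    val queue = new LinkedBlockingQueue[Int]()
    (1 to n).foreach(i => queue.put(i))
    drainLikeOriginal[Int](queue, i => if (i == failOn) throw new RuntimeException(s"push of $i failed"))
    queue.size()
  }
}
```

With five queued items and a failure on the second, the last three are never pushed.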
This only happens when using receiver.store(single-record). In my case, the exception
thrown is:
WARN [org.apache.spark.streaming.scheduler.ReceiverTracker:logWarning:71] - Error reported by receiver for stream 3: Futures timed out after [30 seconds] -
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:202)
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:156)
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:127)
    at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:124)
    at com.jd.aof.spark.streaming.receiver.BlockProducer.pushBlockWithRetry(BlockProducer.scala:157)
    at com.jd.aof.spark.streaming.receiver.BlockProducer.keepPushingBlocks(BlockProducer.scala:134)
    at com.jd.aof.spark.streaming.receiver.BlockProducer$$anon$2.run(BlockProducer.scala:42)
I don't know how to fix this timeout exception. It happens occasionally, roughly once
every two days, and I would really appreciate it if you know how to fix it. As for Spark
itself, my understanding is that this thread should crash the executor instead of
swallowing exceptions.
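One possible shape of the fail-fast behaviour argued for above, sketched without Spark. All names are hypothetical, and this illustrates the idea rather than how Spark handles it. The error is reported as before, then rethrown so that it reaches the thread's UncaughtExceptionHandler. Whoever owns the thread can then decide to stop the receiver or the executor.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicReference

object FailFastPushing {
  /** Drain `queue`, but after reporting a failure rethrow it so it reaches the
    * thread's UncaughtExceptionHandler instead of ending the thread quietly. */
  def drainOrRethrow[T](queue: LinkedBlockingQueue[T], push: T => Unit,
                        report: (String, Throwable) => Unit): Unit = {
    try {
      while (!queue.isEmpty) {
        push(queue.take())
      }
    } catch {
      case _: InterruptedException =>
        Thread.currentThread().interrupt() // normal shutdown: keep the interrupt status
      case e: Exception =>
        report("Error in block pushing thread", e)
        throw e // escalate instead of exiting silently
    }
  }

  /** Run a failing drain on its own thread and return what its handler received. */
  def errorSeenByHandler(): Throwable = {
    val queue = new LinkedBlockingQueue[Int]()
    (1 to 3).foreach(i => queue.put(i))
    val seen = new AtomicReference[Throwable]()
    val worker = new Thread(new Runnable {
      def run(): Unit =
        drainOrRethrow[Int](queue, i => if (i == 2) throw new RuntimeException("push failed"), (_, _) => ())
    })
    worker.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      def uncaughtException(t: Thread, e: Throwable): Unit = seen.set(e)
    })
    worker.start()
    worker.join() // the handler runs on the dying thread before it terminates
    seen.get()
  }
}
```

The code rethrows rather than calling System.exit, so the decision stays with whoever owns the thread. An InterruptedException is still treated as a normal shutdown.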
To work around this, I generate the blocks myself, use the store(ArrayBuffer) interface,
and handle unexpected exceptions myself.
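The BlockProducer.pushBlockWithRetry method from the stack trace is not shown. The following is therefore only a hypothetical sketch of this kind of workaround. It retries the store of a whole ArrayBuffer a bounded number of times, then rethrows instead of swallowing the error.

The `store` parameter stands in for the receiver's `store(dataBuffer: ArrayBuffer[T])`. Inside a custom Receiver you might pass it as `(b: ArrayBuffer[T]) => store(b)`, but that wiring is untested here.

```scala
import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}

object RetryingStore {
  /** Call `store(block)` up to `maxAttempts` times, sleeping `backoffMs * attempt`
    * ms between attempts. Returns the attempt number that succeeded and rethrows
    * the last failure so the caller can escalate it. `Try` does not catch
    * InterruptedException, so an interrupt is never retried. */
  @tailrec
  def storeWithRetry[T](block: ArrayBuffer[T],
                        store: ArrayBuffer[T] => Unit,
                        maxAttempts: Int = 3,
                        backoffMs: Long = 100L,
                        attempt: Int = 1): Int = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")
    Try(store(block)) match {
      case Success(_) => attempt
      case Failure(_) if attempt < maxAttempts =>
        Thread.sleep(backoffMs * attempt) // linear backoff before the next attempt
        storeWithRetry(block, store, maxAttempts, backoffMs, attempt + 1)
      case Failure(e) => throw e // out of attempts: surface the error
    }
  }
}
```

If retries are exhausted, the caller still has to escalate, for example via the receiver's restart(message, error). A timed-out write-ahead-log write might also still complete later, so retrying may store the same block twice.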
> spark 1.5.2 receiver store(single-record) with ahead log enabled makes
> executor crash if there is an exception when BlockGenerator storing block
> ------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-16417
> URL: https://issues.apache.org/jira/browse/SPARK-16417
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.5.2
> Environment: spark streaming version 1.5.2.
> Reporter: ren xing
>
> The receiver's store(single-record) function actually puts the record into a buffer.
> A background thread periodically takes this buffer, generates a block, and stores the
> block in Spark. With the write-ahead log enabled, writing to it sometimes throws an
> exception. This exception is caught by the background thread. However, the background
> thread just prints a message AND EXITS! This means nothing consumes the records
> buffered inside the receiver, and over time the executor runs out of memory (OOM).
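The mechanism described above can be illustrated with a toy, single-threaded model. The class is hypothetical and is not Spark's BlockGenerator, which uses separate threads and a queue.

In the model, store only appends to a buffer, and tick plays the role of the background thread that cuts a block and pushes it. Once a push fails and the consumer stops, every later store just grows the buffer.

```scala
import scala.collection.mutable.ArrayBuffer

/** Toy model (hypothetical, single-threaded): store() only buffers, and tick()
  * stands in for the background thread that cuts a block and pushes it. */
class ToyBufferedReceiver(push: Seq[Int] => Unit) {
  private val buffer = ArrayBuffer.empty[Int]
  private var consumerAlive = true

  /** In this toy, storing a single record always succeeds immediately. */
  def store(record: Int): Unit = buffer += record

  /** One background iteration; the first failed push stops consumption for good. */
  def tick(): Unit = if (consumerAlive) {
    val block = buffer.toList
    buffer.clear()
    try push(block)
    catch { case _: Exception => consumerAlive = false } // "print a message and exit"
  }

  def buffered: Int = buffer.size
  def isConsuming: Boolean = consumerAlive
}
```

If the second push fails, the records stored in later ticks accumulate forever. With ten records per tick over five ticks, the last three ticks leave 30 records buffered.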
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)