[ 
https://issues.apache.org/jira/browse/SPARK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14276917#comment-14276917
 ] 

Max Xu edited comment on SPARK-5220 at 1/14/15 1:57 PM:
--------------------------------------------------------

Current situation: 
1. ReliableKafkaReceiver throws the TimeoutException; the receiver logs the 
error and does nothing else;
2. The block pushing thread in BlockGenerator is terminated by the exception;
3. ReliableKafkaReceiver stays ACTIVE but goes rogue afterward, because the 
blocksForPushing queue is full and no block pushing thread is available to 
drain it.

So when the exception occurs, what the user sees is a streaming application 
that keeps running but sits there doing nothing. (We use a supervisor to 
restart the failed app, but if the app keeps running while doing nothing, we 
have to add a separate monitor to watch the data flow and kill the app in that 
situation.) Can ReliableKafkaReceiver handle the TimeoutException? For 
example, retry pushing the block when a TimeoutException is thrown? Or restart 
itself? Or at least fail the streaming application?
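The retry idea above could look something like this minimal sketch. It is not Spark's actual API: `RetryPush`, `withRetries`, and `maxAttempts` are hypothetical names, and a real fix would live inside BlockGenerator's push path.

```scala
import java.util.concurrent.TimeoutException
import scala.util.{Failure, Success, Try}

object RetryPush {
  // Retry a by-name operation up to maxAttempts times, but only when it
  // fails with a TimeoutException; any other failure is rethrown as-is.
  def withRetries[T](maxAttempts: Int)(op: => T): T =
    Try(op) match {
      case Success(value) => value
      case Failure(_: TimeoutException) if maxAttempts > 1 =>
        withRetries(maxAttempts - 1)(op)
      case Failure(e) => throw e
    }
}
```

Wrapping the store/push call in something like `withRetries(3) { pushBlock(block) }` would ride out a transient HDFS WAL timeout instead of killing the push loop on the first one.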



> keepPushingBlocks in BlockGenerator terminates when an exception occurs, 
> which causes the block pushing thread to die and blocks the receiver
> -------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-5220
>                 URL: https://issues.apache.org/jira/browse/SPARK-5220
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.2.0
>            Reporter: Max Xu
>
> I am running a Spark streaming application with ReliableKafkaReceiver. It 
> uses BlockGenerator to push blocks to BlockManager. However, writing WALs to 
> HDFS may time out, which causes keepPushingBlocks in BlockGenerator to 
> terminate.
> 15/01/12 19:07:06 ERROR receiver.BlockGenerator: Error in block pushing thread
> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>         at 
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>         at 
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>         at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>         at 
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>         at scala.concurrent.Await$.result(package.scala:107)
>         at 
> org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:176)
>         at 
> org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:160)
>         at 
> org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:126)
>         at 
> org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:124)
>         at 
> org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeBlockAndCommitOffset(ReliableKafkaReceiver.scala:207)
>         at 
> org.apache.spark.streaming.kafka.ReliableKafkaReceiver$GeneratedBlockHandler.onPushBlock(ReliableKafkaReceiver.scala:275)
>         at 
> org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:181)
>         at 
> org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:154)
>         at 
> org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:86)
> The block pushing thread then exits, and no subsequent blocks can be pushed 
> to the BlockManager. In turn, this blocks the receiver from receiving new data.
> So when the TimeoutException happens while running my app, the 
> ReliableKafkaReceiver stays in ACTIVE status but doesn't do anything at all. 
> The application goes rogue.
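As a minimal illustration of the "restart itself / fail the app" alternative, the sketch below shows a push loop that surfaces a fatal exception through a callback instead of dying silently. `PushLoop` and `onError` are hypothetical names, not Spark's real BlockGenerator; in an actual receiver the callback could invoke `restart()` or `stop()` so a supervisor sees the failure.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit, TimeoutException}

// Hypothetical push loop: blocks are drained from a bounded queue and
// handed to pushBlock; any exception escaping the loop is reported via
// onError rather than being swallowed, so the app can fail fast or restart.
class PushLoop[T](queue: ArrayBlockingQueue[T],
                  pushBlock: T => Unit,
                  onError: Throwable => Unit) {
  @volatile private var stopped = false
  def stop(): Unit = { stopped = true }

  def run(): Unit = {
    try {
      while (!stopped) {
        // Poll with a short timeout so stop() is noticed promptly.
        val block = queue.poll(10, TimeUnit.MILLISECONDS)
        if (block != null) pushBlock(block)
      }
    } catch {
      case t: Throwable =>
        onError(t) // report instead of just logging; the supervisor decides
    }
  }
}
```

With this shape, a TimeoutException from the WAL write would reach `onError` and the application could be restarted or failed, instead of leaving a full `blocksForPushing` queue behind a seemingly ACTIVE receiver.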



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
