[ https://issues.apache.org/jira/browse/SPARK-30675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mullaivendhan Ariaputhri updated SPARK-30675:
---------------------------------------------
    Description: 
 

*+Stream+*

We have observed a discrepancy in the Kinesis stream: the stream has continuous 
incoming records, but the GetRecords.Records metric shows no data.

 

Upon analysis, we understood that no GetRecords calls were made by the Spark 
job during that period, which is why the GetRecords count is not available. 
Hence there should be no issue with the stream itself, as messages were still 
being received.
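For context, the job consumes the stream through the receiver-based Kinesis DStream from spark-streaming-kinesis-asl. A minimal sketch of the setup we are assuming (stream name, endpoint, region, and application name below are placeholders, not the actual values from our job):

{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

val sparkConf = new SparkConf().setAppName("kinesis-consumer") // placeholder app name
val ssc = new StreamingContext(sparkConf, Seconds(10))

// Receiver-based Kinesis source (Spark 2.4.x builder API). The KCL worker whose
// ZOMBIE shutdown appears in the executor logs below is created internally by
// this receiver.
val kinesisStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")                                     // placeholder
  .endpointUrl("https://kinesis.ap-southeast-2.amazonaws.com")
  .regionName("ap-southeast-2")
  .initialPosition(new KinesisInitialPositions.Latest())
  .checkpointAppName("my-kinesis-app")                         // placeholder; also names the DynamoDB lease table
  .checkpointInterval(Seconds(10))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .build()
{code}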

*+Spark/EMR+*

From the driver logs, it was found that the driver de-registered the receiver 
for the stream.

+*_Driver Logs_*+

2020-01-03 11:11:40 ERROR ReceiverTracker:70 - *{color:#de350b}Deregistered receiver for stream 0: Error while storing block into Spark - java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]{color}*

        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
        at org.apache.spark.streaming.receiver.{color:#de350b}*WriteAheadLogBasedBlockHandler.storeBlock*{color}(ReceivedBlockHandler.scala:210)
        at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:158)
        at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:129)
        at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:133)
        at org.apache.spark.streaming.kinesis.KinesisReceiver.org$apache$spark$streaming$kinesis$KinesisReceiver$$storeBlockWithRanges(KinesisReceiver.scala:293)
        at org.apache.spark.streaming.kinesis.KinesisReceiver$GeneratedBlockHandler.onPushBlock(KinesisReceiver.scala:344)
        at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:297)
        at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:269)
        at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:110)
        ...
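The failing frame, WriteAheadLogBasedBlockHandler.storeBlock, is only on the code path when the receiver write-ahead log is enabled: storeBlock writes each block to the BlockManager and to the WAL in parallel and awaits both futures, and that await is what times out after spark.streaming.receiver.blockStoreTimeout (default 30 seconds). A sketch of the configuration we assume the job runs with (the checkpoint path is a placeholder):

{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Enabling the receiver WAL is what selects WriteAheadLogBasedBlockHandler.
val sparkConf = new SparkConf()
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.checkpoint("hdfs:///checkpoints/my-app") // placeholder; the receiver WAL lives under the checkpoint directory
{code}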

*Up to this point, no replacement receiver had been started or registered. From 
the executor logs (below), it was observed that one of the executors was 
running on the container.*

 

+*_Executor Logs_*+

2020-01-03 11:11:30 INFO  BlockManager:54 - Removing RDD 2851002

2020-01-03 11:11:31 INFO  ReceiverSupervisorImpl:54 - {color:#de350b}*Stopping receiver with message: Error while storing block into Spark: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]*{color}

2020-01-03 11:11:31 INFO  Worker:593 - Worker shutdown requested.

2020-01-03 11:11:31 INFO  LeaseCoordinator:298 - Worker ip-10-61-71-29.ap-southeast-2.compute.internal:a7567f14-16be-4aca-8f64-401b0b29aea2 has successfully stopped lease-tracking threads

2020-01-03 11:11:31 INFO  KinesisRecordProcessor:54 - Shutdown: Shutting down workerId ip-10-61-71-29.ap-southeast-2.compute.internal:a7567f14-16be-4aca-8f64-401b0b29aea2 with reason ZOMBIE

2020-01-03 11:11:32 INFO  MemoryStore:54 - Block input-0-1575374565339 stored as bytes in memory (estimated size 7.3 KB, free 3.4 GB)

2020-01-03 11:11:33 INFO  Worker:634 - All record processors have been shut down successfully.

 

*After this point, the Kinesis KCL worker that was reading the queue appears to 
have been terminated, which explains the gap in GetRecords.*

 

+*Mitigation*+

Increased the timeouts (see the configuration sketch below):
 * 'spark.streaming.receiver.blockStoreTimeout' to 59 seconds (from the default 
of 30 seconds)
 * 'spark.streaming.driver.writeAheadLog.batchingTimeout' to 30 seconds (from 
the default of 5 seconds)
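The same settings expressed on the SparkConf (a sketch; in the 2.4.x sources the block-store timeout is read as a number of seconds and the WAL batching timeout as a number of milliseconds):

{code:scala}
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  // Give the receiver more time to complete the parallel BlockManager + WAL writes.
  .set("spark.streaming.receiver.blockStoreTimeout", "59")              // seconds (default 30)
  // Give the driver-side batched WAL writer more time before it fails the batch.
  .set("spark.streaming.driver.writeAheadLog.batchingTimeout", "30000") // milliseconds (default 5000)
{code}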



> Spark Streaming Job stopped reading events from Queue upon Deregister Exception
> --------------------------------------------------------------------------------
>
>
>                 Key: SPARK-30675
>                 URL: https://issues.apache.org/jira/browse/SPARK-30675
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager, DStreams
>    Affects Versions: 2.4.3
>            Reporter: Mullaivendhan Ariaputhri
>            Priority: Major


