[ https://issues.apache.org/jira/browse/SPARK-30675 ]

Mullaivendhan Ariaputhri updated SPARK-30675:
---------------------------------------------
    Attachment: Instance-Config-P2.JPG
                Instance-Config-P1.JPG
                Cluster-Config-P2.JPG
                Cluster-Config-P1.JPG

> Spark Streaming Job stopped reading events from Queue upon Deregister 
> Exception
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-30675
>                 URL: https://issues.apache.org/jira/browse/SPARK-30675
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager, DStreams
>    Affects Versions: 2.4.3
>            Reporter: Mullaivendhan Ariaputhri
>            Priority: Major
>         Attachments: Cluster-Config-P1.JPG, Cluster-Config-P2.JPG, 
> Instance-Config-P1.JPG, Instance-Config-P2.JPG
>
>
>  
> *+Stream+*
> We have observed a discrepancy in the Kinesis stream: the stream has 
> continuous incoming records, but the GetRecords.Records metric is not available.
>  
> Upon analysis, we have understood that no GetRecords calls were made by the 
> Spark job during that period, which is why the GetRecords count is not 
> available; hence there should be no issue with the stream itself, as messages 
> were still being received.
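>  
> For reference, the gap can be confirmed from CloudWatch. Below is a minimal 
> sketch (not part of the job) that pulls the GetRecords.Records datapoints, 
> assuming the AWS SDK for Java v1 is on the classpath; the stream name 
> "my-stream" is a placeholder:
> {code:scala}
> import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder
> import com.amazonaws.services.cloudwatch.model.{Dimension, GetMetricStatisticsRequest}
> import java.util.Date
> 
> object GetRecordsGapCheck {
>   def main(args: Array[String]): Unit = {
>     val cw = AmazonCloudWatchClientBuilder.defaultClient()
>     val request = new GetMetricStatisticsRequest()
>       .withNamespace("AWS/Kinesis")
>       .withMetricName("GetRecords.Records")
>       // "my-stream" is a placeholder; substitute the actual stream name.
>       .withDimensions(new Dimension().withName("StreamName").withValue("my-stream"))
>       .withStartTime(new Date(System.currentTimeMillis() - 60 * 60 * 1000))
>       .withEndTime(new Date())
>       .withPeriod(60)          // one datapoint per minute
>       .withStatistics("Sum")
>     // Minutes with no datapoint at all indicate no GetRecords calls were made.
>     cw.getMetricStatistics(request).getDatapoints.forEach { dp =>
>       println(s"${dp.getTimestamp}: ${dp.getSum}")
>     }
>   }
> }
> {code}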
> *+Spark/EMR+*
> From the driver logs, it has been found that the driver deregistered the 
> receiver for the stream.
> +*_Driver Logs_*+
> 2020-01-03 11:11:40 ERROR ReceiverTracker:70 - *{color:#de350b}Deregistered 
> receiver for stream 0: Error while storing block into Spark - 
> java.util.concurrent.TimeoutException: Futures timed out after [30 
> seconds]{color}*
>         at 
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>         at 
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>         at 
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
>         at 
> org.apache.spark.streaming.receiver.{color:#de350b}*WriteAheadLogBasedBlockHandler.storeBlock*{color}(ReceivedBlockHandler.scala:210)
>         at 
> org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:158)
>         at 
> org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:129)
>         at 
> org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:133)
>         at 
> org.apache.spark.streaming.kinesis.KinesisReceiver.org$apache$spark$streaming$kinesis$KinesisReceiver$$storeBlockWithRanges(KinesisReceiver.scala:293)
>         at 
> org.apache.spark.streaming.kinesis.KinesisReceiver$GeneratedBlockHandler.onPushBlock(KinesisReceiver.scala:344)
>         at 
> org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:297)
>         at 
> org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:269)
>         at 
> org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:110)
>         ...
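> The [30 seconds] in this trace is the default of 
> spark.streaming.receiver.blockStoreTimeout: WriteAheadLogBasedBlockHandler.storeBlock 
> (visible in the trace above) writes the block to the BlockManager and to the 
> write-ahead log in parallel, then awaits both futures under that single 
> timeout, so a slow WAL write (e.g. to S3) is enough to trigger it. A minimal 
> standalone sketch of the pattern, where the 31-second sleep stands in for a 
> slow remote write:
> {code:scala}
> import scala.concurrent.{Await, Future}
> import scala.concurrent.ExecutionContext.Implicits.global
> import scala.concurrent.duration._
> 
> object BlockStoreTimeoutSketch {
>   def main(args: Array[String]): Unit = {
>     // Stand-in for blockManager.putBytes(...): a fast local store.
>     val storeInBlockManager = Future { "stored in BlockManager" }
>     // Stand-in for writeAheadLog.write(...): a slow remote (S3) write.
>     val storeInWriteAheadLog = Future { Thread.sleep(31000); "written to WAL" }
>     // Both must finish within the single block-store timeout; this throws
>     // java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>     val combined = storeInBlockManager.zip(storeInWriteAheadLog)
>     Await.result(combined, 30.seconds)
>   }
> }
> {code}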
> *After this point, no receiver was started or registered again. From the 
> executor logs (below), it has been observed that the receiver was running on 
> one of the executor containers.*
>  
> +*_Executor Logs_*+
> 2020-01-03 11:11:30 INFO  BlockManager:54 - Removing RDD 2851002
> 2020-01-03 11:11:31 INFO  ReceiverSupervisorImpl:54 - 
> {color:#de350b}*Stopping receiver with message: Error while storing block 
> into Spark: java.util.concurrent.TimeoutException: Futures timed out after 
> [30 seconds]*{color}
> 2020-01-03 11:11:31 INFO  Worker:593 - Worker shutdown requested.
> 2020-01-03 11:11:31 INFO  LeaseCoordinator:298 - Worker 
> ip-10-61-71-29.ap-southeast-2.compute.internal:a7567f14-16be-4aca-8f64-401b0b29aea2
>  has successfully stopped lease-tracking threads
> 2020-01-03 11:11:31 INFO  KinesisRecordProcessor:54 - Shutdown:  Shutting 
> down workerId 
> ip-10-61-71-29.ap-southeast-2.compute.internal:a7567f14-16be-4aca-8f64-401b0b29aea2
>  with reason ZOMBIE
> 2020-01-03 11:11:32 INFO  MemoryStore:54 - Block input-0-1575374565339 stored 
> as bytes in memory (estimated size 7.3 KB, free 3.4 GB)
> 2020-01-03 11:11:33 INFO  Worker:634 - All record processors have been shut 
> down successfully.
>  
> *After this point, the Kinesis KCL worker that was reading the queue appears 
> to have been terminated, which explains the gap in the GetRecords metric.*
>  
> +*Mitigation*+
> Increased the following timeouts (see the sketch after this list):
>  * 'spark.streaming.receiver.blockStoreTimeout' to 59 seconds (from the 
> default of 30 seconds)
>  * 'spark.streaming.driver.writeAheadLog.batchingTimeout' to 30 seconds 
> (from the default of 5 seconds)
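>  
> Note that the two settings use different units: blockStoreTimeout is given in 
> seconds, while batchingTimeout is given in milliseconds, which matches the 
> values in the spark-submit command below. A minimal sketch of setting them 
> programmatically (assumes spark-core on the classpath) instead of via --conf:
> {code:scala}
> import org.apache.spark.SparkConf
> 
> // Equivalent to the --conf flags in the spark-submit command below.
> val conf = new SparkConf()
>   .set("spark.streaming.receiver.blockStoreTimeout", "59")              // seconds
>   .set("spark.streaming.driver.writeAheadLog.batchingTimeout", "30000") // milliseconds
> {code}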
>  
> Note: 
>  1. Write-ahead logs and checkpoints are maintained in an AWS S3 bucket.
> 2. Spark submit configuration is as below:
> spark-submit --deploy-mode cluster --executor-memory 4608M --driver-memory 
> 4608M 
>  --conf spark.yarn.driver.memoryOverhead=710M 
>  --conf spark.yarn.executor.memoryOverhead=710M --driver-cores 3 
> --executor-cores 3 
>  --conf spark.dynamicAllocation.minExecutors=1 
>  --conf spark.dynamicAllocation.maxExecutors=2 
>  --conf spark.dynamicAllocation.initialExecutors=2 
>  --conf spark.locality.wait.node=0 
>  --conf spark.dynamicAllocation.enabled=true 
>  --conf maximizeResourceAllocation=false --class XXXXXXXXXXXX 
>  --conf spark.streaming.driver.writeAheadLog.closeFileAfterWrite=true 
>  --conf spark.scheduler.mode=FAIR 
>  --conf spark.metrics.conf=XXXXXXXXXXXX.properties 
> --files=s3://XXXXXXXXXXXX/XXXXXXXXXXXX.properties 
>  --conf spark.streaming.receiver.writeAheadLog.closeFileAfterWrite=true 
>  --conf spark.streaming.receiver.writeAheadLog.enable=true 
>  --conf spark.streaming.receiver.blockStoreTimeout=59 
>  --conf spark.streaming.driver.writeAheadLog.batchingTimeout=30000 
>  --conf spark.streaming.receiver.maxRate=120 
> s3://XXXXXXXXXXXX/XXXXXXXXXXXX.jar yarn XXXXXXXXXXXX 
> applicationContext-XXXXXXXXXXXX-streaming.xml root kinesis 60 &
> 3. EMR Version - 5.26
> 4. Hadoop Distribution - Amazon 2.8.5
> 5. Hardware Config
>  * Master (3 instances - Multi Master Cluster)
>  c5.2xlarge
>  8 vCore, 16 GiB memory, EBS-only storage
>  EBS Storage: 64 GiB
>  * Core (6 instances [Min - 2, Max - 6])
>  c5.4xlarge
>  16 vCore, 32 GiB memory, EBS-only storage
>  EBS Storage: 1000 GiB
> 6. There are 3 Spark jobs running on the same cluster.
> 7. Streaming - Kinesis
> 8. Cluster config and instance config are attached.
>  
> Please let us know if any additional information is required.


