[
https://issues.apache.org/jira/browse/SPARK-30675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mullaivendhan Ariaputhri updated SPARK-30675:
---------------------------------------------
Attachment: Instance-Config-P2.JPG
Instance-Config-P1.JPG
Cluster-Config-P2.JPG
Cluster-Config-P1.JPG
> Spark Streaming Job stopped reading events from Queue upon Deregister
> Exception
> -------------------------------------------------------------------------------
>
> Key: SPARK-30675
> URL: https://issues.apache.org/jira/browse/SPARK-30675
> Project: Spark
> Issue Type: Bug
> Components: Block Manager, DStreams
> Affects Versions: 2.4.3
> Reporter: Mullaivendhan Ariaputhri
> Priority: Major
> Attachments: Cluster-Config-P1.JPG, Cluster-Config-P2.JPG,
> Instance-Config-P1.JPG, Instance-Config-P2.JPG
>
>
>
> *+Stream+*
> We have observed a discrepancy in the Kinesis stream: the stream had a
> continuous flow of incoming records, but the GetRecords.Records metric showed
> no activity.
>
> Upon analysis, we found that the Spark job made no GetRecords calls during
> that period, which is why the GetRecords count was unavailable. The stream
> itself appeared healthy, as messages were still being received.
> *+Spark/EMR+*
> From the driver logs, it was found that the driver deregistered the receiver
> for the stream.
> +*_Driver Logs_*+
> 2020-01-03 11:11:40 ERROR ReceiverTracker:70 - *{color:#de350b}Deregistered
> receiver for stream 0: Error while storing block into Spark -
> java.util.concurrent.TimeoutException: Futures timed out after [30
> seconds]{color}*
> at
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
> at
> org.apache.spark.streaming.receiver.{color:#de350b}*WriteAheadLogBasedBlockHandler.storeBlock*{color}(ReceivedBlockHandler.scala:210)
> at
> org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:158)
> at
> org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:129)
> at
> org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:133)
> at
> org.apache.spark.streaming.kinesis.KinesisReceiver.org$apache$spark$streaming$kinesis$KinesisReceiver$$storeBlockWithRanges(KinesisReceiver.scala:293)
> at
> org.apache.spark.streaming.kinesis.KinesisReceiver$GeneratedBlockHandler.onPushBlock(KinesisReceiver.scala:344)
> at
> org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:297)
> at
> org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:269)
> at
> org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:110)
> ...
> *Up to this point, no receiver had been started or registered. The executor
> logs (below) show that one of the executors was running on the container.*
>
> +*_Executor Logs_*+
> 2020-01-03 11:11:30 INFO BlockManager:54 - Removing RDD 2851002
> 2020-01-03 11:11:31 INFO ReceiverSupervisorImpl:54 -
> {color:#de350b}*Stopping receiver with message: Error while storing block
> into Spark: java.util.concurrent.TimeoutException: Futures timed out after
> [30 seconds]*{color}
> 2020-01-03 11:11:31 INFO Worker:593 - Worker shutdown requested.
> 2020-01-03 11:11:31 INFO LeaseCoordinator:298 - Worker
> ip-10-61-71-29.ap-southeast-2.compute.internal:a7567f14-16be-4aca-8f64-401b0b29aea2
> has successfully stopped lease-tracking threads
> 2020-01-03 11:11:31 INFO KinesisRecordProcessor:54 - Shutdown: Shutting
> down workerId
> ip-10-61-71-29.ap-southeast-2.compute.internal:a7567f14-16be-4aca-8f64-401b0b29aea2
> with reason ZOMBIE
> 2020-01-03 11:11:32 INFO MemoryStore:54 - Block input-0-1575374565339 stored
> as bytes in memory (estimated size 7.3 KB, free 3.4 GB)
> 2020-01-03 11:11:33 INFO Worker:634 - All record processors have been shut
> down successfully.
>
> *After this point, the Kinesis KCL worker that was reading the queue appears
> to have been terminated, which explains the gap in GetRecords.*
>
> +*Mitigation*+
> Increased the following timeouts:
> * 'spark.streaming.receiver.blockStoreTimeout' to 59 seconds (from the
> default of 30 seconds)
> * 'spark.streaming.driver.writeAheadLog.batchingTimeout' to 30 seconds (from
> the default of 5 seconds)
>
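> For reference, the two timeouts above can also be set programmatically on
> the SparkConf instead of (or in addition to) spark-submit flags. A minimal
> sketch, assuming a standard StreamingContext setup; the app name and batch
> interval below are illustrative, only the two config keys come from this
> report:
> {code:scala}
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
>
> val conf = new SparkConf()
>   .setAppName("kinesis-streaming-job") // illustrative name
>   // Allow more time for WAL-backed block stores before the receiver
>   // fails with a TimeoutException (value is in seconds)
>   .set("spark.streaming.receiver.blockStoreTimeout", "59")
>   // Allow more time for the driver to batch WAL writes (value is in ms)
>   .set("spark.streaming.driver.writeAheadLog.batchingTimeout", "30000")
>
> val ssc = new StreamingContext(conf, Seconds(60)) // illustrative interval
> {code}
> Note the unit difference, consistent with the spark-submit command below:
> blockStoreTimeout is interpreted in seconds, while batchingTimeout is in
> milliseconds.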
> Note:
> 1. Write-ahead logs and checkpoints are maintained in an AWS S3 bucket
> 2. Spark submit configuration as below:
> spark-submit --deploy-mode cluster --executor-memory 4608M --driver-memory
> 4608M
> --conf spark.yarn.driver.memoryOverhead=710M
> --conf spark.yarn.executor.memoryOverhead=710M --driver-cores 3
> --executor-cores 3
> --conf spark.dynamicAllocation.minExecutors=1
> --conf spark.dynamicAllocation.maxExecutors=2
> --conf spark.dynamicAllocation.initialExecutors=2
> --conf spark.locality.wait.node=0
> --conf spark.dynamicAllocation.enabled=true
> --conf maximizeResourceAllocation=false --class XXXXXXXXXXXX
> --conf spark.streaming.driver.writeAheadLog.closeFileAfterWrite=true
> --conf spark.scheduler.mode=FAIR
> --conf spark.metrics.conf=XXXXXXXXXXXX.properties
> --files=s3://XXXXXXXXXXXX/XXXXXXXXXXXX.properties
> --conf spark.streaming.receiver.writeAheadLog.closeFileAfterWrite=true
> --conf spark.streaming.receiver.writeAheadLog.enable=true
> --conf spark.streaming.receiver.blockStoreTimeout=59
> --conf spark.streaming.driver.writeAheadLog.batchingTimeout=30000
> --conf spark.streaming.receiver.maxRate=120
> s3://XXXXXXXXXXXX/XXXXXXXXXXXX.jar yarn XXXXXXXXXXXX
> applicationContext-XXXXXXXXXXXX-streaming.xml root kinesis 60 &
> 3. EMR Version - 5.26
> 4. Hadoop Distribution - Amazon 2.8.5
> 5. Hardware Config
> * Master (3 instances - Multi Master Cluster)
> c5.2xlarge
> 8 vCore, 16 GiB memory, EBS only storage
> EBS Storage:64 GiB
> * Core (6 instances [Min - 2, Max - 6])
> c5.4xlarge
> 16 vCore, 32 GiB memory, EBS only storage
> EBS Storage:1000 GiB
> 6. There are 3 spark jobs running on the same cluster
> 7. Streaming - Kinesis
> 8. Cluster Config and Instance Config is attached
>
> Please let us know if any additional information is required.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]