[
https://issues.apache.org/jira/browse/SPARK-30675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mullaivendhan Ariaputhri updated SPARK-30675:
---------------------------------------------
Description:
*+Stream+*
We have observed a discrepancy in the Kinesis stream: the stream has continuous
incoming records, but the GetRecords.Records metric is not available.
Upon analysis, we understood that no GetRecords calls were made by the Spark job
during that period, which is why the GetRecords count is not available; hence
there should not be any issue with the stream itself, as the messages were still
being received.
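For reference, the gap was identified from the stream's CloudWatch metrics. Below is a minimal sketch of how such a check could be scripted with the AWS Java SDK v1; the stream name and the one-hour window are illustrative assumptions, not values from this job.
{code:scala}
import java.util.Date
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder
import com.amazonaws.services.cloudwatch.model.{Dimension, GetMetricStatisticsRequest}

import scala.collection.JavaConverters._

// Illustrative sketch: per-minute sum of GetRecords.Records for the last hour.
// "my-stream" is a placeholder, not the actual stream name.
val cloudWatch = AmazonCloudWatchClientBuilder.defaultClient()
val request = new GetMetricStatisticsRequest()
  .withNamespace("AWS/Kinesis")
  .withMetricName("GetRecords.Records")
  .withDimensions(new Dimension().withName("StreamName").withValue("my-stream"))
  .withStartTime(new Date(System.currentTimeMillis() - 60 * 60 * 1000))
  .withEndTime(new Date())
  .withPeriod(60)
  .withStatistics("Sum")

// Minutes with no datapoint at all are the windows in which no GetRecords call was made.
cloudWatch.getMetricStatistics(request).getDatapoints.asScala
  .sortBy(_.getTimestamp.getTime)
  .foreach(dp => println(s"${dp.getTimestamp} -> ${dp.getSum}"))
{code}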
*+Spark/EMR+*
From the driver logs, it has been found that the driver de-registered the
receiver for the stream.
+*_Driver Logs_*+
2020-01-03 11:11:40 ERROR ReceiverTracker:70 - *{color:#de350b}Deregistered
receiver for stream 0: Error while storing block into Spark -
java.util.concurrent.TimeoutException: Futures timed out after [30
seconds]{color}*
 at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
 at org.apache.spark.streaming.receiver.{color:#de350b}*WriteAheadLogBasedBlockHandler.storeBlock*{color}(ReceivedBlockHandler.scala:210)
 at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:158)
 at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:129)
 at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:133)
 at org.apache.spark.streaming.kinesis.KinesisReceiver.org$apache$spark$streaming$kinesis$KinesisReceiver$$storeBlockWithRanges(KinesisReceiver.scala:293)
 at org.apache.spark.streaming.kinesis.KinesisReceiver$GeneratedBlockHandler.onPushBlock(KinesisReceiver.scala:344)
 at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:297)
 at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:269)
 at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:110)
...
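The frames above point at WriteAheadLogBasedBlockHandler.storeBlock, which writes each block both to the BlockManager and to the write-ahead log (here backed by S3, per the Note below) and then waits on the combined future for spark.streaming.receiver.blockStoreTimeout (30 seconds by default). A simplified, illustrative sketch of that pattern follows; it is not the actual Spark source, and the two store functions are hypothetical stand-ins.
{code:scala}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-ins for the two writes performed per received block.
def storeInBlockManager(block: Array[Byte]): Unit = ???   // in-memory / on-disk store
def writeToWriteAheadLog(block: Array[Byte]): Unit = ???  // WAL write, here backed by S3

def storeBlock(block: Array[Byte], blockStoreTimeout: FiniteDuration): Unit = {
  // Both writes run concurrently...
  val inBlockManager  = Future(storeInBlockManager(block))
  val inWriteAheadLog = Future(writeToWriteAheadLog(block))
  // ...and the receiver thread blocks here. A slow WAL write (e.g. S3 latency)
  // is what surfaces as "Futures timed out after [30 seconds]" in the logs above.
  Await.result(inBlockManager.zip(inWriteAheadLog), blockStoreTimeout)
}
{code}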
*Up to this point, no receiver has been started/registered again. From the
executor logs (below), it can be seen that one of the executors was still
running on the container.*
+*_Executor Logs_*+
2020-01-03 11:11:30 INFO BlockManager:54 - Removing RDD 2851002
2020-01-03 11:11:31 INFO ReceiverSupervisorImpl:54 -
{color:#de350b}*Stopping receiver with message: Error while storing block
into Spark: java.util.concurrent.TimeoutException: Futures timed out after [30
seconds]*{color}
2020-01-03 11:11:31 INFO Worker:593 - Worker shutdown requested.
2020-01-03 11:11:31 INFO LeaseCoordinator:298 - Worker
ip-10-61-71-29.ap-southeast-2.compute.internal:a7567f14-16be-4aca-8f64-401b0b29aea2
has successfully stopped lease-tracking threads
2020-01-03 11:11:31 INFO KinesisRecordProcessor:54 - Shutdown: Shutting down
workerId
ip-10-61-71-29.ap-southeast-2.compute.internal:a7567f14-16be-4aca-8f64-401b0b29aea2
with reason ZOMBIE
2020-01-03 11:11:32 INFO MemoryStore:54 - Block input-0-1575374565339 stored
as bytes in memory (estimated size 7.3 KB, free 3.4 GB)
2020-01-03 11:11:33 INFO Worker:634 - All record processors have been shut
down successfully.
*After this point, the Kinesis KCL worker that was reading the queue appears to
have been terminated, which explains the gap in the GetRecords calls.*
+*Mitigation*+
Increased the following timeouts (a configuration sketch follows below):
* 'spark.streaming.receiver.blockStoreTimeout' to 59 seconds (from the default of
30 seconds)
* 'spark.streaming.driver.writeAheadLog.batchingTimeout' to 30 seconds (from the
default of 5 seconds)
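A minimal sketch of the same two settings applied programmatically through SparkConf rather than via --conf on spark-submit; the application name and batch interval are placeholders, and the values mirror the command in the Note below (blockStoreTimeout is read in seconds, batchingTimeout in milliseconds).
{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: app name and batch interval are placeholders; the master URL is
// expected to be supplied by spark-submit, as in the command in the Note below.
val conf = new SparkConf()
  .setAppName("kinesis-streaming-app")
  // Receiver waits up to 59 s (default 30 s) for a block to be stored.
  .set("spark.streaming.receiver.blockStoreTimeout", "59")
  // Driver batches WAL writes for up to 30000 ms (default 5000 ms).
  .set("spark.streaming.driver.writeAheadLog.batchingTimeout", "30000")

val ssc = new StreamingContext(conf, Seconds(60))
{code}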
Note:
1. Write-ahead logs and checkpoints are being maintained in an AWS S3 bucket
2. Spark submit configuration as below:
spark-submit --deploy-mode cluster --executor-memory 4608M --driver-memory
4608M
--conf spark.yarn.driver.memoryOverhead=710M
--conf spark.yarn.executor.memoryOverhead=710M --driver-cores 3
--executor-cores 3
--conf spark.dynamicAllocation.minExecutors=1
--conf spark.dynamicAllocation.maxExecutors=2
--conf spark.dynamicAllocation.initialExecutors=2
--conf spark.locality.wait.node=0
--conf spark.dynamicAllocation.enabled=true
--conf maximizeResourceAllocation=false --class XXXXXXXXXXXX
--conf spark.streaming.driver.writeAheadLog.closeFileAfterWrite=true
--conf spark.scheduler.mode=FAIR
--conf spark.metrics.conf=XXXXXXXXXXXX.properties
--files=s3://XXXXXXXXXXXX/XXXXXXXXXXXX.properties
--conf spark.streaming.receiver.writeAheadLog.closeFileAfterWrite=true
--conf spark.streaming.receiver.writeAheadLog.enable=true
--conf spark.streaming.receiver.blockStoreTimeout=59
--conf spark.streaming.driver.writeAheadLog.batchingTimeout=30000
--conf spark.streaming.receiver.maxRate=120 s3://XXXXXXXXXXXX/XXXXXXXXXXXX.jar
yarn XXXXXXXXXXXX applicationContext-XXXXXXXXXXXX-streaming.xml root kinesis 60
&
3. EMR Version - 5.26
4. Hadoop Distribution - Amazon 2.8.5
5. Hardware Config
* Master (3 instances - Multi Master Cluster)
c5.2xlarge
8 vCore, 16 GiB memory, EBS only storage
EBS Storage: 64 GiB
* Core (6 instances [Min - 2, Max - 6])
c5.4xlarge
16 vCore, 32 GiB memory, EBS only storage
EBS Storage: 1000 GiB
> Spark Streaming Job stopped reading events from Queue upon Deregister
> Exception
> -------------------------------------------------------------------------------
>
> Key: SPARK-30675
> URL: https://issues.apache.org/jira/browse/SPARK-30675
> Project: Spark
> Issue Type: Bug
> Components: Block Manager, DStreams
> Affects Versions: 2.4.3
> Reporter: Mullaivendhan Ariaputhri
> Priority: Major