[ https://issues.apache.org/jira/browse/SPARK-30677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mullaivendhan Ariaputhri updated SPARK-30677:
---------------------------------------------
Attachment: Cluster-Config-P1.JPG
Instance-Config-P2.JPG
Instance-Config-P1.JPG
Cluster-Config-P2.JPG
> Spark Streaming Job stuck when Kinesis Shard is increased when the job is
> running
> ---------------------------------------------------------------------------------
>
> Key: SPARK-30677
> URL: https://issues.apache.org/jira/browse/SPARK-30677
> Project: Spark
> Issue Type: Bug
> Components: Block Manager, Structured Streaming
> Affects Versions: 2.4.3
> Reporter: Mullaivendhan Ariaputhri
> Priority: Major
> Attachments: Cluster-Config-P1.JPG, Cluster-Config-P2.JPG,
> Instance-Config-P1.JPG, Instance-Config-P2.JPG
>
>
> The Spark job stopped processing when the number of Kinesis shards was increased while the job was already running.
> We observed the exceptions below.
>
> 2020-01-27 06:42:29 WARN FileBasedWriteAheadLog_ReceivedBlockTracker:66 - Failed to write to write ahead log
> 2020-01-27 06:42:29 WARN FileBasedWriteAheadLog_ReceivedBlockTracker:66 - Failed to write to write ahead log
> 2020-01-27 06:42:29 ERROR FileBasedWriteAheadLog_ReceivedBlockTracker:70 - Failed to write to write ahead log after 3 failures
> 2020-01-27 06:42:29 WARN BatchedWriteAheadLog:87 - BatchedWriteAheadLog Writer failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=0 lim=1845 cap=1845],1580107349095,Future(<not completed>)))
> java.io.IOException: Not supported
> at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.append(S3NativeFileSystem.java:588)
> at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1181)
> at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.append(EmrFileSystem.java:295)
> at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream$lzycompute(FileBasedWriteAheadLogWriter.scala:32)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream(FileBasedWriteAheadLogWriter.scala:32)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:35)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:229)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:94)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:50)
> at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:175)
> at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:142)
> at java.lang.Thread.run(Thread.java:748)
> 2020-01-27 06:42:29 WARN ReceivedBlockTracker:87 - Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(0,Some(36),Some(SequenceNumberRanges(SequenceNumberRange(XXXXXXXXXXX,shardId-000000000006,49603657998853972269624727295162770770442241924489281634,49603657998853972269624727295206292099948368574778703970,36))),WriteAheadLogBasedStoreResult(input-0-1580106915391,Some(36),FileBasedWriteAheadLogSegment(s3://XXXXXXXXXXX/spark/checkpoint/XX/XXXXXXXXXXX/receivedData/0/log-1580107349000-1580107409000,0,31769)))) to the WriteAheadLog.
> org.apache.spark.SparkException: Exception thrown in awaitResult:
> at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
> at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:84)
> at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:242)
> at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.addBlock(ReceivedBlockTracker.scala:89)
> at org.apache.spark.streaming.scheduler.ReceiverTracker.org$apache$spark$streaming$scheduler$ReceiverTracker$$addBlock(ReceiverTracker.scala:347)
> at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1$$anonfun$run$1.apply$mcV$sp(ReceiverTracker.scala:522)
> at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
> at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1.run(ReceiverTracker.scala:520)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Not supported
> at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.append(S3NativeFileSystem.java:588)
> at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1181)
> at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.append(EmrFileSystem.java:295)
> at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream$lzycompute(FileBasedWriteAheadLogWriter.scala:32)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream(FileBasedWriteAheadLogWriter.scala:32)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:35)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:229)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:94)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:50)
> at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:175)
> at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:142)
> ... 1 more
> 2020-01-27 06:42:29 INFO BlockManagerInfo:54 - Added input-0-1580106915392 in memory on XXXXXXXXXXX:38393 (size: 31.0 KB, free: 3.4 GB)
> 2020-01-27 06:42:29 INFO MultipartUploadOutputStream:414 - close closed:false s3://XXXXXXXXXXX/spark/checkpoint/XXXXXXXXXXX/XXXXXXXXXXX/receivedBlockMetadata/log-1580107349323-1580107409323
> 2020-01-27 06:42:29 INFO BlockManagerInfo:54 - Added input-3-1580106915123 in memory on XXXXXXXXXXX:42027 (size: 25.9 KB, free: 3.4 GB)
> 2020-01-27 06:42:29 INFO MultipartUploadOutputStream:414 - close closed:false s3://XXXXXXXXXXX/spark/checkpoint/XXXXXXXXXXX/XXXXXXXXXXX/receivedBlockMetadata/log-1580107349908-1580107409908
> 2020-01-27 06:42:30 INFO BlockManagerInfo:54 - Added input-2-1580106915311 in memory on XXXXXXXXXXX:38393 (size: 29.3 KB, free: 3.4 GB)
> 2020-01-27 06:42:30 INFO BlockManagerInfo:54 - Added input-0-1580106915393 in memory on XXXXXXXXXXX:38393 (size: 31.0 KB, free: 3.4 GB)
> 2020-01-27 06:42:30 INFO MultipartUploadOutputStream:414 - close closed:false s3://XXXXXXXXXXX/spark/checkpoint/XXXXXXXXXXX/XXXXXXXXXXX/receivedBlockMetadata/log-1580107350000-1580107410000
> 2020-01-27 06:42:30 INFO JobScheduler:54 - Added jobs for time 1580107350000 ms
> 2020-01-27 06:42:30 INFO JobGenerator:54 - Checkpointing graph for time 1580107350000 ms
> 2020-01-27 06:42:30 INFO DStreamGraph:54 - Updating checkpoint data for time 1580107350000 ms
> 2020-01-27 06:42:30 INFO DStreamGraph:54 - Updated checkpoint data for time 1580107350000 ms
> 2020-01-27 06:42:30 INFO CheckpointWriter:54 - Submitted checkpoint of time 1580107350000 ms to writer queue
>
> Note:
> 1. Write-ahead logs and checkpoints are maintained in an AWS S3 bucket
> 2. Spark-submit configuration as below:
> spark-submit --deploy-mode cluster --class XXXXXXXXXXXX
> --executor-memory 4608M --driver-memory 4608M
> --driver-cores 3 --executor-cores 3
> --conf spark.yarn.driver.memoryOverhead=710M
> --conf spark.yarn.executor.memoryOverhead=710M
> --conf spark.dynamicAllocation.enabled=true
> --conf spark.dynamicAllocation.minExecutors=1
> --conf spark.dynamicAllocation.maxExecutors=2
> --conf spark.dynamicAllocation.initialExecutors=2
> --conf spark.locality.wait.node=0
> --conf maximizeResourceAllocation=false
> --conf spark.scheduler.mode=FAIR
> --conf spark.metrics.conf=XXXXXXXXXXXX.properties
> --files=s3://XXXXXXXXXXXX/XXXXXXXXXXXX.properties
> --conf spark.streaming.driver.writeAheadLog.closeFileAfterWrite=true
> --conf spark.streaming.driver.writeAheadLog.batchingTimeout=30000
> --conf spark.streaming.receiver.writeAheadLog.closeFileAfterWrite=true
> --conf spark.streaming.receiver.writeAheadLog.enable=true
> --conf spark.streaming.receiver.blockStoreTimeout=59
> --conf spark.streaming.receiver.maxRate=120
> s3://XXXXXXXXXXXX/XXXXXXXXXXXX.jar yarn XXXXXXXXXXXX applicationContext-XXXXXXXXXXXX-streaming.xml root kinesis 60 &
> 3. EMR Version - 5.26
> 4. Hadoop Distribution - Amazon 2.8.5
> 5. Hardware Config
> * Master (3 instances - multi-master cluster)
> c5.2xlarge: 8 vCore, 16 GiB memory, EBS-only storage
> EBS Storage: 64 GiB
> * Core (6 instances [Min - 2, Max - 6])
> c5.4xlarge: 16 vCore, 32 GiB memory, EBS-only storage
> EBS Storage: 1000 GiB
> 6. There are 3 Spark jobs running on the same cluster
> 7. Streaming - Kinesis
> 8. Cluster config and instance config are attached
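The stack traces above show the failure path: the receiver's write-ahead log re-opens an existing segment via FileSystem.append, the EMR S3 filesystem rejects append with java.io.IOException: Not supported, and the BatchedWriteAheadLog writer thread then fails every queued record, so the caller blocked in awaitResult sees the SparkException. The following is a simplified Python sketch of that batching pattern (a toy model, not Spark's actual code; all names are illustrative):

```python
import queue
import threading
from concurrent.futures import Future

class BatchedWAL:
    """Toy model of Spark's BatchedWriteAheadLog: callers enqueue records
    and block on a future; a single writer thread flushes the queue."""

    def __init__(self, write_fn):
        self.write_fn = write_fn  # underlying log write; may raise
        self.queue = queue.Queue()
        threading.Thread(target=self._flush_loop, daemon=True).start()

    def write(self, record, timeout=5.0):
        fut = Future()
        self.queue.put((record, fut))
        # Mirrors ThreadUtils.awaitResult: blocks and re-raises the
        # writer thread's exception in the caller.
        return fut.result(timeout=timeout)

    def _flush_loop(self):
        while True:
            record, fut = self.queue.get()
            try:
                fut.set_result(self.write_fn(record))
            except Exception as exc:  # e.g. "Not supported" from append()
                fut.set_exception(exc)

def s3_append(record):
    # Object stores such as S3 have no append operation; the EMR
    # filesystem raises java.io.IOException("Not supported"),
    # modelled here with IOError.
    raise IOError("Not supported")

wal = BatchedWAL(s3_append)
try:
    wal.write(b"BlockAdditionEvent(...)")
except IOError as exc:
    print("WAL write failed:", exc)
```

With an append-capable backend the same write succeeds, which is consistent with the failure disappearing when the WAL is kept on storage that supports append (e.g. HDFS) rather than S3.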
--
This message was sent by Atlassian Jira
(v8.3.4#803005)