[
https://issues.apache.org/jira/browse/SPARK-26359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16733209#comment-16733209
]
Gabor Somogyi commented on SPARK-26359:
---------------------------------------
[~Tint] did the suggested workaround work? I would close this, because it happened due to S3's lack of read-after-write consistency.
[[email protected]]
{quote}With S3 the time to rename is about 6-10MB/s{quote}
all I can say is: wow :)
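The consistency failure mode described above can be illustrated with a toy model (hypothetical code, not the S3 API): writes become visible to readers only after a delay, and a "rename" implemented as copy-then-delete hits FileNotFoundException on a freshly written object.

```python
# Toy model of an eventually consistent object store: a write becomes
# visible to reads only after `lag` subsequent read operations.  This is
# a hypothetical sketch, not the real S3 semantics.

class EventuallyConsistentStore:
    def __init__(self, lag=1):
        self.lag = lag
        self.objects = {}   # writes already visible to readers
        self.pending = []   # [remaining_lag, key, data] not yet visible

    def put(self, key, data):
        self.pending.append([self.lag, key, data])

    def get(self, key):
        # Age pending writes; promote the ones whose lag has expired.
        still_pending = []
        for entry in self.pending:
            entry[0] -= 1
            if entry[0] < 0:
                self.objects[entry[1]] = entry[2]
            else:
                still_pending.append(entry)
        self.pending = still_pending
        if key not in self.objects:
            raise FileNotFoundError(key)
        return self.objects[key]

    def rename(self, src, dst):
        # On an object store, "rename" is copy-then-delete; the copy's
        # read may not yet see the freshly written source object.
        data = self.get(src)
        self.put(dst, data)
        self.objects.pop(src, None)


store = EventuallyConsistentStore(lag=1)
store.put(".37348.tmp", b"offsets for batch 37348")
try:
    # The checkpoint commit renames the .tmp file right after writing it.
    store.rename(".37348.tmp", "37348")
except FileNotFoundError:
    print("rename failed: freshly written .tmp not yet visible")
```

This is the same shape as the failure in the report: the offsets file is written under a temporary name and the immediate rename cannot find it.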
> Spark checkpoint restore fails after query restart
> --------------------------------------------------
>
> Key: SPARK-26359
> URL: https://issues.apache.org/jira/browse/SPARK-26359
> Project: Spark
> Issue Type: Bug
> Components: Spark Submit, Structured Streaming
> Affects Versions: 2.4.0
> Environment: Spark 2.4.0 deployed in standalone-client mode
> Checkpointing is done to S3
> The Spark application in question is responsible for running 4 different
> queries
> Queries are written using Structured Streaming
> We are using the following committer algorithm in hopes of better performance:
> spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2" (the
> default is 1)
> Reporter: Kaspar Tint
> Priority: Major
> Attachments: driver-redacted, metadata, redacted-offsets,
> state-redacted, worker-redacted
>
>
> We had an incident where one of our Structured Streaming queries could not be
> restarted after a usual S3 checkpointing failure. To clarify up front: we are
> aware of the issues with S3 and are working towards moving to HDFS, but this
> will take time. S3 causes queries to fail quite often during peak hours, and
> we have separate logic that attempts to restart the failed queries when
> possible.
> In this particular case we could not restart one of the failed queries. It
> seems that between detecting the failure and starting the query up again,
> something went wrong in Spark and the state in the checkpoint folder got
> corrupted.
> The issue starts with the usual *FileNotFoundException* that happens with S3:
> {code:java}
> 2018-12-10 21:09:25.785 ERROR MicroBatchExecution: Query feedback [id = c074233a-2563-40fc-8036-b5e38e2e2c42, runId = e607eb6e-8431-4269-acab-cc2c1f9f09dd] terminated with error
> java.io.FileNotFoundException: No such file or directory: s3a://some.domain/spark/checkpoints/49/feedback/offsets/.37348.8227943f-a848-4af5-b5bf-1fea81775b24.tmp
> at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
> at org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2715)
> at org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
> at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
> at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
> at org.apache.hadoop.fs.FileContext.rename(FileContext.java:965)
> at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:331)
> at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
> at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatchToFile(HDFSMetadataLog.scala:126)
> at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply$mcZ$sp(HDFSMetadataLog.scala:112)
> at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
> at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply$mcV$sp(MicroBatchExecution.scala:382)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
> at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:381)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
> at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
> at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
> at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
> at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> 2018-12-10 21:09:25.791 WARN InsightsSparkAggregates$: Query feedback terminated with exception, attempting restart
> {code}
> The last line states that a restart will be attempted for the query named
> *feedback*. We start the query up and encounter this almost immediately:
> {code:java}
> 2018-12-10 21:09:26.870 WARN InsightsSparkAggregates$: Query feedback currently not running, starting query in own scheduling pool
> 2018-12-10 21:09:51.776 WARN TaskSetManager: Lost task 11.0 in stage 66240.0 (TID 2782264, ec2-52-87-158-48.compute-1.amazonaws.com, executor 29): java.lang.IllegalStateException: Error reading delta file s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta of HDFSStateStoreProvider[id = (op=2,part=11),dir = s3a://some.domain/spark/checkpoints/49/feedback/state/2/11]: s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta does not exist
> at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:427)
> at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply$mcVJ$sp(HDFSBackedStateStoreProvider.scala:384)
> at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply(HDFSBackedStateStoreProvider.scala:383)
> at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply(HDFSBackedStateStoreProvider.scala:383)
> at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
> at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:383)
> at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:356)
> at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:535)
> at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.loadMap(HDFSBackedStateStoreProvider.scala:356)
> at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:204)
> at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:371)
> at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:88)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: No such file or directory: s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta
> at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
> at org.apache.hadoop.fs.DelegateToFileSystem.open(DelegateToFileSystem.java:190)
> at org.apache.hadoop.fs.AbstractFileSystem.open(AbstractFileSystem.java:649)
> at org.apache.hadoop.fs.FileContext$6.next(FileContext.java:802)
> at org.apache.hadoop.fs.FileContext$6.next(FileContext.java:798)
> at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
> at org.apache.hadoop.fs.FileContext.open(FileContext.java:804)
> at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.open(CheckpointFileManager.scala:322)
> at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:424)
> ... 28 more
> {code}
> And this will go on forever until we bump the checkpoint folder name.
> {code:java}
> 2018-12-10 21:09:57.261 WARN TaskSetManager: Lost task 7.0 in stage 66265.0 (TID 2783200, ec2-34-236-156-197.compute-1.amazonaws.com, executor 40): java.lang.IllegalStateException: Error committing version 49464 into HDFSStateStore[id=(op=1,part=7),dir=s3a://some.domain/spark/checkpoints/49/dlr/state/1/7]
> at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:138)
> at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1$$anonfun$apply$1.apply$mcV$sp(FlatMapGroupsWithStateExec.scala:135)
> .....
> {code}
> Now when looking into S3, it indeed looks like this delta file was never
> created. Instead we have a
> {code:java}
> s3://some.domain/spark/checkpoints/49/feedback/state/2/11/.36870.delta.02e75d1f-aa48-4c61-9257-a5928b32e22e.TID2469780.tmp
> {code}
> file that I assume keeps its temporary name as long as the whole operation
> has not finished. So this file never got renamed to 36870.delta, and the
> application keeps trying to reference the final name.
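For context, this temporary-name-then-rename pattern is how Spark's checkpoint file manager commits files (the `CheckpointFileManager$RenameBasedFSDataOutputStream.close` frame in the first trace). A minimal sketch of the pattern with hypothetical names, on a local filesystem where the rename actually is atomic:

```python
import os
import uuid

def write_then_rename(directory, final_name, data):
    """Write data under a hidden '.<final>.<uuid>.tmp' name, then rename
    it to the final name (a hypothetical sketch of the pattern).

    On HDFS or a local disk the rename is atomic, so readers see either
    the committed file or nothing.  On S3 a "rename" is copy plus delete,
    so a crash or a consistency miss strands the .tmp file -- which is
    what this report observed.
    """
    tmp_path = os.path.join(directory, ".%s.%s.tmp" % (final_name, uuid.uuid4()))
    final_path = os.path.join(directory, final_name)
    with open(tmp_path, "wb") as f:
        f.write(data)
    os.rename(tmp_path, final_path)   # the commit point
    return final_path
```

Readers such as the state store then look only for the committed name (e.g. `36870.delta`); if the rename never happened, they fail exactly as in the traces above.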
> I have attached all the relevant redacted logs to this report, together with
> ls output of the S3 folders and the metadata file. If any more information is
> needed, I would be happy to provide it. I would also appreciate some input on
> how best to resolve this issue. For now it has happened on 2 separate days,
> and the solution has been to bump the checkpoint folder.
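As a cheaper diagnostic than bumping the checkpoint folder, orphaned temporaries like the one above can be found by scanning for dotted .tmp names that never got a committed twin. A hypothetical helper, written against a local copy of the checkpoint tree (against S3 you would list the bucket instead of walking a path):

```python
import os

def find_orphaned_tmp_files(checkpoint_dir):
    """Return .tmp checkpoint files whose committed (renamed) twin is
    missing.  Spark writes '.<final>.<uuid>.tmp' and renames it to
    '<final>', so a .tmp with no matching final file in the same
    directory means the rename never completed.  Hypothetical sketch."""
    orphans = []
    for root, _dirs, files in os.walk(checkpoint_dir):
        finals = [f for f in files
                  if not (f.startswith(".") and f.endswith(".tmp"))]
        for name in files:
            if name.startswith(".") and name.endswith(".tmp"):
                # A committed twin exists if the .tmp name embeds it:
                # '.<final>.<uuid>.tmp' starts with '.<final>.'.
                committed = any(name.startswith("." + f + ".") for f in finals)
                if not committed:
                    orphans.append(os.path.join(root, name))
    return orphans
```

Running this over the attached `state/` listing would flag `.36870.delta.02e75d1f-aa48-4c61-9257-a5928b32e22e.TID2469780.tmp` as an orphan, since no `36870.delta` exists beside it.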
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]