[ 
https://issues.apache.org/jira/browse/SPARK-26359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16721195#comment-16721195
 ] 

Gabor Somogyi commented on SPARK-26359:
---------------------------------------

Yeah, after the recovery it's advised to switch that back.
The related recovery metadata can be deleted. BTW, when you said
{quote}an engineer stepped in and bumped the checkpoint version manually{quote}
which file was touched exactly, and how?
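If the setting being referred to is the non-default file output committer algorithm listed in the environment section below (an assumption on my part), switching it back on the next start could look roughly like this:
{code:scala}
// Sketch only: restores the default committer algorithm (version 1) when the
// session is built. The config key is the one quoted in this ticket's
// environment; whether this is the setting meant above is an assumption.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "1")
  .getOrCreate()
{code}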
 

 

> Spark checkpoint restore fails after query restart
> --------------------------------------------------
>
>                 Key: SPARK-26359
>                 URL: https://issues.apache.org/jira/browse/SPARK-26359
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Submit, Structured Streaming
>    Affects Versions: 2.4.0
>         Environment: Spark 2.4.0 deployed in standalone-client mode
> Checkpointing is done to S3
> The Spark application in question is responsible for running 4 different 
> queries
> Queries are written using Structured Streaming
> We are using the following committer algorithm setting in hopes of better 
> performance:
> spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2" # the 
> default is 1
>            Reporter: Kaspar Tint
>            Priority: Major
>         Attachments: driver-redacted, metadata, redacted-offsets, 
> state-redacted, worker-redacted
>
>
> We had an incident where one of our structured streaming queries could not be 
> restarted after a usual S3 checkpointing failure. To clarify before everything 
> else: we are aware of the issues with S3 and are working towards moving to 
> HDFS, but this will take time. S3 causes queries to fail quite often during 
> peak hours, and we have separate logic to handle this that attempts to 
> restart the failed queries if possible.
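> For illustration, a minimal sketch of the kind of restart logic we use 
> follows (the names and structure are illustrative, not our actual code; only 
> the StreamingQuery calls are standard Spark APIs).
> {code:scala}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.streaming.StreamingQuery
>
> // Keeps one named query running: waits for it to terminate and, if it died
> // with an exception, starts it again in its own scheduling pool.
> def runWithRestart(spark: SparkSession, name: String, start: () => StreamingQuery): Unit = {
>   // local properties are inherited by the query's execution thread
>   spark.sparkContext.setLocalProperty("spark.scheduler.pool", name)
>   var query = start()
>   while (true) {
>     try {
>       query.awaitTermination()   // throws if the query terminated with an error
>     } catch {
>       case e: Exception =>
>         println(s"Query $name terminated with exception, attempting restart: ${e.getMessage}")
>     }
>     if (!query.isActive) {
>       query = start()            // attempt the restart
>     }
>   }
> }
> {code}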
> In this particular case we could not restart one of the failed queries. It 
> seems that between detecting the failure in the query and starting it up 
> again, something went wrong with Spark and the state in the checkpoint folder 
> got corrupted.
> The issue starts with the usual *FileNotFoundException* that happens with S3
> {code:java}
> 2018-12-10 21:09:25.785 ERROR MicroBatchExecution: Query feedback [id = 
> c074233a-2563-40fc-8036-b5e38e2e2c42, runId = 
> e607eb6e-8431-4269-acab-cc2c1f9f09dd]
> terminated with error
> java.io.FileNotFoundException: No such file or directory: 
> s3a://some.domain/spark/checkpoints/49/feedback/offsets/.37348.8227943f-a848-4af5-b5bf-1fea81775b24.tmp
> at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
>         at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
>         at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
>         at 
> org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2715)
>         at 
> org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
>         at 
> org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
>         at 
> org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
>         at org.apache.hadoop.fs.FileContext.rename(FileContext.java:965)
>         at 
> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:331)
>         at 
> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
>         at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatchToFile(HDFSMetadataLog.scala:126)
>         at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply$mcZ$sp(HDFSMetadataLog.scala:112)
>         at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
>         at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
>         at scala.Option.getOrElse(Option.scala:121)
>         at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply$mcV$sp(MicroBatchExecution.scala:382)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
>         at 
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:381)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> 2018-12-10 21:09:25.791 WARN InsightsSparkAggregates$: Query feedback 
> terminated with exception, attempting restart
> {code}
> The last line shows that a restart will be attempted for the query named 
> *feedback*. We start the query up and almost immediately encounter this
> {code:java}
> 2018-12-10 21:09:26.870 WARN InsightsSparkAggregates$: Query feedback 
> currently not running, starting query in own scheduling pool
> 2018-12-10 21:09:51.776 WARN TaskSetManager: Lost task 11.0 in stage 66240.0 
> (TID 2782264, ec2-52-87-158-48.compute-1.amazonaws.com, executor 29): 
> java.lang.IllegalStateException: Error reading delta file 
> s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta of 
> HDFSStateStoreProvider[id = (op=2,part=11),dir = 
> s3a://some.domain/spark/checkpoints/49/feedback/state/2/11]: 
> s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta does 
> not exist
>         at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:427)
>         at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply$mcVJ$sp(HDFSBackedStateStoreProvider.scala:384)
>         at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply(HDFSBackedStateStoreProvider.scala:383)
>         at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply(HDFSBackedStateStoreProvider.scala:383)
>         at 
> scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
>         at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:383)
>         at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:356)
>         at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:535)
>         at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.loadMap(HDFSBackedStateStoreProvider.scala:356)
>         at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:204)
>         at 
> org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:371)
>         at 
> org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:88)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>         at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>         at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>         at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>         at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>         at org.apache.spark.scheduler.Task.run(Task.scala:121)
>         at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: No such file or directory: 
> s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta
>         at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
>         at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
>         at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
>         at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
>         at 
> org.apache.hadoop.fs.DelegateToFileSystem.open(DelegateToFileSystem.java:190)
>         at 
> org.apache.hadoop.fs.AbstractFileSystem.open(AbstractFileSystem.java:649)
>         at org.apache.hadoop.fs.FileContext$6.next(FileContext.java:802)
>         at org.apache.hadoop.fs.FileContext$6.next(FileContext.java:798)
>         at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
>         at org.apache.hadoop.fs.FileContext.open(FileContext.java:804)
>         at 
> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.open(CheckpointFileManager.scala:322)
>         at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:424)
>         ... 28 more
> {code}
> And this will go on forever until we bump the checkpoint folder name.
> {code:java}
> 2018-12-10 21:09:57.261 WARN TaskSetManager: Lost task 7.0 in stage 66265.0 
> (TID 2783200, ec2-34-236-156-197.compute-1.amazonaws.com, executor 40): 
> java.lang.IllegalStateException: Error committing version 49464 into 
> HDFSStateStore[id=(op=1,part=7),dir=s3a://some.domain/spark/checkpoints/49/dlr/state/1/7]
>         at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:138)
>         at 
> org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1$$anonfun$apply$1.apply$mcV$sp(FlatMapGroupsWithStateExec.scala:135)
>     .....
> {code}
> Looking into S3, it does indeed appear that this delta file was never 
> created. Instead we have a
> {code:java} 
> s3://some.domain/spark/checkpoints/49/feedback/state/2/11/.36870.delta.02e75d1f-aa48-4c61-9257-a5928b32e22e.TID2469780.tmp
> {code}
> file, which I assume keeps that temporary name for as long as the whole write 
> operation has not finished. So this file never got renamed to 36870.delta, 
> and the application will keep trying to reference the final name.
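> For context, a rough sketch of the write-temp-then-rename pattern that the 
> stack traces above point at (my own illustration of the mechanism, not the 
> actual Spark CheckpointFileManager code).
> {code:scala}
> import java.util.{EnumSet, UUID}
> import org.apache.hadoop.fs.{CreateFlag, FileContext, Options, Path}
>
> // Write to a hidden ".<name>.<uuid>.tmp" file first, then rename it to the
> // final name. On HDFS the rename is atomic; on S3A it is a copy + delete,
> // so a failure at this point can leave only the .tmp file behind.
> def writeAtomically(fc: FileContext, finalPath: Path, bytes: Array[Byte]): Unit = {
>   val tmp = new Path(finalPath.getParent, s".${finalPath.getName}.${UUID.randomUUID()}.tmp")
>   val out = fc.create(tmp, EnumSet.of(CreateFlag.CREATE))
>   try out.write(bytes) finally out.close()
>   fc.rename(tmp, finalPath, Options.Rename.OVERWRITE)
> }
> {code}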
> All the relevant redacted logs are attached to this report, together with 
> the ls output of the S3 folders and the metadata file. If any more 
> information is needed, I would be happy to provide it. I would also 
> appreciate some input on how best to resolve this issue. For now it has 
> happened on 2 separate days, and the workaround has been to bump the 
> checkpoint folder, as sketched below.
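> A rough illustration of that workaround follows: the restarted query is 
> simply pointed at a fresh checkpoint folder. The stream and sink below are 
> placeholders; only the checkpointLocation pattern matches our setup.
> {code:scala}
> // Sketch of the "bump the checkpoint" workaround: restart the query against
> // a fresh checkpoint folder. `feedbackStream` and the console sink are
> // placeholders, not our actual job definition.
> val query = feedbackStream.writeStream
>   .queryName("feedback")
>   .option("checkpointLocation", "s3a://some.domain/spark/checkpoints/50/feedback") // was .../49/...
>   .format("console")
>   .start()
> {code}
> This obviously discards the query state accumulated under the old checkpoint 
> folder.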


