[
https://issues.apache.org/jira/browse/SPARK-26359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16725949#comment-16725949
]
Steve Loughran commented on SPARK-26359:
----------------------------------------
bq. This seems to be related to the fact that we decided to try the
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2 instead of the
default 1. Gives a much better performance but looks like this issue here is
the tradeoff then
No, that's separate. And both v1 and v2 committers are both slow and unreliable
against s3.
Dont assume that either of the MR committers work with eventually consistent
object stores: They both rely on listing and copy to commit. With S3 the time
to rename is about 6-10MB/s, so the bigger the file, the longer the checkpoint,
*the higher the risk of it failing*
I think Gabor's workaround: rename it yourself,. will be safe. IF the .tmp file
is there at all it means the write completed, it failed during the rename. So
finish off the rename.
Really checkpointing can be done here by writing direct to the destination
directory, relying on the fact that writes of files of arbitrary length are
atomic: there's no need to rename at all
> Spark checkpoint restore fails after query restart
> --------------------------------------------------
>
> Key: SPARK-26359
> URL: https://issues.apache.org/jira/browse/SPARK-26359
> Project: Spark
> Issue Type: Bug
> Components: Spark Submit, Structured Streaming
> Affects Versions: 2.4.0
> Environment: Spark 2.4.0 deployed in standalone-client mode
> Checkpointing is done to S3
> The Spark application in question is responsible for running 4 different
> queries
> Queries are written using Structured Streaming
> We are using the following algorithm for hopes of better performance:
> spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2" # When the
> default is 1
> Reporter: Kaspar Tint
> Priority: Major
> Attachments: driver-redacted, metadata, redacted-offsets,
> state-redacted, worker-redacted
>
>
> We had an incident where one of our structured streaming queries could not be
> restarted after an usual S3 checkpointing failure. Now to clarify before
> everything else - we are aware of the issues with S3 and are working towards
> moving to HDFS but this will take time. S3 will cause queries to fail quite
> often during peak hours and we have separate logic to handle this that will
> attempt to restart the failed queries if possible.
> In this particular case we could not restart one of the failed queries. Seems
> like between detecting a failure in the query and starting it up again
> something went really wrong with Spark and state in checkpoint folder got
> corrupted for some reason.
> The issue starts with the usual *FileNotFoundException* that happens with S3
> {code:java}
> 2018-12-10 21:09:25.785 ERROR MicroBatchExecution: Query feedback [id =
> c074233a-2563-40fc-8036-b5e38e2e2c42, runId =
> e607eb6e-8431-4269-acab-cc2c1f9f09dd]
> terminated with error
> java.io.FileNotFoundException: No such file or directory:
> s3a://some.domain/spark/checkpoints/49/feedback/offsets/.37
> 348.8227943f-a848-4af5-b5bf-1fea81775b24.tmp
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
> at
> org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2715)
> at
> org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
> at
> org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
> at
> org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
> at org.apache.hadoop.fs.FileContext.rename(FileContext.java:965)
> at
> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:331)
> at
> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
> at
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatchToFile(HDFSMetadataL
> og.scala:126)
> at
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply$mcZ$sp(HDFSMetadataLog.scala:112)
> at
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
> at
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
> at scala.Option.getOrElse(Option.scala:121)
> at
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply$mcV$sp(MicroBatchExecution.scala:382)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:381)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
> at
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> 2018-12-10 21:09:25.791 WARN InsightsSparkAggregates$: Query feedback
> terminated with exception, attempting restart
> {code}
> At the last line we claim that a restart will be attempted for the query
> named *feedback*. We start the query up and encounter this almost immediately
> {code:java}
> 2018-12-10 21:09:26.870 WARN InsightsSparkAggregates$: Query feedback
> currently not running, starting query in own scheduling pool
> 2018-12-10 21:09:51.776 WARN TaskSetManager: Lost task 11.0 in stage 66240.0
> (TID 2782264, ec2-52-87-158-48.compute-1.amazonaws.com, executor 29):
> java.lang.IllegalStateException: Error reading delta file
> s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta of
> HDFSStateStoreProvider[id = (op=2,part=11),dir =
> s3a://some.domain/spark/checkpoints/49/feedback/state/2/11]:
> s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta does
> not exist
> at
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:427)
> at
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply$mcVJ$sp(HDFSBackedStateStoreProvider.scala:384)
> at
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply(HDFSBackedStateStoreProvider.scala:383)
> at
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply(HDFSBackedStateStoreProvider.scala:383)
> at
> scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
> at
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:383)
> at
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:356)
> at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:535)
> at
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.loadMap(HDFSBackedStateStoreProvider.scala:356)
> at
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:204)
> at
> org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:371)
> at
> org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:88)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
> at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: No such file or directory:
> s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
> at
> org.apache.hadoop.fs.DelegateToFileSystem.open(DelegateToFileSystem.java:190)
> at
> org.apache.hadoop.fs.AbstractFileSystem.open(AbstractFileSystem.java:649)
> at org.apache.hadoop.fs.FileContext$6.next(FileContext.java:802)
> at org.apache.hadoop.fs.FileContext$6.next(FileContext.java:798)
> at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
> at org.apache.hadoop.fs.FileContext.open(FileContext.java:804)
> at
> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.open(CheckpointFileManager.scala:322)
> at
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:424)
> ... 28 more
> {code}
> And this will go on for ever until we bump the checkpoint folder name.
> {code:java}
> 2018-12-10 21:09:57.261 WARN TaskSetManager: Lost task 7.0 in stage 66265.0
> (TID 2783200, ec2-34-236-156-197.compute-1.amazonaws.com, executor 40):
> java.lang.IllegalStateException: Error committing version 49464 into
> HDFSStateStore[id=(op=1,part=7),dir=s3a://some.domain/spark/checkpoints/49/dlr/state/1/7]
> at
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:138)
> at
> org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1$$anonfun$apply$1.apply$mcV$sp(FlatMapGroupsWithStateExec.scala:135)
> .....
> {code}
> Now when looking into S3 it indeed looks like this delta file never was
> created. Instead we have a
> {code:java}
> s3://some.domain/spark/checkpoints/49/feedback/state/2/11/.36870.delta.02e75d1f-aa48-4c61-9257-a5928b32e22e.TID2469780.tmp
> {code}
> file that I assume is named like that as long as the whole operation is not
> finished yet. So this file never got renamed to 36870.delta and the
> application will keep trying to reference it.
> I will have all the relevant redacted logs attached to this report together
> with ls output of S3 folders and also the metadata file. If any more
> information is needed then I would be happy to provide it. Would also
> appreciate on some input on how to best resolve this issue? For now it has
> happened on 2 separate days and the solution has been to bump the checkpoint.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]