Repository: spark
Updated Branches:
  refs/heads/master 572284c5b -> fe258a796
[SPARK-22429][STREAMING] Streaming checkpointing code does not retry after failure

## What changes were proposed in this pull request?

SPARK-14930/SPARK-13693 put in a change to set the fs object to null after a failure, however the retry loop does not include initialization. Moved fs initialization inside the retry while loop to aid recoverability.

## How was this patch tested?

Passes all existing unit tests.

Author: Tristan Stevens <[email protected]>

Closes #19645 from tmgstevens/SPARK-22429.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe258a79
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe258a79
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe258a79

Branch: refs/heads/master
Commit: fe258a7963361c1f31bc3dc3a2a2ee4a5834bb58
Parents: 572284c
Author: Tristan Stevens <[email protected]>
Authored: Sun Nov 5 09:10:40 2017 +0000
Committer: Sean Owen <[email protected]>
Committed: Sun Nov 5 09:10:40 2017 +0000

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/streaming/Checkpoint.scala | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/fe258a79/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 9ebb91b..3cfbced 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -211,9 +211,6 @@ class CheckpointWriter(
       if (latestCheckpointTime == null || latestCheckpointTime < checkpointTime) {
         latestCheckpointTime = checkpointTime
       }
-      if (fs == null) {
-        fs = new Path(checkpointDir).getFileSystem(hadoopConf)
-      }
       var attempts = 0
       val startTime = System.currentTimeMillis()
       val tempFile = new Path(checkpointDir, "temp")
@@ -233,7 +230,9 @@ class CheckpointWriter(
         attempts += 1
         try {
           logInfo(s"Saving checkpoint for time $checkpointTime to file '$checkpointFile'")
-
+          if (fs == null) {
+            fs = new Path(checkpointDir).getFileSystem(hadoopConf)
+          }
           // Write checkpoint to temp file
           fs.delete(tempFile, true) // just in case it exists
           val fos = fs.create(tempFile)
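
The effect of moving the initialization can be shown in isolation. The following is a minimal sketch, not the Spark code itself: `FakeFs` and `RetryingWriter` are hypothetical names standing in for the Hadoop `FileSystem` handle and for `CheckpointWriter`, and the failure is simulated. It assumes, as the description above states, that the handle is set to null after a failed attempt, so the null check has to sit inside the loop for a retry to obtain a fresh handle.

```scala
import java.io.IOException

// Hypothetical stand-in for a Hadoop FileSystem handle; not a Spark or Hadoop class.
final class FakeFs(failing: Boolean) {
  def write(data: String): Unit =
    if (failing) throw new IOException("simulated broken handle")
}

// Hypothetical writer that mirrors the shape of the patched retry loop.
final class RetryingWriter(openFs: () => FakeFs, maxAttempts: Int) {
  // Cached handle; reset to null after a failed attempt.
  private var fs: FakeFs = null

  def write(data: String): Boolean = {
    var attempts = 0
    var written = false
    while (!written && attempts < maxAttempts) {
      attempts += 1
      try {
        // Initialization sits inside the loop, so each retry after a
        // failure re-opens the handle instead of dereferencing null.
        if (fs == null) {
          fs = openFs()
        }
        fs.write(data)
        written = true
      } catch {
        case _: IOException =>
          // Drop the handle so the next attempt re-initializes it.
          fs = null
      }
    }
    written
  }
}
```

If the null check were placed before the loop instead, the second attempt in this sketch would call `write` on a null handle and fail with a `NullPointerException`, which the `IOException` handler does not catch.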
