Repository: spark
Updated Branches:
  refs/heads/branch-2.1 8b572116f -> acef22429
[SPARK-22429][STREAMING] Streaming checkpointing code does not retry after failure

## What changes were proposed in this pull request?

SPARK-14930/SPARK-13693 put in a change to set the fs object to null after a failure, however the retry loop does not include initialization. Moved fs initialization inside the retry while loop to aid recoverability.

## How was this patch tested?

Passes all existing unit tests.

Author: Tristan Stevens <[email protected]>

Closes #19645 from tmgstevens/SPARK-22429.

(cherry picked from commit fe258a7963361c1f31bc3dc3a2a2ee4a5834bb58)
Signed-off-by: Sean Owen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/acef2242
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/acef2242
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/acef2242

Branch: refs/heads/branch-2.1
Commit: acef2242947f1120bf832e467a845241bc114e71
Parents: 8b57211
Author: Tristan Stevens <[email protected]>
Authored: Sun Nov 5 09:10:40 2017 +0000
Committer: Sean Owen <[email protected]>
Committed: Sun Nov 5 09:11:05 2017 +0000

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/streaming/Checkpoint.scala | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/acef2242/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index b8c780d..7006f9b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -209,9 +209,6 @@ class CheckpointWriter(
       if (latestCheckpointTime == null || latestCheckpointTime < checkpointTime) {
         latestCheckpointTime = checkpointTime
       }
-      if (fs == null) {
-        fs = new Path(checkpointDir).getFileSystem(hadoopConf)
-      }
       var attempts = 0
       val startTime = System.currentTimeMillis()
       val tempFile = new Path(checkpointDir, "temp")
@@ -231,7 +228,9 @@ class CheckpointWriter(
         attempts += 1
         try {
           logInfo(s"Saving checkpoint for time $checkpointTime to file '$checkpointFile'")
-
+          if (fs == null) {
+            fs = new Path(checkpointDir).getFileSystem(hadoopConf)
+          }
           // Write checkpoint to temp file
           fs.delete(tempFile, true) // just in case it exists
           val fos = fs.create(tempFile)
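To illustrate why the placement of the `fs == null` check matters, here is a minimal, hypothetical Scala model of the retry loop before and after this change. `Env`, `Handle`, `initBeforeLoop` and `initInsideLoop` are invented stand-ins, not Spark or Hadoop APIs. The assumption that the catch clause handles only `IOException` (and resets the handle to null) is a modelling choice for this sketch, not a quote of `CheckpointWriter`.

```scala
import java.io.IOException

// Hypothetical model of the retry bug; none of these names are Spark or Hadoop APIs.
object CheckpointRetrySketch {
  // Stand-in environment: the first `failures` writes throw IOException.
  final class Env(failures: Int) {
    private var remaining = failures
    var handlesOpened = 0
    def open(): Handle = { handlesOpened += 1; new Handle(this) }
    def attemptWrite(): Unit =
      if (remaining > 0) { remaining -= 1; throw new IOException("simulated write failure") }
  }

  // Stand-in for a file system handle such as the `fs` field.
  final class Handle(env: Env) {
    def write(): Unit = env.attemptWrite()
  }

  // Pre-fix shape: initialize once, before the retry loop.
  def initBeforeLoop(env: Env, maxAttempts: Int): Boolean = {
    var fs: Handle = null
    if (fs == null) fs = env.open()
    var attempts = 0
    var done = false
    while (!done && attempts < maxAttempts) {
      attempts += 1
      try {
        fs.write() // throws NullPointerException once fs has been reset to null
        done = true
      } catch {
        case _: IOException => fs = null // drop the handle after a failure
      }
    }
    done
  }

  // Post-fix shape: (re)initialize at the top of every attempt.
  def initInsideLoop(env: Env, maxAttempts: Int): Boolean = {
    var fs: Handle = null
    var attempts = 0
    var done = false
    while (!done && attempts < maxAttempts) {
      attempts += 1
      try {
        if (fs == null) fs = env.open() // a reset handle is re-created on retry
        fs.write()
        done = true
      } catch {
        case _: IOException => fs = null
      }
    }
    done
  }

  def main(args: Array[String]): Unit = {
    val before = new Env(failures = 1)
    val beforeResult =
      try initBeforeLoop(before, maxAttempts = 3).toString
      catch { case _: NullPointerException => "NullPointerException" }
    println(s"init before loop: $beforeResult")

    val after = new Env(failures = 1)
    val afterResult = initInsideLoop(after, maxAttempts = 3)
    println(s"init inside loop: $afterResult, handles opened: ${after.handlesOpened}")
  }
}
```

Running `main` prints `init before loop: NullPointerException` and then `init inside loop: true, handles opened: 2`. With initialization hoisted out of the loop, the second attempt dereferences the null handle. Re-checking at the top of each attempt lets the retry open a fresh handle instead.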
