Repository: spark
Updated Branches:
  refs/heads/master 572284c5b -> fe258a796


[SPARK-22429][STREAMING] Streaming checkpointing code does not retry after failure

## What changes were proposed in this pull request?

SPARK-14930/SPARK-13693 introduced a change that sets the fs object to null after a
failure, but the retry loop did not include its initialization, so a retry would
use the null fs. This change moves fs initialization inside the retry while loop
to aid recoverability.
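
The pattern described above can be sketched in isolation, without Hadoop or Spark on the classpath. This is only an illustrative sketch, not the code in `Checkpoint.scala`: `FakeFs`, `RetrySketch`, and `writeWithRetry` are hypothetical names invented for this example, and `FakeFs` merely stands in for a file system handle that may fail on write.

```scala
import java.io.IOException

// Hypothetical stand-in for a file system handle; not part of Spark or Hadoop.
final class FakeFs(failing: Boolean) {
  def write(data: String): Unit =
    if (failing) throw new IOException("simulated write failure")
}

object RetrySketch {
  // Hypothetical helper: returns the number of attempts used,
  // or -1 if every attempt failed.
  def writeWithRetry(maxAttempts: Int, open: () => FakeFs): Int = {
    var fs: FakeFs = null
    var attempts = 0
    while (attempts < maxAttempts) {
      attempts += 1
      try {
        // Initialization lives inside the loop, so a handle that was reset
        // to null by a previous failure is re-created before it is used.
        if (fs == null) {
          fs = open()
        }
        fs.write("checkpoint")
        return attempts
      } catch {
        case _: IOException =>
          // Drop the possibly broken handle; the next attempt re-opens it.
          fs = null
      }
    }
    -1
  }
}
```

If the `if (fs == null)` check sat before the `while` loop instead, the second attempt in this sketch would call `write` on a null reference rather than retrying with a fresh handle.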

## How was this patch tested?

Passes all existing unit tests.

Author: Tristan Stevens <[email protected]>

Closes #19645 from tmgstevens/SPARK-22429.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe258a79
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe258a79
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe258a79

Branch: refs/heads/master
Commit: fe258a7963361c1f31bc3dc3a2a2ee4a5834bb58
Parents: 572284c
Author: Tristan Stevens <[email protected]>
Authored: Sun Nov 5 09:10:40 2017 +0000
Committer: Sean Owen <[email protected]>
Committed: Sun Nov 5 09:10:40 2017 +0000

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/streaming/Checkpoint.scala    | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fe258a79/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 9ebb91b..3cfbced 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -211,9 +211,6 @@ class CheckpointWriter(
       if (latestCheckpointTime == null || latestCheckpointTime < checkpointTime) {
         latestCheckpointTime = checkpointTime
       }
-      if (fs == null) {
-        fs = new Path(checkpointDir).getFileSystem(hadoopConf)
-      }
       var attempts = 0
       val startTime = System.currentTimeMillis()
       val tempFile = new Path(checkpointDir, "temp")
@@ -233,7 +230,9 @@ class CheckpointWriter(
         attempts += 1
         try {
           logInfo(s"Saving checkpoint for time $checkpointTime to file '$checkpointFile'")
-
+          if (fs == null) {
+            fs = new Path(checkpointDir).getFileSystem(hadoopConf)
+          }
           // Write checkpoint to temp file
           fs.delete(tempFile, true) // just in case it exists
           val fos = fs.create(tempFile)

