Repository: spark
Updated Branches:
  refs/heads/branch-1.3 b104501d3 -> a98603f8c


[SPARK-9801] [STREAMING] Check if file exists before deleting temporary files.

Spark Streaming deletes the temp file and the backup file without checking 
whether they exist.

Author: Hao Zhu <[email protected]>

Closes #8082 from viadea/master and squashes the following commits:

242d05f [Hao Zhu] [SPARK-9801][Streaming]No need to check the existence of those files
fd143f2 [Hao Zhu] [SPARK-9801][Streaming]Check if backupFile exists before deleting backupFile files.
087daf0 [Hao Zhu] SPARK-9801

(cherry picked from commit 3c9802d9400bea802984456683b2736a450ee17e)
Signed-off-by: Tathagata Das <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a98603f8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a98603f8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a98603f8

Branch: refs/heads/branch-1.3
Commit: a98603f8c118fcd23efe80ebaa120e47e9785d46
Parents: b104501
Author: Hao Zhu <[email protected]>
Authored: Mon Aug 10 17:17:22 2015 -0700
Committer: Tathagata Das <[email protected]>
Committed: Mon Aug 10 17:18:03 2015 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/streaming/Checkpoint.scala   | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a98603f8/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 832ce78..c1d0fe4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -137,7 +137,9 @@ class CheckpointWriter(
             + "'")
 
           // Write checkpoint to temp file
-          fs.delete(tempFile, true)   // just in case it exists
+          if (fs.exists(tempFile)) {
+            fs.delete(tempFile, true)   // just in case it exists
+          }
           val fos = fs.create(tempFile)
           fos.write(bytes)
           fos.close()
@@ -145,7 +147,9 @@ class CheckpointWriter(
           // If the checkpoint file exists, back it up
           // If the backup exists as well, just delete it, otherwise rename will fail
           if (fs.exists(checkpointFile)) {
-            fs.delete(backupFile, true) // just in case it exists
+            if (fs.exists(backupFile)){
+              fs.delete(backupFile, true) // just in case it exists
+            }
             if (!fs.rename(checkpointFile, backupFile)) {
               logWarning("Could not rename " + checkpointFile + " to " + backupFile)
             }
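
For readers who want to try the guarded-delete pattern from the two hunks above
outside of Spark, here is a minimal sketch. It is not the code in
Checkpoint.scala. It assumes hadoop-common is on the classpath and uses Hadoop's
local FileSystem. The object name, the deleteIfExists helper, and the file names
"temp", "checkpoint" and "checkpoint.bk" are hypothetical and chosen only for
illustration. The final rename of the temp file to the checkpoint file does not
appear in the hunks above and is an assumption about how the write completes.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

object GuardedCheckpointWrite {

  /** Delete `path` only if it exists; returns true if something was deleted. */
  def deleteIfExists(fs: FileSystem, path: Path): Boolean =
    fs.exists(path) && fs.delete(path, true)

  /** Write `bytes` to a checkpoint file under `dir`, keeping one backup. */
  def writeCheckpoint(fs: FileSystem, dir: Path, bytes: Array[Byte]): Unit = {
    // Hypothetical file names, not the ones Spark uses.
    val tempFile = new Path(dir, "temp")
    val checkpointFile = new Path(dir, "checkpoint")
    val backupFile = new Path(dir, "checkpoint.bk")

    // Write checkpoint to temp file, removing a stale temp file only if present.
    deleteIfExists(fs, tempFile)
    val fos = fs.create(tempFile)
    try fos.write(bytes) finally fos.close()

    // If the checkpoint file exists, back it up. A leftover backup is
    // removed first, and only when it is actually there.
    if (fs.exists(checkpointFile)) {
      deleteIfExists(fs, backupFile)
      if (!fs.rename(checkpointFile, backupFile)) {
        System.err.println(s"Could not rename $checkpointFile to $backupFile")
      }
    }

    // Assumption: the temp file is then promoted to the checkpoint file.
    if (!fs.rename(tempFile, checkpointFile)) {
      System.err.println(s"Could not rename $tempFile to $checkpointFile")
    }
  }
}
```

Calling writeCheckpoint twice against the same directory exercises both
branches: the first call finds nothing to delete or back up, the second finds
an existing checkpoint file and moves it to the backup name.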

