Repository: flink
Updated Branches:
  refs/heads/release-1.1 afaa27e9f -> f3d0cc3c1


[FLINK-5300] Add more gentle file deletion procedure

Before deleting a parent directory, always check whether the directory still
contains any files. Only if it is empty, try to delete the parent directory.

This gives a more gentle behaviour with respect to storage systems, which are
then no longer instructed to delete a non-empty directory.

This closes #2971.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f3d0cc3c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f3d0cc3c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f3d0cc3c

Branch: refs/heads/release-1.1
Commit: f3d0cc3c13b95fd59ba7603722b30b42037591b7
Parents: afaa27e
Author: Till Rohrmann <[email protected]>
Authored: Thu Dec 8 18:53:40 2016 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Mon Dec 12 12:06:13 2016 +0100

----------------------------------------------------------------------
 .../flink/api/common/io/FileOutputFormat.java   |  2 +-
 .../java/org/apache/flink/util/FileUtils.java   | 33 ++++++++++++++++++++
 .../filesystem/AbstractFileStateHandle.java     |  7 ++---
 .../state/filesystem/FsStateBackend.java        | 20 +++++++-----
 4 files changed, 50 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f3d0cc3c/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
index 557c342..4a84512 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
@@ -315,7 +315,7 @@ public abstract class FileOutputFormat<IT> extends 
RichOutputFormat<IT> implemen
                        } catch (FileNotFoundException e) {
                                // ignore, may not be visible yet or may be 
already removed
                        } catch (Throwable t) {
-                               LOG.error("Could not remove the incomplete file 
" + actualFilePath);
+                               LOG.error("Could not remove the incomplete file 
" + actualFilePath + '.', t);
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f3d0cc3c/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
index 078599d..85f4b30 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
@@ -18,6 +18,10 @@
 
 package org.apache.flink.util;
 
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
@@ -81,6 +85,35 @@ public final class FileUtils {
        public static void writeFileUtf8(File file, String contents) throws 
IOException {
                writeFile(file, contents, "UTF-8");
        }
+
+       // 
------------------------------------------------------------------------
+       //  Deleting directories
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Deletes the path if it is empty (does not contain any other 
directories/files).
+        *
+        * @param fileSystem to use
+        * @param path to be deleted if empty
+        * @return true if the path could be deleted; otherwise false
+        * @throws IOException if the delete operation fails
+        */
+       public static boolean deletePathIfEmpty(FileSystem fileSystem, Path 
path) throws IOException {
+               FileStatus[] fileStatuses = null;
+
+               try {
+                       fileStatuses = fileSystem.listStatus(path);
+               } catch (Exception ignored) {}
+
+               // if there are no more files or if we couldn't list the file 
status try to delete the path
+               if (fileStatuses == null || fileStatuses.length == 0) {
+                       // attempt to delete the path (will fail and be ignored 
if the path now contains
+                       // some files (possibly added concurrently))
+                       return fileSystem.delete(path, false);
+               } else {
+                       return false;
+               }
+       }
        
        // 
------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f3d0cc3c/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java
index e9d54dc..805c12d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractCloseableHandle;
 import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,11 +73,9 @@ public abstract class AbstractFileStateHandle extends 
AbstractCloseableHandle im
        public void discardState() throws Exception {
                getFileSystem().delete(filePath, false);
 
-               // send a call to delete the checkpoint directory containing 
the file. This will
-               // fail (and be ignored) when some files still exist
                try {
-                       getFileSystem().delete(filePath.getParent(), false);
-               } catch (IOException ignored) {}
+                       FileUtils.deletePathIfEmpty(getFileSystem(), 
filePath.getParent());
+               } catch (Exception ignored) {}
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/f3d0cc3c/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index e783264..c32fa26 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.ExceptionUtils;
 
+import org.apache.flink.util.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -630,10 +631,11 @@ public class FsStateBackend extends AbstractStateBackend {
                                                try {
                                                        fs.delete(statePath, 
false);
 
-                                                       // attempt to delete 
the parent (will fail and be ignored if the parent has more files)
                                                        try {
-                                                               
fs.delete(basePath, false);
-                                                       } catch (Throwable 
ignored) {}
+                                                               
FileUtils.deletePathIfEmpty(fs, basePath);
+                                                       } catch (Throwable 
ignored) {
+                                                               
LOG.debug("Could not delete parent directory for path {}.", basePath, ignored);
+                                                       }
                                                } catch (Throwable ioE) {
                                                        LOG.warn("Could not 
delete stream file for {}.", statePath, ioE);
                                                }
@@ -669,8 +671,10 @@ public class FsStateBackend extends AbstractStateBackend {
                                                                
fs.delete(statePath, false);
 
                                                                try {
-                                                                       
fs.delete(basePath, false);
-                                                               } catch 
(Throwable ignored) {}
+                                                                       
FileUtils.deletePathIfEmpty(fs, basePath);
+                                                               } catch 
(Throwable ignored) {
+                                                                       
LOG.debug("Could not delete parent directory for path {}.", basePath, ignored);
+                                                               }
                                                        } catch (Throwable 
deleteException) {
                                                                LOG.warn("Could 
not delete close and discarded state stream for {}.", statePath, 
deleteException);
                                                        }
@@ -715,8 +719,10 @@ public class FsStateBackend extends AbstractStateBackend {
                                                        fs.delete(statePath, 
false);
 
                                                        try {
-                                                               
fs.delete(basePath, false);
-                                                       } catch (Throwable 
ignored) {}
+                                                               
FileUtils.deletePathIfEmpty(fs, basePath);
+                                                       } catch (Throwable 
ignored) {
+                                                               
LOG.debug("Could not delete parent directory for path {}.", basePath, ignored);
+                                                       }
                                                } catch (Throwable 
deleteException) {
                                                        LOG.warn("Could not 
delete close and discarded state stream for {}.", statePath, deleteException);
                                                }

Reply via email to