Repository: flink Updated Branches: refs/heads/release-1.1 afaa27e9f -> f3d0cc3c1
[FLINK-5300] Add more gentle file deletion procedure Before deleting a parent directory, always check whether it still contains any files. Only if it is empty, try to delete it. This is gentler on storage systems, because they are no longer asked to delete a non-empty directory. This closes #2971. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f3d0cc3c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f3d0cc3c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f3d0cc3c Branch: refs/heads/release-1.1 Commit: f3d0cc3c13b95fd59ba7603722b30b42037591b7 Parents: afaa27e Author: Till Rohrmann <[email protected]> Authored: Thu Dec 8 18:53:40 2016 +0100 Committer: Till Rohrmann <[email protected]> Committed: Mon Dec 12 12:06:13 2016 +0100 ---------------------------------------------------------------------- .../flink/api/common/io/FileOutputFormat.java | 2 +- .../java/org/apache/flink/util/FileUtils.java | 33 ++++++++++++++++++++ .../filesystem/AbstractFileStateHandle.java | 7 ++--- .../state/filesystem/FsStateBackend.java | 20 +++++++----- 4 files changed, 50 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f3d0cc3c/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java index 557c342..4a84512 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java @@ -315,7 +315,7 @@ public abstract class FileOutputFormat<IT> extends RichOutputFormat<IT> implemen } catch (FileNotFoundException e) 
{ // ignore, may not be visible yet or may be already removed } catch (Throwable t) { - LOG.error("Could not remove the incomplete file " + actualFilePath); + LOG.error("Could not remove the incomplete file " + actualFilePath + '.', t); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/f3d0cc3c/flink-core/src/main/java/org/apache/flink/util/FileUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java index 078599d..85f4b30 100644 --- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java @@ -18,6 +18,10 @@ package org.apache.flink.util; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; + import java.io.File; import java.io.IOException; import java.nio.file.Files; @@ -81,6 +85,35 @@ public final class FileUtils { public static void writeFileUtf8(File file, String contents) throws IOException { writeFile(file, contents, "UTF-8"); } + + // ------------------------------------------------------------------------ + // Deleting directories + // ------------------------------------------------------------------------ + + /** + * Deletes the path if it is empty (does not contain any other directories/files). 
+ * + * @param fileSystem to use + * @param path to be deleted if empty + * @return true if the path could be deleted; otherwise false + * @throws IOException if the delete operation fails + */ + public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws IOException { + FileStatus[] fileStatuses = null; + + try { + fileStatuses = fileSystem.listStatus(path); + } catch (Exception ignored) {} + + // if there are no more files or if we couldn't list the file status try to delete the path + if (fileStatuses == null || fileStatuses.length == 0) { + // attempt to delete the path (will fail and be ignored if the path now contains + // some files (possibly added concurrently)) + return fileSystem.delete(path, false); + } else { + return false; + } + } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/f3d0cc3c/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java index e9d54dc..805c12d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java @@ -22,6 +22,7 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.AbstractCloseableHandle; import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.util.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,11 +73,9 @@ public abstract class AbstractFileStateHandle extends AbstractCloseableHandle im public void discardState() throws Exception { 
getFileSystem().delete(filePath, false); - // send a call to delete the checkpoint directory containing the file. This will - // fail (and be ignored) when some files still exist try { - getFileSystem().delete(filePath.getParent(), false); - } catch (IOException ignored) {} + FileUtils.deletePathIfEmpty(getFileSystem(), filePath.getParent()); + } catch (Exception ignored) {} } /** http://git-wip-us.apache.org/repos/asf/flink/blob/f3d0cc3c/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index e783264..c32fa26 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -630,10 +631,11 @@ public class FsStateBackend extends AbstractStateBackend { try { fs.delete(statePath, false); - // attempt to delete the parent (will fail and be ignored if the parent has more files) try { - fs.delete(basePath, false); - } catch (Throwable ignored) {} + FileUtils.deletePathIfEmpty(fs, basePath); + } catch (Throwable ignored) { + LOG.debug("Could not delete parent directory for path {}.", basePath, ignored); + } } catch (Throwable ioE) { LOG.warn("Could not delete stream file for {}.", statePath, ioE); } @@ -669,8 +671,10 @@ public class FsStateBackend extends AbstractStateBackend { fs.delete(statePath, false); try { - fs.delete(basePath, false); - } catch 
(Throwable ignored) {} + FileUtils.deletePathIfEmpty(fs, basePath); + } catch (Throwable ignored) { + LOG.debug("Could not delete parent directory for path {}.", basePath, ignored); + } } catch (Throwable deleteException) { LOG.warn("Could not delete close and discarded state stream for {}.", statePath, deleteException); } @@ -715,8 +719,10 @@ public class FsStateBackend extends AbstractStateBackend { fs.delete(statePath, false); try { - fs.delete(basePath, false); - } catch (Throwable ignored) {} + FileUtils.deletePathIfEmpty(fs, basePath); + } catch (Throwable ignored) { + LOG.debug("Could not delete parent directory for path {}.", basePath, ignored); + } } catch (Throwable deleteException) { LOG.warn("Could not delete close and discarded state stream for {}.", statePath, deleteException); }
