Repository: flink Updated Branches: refs/heads/master 5baea3f2e -> 8a7288ea9
[FLINK-5300] Add more gentle file deletion procedure Before deleting a parent directory, always check whether the directory still contains any files. Only if it is empty, try to delete the parent directory. This gives a more gentle behaviour with respect to storage systems which should not be asked to delete a non-empty directory. Add test case for more gentle file deletion This closes #2970. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8a7288ea Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8a7288ea Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8a7288ea Branch: refs/heads/master Commit: 8a7288ea9af6b6822ea1f4022bb08a10b7b82318 Parents: 5baea3f Author: Till Rohrmann <[email protected]> Authored: Thu Dec 8 18:53:40 2016 +0100 Committer: Till Rohrmann <[email protected]> Committed: Tue Dec 13 16:52:17 2016 +0100 ---------------------------------------------------------------------- .../flink/api/common/io/FileOutputFormat.java | 2 +- .../java/org/apache/flink/util/FileUtils.java | 34 ++++++++++++ .../core/fs/local/LocalFileSystemTest.java | 46 ++++++++++++++++ .../org/apache/flink/hdfstests/HDFSTest.java | 58 ++++++++++++++++++-- .../filesystem/AbstractFileStateHandle.java | 7 +-- .../state/filesystem/FileStateHandle.java | 8 +-- .../filesystem/FsCheckpointStreamFactory.java | 8 ++- 7 files changed, 146 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8a7288ea/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java index 7530ba1..0ab12df 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java +++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java @@ -319,7 +319,7 @@ public abstract class FileOutputFormat<IT> extends RichOutputFormat<IT> implemen } catch (FileNotFoundException e) { // ignore, may not be visible yet or may be already removed } catch (Throwable t) { - LOG.error("Could not remove the incomplete file " + actualFilePath); + LOG.error("Could not remove the incomplete file " + actualFilePath + '.', t); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8a7288ea/flink-core/src/main/java/org/apache/flink/util/FileUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java index 078599d..23f5eb9 100644 --- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java @@ -18,6 +18,10 @@ package org.apache.flink.util; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; + import java.io.File; import java.io.IOException; import java.nio.file.Files; @@ -81,6 +85,36 @@ public final class FileUtils { public static void writeFileUtf8(File file, String contents) throws IOException { writeFile(file, contents, "UTF-8"); } + + // ------------------------------------------------------------------------ + // Deleting directories + // ------------------------------------------------------------------------ + + /** + * Deletes the path if it is empty. A path can only be empty if it is a directory which does + * not contain any other directories/files. 
+ * + * @param fileSystem to use + * @param path to be deleted if empty + * @return true if the path could be deleted; otherwise false + * @throws IOException if the delete operation fails + */ + public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws IOException { + FileStatus[] fileStatuses = null; + + try { + fileStatuses = fileSystem.listStatus(path); + } catch (Exception ignored) {} + + // if there are no more files or if we couldn't list the file status try to delete the path + if (fileStatuses == null || fileStatuses.length == 0) { + // attempt to delete the path (will fail and be ignored if the path now contains + // some files (possibly added concurrently)) + return fileSystem.delete(path, false); + } else { + return false; + } + } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/8a7288ea/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java index d21e0f1..ee1deef 100644 --- a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java @@ -34,10 +34,14 @@ import java.util.UUID; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.util.FileUtils; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; /** * This class tests the functionality of the {@link LocalFileSystem} class in its components. 
In particular, @@ -45,6 +49,9 @@ import org.junit.Test; */ public class LocalFileSystemTest { + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + /** * This test checks the functionality of the {@link LocalFileSystem} class. */ @@ -166,4 +173,43 @@ public class LocalFileSystemTest { tempdir.delete(); } } + + /** + * Test that {@link FileUtils#deletePathIfEmpty(FileSystem, Path)} deletes the path if it is + * empty. A path can only be empty if it is a directory which does not contain any + * files/directories. + */ + @Test + public void testDeletePathIfEmpty() throws IOException { + File file = temporaryFolder.newFile(); + File directory = temporaryFolder.newFolder(); + File directoryFile = new File(directory, UUID.randomUUID().toString()); + + assertTrue(directoryFile.createNewFile()); + + Path filePath = new Path(file.toURI()); + Path directoryPath = new Path(directory.toURI()); + Path directoryFilePath = new Path(directoryFile.toURI()); + + FileSystem fs = FileSystem.getLocalFileSystem(); + + // verify that the files have been created + assertTrue(fs.exists(filePath)); + assertTrue(fs.exists(directoryFilePath)); + + // delete the single file + assertFalse(FileUtils.deletePathIfEmpty(fs, filePath)); + assertTrue(fs.exists(filePath)); + + // try to delete the non-empty directory + assertFalse(FileUtils.deletePathIfEmpty(fs, directoryPath)); + assertTrue(fs.exists(directoryPath)); + + // delete the file contained in the directory + assertTrue(fs.delete(directoryFilePath, false)); + + // now the deletion should work + assertTrue(FileUtils.deletePathIfEmpty(fs, directoryPath)); + assertFalse(fs.exists(directoryPath)); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8a7288ea/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java index fd959af..1df6390 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java @@ -28,6 +28,7 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.examples.java.wordcount.WordCount; import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; +import org.apache.flink.util.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -41,6 +42,11 @@ import org.junit.Test; import java.io.File; import java.io.IOException; import java.io.StringWriter; +import java.util.Arrays; +import java.util.UUID; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; /** * This test should logically be located in the 'flink-runtime' tests. However, this project @@ -98,7 +104,7 @@ public class HDFSTest { org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/result"); try { FileSystem fs = file.getFileSystem(); - Assert.assertTrue("Must be HadoopFileSystem", fs instanceof HadoopFileSystem); + assertTrue("Must be HadoopFileSystem", fs instanceof HadoopFileSystem); DopOneTestEnvironment.setAsContext(); try { @@ -114,7 +120,7 @@ public class HDFSTest { DopOneTestEnvironment.unsetAsContext(); } - Assert.assertTrue("No result file present", hdfs.exists(result)); + assertTrue("No result file present", hdfs.exists(result)); // validate output: org.apache.hadoop.fs.FSDataInputStream inStream = hdfs.open(result); @@ -154,11 +160,11 @@ public class HDFSTest { avroOut.close(); - Assert.assertTrue("No result file present", hdfs.exists(result)); + assertTrue("No result file present", hdfs.exists(result)); FileStatus[] files = hdfs.listStatus(result); Assert.assertEquals(2, files.length); for(FileStatus file : files) { - 
Assert.assertTrue("1.avro".equals(file.getPath().getName()) || "2.avro".equals(file.getPath().getName())); + assertTrue("1.avro".equals(file.getPath().getName()) || "2.avro".equals(file.getPath().getName())); } } catch (IOException e) { @@ -167,6 +173,50 @@ public class HDFSTest { } } + /** + * Test that {@link FileUtils#deletePathIfEmpty(FileSystem, Path)} deletes the path if it is + * empty. A path can only be empty if it is a directory which does not contain any + * files/directories. + */ + @Test + public void testDeletePathIfEmpty() throws IOException { + final Path basePath = new Path(hdfsURI); + final Path directory = new Path(basePath, UUID.randomUUID().toString()); + final Path directoryFile = new Path(directory, UUID.randomUUID().toString()); + final Path singleFile = new Path(basePath, UUID.randomUUID().toString()); + + FileSystem fs = basePath.getFileSystem(); + + fs.mkdirs(directory); + + byte[] data = "HDFSTest#testDeletePathIfEmpty".getBytes(); + + for (Path file: Arrays.asList(singleFile, directoryFile)) { + org.apache.flink.core.fs.FSDataOutputStream outputStream = fs.create(file, true); + outputStream.write(data); + outputStream.close(); + } + + // verify that the files have been created + assertTrue(fs.exists(singleFile)); + assertTrue(fs.exists(directoryFile)); + + // delete the single file + assertFalse(FileUtils.deletePathIfEmpty(fs, singleFile)); + assertTrue(fs.exists(singleFile)); + + // try to delete the non-empty directory + assertFalse(FileUtils.deletePathIfEmpty(fs, directory)); + assertTrue(fs.exists(directory)); + + // delete the file contained in the directory + assertTrue(fs.delete(directoryFile, false)); + + // now the deletion should work + assertTrue(FileUtils.deletePathIfEmpty(fs, directory)); + assertFalse(fs.exists(directory)); + } + // package visible static abstract class DopOneTestEnvironment extends ExecutionEnvironment { 
http://git-wip-us.apache.org/repos/asf/flink/blob/8a7288ea/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFileStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFileStateHandle.java index 25a0e89..fcdcc78 100644 --- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFileStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFileStateHandle.java @@ -22,6 +22,7 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.migration.runtime.state.AbstractCloseableHandle; import org.apache.flink.migration.runtime.state.StateObject; +import org.apache.flink.util.FileUtils; import java.io.IOException; @@ -68,11 +69,9 @@ public abstract class AbstractFileStateHandle extends AbstractCloseableHandle im public void discardState() throws Exception { getFileSystem().delete(filePath, false); - // send a call to delete the checkpoint directory containing the file. 
This will - // fail (and be ignored) when some files still exist try { - getFileSystem().delete(filePath.getParent(), false); - } catch (IOException ignored) {} + FileUtils.deletePathIfEmpty(getFileSystem(), filePath.getParent()); + } catch (Exception ignored) {} } /** http://git-wip-us.apache.org/repos/asf/flink/blob/8a7288ea/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java index b61c52d..4b2d350 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java @@ -22,6 +22,7 @@ import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.FileUtils; import java.io.IOException; @@ -81,12 +82,9 @@ public class FileStateHandle implements StreamStateHandle { fs.delete(filePath, false); - // send a call to delete the checkpoint directory containing the file. 
This will - // fail (and be ignored) when some files still exist try { - fs.delete(filePath.getParent(), false); - } catch (IOException ignored) { - } + FileUtils.deletePathIfEmpty(fs, filePath.getParent()); + } catch (Exception ignored) {} } /** http://git-wip-us.apache.org/repos/asf/flink/blob/8a7288ea/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java index 135ee04..1be3abf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java @@ -25,6 +25,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; +import org.apache.flink.util.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -267,10 +268,11 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory { outStream.close(); fs.delete(statePath, false); - // attempt to delete the parent (will fail and be ignored if the parent has more files) try { - fs.delete(basePath, false); - } catch (IOException ignored) {} + FileUtils.deletePathIfEmpty(fs, basePath); + } catch (Exception ignored) { + LOG.debug("Could not delete the parent directory {}.", basePath, ignored); + } } catch (Exception e) { LOG.warn("Cannot delete closed and discarded state stream for " + statePath, e);
