Repository: flink
Updated Branches:
  refs/heads/master 5baea3f2e -> 8a7288ea9


[FLINK-5300] Add more gentle file deletion procedure

Before deleting a parent directory, always check whether the directory still
contains any files. Only if it is empty, try to delete the parent directory.

This gives a more gentle behaviour with respect to storage systems, because
they are no longer instructed to delete a non-empty directory.

Add test case for more gentle file deletion

This closes #2970.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8a7288ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8a7288ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8a7288ea

Branch: refs/heads/master
Commit: 8a7288ea9af6b6822ea1f4022bb08a10b7b82318
Parents: 5baea3f
Author: Till Rohrmann <[email protected]>
Authored: Thu Dec 8 18:53:40 2016 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Tue Dec 13 16:52:17 2016 +0100

----------------------------------------------------------------------
 .../flink/api/common/io/FileOutputFormat.java   |  2 +-
 .../java/org/apache/flink/util/FileUtils.java   | 34 ++++++++++++
 .../core/fs/local/LocalFileSystemTest.java      | 46 ++++++++++++++++
 .../org/apache/flink/hdfstests/HDFSTest.java    | 58 ++++++++++++++++++--
 .../filesystem/AbstractFileStateHandle.java     |  7 +--
 .../state/filesystem/FileStateHandle.java       |  8 +--
 .../filesystem/FsCheckpointStreamFactory.java   |  8 ++-
 7 files changed, 146 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8a7288ea/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
index 7530ba1..0ab12df 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
@@ -319,7 +319,7 @@ public abstract class FileOutputFormat<IT> extends 
RichOutputFormat<IT> implemen
                        } catch (FileNotFoundException e) {
                                // ignore, may not be visible yet or may be 
already removed
                        } catch (Throwable t) {
-                               LOG.error("Could not remove the incomplete file 
" + actualFilePath);
+                               LOG.error("Could not remove the incomplete file 
" + actualFilePath + '.', t);
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8a7288ea/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
index 078599d..23f5eb9 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
@@ -18,6 +18,10 @@
 
 package org.apache.flink.util;
 
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
@@ -81,6 +85,36 @@ public final class FileUtils {
        public static void writeFileUtf8(File file, String contents) throws 
IOException {
                writeFile(file, contents, "UTF-8");
        }
+
+       // 
------------------------------------------------------------------------
+       //  Deleting directories
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Deletes the path if it is empty. A path can only be empty if it is a 
directory which does
+        * not contain any other directories/files.
+        *
+        * @param fileSystem to use
+        * @param path to be deleted if empty
+        * @return true if the path could be deleted; otherwise false
+        * @throws IOException if the delete operation fails
+        */
+       public static boolean deletePathIfEmpty(FileSystem fileSystem, Path 
path) throws IOException {
+               FileStatus[] fileStatuses = null;
+
+               try {
+                       fileStatuses = fileSystem.listStatus(path);
+               } catch (Exception ignored) {}
+
+               // if there are no more files or if we couldn't list the file 
status try to delete the path
+               if (fileStatuses == null || fileStatuses.length == 0) {
+                       // attempt to delete the path (will fail and be ignored 
if the path now contains
+                       // some files (possibly added concurrently))
+                       return fileSystem.delete(path, false);
+               } else {
+                       return false;
+               }
+       }
        
        // 
------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8a7288ea/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
 
b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
index d21e0f1..ee1deef 100644
--- 
a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemTest.java
@@ -34,10 +34,14 @@ import java.util.UUID;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
+import org.apache.flink.util.FileUtils;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 /**
  * This class tests the functionality of the {@link LocalFileSystem} class in 
its components. In particular,
@@ -45,6 +49,9 @@ import org.junit.Test;
  */
 public class LocalFileSystemTest {
 
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
        /**
         * This test checks the functionality of the {@link LocalFileSystem} 
class.
         */
@@ -166,4 +173,43 @@ public class LocalFileSystemTest {
                        tempdir.delete();
                }
        }
+
+       /**
+        * Test that {@link FileUtils#deletePathIfEmpty(FileSystem, Path)} 
deletes the path if it is
+        * empty. A path can only be empty if it is a directory which does not 
contain any
+        * files/directories.
+        */
+       @Test
+       public void testDeletePathIfEmpty() throws IOException {
+               File file = temporaryFolder.newFile();
+               File directory = temporaryFolder.newFolder();
+               File directoryFile = new File(directory, 
UUID.randomUUID().toString());
+
+               assertTrue(directoryFile.createNewFile());
+
+               Path filePath = new Path(file.toURI());
+               Path directoryPath = new Path(directory.toURI());
+               Path directoryFilePath = new Path(directoryFile.toURI());
+
+               FileSystem fs = FileSystem.getLocalFileSystem();
+
+               // verify that the files have been created
+               assertTrue(fs.exists(filePath));
+               assertTrue(fs.exists(directoryFilePath));
+
+               // delete the single file
+               assertFalse(FileUtils.deletePathIfEmpty(fs, filePath));
+               assertTrue(fs.exists(filePath));
+
+               // try to delete the non-empty directory
+               assertFalse(FileUtils.deletePathIfEmpty(fs, directoryPath));
+               assertTrue(fs.exists(directoryPath));
+
+               // delete the file contained in the directory
+               assertTrue(fs.delete(directoryFilePath, false));
+
+               // now the deletion should work
+               assertTrue(FileUtils.deletePathIfEmpty(fs, directoryPath));
+               assertFalse(fs.exists(directoryPath));
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8a7288ea/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
----------------------------------------------------------------------
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
index fd959af..1df6390 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.examples.java.wordcount.WordCount;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.util.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -41,6 +42,11 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.UUID;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * This test should logically be located in the 'flink-runtime' tests. 
However, this project
@@ -98,7 +104,7 @@ public class HDFSTest {
                org.apache.hadoop.fs.Path result = new 
org.apache.hadoop.fs.Path(hdfsURI + "/result");
                try {
                        FileSystem fs = file.getFileSystem();
-                       Assert.assertTrue("Must be HadoopFileSystem", fs 
instanceof HadoopFileSystem);
+                       assertTrue("Must be HadoopFileSystem", fs instanceof 
HadoopFileSystem);
                        
                        DopOneTestEnvironment.setAsContext();
                        try {
@@ -114,7 +120,7 @@ public class HDFSTest {
                                DopOneTestEnvironment.unsetAsContext();
                        }
                        
-                       Assert.assertTrue("No result file present", 
hdfs.exists(result));
+                       assertTrue("No result file present", 
hdfs.exists(result));
                        
                        // validate output:
                        org.apache.hadoop.fs.FSDataInputStream inStream = 
hdfs.open(result);
@@ -154,11 +160,11 @@ public class HDFSTest {
                        avroOut.close();
 
 
-                       Assert.assertTrue("No result file present", 
hdfs.exists(result));
+                       assertTrue("No result file present", 
hdfs.exists(result));
                        FileStatus[] files = hdfs.listStatus(result);
                        Assert.assertEquals(2, files.length);
                        for(FileStatus file : files) {
-                               
Assert.assertTrue("1.avro".equals(file.getPath().getName()) || 
"2.avro".equals(file.getPath().getName()));
+                               
assertTrue("1.avro".equals(file.getPath().getName()) || 
"2.avro".equals(file.getPath().getName()));
                        }
 
                } catch (IOException e) {
@@ -167,6 +173,50 @@ public class HDFSTest {
                }
        }
 
+       /**
+        * Test that {@link FileUtils#deletePathIfEmpty(FileSystem, Path)} 
deletes the path if it is
+        * empty. A path can only be empty if it is a directory which does not 
contain any
+        * files/directories.
+        */
+       @Test
+       public void testDeletePathIfEmpty() throws IOException {
+               final Path basePath = new Path(hdfsURI);
+               final Path directory = new Path(basePath, 
UUID.randomUUID().toString());
+               final Path directoryFile = new Path(directory, 
UUID.randomUUID().toString());
+               final Path singleFile = new Path(basePath, 
UUID.randomUUID().toString());
+
+               FileSystem fs = basePath.getFileSystem();
+
+               fs.mkdirs(directory);
+
+               byte[] data = "HDFSTest#testDeletePathIfEmpty".getBytes();
+
+               for (Path file: Arrays.asList(singleFile, directoryFile)) {
+                       org.apache.flink.core.fs.FSDataOutputStream 
outputStream = fs.create(file, true);
+                       outputStream.write(data);
+                       outputStream.close();
+               }
+
+               // verify that the files have been created
+               assertTrue(fs.exists(singleFile));
+               assertTrue(fs.exists(directoryFile));
+
+               // delete the single file
+               assertFalse(FileUtils.deletePathIfEmpty(fs, singleFile));
+               assertTrue(fs.exists(singleFile));
+
+               // try to delete the non-empty directory
+               assertFalse(FileUtils.deletePathIfEmpty(fs, directory));
+               assertTrue(fs.exists(directory));
+
+               // delete the file contained in the directory
+               assertTrue(fs.delete(directoryFile, false));
+
+               // now the deletion should work
+               assertTrue(FileUtils.deletePathIfEmpty(fs, directory));
+               assertFalse(fs.exists(directory));
+       }
+
        // package visible
        static abstract class DopOneTestEnvironment extends 
ExecutionEnvironment {
                

http://git-wip-us.apache.org/repos/asf/flink/blob/8a7288ea/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFileStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFileStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFileStateHandle.java
index 25a0e89..fcdcc78 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFileStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFileStateHandle.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.migration.runtime.state.AbstractCloseableHandle;
 import org.apache.flink.migration.runtime.state.StateObject;
+import org.apache.flink.util.FileUtils;
 
 import java.io.IOException;
 
@@ -68,11 +69,9 @@ public abstract class AbstractFileStateHandle extends 
AbstractCloseableHandle im
        public void discardState() throws Exception {
                getFileSystem().delete(filePath, false);
 
-               // send a call to delete the checkpoint directory containing 
the file. This will
-               // fail (and be ignored) when some files still exist
                try {
-                       getFileSystem().delete(filePath.getParent(), false);
-               } catch (IOException ignored) {}
+                       FileUtils.deletePathIfEmpty(getFileSystem(), 
filePath.getParent());
+               } catch (Exception ignored) {}
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/8a7288ea/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
index b61c52d..4b2d350 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.FileUtils;
 
 import java.io.IOException;
 
@@ -81,12 +82,9 @@ public class FileStateHandle implements StreamStateHandle {
 
                fs.delete(filePath, false);
 
-               // send a call to delete the checkpoint directory containing 
the file. This will
-               // fail (and be ignored) when some files still exist
                try {
-                       fs.delete(filePath.getParent(), false);
-               } catch (IOException ignored) {
-               }
+                       FileUtils.deletePathIfEmpty(fs, filePath.getParent());
+               } catch (Exception ignored) {}
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/8a7288ea/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 135ee04..1be3abf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.util.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -267,10 +268,11 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
                                                outStream.close();
                                                fs.delete(statePath, false);
 
-                                               // attempt to delete the parent 
(will fail and be ignored if the parent has more files)
                                                try {
-                                                       fs.delete(basePath, 
false);
-                                               } catch (IOException ignored) {}
+                                                       
FileUtils.deletePathIfEmpty(fs, basePath);
+                                               } catch (Exception ignored) {
+                                                       LOG.debug("Could not 
delete the parent directory {}.", basePath, ignored);
+                                               }
                                        }
                                        catch (Exception e) {
                                                LOG.warn("Cannot delete closed 
and discarded state stream for " + statePath, e);

Reply via email to