Repository: flink
Updated Branches:
  refs/heads/release-1.4 e784f3a18 -> 666b1b2e6


[FLINK-7266] [core] Prevent attempt for parent directory deletion for object stores

This closes #4397


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/666b1b2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/666b1b2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/666b1b2e

Branch: refs/heads/release-1.4
Commit: 666b1b2e62463ae4985d237535d56c9e0ab9dba9
Parents: a0dbe18
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 25 17:26:38 2017 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Fri Nov 17 17:22:24 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/state/filesystem/FileStateHandle.java   | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/666b1b2e/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
index bdf3f42..7655f0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.filesystem;
 
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.FileUtils;
@@ -77,14 +78,15 @@ public class FileStateHandle implements StreamStateHandle {
         */
        @Override
        public void discardState() throws Exception {
-
                FileSystem fs = getFileSystem();
 
                fs.delete(filePath, false);
 
-               try {
-                       FileUtils.deletePathIfEmpty(fs, filePath.getParent());
-               } catch (Exception ignored) {}
+               if (fs.getKind() == FileSystemKind.FILE_SYSTEM) {
+                       try {
+                               FileUtils.deletePathIfEmpty(fs, filePath.getParent());
+                       } catch (Exception ignored) {}
+               }
        }
 
        /**

Reply via email to