Repository: flink Updated Branches: refs/heads/release-1.4 e784f3a18 -> 666b1b2e6
[FLINK-7266] [core] Prevent attempt for parent directory deletion for object stores This closes #4397 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/666b1b2e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/666b1b2e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/666b1b2e Branch: refs/heads/release-1.4 Commit: 666b1b2e62463ae4985d237535d56c9e0ab9dba9 Parents: a0dbe18 Author: Stephan Ewen <se...@apache.org> Authored: Tue Jul 25 17:26:38 2017 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Fri Nov 17 17:22:24 2017 +0100 ---------------------------------------------------------------------- .../flink/runtime/state/filesystem/FileStateHandle.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/666b1b2e/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java index bdf3f42..7655f0b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.filesystem; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemKind; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.FileUtils; @@ -77,14 +78,15 @@ public class FileStateHandle implements StreamStateHandle { */ @Override public void discardState() throws Exception { - FileSystem fs = getFileSystem(); fs.delete(filePath, false); - try { - FileUtils.deletePathIfEmpty(fs, filePath.getParent()); - } catch (Exception ignored) {} + if (fs.getKind() == FileSystemKind.FILE_SYSTEM) { + try { + FileUtils.deletePathIfEmpty(fs, filePath.getParent()); + } catch (Exception ignored) {} + } } /**