[FLINK-2866] [runtime] Eagerly close FSDataInputStream in file state handle
This closes #1282 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d647151 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d647151 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d647151 Branch: refs/heads/master Commit: 2d647151c729f707681dd5e2d226e4bbe0329bba Parents: fa88d9e Author: tedyu <[email protected]> Authored: Wed Oct 21 19:40:21 2015 -0700 Committer: Stephan Ewen <[email protected]> Committed: Sun Oct 25 19:13:11 2015 +0100 ---------------------------------------------------------------------- .../runtime/state/filesystem/FileSerializableStateHandle.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2d647151/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java index b7e7cd1..63336d1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java @@ -46,8 +46,9 @@ public class FileSerializableStateHandle<T> extends AbstractFileState implements @Override @SuppressWarnings("unchecked") public T getState(ClassLoader classLoader) throws Exception { - FSDataInputStream inStream = getFileSystem().open(getFilePath()); - ObjectInputStream ois = new InstantiationUtil.ClassLoaderObjectInputStream(inStream, classLoader); - return (T) ois.readObject(); + try (FSDataInputStream inStream = getFileSystem().open(getFilePath())) { + ObjectInputStream ois = new InstantiationUtil.ClassLoaderObjectInputStream(inStream, classLoader); + return (T) ois.readObject(); + } } }
