[FLINK-2866] [runtime] Eagerly close FSDataInputStream in file state handle
This closes #1282

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c2811ce9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c2811ce9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c2811ce9

Branch: refs/heads/release-0.10
Commit: c2811ce975548e18d64a5f3c2b1b397f9e42bc1c
Parents: ec1730b
Author: tedyu <[email protected]>
Authored: Wed Oct 21 19:40:21 2015 -0700
Committer: Stephan Ewen <[email protected]>
Committed: Sun Oct 25 19:12:28 2015 +0100

----------------------------------------------------------------------
 .../runtime/state/filesystem/FileSerializableStateHandle.java | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c2811ce9/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
index b7e7cd1..63336d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
@@ -46,8 +46,9 @@ public class FileSerializableStateHandle<T> extends AbstractFileState implements
     @Override
     @SuppressWarnings("unchecked")
     public T getState(ClassLoader classLoader) throws Exception {
-        FSDataInputStream inStream = getFileSystem().open(getFilePath());
-        ObjectInputStream ois = new InstantiationUtil.ClassLoaderObjectInputStream(inStream, classLoader);
-        return (T) ois.readObject();
+        try (FSDataInputStream inStream = getFileSystem().open(getFilePath())) {
+            ObjectInputStream ois = new InstantiationUtil.ClassLoaderObjectInputStream(inStream, classLoader);
+            return (T) ois.readObject();
+        }
     }
 }
