Repository: flink Updated Branches: refs/heads/release-1.3 f112b56e2 -> ce685dbda
[FLINK-6492] Fix unclosed DataOutputViewStream usage This closes #3898. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ae98d38 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ae98d38 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ae98d38 Branch: refs/heads/release-1.3 Commit: 0ae98d3863fb49f67ea4afdf66790b74c1d64d3d Parents: f112b56 Author: huafengw <[email protected]> Authored: Mon May 15 15:56:19 2017 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Tue May 23 22:26:52 2017 +0800 ---------------------------------------------------------------------- .../base/GenericArraySerializerConfigSnapshot.java | 8 +++++--- .../runtime/KryoRegistrationSerializerConfigSnapshot.java | 9 ++++++--- .../typeutils/runtime/TupleSerializerConfigSnapshot.java | 8 +++++--- .../java/org/apache/flink/runtime/state/JavaSerializer.java | 8 +++++--- 4 files changed, 21 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0ae98d38/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java index 79dcf89..70e5210 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java @@ -58,15 +58,17 @@ public final class GenericArraySerializerConfigSnapshot<C> extends CompositeType public void write(DataOutputView out) throws IOException { super.write(out); - InstantiationUtil.serializeObject(new DataOutputViewStream(out), componentClass); + try (final DataOutputViewStream outViewWrapper = new DataOutputViewStream(out)) { + InstantiationUtil.serializeObject(outViewWrapper, componentClass); + } } @Override public void read(DataInputView in) throws IOException { super.read(in); - try { - componentClass = InstantiationUtil.deserializeObject(new DataInputViewStream(in), getUserCodeClassLoader()); + try (final DataInputViewStream inViewWrapper = new DataInputViewStream(in)) { + componentClass = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader()); } catch (ClassNotFoundException e) { throw new IOException("Could not find requested element class in classpath.", e); } http://git-wip-us.apache.org/repos/asf/flink/blob/0ae98d38/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java index 3a42d69..14287ca 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java @@ -133,7 +133,9 @@ public abstract class KryoRegistrationSerializerConfigSnapshot<T> extends Generi out.writeUTF(kryoRegistration.getSerializerClass().getName()); break; case INSTANCE: - InstantiationUtil.serializeObject(new DataOutputViewStream(out), kryoRegistration.getSerializableSerializerInstance()); + try (final DataOutputViewStream outViewWrapper = new DataOutputViewStream(out)) { + InstantiationUtil.serializeObject(outViewWrapper, kryoRegistration.getSerializableSerializerInstance()); + } break; default: // this should not happen; adding as a guard for the future @@ -184,8 +186,9 @@ public abstract class KryoRegistrationSerializerConfigSnapshot<T> extends Generi case INSTANCE: ExecutionConfig.SerializableSerializer<? extends Serializer<RC>> serializerInstance; - try { - serializerInstance = InstantiationUtil.deserializeObject(new DataInputViewStream(in), userCodeClassLoader); + + try (final DataInputViewStream inViewWrapper = new DataInputViewStream(in)) { + serializerInstance = InstantiationUtil.deserializeObject(inViewWrapper, userCodeClassLoader); } catch (ClassNotFoundException e) { LOG.warn("Cannot find registered Kryo serializer class for class " + registeredClassname + " in classpath; using a dummy Kryo serializer that should be replaced as soon as" + http://git-wip-us.apache.org/repos/asf/flink/blob/0ae98d38/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java index 1e7701c..705099e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java @@ -51,15 +51,17 @@ public final class TupleSerializerConfigSnapshot<T> extends CompositeTypeSeriali public void write(DataOutputView out) throws IOException { super.write(out); - InstantiationUtil.serializeObject(new DataOutputViewStream(out), tupleClass); + try (final DataOutputViewStream outViewWrapper = new DataOutputViewStream(out)) { + InstantiationUtil.serializeObject(outViewWrapper, tupleClass); + } } @Override public void read(DataInputView in) throws IOException { super.read(in); - try { - tupleClass = InstantiationUtil.deserializeObject(new DataInputViewStream(in), getUserCodeClassLoader()); + try (final DataInputViewStream inViewWrapper = new DataInputViewStream(in)) { + tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader()); } catch (ClassNotFoundException e) { throw new IOException("Could not find requested tuple class in classpath.", e); } http://git-wip-us.apache.org/repos/asf/flink/blob/0ae98d38/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java index 5252b3d..7d9e888 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java @@ -66,14 +66,16 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializerSinglet @Override public void serialize(T record, DataOutputView target) throws IOException { - InstantiationUtil.serializeObject(new DataOutputViewStream(target), record); + try (final DataOutputViewStream outViewWrapper = new DataOutputViewStream(target)) { + InstantiationUtil.serializeObject(outViewWrapper, record); + } } @Override public T deserialize(DataInputView source) throws IOException { - try { + try (final DataInputViewStream inViewWrapper = new DataInputViewStream(source)) { return InstantiationUtil.deserializeObject( - new DataInputViewStream(source), + inViewWrapper, Thread.currentThread().getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IOException("Could not deserialize object.", e);
