Repository: flink
Updated Branches:
  refs/heads/master f369f8640 -> 557540a51


[FLINK-6492] Fix unclosed DataOutputViewStream usage

This closes #3898.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ded464b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ded464b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ded464b8

Branch: refs/heads/master
Commit: ded464b8bd60d8f19221c0f1589346684c11c78d
Parents: f369f86
Author: huafengw <[email protected]>
Authored: Mon May 15 15:56:19 2017 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue May 23 20:46:43 2017 +0800

----------------------------------------------------------------------
 .../base/GenericArraySerializerConfigSnapshot.java          | 8 +++++---
 .../runtime/KryoRegistrationSerializerConfigSnapshot.java   | 9 ++++++---
 .../typeutils/runtime/TupleSerializerConfigSnapshot.java    | 8 +++++---
 .../java/org/apache/flink/runtime/state/JavaSerializer.java | 8 +++++---
 4 files changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ded464b8/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
index 79dcf89..70e5210 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
@@ -58,15 +58,17 @@ public final class GenericArraySerializerConfigSnapshot<C> 
extends CompositeType
        public void write(DataOutputView out) throws IOException {
                super.write(out);
 
-               InstantiationUtil.serializeObject(new 
DataOutputViewStream(out), componentClass);
+               try (final DataOutputViewStream outViewWrapper = new 
DataOutputViewStream(out)) {
+                       InstantiationUtil.serializeObject(outViewWrapper, 
componentClass);
+               }
        }
 
        @Override
        public void read(DataInputView in) throws IOException {
                super.read(in);
 
-               try {
-                       componentClass = 
InstantiationUtil.deserializeObject(new DataInputViewStream(in), 
getUserCodeClassLoader());
+               try (final DataInputViewStream inViewWrapper = new 
DataInputViewStream(in)) {
+                       componentClass = 
InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader());
                } catch (ClassNotFoundException e) {
                        throw new IOException("Could not find requested element 
class in classpath.", e);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/ded464b8/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
index 3a42d69..14287ca 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
@@ -133,7 +133,9 @@ public abstract class 
KryoRegistrationSerializerConfigSnapshot<T> extends Generi
                                        
out.writeUTF(kryoRegistration.getSerializerClass().getName());
                                        break;
                                case INSTANCE:
-                                       InstantiationUtil.serializeObject(new 
DataOutputViewStream(out), 
kryoRegistration.getSerializableSerializerInstance());
+                                       try (final DataOutputViewStream 
outViewWrapper = new DataOutputViewStream(out)) {
+                                               
InstantiationUtil.serializeObject(outViewWrapper, 
kryoRegistration.getSerializableSerializerInstance());
+                                       }
                                        break;
                                default:
                                        // this should not happen; adding as a 
guard for the future
@@ -184,8 +186,9 @@ public abstract class 
KryoRegistrationSerializerConfigSnapshot<T> extends Generi
 
                                case INSTANCE:
                                        
ExecutionConfig.SerializableSerializer<? extends Serializer<RC>> 
serializerInstance;
-                                       try {
-                                               serializerInstance = 
InstantiationUtil.deserializeObject(new DataInputViewStream(in), 
userCodeClassLoader);
+
+                                       try (final DataInputViewStream 
inViewWrapper = new DataInputViewStream(in)) {
+                                               serializerInstance = 
InstantiationUtil.deserializeObject(inViewWrapper, userCodeClassLoader);
                                        } catch (ClassNotFoundException e) {
                                                LOG.warn("Cannot find 
registered Kryo serializer class for class " + registeredClassname +
                                                                " in classpath; 
using a dummy Kryo serializer that should be replaced as soon as" +

http://git-wip-us.apache.org/repos/asf/flink/blob/ded464b8/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
index 1e7701c..705099e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
@@ -51,15 +51,17 @@ public final class TupleSerializerConfigSnapshot<T> extends 
CompositeTypeSeriali
        public void write(DataOutputView out) throws IOException {
                super.write(out);
 
-               InstantiationUtil.serializeObject(new 
DataOutputViewStream(out), tupleClass);
+               try (final DataOutputViewStream outViewWrapper = new 
DataOutputViewStream(out)) {
+                       InstantiationUtil.serializeObject(outViewWrapper, 
tupleClass);
+               }
        }
 
        @Override
        public void read(DataInputView in) throws IOException {
                super.read(in);
 
-               try {
-                       tupleClass = InstantiationUtil.deserializeObject(new 
DataInputViewStream(in), getUserCodeClassLoader());
+               try (final DataInputViewStream inViewWrapper = new 
DataInputViewStream(in)) {
+                       tupleClass = 
InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader());
                } catch (ClassNotFoundException e) {
                        throw new IOException("Could not find requested tuple 
class in classpath.", e);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/ded464b8/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
index 5252b3d..7d9e888 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
@@ -66,14 +66,16 @@ final class JavaSerializer<T extends Serializable> extends 
TypeSerializerSinglet
 
        @Override
        public void serialize(T record, DataOutputView target) throws 
IOException {
-               InstantiationUtil.serializeObject(new 
DataOutputViewStream(target), record);
+               try (final DataOutputViewStream outViewWrapper = new 
DataOutputViewStream(target)) {
+                       InstantiationUtil.serializeObject(outViewWrapper, 
record);
+               }
        }
 
        @Override
        public T deserialize(DataInputView source) throws IOException {
-               try {
+               try (final DataInputViewStream inViewWrapper = new 
DataInputViewStream(source)) {
                        return InstantiationUtil.deserializeObject(
-                                       new DataInputViewStream(source),
+                                       inViewWrapper,
                                        
Thread.currentThread().getContextClassLoader());
                } catch (ClassNotFoundException e) {
                        throw new IOException("Could not deserialize object.", 
e);

Reply via email to