Repository: flink
Updated Branches:
  refs/heads/release-1.3 c62553c00 -> 5fde739fd


[FLINK-6439] Fix closing of OutputStream and InputStream in OperatorSnapshotUtil


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5fde739f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5fde739f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5fde739f

Branch: refs/heads/release-1.3
Commit: 5fde739fd2b040a90d42a6a73f1d119648e863a7
Parents: c3ab5c8
Author: zjureel <[email protected]>
Authored: Mon May 15 18:14:11 2017 +0800
Committer: zentol <[email protected]>
Committed: Fri May 19 21:09:08 2017 +0200

----------------------------------------------------------------------
 .../streaming/util/OperatorSnapshotUtil.java    | 162 ++++++++++---------
 1 file changed, 82 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5fde739f/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
index 92a9452..8011279 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -46,111 +46,113 @@ public class OperatorSnapshotUtil {
 
        public static void writeStateHandle(OperatorStateHandles state, String 
path) throws IOException {
                FileOutputStream out = new FileOutputStream(path);
-               DataOutputStream dos = new DataOutputStream(out);
-
-               dos.writeInt(state.getOperatorChainIndex());
-
-               
SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(),
 dos);
-
-               Collection<OperatorStateHandle> rawOperatorState = 
state.getRawOperatorState();
-               if (rawOperatorState != null) {
-                       dos.writeInt(rawOperatorState.size());
-                       for (OperatorStateHandle operatorStateHandle : 
rawOperatorState) {
-                               
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+               
+               try (DataOutputStream dos = new DataOutputStream(out)) {
+
+                       dos.writeInt(state.getOperatorChainIndex());
+
+                       
SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(),
 dos);
+
+                       Collection<OperatorStateHandle> rawOperatorState = 
state.getRawOperatorState();
+                       if (rawOperatorState != null) {
+                               dos.writeInt(rawOperatorState.size());
+                               for (OperatorStateHandle operatorStateHandle : 
rawOperatorState) {
+                                       
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+                               }
+                       } else {
+                               // this means no states, not even an empty list
+                               dos.writeInt(-1);
                        }
-               } else {
-                       // this means no states, not even an empty list
-                       dos.writeInt(-1);
-               }
 
-               Collection<OperatorStateHandle> managedOperatorState = 
state.getManagedOperatorState();
-               if (managedOperatorState != null) {
-                       dos.writeInt(managedOperatorState.size());
-                       for (OperatorStateHandle operatorStateHandle : 
managedOperatorState) {
-                               
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+                       Collection<OperatorStateHandle> managedOperatorState = 
state.getManagedOperatorState();
+                       if (managedOperatorState != null) {
+                               dos.writeInt(managedOperatorState.size());
+                               for (OperatorStateHandle operatorStateHandle : 
managedOperatorState) {
+                                       
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+                               }
+                       } else {
+                               // this means no states, not even an empty list
+                               dos.writeInt(-1);
                        }
-               } else {
-                       // this means no states, not even an empty list
-                       dos.writeInt(-1);
-               }
 
-               Collection<KeyedStateHandle> rawKeyedState = 
state.getRawKeyedState();
-               if (rawKeyedState != null) {
-                       dos.writeInt(rawKeyedState.size());
-                       for (KeyedStateHandle keyedStateHandle : rawKeyedState) 
{
-                               
SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+                       Collection<KeyedStateHandle> rawKeyedState = 
state.getRawKeyedState();
+                       if (rawKeyedState != null) {
+                               dos.writeInt(rawKeyedState.size());
+                               for (KeyedStateHandle keyedStateHandle : 
rawKeyedState) {
+                                       
SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+                               }
+                       } else {
+                               // this means no operator states, not even an 
empty list
+                               dos.writeInt(-1);
                        }
-               } else {
-                       // this means no operator states, not even an empty list
-                       dos.writeInt(-1);
-               }
 
-               Collection<KeyedStateHandle> managedKeyedState = 
state.getManagedKeyedState();
-               if (managedKeyedState != null) {
-                       dos.writeInt(managedKeyedState.size());
-                       for (KeyedStateHandle keyedStateHandle : 
managedKeyedState) {
-                               
SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+                       Collection<KeyedStateHandle> managedKeyedState = 
state.getManagedKeyedState();
+                       if (managedKeyedState != null) {
+                               dos.writeInt(managedKeyedState.size());
+                               for (KeyedStateHandle keyedStateHandle : 
managedKeyedState) {
+                                       
SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+                               }
+                       } else {
+                               // this means no operator states, not even an 
empty list
+                               dos.writeInt(-1);
                        }
-               } else {
-                       // this means no operator states, not even an empty list
-                       dos.writeInt(-1);
-               }
 
-               dos.flush();
-               out.close();
+                       dos.flush();
+               }
        }
 
        public static OperatorStateHandles readStateHandle(String path) throws 
IOException, ClassNotFoundException {
                FileInputStream in = new FileInputStream(path);
-               DataInputStream dis = new DataInputStream(in);
-               int index = dis.readInt();
+               try (DataInputStream dis = new DataInputStream(in)) {
+                       int index = dis.readInt();
 
-               StreamStateHandle legacyState = 
SavepointV1Serializer.deserializeStreamStateHandle(dis);
+                       StreamStateHandle legacyState = 
SavepointV1Serializer.deserializeStreamStateHandle(dis);
 
-               List<OperatorStateHandle> rawOperatorState = null;
-               int numRawOperatorStates = dis.readInt();
-               if (numRawOperatorStates >= 0) {
-                       rawOperatorState = new ArrayList<>();
-                       for (int i = 0; i < numRawOperatorStates; i++) {
-                               OperatorStateHandle operatorState = 
SavepointV1Serializer.deserializeOperatorStateHandle(
+                       List<OperatorStateHandle> rawOperatorState = null;
+                       int numRawOperatorStates = dis.readInt();
+                       if (numRawOperatorStates >= 0) {
+                               rawOperatorState = new ArrayList<>();
+                               for (int i = 0; i < numRawOperatorStates; i++) {
+                                       OperatorStateHandle operatorState = 
SavepointV1Serializer.deserializeOperatorStateHandle(
                                                dis);
-                               rawOperatorState.add(operatorState);
+                                       rawOperatorState.add(operatorState);
+                               }
                        }
-               }
 
-               List<OperatorStateHandle> managedOperatorState = null;
-               int numManagedOperatorStates = dis.readInt();
-               if (numManagedOperatorStates >= 0) {
-                       managedOperatorState = new ArrayList<>();
-                       for (int i = 0; i < numManagedOperatorStates; i++) {
-                               OperatorStateHandle operatorState = 
SavepointV1Serializer.deserializeOperatorStateHandle(
+                       List<OperatorStateHandle> managedOperatorState = null;
+                       int numManagedOperatorStates = dis.readInt();
+                       if (numManagedOperatorStates >= 0) {
+                               managedOperatorState = new ArrayList<>();
+                               for (int i = 0; i < numManagedOperatorStates; 
i++) {
+                                       OperatorStateHandle operatorState = 
SavepointV1Serializer.deserializeOperatorStateHandle(
                                                dis);
-                               managedOperatorState.add(operatorState);
+                                       managedOperatorState.add(operatorState);
+                               }
                        }
-               }
 
-               List<KeyedStateHandle> rawKeyedState = null;
-               int numRawKeyedStates = dis.readInt();
-               if (numRawKeyedStates >= 0) {
-                       rawKeyedState = new ArrayList<>();
-                       for (int i = 0; i < numRawKeyedStates; i++) {
-                               KeyedStateHandle keyedState = 
SavepointV1Serializer.deserializeKeyedStateHandle(
+                       List<KeyedStateHandle> rawKeyedState = null;
+                       int numRawKeyedStates = dis.readInt();
+                       if (numRawKeyedStates >= 0) {
+                               rawKeyedState = new ArrayList<>();
+                               for (int i = 0; i < numRawKeyedStates; i++) {
+                                       KeyedStateHandle keyedState = 
SavepointV1Serializer.deserializeKeyedStateHandle(
                                                dis);
-                               rawKeyedState.add(keyedState);
+                                       rawKeyedState.add(keyedState);
+                               }
                        }
-               }
 
-               List<KeyedStateHandle> managedKeyedState = null;
-               int numManagedKeyedStates = dis.readInt();
-               if (numManagedKeyedStates >= 0) {
-                       managedKeyedState = new ArrayList<>();
-                       for (int i = 0; i < numManagedKeyedStates; i++) {
-                               KeyedStateHandle keyedState = 
SavepointV1Serializer.deserializeKeyedStateHandle(
+                       List<KeyedStateHandle> managedKeyedState = null;
+                       int numManagedKeyedStates = dis.readInt();
+                       if (numManagedKeyedStates >= 0) {
+                               managedKeyedState = new ArrayList<>();
+                               for (int i = 0; i < numManagedKeyedStates; i++) 
{
+                                       KeyedStateHandle keyedState = 
SavepointV1Serializer.deserializeKeyedStateHandle(
                                                dis);
-                               managedKeyedState.add(keyedState);
+                                       managedKeyedState.add(keyedState);
+                               }
                        }
-               }
 
-               return new OperatorStateHandles(index, legacyState, 
managedKeyedState, rawKeyedState, managedOperatorState, rawOperatorState);
+                       return new OperatorStateHandles(index, legacyState, 
managedKeyedState, rawKeyedState, managedOperatorState, rawOperatorState);
+               }
        }
 }

Reply via email to