Repository: flink Updated Branches: refs/heads/release-1.3 c62553c00 -> 5fde739fd
[FLINK-6439] Fix close OutputStream && InputStream in OperatorSnapshotUtil Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5fde739f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5fde739f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5fde739f Branch: refs/heads/release-1.3 Commit: 5fde739fd2b040a90d42a6a73f1d119648e863a7 Parents: c3ab5c8 Author: zjureel <[email protected]> Authored: Mon May 15 18:14:11 2017 +0800 Committer: zentol <[email protected]> Committed: Fri May 19 21:09:08 2017 +0200 ---------------------------------------------------------------------- .../streaming/util/OperatorSnapshotUtil.java | 162 ++++++++++--------- 1 file changed, 82 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5fde739f/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java index 92a9452..8011279 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java @@ -46,111 +46,113 @@ public class OperatorSnapshotUtil { public static void writeStateHandle(OperatorStateHandles state, String path) throws IOException { FileOutputStream out = new FileOutputStream(path); - DataOutputStream dos = new DataOutputStream(out); - - dos.writeInt(state.getOperatorChainIndex()); - - SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(), dos); - - Collection<OperatorStateHandle> rawOperatorState = state.getRawOperatorState(); - if (rawOperatorState != null) { - dos.writeInt(rawOperatorState.size()); - for (OperatorStateHandle operatorStateHandle : rawOperatorState) { - SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos); + + try (DataOutputStream dos = new DataOutputStream(out)) { + + dos.writeInt(state.getOperatorChainIndex()); + + SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(), dos); + + Collection<OperatorStateHandle> rawOperatorState = state.getRawOperatorState(); + if (rawOperatorState != null) { + dos.writeInt(rawOperatorState.size()); + for (OperatorStateHandle operatorStateHandle : rawOperatorState) { + SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos); + } + } else { + // this means no states, not even an empty list + dos.writeInt(-1); } - } else { - // this means no states, not even an empty list - dos.writeInt(-1); - } - Collection<OperatorStateHandle> managedOperatorState = state.getManagedOperatorState(); - if (managedOperatorState != null) { - dos.writeInt(managedOperatorState.size()); - for (OperatorStateHandle operatorStateHandle : managedOperatorState) { - SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos); + Collection<OperatorStateHandle> managedOperatorState = state.getManagedOperatorState(); + if (managedOperatorState != null) { + dos.writeInt(managedOperatorState.size()); + for (OperatorStateHandle operatorStateHandle : managedOperatorState) { + SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos); + } + } else { + // this means no states, not even an empty list + dos.writeInt(-1); } - } else { - // this means no states, not even an empty list - dos.writeInt(-1); - } - Collection<KeyedStateHandle> rawKeyedState = state.getRawKeyedState(); - if (rawKeyedState != null) { - dos.writeInt(rawKeyedState.size()); - for (KeyedStateHandle keyedStateHandle : rawKeyedState) { - SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos); + Collection<KeyedStateHandle> rawKeyedState = state.getRawKeyedState(); + if (rawKeyedState != null) { + dos.writeInt(rawKeyedState.size()); + for (KeyedStateHandle keyedStateHandle : rawKeyedState) { + SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos); + } + } else { + // this means no operator states, not even an empty list + dos.writeInt(-1); } - } else { - // this means no operator states, not even an empty list - dos.writeInt(-1); - } - Collection<KeyedStateHandle> managedKeyedState = state.getManagedKeyedState(); - if (managedKeyedState != null) { - dos.writeInt(managedKeyedState.size()); - for (KeyedStateHandle keyedStateHandle : managedKeyedState) { - SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos); + Collection<KeyedStateHandle> managedKeyedState = state.getManagedKeyedState(); + if (managedKeyedState != null) { + dos.writeInt(managedKeyedState.size()); + for (KeyedStateHandle keyedStateHandle : managedKeyedState) { + SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos); + } + } else { + // this means no operator states, not even an empty list + dos.writeInt(-1); } - } else { - // this means no operator states, not even an empty list - dos.writeInt(-1); - } - dos.flush(); - out.close(); + dos.flush(); + } } public static OperatorStateHandles readStateHandle(String path) throws IOException, ClassNotFoundException { FileInputStream in = new FileInputStream(path); - DataInputStream dis = new DataInputStream(in); - int index = dis.readInt(); + try (DataInputStream dis = new DataInputStream(in)) { + int index = dis.readInt(); - StreamStateHandle legacyState = SavepointV1Serializer.deserializeStreamStateHandle(dis); + StreamStateHandle legacyState = SavepointV1Serializer.deserializeStreamStateHandle(dis); - List<OperatorStateHandle> rawOperatorState = null; - int numRawOperatorStates = dis.readInt(); - if (numRawOperatorStates >= 0) { - rawOperatorState = new ArrayList<>(); - for (int i = 0; i < numRawOperatorStates; i++) { - OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle( + List<OperatorStateHandle> rawOperatorState = null; + int numRawOperatorStates = dis.readInt(); + if (numRawOperatorStates >= 0) { + rawOperatorState = new ArrayList<>(); + for (int i = 0; i < numRawOperatorStates; i++) { + OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle( dis); - rawOperatorState.add(operatorState); + rawOperatorState.add(operatorState); + } } - } - List<OperatorStateHandle> managedOperatorState = null; - int numManagedOperatorStates = dis.readInt(); - if (numManagedOperatorStates >= 0) { - managedOperatorState = new ArrayList<>(); - for (int i = 0; i < numManagedOperatorStates; i++) { - OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle( + List<OperatorStateHandle> managedOperatorState = null; + int numManagedOperatorStates = dis.readInt(); + if (numManagedOperatorStates >= 0) { + managedOperatorState = new ArrayList<>(); + for (int i = 0; i < numManagedOperatorStates; i++) { + OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle( dis); - managedOperatorState.add(operatorState); + managedOperatorState.add(operatorState); + } } - } - List<KeyedStateHandle> rawKeyedState = null; - int numRawKeyedStates = dis.readInt(); - if (numRawKeyedStates >= 0) { - rawKeyedState = new ArrayList<>(); - for (int i = 0; i < numRawKeyedStates; i++) { - KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle( + List<KeyedStateHandle> rawKeyedState = null; + int numRawKeyedStates = dis.readInt(); + if (numRawKeyedStates >= 0) { + rawKeyedState = new ArrayList<>(); + for (int i = 0; i < numRawKeyedStates; i++) { + KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle( dis); - rawKeyedState.add(keyedState); + rawKeyedState.add(keyedState); + } } - } - List<KeyedStateHandle> managedKeyedState = null; - int numManagedKeyedStates = dis.readInt(); - if (numManagedKeyedStates >= 0) { - managedKeyedState = new ArrayList<>(); - for (int i = 0; i < numManagedKeyedStates; i++) { - KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle( + List<KeyedStateHandle> managedKeyedState = null; + int numManagedKeyedStates = dis.readInt(); + if (numManagedKeyedStates >= 0) { + managedKeyedState = new ArrayList<>(); + for (int i = 0; i < numManagedKeyedStates; i++) { + KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle( dis); - managedKeyedState.add(keyedState); + managedKeyedState.add(keyedState); + } } - } - return new OperatorStateHandles(index, legacyState, managedKeyedState, rawKeyedState, managedOperatorState, rawOperatorState); + return new OperatorStateHandles(index, legacyState, managedKeyedState, rawKeyedState, managedOperatorState, rawOperatorState); + } } }
