Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/3904#discussion_r116483708
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
---
@@ -46,111 +46,113 @@ public static String getResourceFilename(String
filename) {
public static void writeStateHandle(OperatorStateHandles state, String
path) throws IOException {
FileOutputStream out = new FileOutputStream(path);
- DataOutputStream dos = new DataOutputStream(out);
-
- dos.writeInt(state.getOperatorChainIndex());
-
-
SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(),
dos);
-
- Collection<OperatorStateHandle> rawOperatorState =
state.getRawOperatorState();
- if (rawOperatorState != null) {
- dos.writeInt(rawOperatorState.size());
- for (OperatorStateHandle operatorStateHandle :
rawOperatorState) {
-
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+
+ try(DataOutputStream dos = new DataOutputStream(out)) {
+
+ dos.writeInt(state.getOperatorChainIndex());
+
+
SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(),
dos);
+
+ Collection<OperatorStateHandle> rawOperatorState =
state.getRawOperatorState();
+ if (rawOperatorState != null) {
+ dos.writeInt(rawOperatorState.size());
+ for (OperatorStateHandle operatorStateHandle :
rawOperatorState) {
+
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+ }
+ } else {
+ // this means no states, not even an empty list
+ dos.writeInt(-1);
}
- } else {
- // this means no states, not even an empty list
- dos.writeInt(-1);
- }
- Collection<OperatorStateHandle> managedOperatorState =
state.getManagedOperatorState();
- if (managedOperatorState != null) {
- dos.writeInt(managedOperatorState.size());
- for (OperatorStateHandle operatorStateHandle :
managedOperatorState) {
-
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+ Collection<OperatorStateHandle> managedOperatorState =
state.getManagedOperatorState();
+ if (managedOperatorState != null) {
+ dos.writeInt(managedOperatorState.size());
+ for (OperatorStateHandle operatorStateHandle :
managedOperatorState) {
+
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+ }
+ } else {
+ // this means no states, not even an empty list
+ dos.writeInt(-1);
}
- } else {
- // this means no states, not even an empty list
- dos.writeInt(-1);
- }
- Collection<KeyedStateHandle> rawKeyedState =
state.getRawKeyedState();
- if (rawKeyedState != null) {
- dos.writeInt(rawKeyedState.size());
- for (KeyedStateHandle keyedStateHandle : rawKeyedState)
{
-
SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+ Collection<KeyedStateHandle> rawKeyedState =
state.getRawKeyedState();
+ if (rawKeyedState != null) {
+ dos.writeInt(rawKeyedState.size());
+ for (KeyedStateHandle keyedStateHandle :
rawKeyedState) {
+
SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+ }
+ } else {
+ // this means no operator states, not even an
empty list
+ dos.writeInt(-1);
}
- } else {
- // this means no operator states, not even an empty list
- dos.writeInt(-1);
- }
- Collection<KeyedStateHandle> managedKeyedState =
state.getManagedKeyedState();
- if (managedKeyedState != null) {
- dos.writeInt(managedKeyedState.size());
- for (KeyedStateHandle keyedStateHandle :
managedKeyedState) {
-
SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+ Collection<KeyedStateHandle> managedKeyedState =
state.getManagedKeyedState();
+ if (managedKeyedState != null) {
+ dos.writeInt(managedKeyedState.size());
+ for (KeyedStateHandle keyedStateHandle :
managedKeyedState) {
+
SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+ }
+ } else {
+ // this means no operator states, not even an
empty list
+ dos.writeInt(-1);
}
- } else {
- // this means no operator states, not even an empty list
- dos.writeInt(-1);
- }
- dos.flush();
- out.close();
+ dos.flush();
+ }
}
public static OperatorStateHandles readStateHandle(String path) throws
IOException, ClassNotFoundException {
FileInputStream in = new FileInputStream(path);
- DataInputStream dis = new DataInputStream(in);
- int index = dis.readInt();
+ try(DataInputStream dis = new DataInputStream(in)) {
--- End diff --
Missing space after `try`: write `try (` rather than `try(`.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---