[
https://issues.apache.org/jira/browse/FLINK-6439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16010454#comment-16010454
]
ASF GitHub Bot commented on FLINK-6439:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/3904#discussion_r116483708
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
---
@@ -46,111 +46,113 @@ public static String getResourceFilename(String
filename) {
public static void writeStateHandle(OperatorStateHandles state, String
path) throws IOException {
FileOutputStream out = new FileOutputStream(path);
- DataOutputStream dos = new DataOutputStream(out);
-
- dos.writeInt(state.getOperatorChainIndex());
-
-
SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(),
dos);
-
- Collection<OperatorStateHandle> rawOperatorState =
state.getRawOperatorState();
- if (rawOperatorState != null) {
- dos.writeInt(rawOperatorState.size());
- for (OperatorStateHandle operatorStateHandle :
rawOperatorState) {
-
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+
+ try(DataOutputStream dos = new DataOutputStream(out)) {
+
+ dos.writeInt(state.getOperatorChainIndex());
+
+
SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(),
dos);
+
+ Collection<OperatorStateHandle> rawOperatorState =
state.getRawOperatorState();
+ if (rawOperatorState != null) {
+ dos.writeInt(rawOperatorState.size());
+ for (OperatorStateHandle operatorStateHandle :
rawOperatorState) {
+
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+ }
+ } else {
+ // this means no states, not even an empty list
+ dos.writeInt(-1);
}
- } else {
- // this means no states, not even an empty list
- dos.writeInt(-1);
- }
- Collection<OperatorStateHandle> managedOperatorState =
state.getManagedOperatorState();
- if (managedOperatorState != null) {
- dos.writeInt(managedOperatorState.size());
- for (OperatorStateHandle operatorStateHandle :
managedOperatorState) {
-
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+ Collection<OperatorStateHandle> managedOperatorState =
state.getManagedOperatorState();
+ if (managedOperatorState != null) {
+ dos.writeInt(managedOperatorState.size());
+ for (OperatorStateHandle operatorStateHandle :
managedOperatorState) {
+
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+ }
+ } else {
+ // this means no states, not even an empty list
+ dos.writeInt(-1);
}
- } else {
- // this means no states, not even an empty list
- dos.writeInt(-1);
- }
- Collection<KeyedStateHandle> rawKeyedState =
state.getRawKeyedState();
- if (rawKeyedState != null) {
- dos.writeInt(rawKeyedState.size());
- for (KeyedStateHandle keyedStateHandle : rawKeyedState)
{
-
SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+ Collection<KeyedStateHandle> rawKeyedState =
state.getRawKeyedState();
+ if (rawKeyedState != null) {
+ dos.writeInt(rawKeyedState.size());
+ for (KeyedStateHandle keyedStateHandle :
rawKeyedState) {
+
SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+ }
+ } else {
+ // this means no operator states, not even an
empty list
+ dos.writeInt(-1);
}
- } else {
- // this means no operator states, not even an empty list
- dos.writeInt(-1);
- }
- Collection<KeyedStateHandle> managedKeyedState =
state.getManagedKeyedState();
- if (managedKeyedState != null) {
- dos.writeInt(managedKeyedState.size());
- for (KeyedStateHandle keyedStateHandle :
managedKeyedState) {
-
SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+ Collection<KeyedStateHandle> managedKeyedState =
state.getManagedKeyedState();
+ if (managedKeyedState != null) {
+ dos.writeInt(managedKeyedState.size());
+ for (KeyedStateHandle keyedStateHandle :
managedKeyedState) {
+
SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+ }
+ } else {
+ // this means no operator states, not even an
empty list
+ dos.writeInt(-1);
}
- } else {
- // this means no operator states, not even an empty list
- dos.writeInt(-1);
- }
- dos.flush();
- out.close();
+ dos.flush();
+ }
}
public static OperatorStateHandles readStateHandle(String path) throws
IOException, ClassNotFoundException {
FileInputStream in = new FileInputStream(path);
- DataInputStream dis = new DataInputStream(in);
- int index = dis.readInt();
+ try(DataInputStream dis = new DataInputStream(in)) {
--- End diff --
missing space after try.
> Unclosed InputStream in OperatorSnapshotUtil#readStateHandle()
> --------------------------------------------------------------
>
> Key: FLINK-6439
> URL: https://issues.apache.org/jira/browse/FLINK-6439
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Reporter: Ted Yu
> Assignee: Fang Yong
> Priority: Minor
>
> {code}
> FileInputStream in = new FileInputStream(path);
> DataInputStream dis = new DataInputStream(in);
> {code}
> None of the in / dis is closed upon return from the method.
> In writeStateHandle(), OutputStream should be closed in finally block.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)