Repository: flink Updated Branches: refs/heads/release-1.5 e4906a7aa -> 071dedcb7
[FLINK-9019] Unclosed closeableRegistry in StreamTaskStateInitializerImpl#rawOperatorStateInputs This closes #5723. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e2a62b3e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e2a62b3e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e2a62b3e Branch: refs/heads/release-1.5 Commit: e2a62b3e257a0625392209be0a93880988c0c6f7 Parents: f338963 Author: yanghua <[email protected]> Authored: Tue Mar 20 10:02:21 2018 +0800 Committer: Till Rohrmann <[email protected]> Committed: Tue Mar 20 17:38:39 2018 +0100 ---------------------------------------------------------------------- .../api/operators/StreamTaskStateInitializerImpl.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e2a62b3e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java index acbc2f8..7e91554 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java @@ -279,8 +279,6 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize if (restoreStateAlternatives.hasNext()) { - final CloseableRegistry closeableRegistry = new CloseableRegistry(); - Collection<OperatorStateHandle> rawOperatorState = restoreStateAlternatives.next(); // TODO currently this does not support local state recovery, so we expect there is only one handle. Preconditions.checkState( @@ -288,8 +286,10 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize "Local recovery is currently not implemented for raw operator state, but found state alternative."); if (rawOperatorState != null) { - return new CloseableIterable<StatePartitionStreamProvider>() { + + final CloseableRegistry closeableRegistry = new CloseableRegistry(); + @Override public void close() throws IOException { closeableRegistry.close();
