[FLINK-9019] Unclosed closeableRegistry in StreamTaskStateInitializerImpl#rawOperatorStateInputs
This closes #5723.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6eb91a10
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6eb91a10
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6eb91a10

Branch: refs/heads/master
Commit: 6eb91a1006590e6806ec0e6c381fca411d0e23d7
Parents: f9fbbc3
Author: yanghua <[email protected]>
Authored: Tue Mar 20 10:02:21 2018 +0800
Committer: Till Rohrmann <[email protected]>
Committed: Tue Mar 20 14:53:44 2018 +0100

----------------------------------------------------------------------
 .../api/operators/StreamTaskStateInitializerImpl.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6eb91a10/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
index acbc2f8..7e91554 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
@@ -279,8 +279,6 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize

 		if (restoreStateAlternatives.hasNext()) {

-			final CloseableRegistry closeableRegistry = new CloseableRegistry();
-
 			Collection<OperatorStateHandle> rawOperatorState = restoreStateAlternatives.next();
 			// TODO currently this does not support local state recovery, so we expect there is only one handle.
 			Preconditions.checkState(
@@ -288,8 +286,10 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize
 					"Local recovery is currently not implemented for raw operator state, but found state alternative.");

 			if (rawOperatorState != null) {
-
 				return new CloseableIterable<StatePartitionStreamProvider>() {
+
+					final CloseableRegistry closeableRegistry = new CloseableRegistry();
+
 					@Override
 					public void close() throws IOException {
 						closeableRegistry.close();
