[FLINK-9019] Unclosed closeableRegistry in 
StreamTaskStateInitializerImpl#rawOperatorStateInputs

This closes #5723.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6eb91a10
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6eb91a10
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6eb91a10

Branch: refs/heads/master
Commit: 6eb91a1006590e6806ec0e6c381fca411d0e23d7
Parents: f9fbbc3
Author: yanghua <[email protected]>
Authored: Tue Mar 20 10:02:21 2018 +0800
Committer: Till Rohrmann <[email protected]>
Committed: Tue Mar 20 14:53:44 2018 +0100

----------------------------------------------------------------------
 .../api/operators/StreamTaskStateInitializerImpl.java          | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6eb91a10/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
index acbc2f8..7e91554 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
@@ -279,8 +279,6 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
 
                if (restoreStateAlternatives.hasNext()) {
 
-                       final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
-
                        Collection<OperatorStateHandle> rawOperatorState = 
restoreStateAlternatives.next();
                        // TODO currently this does not support local state 
recovery, so we expect there is only one handle.
                        Preconditions.checkState(
@@ -288,8 +286,10 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
                                "Local recovery is currently not implemented 
for raw operator state, but found state alternative.");
 
                        if (rawOperatorState != null) {
-
                                return new 
CloseableIterable<StatePartitionStreamProvider>() {
+
+                                       final CloseableRegistry 
closeableRegistry = new CloseableRegistry();
+
                                        @Override
                                        public void close() throws IOException {
                                                closeableRegistry.close();

Reply via email to