Repository: flink
Updated Branches:
  refs/heads/release-1.5 e4906a7aa -> 071dedcb7


[FLINK-9019] Unclosed closeableRegistry in StreamTaskStateInitializerImpl#rawOperatorStateInputs

This closes #5723.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e2a62b3e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e2a62b3e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e2a62b3e

Branch: refs/heads/release-1.5
Commit: e2a62b3e257a0625392209be0a93880988c0c6f7
Parents: f338963
Author: yanghua <[email protected]>
Authored: Tue Mar 20 10:02:21 2018 +0800
Committer: Till Rohrmann <[email protected]>
Committed: Tue Mar 20 17:38:39 2018 +0100

----------------------------------------------------------------------
 .../api/operators/StreamTaskStateInitializerImpl.java          | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e2a62b3e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
index acbc2f8..7e91554 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
@@ -279,8 +279,6 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize
 
                if (restoreStateAlternatives.hasNext()) {
 
-                       final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
-
                        Collection<OperatorStateHandle> rawOperatorState = 
restoreStateAlternatives.next();
                        // TODO currently this does not support local state 
recovery, so we expect there is only one handle.
                        Preconditions.checkState(
@@ -288,8 +286,10 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize
                                "Local recovery is currently not implemented 
for raw operator state, but found state alternative.");
 
                        if (rawOperatorState != null) {
-
                                return new 
CloseableIterable<StatePartitionStreamProvider>() {
+
+                                       final CloseableRegistry 
closeableRegistry = new CloseableRegistry();
+
                                        @Override
                                        public void close() throws IOException {
                                                closeableRegistry.close();

Reply via email to