[FLINK-9022][state] Fix resource release in 
StreamTaskStateInitializerImpl.streamOperatorStateContext()

This closes #5716.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f9df13c5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f9df13c5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f9df13c5

Branch: refs/heads/master
Commit: f9df13c5058f194a5c686b9b753345d9226fc87a
Parents: 7c952dd
Author: sihuazhou <[email protected]>
Authored: Mon Mar 19 19:48:32 2018 +0800
Committer: Till Rohrmann <[email protected]>
Committed: Wed Mar 21 10:56:55 2018 +0100

----------------------------------------------------------------------
 .../contrib/streaming/state/RocksDBKeyedStateBackend.java   | 3 ++-
 .../streaming/api/operators/AbstractStreamOperator.java     | 4 ++--
 .../api/operators/StreamTaskStateInitializerImpl.java       | 9 +++------
 3 files changed, 7 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f9df13c5/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 6a66121..41b7bd0 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1620,7 +1620,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        CheckpointOptions checkpointOptions) throws Exception {
 
                        long startTime = System.currentTimeMillis();
-                       final CloseableRegistry snapshotCloseableRegistry = new 
CloseableRegistry();
 
                        if (kvStateInformation.isEmpty()) {
                                if (LOG.isDebugEnabled()) {
@@ -1647,6 +1646,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                                
CheckpointedStateScope.EXCLUSIVE,
                                                primaryStreamFactory);
 
+                       final CloseableRegistry snapshotCloseableRegistry = new 
CloseableRegistry();
+
                        final RocksDBFullSnapshotOperation<K> snapshotOperation 
=
                                new RocksDBFullSnapshotOperation<>(
                                        RocksDBKeyedStateBackend.this,

http://git-wip-us.apache.org/repos/asf/flink/blob/f9df13c5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 4d3f9f5..e447cbe 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -248,8 +248,8 @@ public abstract class AbstractStreamOperator<OUT>
                                context.isRestored(), // information whether we 
restore or start for the first time
                                operatorStateBackend, // access to operator 
state backend
                                keyedStateStore, // access to keyed state 
backend
-                               keyedStateInputs, // access to operator state 
stream
-                               operatorStateInputs); // access to keyed state 
stream
+                               keyedStateInputs, // access to keyed state 
stream
+                               operatorStateInputs); // access to operator 
state stream
 
                        initializeState(initializationContext);
                } finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/f9df13c5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
index 7e91554..d9bd089 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
@@ -166,11 +166,12 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
 
                        // cleanup if something went wrong before results got 
published.
                        if 
(streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) {
-                               IOUtils.closeQuietly(keyedStatedBackend);
+                               // release resource (e.g native resource)
+                               keyedStatedBackend.dispose();
                        }
 
                        if 
(streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
-                               IOUtils.closeQuietly(keyedStatedBackend);
+                               operatorStateBackend.dispose();
                        }
 
                        if 
(streamTaskCloseableRegistry.unregisterCloseable(rawKeyedStateInputs)) {
@@ -181,10 +182,6 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
                                IOUtils.closeQuietly(rawOperatorStateInputs);
                        }
 
-                       if 
(streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
-                               IOUtils.closeQuietly(rawOperatorStateInputs);
-                       }
-
                        throw new Exception("Exception while creating 
StreamOperatorStateContext.", ex);
                }
        }

Reply via email to