[FLINK-9022][state] Fix resource release in StreamTaskStateInitializerImpl.streamOperatorStateContext()
This closes #5716. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f9df13c5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f9df13c5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f9df13c5 Branch: refs/heads/master Commit: f9df13c5058f194a5c686b9b753345d9226fc87a Parents: 7c952dd Author: sihuazhou <[email protected]> Authored: Mon Mar 19 19:48:32 2018 +0800 Committer: Till Rohrmann <[email protected]> Committed: Wed Mar 21 10:56:55 2018 +0100 ---------------------------------------------------------------------- .../contrib/streaming/state/RocksDBKeyedStateBackend.java | 3 ++- .../streaming/api/operators/AbstractStreamOperator.java | 4 ++-- .../api/operators/StreamTaskStateInitializerImpl.java | 9 +++------ 3 files changed, 7 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f9df13c5/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 6a66121..41b7bd0 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -1620,7 +1620,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { CheckpointOptions checkpointOptions) throws Exception { long startTime = System.currentTimeMillis(); - final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry(); if (kvStateInformation.isEmpty()) { if (LOG.isDebugEnabled()) { @@ -1647,6 +1646,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { CheckpointedStateScope.EXCLUSIVE, primaryStreamFactory); + final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry(); + final RocksDBFullSnapshotOperation<K> snapshotOperation = new RocksDBFullSnapshotOperation<>( RocksDBKeyedStateBackend.this, http://git-wip-us.apache.org/repos/asf/flink/blob/f9df13c5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 4d3f9f5..e447cbe 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -248,8 +248,8 @@ public abstract class AbstractStreamOperator<OUT> context.isRestored(), // information whether we restore or start for the first time operatorStateBackend, // access to operator state backend keyedStateStore, // access to keyed state backend - keyedStateInputs, // access to operator state stream - operatorStateInputs); // access to keyed state stream + keyedStateInputs, // access to keyed state stream + operatorStateInputs); // access to operator state stream initializeState(initializationContext); } finally { http://git-wip-us.apache.org/repos/asf/flink/blob/f9df13c5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java index 7e91554..d9bd089 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java @@ -166,11 +166,12 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize // cleanup if something went wrong before results got published. if (streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) { - IOUtils.closeQuietly(keyedStatedBackend); + // release resource (e.g native resource) + keyedStatedBackend.dispose(); } if (streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) { - IOUtils.closeQuietly(keyedStatedBackend); + operatorStateBackend.dispose(); } if (streamTaskCloseableRegistry.unregisterCloseable(rawKeyedStateInputs)) { @@ -181,10 +182,6 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize IOUtils.closeQuietly(rawOperatorStateInputs); } - if (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) { - IOUtils.closeQuietly(rawOperatorStateInputs); - } - throw new Exception("Exception while creating StreamOperatorStateContext.", ex); } }
