Repository: flink Updated Branches: refs/heads/master 97f0cac2a -> 2479ff53c
[FLINK-8559][RocksDB] Release resources if snapshot operation fails

This closes #5412.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dbb81acb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dbb81acb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dbb81acb

Branch: refs/heads/master
Commit: dbb81acb5a1d0f2a9521c6eef7eeb2436bb8004d
Parents: 5e41eaa
Author: zentol <[email protected]>
Authored: Mon Feb 5 13:15:29 2018 +0100
Committer: zentol <[email protected]>
Committed: Tue Feb 6 20:20:47 2018 +0100

----------------------------------------------------------------------
 .../contrib/streaming/state/RocksDBKeyedStateBackend.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/dbb81acb/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 29a0854..5507339 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -377,7 +377,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				checkpointId,
 				checkpointTimestamp);
 
-		snapshotOperation.takeSnapshot();
+		try {
+			snapshotOperation.takeSnapshot();
+		} catch (Exception e) {
+			snapshotOperation.stop();
+			snapshotOperation.releaseResources(true);
+			throw e;
+		}
 
 		return new FutureTask<KeyedStateHandle>(
 			new Callable<KeyedStateHandle>() {
