[FLINK-6504] [checkpoint] Fix synchronization on materializedSstFiles in RocksDBKeyedStateBackend
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/958773b7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/958773b7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/958773b7 Branch: refs/heads/master Commit: 958773b71c52aae94560508f8d4cd894059d4467 Parents: 4745d0c Author: Stefan Richter <[email protected]> Authored: Thu May 11 11:59:47 2017 +0200 Committer: Stefan Richter <[email protected]> Committed: Sun May 14 13:49:50 2017 +0200 ---------------------------------------------------------------------- .../contrib/streaming/state/RocksDBKeyedStateBackend.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/958773b7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 1080e59..b9468f7 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -828,9 +828,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } void takeSnapshot() throws Exception { + assert (Thread.holdsLock(stateBackend.asyncSnapshotLock)); + // use the last completed checkpoint as the comparison base. baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId); + // save meta data for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry : stateBackend.kvStateInformation.entrySet()) { @@ -888,7 +891,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { sstFiles.putAll(newSstFiles); sstFiles.putAll(oldSstFiles); - stateBackend.materializedSstFiles.put(checkpointId, sstFiles); + synchronized (stateBackend.asyncSnapshotLock) { + stateBackend.materializedSstFiles.put(checkpointId, sstFiles); + } return new RocksDBIncrementalKeyedStateHandle( stateBackend.operatorIdentifier,
