[FLINK-6504] [checkpoint] Fix synchronization on materializedSstFiles in RocksDBKeyedStateBackend
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f7d79d8f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f7d79d8f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f7d79d8f Branch: refs/heads/release-1.3 Commit: f7d79d8fd9e6b2691c75af1214666cc99b5aaca7 Parents: b30b8ee Author: Stefan Richter <[email protected]> Authored: Thu May 11 11:59:47 2017 +0200 Committer: Stefan Richter <[email protected]> Committed: Sun May 14 14:07:26 2017 +0200 ---------------------------------------------------------------------- .../contrib/streaming/state/RocksDBKeyedStateBackend.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f7d79d8f/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 1080e59..b9468f7 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -828,9 +828,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } void takeSnapshot() throws Exception { + assert (Thread.holdsLock(stateBackend.asyncSnapshotLock)); + // use the last completed checkpoint as the comparison base. baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId); + // save meta data for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry : stateBackend.kvStateInformation.entrySet()) { @@ -888,7 +891,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { sstFiles.putAll(newSstFiles); sstFiles.putAll(oldSstFiles); - stateBackend.materializedSstFiles.put(checkpointId, sstFiles); + synchronized (stateBackend.asyncSnapshotLock) { + stateBackend.materializedSstFiles.put(checkpointId, sstFiles); + } return new RocksDBIncrementalKeyedStateHandle( stateBackend.operatorIdentifier,
