[FLINK-6504] [checkpoint] Fix synchronization on materializedSstFiles in 
RocksDBKeyedStateBackend


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f7d79d8f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f7d79d8f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f7d79d8f

Branch: refs/heads/release-1.3
Commit: f7d79d8fd9e6b2691c75af1214666cc99b5aaca7
Parents: b30b8ee
Author: Stefan Richter <[email protected]>
Authored: Thu May 11 11:59:47 2017 +0200
Committer: Stefan Richter <[email protected]>
Committed: Sun May 14 14:07:26 2017 +0200

----------------------------------------------------------------------
 .../contrib/streaming/state/RocksDBKeyedStateBackend.java     | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f7d79d8f/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 1080e59..b9468f7 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -828,9 +828,12 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
 
                void takeSnapshot() throws Exception {
+                       assert 
(Thread.holdsLock(stateBackend.asyncSnapshotLock));
+
                        // use the last completed checkpoint as the comparison 
base.
                        baseSstFiles = 
stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
 
+
                        // save meta data
                        for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
                                        : 
stateBackend.kvStateInformation.entrySet()) {
@@ -888,7 +891,9 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        sstFiles.putAll(newSstFiles);
                        sstFiles.putAll(oldSstFiles);
 
-                       stateBackend.materializedSstFiles.put(checkpointId, 
sstFiles);
+                       synchronized (stateBackend.asyncSnapshotLock) {
+                               
stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
+                       }
 
                        return new RocksDBIncrementalKeyedStateHandle(
                                stateBackend.operatorIdentifier,

Reply via email to