[FLINK-8699][checkpointing] Create deep copy of state meta data to avoid concurrency problem with checkpoints


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0f271161
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0f271161
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0f271161

Branch: refs/heads/master
Commit: 0f2711618f543248a5dcd6ba8e3c4dc2b55fa568
Parents: 08d0881
Author: Stefan Richter <[email protected]>
Authored: Wed Feb 21 15:58:50 2018 +0100
Committer: Stefan Richter <[email protected]>
Committed: Sun Feb 25 15:59:54 2018 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBKeyedStateBackend.java    | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0f271161/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 0cb2792..3accbe5 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1818,6 +1818,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
                private Snapshot snapshot;
                private ReadOptions readOptions;
+               private List<Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformationCopy;
                private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
 
                private CheckpointStreamWithResultProvider 
checkpointStreamWithResultProvider;
@@ -1841,7 +1842,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                 */
                public void takeDBSnapShot() {
                        Preconditions.checkArgument(snapshot == null, "Only one 
ongoing snapshot allowed!");
-                       this.kvStateIterators = new 
ArrayList<>(stateBackend.kvStateInformation.size());
+                       this.kvStateInformationCopy = new 
ArrayList<>(stateBackend.kvStateInformation.values());
                        this.snapshot = stateBackend.db.getSnapshot();
                }
 
@@ -1928,20 +1929,22 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                private void writeKVStateMetaData() throws IOException {
 
                        List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> metaInfoSnapshots =
-                               new 
ArrayList<>(stateBackend.kvStateInformation.size());
+                               new ArrayList<>(kvStateInformationCopy.size());
+
+                       this.kvStateIterators = new 
ArrayList<>(kvStateInformationCopy.size());
 
                        int kvStateId = 0;
-                       for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> column :
-                               stateBackend.kvStateInformation.entrySet()) {
+                       for (Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> column :
+                               kvStateInformationCopy) {
 
-                               
metaInfoSnapshots.add(column.getValue().f1.snapshot());
+                               metaInfoSnapshots.add(column.f1.snapshot());
 
                                //retrieve iterator for this k/v states
                                readOptions = new ReadOptions();
                                readOptions.setSnapshot(snapshot);
 
                                kvStateIterators.add(
-                                       new 
Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), 
kvStateId));
+                                       new 
Tuple2<>(stateBackend.db.newIterator(column.f0, readOptions), kvStateId));
 
                                ++kvStateId;
                        }

Reply via email to