[FLINK-8699][state] Deep copy state info to avoid potential concurrency problem 
in full checkpoint.

(cherry picked from commit 21cf59d)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cbad9cf3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cbad9cf3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cbad9cf3

Branch: refs/heads/release-1.5
Commit: cbad9cf34095ef0d709df9b1521387d8fea38f4a
Parents: 340ee26
Author: sihuazhou <summerle...@163.com>
Authored: Fri Mar 16 16:07:54 2018 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Fri Apr 6 12:33:41 2018 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 36 +++++++++++++-------
 1 file changed, 24 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cbad9cf3/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index cdeb608..31b9d99 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1836,7 +1836,13 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                private Snapshot snapshot;
                private ReadOptions readOptions;
-               private List<Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformationCopy;
+
+               /** The state meta data. */
+               private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots;
+
+               /** The copied column handle. */
+               private List<ColumnFamilyHandle> copiedColumnFamilyHandles;
+
                private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
 
                private CheckpointStreamWithResultProvider 
checkpointStreamWithResultProvider;
@@ -1860,7 +1866,19 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                 */
                public void takeDBSnapShot() {
                        Preconditions.checkArgument(snapshot == null, "Only one 
ongoing snapshot allowed!");
-                       this.kvStateInformationCopy = new 
ArrayList<>(stateBackend.kvStateInformation.values());
+
+                       this.stateMetaInfoSnapshots = new 
ArrayList<>(stateBackend.kvStateInformation.size());
+
+                       this.copiedColumnFamilyHandles = new 
ArrayList<>(stateBackend.kvStateInformation.size());
+
+                       for (Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> tuple2 :
+                               stateBackend.kvStateInformation.values()) {
+                               // snapshot meta info
+                               
this.stateMetaInfoSnapshots.add(tuple2.f1.snapshot());
+
+                               // copy column family handle
+                               this.copiedColumnFamilyHandles.add(tuple2.f0);
+                       }
                        this.snapshot = stateBackend.db.getSnapshot();
                }
 
@@ -1946,10 +1964,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                private void writeKVStateMetaData() throws IOException {
 
-                       List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> metaInfoSnapshots =
-                               new ArrayList<>(kvStateInformationCopy.size());
-
-                       this.kvStateIterators = new 
ArrayList<>(kvStateInformationCopy.size());
+                       this.kvStateIterators = new 
ArrayList<>(copiedColumnFamilyHandles.size());
 
                        int kvStateId = 0;
 
@@ -1957,13 +1972,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        readOptions = new ReadOptions();
                        readOptions.setSnapshot(snapshot);
 
-                       for (Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> column :
-                               kvStateInformationCopy) {
-
-                               metaInfoSnapshots.add(column.f1.snapshot());
+                       for (ColumnFamilyHandle columnFamilyHandle : 
copiedColumnFamilyHandles) {
 
                                kvStateIterators.add(
-                                       new 
Tuple2<>(stateBackend.db.newIterator(column.f0, readOptions), kvStateId));
+                                       new 
Tuple2<>(stateBackend.db.newIterator(columnFamilyHandle, readOptions), 
kvStateId));
 
                                ++kvStateId;
                        }
@@ -1971,7 +1983,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        KeyedBackendSerializationProxy<K> serializationProxy =
                                new KeyedBackendSerializationProxy<>(
                                        stateBackend.getKeySerializer(),
-                                       metaInfoSnapshots,
+                                       stateMetaInfoSnapshots,
                                        !Objects.equals(
                                                
UncompressedStreamCompressionDecorator.INSTANCE,
                                                
stateBackend.keyGroupCompressionDecorator));

Reply via email to