[FLINK-8699][checkpointing] Create deep copy of state meta data to avoid concurrency problem with checkpoints
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0f271161 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0f271161 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0f271161 Branch: refs/heads/master Commit: 0f2711618f543248a5dcd6ba8e3c4dc2b55fa568 Parents: 08d0881 Author: Stefan Richter <[email protected]> Authored: Wed Feb 21 15:58:50 2018 +0100 Committer: Stefan Richter <[email protected]> Committed: Sun Feb 25 15:59:54 2018 +0100 ---------------------------------------------------------------------- .../streaming/state/RocksDBKeyedStateBackend.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0f271161/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 0cb2792..3accbe5 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -1818,6 +1818,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private Snapshot snapshot; private ReadOptions readOptions; + private List<Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformationCopy; private List<Tuple2<RocksIterator, Integer>> kvStateIterators; private CheckpointStreamWithResultProvider checkpointStreamWithResultProvider; @@ -1841,7 +1842,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { */ public void takeDBSnapShot() { Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!"); - this.kvStateIterators = new ArrayList<>(stateBackend.kvStateInformation.size()); + this.kvStateInformationCopy = new ArrayList<>(stateBackend.kvStateInformation.values()); this.snapshot = stateBackend.db.getSnapshot(); } @@ -1928,20 +1929,22 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private void writeKVStateMetaData() throws IOException { List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> metaInfoSnapshots = - new ArrayList<>(stateBackend.kvStateInformation.size()); + new ArrayList<>(kvStateInformationCopy.size()); + + this.kvStateIterators = new ArrayList<>(kvStateInformationCopy.size()); int kvStateId = 0; - for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> column : - stateBackend.kvStateInformation.entrySet()) { + for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column : + kvStateInformationCopy) { - metaInfoSnapshots.add(column.getValue().f1.snapshot()); + metaInfoSnapshots.add(column.f1.snapshot()); //retrieve iterator for this k/v states readOptions = new ReadOptions(); readOptions.setSnapshot(snapshot); kvStateIterators.add( - new Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), kvStateId)); + new Tuple2<>(stateBackend.db.newIterator(column.f0, readOptions), kvStateId)); ++kvStateId; }
