This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aba02eb3fcc4472c3d5f5a0f527960d79c659c31
Author: Stefan Richter <s.rich...@data-artisans.com>
AuthorDate: Tue Aug 7 15:57:27 2018 +0200

    [FLINK-10042][state] (part 1) Extract snapshot algorithms from inner 
classes of RocksDBKeyedStateBackend into full classes
---
 .../flink/runtime/state/SnapshotStrategy.java      |    3 +-
 .../runtime/state/heap/HeapKeyedStateBackend.java  |    5 +
 .../streaming/state/RocksDBKeyedStateBackend.java  | 1071 ++------------------
 .../state/snapshot/RocksFullSnapshotStrategy.java  |  478 +++++++++
 .../snapshot/RocksIncrementalSnapshotStrategy.java |  578 +++++++++++
 .../state/snapshot/RocksSnapshotUtil.java          |   51 +
 .../state/snapshot/SnapshotStrategyBase.java       |   90 ++
 .../streaming/state/RocksDBAsyncSnapshotTest.java  |   27 +-
 .../streaming/state/RocksDBStateBackendTest.java   |    1 +
 9 files changed, 1317 insertions(+), 987 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java
index 9139fa7..3ad68af 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java
@@ -28,8 +28,7 @@ import java.util.concurrent.RunnableFuture;
  *
  * @param <S> type of the returned state object that represents the result of 
the snapshot operation.
  */
-@FunctionalInterface
-public interface SnapshotStrategy<S extends StateObject> {
+public interface SnapshotStrategy<S extends StateObject> extends 
CheckpointListener {
 
        /**
         * Operation that writes a snapshot into a stream that is provided by 
the given {@link CheckpointStreamFactory} and
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index bc1e0f5..0e2f16c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -882,6 +882,11 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                }
                        }
                }
+
+               @Override
+               public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+                       // nothing to do.
+               }
        }
 
        private interface StateFactory {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index c159976..87c7e55 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -35,9 +35,8 @@ import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerial
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import 
org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysIterator;
-import 
org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator;
-import 
org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper;
-import org.apache.flink.core.fs.CloseableRegistry;
+import 
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy;
+import 
org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
@@ -47,32 +46,22 @@ import org.apache.flink.core.memory.ByteArrayDataInputView;
 import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
-import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
-import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
-import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.DirectoryStateHandle;
-import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyExtractorFunction;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.Keyed;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
-import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
-import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.PriorityComparable;
 import org.apache.flink.runtime.state.PriorityComparator;
 import org.apache.flink.runtime.state.PriorityQueueSetFactory;
@@ -80,14 +69,11 @@ import 
org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import 
org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
 import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
-import org.apache.flink.runtime.state.SnapshotDirectory;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.SnapshotStrategy;
 import org.apache.flink.runtime.state.StateHandleID;
-import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateSnapshotTransformer;
 import 
org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
-import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
@@ -96,25 +82,19 @@ import 
org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ResourceGuard;
 import org.apache.flink.util.StateMigrationException;
-import org.apache.flink.util.function.SupplierWithException;
 
-import org.rocksdb.Checkpoint;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.Snapshot;
 import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -125,7 +105,6 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
@@ -144,12 +123,16 @@ import java.util.Spliterator;
 import java.util.Spliterators;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.FutureTask;
 import java.util.concurrent.RunnableFuture;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
+import static 
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.END_OF_KEY_GROUP_MARK;
+import static 
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.SST_FILE_SUFFIX;
+import static 
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.clearMetaDataFollowsFlag;
+import static 
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.hasMetaDataFollowsFlag;
+
 /**
  * An {@link AbstractKeyedStateBackend} that stores its state in {@code 
RocksDB} and serializes state to
  * streams provided by a {@link 
org.apache.flink.runtime.state.CheckpointStreamFactory} upon
@@ -167,9 +150,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        /** The name of the merge operator in RocksDB. Do not change except you 
know exactly what you do. */
        public static final String MERGE_OPERATOR_NAME = "stringappendtest";
 
-       /** File suffix of sstable files. */
-       private static final String SST_FILE_SUFFIX = ".sst";
-
        private static final Map<Class<? extends StateDescriptor>, 
StateFactory> STATE_FACTORIES =
                Stream.of(
                        Tuple2.of(ValueStateDescriptor.class, (StateFactory) 
RocksDBValueState::create),
@@ -230,7 +210,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
         * Information about the k/v states as we create them. This is used to 
retrieve the
         * column family that is used for a state and also for sanity checks 
when restoring.
         */
-       private final Map<String, Tuple2<ColumnFamilyHandle, 
RegisteredStateMetaInfoBase>> kvStateInformation;
+       private final LinkedHashMap<String, Tuple2<ColumnFamilyHandle, 
RegisteredStateMetaInfoBase>> kvStateInformation;
 
        /**
         * Map of state names to their corresponding restored state meta info.
@@ -246,20 +226,11 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        /** True if incremental checkpointing is enabled. */
        private final boolean enableIncrementalCheckpointing;
 
-       /** The state handle ids of all sst files materialized in snapshots for 
previous checkpoints. */
-       private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
-
-       /** The identifier of the last completed checkpoint. */
-       private long lastCompletedCheckpointId = -1L;
-
-       /** Unique ID of this backend. */
-       private UUID backendUID;
-
        /** The configuration of local recovery. */
        private final LocalRecoveryConfig localRecoveryConfig;
 
        /** The snapshot strategy, e.g., if we use full or incremental 
checkpoints, local state, and so on. */
-       private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> 
snapshotStrategy;
+       private SnapshotStrategy<SnapshotResult<KeyedStateHandle>> 
snapshotStrategy;
 
        /** Factory for priority queue state. */
        private final PriorityQueueSetFactory priorityQueueFactory;
@@ -314,12 +285,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        
RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(getNumberOfKeyGroups());
                this.kvStateInformation = new LinkedHashMap<>();
                this.restoredKvStateMetaInfos = new HashMap<>();
-               this.materializedSstFiles = new TreeMap<>();
-               this.backendUID = UUID.randomUUID();
-
-               this.snapshotStrategy = enableIncrementalCheckpointing ?
-                       new IncrementalSnapshotStrategy() :
-                       new FullSnapshotStrategy();
 
                this.writeOptions = new WriteOptions().setDisableWAL(true);
 
@@ -333,8 +298,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        default:
                                throw new IllegalArgumentException("Unknown 
priority queue state type: " + priorityQueueStateType);
                }
-
-               LOG.debug("Setting initial keyed backend uid for operator {} to 
{}.", this.operatorIdentifier, this.backendUID);
        }
 
        private static void checkAndCreateDirectory(File directory) throws 
IOException {
@@ -508,41 +471,83 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                restoredKvStateMetaInfos.clear();
 
                try {
+                       RocksDBIncrementalRestoreOperation<K> 
incrementalRestoreOperation = null;
                        if (restoreState == null || restoreState.isEmpty()) {
                                createDB();
                        } else {
                                KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
                                if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
                                        || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
-                                       RocksDBIncrementalRestoreOperation<K> 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
-                                       restoreOperation.restore(restoreState);
+                                       incrementalRestoreOperation = new 
RocksDBIncrementalRestoreOperation<>(this);
+                                       
incrementalRestoreOperation.restore(restoreState);
                                } else {
-                                       RocksDBFullRestoreOperation<K> 
restoreOperation = new RocksDBFullRestoreOperation<>(this);
-                                       
restoreOperation.doRestore(restoreState);
+                                       RocksDBFullRestoreOperation<K> 
fullRestoreOperation = new RocksDBFullRestoreOperation<>(this);
+                                       
fullRestoreOperation.doRestore(restoreState);
                                }
                        }
+
+                       initializeSnapshotStrategy(incrementalRestoreOperation);
                } catch (Exception ex) {
                        dispose();
                        throw ex;
                }
        }
 
-       @Override
-       public void notifyCheckpointComplete(long completedCheckpointId) {
-
-               if (!enableIncrementalCheckpointing) {
-                       return;
-               }
-
-               synchronized (materializedSstFiles) {
-
-                       if (completedCheckpointId < lastCompletedCheckpointId) {
-                               return;
+       @VisibleForTesting
+       void initializeSnapshotStrategy(
+               @Nullable RocksDBIncrementalRestoreOperation<K> 
incrementalRestoreOperation) {
+
+               final RocksFullSnapshotStrategy<K> fullSnapshotStrategy =
+                       new RocksFullSnapshotStrategy<>(
+                               db,
+                               rocksDBResourceGuard,
+                               keySerializer,
+                               kvStateInformation,
+                               keyGroupRange,
+                               keyGroupPrefixBytes,
+                               localRecoveryConfig,
+                               cancelStreamRegistry,
+                               keyGroupCompressionDecorator);
+
+               if (enableIncrementalCheckpointing) {
+                       final UUID backendUID;
+                       final SortedMap<Long, Set<StateHandleID>> 
materializedSstFiles;
+                       final long lastCompletedCheckpointId;
+
+                       if (incrementalRestoreOperation == null) {
+                               backendUID = UUID.randomUUID();
+                               materializedSstFiles = new TreeMap<>();
+                               lastCompletedCheckpointId = -1L;
+                       } else {
+                               backendUID = 
Preconditions.checkNotNull(incrementalRestoreOperation.getRestoredBackendUID());
+                               materializedSstFiles = 
Preconditions.checkNotNull(incrementalRestoreOperation.getRestoredSstFiles());
+                               lastCompletedCheckpointId = 
incrementalRestoreOperation.getLastCompletedCheckpointId();
+                               
Preconditions.checkState(lastCompletedCheckpointId >= 0L);
                        }
+                       // TODO eventually we might want to separate savepoint 
and snapshot strategy, i.e. having 2 strategies.
+                       this.snapshotStrategy = new 
RocksIncrementalSnapshotStrategy<>(
+                               db,
+                               rocksDBResourceGuard,
+                               keySerializer,
+                               kvStateInformation,
+                               keyGroupRange,
+                               keyGroupPrefixBytes,
+                               localRecoveryConfig,
+                               cancelStreamRegistry,
+                               instanceBasePath,
+                               backendUID,
+                               materializedSstFiles,
+                               lastCompletedCheckpointId,
+                               fullSnapshotStrategy);
+               } else {
+                       this.snapshotStrategy = fullSnapshotStrategy;
+               }
+       }
 
-                       materializedSstFiles.keySet().removeIf(checkpointId -> 
checkpointId < completedCheckpointId);
-
-                       lastCompletedCheckpointId = completedCheckpointId;
+       @Override
+       public void notifyCheckpointComplete(long completedCheckpointId) throws 
Exception {
+               if (snapshotStrategy != null) {
+                       
snapshotStrategy.notifyCheckpointComplete(completedCheckpointId);
                }
        }
 
@@ -656,10 +661,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                /**
                 * Restore the KV-state / ColumnFamily meta data for all 
key-groups referenced by the current state handle.
-                *
-                * @throws IOException
-                * @throws ClassNotFoundException
-                * @throws RocksDBException
                 */
                private void restoreKVStateMetaData() throws IOException, 
StateMigrationException, RocksDBException {
 
@@ -724,9 +725,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                /**
                 * Restore the KV-state / ColumnFamily data for all key-groups 
referenced by the current state handle.
-                *
-                * @throws IOException
-                * @throws RocksDBException
                 */
                private void restoreKVStateData() throws IOException, 
RocksDBException {
                        //for all key-groups in the current state handle...
@@ -752,14 +750,14 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                                        while 
(keyGroupHasMoreKeys) {
                                                                byte[] key = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
                                                                byte[] value = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
-                                                               if 
(RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
+                                                               if 
(hasMetaDataFollowsFlag(key)) {
                                                                        //clear 
the signal bit in the key to make it ready for insertion again
-                                                                       
RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
+                                                                       
clearMetaDataFollowsFlag(key);
                                                                        
writeBatchWrapper.put(handle, key, value);
                                                                        //TODO 
this could be aware of keyGroupPrefixBytes and write only one byte if possible
-                                                                       
kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
+                                                                       
kvStateId = END_OF_KEY_GROUP_MARK
                                                                                
& compressedKgInputView.readShort();
-                                                                       if 
(RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
+                                                                       if 
(END_OF_KEY_GROUP_MARK == kvStateId) {
                                                                                
keyGroupHasMoreKeys = false;
                                                                        } else {
                                                                                
handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
@@ -781,9 +779,26 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        private static class RocksDBIncrementalRestoreOperation<T> {
 
                private final RocksDBKeyedStateBackend<T> stateBackend;
+               private final SortedMap<Long, Set<StateHandleID>> 
restoredSstFiles;
+               private UUID restoredBackendUID;
+               private long lastCompletedCheckpointId;
 
                private 
RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
+
                        this.stateBackend = stateBackend;
+                       this.restoredSstFiles = new TreeMap<>();
+               }
+
+               SortedMap<Long, Set<StateHandleID>> getRestoredSstFiles() {
+                       return restoredSstFiles;
+               }
+
+               UUID getRestoredBackendUID() {
+                       return restoredBackendUID;
+               }
+
+               long getLastCompletedCheckpointId() {
+                       return lastCompletedCheckpointId;
                }
 
                /**
@@ -872,6 +887,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                 */
                void restoreWithRescaling(Collection<KeyedStateHandle> 
restoreStateHandles) throws Exception {
 
+                       this.restoredBackendUID = UUID.randomUUID();
+
                        initTargetDB(restoreStateHandles, 
stateBackend.keyGroupRange);
 
                        byte[] startKeyGroupPrefixBytes = new 
byte[stateBackend.keyGroupPrefixBytes];
@@ -949,6 +966,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        @Nonnull
                        private final List<StateMetaInfoSnapshot> 
stateMetaInfoSnapshots;
 
+                       private
+
                        RestoredDBInstance(
                                @Nonnull RocksDB db,
                                @Nonnull List<ColumnFamilyHandle> 
columnFamilyHandles,
@@ -1113,10 +1132,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        List<ColumnFamilyDescriptor> columnFamilyDescriptors,
                        List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) 
throws Exception {
                        // pick up again the old backend id, so the we can 
reference existing state
-                       stateBackend.backendUID = 
restoreStateHandle.getBackendIdentifier();
+                       this.restoredBackendUID = 
restoreStateHandle.getBackendIdentifier();
 
                        LOG.debug("Restoring keyed backend uid in operator {} 
from incremental snapshot to {}.",
-                               stateBackend.operatorIdentifier, 
stateBackend.backendUID);
+                               stateBackend.operatorIdentifier, 
this.restoredBackendUID);
 
                        // create hard links in the instance directory
                        if (!stateBackend.instanceRocksDBPath.mkdirs()) {
@@ -1150,13 +1169,11 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        }
 
                        // use the restore sst files as the base for succeeding 
checkpoints
-                       synchronized (stateBackend.materializedSstFiles) {
-                               stateBackend.materializedSstFiles.put(
+                               restoredSstFiles.put(
                                        restoreStateHandle.getCheckpointId(),
                                        
restoreStateHandle.getSharedStateHandleIDs());
-                       }
 
-                       stateBackend.lastCompletedCheckpointId = 
restoreStateHandle.getCheckpointId();
+                       lastCompletedCheckpointId = 
restoreStateHandle.getCheckpointId();
                }
 
                /**
@@ -1447,881 +1464,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                return count;
        }
 
-       private class FullSnapshotStrategy implements 
SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
-
-               @Override
-               public RunnableFuture<SnapshotResult<KeyedStateHandle>> 
performSnapshot(
-                       long checkpointId,
-                       long timestamp,
-                       CheckpointStreamFactory primaryStreamFactory,
-                       CheckpointOptions checkpointOptions) throws Exception {
-
-                       long startTime = System.currentTimeMillis();
-
-                       if (kvStateInformation.isEmpty()) {
-                               if (LOG.isDebugEnabled()) {
-                                       LOG.debug("Asynchronous RocksDB 
snapshot performed on empty keyed state at {}. Returning null.",
-                                               timestamp);
-                               }
-
-                               return DoneFuture.of(SnapshotResult.empty());
-                       }
-
-                       final 
SupplierWithException<CheckpointStreamWithResultProvider, Exception> supplier =
-
-                               localRecoveryConfig.isLocalRecoveryEnabled() &&
-                                       (CheckpointType.SAVEPOINT != 
checkpointOptions.getCheckpointType()) ?
-
-                                       () -> 
CheckpointStreamWithResultProvider.createDuplicatingStream(
-                                               checkpointId,
-                                               
CheckpointedStateScope.EXCLUSIVE,
-                                               primaryStreamFactory,
-                                               
localRecoveryConfig.getLocalStateDirectoryProvider()) :
-
-                                       () -> 
CheckpointStreamWithResultProvider.createSimpleStream(
-                                               
CheckpointedStateScope.EXCLUSIVE,
-                                               primaryStreamFactory);
-
-                       final CloseableRegistry snapshotCloseableRegistry = new 
CloseableRegistry();
-
-                       final RocksDBFullSnapshotOperation<K> snapshotOperation 
=
-                               new RocksDBFullSnapshotOperation<>(
-                                       RocksDBKeyedStateBackend.this,
-                                       supplier,
-                                       snapshotCloseableRegistry);
-
-                       snapshotOperation.takeDBSnapShot();
-
-                       // implementation of the async IO operation, based on 
FutureTask
-                       
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable 
=
-                               new 
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() {
-
-                                       @Override
-                                       protected void acquireResources() 
throws Exception {
-                                               
cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
-                                               
snapshotOperation.openCheckpointStream();
-                                       }
-
-                                       @Override
-                                       protected void releaseResources() 
throws Exception {
-                                               closeLocalRegistry();
-                                               
releaseSnapshotOperationResources();
-                                       }
-
-                                       private void 
releaseSnapshotOperationResources() {
-                                               // hold the db lock while 
operation on the db to guard us against async db disposal
-                                               
snapshotOperation.releaseSnapshotResources();
-                                       }
-
-                                       @Override
-                                       protected void stopOperation() throws 
Exception {
-                                               closeLocalRegistry();
-                                       }
-
-                                       private void closeLocalRegistry() {
-                                               if 
(cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
-                                                       try {
-                                                               
snapshotCloseableRegistry.close();
-                                                       } catch (Exception ex) {
-                                                               LOG.warn("Error 
closing local registry", ex);
-                                                       }
-                                               }
-                                       }
-
-                                       @Nonnull
-                                       @Override
-                                       public SnapshotResult<KeyedStateHandle> 
performOperation() throws Exception {
-                                               long startTime = 
System.currentTimeMillis();
-
-                                               if (isStopped()) {
-                                                       throw new 
IOException("RocksDB closed.");
-                                               }
-
-                                               
snapshotOperation.writeDBSnapshot();
-
-                                               LOG.debug("Asynchronous RocksDB 
snapshot ({}, asynchronous part) in thread {} took {} ms.",
-                                                       primaryStreamFactory, 
Thread.currentThread(), (System.currentTimeMillis() - startTime));
-
-                                               return 
snapshotOperation.getSnapshotResultStateHandle();
-                                       }
-                               };
-
-                       LOG.debug("Asynchronous RocksDB snapshot ({}, 
synchronous part) in thread {} took {} ms.",
-                               primaryStreamFactory, Thread.currentThread(), 
(System.currentTimeMillis() - startTime));
-                       return AsyncStoppableTaskWithCallback.from(ioCallable);
-               }
-       }
-
-       private class IncrementalSnapshotStrategy implements 
SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
-
-               private final 
SnapshotStrategy<SnapshotResult<KeyedStateHandle>> savepointDelegate;
-
-               public IncrementalSnapshotStrategy() {
-                       this.savepointDelegate = new FullSnapshotStrategy();
-               }
-
-               @Override
-               public RunnableFuture<SnapshotResult<KeyedStateHandle>> 
performSnapshot(
-                       long checkpointId,
-                       long checkpointTimestamp,
-                       CheckpointStreamFactory checkpointStreamFactory,
-                       CheckpointOptions checkpointOptions) throws Exception {
-
-                       // for savepoints, we delegate to the full snapshot 
strategy because savepoints are always self-contained.
-                       if (CheckpointType.SAVEPOINT == 
checkpointOptions.getCheckpointType()) {
-                               return savepointDelegate.performSnapshot(
-                                       checkpointId,
-                                       checkpointTimestamp,
-                                       checkpointStreamFactory,
-                                       checkpointOptions);
-                       }
-
-                       if (db == null) {
-                               throw new IOException("RocksDB closed.");
-                       }
-
-                       if (kvStateInformation.isEmpty()) {
-                               if (LOG.isDebugEnabled()) {
-                                       LOG.debug("Asynchronous RocksDB 
snapshot performed on empty keyed state at {}. Returning null.", 
checkpointTimestamp);
-                               }
-                               return DoneFuture.of(SnapshotResult.empty());
-                       }
-
-                       SnapshotDirectory snapshotDirectory;
-
-                       if (localRecoveryConfig.isLocalRecoveryEnabled()) {
-                               // create a "permanent" snapshot directory for 
local recovery.
-                               LocalRecoveryDirectoryProvider 
directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider();
-                               File directory = 
directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
-
-                               if (directory.exists()) {
-                                       FileUtils.deleteDirectory(directory);
-                               }
-
-                               if (!directory.mkdirs()) {
-                                       throw new IOException("Local state base 
directory for checkpoint " + checkpointId +
-                                               " already exists: " + 
directory);
-                               }
-
-                               // introduces an extra directory because 
RocksDB wants a non-existing directory for native checkpoints.
-                               File rdbSnapshotDir = new File(directory, 
"rocks_db");
-                               Path path = new Path(rdbSnapshotDir.toURI());
-                               // create a "permanent" snapshot directory 
because local recovery is active.
-                               snapshotDirectory = 
SnapshotDirectory.permanent(path);
-                       } else {
-                               // create a "temporary" snapshot directory 
because local recovery is inactive.
-                               Path path = new 
Path(instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
-                               snapshotDirectory = 
SnapshotDirectory.temporary(path);
-                       }
-
-                       final RocksDBIncrementalSnapshotOperation<K> 
snapshotOperation =
-                               new RocksDBIncrementalSnapshotOperation<>(
-                                       RocksDBKeyedStateBackend.this,
-                                       checkpointStreamFactory,
-                                       snapshotDirectory,
-                                       checkpointId);
-
-                       try {
-                               snapshotOperation.takeSnapshot();
-                       } catch (Exception e) {
-                               snapshotOperation.stop();
-                               snapshotOperation.releaseResources(true);
-                               throw e;
-                       }
-
-                       return new FutureTask<SnapshotResult<KeyedStateHandle>>(
-                               snapshotOperation::runSnapshot
-                       ) {
-                               @Override
-                               public boolean cancel(boolean 
mayInterruptIfRunning) {
-                                       snapshotOperation.stop();
-                                       return 
super.cancel(mayInterruptIfRunning);
-                               }
-
-                               @Override
-                               protected void done() {
-                                       
snapshotOperation.releaseResources(isCancelled());
-                               }
-                       };
-               }
-       }
-
-       /**
-        * Encapsulates the process to perform a full snapshot of a 
RocksDBKeyedStateBackend.
-        */
-       @VisibleForTesting
-       static class RocksDBFullSnapshotOperation<K>
-               extends 
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> {
-
-               static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
-               static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
-
-               private final RocksDBKeyedStateBackend<K> stateBackend;
-               private final KeyGroupRangeOffsets keyGroupRangeOffsets;
-               private final 
SupplierWithException<CheckpointStreamWithResultProvider, Exception> 
checkpointStreamSupplier;
-               private final CloseableRegistry snapshotCloseableRegistry;
-               private final ResourceGuard.Lease dbLease;
-
-               private Snapshot snapshot;
-               private ReadOptions readOptions;
-
-               /**
-                * The state meta data.
-                */
-               private List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
-
-               /**
-                * The copied column handle.
-                */
-               private List<Tuple2<ColumnFamilyHandle, 
RegisteredStateMetaInfoBase>> copiedMeta;
-
-               private List<Tuple2<RocksIteratorWrapper, Integer>> 
kvStateIterators;
-
-               private CheckpointStreamWithResultProvider 
checkpointStreamWithResultProvider;
-               private DataOutputView outputView;
-
-               RocksDBFullSnapshotOperation(
-                       RocksDBKeyedStateBackend<K> stateBackend,
-                       
SupplierWithException<CheckpointStreamWithResultProvider, Exception> 
checkpointStreamSupplier,
-                       CloseableRegistry registry) throws IOException {
-
-                       this.stateBackend = stateBackend;
-                       this.checkpointStreamSupplier = 
checkpointStreamSupplier;
-                       this.keyGroupRangeOffsets = new 
KeyGroupRangeOffsets(stateBackend.keyGroupRange);
-                       this.snapshotCloseableRegistry = registry;
-                       this.dbLease = 
this.stateBackend.rocksDBResourceGuard.acquireResource();
-               }
-
-               /**
-                * 1) Create a snapshot object from RocksDB.
-                *
-                */
-               public void takeDBSnapShot() {
-                       Preconditions.checkArgument(snapshot == null, "Only one 
ongoing snapshot allowed!");
-
-                       this.stateMetaInfoSnapshots = new 
ArrayList<>(stateBackend.kvStateInformation.size());
-
-                       this.copiedMeta = new 
ArrayList<>(stateBackend.kvStateInformation.size());
-
-                       for (Tuple2<ColumnFamilyHandle, 
RegisteredStateMetaInfoBase> tuple2 :
-                               stateBackend.kvStateInformation.values()) {
-                               // snapshot meta info
-                               
this.stateMetaInfoSnapshots.add(tuple2.f1.snapshot());
-                               this.copiedMeta.add(tuple2);
-                       }
-                       this.snapshot = stateBackend.db.getSnapshot();
-               }
-
-               /**
-                * 2) Open CheckpointStateOutputStream through the 
checkpointStreamFactory into which we will write.
-                *
-                * @throws Exception
-                */
-               public void openCheckpointStream() throws Exception {
-                       
Preconditions.checkArgument(checkpointStreamWithResultProvider == null,
-                               "Output stream for snapshot is already set.");
-
-                       checkpointStreamWithResultProvider = 
checkpointStreamSupplier.get();
-                       
snapshotCloseableRegistry.registerCloseable(checkpointStreamWithResultProvider);
-                       outputView = new DataOutputViewStreamWrapper(
-                               
checkpointStreamWithResultProvider.getCheckpointOutputStream());
-               }
-
-               /**
-                * 3) Write the actual data from RocksDB from the time we took 
the snapshot object in (1).
-                *
-                * @throws IOException
-                */
-               public void writeDBSnapshot() throws IOException, 
InterruptedException, RocksDBException {
-
-                       if (null == snapshot) {
-                               throw new IOException("No snapshot available. 
Might be released due to cancellation.");
-                       }
-
-                       
Preconditions.checkNotNull(checkpointStreamWithResultProvider, "No output 
stream to write snapshot.");
-                       writeKVStateMetaData();
-                       writeKVStateData();
-               }
-
-               /**
-                * 4) Returns a snapshot result for the completed snapshot.
-                *
-                * @return snapshot result for the completed snapshot.
-                */
-               @Nonnull
-               public SnapshotResult<KeyedStateHandle> 
getSnapshotResultStateHandle() throws IOException {
-
-                       if 
(snapshotCloseableRegistry.unregisterCloseable(checkpointStreamWithResultProvider))
 {
-
-                               SnapshotResult<StreamStateHandle> res =
-                                       
checkpointStreamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
-                               checkpointStreamWithResultProvider = null;
-                               return 
CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(res, 
keyGroupRangeOffsets);
-                       }
-
-                       return SnapshotResult.empty();
-               }
-
-               /**
-                * 5) Release the snapshot object for RocksDB and clean up.
-                */
-               public void releaseSnapshotResources() {
-
-                       checkpointStreamWithResultProvider = null;
-
-                       if (null != kvStateIterators) {
-                               for (Tuple2<RocksIteratorWrapper, Integer> 
kvStateIterator : kvStateIterators) {
-                                       
IOUtils.closeQuietly(kvStateIterator.f0);
-                               }
-                               kvStateIterators = null;
-                       }
-
-                       if (null != snapshot) {
-                               if (null != stateBackend.db) {
-                                       
stateBackend.db.releaseSnapshot(snapshot);
-                               }
-                               IOUtils.closeQuietly(snapshot);
-                               snapshot = null;
-                       }
-
-                       if (null != readOptions) {
-                               IOUtils.closeQuietly(readOptions);
-                               readOptions = null;
-                       }
-
-                       this.dbLease.close();
-               }
-
-               private void writeKVStateMetaData() throws IOException {
-
-                       this.kvStateIterators = new 
ArrayList<>(copiedMeta.size());
-
-                       int kvStateId = 0;
-
-                       //retrieve iterator for this k/v states
-                       readOptions = new ReadOptions();
-                       readOptions.setSnapshot(snapshot);
-
-                       for (Tuple2<ColumnFamilyHandle, 
RegisteredStateMetaInfoBase> tuple2 : copiedMeta) {
-                               RocksIteratorWrapper rocksIteratorWrapper =
-                                       getRocksIterator(stateBackend.db, 
tuple2.f0, tuple2.f1, readOptions);
-                               kvStateIterators.add(new 
Tuple2<>(rocksIteratorWrapper, kvStateId));
-                               ++kvStateId;
-                       }
-
-                       KeyedBackendSerializationProxy<K> serializationProxy =
-                               new KeyedBackendSerializationProxy<>(
-                                       // TODO: this code assumes that writing 
a serializer is threadsafe, we should support to
-                                       // get a serialized form already at 
state registration time in the future
-                                       stateBackend.getKeySerializer(),
-                                       stateMetaInfoSnapshots,
-                                       !Objects.equals(
-                                               
UncompressedStreamCompressionDecorator.INSTANCE,
-                                               
stateBackend.keyGroupCompressionDecorator));
-
-                       serializationProxy.write(outputView);
-               }
-
-               private void writeKVStateData() throws IOException, 
InterruptedException {
-                       byte[] previousKey = null;
-                       byte[] previousValue = null;
-                       DataOutputView kgOutView = null;
-                       OutputStream kgOutStream = null;
-                       CheckpointStreamFactory.CheckpointStateOutputStream 
checkpointOutputStream =
-                               
checkpointStreamWithResultProvider.getCheckpointOutputStream();
-
-                       try {
-                               // Here we transfer ownership of RocksIterators 
to the RocksStatesPerKeyGroupMergeIterator
-                               try (RocksStatesPerKeyGroupMergeIterator 
mergeIterator = new RocksStatesPerKeyGroupMergeIterator(
-                                       kvStateIterators, 
stateBackend.keyGroupPrefixBytes)) {
-
-                                       // handover complete, null out to 
prevent double close
-                                       kvStateIterators = null;
-
-                                       //preamble: setup with first key-group 
as our lookahead
-                                       if (mergeIterator.isValid()) {
-                                               //begin first key-group by 
recording the offset
-                                               
keyGroupRangeOffsets.setKeyGroupOffset(
-                                                       
mergeIterator.keyGroup(),
-                                                       
checkpointOutputStream.getPos());
-                                               //write the k/v-state id as 
metadata
-                                               kgOutStream = 
stateBackend.keyGroupCompressionDecorator.
-                                                       
decorateWithCompression(checkpointOutputStream);
-                                               kgOutView = new 
DataOutputViewStreamWrapper(kgOutStream);
-                                               //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
-                                               
kgOutView.writeShort(mergeIterator.kvStateId());
-                                               previousKey = 
mergeIterator.key();
-                                               previousValue = 
mergeIterator.value();
-                                               mergeIterator.next();
-                                       }
-
-                                       //main loop: write k/v pairs ordered by 
(key-group, kv-state), thereby tracking key-group offsets.
-                                       while (mergeIterator.isValid()) {
-
-                                               assert 
(!hasMetaDataFollowsFlag(previousKey));
-
-                                               //set signal in first key byte 
that meta data will follow in the stream after this k/v pair
-                                               if 
(mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
-
-                                                       //be cooperative and 
check for interruption from time to time in the hot loop
-                                                       checkInterrupted();
-
-                                                       
setMetaDataFollowsFlagInKey(previousKey);
-                                               }
-
-                                               writeKeyValuePair(previousKey, 
previousValue, kgOutView);
-
-                                               //write meta data if we have to
-                                               if 
(mergeIterator.isNewKeyGroup()) {
-                                                       //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
-                                                       
kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
-                                                       // this will just close 
the outer stream
-                                                       kgOutStream.close();
-                                                       //begin new key-group
-                                                       
keyGroupRangeOffsets.setKeyGroupOffset(
-                                                               
mergeIterator.keyGroup(),
-                                                               
checkpointOutputStream.getPos());
-                                                       //write the kev-state
-                                                       //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
-                                                       kgOutStream = 
stateBackend.keyGroupCompressionDecorator.
-                                                               
decorateWithCompression(checkpointOutputStream);
-                                                       kgOutView = new 
DataOutputViewStreamWrapper(kgOutStream);
-                                                       
kgOutView.writeShort(mergeIterator.kvStateId());
-                                               } else if 
(mergeIterator.isNewKeyValueState()) {
-                                                       //write the k/v-state
-                                                       //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
-                                                       
kgOutView.writeShort(mergeIterator.kvStateId());
-                                               }
-
-                                               //request next k/v pair
-                                               previousKey = 
mergeIterator.key();
-                                               previousValue = 
mergeIterator.value();
-                                               mergeIterator.next();
-                                       }
-                               }
-
-                               //epilogue: write last key-group
-                               if (previousKey != null) {
-                                       assert 
(!hasMetaDataFollowsFlag(previousKey));
-                                       
setMetaDataFollowsFlagInKey(previousKey);
-                                       writeKeyValuePair(previousKey, 
previousValue, kgOutView);
-                                       //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
-                                       
kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
-                                       // this will just close the outer stream
-                                       kgOutStream.close();
-                                       kgOutStream = null;
-                               }
-
-                       } finally {
-                               // this will just close the outer stream
-                               IOUtils.closeQuietly(kgOutStream);
-                       }
-               }
-
-               private void writeKeyValuePair(byte[] key, byte[] value, 
DataOutputView out) throws IOException {
-                       BytePrimitiveArraySerializer.INSTANCE.serialize(key, 
out);
-                       BytePrimitiveArraySerializer.INSTANCE.serialize(value, 
out);
-               }
-
-               static void setMetaDataFollowsFlagInKey(byte[] key) {
-                       key[0] |= FIRST_BIT_IN_BYTE_MASK;
-               }
-
-               static void clearMetaDataFollowsFlag(byte[] key) {
-                       key[0] &= 
(~RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
-               }
-
-               static boolean hasMetaDataFollowsFlag(byte[] key) {
-                       return 0 != (key[0] & 
RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
-               }
-
-               private static void checkInterrupted() throws 
InterruptedException {
-                       if (Thread.currentThread().isInterrupted()) {
-                               throw new InterruptedException("RocksDB 
snapshot interrupted.");
-                       }
-               }
-
-               @Override
-               protected void acquireResources() throws Exception {
-                       
stateBackend.cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
-                       openCheckpointStream();
-               }
-
-               @Override
-               protected void releaseResources() {
-                       closeLocalRegistry();
-                       releaseSnapshotOperationResources();
-               }
-
-               private void releaseSnapshotOperationResources() {
-                       // hold the db lock while operation on the db to guard 
us against async db disposal
-                       releaseSnapshotResources();
-               }
-
-               @Override
-               protected void stopOperation() {
-                       closeLocalRegistry();
-               }
-
-               private void closeLocalRegistry() {
-                       if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry))
 {
-                               try {
-                                       snapshotCloseableRegistry.close();
-                               } catch (Exception ex) {
-                                       LOG.warn("Error closing local 
registry", ex);
-                               }
-                       }
-               }
-
-               @Nonnull
-               @Override
-               public SnapshotResult<KeyedStateHandle> performOperation() 
throws Exception {
-                       long startTime = System.currentTimeMillis();
-
-                       if (isStopped()) {
-                               throw new IOException("RocksDB closed.");
-                       }
-
-                       writeDBSnapshot();
-
-                       LOG.debug("Asynchronous RocksDB snapshot ({}, 
asynchronous part) in thread {} took {} ms.",
-                               checkpointStreamSupplier, 
Thread.currentThread(), (System.currentTimeMillis() - startTime));
-
-                       return getSnapshotResultStateHandle();
-               }
-       }
-
-       /**
-        * Encapsulates the process to perform an incremental snapshot of a 
RocksDBKeyedStateBackend.
-        */
-       private static final class RocksDBIncrementalSnapshotOperation<K> {
-
-               /** The backend which we snapshot. */
-               private final RocksDBKeyedStateBackend<K> stateBackend;
-
-               /** Stream factory that creates the outpus streams to DFS. */
-               private final CheckpointStreamFactory checkpointStreamFactory;
-
-               /** Id for the current checkpoint. */
-               private final long checkpointId;
-
-               /** All sst files that were part of the last previously 
completed checkpoint. */
-               private Set<StateHandleID> baseSstFiles;
-
-               /** The state meta data. */
-               private final List<StateMetaInfoSnapshot> 
stateMetaInfoSnapshots = new ArrayList<>();
-
-               /** Local directory for the RocksDB native backup. */
-               private SnapshotDirectory localBackupDirectory;
-
-               // Registry for all opened i/o streams
-               private final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
-
-               // new sst files since the last completed checkpoint
-               private final Map<StateHandleID, StreamStateHandle> sstFiles = 
new HashMap<>();
-
-               // handles to the misc files in the current snapshot
-               private final Map<StateHandleID, StreamStateHandle> miscFiles = 
new HashMap<>();
-
-               // This lease protects from concurrent disposal of the native 
rocksdb instance.
-               private final ResourceGuard.Lease dbLease;
-
-               private SnapshotResult<StreamStateHandle> metaStateHandle = 
null;
-
-               private RocksDBIncrementalSnapshotOperation(
-                       RocksDBKeyedStateBackend<K> stateBackend,
-                       CheckpointStreamFactory checkpointStreamFactory,
-                       SnapshotDirectory localBackupDirectory,
-                       long checkpointId) throws IOException {
-
-                       this.stateBackend = stateBackend;
-                       this.checkpointStreamFactory = checkpointStreamFactory;
-                       this.checkpointId = checkpointId;
-                       this.dbLease = 
this.stateBackend.rocksDBResourceGuard.acquireResource();
-                       this.localBackupDirectory = localBackupDirectory;
-               }
-
-               private StreamStateHandle materializeStateData(Path filePath) 
throws Exception {
-                       FSDataInputStream inputStream = null;
-                       CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = null;
-
-                       try {
-                               final byte[] buffer = new byte[8 * 1024];
-
-                               FileSystem backupFileSystem = 
localBackupDirectory.getFileSystem();
-                               inputStream = backupFileSystem.open(filePath);
-                               
closeableRegistry.registerCloseable(inputStream);
-
-                               outputStream = checkpointStreamFactory
-                                       
.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
-                               
closeableRegistry.registerCloseable(outputStream);
-
-                               while (true) {
-                                       int numBytes = inputStream.read(buffer);
-
-                                       if (numBytes == -1) {
-                                               break;
-                                       }
-
-                                       outputStream.write(buffer, 0, numBytes);
-                               }
-
-                               StreamStateHandle result = null;
-                               if 
(closeableRegistry.unregisterCloseable(outputStream)) {
-                                       result = 
outputStream.closeAndGetHandle();
-                                       outputStream = null;
-                               }
-                               return result;
-
-                       } finally {
-
-                               if 
(closeableRegistry.unregisterCloseable(inputStream)) {
-                                       inputStream.close();
-                               }
-
-                               if 
(closeableRegistry.unregisterCloseable(outputStream)) {
-                                       outputStream.close();
-                               }
-                       }
-               }
-
-               @Nonnull
-               private SnapshotResult<StreamStateHandle> materializeMetaData() 
throws Exception {
-
-                       LocalRecoveryConfig localRecoveryConfig = 
stateBackend.localRecoveryConfig;
-
-                       CheckpointStreamWithResultProvider 
streamWithResultProvider =
-
-                               localRecoveryConfig.isLocalRecoveryEnabled() ?
-
-                                       
CheckpointStreamWithResultProvider.createDuplicatingStream(
-                                               checkpointId,
-                                               
CheckpointedStateScope.EXCLUSIVE,
-                                               checkpointStreamFactory,
-                                               
localRecoveryConfig.getLocalStateDirectoryProvider()) :
-
-                                       
CheckpointStreamWithResultProvider.createSimpleStream(
-                                               
CheckpointedStateScope.EXCLUSIVE,
-                                               checkpointStreamFactory);
-
-                       try {
-                               
closeableRegistry.registerCloseable(streamWithResultProvider);
-
-                               //no need for compression scheme support 
because sst-files are already compressed
-                               KeyedBackendSerializationProxy<K> 
serializationProxy =
-                                       new KeyedBackendSerializationProxy<>(
-                                               stateBackend.keySerializer,
-                                               stateMetaInfoSnapshots,
-                                               false);
-
-                               DataOutputView out =
-                                       new 
DataOutputViewStreamWrapper(streamWithResultProvider.getCheckpointOutputStream());
-
-                               serializationProxy.write(out);
-
-                               if 
(closeableRegistry.unregisterCloseable(streamWithResultProvider)) {
-                                       SnapshotResult<StreamStateHandle> 
result =
-                                               
streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
-                                       streamWithResultProvider = null;
-                                       return result;
-                               } else {
-                                       throw new IOException("Stream already 
closed and cannot return a handle.");
-                               }
-                       } finally {
-                               if (streamWithResultProvider != null) {
-                                       if 
(closeableRegistry.unregisterCloseable(streamWithResultProvider)) {
-                                               
IOUtils.closeQuietly(streamWithResultProvider);
-                                       }
-                               }
-                       }
-               }
-
-               void takeSnapshot() throws Exception {
-
-                       final long lastCompletedCheckpoint;
-
-                       // use the last completed checkpoint as the comparison 
base.
-                       synchronized (stateBackend.materializedSstFiles) {
-                               lastCompletedCheckpoint = 
stateBackend.lastCompletedCheckpointId;
-                               baseSstFiles = 
stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
-                       }
-
-                       LOG.trace("Taking incremental snapshot for checkpoint 
{}. Snapshot is based on last completed checkpoint {} " +
-                               "assuming the following (shared) files as base: 
{}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
-
-                       // save meta data
-                       for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredStateMetaInfoBase>> stateMetaInfoEntry
-                               : stateBackend.kvStateInformation.entrySet()) {
-                               
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
-                       }
-
-                       LOG.trace("Local RocksDB checkpoint goes to backup path 
{}.", localBackupDirectory);
-
-                       if (localBackupDirectory.exists()) {
-                               throw new IllegalStateException("Unexpected 
existence of the backup directory.");
-                       }
-
-                       // create hard links of living files in the snapshot 
path
-                       try (Checkpoint checkpoint = 
Checkpoint.create(stateBackend.db)) {
-                               
checkpoint.createCheckpoint(localBackupDirectory.getDirectory().getPath());
-                       }
-               }
-
-               @Nonnull
-               SnapshotResult<KeyedStateHandle> runSnapshot() throws Exception 
{
-
-                       
stateBackend.cancelStreamRegistry.registerCloseable(closeableRegistry);
-
-                       // write meta data
-                       metaStateHandle = materializeMetaData();
-
-                       // sanity checks - they should never fail
-                       Preconditions.checkNotNull(metaStateHandle,
-                               "Metadata was not properly created.");
-                       
Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(),
-                               "Metadata for job manager was not properly 
created.");
-
-                       // write state data
-                       Preconditions.checkState(localBackupDirectory.exists());
-
-                       FileStatus[] fileStatuses = 
localBackupDirectory.listStatus();
-                       if (fileStatuses != null) {
-                               for (FileStatus fileStatus : fileStatuses) {
-                                       final Path filePath = 
fileStatus.getPath();
-                                       final String fileName = 
filePath.getName();
-                                       final StateHandleID stateHandleID = new 
StateHandleID(fileName);
-
-                                       if (fileName.endsWith(SST_FILE_SUFFIX)) 
{
-                                               final boolean existsAlready =
-                                                       baseSstFiles != null && 
baseSstFiles.contains(stateHandleID);
-
-                                               if (existsAlready) {
-                                                       // we introduce a 
placeholder state handle, that is replaced with the
-                                                       // original from the 
shared state registry (created from a previous checkpoint)
-                                                       sstFiles.put(
-                                                               stateHandleID,
-                                                               new 
PlaceholderStreamStateHandle());
-                                               } else {
-                                                       
sstFiles.put(stateHandleID, materializeStateData(filePath));
-                                               }
-                                       } else {
-                                               StreamStateHandle fileHandle = 
materializeStateData(filePath);
-                                               miscFiles.put(stateHandleID, 
fileHandle);
-                                       }
-                               }
-                       }
-
-                       synchronized (stateBackend.materializedSstFiles) {
-                               
stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
-                       }
-
-                       IncrementalKeyedStateHandle 
jmIncrementalKeyedStateHandle = new IncrementalKeyedStateHandle(
-                               stateBackend.backendUID,
-                               stateBackend.keyGroupRange,
-                               checkpointId,
-                               sstFiles,
-                               miscFiles,
-                               metaStateHandle.getJobManagerOwnedSnapshot());
-
-                       StreamStateHandle taskLocalSnapshotMetaDataStateHandle 
= metaStateHandle.getTaskLocalSnapshot();
-                       DirectoryStateHandle directoryStateHandle = null;
-
-                       try {
-
-                               directoryStateHandle = 
localBackupDirectory.completeSnapshotAndGetHandle();
-                       } catch (IOException ex) {
-
-                               Exception collector = ex;
-
-                               try {
-                                       
taskLocalSnapshotMetaDataStateHandle.discardState();
-                               } catch (Exception discardEx) {
-                                       collector = 
ExceptionUtils.firstOrSuppressed(discardEx, collector);
-                               }
-
-                               LOG.warn("Problem with local state snapshot.", 
collector);
-                       }
-
-                       if (directoryStateHandle != null && 
taskLocalSnapshotMetaDataStateHandle != null) {
-
-                               IncrementalLocalKeyedStateHandle 
localDirKeyedStateHandle =
-                                       new IncrementalLocalKeyedStateHandle(
-                                               stateBackend.backendUID,
-                                               checkpointId,
-                                               directoryStateHandle,
-                                               stateBackend.keyGroupRange,
-                                               
taskLocalSnapshotMetaDataStateHandle,
-                                               sstFiles.keySet());
-                               return 
SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, 
localDirKeyedStateHandle);
-                       } else {
-                               return 
SnapshotResult.of(jmIncrementalKeyedStateHandle);
-                       }
-               }
-
-               void stop() {
-
-                       if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
-                               try {
-                                       closeableRegistry.close();
-                               } catch (IOException e) {
-                                       LOG.warn("Could not properly close io 
streams.", e);
-                               }
-                       }
-               }
-
-               void releaseResources(boolean canceled) {
-
-                       dbLease.close();
-
-                       if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
-                               try {
-                                       closeableRegistry.close();
-                               } catch (IOException e) {
-                                       LOG.warn("Exception on closing 
registry.", e);
-                               }
-                       }
-
-                       try {
-                               if (localBackupDirectory.exists()) {
-                                       LOG.trace("Running cleanup for local 
RocksDB backup directory {}.", localBackupDirectory);
-                                       boolean cleanupOk = 
localBackupDirectory.cleanup();
-
-                                       if (!cleanupOk) {
-                                               LOG.debug("Could not properly 
cleanup local RocksDB backup directory.");
-                                       }
-                               }
-                       } catch (IOException e) {
-                               LOG.warn("Could not properly cleanup local 
RocksDB backup directory.", e);
-                       }
-
-                       if (canceled) {
-                               Collection<StateObject> statesToDiscard =
-                                       new ArrayList<>(1 + miscFiles.size() + 
sstFiles.size());
-
-                               statesToDiscard.add(metaStateHandle);
-                               statesToDiscard.addAll(miscFiles.values());
-                               statesToDiscard.addAll(sstFiles.values());
-
-                               try {
-                                       
StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
-                               } catch (Exception e) {
-                                       LOG.warn("Could not properly discard 
states.", e);
-                               }
-
-                               if (localBackupDirectory.isSnapshotCompleted()) 
{
-                                       try {
-                                               DirectoryStateHandle 
directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
-                                               if (directoryStateHandle != 
null) {
-                                                       
directoryStateHandle.discardState();
-                                               }
-                                       } catch (Exception e) {
-                                               LOG.warn("Could not properly 
discard local state.", e);
-                                       }
-                               }
-                       }
-               }
-       }
-
        public static RocksIteratorWrapper getRocksIterator(RocksDB db) {
                return new RocksIteratorWrapper(db.newIterator());
        }
@@ -2332,23 +1474,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                return new 
RocksIteratorWrapper(db.newIterator(columnFamilyHandle));
        }
 
-       @SuppressWarnings("unchecked")
-       private static RocksIteratorWrapper getRocksIterator(
-               RocksDB db,
-               ColumnFamilyHandle columnFamilyHandle,
-               RegisteredStateMetaInfoBase metaInfo,
-               ReadOptions readOptions) {
-               StateSnapshotTransformer<byte[]> stateSnapshotTransformer = 
null;
-               if (metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo) 
{
-                       stateSnapshotTransformer = 
(StateSnapshotTransformer<byte[]>)
-                               ((RegisteredKeyValueStateBackendMetaInfo<?, ?>) 
metaInfo).getSnapshotTransformer();
-               }
-               RocksIterator rocksIterator = 
db.newIterator(columnFamilyHandle, readOptions);
-               return stateSnapshotTransformer == null ?
-                       new RocksIteratorWrapper(rocksIterator) :
-                       new RocksTransformingIteratorWrapper(rocksIterator, 
stateSnapshotTransformer);
-       }
-
        /**
         * Encapsulates the logic and resources in connection with creating 
priority queue state structures.
         */
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
new file mode 100644
index 0000000..0cc9729
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import 
org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator;
+import 
org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StreamCompressionDecorator;
+import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.ResourceGuard;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static 
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.END_OF_KEY_GROUP_MARK;
+import static 
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.hasMetaDataFollowsFlag;
+import static 
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.setMetaDataFollowsFlagInKey;
+
+/**
+ * Snapshot strategy to create full snapshots of
+ * {@link org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend}. 
Iterates and writes all states from a
+ * RocksDB snapshot of the column families.
+ *
+ * @param <K> type of the backend keys.
+ */
+public class RocksFullSnapshotStrategy<K> extends SnapshotStrategyBase<K> {
+
+       /** Logger for this snapshot strategy. */
+       private static final Logger LOG = LoggerFactory.getLogger(RocksFullSnapshotStrategy.class);
+
+       /**
+        * Decorates the output written for each key-group of the snapshot, applying the configured compression
+        * (or none) to that key-group's data.
+        */
+       @Nonnull
+       private final StreamCompressionDecorator keyGroupCompressionDecorator;
+
+       /**
+        * Creates a strategy that writes full snapshots of the given RocksDB instance.
+        *
+        * @param db the RocksDB instance whose states are snapshotted.
+        * @param rocksDBResourceGuard resource guard handed to the base strategy; snapshots take a lease from it.
+        * @param keySerializer serializer for the backend keys.
+        * @param kvStateInformation registered states, each mapped to its column family handle and meta info.
+        * @param keyGroupRange key-group range of the backend.
+        * @param keyGroupPrefixBytes number of bytes used for the key-group prefix.
+        * @param localRecoveryConfig decides whether a local-recovery copy of the snapshot is written.
+        * @param cancelStreamRegistry registry through which per-snapshot resources can be closed on cancellation.
+        * @param keyGroupCompressionDecorator decorator applied to the data of each written key-group.
+        */
+       public RocksFullSnapshotStrategy(
+               @Nonnull RocksDB db,
+               @Nonnull ResourceGuard rocksDBResourceGuard,
+               @Nonnull TypeSerializer<K> keySerializer,
+               @Nonnull LinkedHashMap<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation,
+               @Nonnull KeyGroupRange keyGroupRange,
+               @Nonnegative int keyGroupPrefixBytes,
+               @Nonnull LocalRecoveryConfig localRecoveryConfig,
+               @Nonnull CloseableRegistry cancelStreamRegistry,
+               @Nonnull StreamCompressionDecorator keyGroupCompressionDecorator) {
+
+               super(db, rocksDBResourceGuard, keySerializer, kvStateInformation,
+                       keyGroupRange, keyGroupPrefixBytes, localRecoveryConfig, cancelStreamRegistry);
+               this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
+       }
+
+       @Override
+       public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot(
+               long checkpointId,
+               long timestamp,
+               CheckpointStreamFactory primaryStreamFactory,
+               CheckpointOptions checkpointOptions) throws Exception {
+
+               long startTime = System.currentTimeMillis();
+
+               if (kvStateInformation.isEmpty()) {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
+                                       timestamp);
+                       }
+
+                       return DoneFuture.of(SnapshotResult.empty());
+               }
+
+               final SupplierWithException<CheckpointStreamWithResultProvider, 
Exception> supplier =
+
+                       localRecoveryConfig.isLocalRecoveryEnabled() &&
+                               (CheckpointType.SAVEPOINT != 
checkpointOptions.getCheckpointType()) ?
+
+                               () -> 
CheckpointStreamWithResultProvider.createDuplicatingStream(
+                                       checkpointId,
+                                       CheckpointedStateScope.EXCLUSIVE,
+                                       primaryStreamFactory,
+                                       
localRecoveryConfig.getLocalStateDirectoryProvider()) :
+
+                               () -> 
CheckpointStreamWithResultProvider.createSimpleStream(
+                                       CheckpointedStateScope.EXCLUSIVE,
+                                       primaryStreamFactory);
+
+               final CloseableRegistry snapshotCloseableRegistry = new 
CloseableRegistry();
+
+               final RocksDBFullSnapshotCallable snapshotOperation =
+                       new RocksDBFullSnapshotCallable(supplier, 
snapshotCloseableRegistry);
+
+               return new SnapshotTask(snapshotOperation);
+       }
+
+       /**
+        * No-op. This strategy keeps no state that depends on which checkpoints have completed.
+        *
+        * @param checkpointId id of the completed checkpoint (ignored).
+        */
+       @Override
+       public void notifyCheckpointComplete(long checkpointId) {
+               // intentionally empty
+       }
+
+       /**
+        * {@link FutureTask} that runs a {@link RocksDBFullSnapshotCallable}. Cancelling the task also closes
+        * the wrapped callable.
+        */
+       private class SnapshotTask extends FutureTask<SnapshotResult<KeyedStateHandle>> {
+
+               /** The wrapped callable; closed when the task is cancelled. */
+               @Nonnull
+               private final AutoCloseable snapshotCallable;
+
+               SnapshotTask(@Nonnull RocksDBFullSnapshotCallable snapshotCallable) {
+                       super(snapshotCallable);
+                       this.snapshotCallable = snapshotCallable;
+               }
+
+               /**
+                * Quietly closes the wrapped callable, then delegates to {@link FutureTask#cancel(boolean)}.
+                */
+               @Override
+               public boolean cancel(boolean mayInterruptIfRunning) {
+                       IOUtils.closeQuietly(snapshotCallable);
+                       return super.cancel(mayInterruptIfRunning);
+               }
+       }
+
+       /**
+        * Encapsulates the process to perform a full snapshot of a 
RocksDBKeyedStateBackend.
+        */
+       @VisibleForTesting
+       private class RocksDBFullSnapshotCallable implements 
Callable<SnapshotResult<KeyedStateHandle>>, AutoCloseable {
+
+               /** Key-group offsets for the backend's key-group range, attached to the resulting state handle. */
+               @Nonnull
+               private final KeyGroupRangeOffsets keyGroupRangeOffsets;
+
+               /** Creates the checkpoint output stream (simple or duplicating) once the snapshot is run. */
+               @Nonnull
+               private final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier;
+
+               /** Registry for the streams of this snapshot; itself registered with the backend's cancel registry. */
+               @Nonnull
+               private final CloseableRegistry snapshotCloseableRegistry;
+
+               /** Lease on the RocksDB resource guard, closed when the synchronous-step resources are cleaned up. */
+               @Nonnull
+               private final ResourceGuard.Lease dbLease;
+
+               /** RocksDB snapshot providing the view of the data that is written out. */
+               @Nonnull
+               private final Snapshot snapshot;
+
+               /** Read options bound to {@link #snapshot}. */
+               @Nonnull
+               private final ReadOptions readOptions;
+
+               /** Meta info snapshots of all registered states, taken when this callable is created. */
+               @Nonnull
+               private List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
+
+               /** Copy of the (column family handle, meta info) pairs of all registered states. */
+               @Nonnull
+               private List<Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> metaDataCopy;
+
+               /** Claimed via compare-and-set in {@link #call()}; if already set, the task counts as cancelled. */
+               private final AtomicBoolean ownedForCleanup;
+
+               /**
+                * Performs the synchronous part of the snapshot. It copies the meta data of all registered states,
+                * acquires a lease on the RocksDB resource guard and takes a RocksDB snapshot bound to fresh read options.
+                *
+                * <p>If creating the read options or the snapshot fails, everything acquired so far is released before
+                * the failure is rethrown. The half-built object never escapes, so nobody else could release it.
+                *
+                * @param checkpointStreamSupplier creates the checkpoint output stream when the snapshot is run.
+                * @param registry registry for the streams opened by this snapshot.
+                * @throws IOException if acquiring the lease on the database fails.
+                */
+               RocksDBFullSnapshotCallable(
+                       @Nonnull SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier,
+                       @Nonnull CloseableRegistry registry) throws IOException {
+
+                       this.ownedForCleanup = new AtomicBoolean(false);
+                       this.checkpointStreamSupplier = checkpointStreamSupplier;
+                       this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange);
+                       this.snapshotCloseableRegistry = registry;
+
+                       this.stateMetaInfoSnapshots = new ArrayList<>(kvStateInformation.size());
+                       this.metaDataCopy = new ArrayList<>(kvStateInformation.size());
+                       for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tuple2 : kvStateInformation.values()) {
+                               // snapshot meta info
+                               this.stateMetaInfoSnapshots.add(tuple2.f1.snapshot());
+                               this.metaDataCopy.add(tuple2);
+                       }
+
+                       this.dbLease = rocksDBResourceGuard.acquireResource();
+
+                       ReadOptions options = null;
+                       Snapshot dbSnapshot = null;
+                       try {
+                               options = new ReadOptions();
+                               dbSnapshot = db.getSnapshot();
+                               options.setSnapshot(dbSnapshot);
+                       } catch (Throwable t) {
+                               // Undo in the same order as cleanupSynchronousStepResources(): release the snapshot
+                               // while the lease is still held, then give up the lease.
+                               IOUtils.closeQuietly(options);
+                               if (dbSnapshot != null) {
+                                       db.releaseSnapshot(dbSnapshot);
+                                       IOUtils.closeQuietly(dbSnapshot);
+                               }
+                               IOUtils.closeQuietly(dbLease);
+                               throw t;
+                       }
+
+                       this.readOptions = options;
+                       this.snapshot = dbSnapshot;
+               }
+
+               /**
+                * Runs the asynchronous part of the full snapshot. It writes the state meta data and the key/value data
+                * of the RocksDB snapshot into a checkpoint stream, then turns that stream into the result handles.
+                *
+                * @return the snapshot result built from the checkpoint stream.
+                * @throws CancellationException if the cleanup flag was already claimed, i.e. the task was cancelled.
+                * @throws Exception if creating the stream or writing the snapshot fails.
+                */
+               @Override
+               public SnapshotResult<KeyedStateHandle> call() throws Exception 
{
+
+                       // Claim the cleanup flag; if it is already set, the task was cancelled before it ran.
+                       if (!ownedForCleanup.compareAndSet(false, true)) {
+                               throw new CancellationException("Snapshot task 
was already cancelled, stopping execution.");
+                       }
+
+                       final long startTime = System.currentTimeMillis();
+                       // Iterators opened while writing; every one of them is closed in the finally block below.
+                       final List<Tuple2<RocksIteratorWrapper, Integer>> 
kvStateIterators = new ArrayList<>(metaDataCopy.size());
+
+                       try {
+
+                               // Make this snapshot's registry closable through the backend's cancel registry.
+                               
cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
+
+                               final CheckpointStreamWithResultProvider 
checkpointStreamWithResultProvider = checkpointStreamSupplier.get();
+                               // Registered so that the stream is closed if the registry is closed before the handles exist.
+                               
snapshotCloseableRegistry.registerCloseable(checkpointStreamWithResultProvider);
+
+                               final DataOutputView outputView =
+                                       new 
DataOutputViewStreamWrapper(checkpointStreamWithResultProvider.getCheckpointOutputStream());
+
+                               writeKVStateMetaData(kvStateIterators, 
outputView);
+                               writeKVStateData(kvStateIterators, 
checkpointStreamWithResultProvider);
+
+                               // Unregisters the stream again and finalizes it into the result handles.
+                               final SnapshotResult<KeyedStateHandle> 
snapshotResult =
+                                       
createStateHandlesFromStreamProvider(checkpointStreamWithResultProvider);
+
+                               // NOTE(review): checkpointStreamSupplier is a lambda, so its toString() carries no useful
+                               // information here; the removed implementation also logged this at DEBUG, not INFO.
+                               LOG.info("Asynchronous RocksDB snapshot ({}, 
asynchronous part) in thread {} took {} ms.",
+                                       checkpointStreamSupplier, 
Thread.currentThread(), (System.currentTimeMillis() - startTime));
+
+                               return snapshotResult;
+
+                       } finally {
+
+                               // Always release the iterators and the synchronous-step resources (read options,
+                               // RocksDB snapshot, db lease, registry), whether or not the snapshot succeeded.
+                               for (Tuple2<RocksIteratorWrapper, Integer> 
kvStateIterator : kvStateIterators) {
+                                       
IOUtils.closeQuietly(kvStateIterator.f0);
+                               }
+
+                               cleanupSynchronousStepResources();
+                       }
+               }
+
+               private void cleanupSynchronousStepResources() {
+                       IOUtils.closeQuietly(readOptions);
+
+                       db.releaseSnapshot(snapshot);
+                       IOUtils.closeQuietly(snapshot);
+
+                       IOUtils.closeQuietly(dbLease);
+
+                       if 
(cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
+                               try {
+                                       snapshotCloseableRegistry.close();
+                               } catch (Exception ex) {
+                                       LOG.warn("Error closing local 
registry", ex);
+                               }
+                       }
+               }
+
+               private SnapshotResult<KeyedStateHandle> 
createStateHandlesFromStreamProvider(
+                       CheckpointStreamWithResultProvider 
checkpointStreamWithResultProvider) throws IOException {
+                       if 
(snapshotCloseableRegistry.unregisterCloseable(checkpointStreamWithResultProvider))
 {
+                               return 
CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(
+                                       
checkpointStreamWithResultProvider.closeAndFinalizeCheckpointStreamResult(),
+                                       keyGroupRangeOffsets);
+                       } else {
+                               throw new IOException("Snapshot was already 
closed before completion.");
+                       }
+               }
+
+               private void writeKVStateMetaData(
+                       final List<Tuple2<RocksIteratorWrapper, Integer>> 
kvStateIterators,
+                       final DataOutputView outputView) throws IOException {
+
+                       int kvStateId = 0;
+
+                       for (Tuple2<ColumnFamilyHandle, 
RegisteredStateMetaInfoBase> tuple2 : metaDataCopy) {
+
+                               RocksIteratorWrapper rocksIteratorWrapper =
+                                       getRocksIterator(db, tuple2.f0, 
tuple2.f1, readOptions);
+
+                               
kvStateIterators.add(Tuple2.of(rocksIteratorWrapper, kvStateId));
+                               ++kvStateId;
+                       }
+
+                       KeyedBackendSerializationProxy<K> serializationProxy =
+                               new KeyedBackendSerializationProxy<>(
+                                       // TODO: this code assumes that writing 
a serializer is threadsafe, we should support to
+                                       // get a serialized form already at 
state registration time in the future
+                                       keySerializer,
+                                       stateMetaInfoSnapshots,
+                                       !Objects.equals(
+                                               
UncompressedStreamCompressionDecorator.INSTANCE,
+                                               keyGroupCompressionDecorator));
+
+                       serializationProxy.write(outputView);
+               }
+
+               /**
+                * Writes all key/value pairs from the given iterators to the checkpoint stream, ordered by
+                * (key-group, k/v-state). The stream position at which each key-group starts is recorded in
+                * keyGroupRangeOffsets.
+                *
+                * <p>Each key-group is written through its own stream obtained from keyGroupCompressionDecorator.
+                * A k/v-state id follows each key-group start and each state switch, and END_OF_KEY_GROUP_MARK ends
+                * each key-group. The key written just before such meta data gets the meta-data-follows flag set.
+                *
+                * <p>Ownership of the iterators is transferred to a RocksStatesPerKeyGroupMergeIterator, which is
+                * closed when the inner try-with-resources block is left.
+                *
+                * @throws InterruptedException if the thread was interrupted; checked whenever a new key-group or
+                *         k/v state begins.
+                */
+               private void writeKVStateData(
+                       final List<Tuple2<RocksIteratorWrapper, Integer>> 
kvStateIterators,
+                       final CheckpointStreamWithResultProvider 
checkpointStreamWithResultProvider) throws IOException, InterruptedException {
+
+                       // One-pair lookahead: the pair read last is only written once the iterator has advanced and
+                       // it is known whether meta data must follow it.
+                       byte[] previousKey = null;
+                       byte[] previousValue = null;
+                       DataOutputView kgOutView = null;
+                       OutputStream kgOutStream = null;
+                       CheckpointStreamFactory.CheckpointStateOutputStream 
checkpointOutputStream =
+                               
checkpointStreamWithResultProvider.getCheckpointOutputStream();
+
+                       try {
+                               // Here we transfer ownership of RocksIterators 
to the RocksStatesPerKeyGroupMergeIterator
+                               try (RocksStatesPerKeyGroupMergeIterator 
mergeIterator = new RocksStatesPerKeyGroupMergeIterator(
+                                       kvStateIterators, keyGroupPrefixBytes)) 
{
+
+                                       //preamble: setup with first key-group 
as our lookahead
+                                       if (mergeIterator.isValid()) {
+                                               //begin first key-group by 
recording the offset
+                                               
keyGroupRangeOffsets.setKeyGroupOffset(
+                                                       
mergeIterator.keyGroup(),
+                                                       
checkpointOutputStream.getPos());
+                                               //write the k/v-state id as 
metadata
+                                               kgOutStream = 
keyGroupCompressionDecorator.decorateWithCompression(checkpointOutputStream);
+                                               kgOutView = new 
DataOutputViewStreamWrapper(kgOutStream);
+                                               //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
+                                               
kgOutView.writeShort(mergeIterator.kvStateId());
+                                               previousKey = 
mergeIterator.key();
+                                               previousValue = 
mergeIterator.value();
+                                               mergeIterator.next();
+                                       }
+
+                                       //main loop: write k/v pairs ordered by 
(key-group, kv-state), thereby tracking key-group offsets.
+                                       // At this point the iterator already points at the pair AFTER previousKey.
+                                       while (mergeIterator.isValid()) {
+
+                                               assert 
(!hasMetaDataFollowsFlag(previousKey));
+
+                                               //set signal in first key byte 
that meta data will follow in the stream after this k/v pair
+                                               if 
(mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
+
+                                                       //be cooperative and 
check for interruption from time to time in the hot loop
+                                                       checkInterrupted();
+
+                                                       
setMetaDataFollowsFlagInKey(previousKey);
+                                               }
+
+                                               writeKeyValuePair(previousKey, 
previousValue, kgOutView);
+
+                                               //write meta data if we have to
+                                               if 
(mergeIterator.isNewKeyGroup()) {
+                                                       // finish the current key-group and open a fresh stream for the next one
+                                                       //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
+                                                       
kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
+                                                       // this will just close 
the outer stream
+                                                       kgOutStream.close();
+                                                       //begin new key-group
+                                                       
keyGroupRangeOffsets.setKeyGroupOffset(
+                                                               
mergeIterator.keyGroup(),
+                                                               
checkpointOutputStream.getPos());
+                                                       //write the kev-state
+                                                       //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
+                                                       kgOutStream = 
keyGroupCompressionDecorator.decorateWithCompression(checkpointOutputStream);
+                                                       kgOutView = new 
DataOutputViewStreamWrapper(kgOutStream);
+                                                       
kgOutView.writeShort(mergeIterator.kvStateId());
+                                               } else if 
(mergeIterator.isNewKeyValueState()) {
+                                                       //write the k/v-state
+                                                       //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
+                                                       
kgOutView.writeShort(mergeIterator.kvStateId());
+                                               }
+
+                                               //request next k/v pair
+                                               previousKey = 
mergeIterator.key();
+                                               previousValue = 
mergeIterator.value();
+                                               mergeIterator.next();
+                                       }
+                               }
+
+                               //epilogue: write last key-group
+                               // previousKey is null only if the merge iterator was empty, i.e. no key-group was opened.
+                               if (previousKey != null) {
+                                       assert 
(!hasMetaDataFollowsFlag(previousKey));
+                                       
setMetaDataFollowsFlagInKey(previousKey);
+                                       writeKeyValuePair(previousKey, 
previousValue, kgOutView);
+                                       //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
+                                       
kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
+                                       // this will just close the outer stream
+                                       kgOutStream.close();
+                                       kgOutStream = null;
+                               }
+
+                       } finally {
+                               // this will just close the outer stream
+                               // (kgOutStream is non-null here only if an exception left a key-group stream open)
+                               IOUtils.closeQuietly(kgOutStream);
+                       }
+               }
+
+               private void writeKeyValuePair(byte[] key, byte[] value, 
DataOutputView out) throws IOException {
+                       BytePrimitiveArraySerializer.INSTANCE.serialize(key, 
out);
+                       BytePrimitiveArraySerializer.INSTANCE.serialize(value, 
out);
+               }
+
+               private void checkInterrupted() throws InterruptedException {
+                       if (Thread.currentThread().isInterrupted()) {
+                               throw new InterruptedException("RocksDB 
snapshot interrupted.");
+                       }
+               }
+
+               /**
+                * Releases the synchronous-step resources if call() has not claimed them yet. Then closes the
+                * snapshot's local stream registry if it is still registered with the cancellation registry, which
+                * also closes the streams a running call() registered there.
+                */
+               @Override
+               public void close() throws Exception {
+
+                       final boolean claimedCleanup = ownedForCleanup.compareAndSet(false, true);
+                       if (claimedCleanup) {
+                               cleanupSynchronousStepResources();
+                       }
+
+                       final boolean stillRegistered = cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry);
+                       if (stillRegistered) {
+                               snapshotCloseableRegistry.close();
+                       }
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       private static RocksIteratorWrapper getRocksIterator(
+               RocksDB db,
+               ColumnFamilyHandle columnFamilyHandle,
+               RegisteredStateMetaInfoBase metaInfo,
+               ReadOptions readOptions) {
+               StateSnapshotTransformer<byte[]> stateSnapshotTransformer = 
null;
+               if (metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo) 
{
+                       stateSnapshotTransformer = 
(StateSnapshotTransformer<byte[]>)
+                               ((RegisteredKeyValueStateBackendMetaInfo<?, ?>) 
metaInfo).getSnapshotTransformer();
+               }
+               RocksIterator rocksIterator = 
db.newIterator(columnFamilyHandle, readOptions);
+               return stateSnapshotTransformer == null ?
+                       new RocksIteratorWrapper(rocksIterator) :
+                       new RocksTransformingIteratorWrapper(rocksIterator, 
stateSnapshotTransformer);
+       }
+}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
new file mode 100644
index 0000000..3487fe6
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.DirectoryStateHandle;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
+import org.apache.flink.runtime.state.SnapshotDirectory;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.SnapshotStrategy;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.Checkpoint;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.UUID;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+
+import static 
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.SST_FILE_SUFFIX;
+
+/**
+ * Snapshot strategy for {@link 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend} that is based
+ * on RocksDB's native checkpoints and creates incremental snapshots.
+ *
+ * @param <K> type of the backend keys.
+ */
+public class RocksIncrementalSnapshotStrategy<K> extends 
SnapshotStrategyBase<K> {
+
+       private static final Logger LOG = LoggerFactory.getLogger(RocksIncrementalSnapshotStrategy.class);
+
+       /** Base path of the RocksDB instance. */
+       @Nonnull
+       private final File instanceBasePath;
+
+       /** Unique identifier of the backend, as supplied at construction. */
+       @Nonnull
+       private final UUID backendUID;
+
+       /**
+        * Stores the materialized sstable files from all snapshots that build the incremental history, keyed by
+        * checkpoint id. notifyCheckpointComplete and takeSnapshot synchronize on this map when accessing it.
+        */
+       @Nonnull
+       private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
+
+       /**
+        * The identifier of the last completed checkpoint. Read and updated while holding the monitor of
+        * {@link #materializedSstFiles}.
+        */
+       private long lastCompletedCheckpointId;
+
+       /** We delegate snapshots that are for savepoints to this. */
+       @Nonnull
+       private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> savepointDelegate;
+
+       /**
+        * Creates the incremental snapshot strategy. The common backend resources are passed to the base class; the
+        * remaining arguments configure the incremental history and the savepoint fallback.
+        *
+        * @param instanceBasePath          base path of the RocksDB instance; temporary snapshot directories are placed below it.
+        * @param backendUID                unique identifier of the backend.
+        * @param materializedSstFiles      sst files materialized per checkpoint id; stored by reference, not copied.
+        * @param lastCompletedCheckpointId id of the last completed checkpoint, the base of the next incremental snapshot.
+        * @param savepointDelegate         strategy that savepoints are delegated to.
+        */
+       public RocksIncrementalSnapshotStrategy(
+               @Nonnull RocksDB db,
+               @Nonnull ResourceGuard rocksDBResourceGuard,
+               @Nonnull TypeSerializer<K> keySerializer,
+               @Nonnull LinkedHashMap<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation,
+               @Nonnull KeyGroupRange keyGroupRange,
+               @Nonnegative int keyGroupPrefixBytes,
+               @Nonnull LocalRecoveryConfig localRecoveryConfig,
+               @Nonnull CloseableRegistry cancelStreamRegistry,
+               @Nonnull File instanceBasePath,
+               @Nonnull UUID backendUID,
+               @Nonnull SortedMap<Long, Set<StateHandleID>> materializedSstFiles,
+               long lastCompletedCheckpointId,
+               @Nonnull SnapshotStrategy<SnapshotResult<KeyedStateHandle>> savepointDelegate) {
+
+               super(db, rocksDBResourceGuard, keySerializer, kvStateInformation,
+                       keyGroupRange, keyGroupPrefixBytes, localRecoveryConfig, cancelStreamRegistry);
+
+               // savepoint fallback
+               this.savepointDelegate = savepointDelegate;
+
+               // location and identity of this backend instance
+               this.instanceBasePath = instanceBasePath;
+               this.backendUID = backendUID;
+
+               // incremental history
+               this.materializedSstFiles = materializedSstFiles;
+               this.lastCompletedCheckpointId = lastCompletedCheckpointId;
+       }
+
+       /**
+        * Takes an incremental snapshot based on a native RocksDB checkpoint.
+        *
+        * <p>Savepoints are delegated to {@link #savepointDelegate}. If no state is registered, an already completed
+        * future with an empty result is returned. Otherwise the synchronous part (taking the local RocksDB
+        * checkpoint) runs in this method, and the returned future runs the asynchronous part. Cancelling that
+        * future stops the operation; its resources are released when the future completes, or immediately if the
+        * synchronous part fails.
+        *
+        * @throws IOException if the local-recovery snapshot directory cannot be created.
+        */
+       @Override
+       public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot(
+               long checkpointId,
+               long checkpointTimestamp,
+               CheckpointStreamFactory checkpointStreamFactory,
+               CheckpointOptions checkpointOptions) throws Exception {
+
+               // for savepoints, we delegate to the full snapshot strategy because savepoints are always self-contained.
+               if (CheckpointType.SAVEPOINT == checkpointOptions.getCheckpointType()) {
+                       return savepointDelegate.performSnapshot(
+                               checkpointId,
+                               checkpointTimestamp,
+                               checkpointStreamFactory,
+                               checkpointOptions);
+               }
+
+               if (kvStateInformation.isEmpty()) {
+                       if (LOG.isDebugEnabled()) {
+                               // the returned future carries SnapshotResult.empty(), not null
+                               LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning empty result.",
+                                       checkpointTimestamp);
+                       }
+                       return DoneFuture.of(SnapshotResult.empty());
+               }
+
+               final SnapshotDirectory snapshotDirectory;
+
+               if (localRecoveryConfig.isLocalRecoveryEnabled()) {
+                       // create a "permanent" snapshot directory for local recovery.
+                       LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider();
+                       File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
+
+                       // start from a clean directory
+                       if (directory.exists()) {
+                               FileUtils.deleteDirectory(directory);
+                       }
+
+                       // mkdirs() also fails for reasons other than existence (e.g. missing permissions), so the
+                       // message only states that creation failed.
+                       if (!directory.mkdirs()) {
+                               throw new IOException("Could not create local state base directory for checkpoint " +
+                                       checkpointId + ": " + directory);
+                       }
+
+                       // introduces an extra directory because RocksDB wants a non-existing directory for native checkpoints.
+                       File rdbSnapshotDir = new File(directory, "rocks_db");
+                       Path path = new Path(rdbSnapshotDir.toURI());
+                       // create a "permanent" snapshot directory because local recovery is active.
+                       snapshotDirectory = SnapshotDirectory.permanent(path);
+               } else {
+                       // create a "temporary" snapshot directory because local recovery is inactive.
+                       Path path = new Path(instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
+                       snapshotDirectory = SnapshotDirectory.temporary(path);
+               }
+
+               final RocksDBIncrementalSnapshotOperation snapshotOperation =
+                       new RocksDBIncrementalSnapshotOperation(
+                               checkpointStreamFactory,
+                               snapshotDirectory,
+                               checkpointId);
+
+               try {
+                       snapshotOperation.takeSnapshot();
+               } catch (Exception e) {
+                       // the synchronous part failed: stop the operation and release its resources before rethrowing
+                       snapshotOperation.stop();
+                       snapshotOperation.releaseResources(true);
+                       throw e;
+               }
+
+               return new FutureTask<SnapshotResult<KeyedStateHandle>>(
+                       snapshotOperation::runSnapshot
+               ) {
+                       @Override
+                       public boolean cancel(boolean mayInterruptIfRunning) {
+                               snapshotOperation.stop();
+                               return super.cancel(mayInterruptIfRunning);
+                       }
+
+                       @Override
+                       protected void done() {
+                               snapshotOperation.releaseResources(isCancelled());
+                       }
+               };
+       }
+
+       /**
+        * Drops the sst-file bookkeeping of all checkpoints older than the completed one and advances the last
+        * completed checkpoint id. Notifications older than the current last completed checkpoint are ignored.
+        */
+       @Override
+       public void notifyCheckpointComplete(long completedCheckpointId) {
+               synchronized (materializedSstFiles) {
+                       final boolean outdatedNotification = completedCheckpointId < lastCompletedCheckpointId;
+                       if (!outdatedNotification) {
+                               materializedSstFiles.keySet().removeIf(
+                                       materializedCheckpointId -> materializedCheckpointId < completedCheckpointId);
+                               lastCompletedCheckpointId = completedCheckpointId;
+                       }
+               }
+       }
+
+       /**
+        * Encapsulates the process to perform an incremental snapshot of a 
RocksDBKeyedStateBackend.
+        */
+       private final class RocksDBIncrementalSnapshotOperation {
+
+               /** Stream factory that creates the output streams to DFS. */
+               private final CheckpointStreamFactory checkpointStreamFactory;
+
+               /** Id for the current checkpoint. */
+               private final long checkpointId;
+
+               /**
+                * All sst files that were part of the last previously completed checkpoint. May be null when no sst files
+                * are recorded for that checkpoint.
+                */
+               private Set<StateHandleID> baseSstFiles;
+
+               /** The state meta data, collected in takeSnapshot(). */
+               private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
+
+               /** Local directory for the RocksDB native backup. */
+               private SnapshotDirectory localBackupDirectory;
+
+               /** Registry for all opened i/o streams. */
+               private final CloseableRegistry closeableRegistry;
+
+               /** New sst files since the last completed checkpoint. */
+               private final Map<StateHandleID, StreamStateHandle> sstFiles;
+
+               /** Handles to the misc files in the current snapshot. */
+               private final Map<StateHandleID, StreamStateHandle> miscFiles;
+
+               /** This lease protects from concurrent disposal of the native rocksdb instance. */
+               private final ResourceGuard.Lease dbLease;
+
+               /** Handle of the materialized meta data; null when the operation is constructed. */
+               private SnapshotResult<StreamStateHandle> metaStateHandle;
+
+               /**
+                * Creates the operation for one checkpoint and acquires a lease on the RocksDB instance.
+                *
+                * @throws IOException if the lease cannot be acquired from the resource guard.
+                */
+               private RocksDBIncrementalSnapshotOperation(
+                       CheckpointStreamFactory checkpointStreamFactory,
+                       SnapshotDirectory localBackupDirectory,
+                       long checkpointId) throws IOException {
+
+                       this.checkpointStreamFactory = checkpointStreamFactory;
+                       this.localBackupDirectory = localBackupDirectory;
+                       this.checkpointId = checkpointId;
+
+                       // empty per-operation bookkeeping
+                       this.stateMetaInfoSnapshots = new ArrayList<>();
+                       this.sstFiles = new HashMap<>();
+                       this.miscFiles = new HashMap<>();
+                       this.closeableRegistry = new CloseableRegistry();
+                       this.metaStateHandle = null;
+
+                       // protects the native RocksDB instance from concurrent disposal
+                       this.dbLease = rocksDBResourceGuard.acquireResource();
+               }
+
+               /**
+                * Copies the file at the given path, read from the local backup directory's file system, into a new
+                * shared-scope checkpoint stream and returns the resulting handle.
+                *
+                * <p>Both streams are registered with {@link #closeableRegistry} while in use, so closing that registry
+                * also closes them.
+                *
+                * @param filePath path of the file to upload.
+                * @return handle to the uploaded data; never null.
+                * @throws IOException if the output stream was closed through the registry before its handle could be
+                *         obtained, or if reading/writing fails.
+                */
+               private StreamStateHandle materializeStateData(Path filePath) throws Exception {
+                       FSDataInputStream inputStream = null;
+                       CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
+
+                       try {
+                               final byte[] buffer = new byte[8 * 1024];
+
+                               FileSystem backupFileSystem = localBackupDirectory.getFileSystem();
+                               inputStream = backupFileSystem.open(filePath);
+                               closeableRegistry.registerCloseable(inputStream);
+
+                               outputStream = checkpointStreamFactory
+                                       .createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
+                               closeableRegistry.registerCloseable(outputStream);
+
+                               int numBytes;
+                               while ((numBytes = inputStream.read(buffer)) != -1) {
+                                       outputStream.write(buffer, 0, numBytes);
+                               }
+
+                               // If the registry no longer holds the stream, it was already closed through the registry and
+                               // no valid handle can be produced. Fail (as materializeMetaData does) instead of returning null.
+                               if (!closeableRegistry.unregisterCloseable(outputStream)) {
+                                       throw new IOException("Stream already closed and cannot return a handle.");
+                               }
+
+                               final StreamStateHandle result = outputStream.closeAndGetHandle();
+                               outputStream = null;
+                               return result;
+
+                       } finally {
+                               try {
+                                       if (closeableRegistry.unregisterCloseable(inputStream)) {
+                                               inputStream.close();
+                                       }
+                               } finally {
+                                       // Still registered only if the copy failed before the handle was requested. Close the
+                                       // abandoned stream quietly so that the primary exception is not masked.
+                                       if (closeableRegistry.unregisterCloseable(outputStream)) {
+                                               IOUtils.closeQuietly(outputStream);
+                                       }
+                               }
+                       }
+               }
+
+               @Nonnull
+               private SnapshotResult<StreamStateHandle> materializeMetaData() 
throws Exception {
+
+                       CheckpointStreamWithResultProvider 
streamWithResultProvider =
+
+                               localRecoveryConfig.isLocalRecoveryEnabled() ?
+
+                                       
CheckpointStreamWithResultProvider.createDuplicatingStream(
+                                               checkpointId,
+                                               
CheckpointedStateScope.EXCLUSIVE,
+                                               checkpointStreamFactory,
+                                               
localRecoveryConfig.getLocalStateDirectoryProvider()) :
+
+                                       
CheckpointStreamWithResultProvider.createSimpleStream(
+                                               
CheckpointedStateScope.EXCLUSIVE,
+                                               checkpointStreamFactory);
+
+                       try {
+                               
closeableRegistry.registerCloseable(streamWithResultProvider);
+
+                               //no need for compression scheme support 
because sst-files are already compressed
+                               KeyedBackendSerializationProxy<K> 
serializationProxy =
+                                       new KeyedBackendSerializationProxy<>(
+                                               keySerializer,
+                                               stateMetaInfoSnapshots,
+                                               false);
+
+                               DataOutputView out =
+                                       new 
DataOutputViewStreamWrapper(streamWithResultProvider.getCheckpointOutputStream());
+
+                               serializationProxy.write(out);
+
+                               if 
(closeableRegistry.unregisterCloseable(streamWithResultProvider)) {
+                                       SnapshotResult<StreamStateHandle> 
result =
+                                               
streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
+                                       streamWithResultProvider = null;
+                                       return result;
+                               } else {
+                                       throw new IOException("Stream already 
closed and cannot return a handle.");
+                               }
+                       } finally {
+                               if (streamWithResultProvider != null) {
+                                       if 
(closeableRegistry.unregisterCloseable(streamWithResultProvider)) {
+                                               
IOUtils.closeQuietly(streamWithResultProvider);
+                                       }
+                               }
+                       }
+               }
+
+               void takeSnapshot() throws Exception {
+
+                       final long lastCompletedCheckpoint;
+
+                       // use the last completed checkpoint as the comparison 
base.
+                       synchronized (materializedSstFiles) {
+                               lastCompletedCheckpoint = 
lastCompletedCheckpointId;
+                               baseSstFiles = 
materializedSstFiles.get(lastCompletedCheckpoint);
+                       }
+
+                       LOG.trace("Taking incremental snapshot for checkpoint 
{}. Snapshot is based on last completed checkpoint {} " +
+                               "assuming the following (shared) files as base: 
{}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
+
+                       // save meta data
+                       for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredStateMetaInfoBase>> stateMetaInfoEntry
+                               : kvStateInformation.entrySet()) {
+                               
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
+                       }
+
+                       LOG.trace("Local RocksDB checkpoint goes to backup path 
{}.", localBackupDirectory);
+
+                       if (localBackupDirectory.exists()) {
+                               throw new IllegalStateException("Unexpected 
existence of the backup directory.");
+                       }
+
+                       // create hard links of living files in the snapshot 
path
+                       try (Checkpoint checkpoint = Checkpoint.create(db)) {
+                               
checkpoint.createCheckpoint(localBackupDirectory.getDirectory().getPath());
+                       }
+               }
+
+               @Nonnull
+               SnapshotResult<KeyedStateHandle> runSnapshot() throws Exception 
{
+
+                       
cancelStreamRegistry.registerCloseable(closeableRegistry);
+
+                       // write meta data
+                       metaStateHandle = materializeMetaData();
+
+                       // sanity checks - they should never fail
+                       Preconditions.checkNotNull(metaStateHandle,
+                               "Metadata was not properly created.");
+                       
Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(),
+                               "Metadata for job manager was not properly 
created.");
+
+                       // write state data
+                       Preconditions.checkState(localBackupDirectory.exists());
+
+                       FileStatus[] fileStatuses = 
localBackupDirectory.listStatus();
+                       if (fileStatuses != null) {
+                               for (FileStatus fileStatus : fileStatuses) {
+                                       final Path filePath = 
fileStatus.getPath();
+                                       final String fileName = 
filePath.getName();
+                                       final StateHandleID stateHandleID = new 
StateHandleID(fileName);
+
+                                       if (fileName.endsWith(SST_FILE_SUFFIX)) 
{
+                                               final boolean existsAlready =
+                                                       baseSstFiles != null && 
baseSstFiles.contains(stateHandleID);
+
+                                               if (existsAlready) {
+                                                       // we introduce a 
placeholder state handle, that is replaced with the
+                                                       // original from the 
shared state registry (created from a previous checkpoint)
+                                                       sstFiles.put(
+                                                               stateHandleID,
+                                                               new 
PlaceholderStreamStateHandle());
+                                               } else {
+                                                       
sstFiles.put(stateHandleID, materializeStateData(filePath));
+                                               }
+                                       } else {
+                                               StreamStateHandle fileHandle = 
materializeStateData(filePath);
+                                               miscFiles.put(stateHandleID, 
fileHandle);
+                                       }
+                               }
+                       }
+
+                       synchronized (materializedSstFiles) {
+                               materializedSstFiles.put(checkpointId, 
sstFiles.keySet());
+                       }
+
+                       IncrementalKeyedStateHandle 
jmIncrementalKeyedStateHandle = new IncrementalKeyedStateHandle(
+                               backendUID,
+                               keyGroupRange,
+                               checkpointId,
+                               sstFiles,
+                               miscFiles,
+                               metaStateHandle.getJobManagerOwnedSnapshot());
+
+                       StreamStateHandle taskLocalSnapshotMetaDataStateHandle 
= metaStateHandle.getTaskLocalSnapshot();
+                       DirectoryStateHandle directoryStateHandle = null;
+
+                       try {
+
+                               directoryStateHandle = 
localBackupDirectory.completeSnapshotAndGetHandle();
+                       } catch (IOException ex) {
+
+                               Exception collector = ex;
+
+                               try {
+                                       
taskLocalSnapshotMetaDataStateHandle.discardState();
+                               } catch (Exception discardEx) {
+                                       collector = 
ExceptionUtils.firstOrSuppressed(discardEx, collector);
+                               }
+
+                               LOG.warn("Problem with local state snapshot.", 
collector);
+                       }
+
+                       if (directoryStateHandle != null && 
taskLocalSnapshotMetaDataStateHandle != null) {
+
+                               IncrementalLocalKeyedStateHandle 
localDirKeyedStateHandle =
+                                       new IncrementalLocalKeyedStateHandle(
+                                               backendUID,
+                                               checkpointId,
+                                               directoryStateHandle,
+                                               keyGroupRange,
+                                               
taskLocalSnapshotMetaDataStateHandle,
+                                               sstFiles.keySet());
+                               return 
SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, 
localDirKeyedStateHandle);
+                       } else {
+                               return 
SnapshotResult.of(jmIncrementalKeyedStateHandle);
+                       }
+               }
+
+               void stop() {
+
+                       if 
(cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
+                               try {
+                                       closeableRegistry.close();
+                               } catch (IOException e) {
+                                       LOG.warn("Could not properly close io 
streams.", e);
+                               }
+                       }
+               }
+
+               void releaseResources(boolean canceled) {
+
+                       dbLease.close();
+
+                       if 
(cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
+                               try {
+                                       closeableRegistry.close();
+                               } catch (IOException e) {
+                                       LOG.warn("Exception on closing 
registry.", e);
+                               }
+                       }
+
+                       try {
+                               if (localBackupDirectory.exists()) {
+                                       LOG.trace("Running cleanup for local 
RocksDB backup directory {}.", localBackupDirectory);
+                                       boolean cleanupOk = 
localBackupDirectory.cleanup();
+
+                                       if (!cleanupOk) {
+                                               LOG.debug("Could not properly 
cleanup local RocksDB backup directory.");
+                                       }
+                               }
+                       } catch (IOException e) {
+                               LOG.warn("Could not properly cleanup local 
RocksDB backup directory.", e);
+                       }
+
+                       if (canceled) {
+                               Collection<StateObject> statesToDiscard =
+                                       new ArrayList<>(1 + miscFiles.size() + 
sstFiles.size());
+
+                               statesToDiscard.add(metaStateHandle);
+                               statesToDiscard.addAll(miscFiles.values());
+                               statesToDiscard.addAll(sstFiles.values());
+
+                               try {
+                                       
StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
+                               } catch (Exception e) {
+                                       LOG.warn("Could not properly discard 
states.", e);
+                               }
+
+                               if (localBackupDirectory.isSnapshotCompleted()) 
{
+                                       try {
+                                               DirectoryStateHandle 
directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
+                                               if (directoryStateHandle != 
null) {
+                                                       
directoryStateHandle.discardState();
+                                               }
+                                       } catch (Exception e) {
+                                               LOG.warn("Could not properly 
discard local state.", e);
+                                       }
+                               }
+                       }
+               }
+       }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java
new file mode 100644
index 0000000..bf2bbdb
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.snapshot;
+
/**
 * Utility methods and constants around RocksDB creating and restoring snapshots for
 * {@link org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend}.
 *
 * <p>The class only has static members and cannot be instantiated or subclassed.
 */
public final class RocksSnapshotUtil {

	/**
	 * File suffix of sstable files.
	 */
	public static final String SST_FILE_SUFFIX = ".sst";

	/**
	 * Mask for the most significant bit of a byte. The flag helpers of this class use it as the
	 * "meta data follows" flag in the first byte of a key.
	 */
	public static final int FIRST_BIT_IN_BYTE_MASK = 0x80;

	/**
	 * Marker value {@code 0xFFFF}. NOTE(review): by its name it marks the end of a key-group in a
	 * snapshot stream; confirm against the writing and restoring code.
	 */
	public static final int END_OF_KEY_GROUP_MARK = 0xFFFF;

	/**
	 * Sets the "meta data follows" flag ({@link #FIRST_BIT_IN_BYTE_MASK}) in the first byte of the
	 * key, in place. All other bits are left unchanged.
	 *
	 * @param key the key to modify; must contain at least one byte.
	 */
	public static void setMetaDataFollowsFlagInKey(byte[] key) {
		key[0] |= FIRST_BIT_IN_BYTE_MASK;
	}

	/**
	 * Clears the "meta data follows" flag ({@link #FIRST_BIT_IN_BYTE_MASK}) in the first byte of the
	 * key, in place. All other bits are left unchanged.
	 *
	 * @param key the key to modify; must contain at least one byte.
	 */
	public static void clearMetaDataFollowsFlag(byte[] key) {
		key[0] &= ~FIRST_BIT_IN_BYTE_MASK;
	}

	/**
	 * Checks whether the "meta data follows" flag is set in the first byte of the key.
	 *
	 * @param key the key to inspect; must contain at least one byte.
	 * @return {@code true} if the most significant bit of {@code key[0]} is set.
	 */
	public static boolean hasMetaDataFollowsFlag(byte[] key) {
		return (key[0] & FIRST_BIT_IN_BYTE_MASK) != 0;
	}

	/** Not instantiable: this class only provides static members. */
	private RocksSnapshotUtil() {
		throw new AssertionError();
	}
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/SnapshotStrategyBase.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/SnapshotStrategyBase.java
new file mode 100644
index 0000000..efebe8c
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/SnapshotStrategyBase.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.SnapshotStrategy;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.util.LinkedHashMap;
+
+/**
+ * Base class for {@link SnapshotStrategy} implementations on RocksDB.
+ *
+ * @param <K> type of the backend keys.
+ */
+public abstract class SnapshotStrategyBase<K> implements 
SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
+
+       @Nonnull
+       protected final RocksDB db;
+
+       @Nonnull
+       protected final ResourceGuard rocksDBResourceGuard;
+
+       @Nonnull
+       protected final TypeSerializer<K> keySerializer;
+
+       @Nonnull
+       protected final LinkedHashMap<String, Tuple2<ColumnFamilyHandle, 
RegisteredStateMetaInfoBase>> kvStateInformation;
+
+       @Nonnull
+       protected final KeyGroupRange keyGroupRange;
+
+       @Nonnegative
+       protected final int keyGroupPrefixBytes;
+
+       @Nonnull
+       protected final LocalRecoveryConfig localRecoveryConfig;
+
+       @Nonnull
+       protected final CloseableRegistry cancelStreamRegistry;
+
+       public SnapshotStrategyBase(
+               @Nonnull RocksDB db,
+               @Nonnull ResourceGuard rocksDBResourceGuard,
+               @Nonnull TypeSerializer<K> keySerializer,
+               @Nonnull LinkedHashMap<String, Tuple2<ColumnFamilyHandle, 
RegisteredStateMetaInfoBase>> kvStateInformation,
+               @Nonnull KeyGroupRange keyGroupRange,
+               @Nonnegative int keyGroupPrefixBytes,
+               @Nonnull LocalRecoveryConfig localRecoveryConfig,
+               @Nonnull CloseableRegistry cancelStreamRegistry) {
+
+               this.db = db;
+               this.rocksDBResourceGuard = rocksDBResourceGuard;
+               this.keySerializer = keySerializer;
+               this.kvStateInformation = kvStateInformation;
+               this.keyGroupRange = keyGroupRange;
+               this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+               this.localRecoveryConfig = localRecoveryConfig;
+               this.cancelStreamRegistry = cancelStreamRegistry;
+       }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index e344638..c872553 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -91,6 +91,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 
+import static 
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.END_OF_KEY_GROUP_MARK;
+import static 
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.FIRST_BIT_IN_BYTE_MASK;
+import static 
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.clearMetaDataFollowsFlag;
+import static 
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.hasMetaDataFollowsFlag;
+import static 
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.setMetaDataFollowsFlagInKey;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.spy;
@@ -425,21 +430,19 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
        @Test
        public void testConsistentSnapshotSerializationFlagsAndMasks() {
 
-               Assert.assertEquals(0xFFFF, 
RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK);
-               Assert.assertEquals(0x80, 
RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
+               Assert.assertEquals(0xFFFF, END_OF_KEY_GROUP_MARK);
+               Assert.assertEquals(0x80, FIRST_BIT_IN_BYTE_MASK);
 
                byte[] expectedKey = new byte[] {42, 42};
                byte[] modKey = expectedKey.clone();
 
-               Assert.assertFalse(
-                       
RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(modKey));
+               Assert.assertFalse(hasMetaDataFollowsFlag(modKey));
 
-               
RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.setMetaDataFollowsFlagInKey(modKey);
-               
Assert.assertTrue(RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(modKey));
+               setMetaDataFollowsFlagInKey(modKey);
+               Assert.assertTrue(hasMetaDataFollowsFlag(modKey));
 
-               
RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(modKey);
-               Assert.assertFalse(
-                       
RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(modKey));
+               clearMetaDataFollowsFlag(modKey);
+               Assert.assertFalse(hasMetaDataFollowsFlag(modKey));
 
                Assert.assertTrue(Arrays.equals(expectedKey, modKey));
        }
@@ -504,12 +507,12 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
 
                @Nullable
                @Override
-               public StreamStateHandle closeAndGetHandle() throws IOException 
{
+               public StreamStateHandle closeAndGetHandle() {
                        throw new UnsupportedOperationException();
                }
 
                @Override
-               public long getPos() throws IOException {
+               public long getPos() {
                        throw new UnsupportedOperationException();
                }
 
@@ -529,7 +532,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
                }
 
                @Override
-               public void close() throws IOException {
+               public void close() {
                        throw new UnsupportedOperationException();
                }
        }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 0ea0d3f..4916251 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -191,6 +191,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
                allCreatedCloseables = new ArrayList<>();
 
                keyedStateBackend.db = spy(keyedStateBackend.db);
+               keyedStateBackend.initializeSnapshotStrategy(null);
 
                doAnswer(new Answer<Object>() {
 

Reply via email to