http://git-wip-us.apache.org/repos/asf/flink/blob/1619fa8a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 344255f..0cb2792 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -52,19 +52,25 @@ import 
org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.DirectoryStateHandle;
 import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
 import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
+import org.apache.flink.runtime.state.SnapshotDirectory;
 import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.SnapshotStrategy;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
@@ -77,12 +83,14 @@ import 
org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ResourceGuard;
 import org.apache.flink.util.StateMigrationException;
+import org.apache.flink.util.function.SupplierWithException;
 
 import org.rocksdb.Checkpoint;
 import org.rocksdb.ColumnFamilyDescriptor;
@@ -104,6 +112,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -122,7 +131,6 @@ import java.util.Spliterator;
 import java.util.Spliterators;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.RunnableFuture;
 import java.util.stream.Stream;
@@ -217,6 +225,9 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        /** The configuration of local recovery. */
        private final LocalRecoveryConfig localRecoveryConfig;
 
+       /** The snapshot strategy, e.g., if we use full or incremental 
checkpoints, local state, and so on. */
+       private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> 
snapshotStrategy;
+
        public RocksDBKeyedStateBackend(
                String operatorIdentifier,
                ClassLoader userCodeClassLoader,
@@ -248,26 +259,41 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                this.instanceBasePath = 
Preconditions.checkNotNull(instanceBasePath);
                this.instanceRocksDBPath = new File(instanceBasePath, "db");
 
-               if (instanceBasePath.exists()) {
+               checkAndCreateDirectory(instanceBasePath);
+
+               if (instanceRocksDBPath.exists()) {
                        // Clear the base directory when the backend is created
                        // in case something crashed and the backend never 
reached dispose()
                        cleanInstanceBasePath();
                }
 
-               if (!instanceBasePath.mkdirs()) {
-                       throw new IOException(
-                                       String.format("Could not create RocksDB 
data directory at %s.", instanceBasePath.getAbsolutePath()));
-               }
-
                this.localRecoveryConfig = 
Preconditions.checkNotNull(localRecoveryConfig);
                this.keyGroupPrefixBytes = getNumberOfKeyGroups() > 
(Byte.MAX_VALUE + 1) ? 2 : 1;
                this.kvStateInformation = new HashMap<>();
                this.restoredKvStateMetaInfos = new HashMap<>();
                this.materializedSstFiles = new TreeMap<>();
                this.backendUID = UUID.randomUUID();
+
+               this.snapshotStrategy = enableIncrementalCheckpointing ?
+                       new IncrementalSnapshotStrategy() :
+                       new FullSnapshotStrategy();
+
                LOG.debug("Setting initial keyed backend uid for operator {} to 
{}.", this.operatorIdentifier, this.backendUID);
        }
 
+       private static void checkAndCreateDirectory(File directory) throws 
IOException {
+               if (directory.exists()) {
+                       if (!directory.isDirectory()) {
+                               throw new IOException("Not a directory: " + 
directory);
+                       }
+               } else {
+                       if (!directory.mkdirs()) {
+                               throw new IOException(
+                                       String.format("Could not create RocksDB 
data directory at %s.", directory));
+                       }
+               }
+       }
+
        @Override
        public <N> Stream<K> getKeys(String state, N namespace) {
                Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> columnInfo = 
kvStateInformation.get(state);
@@ -294,7 +320,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                RocksIterator iterator = db.newIterator(columnInfo.f0);
                iterator.seekToFirst();
 
-               final RocksIteratorWrapper<K> iteratorWrapper = new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes,
+               final RocksIteratorForKeysWrapper<K> iteratorWrapper = new 
RocksIteratorForKeysWrapper<>(iterator, state, keySerializer, 
keyGroupPrefixBytes,
                        ambiguousKeyPossible, nameSpaceBytes);
 
                Stream<K> targetStream = 
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper, 
Spliterator.ORDERED), false);
@@ -381,1743 +407,2053 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                final CheckpointStreamFactory streamFactory,
                CheckpointOptions checkpointOptions) throws Exception {
 
-               if (checkpointOptions.getCheckpointType() != 
CheckpointType.SAVEPOINT &&
-                       enableIncrementalCheckpointing) {
-                       return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-               } else {
-                       return snapshotFully(checkpointId, timestamp, 
streamFactory);
-               }
+               return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
        }
 
-       private RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshotIncrementally(
-               final long checkpointId,
-               final long checkpointTimestamp,
-               final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-               if (db == null) {
-                       throw new IOException("RocksDB closed.");
-               }
+       @Override
+       public void restore(Collection<KeyedStateHandle> restoreState) throws 
Exception {
+               LOG.info("Initializing RocksDB keyed state backend.");
 
-               if (kvStateInformation.isEmpty()) {
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
-                                               checkpointTimestamp);
-                       }
-                       return DoneFuture.of(SnapshotResult.empty());
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
                }
 
-               final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
-                       new RocksDBIncrementalSnapshotOperation<>(
-                               this,
-                               checkpointStreamFactory,
-                               checkpointId,
-                               checkpointTimestamp);
+               // clear all meta data
+               kvStateInformation.clear();
+               restoredKvStateMetaInfos.clear();
 
                try {
-                       snapshotOperation.takeSnapshot();
-               } catch (Exception e) {
-                       snapshotOperation.stop();
-                       snapshotOperation.releaseResources(true);
-                       throw e;
-               }
-
-               return new FutureTask<SnapshotResult<KeyedStateHandle>>(
-                       new Callable<SnapshotResult<KeyedStateHandle>>() {
-                               @Override
-                               public SnapshotResult<KeyedStateHandle> call() 
throws Exception {
-                                       KeyedStateHandle keyedStateHandle = 
snapshotOperation.materializeSnapshot();
-                                       return 
SnapshotResult.of(keyedStateHandle);
+                       if (restoreState == null || restoreState.isEmpty()) {
+                               createDB();
+                       } else {
+                               KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
+                               if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
+                                       || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+                                       RocksDBIncrementalRestoreOperation<K> 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+                                       restoreOperation.restore(restoreState);
+                               } else {
+                                       RocksDBFullRestoreOperation<K> 
restoreOperation = new RocksDBFullRestoreOperation<>(this);
+                                       
restoreOperation.doRestore(restoreState);
                                }
                        }
-               ) {
-                       @Override
-                       public boolean cancel(boolean mayInterruptIfRunning) {
-                               snapshotOperation.stop();
-                               return super.cancel(mayInterruptIfRunning);
-                       }
-
-                       @Override
-                       protected void done() {
-                               
snapshotOperation.releaseResources(isCancelled());
-                       }
-               };
+               } catch (Exception ex) {
+                       dispose();
+                       throw ex;
+               }
        }
 
-       private RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotFully(
-               final long checkpointId,
-               final long timestamp,
-               final CheckpointStreamFactory streamFactory) throws Exception {
-
-               long startTime = System.currentTimeMillis();
-               final CloseableRegistry snapshotCloseableRegistry = new 
CloseableRegistry();
-
-               final RocksDBFullSnapshotOperation<K> snapshotOperation;
-
-               if (kvStateInformation.isEmpty()) {
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.", timestamp);
-                       }
+       @Override
+       public void notifyCheckpointComplete(long completedCheckpointId) {
 
-                       return DoneFuture.of(SnapshotResult.empty());
+               if (!enableIncrementalCheckpointing) {
+                       return;
                }
 
-               snapshotOperation = new RocksDBFullSnapshotOperation<>(this, 
streamFactory, snapshotCloseableRegistry);
-               snapshotOperation.takeDBSnapShot(checkpointId, timestamp);
-
-               // implementation of the async IO operation, based on FutureTask
-               
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable 
=
-                       new 
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() {
+               synchronized (materializedSstFiles) {
 
-                               @Override
-                               protected void acquireResources() throws 
Exception {
-                                       
cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
-                                       
snapshotOperation.openCheckpointStream();
-                               }
+                       if (completedCheckpointId < lastCompletedCheckpointId) {
+                               return;
+                       }
 
-                               @Override
-                               protected void releaseResources() throws 
Exception {
-                                       closeLocalRegistry();
-                                       releaseSnapshotOperationResources();
-                               }
+                       materializedSstFiles.keySet().removeIf(checkpointId -> 
checkpointId < completedCheckpointId);
 
-                               private void 
releaseSnapshotOperationResources() {
-                                       // hold the db lock while operation on 
the db to guard us against async db disposal
-                                       
snapshotOperation.releaseSnapshotResources();
-                               }
+                       lastCompletedCheckpointId = completedCheckpointId;
+               }
+       }
 
-                               @Override
-                               protected void stopOperation() throws Exception 
{
-                                       closeLocalRegistry();
-                               }
+       private void createDB() throws IOException {
+               List<ColumnFamilyHandle> columnFamilyHandles = new 
ArrayList<>(1);
+               this.db = openDB(instanceRocksDBPath.getAbsolutePath(), 
Collections.emptyList(), columnFamilyHandles);
+               this.defaultColumnFamily = columnFamilyHandles.get(0);
+       }
 
-                               private void closeLocalRegistry() {
-                                       if 
(cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
-                                               try {
-                                                       
snapshotCloseableRegistry.close();
-                                               } catch (Exception ex) {
-                                                       LOG.warn("Error closing 
local registry", ex);
-                                               }
-                                       }
-                               }
+       private RocksDB openDB(
+               String path,
+               List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
+               List<ColumnFamilyHandle> stateColumnFamilyHandles) throws 
IOException {
 
-                               @Override
-                               public SnapshotResult<KeyedStateHandle> 
performOperation() throws Exception {
-                                       long startTime = 
System.currentTimeMillis();
+               List<ColumnFamilyDescriptor> columnFamilyDescriptors =
+                       new ArrayList<>(1 + 
stateColumnFamilyDescriptors.size());
 
-                                       if (isStopped()) {
-                                               throw new IOException("RocksDB 
closed.");
-                                       }
+               columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
 
-                                       snapshotOperation.writeDBSnapshot();
+               // we add the required descriptor for the default CF in last 
position.
+               columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions));
 
-                                       LOG.info("Asynchronous RocksDB snapshot 
({}, asynchronous part) in thread {} took {} ms.",
-                                               streamFactory, 
Thread.currentThread(), (System.currentTimeMillis() - startTime));
+               RocksDB dbRef;
 
-                                       KeyGroupsStateHandle 
snapshotResultStateHandle = snapshotOperation.getSnapshotResultStateHandle();
-                                       return 
SnapshotResult.of(snapshotResultStateHandle);
-                               }
-                       };
+               try {
+                       dbRef = RocksDB.open(
+                               Preconditions.checkNotNull(dbOptions),
+                               Preconditions.checkNotNull(path),
+                               columnFamilyDescriptors,
+                               stateColumnFamilyHandles);
+               } catch (RocksDBException e) {
+                       throw new IOException("Error while opening RocksDB 
instance.", e);
+               }
 
-               LOG.info("Asynchronous RocksDB snapshot ({}, synchronous part) 
in thread {} took {} ms.",
-                               streamFactory, Thread.currentThread(), 
(System.currentTimeMillis() - startTime));
+               // requested + default CF
+               Preconditions.checkState(1 + 
stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
+                       "Not all requested column family handles have been 
created");
 
-               return AsyncStoppableTaskWithCallback.from(ioCallable);
+               return dbRef;
        }
 
        /**
-        * Encapsulates the process to perform a snapshot of a 
RocksDBKeyedStateBackend.
+        * Encapsulates the process of restoring a RocksDBKeyedStateBackend 
from a full snapshot.
         */
-       static final class RocksDBFullSnapshotOperation<K> {
-
-               static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
-               static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
-
-               private final RocksDBKeyedStateBackend<K> stateBackend;
-               private final KeyGroupRangeOffsets keyGroupRangeOffsets;
-               private final CheckpointStreamFactory checkpointStreamFactory;
-               private final CloseableRegistry snapshotCloseableRegistry;
-               private final ResourceGuard.Lease dbLease;
-
-               private long checkpointId;
-               private long checkpointTimeStamp;
+       private static final class RocksDBFullRestoreOperation<K> {
 
-               private Snapshot snapshot;
-               private ReadOptions readOptions;
-               private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
-
-               private CheckpointStreamFactory.CheckpointStateOutputStream 
outStream;
-               private DataOutputView outputView;
-
-               RocksDBFullSnapshotOperation(
-                       RocksDBKeyedStateBackend<K> stateBackend,
-                       CheckpointStreamFactory checkpointStreamFactory,
-                       CloseableRegistry registry) throws IOException {
+               private final RocksDBKeyedStateBackend<K> 
rocksDBKeyedStateBackend;
 
-                       this.stateBackend = stateBackend;
-                       this.checkpointStreamFactory = checkpointStreamFactory;
-                       this.keyGroupRangeOffsets = new 
KeyGroupRangeOffsets(stateBackend.keyGroupRange);
-                       this.snapshotCloseableRegistry = registry;
-                       this.dbLease = 
this.stateBackend.rocksDBResourceGuard.acquireResource();
-               }
+               /** Current key-groups state handle from which we restore 
key-groups. */
+               private KeyGroupsStateHandle currentKeyGroupsStateHandle;
+               /** Current input stream we obtained from 
currentKeyGroupsStateHandle. */
+               private FSDataInputStream currentStateHandleInStream;
+               /** Current data input view that wraps 
currentStateHandleInStream. */
+               private DataInputView currentStateHandleInView;
+               /** Current list of ColumnFamilyHandles for all column families 
we restore from currentKeyGroupsStateHandle. */
+               private List<ColumnFamilyHandle> 
currentStateHandleKVStateColumnFamilies;
+               /** The compression decorator that was used for writing the 
state, as determined by the meta data. */
+               private StreamCompressionDecorator 
keygroupStreamCompressionDecorator;
 
                /**
-                * 1) Create a snapshot object from RocksDB.
+                * Creates a restore operation object for the given state 
backend instance.
                 *
-                * @param checkpointId id of the checkpoint for which we take 
the snapshot
-                * @param checkpointTimeStamp timestamp of the checkpoint for 
which we take the snapshot
+                * @param rocksDBKeyedStateBackend the state backend into which 
we restore
                 */
-               public void takeDBSnapShot(long checkpointId, long 
checkpointTimeStamp) {
-                       Preconditions.checkArgument(snapshot == null, "Only one 
ongoing snapshot allowed!");
-                       this.kvStateIterators = new 
ArrayList<>(stateBackend.kvStateInformation.size());
-                       this.checkpointId = checkpointId;
-                       this.checkpointTimeStamp = checkpointTimeStamp;
-                       this.snapshot = stateBackend.db.getSnapshot();
+               public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> 
rocksDBKeyedStateBackend) {
+                       this.rocksDBKeyedStateBackend = 
Preconditions.checkNotNull(rocksDBKeyedStateBackend);
                }
 
                /**
-                * 2) Open CheckpointStateOutputStream through the 
checkpointStreamFactory into which we will write.
+                * Restores all key-groups data that is referenced by the 
passed state handles.
                 *
-                * @throws Exception
+                * @param keyedStateHandles List of all key groups state 
handles that shall be restored.
                 */
-               public void openCheckpointStream() throws Exception {
-                       Preconditions.checkArgument(outStream == null, "Output 
stream for snapshot is already set.");
-                       outStream = 
checkpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
-                       snapshotCloseableRegistry.registerCloseable(outStream);
-                       outputView = new DataOutputViewStreamWrapper(outStream);
-               }
+               public void doRestore(Collection<KeyedStateHandle> 
keyedStateHandles)
+                       throws IOException, StateMigrationException, 
RocksDBException {
 
-               /**
-                * 3) Write the actual data from RocksDB from the time we took 
the snapshot object in (1).
-                *
-                * @throws IOException
-                */
-               public void writeDBSnapshot() throws IOException, 
InterruptedException {
+                       rocksDBKeyedStateBackend.createDB();
 
-                       if (null == snapshot) {
-                               throw new IOException("No snapshot available. 
Might be released due to cancellation.");
-                       }
+                       for (KeyedStateHandle keyedStateHandle : 
keyedStateHandles) {
+                               if (keyedStateHandle != null) {
 
-                       Preconditions.checkNotNull(outStream, "No output stream 
to write snapshot.");
-                       writeKVStateMetaData();
-                       writeKVStateData();
+                                       if (!(keyedStateHandle instanceof 
KeyGroupsStateHandle)) {
+                                               throw new 
IllegalStateException("Unexpected state handle type, " +
+                                                       "expected: " + 
KeyGroupsStateHandle.class +
+                                                       ", but found: " + 
keyedStateHandle.getClass());
+                                       }
+                                       this.currentKeyGroupsStateHandle = 
(KeyGroupsStateHandle) keyedStateHandle;
+                                       restoreKeyGroupsInStateHandle();
+                               }
+                       }
                }
 
                /**
-                * 4) Returns a state handle to the snapshot after the snapshot 
procedure is completed and null before.
-                *
-                * @return state handle to the completed snapshot
+                * Restore one key groups state handle.
                 */
-               public KeyGroupsStateHandle getSnapshotResultStateHandle() 
throws IOException {
-
-                       if 
(snapshotCloseableRegistry.unregisterCloseable(outStream)) {
-
-                               StreamStateHandle stateHandle = 
outStream.closeAndGetHandle();
-                               outStream = null;
-
-                               if (stateHandle != null) {
-                                       return new 
KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+               private void restoreKeyGroupsInStateHandle()
+                       throws IOException, StateMigrationException, 
RocksDBException {
+                       try {
+                               currentStateHandleInStream = 
currentKeyGroupsStateHandle.openInputStream();
+                               
rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
+                               currentStateHandleInView = new 
DataInputViewStreamWrapper(currentStateHandleInStream);
+                               restoreKVStateMetaData();
+                               restoreKVStateData();
+                       } finally {
+                               if 
(rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream))
 {
+                                       
IOUtils.closeQuietly(currentStateHandleInStream);
                                }
                        }
-                       return null;
                }
 
                /**
-                * 5) Release the snapshot object for RocksDB and clean up.
+                * Restore the KV-state / ColumnFamily meta data for all 
key-groups referenced by the current state handle.
+                *
+                * @throws IOException
+                * @throws ClassNotFoundException
+                * @throws RocksDBException
                 */
-               public void releaseSnapshotResources() {
+               private void restoreKVStateMetaData() throws IOException, 
StateMigrationException, RocksDBException {
 
-                       outStream = null;
+                       KeyedBackendSerializationProxy<K> serializationProxy =
+                               new 
KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
 
-                       if (null != kvStateIterators) {
-                               for (Tuple2<RocksIterator, Integer> 
kvStateIterator : kvStateIterators) {
-                                       
IOUtils.closeQuietly(kvStateIterator.f0);
-                               }
-                               kvStateIterators = null;
-                       }
+                       serializationProxy.read(currentStateHandleInView);
 
-                       if (null != snapshot) {
-                               if (null != stateBackend.db) {
-                                       
stateBackend.db.releaseSnapshot(snapshot);
-                               }
-                               IOUtils.closeQuietly(snapshot);
-                               snapshot = null;
-                       }
+                       // check for key serializer compatibility; this also 
reconfigures the
+                       // key serializer to be compatible, if it is required 
and is possible
+                       if (CompatibilityUtil.resolveCompatibilityResult(
+                               serializationProxy.getKeySerializer(),
+                               UnloadableDummyTypeSerializer.class,
+                               
serializationProxy.getKeySerializerConfigSnapshot(),
+                               rocksDBKeyedStateBackend.keySerializer)
+                               .isRequiresMigration()) {
 
-                       if (null != readOptions) {
-                               IOUtils.closeQuietly(readOptions);
-                               readOptions = null;
+                               // TODO replace with state migration; note that 
key hash codes need to remain the same after migration
+                               throw new StateMigrationException("The new key 
serializer is not compatible to read previous keys. " +
+                                       "Aborting now since state migration is 
currently not available");
                        }
 
-                       this.dbLease.close();
-               }
+                       this.keygroupStreamCompressionDecorator = 
serializationProxy.isUsingKeyGroupCompression() ?
+                               SnappyStreamCompressionDecorator.INSTANCE : 
UncompressedStreamCompressionDecorator.INSTANCE;
 
-               private void writeKVStateMetaData() throws IOException {
+                       List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> restoredMetaInfos =
+                               serializationProxy.getStateMetaInfoSnapshots();
+                       currentStateHandleKVStateColumnFamilies = new 
ArrayList<>(restoredMetaInfos.size());
+                       //rocksDBKeyedStateBackend.restoredKvStateMetaInfos = 
new HashMap<>(restoredMetaInfos.size());
 
-                       List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> metaInfoSnapshots =
-                               new 
ArrayList<>(stateBackend.kvStateInformation.size());
+                       for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> 
restoredMetaInfo : restoredMetaInfos) {
 
-                       int kvStateId = 0;
-                       for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> column :
-                               stateBackend.kvStateInformation.entrySet()) {
+                               Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn =
+                                       
rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
 
-                               
metaInfoSnapshots.add(column.getValue().f1.snapshot());
+                               if (registeredColumn == null) {
+                                       byte[] nameBytes = 
restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
 
-                               //retrieve iterator for this k/v states
-                               readOptions = new ReadOptions();
-                               readOptions.setSnapshot(snapshot);
-
-                               kvStateIterators.add(
-                                       new 
Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), 
kvStateId));
+                                       ColumnFamilyDescriptor 
columnFamilyDescriptor = new ColumnFamilyDescriptor(
+                                               nameBytes,
+                                               
rocksDBKeyedStateBackend.columnOptions);
 
-                               ++kvStateId;
-                       }
+                                       RegisteredKeyedBackendStateMetaInfo<?, 
?> stateMetaInfo =
+                                               new 
RegisteredKeyedBackendStateMetaInfo<>(
+                                                       
restoredMetaInfo.getStateType(),
+                                                       
restoredMetaInfo.getName(),
+                                                       
restoredMetaInfo.getNamespaceSerializer(),
+                                                       
restoredMetaInfo.getStateSerializer());
 
-                       KeyedBackendSerializationProxy<K> serializationProxy =
-                               new KeyedBackendSerializationProxy<>(
-                                       stateBackend.getKeySerializer(),
-                                       metaInfoSnapshots,
-                                       
!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, 
stateBackend.keyGroupCompressionDecorator));
+                                       
rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(),
 restoredMetaInfo);
 
-                       serializationProxy.write(outputView);
-               }
+                                       ColumnFamilyHandle columnFamily = 
rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
 
-               private void writeKVStateData() throws IOException, 
InterruptedException {
+                                       registeredColumn = new 
Tuple2<>(columnFamily, stateMetaInfo);
+                                       
rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), 
registeredColumn);
 
-                       byte[] previousKey = null;
-                       byte[] previousValue = null;
-                       OutputStream kgOutStream = null;
-                       DataOutputView kgOutView = null;
+                               } else {
+                                       // TODO with eager state registration 
in place, check here for serializer migration strategies
+                               }
+                               
currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0);
+                       }
+               }
 
-                       try {
-                               // Here we transfer ownership of RocksIterators 
to the RocksDBMergeIterator
-                               try (RocksDBMergeIterator mergeIterator = new 
RocksDBMergeIterator(
-                                       kvStateIterators, 
stateBackend.keyGroupPrefixBytes)) {
+               /**
+                * Restore the KV-state / ColumnFamily data for all key-groups 
referenced by the current state handle.
+                *
+                * @throws IOException
+                * @throws RocksDBException
+                */
+               private void restoreKVStateData() throws IOException, 
RocksDBException {
+                       //for all key-groups in the current state handle...
+                       for (Tuple2<Integer, Long> keyGroupOffset : 
currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
+                               int keyGroup = keyGroupOffset.f0;
 
-                                       // handover complete, null out to 
prevent double close
-                                       kvStateIterators = null;
+                               // Check that restored key groups all belong to 
the backend
+                               
Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup),
+                                       "The key group must belong to the 
backend");
 
-                                       //preamble: setup with first key-group 
as our lookahead
-                                       if (mergeIterator.isValid()) {
-                                               //begin first key-group by 
recording the offset
-                                               
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), 
outStream.getPos());
-                                               //write the k/v-state id as 
metadata
-                                               kgOutStream = 
stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
-                                               kgOutView = new 
DataOutputViewStreamWrapper(kgOutStream);
+                               long offset = keyGroupOffset.f1;
+                               //not empty key-group?
+                               if (0L != offset) {
+                                       currentStateHandleInStream.seek(offset);
+                                       try (InputStream compressedKgIn = 
keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream))
 {
+                                               DataInputViewStreamWrapper 
compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
                                                //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
-                                               
kgOutView.writeShort(mergeIterator.kvStateId());
-                                               previousKey = 
mergeIterator.key();
-                                               previousValue = 
mergeIterator.value();
-                                               mergeIterator.next();
+                                               int kvStateId = 
compressedKgInputView.readShort();
+                                               ColumnFamilyHandle handle = 
currentStateHandleKVStateColumnFamilies.get(kvStateId);
+                                               //insert all k/v pairs into DB
+                                               boolean keyGroupHasMoreKeys = 
true;
+                                               while (keyGroupHasMoreKeys) {
+                                                       byte[] key = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
+                                                       byte[] value = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
+                                                       if 
(RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
+                                                               //clear the 
signal bit in the key to make it ready for insertion again
+                                                               
RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
+                                                               
rocksDBKeyedStateBackend.db.put(handle, key, value);
+                                                               //TODO this 
could be aware of keyGroupPrefixBytes and write only one byte if possible
+                                                               kvStateId = 
RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
+                                                                       & 
compressedKgInputView.readShort();
+                                                               if 
(RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
+                                                                       
keyGroupHasMoreKeys = false;
+                                                               } else {
+                                                                       handle 
= currentStateHandleKVStateColumnFamilies.get(kvStateId);
+                                                               }
+                                                       } else {
+                                                               
rocksDBKeyedStateBackend.db.put(handle, key, value);
+                                                       }
+                                               }
                                        }
+                               }
+                       }
+               }
+       }
 
-                                       //main loop: write k/v pairs ordered by 
(key-group, kv-state), thereby tracking key-group offsets.
-                                       while (mergeIterator.isValid()) {
-
-                                               assert 
(!hasMetaDataFollowsFlag(previousKey));
+       /**
+        * Encapsulates the process of restoring a RocksDBKeyedStateBackend 
from an incremental snapshot.
+        */
+       private static class RocksDBIncrementalRestoreOperation<T> {
 
-                                               //set signal in first key byte 
that meta data will follow in the stream after this k/v pair
-                                               if 
(mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
+               private final RocksDBKeyedStateBackend<T> stateBackend;
 
-                                                       //be cooperative and 
check for interruption from time to time in the hot loop
-                                                       checkInterrupted();
+               private 
RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
+                       this.stateBackend = stateBackend;
+               }
 
-                                                       
setMetaDataFollowsFlagInKey(previousKey);
-                                               }
+               /**
+                * Root method that branches for different implementations of 
{@link KeyedStateHandle}.
+                */
+               void restore(Collection<KeyedStateHandle> restoreStateHandles) 
throws Exception {
 
-                                               writeKeyValuePair(previousKey, 
previousValue, kgOutView);
+                       boolean hasExtraKeys = (restoreStateHandles.size() > 1 
||
+                               
!Objects.equals(restoreStateHandles.iterator().next().getKeyGroupRange(), 
stateBackend.keyGroupRange));
 
-                                               //write meta data if we have to
-                                               if 
(mergeIterator.isNewKeyGroup()) {
-                                                       //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
-                                                       
kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
-                                                       // this will just close 
the outer stream
-                                                       kgOutStream.close();
-                                                       //begin new key-group
-                                                       
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), 
outStream.getPos());
-                                                       //write the kev-state
-                                                       //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
-                                                       kgOutStream = 
stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
-                                                       kgOutView = new 
DataOutputViewStreamWrapper(kgOutStream);
-                                                       
kgOutView.writeShort(mergeIterator.kvStateId());
-                                               } else if 
(mergeIterator.isNewKeyValueState()) {
-                                                       //write the k/v-state
-                                                       //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
-                                                       
kgOutView.writeShort(mergeIterator.kvStateId());
-                                               }
+                       if (hasExtraKeys) {
+                               stateBackend.createDB();
+                       }
 
-                                               //request next k/v pair
-                                               previousKey = 
mergeIterator.key();
-                                               previousValue = 
mergeIterator.value();
-                                               mergeIterator.next();
-                                       }
-                               }
+                       for (KeyedStateHandle rawStateHandle : 
restoreStateHandles) {
 
-                               //epilogue: write last key-group
-                               if (previousKey != null) {
-                                       assert 
(!hasMetaDataFollowsFlag(previousKey));
-                                       
setMetaDataFollowsFlagInKey(previousKey);
-                                       writeKeyValuePair(previousKey, 
previousValue, kgOutView);
-                                       //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
-                                       
kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
-                                       // this will just close the outer stream
-                                       kgOutStream.close();
-                                       kgOutStream = null;
+                               if (rawStateHandle instanceof 
IncrementalKeyedStateHandle) {
+                                       
restoreInstance((IncrementalKeyedStateHandle) rawStateHandle, hasExtraKeys);
+                               } else if (rawStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+                                       Preconditions.checkState(!hasExtraKeys, 
"Cannot recover from local state after rescaling.");
+                                       
restoreInstance((IncrementalLocalKeyedStateHandle) rawStateHandle);
+                               } else {
+                                       throw new 
IllegalStateException("Unexpected state handle type, " +
+                                               "expected " + 
IncrementalKeyedStateHandle.class +
+                                               ", but found " + 
rawStateHandle.getClass());
                                }
-
-                       } finally {
-                               // this will just close the outer stream
-                               IOUtils.closeQuietly(kgOutStream);
                        }
                }
 
-               private void writeKeyValuePair(byte[] key, byte[] value, 
DataOutputView out) throws IOException {
-                       BytePrimitiveArraySerializer.INSTANCE.serialize(key, 
out);
-                       BytePrimitiveArraySerializer.INSTANCE.serialize(value, 
out);
-               }
+               /**
+                * Recovery from remote incremental state.
+                */
+               private void restoreInstance(
+                       IncrementalKeyedStateHandle restoreStateHandle,
+                       boolean hasExtraKeys) throws Exception {
 
-               static void setMetaDataFollowsFlagInKey(byte[] key) {
-                       key[0] |= FIRST_BIT_IN_BYTE_MASK;
-               }
+                       // read state data
+                       Path temporaryRestoreInstancePath = new Path(
+                               stateBackend.instanceBasePath.getAbsolutePath(),
+                               UUID.randomUUID().toString());
 
-               static void clearMetaDataFollowsFlag(byte[] key) {
-                       key[0] &= 
(~RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
-               }
+                       try {
 
-               static boolean hasMetaDataFollowsFlag(byte[] key) {
-                       return 0 != (key[0] & 
RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
-               }
+                               
transferAllStateDataToDirectory(restoreStateHandle, 
temporaryRestoreInstancePath);
 
-               private static void checkInterrupted() throws 
InterruptedException {
-                       if (Thread.currentThread().isInterrupted()) {
-                               throw new InterruptedException("RocksDB 
snapshot interrupted.");
+                               // read meta data
+                               
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots 
=
+                                       
readMetaData(restoreStateHandle.getMetaStateHandle());
+
+                               List<ColumnFamilyDescriptor> 
columnFamilyDescriptors =
+                                       
createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);
+
+                               if (hasExtraKeys) {
+                                       
restoreKeyGroupsShardWithTemporaryHelperInstance(
+                                               temporaryRestoreInstancePath,
+                                               columnFamilyDescriptors,
+                                               stateMetaInfoSnapshots);
+                               } else {
+
+                                       // since we transferred all remote 
state to a local directory, we can use the same code as for
+                                       // local recovery.
+                                       IncrementalLocalKeyedStateHandle 
localKeyedStateHandle = new IncrementalLocalKeyedStateHandle(
+                                               
restoreStateHandle.getBackendIdentifier(),
+                                               
restoreStateHandle.getCheckpointId(),
+                                               new 
DirectoryStateHandle(temporaryRestoreInstancePath),
+                                               
restoreStateHandle.getKeyGroupRange(),
+                                               
restoreStateHandle.getMetaStateHandle(),
+                                               
restoreStateHandle.getSharedState().keySet());
+
+                                       restoreLocalStateIntoFullInstance(
+                                               localKeyedStateHandle,
+                                               columnFamilyDescriptors,
+                                               stateMetaInfoSnapshots);
+                               }
+                       } finally {
+                               FileSystem restoreFileSystem = 
temporaryRestoreInstancePath.getFileSystem();
+                               if 
(restoreFileSystem.exists(temporaryRestoreInstancePath)) {
+                                       
restoreFileSystem.delete(temporaryRestoreInstancePath, true);
+                               }
                        }
                }
-       }
 
-       private static final class RocksDBIncrementalSnapshotOperation<K> {
+               /**
+                * Recovery from local incremental state.
+                */
+               private void restoreInstance(IncrementalLocalKeyedStateHandle 
localKeyedStateHandle) throws Exception {
+                       // read meta data
+                       List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots =
+                               
readMetaData(localKeyedStateHandle.getMetaDataState());
 
-               /** The backend which we snapshot. */
-               private final RocksDBKeyedStateBackend<K> stateBackend;
+                       List<ColumnFamilyDescriptor> columnFamilyDescriptors =
+                               
createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);
 
-               /** Stream factory that creates the outpus streams to DFS. */
-               private final CheckpointStreamFactory checkpointStreamFactory;
+                       restoreLocalStateIntoFullInstance(
+                               localKeyedStateHandle,
+                               columnFamilyDescriptors,
+                               stateMetaInfoSnapshots);
+               }
 
-               /** Id for the current checkpoint. */
-               private final long checkpointId;
+               /**
+                * This method recreates and registers all {@link 
ColumnFamilyDescriptor} from Flink's state meta data snapshot.
+                */
+               private List<ColumnFamilyDescriptor> 
createAndRegisterColumnFamilyDescriptors(
+                       List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots) {
 
-               /** Timestamp for the current checkpoint. */
-               private final long checkpointTimestamp;
+                       List<ColumnFamilyDescriptor> columnFamilyDescriptors =
+                               new ArrayList<>(1 + 
stateMetaInfoSnapshots.size());
 
-               /** All sst files that were part of the last previously 
completed checkpoint. */
-               private Set<StateHandleID> baseSstFiles;
+                       for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> 
stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
 
-               /** The state meta data. */
-               private final 
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots 
= new ArrayList<>();
+                               ColumnFamilyDescriptor columnFamilyDescriptor = 
new ColumnFamilyDescriptor(
+                                       
stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
+                                       stateBackend.columnOptions);
+
+                               
columnFamilyDescriptors.add(columnFamilyDescriptor);
+                               
stateBackend.restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(), 
stateMetaInfoSnapshot);
+                       }
+                       return columnFamilyDescriptors;
+               }
+
+               /**
+                * This method implements the core of the restore logic that 
unifies how local and remote state are recovered.
+                */
+               private void restoreLocalStateIntoFullInstance(
+                       IncrementalLocalKeyedStateHandle restoreStateHandle,
+                       List<ColumnFamilyDescriptor> columnFamilyDescriptors,
+                       List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots) throws Exception {
+                       // pick up again the old backend id, so the we can 
reference existing state
+                       stateBackend.backendUID = 
restoreStateHandle.getBackendIdentifier();
+
+                       LOG.debug("Restoring keyed backend uid in operator {} 
from incremental snapshot to {}.",
+                               stateBackend.operatorIdentifier, 
stateBackend.backendUID);
+
+                       // create hard links in the instance directory
+                       if (!stateBackend.instanceRocksDBPath.mkdirs()) {
+                               throw new IOException("Could not create RocksDB 
data directory.");
+                       }
 
-               /** Local filesystem for the RocksDB backup. */
-               private FileSystem backupFileSystem;
+                       Path restoreSourcePath = 
restoreStateHandle.getDirectoryStateHandle().getDirectory();
+                       restoreInstanceDirectoryFromPath(restoreSourcePath);
 
-               /** Local path for the RocksDB backup. */
-               private Path backupPath;
+                       List<ColumnFamilyHandle> columnFamilyHandles =
+                               new ArrayList<>(1 + 
columnFamilyDescriptors.size());
 
-               // Registry for all opened i/o streams
-               private final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
+                       stateBackend.db = stateBackend.openDB(
+                               
stateBackend.instanceRocksDBPath.getAbsolutePath(),
+                               columnFamilyDescriptors, columnFamilyHandles);
 
-               // new sst files since the last completed checkpoint
-               private final Map<StateHandleID, StreamStateHandle> sstFiles = 
new HashMap<>();
+                       // extract and store the default column family which is 
located at the last index
+                       stateBackend.defaultColumnFamily = 
columnFamilyHandles.remove(columnFamilyHandles.size() - 1);
 
-               // handles to the misc files in the current snapshot
-               private final Map<StateHandleID, StreamStateHandle> miscFiles = 
new HashMap<>();
+                       for (int i = 0; i < columnFamilyDescriptors.size(); 
++i) {
+                               RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
 
-               // This lease protects from concurrent disposal of the native 
rocksdb instance.
-               private final ResourceGuard.Lease dbLease;
+                               ColumnFamilyHandle columnFamilyHandle = 
columnFamilyHandles.get(i);
+                               RegisteredKeyedBackendStateMetaInfo<?, ?> 
stateMetaInfo =
+                                       new 
RegisteredKeyedBackendStateMetaInfo<>(
+                                               
stateMetaInfoSnapshot.getStateType(),
+                                               stateMetaInfoSnapshot.getName(),
+                                               
stateMetaInfoSnapshot.getNamespaceSerializer(),
+                                               
stateMetaInfoSnapshot.getStateSerializer());
 
-               private StreamStateHandle metaStateHandle = null;
+                               stateBackend.kvStateInformation.put(
+                                       stateMetaInfoSnapshot.getName(),
+                                       new Tuple2<>(columnFamilyHandle, 
stateMetaInfo));
+                       }
 
-               private RocksDBIncrementalSnapshotOperation(
-                       RocksDBKeyedStateBackend<K> stateBackend,
-                       CheckpointStreamFactory checkpointStreamFactory,
-                       long checkpointId,
-                       long checkpointTimestamp) throws IOException {
+                       // use the restore sst files as the base for succeeding 
checkpoints
+                       synchronized (stateBackend.materializedSstFiles) {
+                               stateBackend.materializedSstFiles.put(
+                                       restoreStateHandle.getCheckpointId(),
+                                       
restoreStateHandle.getSharedStateHandleIDs());
+                       }
 
-                       this.stateBackend = stateBackend;
-                       this.checkpointStreamFactory = checkpointStreamFactory;
-                       this.checkpointId = checkpointId;
-                       this.checkpointTimestamp = checkpointTimestamp;
-                       this.dbLease = 
this.stateBackend.rocksDBResourceGuard.acquireResource();
+                       stateBackend.lastCompletedCheckpointId = 
restoreStateHandle.getCheckpointId();
                }
 
-               private StreamStateHandle materializeStateData(Path filePath) 
throws Exception {
-                       FSDataInputStream inputStream = null;
-                       CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = null;
+               /**
+                * This recreates the new working directory of the recovered 
RocksDB instance and links/copies the contents from
+                * a local state.
+                */
+               private void restoreInstanceDirectoryFromPath(Path source) 
throws IOException {
 
-                       try {
-                               final byte[] buffer = new byte[8 * 1024];
+                       FileSystem fileSystem = source.getFileSystem();
 
-                               FileSystem backupFileSystem = 
backupPath.getFileSystem();
-                               inputStream = backupFileSystem.open(filePath);
-                               
closeableRegistry.registerCloseable(inputStream);
+                       final FileStatus[] fileStatuses = 
fileSystem.listStatus(source);
 
-                               outputStream = checkpointStreamFactory
-                                       
.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
-                               
closeableRegistry.registerCloseable(outputStream);
-
-                               while (true) {
-                                       int numBytes = inputStream.read(buffer);
-
-                                       if (numBytes == -1) {
-                                               break;
-                                       }
-
-                                       outputStream.write(buffer, 0, numBytes);
-                               }
-
-                               StreamStateHandle result = null;
-                               if 
(closeableRegistry.unregisterCloseable(outputStream)) {
-                                       result = 
outputStream.closeAndGetHandle();
-                                       outputStream = null;
-                               }
-                               return result;
-
-                       } finally {
-
-                               if 
(closeableRegistry.unregisterCloseable(inputStream)) {
-                                       inputStream.close();
-                               }
+                       if (fileStatuses == null) {
+                               throw new IOException("Cannot list file 
statues. Directory " + source + " does not exist.");
+                       }
 
-                               if 
(closeableRegistry.unregisterCloseable(outputStream)) {
-                                       outputStream.close();
+                       for (FileStatus fileStatus : fileStatuses) {
+                               final Path filePath = fileStatus.getPath();
+                               final String fileName = filePath.getName();
+                               File restoreFile = new File(source.getPath(), 
fileName);
+                               File targetFile = new 
File(stateBackend.instanceRocksDBPath.getPath(), fileName);
+                               if (fileName.endsWith(SST_FILE_SUFFIX)) {
+                                       // hardlink'ing the immutable sst-files.
+                                       Files.createLink(targetFile.toPath(), 
restoreFile.toPath());
+                               } else {
+                                       // true copy for all other files.
+                                       Files.copy(restoreFile.toPath(), 
targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
                                }
                        }
                }
 
-               private StreamStateHandle materializeMetaData() throws 
Exception {
-                       CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = null;
+               /**
+                * Reads Flink's state meta data file from the state handle.
+                */
+               private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> readMetaData(
+                       StreamStateHandle metaStateHandle) throws Exception {
 
-                       try {
-                               outputStream = checkpointStreamFactory
-                                       
.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
-                               
closeableRegistry.registerCloseable(outputStream);
+                       FSDataInputStream inputStream = null;
 
-                               //no need for compression scheme support 
because sst-files are already compressed
-                               KeyedBackendSerializationProxy<K> 
serializationProxy =
-                                       new KeyedBackendSerializationProxy<>(
-                                               stateBackend.keySerializer,
-                                               stateMetaInfoSnapshots,
-                                               false);
+                       try {
+                               inputStream = metaStateHandle.openInputStream();
+                               
stateBackend.cancelStreamRegistry.registerCloseable(inputStream);
 
-                               DataOutputView out = new 
DataOutputViewStreamWrapper(outputStream);
+                               KeyedBackendSerializationProxy<T> 
serializationProxy =
+                                       new 
KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader);
+                               DataInputView in = new 
DataInputViewStreamWrapper(inputStream);
+                               serializationProxy.read(in);
 
-                               serializationProxy.write(out);
+                               // check for key serializer compatibility; this 
also reconfigures the
+                               // key serializer to be compatible, if it is 
required and is possible
+                               if 
(CompatibilityUtil.resolveCompatibilityResult(
+                                       serializationProxy.getKeySerializer(),
+                                       UnloadableDummyTypeSerializer.class,
+                                       
serializationProxy.getKeySerializerConfigSnapshot(),
+                                       stateBackend.keySerializer)
+                                       .isRequiresMigration()) {
 
-                               StreamStateHandle result = null;
-                               if 
(closeableRegistry.unregisterCloseable(outputStream)) {
-                                       result = 
outputStream.closeAndGetHandle();
-                                       outputStream = null;
+                                       // TODO replace with state migration; 
note that key hash codes need to remain the same after migration
+                                       throw new StateMigrationException("The 
new key serializer is not compatible to read previous keys. " +
+                                               "Aborting now since state 
migration is currently not available");
                                }
-                               return result;
+
+                               return 
serializationProxy.getStateMetaInfoSnapshots();
                        } finally {
-                               if (outputStream != null) {
-                                       if 
(closeableRegistry.unregisterCloseable(outputStream)) {
-                                               outputStream.close();
-                                       }
+                               if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
+                                       inputStream.close();
                                }
                        }
                }
 
-               void takeSnapshot() throws Exception {
-
-                       final long lastCompletedCheckpoint;
-
-                       // use the last completed checkpoint as the comparison 
base.
-                       synchronized (stateBackend.materializedSstFiles) {
-                               lastCompletedCheckpoint = 
stateBackend.lastCompletedCheckpointId;
-                               baseSstFiles = 
stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
-                       }
-
-                       LOG.trace("Taking incremental snapshot for checkpoint 
{}. Snapshot is based on last completed checkpoint {} " +
-                               "assuming the following (shared) files as base: 
{}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
+               private void transferAllStateDataToDirectory(
+                       IncrementalKeyedStateHandle restoreStateHandle,
+                       Path dest) throws IOException {
 
-                       // save meta data
-                       for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
-                               : stateBackend.kvStateInformation.entrySet()) {
-                               
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
-                       }
+                       final Map<StateHandleID, StreamStateHandle> sstFiles =
+                               restoreStateHandle.getSharedState();
+                       final Map<StateHandleID, StreamStateHandle> miscFiles =
+                               restoreStateHandle.getPrivateState();
 
-                       // save state data
-                       backupPath = new 
Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
+                       transferAllDataFromStateHandles(sstFiles, dest);
+                       transferAllDataFromStateHandles(miscFiles, dest);
+               }
 
-                       LOG.trace("Local RocksDB checkpoint goes to backup path 
{}.", backupPath);
+               /**
+                * Copies all the files from the given stream state handles to 
the given path, renaming the files w.r.t. their
+                * {@link StateHandleID}.
+                */
+               private void transferAllDataFromStateHandles(
+                       Map<StateHandleID, StreamStateHandle> stateHandleMap,
+                       Path restoreInstancePath) throws IOException {
 
-                       backupFileSystem = backupPath.getFileSystem();
-                       if (backupFileSystem.exists(backupPath)) {
-                               throw new IllegalStateException("Unexpected 
existence of the backup directory.");
+                       for (Map.Entry<StateHandleID, StreamStateHandle> entry 
: stateHandleMap.entrySet()) {
+                               StateHandleID stateHandleID = entry.getKey();
+                               StreamStateHandle remoteFileHandle = 
entry.getValue();
+                               copyStateDataHandleData(new 
Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle);
                        }
-
-                       // create hard links of living files in the checkpoint 
path
-                       Checkpoint checkpoint = 
Checkpoint.create(stateBackend.db);
-                       checkpoint.createCheckpoint(backupPath.getPath());
                }
 
-               KeyedStateHandle materializeSnapshot() throws Exception {
-
-                       
stateBackend.cancelStreamRegistry.registerCloseable(closeableRegistry);
+               /**
+                * Copies the file from a single state handle to the given path.
+                */
+               private void copyStateDataHandleData(
+                       Path restoreFilePath,
+                       StreamStateHandle remoteFileHandle) throws IOException {
 
-                       // write meta data
-                       metaStateHandle = materializeMetaData();
+                       FileSystem restoreFileSystem = 
restoreFilePath.getFileSystem();
 
-                       // write state data
-                       
Preconditions.checkState(backupFileSystem.exists(backupPath));
+                       FSDataInputStream inputStream = null;
+                       FSDataOutputStream outputStream = null;
 
-                       FileStatus[] fileStatuses = 
backupFileSystem.listStatus(backupPath);
-                       if (fileStatuses != null) {
-                               for (FileStatus fileStatus : fileStatuses) {
-                                       final Path filePath = 
fileStatus.getPath();
-                                       final String fileName = 
filePath.getName();
-                                       final StateHandleID stateHandleID = new 
StateHandleID(fileName);
+                       try {
+                               inputStream = 
remoteFileHandle.openInputStream();
+                               
stateBackend.cancelStreamRegistry.registerCloseable(inputStream);
 
-                                       if (fileName.endsWith(SST_FILE_SUFFIX)) 
{
-                                               final boolean existsAlready =
-                                                       baseSstFiles != null && 
baseSstFiles.contains(stateHandleID);
+                               outputStream = 
restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
+                               
stateBackend.cancelStreamRegistry.registerCloseable(outputStream);
 
-                                               if (existsAlready) {
-                                                       // we introduce a 
placeholder state handle, that is replaced with the
-                                                       // original from the 
shared state registry (created from a previous checkpoint)
-                                                       sstFiles.put(
-                                                               stateHandleID,
-                                                               new 
PlaceholderStreamStateHandle());
-                                               } else {
-                                                       
sstFiles.put(stateHandleID, materializeStateData(filePath));
-                                               }
-                                       } else {
-                                               StreamStateHandle fileHandle = 
materializeStateData(filePath);
-                                               miscFiles.put(stateHandleID, 
fileHandle);
+                               byte[] buffer = new byte[8 * 1024];
+                               while (true) {
+                                       int numBytes = inputStream.read(buffer);
+                                       if (numBytes == -1) {
+                                               break;
                                        }
-                               }
-                       }
-
-                       synchronized (stateBackend.materializedSstFiles) {
-                               
stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
-                       }
-
-                       return new IncrementalKeyedStateHandle(
-                               stateBackend.backendUID,
-                               stateBackend.keyGroupRange,
-                               checkpointId,
-                               sstFiles,
-                               miscFiles,
-                               metaStateHandle);
-               }
 
-               void stop() {
+                                       outputStream.write(buffer, 0, numBytes);
+                               }
+                       } finally {
+                               if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
+                                       inputStream.close();
+                               }
 
-                       if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
-                               try {
-                                       closeableRegistry.close();
-                               } catch (IOException e) {
-                                       LOG.warn("Could not properly close io 
streams.", e);
+                               if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(outputStream)) {
+                                       outputStream.close();
                                }
                        }
                }
 
-               void releaseResources(boolean canceled) {
+               /**
+                * In case of rescaling, this method creates a temporary 
RocksDB instance for a key-groups shard. All contents
+                * from the temporary instance are copied into the real restore 
instance and then the temporary instance is
+                * discarded.
+                */
+               private void restoreKeyGroupsShardWithTemporaryHelperInstance(
+                       Path restoreInstancePath,
+                       List<ColumnFamilyDescriptor> columnFamilyDescriptors,
+                       List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots) throws Exception {
 
-                       dbLease.close();
+                       List<ColumnFamilyHandle> columnFamilyHandles =
+                               new ArrayList<>(1 + 
columnFamilyDescriptors.size());
 
-                       if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
-                               try {
-                                       closeableRegistry.close();
-                               } catch (IOException e) {
-                                       LOG.warn("Exception on closing 
registry.", e);
-                               }
-                       }
+                       try (RocksDB restoreDb = stateBackend.openDB(
+                               restoreInstancePath.getPath(),
+                               columnFamilyDescriptors,
+                               columnFamilyHandles)) {
 
-                       if (backupPath != null) {
                                try {
-                                       if 
(backupFileSystem.exists(backupPath)) {
-
-                                               LOG.trace("Deleting local 
RocksDB backup path {}.", backupPath);
-                                               
backupFileSystem.delete(backupPath, true);
-                                       }
-                               } catch (Exception e) {
-                                       LOG.warn("Could not properly delete the 
checkpoint directory.", e);
-                               }
-                       }
+                                       // iterating only the requested 
descriptors automatically skips the default column family handle
+                                       for (int i = 0; i < 
columnFamilyDescriptors.size(); ++i) {
+                                               ColumnFamilyHandle 
columnFamilyHandle = columnFamilyHandles.get(i);
+                                               ColumnFamilyDescriptor 
columnFamilyDescriptor = columnFamilyDescriptors.get(i);
+                                               
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = 
stateMetaInfoSnapshots.get(i);
 
-                       if (canceled) {
-                               Collection<StateObject> statesToDiscard =
-                                       new ArrayList<>(1 + miscFiles.size() + 
sstFiles.size());
+                                               Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry =
+                                                       
stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
 
-                               statesToDiscard.add(metaStateHandle);
-                               statesToDiscard.addAll(miscFiles.values());
-                               statesToDiscard.addAll(sstFiles.values());
+                                               if (null == 
registeredStateMetaInfoEntry) {
 
-                               try {
-                                       
StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
-                               } catch (Exception e) {
-                                       LOG.warn("Could not properly discard 
states.", e);
-                               }
-                       }
-               }
-       }
+                                                       
RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
+                                                               new 
RegisteredKeyedBackendStateMetaInfo<>(
+                                                                       
stateMetaInfoSnapshot.getStateType(),
+                                                                       
stateMetaInfoSnapshot.getName(),
+                                                                       
stateMetaInfoSnapshot.getNamespaceSerializer(),
+                                                                       
stateMetaInfoSnapshot.getStateSerializer());
 
-       @Override
-       public void restore(Collection<KeyedStateHandle> restoreState) throws 
Exception {
-               LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
+                                                       
registeredStateMetaInfoEntry =
+                                                               new Tuple2<>(
+                                                                       
stateBackend.db.createColumnFamily(columnFamilyDescriptor),
+                                                                       
stateMetaInfo);
 
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
-               }
+                                                       
stateBackend.kvStateInformation.put(
+                                                               
stateMetaInfoSnapshot.getName(),
+                                                               
registeredStateMetaInfoEntry);
+                                               }
 
-               // clear all meta data
-               kvStateInformation.clear();
-               restoredKvStateMetaInfos.clear();
+                                               ColumnFamilyHandle 
targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0;
 
-               try {
-                       if (restoreState == null || restoreState.isEmpty()) {
-                               createDB();
-                       } else if (restoreState.iterator().next() instanceof 
IncrementalKeyedStateHandle) {
-                               RocksDBIncrementalRestoreOperation<K> 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
-                               restoreOperation.restore(restoreState);
-                       } else {
-                               RocksDBFullRestoreOperation<K> restoreOperation 
= new RocksDBFullRestoreOperation<>(this);
-                               restoreOperation.doRestore(restoreState);
-                       }
-               } catch (Exception ex) {
-                       dispose();
-                       throw ex;
-               }
-       }
+                                               try (RocksIterator iterator = 
restoreDb.newIterator(columnFamilyHandle)) {
 
-       @Override
-       public void notifyCheckpointComplete(long completedCheckpointId) {
+                                                       int startKeyGroup = 
stateBackend.getKeyGroupRange().getStartKeyGroup();
+                                                       byte[] 
startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
+                                                       for (int j = 0; j < 
stateBackend.keyGroupPrefixBytes; ++j) {
+                                                               
startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> 
((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
+                                                       }
 
-               if (!enableIncrementalCheckpointing) {
-                       return;
-               }
+                                                       
iterator.seek(startKeyGroupPrefixBytes);
 
-               synchronized (materializedSstFiles) {
+                                                       while 
(iterator.isValid()) {
 
-                       if (completedCheckpointId < lastCompletedCheckpointId) {
-                               return;
-                       }
+                                                               int keyGroup = 
0;
+                                                               for (int j = 0; 
j < stateBackend.keyGroupPrefixBytes; ++j) {
+                                                                       
keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
+                                                               }
 
-                       materializedSstFiles.keySet().removeIf(checkpointId -> 
checkpointId < completedCheckpointId);
+                                                               if 
(stateBackend.keyGroupRange.contains(keyGroup)) {
+                                                                       
stateBackend.db.put(targetColumnFamilyHandle,
+                                                                               
iterator.key(), iterator.value());
+                                                               }
 
-                       lastCompletedCheckpointId = completedCheckpointId;
+                                                               iterator.next();
+                                                       }
+                                               } // releases native iterator 
resources
+                                       }
+                               } finally {
+                                       //release native tmp db column family 
resources
+                                       for (ColumnFamilyHandle 
columnFamilyHandle : columnFamilyHandles) {
+                                               
IOUtils.closeQuietly(columnFamilyHandle);
+                                       }
+                               }
+                       } // releases native tmp db resources
                }
        }
 
-       private void createDB() throws IOException {
-               List<ColumnFamilyHandle> columnFamilyHandles = new 
ArrayList<>(1);
-               this.db = openDB(instanceRocksDBPath.getAbsolutePath(), 
Collections.emptyList(), columnFamilyHandles);
-               this.defaultColumnFamily = columnFamilyHandles.get(0);
-       }
-
-       private RocksDB openDB(
-               String path,
-               List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
-               List<ColumnFamilyHandle> stateColumnFamilyHandles) throws 
IOException {
+       // 
------------------------------------------------------------------------
+       //  State factories
+       // 
------------------------------------------------------------------------
 
-               List<ColumnFamilyDescriptor> columnFamilyDescriptors =
-                       new ArrayList<>(1 + 
stateColumnFamilyDescriptors.size());
+       /**
+        * Creates a column family handle for use with a k/v state. When 
restoring from a snapshot
+        * we don't restore the individual k/v states, just the global RocksDB 
database and the
+        * list of column families. When a k/v state is first requested we 
check here whether we
+        * already have a column family for that and return it or create a new 
one if it doesn't exist.
+        *
+        * <p>This also checks whether the {@link StateDescriptor} for a state 
matches the one
+        * that we checkpointed, i.e. is already in the map of column families.
+        */
+       @SuppressWarnings("rawtypes, unchecked")
+       protected <N, S> ColumnFamilyHandle getColumnFamily(
+               StateDescriptor<?, S> descriptor, TypeSerializer<N> 
namespaceSerializer) throws IOException, StateMigrationException {
 
-               columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
+               Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
+                       kvStateInformation.get(descriptor.getName());
 
-               // we add the required descriptor for the default CF in last 
position.
-               columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions));
+               RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new 
RegisteredKeyedBackendStateMetaInfo<>(
+                       descriptor.getType(),
+                       descriptor.getName(),
+                       namespaceSerializer,
+                       descriptor.getSerializer());
 
-               RocksDB dbRef;
+               if (stateInfo != null) {
+                       // TODO with eager registration in place, these checks 
should be moved to restore()
+
+                       RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> 
restoredMetaInfo =
+                               
(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) 
restoredKvStateMetaInfos.get(descriptor.getName());
+
+                       Preconditions.checkState(
+                               Objects.equals(newMetaInfo.getName(), 
restoredMetaInfo.getName()),
+                               "Incompatible state names. " +
+                                       "Was [" + restoredMetaInfo.getName() + 
"], " +
+                                       "registered with [" + 
newMetaInfo.getName() + "].");
+
+                       if (!Objects.equals(newMetaInfo.getStateType(), 
StateDescriptor.Type.UNKNOWN)
+                               && 
!Objects.equals(restoredMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)) 
{
+
+                               Preconditions.checkState(
+                                       newMetaInfo.getStateType() == 
restoredMetaInfo.getStateType(),
+                                       "Incompatible state types. " +
+                                               "Was [" + 
restoredMetaInfo.getStateType() + "], " +
+                                               "registered with [" + 
newMetaInfo.getStateType() + "].");
+                       }
+
+                       // check compatibility results to determine if state 
migration is required
+                       CompatibilityResult<N> namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+                               restoredMetaInfo.getNamespaceSerializer(),
+                               null,
+                               
restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
+                               newMetaInfo.getNamespaceSerializer());
+
+                       CompatibilityResult<S> stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+                               restoredMetaInfo.getStateSerializer(),
+                               UnloadableDummyTypeSerializer.class,
+                               
restoredMetaInfo.getStateSerializerConfigSnapshot(),
+                               newMetaInfo.getStateSerializer());
+
+                       if (namespaceCompatibility.isRequiresMigration() || 
stateCompatibility.isRequiresMigration()) {
+                               // TODO state migration currently isn't 
possible.
+                               throw new StateMigrationException("State 
migration isn't supported, yet.");
+                       } else {
+                               stateInfo.f1 = newMetaInfo;
+                               return stateInfo.f0;
+                       }
+               }
+
+               byte[] nameBytes = 
descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
+               
Preconditions.checkState(!Arrays.equals(DEFAULT_COLUMN_FAMILY_NAME_BYTES, 
nameBytes),
+                       "The chosen state name 'default' collides with the name 
of the default column family!");
+
+               ColumnFamilyDescriptor columnDescriptor = new 
ColumnFamilyDescriptor(nameBytes, columnOptions);
+
+               final ColumnFamilyHandle columnFamily;
 
                try {
-                       dbRef = RocksDB.open(
-                               Preconditions.checkNotNull(dbOptions),
-                               Preconditions.checkNotNull(path),
-                               columnFamilyDescriptors,
-                               stateColumnFamilyHandles);
+                       columnFamily = db.createColumnFamily(columnDescriptor);
                } catch (RocksDBException e) {
-                       throw new IOException("Error while opening RocksDB 
instance.", e);
+                       throw new IOException("Error creating 
ColumnFamilyHandle.", e);
                }
 
-               // requested + default CF
-               Preconditions.checkState(1 + 
stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
-                       "Not all requested column family handles have been 
created");
+               Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<N, S>> tuple =
+                       new Tuple2<>(columnFamily, newMetaInfo);
+               Map rawAccess = kvStateInformation;
+               rawAccess.put(descriptor.getName(), tuple);
+               return columnFamily;
+       }
 
-               return dbRef;
+       @Override
+       protected <N, T> InternalValueState<N, T> createValueState(
+               TypeSerializer<N> namespaceSerializer,
+               ValueStateDescriptor<T> stateDesc) throws Exception {
+
+               ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
+
+               return new RocksDBValueState<>(columnFamily, 
namespaceSerializer,  stateDesc, this);
+       }
+
+       @Override
+       protected <N, T> InternalListState<N, T> createListState(
+               TypeSerializer<N> namespaceSerializer,
+               ListStateDescriptor<T> stateDesc) throws Exception {
+
+               ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
+
+               return new RocksDBListState<>(columnFamily, 
namespaceSerializer, stateDesc, this);
+       }
+
+       @Override
+       protected <N, T> InternalReducingState<N, T> createReducingState(
+               TypeSerializer<N> namespaceSerializer,
+               ReducingStateDescriptor<T> stateDesc) throws Exception {
+
+               ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
+
+               return new RocksDBReducingState<>(columnFamily, 
namespaceSerializer,  stateDesc, this);
+       }
+
+       @Override
+       protected <N, T, ACC, R> InternalAggregatingState<N, T, R> 
createAggregatingState(
+               TypeSerializer<N> namespaceSerializer,
+               AggregatingStateDescriptor<T, ACC, R> stateDesc) throws 
Exception {
+
+               ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
+               return new RocksDBAggregatingState<>(columnFamily, 
namespaceSerializer, stateDesc, this);
+       }
+
+       @Override
+       protected <N, T, ACC> InternalFoldingState<N, T, ACC> 
createFoldingState(
+               TypeSerializer<N> namespaceSerializer,
+               FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+
+               ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
+
+               return new RocksDBFoldingState<>(columnFamily, 
namespaceSerializer, stateDesc, this);
+       }
+
+       @Override
+       protected <N, UK, UV> InternalMapState<N, UK, UV> createMapState(
+               TypeSerializer<N> namespaceSerializer,
+               MapStateDescriptor<UK, UV> stateDesc) throws Exception {
+
+               ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
+
+               return new RocksDBMapState<>(columnFamily, namespaceSerializer, 
stateDesc, this);
        }
 
        /**
-        * Encapsulates the process of restoring a RocksDBKeyedStateBackend 
from a snapshot.
+        * Only visible for testing, DO NOT USE.
         */
-       static final class RocksDBFullRestoreOperation<K> {
+       public File getInstanceBasePath() {
+               return instanceBasePath;
+       }
 
-               private final RocksDBKeyedStateBackend<K> 
rocksDBKeyedStateBackend;
+       @Override
+       public boolean supportsAsynchronousSnapshots() {
+               return true;
+       }
 
-               /** Current key-groups state handle from which we restore 
key-groups. */
-               private KeyGroupsStateHandle currentKeyGroupsStateHandle;
-               /** Current input stream we obtained from 
currentKeyGroupsStateHandle. */
-               private FSDataInputStream currentStateHandleInStream;
-               /** Current data input view that wraps 
currentStateHandleInStream. */
-               private DataInputView currentStateHandleInView;
-               /** Current list of ColumnFamilyHandles for all column families 
we restore from currentKeyGroupsStateHandle. */
-               private List<ColumnFamilyHandle> 
currentStateHandleKVStateColumnFamilies;
-               /** The compression decorator that was used for writing the 
state, as determined by the meta data. */
-               private StreamCompressionDecorator 
keygroupStreamCompressionDecorator;
+       @VisibleForTesting
+       @SuppressWarnings("unchecked")
+       @Override
+       public int numStateEntries() {
+               int count = 0;
+
+               for (Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> column : 
kvStateInformation.values()) {
+                       try (RocksIterator rocksIterator = 
db.newIterator(column.f0)) {
+                               rocksIterator.seekToFirst();
+
+                               while (rocksIterator.isValid()) {
+                                       count++;
+                                       rocksIterator.next();
+                               }
+                       }
+               }
+
+               return count;
+       }
+
+
+
+       /**
+        * Iterator that merges multiple RocksDB iterators to partition all 
states into contiguous key-groups.
+        * The resulting iteration sequence is ordered by (key-group, kv-state).
+        */
+       @VisibleForTesting
+       static final class RocksDBMergeIterator implements AutoCloseable {
+
+               private final PriorityQueue<MergeIterator> heap;
+               private final int keyGroupPrefixByteCount;
+               private boolean newKeyGroup;
+               private boolean newKVState;
+               private boolean valid;
+
+               private MergeIterator currentSubIterator;
+
+               private static final List<Comparator<MergeIterator>> 
COMPARATORS;
+
+               static {
+                       int maxBytes = 4;
+                       COMPARATORS = new ArrayList<>(maxBytes);
+                       for (int i = 0; i < maxBytes; ++i) {
+                               final int currentBytes = i;
+                               COMPARATORS.add(new Comparator<MergeIterator>() 
{
+                                       @Override
+                                       public int compare(MergeIterator o1, 
MergeIterator o2) {
+                                               int arrayCmpRes = 
compareKeyGroupsForByteArrays(
+                                                       o1.currentKey, 
o2.currentKey, currentBytes);
+                                               return arrayCmpRes == 0 ? 
o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
+                                       }
+                               });
+                       }
+               }
+
+               RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> 
kvStateIterators, final int keyGroupPrefixByteCount) {
+                       Preconditions.checkNotNull(kvStateIterators);
+                       this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
+
+                       Comparator<MergeIterator> iteratorComparator = 
COMPARATORS.get(keyGroupPrefixByteCount);
+
+                       if (kvStateIterators.size() > 0) {
+                               PriorityQueue<MergeIterator> 
iteratorPriorityQueue =
+                                       new 
PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
+
+                               for (Tuple2<RocksIterator, Integer> 
rocksIteratorWithKVStateId : kvStateIterators) {
+                                       final RocksIterator rocksIterator = 
rocksIteratorWithKVStateId.f0;
+                                       rocksIterator.seekToFirst();
+                                       if (rocksIterator.isValid()) {
+                                               iteratorPriorityQueue.offer(new 
MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
+                                       } else {
+                                               
IOUtils.closeQuietly(rocksIterator);
+                                       }
+                               }
+
+                               kvStateIterators.clear();
+
+                               this.heap = iteratorPriorityQueue;
+                               this.valid = !heap.isEmpty();
+                               this.currentSubIterator = heap.poll();
+                       } else {
+                               // creating a PriorityQueue of size 0 results 
in an exception.
+                               this.heap = null;
+                               this.valid = false;
+                       }
+
+                       this.newKeyGroup = true;
+                       this.newKVState = true;
+               }
 
                /**
-                * Creates a restore operation object for the given state 
backend instance.
-                *
-                * @param rocksDBKeyedStateBackend the state backend into which 
we restore
+                * Advance the iterator. Should only be called if {@link 
#isValid()} returned true. Valid can only chance after
+                * calls to {@link #next()}.
                 */
-               public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> 
rocksDBKeyedStateBackend) {
-                       this.rocksDBKeyedStateBackend = 
Preconditions.checkNotNull(rocksDBKeyedStateBackend);
+               public void next() {
+                       newKeyGroup = false;
+                       newKVState = false;
+
+                       final RocksIterator rocksIterator = 
currentSubIterator.getIterator();
+                       rocksIterator.next();
+
+                       byte[] oldKey = currentSubIterator.getCurrentKey();
+                       if (rocksIterator.isValid()) {
+                               currentSubIterator.currentKey = 
rocksIterator.key();
+
+                               if (isDifferentKeyGroup(oldKey, 
currentSubIterator.getCurrentKey())) {
+                                       heap.offer(currentSubIterator);
+                                       currentSubIterator = heap.poll();
+                                       newKVState = 
currentSubIterator.getIterator() != rocksIterator;
+                                       detectNewKeyGroup(oldKey);
+                               }
+                       } else {
+                               IOUtils.closeQuietly(rocksIterator);
+
+                               if (heap.isEmpty()) {
+                                       currentSubIterator = null;
+                                       valid = false;
+                               } else {
+                                       currentSubIterator = heap.poll();
+                                       newKVState = true;
+                                       detectNewKeyGroup(oldKey);
+                               }
+                       }
+               }
+
+               private boolean isDifferentKeyGroup(byte[] a, byte[] b) {
+                       return 0 != compareKeyGroupsForByteArrays(a, b, 
keyGroupPrefixByteCount);
+               }
+
+               private void detectNewKeyGroup(byte[] oldKey) {
+                       if (isDifferentKeyGroup(oldKey, 
currentSubIterator.currentKey)) {
+                               newKeyGroup = true;
+                       }
                }
 
                /**
-                * Restores all key-groups data that is referenced by the 
passed state handles.
-                *
-                * @param keyedStateHandles List of all key groups state 
handles that shall be restored.
+                * @return key-group for the current key
                 */
-               public void doRestore(Collection<KeyedStateHandle> 
keyedStateHandles)
-                       throws IOException, StateMigrationException, 
RocksDBException {
+               public int keyGroup() {
+                       int result = 0;
+                       //big endian decode
+                       for (int i = 0; i < keyGroupPrefixByteCount; ++i) {
+                               result <<= 8;
+                               result |= (currentSubIterator.currentKey[i] & 
0xFF);
+                       }
+                       return result;
+               }
 
-                       rocksDBKeyedStateBackend.createDB();
+               public byte[] key() {
+                       return currentSubIterator.getCurrentKey();
+               }
+
+               public byte[] value() {
+                       return currentSubIterator.getIterator().value();
+               }
+
+               /**
+                * @return Id of K/V state to which the current key belongs.
+                */
+               public int kvStateId() {
+                       return currentSubIterator.getKvStateId();
+               }
+
+               /**
+                * Indicates if current key starts a new k/v-state, i.e. belong 
to a different k/v-state than it's predecessor.
+                * @return true iff the current key belong to a different 
k/v-state than it's predecessor.
+                */
+               public boolean isNewKeyValueState() {
+                       return newKVState;
+               }
+
+               /**
+                * Indicates if current key starts a new key-group, i.e. belong 
to a different key-group than it's predecessor.
+                * @return true iff the current key belong to a different 
key-group than it's predecessor.
+                */
+               public boolean isNewKeyGroup() {
+                       return newKeyGroup;
+               }
+
+               /**
+                * Check if the iterator is still valid. Getters like {@link 
#key()}, {@link #value()}, etc. as well as
+                * {@link #next()} should only be called if valid returned 
true. Should be checked after each call to
+                * {@link #next()} before accessing iterator state.
+                * @return True iff this iterator is valid.
+                */
+               public boolean isValid() {
+                       return valid;
+               }
+
+               private static int compareKeyGroupsForByteArrays(byte[] a, 
byte[] b, int len) {
+                       for (int i = 0; i < len; ++i) {
+                               int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
+                               if (diff != 0) {
+                                       return diff;
+                               }
+                       }
+                       return 0;
+               }
+
+               @Override
+               public void close() {
+                       IOUtils.closeQuietly(currentSubIterator);
+                       currentSubIterator = null;
+
+                       IOUtils.closeAllQuietly(heap);
+                       heap.clear();
+               }
+       }
+
+       /**
+        * Wraps a RocksDB iterator to cache it's current key and assigns an id 
for the key/value state to the iterator.
+        * Used by #MergeIterator.
+        */
+       private static final class MergeIterator implements AutoCloseable {
+
+               /**
+                * @param iterator  The #RocksIterator to wrap .
+                * @param kvStateId Id of the K/V state to which this iterator 
belongs.
+                */
+               MergeIterator(RocksIterator iterator, int kvStateId) {
+                       this.iterator = Preconditions.checkNotNull(iterator);
+                       this.currentKey = iterator.key();
+                       this.kvStateId = kvStateId;
+               }
+
+               private final RocksIterator iterator;
+               private byte[] currentKey;
+               private final int kvStateId;
+
+               public byte[] getCurrentKey() {
+                       return currentKey;
+               }
+
+               public void setCurrentKey(byte[] currentKey) {
+                       this.currentKey = currentKey;
+               }
+
+               public RocksIterator getIterator() {
+                       return iterator;
+               }
+
+               public int getKvStateId() {
+                       return kvStateId;
+               }
+
+               @Override
+               public void close() {
+                       IOUtils.closeQuietly(iterator);
+               }
+       }
+
+       /**
+        * Adapter class to bridge between {@link RocksIterator} and {@link 
Iterator} to iterate over the keys. This class
+        * is not thread safe.
+        *
+        * @param <K> the type of the iterated objects, which are keys in 
RocksDB.
+        */
+       static class RocksIteratorForKeysWrapper<K> implements Iterator<K>, 
AutoCloseable {
+               private final RocksIterator iterator;
+               private final String state;
+               private final TypeSerializer<K> keySerializer;
+               private final int keyGroupPrefixBytes;
+               private final byte[] namespaceBytes;
+               private final boolean ambiguousKeyPossible;
+               private K nextKey;
+
+               RocksIteratorForKeysWrapper(
+                       RocksIterator iterator,
+                       String state,
+                       TypeSerializer<K> keySerializer,
+                       int keyGroupPrefixBytes,
+                       boolean ambiguousKeyPossible,
+                       byte[] namespaceBytes) {
+                       this.iterator = Preconditions.checkNotNull(iterator);
+                       this.state = Preconditions.checkNotNull(state);
+                       this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
+                       this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+                       this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+                       this.nextKey = null;
+                       this.ambiguousKeyPossible = ambiguousKeyPossible;
+               }
+
+               @Override
+               public boolean hasNext() {
+                       while (nextKey == null && iterator.isValid()) {
+                               try {
+                                       byte[] key = iterator.key();
+                                       if (isMatchingNameSpace(key)) {
+                                               ByteArrayInputStreamWithPos 
inputStream =
+                                                       new 
ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - 
keyGroupPrefixBytes);
+                                               DataInputViewStreamWrapper 
dataInput = new DataInputViewStreamWrapper(inputStream);
+                                               K value = 
RocksDBKeySerializationUtils.readKey(
+                                                       keySerializer,
+                                                       inputStream,
+                                                       dataInput,
+                                                       ambiguousKeyPossible);
+                                               nextKey = value;
+                                       }
+                                       iterator.next();
+                               } catch (IOException e) {
+                                       throw new FlinkRuntimeException("Failed 
to access state [" + state + "]", e);
+                               }
+                       }
+                       return nextKey != null;
+               }
+
+               @Override
+               public K next() {
+                       if (!hasNext()) {
+                               throw new NoSuchElementException("Failed to 
access state [" + state + "]");
+                       }
 
-                       for (KeyedStateHandle keyedStateHandle : 
keyedStateHandles) {
-                               if (keyedStateHandle != null) {
+                       K tmpKey = nextKey;
+                       nextKey = null;
+                       return tmpKey;
+               }
 
-                                       if (!(keyedStateHandle instanceof 
KeyGroupsStateHandle)) {
-                                               throw new 
IllegalStateException("Unexpected state handle type, " +
-                                                       "expected: " + 
KeyGroupsStateHandle.class +
-                                                       ", but found: " + 
keyedStateHandle.getClass());
+               private boolean isMatchingNameSpace(@Nonnull byte[] key) {
+                       final int namespaceBytesLength = namespaceBytes.length;
+                       final int basicLength = namespaceBytesLength + 
keyGroupPrefixBytes;
+                       if (key.length >= basicLength) {
+                               for (int i = 1; i <= namespaceBytesLength; ++i) 
{
+                                       if (key[key.length - i] != 
namespaceBytes[namespaceBytesLength - i]) {
+                                               return false;
                                        }
-                                       this.currentKeyGroupsStateHandle = 
(KeyGroupsStateHandle) keyedStateHandle;
-                                       restoreKeyGroupsInStateHandle();
                                }
+                               return true;
                        }
+                       return false;
                }
 
-               /**
-                * Restore one key groups state handle.
-                */
-               private void restoreKeyGroupsInStateHandle()
-                       throws IOException, StateMigrationException, 
RocksDBException {
-                       try {
-                               currentStateHandleInStream = 
currentKeyGroupsStateHandle.openInputStream();
-                               
rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
-                               currentStateHandleInView = new 
DataInputViewStreamWrapper(currentStateHandleInStream);
-                               restoreKVStateMetaData();
-                               restoreKVStateData();
-                       } finally {
-                               if 
(rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream))
 {
-                                       
IOUtils.closeQuietly(currentStateHandleInStream);
-                               }
-                       }
+               @Override
+               public void close() {
+                       iterator.close();
                }
+       }
 
-               /**
-                * Restore the KV-state / ColumnFamily meta data for all 
key-groups referenced by the current state handle.
-                *
-                * @throws IOException
-                * @throws ClassNotFoundException
-                * @throws RocksDBException
-                */
-               private void restoreKVStateMetaData() throws IOException, 
StateMigrationException, RocksDBException {
+       private class FullSnapshotStrategy implements 
SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
 
-                       KeyedBackendSerializationProxy<K> serializationProxy =
-                               new 
KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
+               @Override
+               public RunnableFuture<SnapshotResult<KeyedStateHandle>> 
performSnapshot(
+                       long checkpointId,
+                       long timestamp,
+                       CheckpointStreamFactory primaryStreamFactory,
+                       CheckpointOptions checkpointOptions) throws Exception {
 
-                       serializationProxy.read(currentStateHandleInView);
+                       long startTime = System.currentTimeMillis();
+                       final CloseableRegistry snapshotCloseableRegistry = new 
CloseableRegistry();
 
-                       // check for key serializer compatibility; this also 
reconfigures the
-                       // key serializer to be compatible, if it is required 
and is possible
-                       if (CompatibilityUtil.resolveCompatibilityResult(
-                               serializationProxy.getKeySerializer(),
-                               UnloadableDummyTypeSerializer.class,
-                               
serializationProxy.getKeySerializerConfigSnapshot(),
-                               rocksDBKeyedStateBackend.keySerializer)
-                               .isRequiresMigration()) {
+                       if (kvStateInformation.isEmpty()) {
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Asynchronous RocksDB 
snapshot performed on empty keyed state at {}. Returning null.",
+                                               timestamp);
+                               }
 
-                               // TODO replace with state migration; note that 
key hash codes need to remain the same after migration
-                               throw new StateMigrationException("The new key 
serializer is not compatible to read previous keys. " +
-                                       "Aborting now since state migration is 
currently not available");
+                               return DoneFuture.of(SnapshotResult.empty());
                        }
 
-                       this.keygroupStreamCompressionDecorator = 
serializationProxy.isUsingKeyGroupCompression() ?
-                               SnappyStreamCompressionDecorator.INSTANCE : 
UncompressedStreamCompressionDecorator.INSTANCE;
-
-                       List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> restoredMetaInfos =
-                               serializationProxy.getStateMetaInfoSnapshots();
-                       currentStateHandleKVStateColumnFamilies = new 
ArrayList<>(restoredMetaInfos.size());
-                       //rocksDBKeyedStateBackend.restoredKvStateMetaInfos = 
new HashMap<>(restoredMetaInfos.size());
-
-                       for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> 
restoredMetaInfo : restoredMetaInfos) {
+                       final 
SupplierWithException<CheckpointStreamWithResultProvider, Exception> supplier =
 
-                               Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn =
-                                       
rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
+                               isWithLocalRecovery(
+                                       checkpointOptions.getCheckpointType(),
+                                       
localRecoveryConfig.getLocalRecoveryMode()) ?
 
-                               if (registeredColumn == null) {
-                                       byte[] nameBytes = 
restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
+                                       () -> 
CheckpointStreamWithResultProvider.createDuplicatingStream(
+                                               checkpointId,
+                                               
CheckpointedStateScope.EXCLUSIVE,
+                                               primaryStreamFactory,
+                                               
localRecoveryConfig.getLocalStateDirectoryProvider()) :
 
-                                       ColumnFamilyDescriptor 
columnFamilyDescriptor = new ColumnFamilyDescriptor(
-                                               nameBytes,
-                                               
rocksDBKeyedStateBackend.columnOptions);
+                                       () -> 
CheckpointStreamWithResultProvider.createSimpleStream(
+                                               
CheckpointedStateScope.EXCLUSIVE,
+                                               primaryStreamFactory);
 
-                                       RegisteredKeyedBackendStateMetaInfo<?, 
?> stateMetaInfo =
-                                               new 
RegisteredKeyedBackendStateMetaInfo<>(
-                                                       
restoredMetaInfo.getStateType(),
-                                                       
restoredMetaInfo.getName(),
-                                                       
restoredMetaInfo.getNamespaceSerializer(),
-                                                       
restoredMetaInfo.getStateSerializer());
+                       final RocksDBFullSnapshotOperation<K> snapshotOperation 
=
+                               new RocksDBFullSnapshotOperation<>(
+                                       RocksDBKeyedStateBackend.this,
+                                       supplier,
+                                       snapshotCloseableRegistry);
 
-                                       
rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(),
 restoredMetaInfo);
+                       snapshotOperation.takeDBSnapShot();
 
-                                       ColumnFamilyHandle columnFamily = 
rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
+                       // implementation of the async IO operation, based on 
FutureTask
+                       
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable 
=
+                               new 
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() {
 
-                                       registeredColumn = new 
Tuple2<>(columnFamily, stateMetaInfo);
-                                       
rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), 
registeredColumn);
+                                       @Override
+                                       protected void acquireResources() 
throws Exception {
+                                               
cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
+                                               
snapshotOperation.openCheckpointStream();
+                                       }
 
-                               } else {
-                                       // TODO with eager state registration 
in place, check here for serializer migration strategies
-                               }
-                               
currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0);
-                       }
-               }
+                                       @Override
+                                       protected void releaseResources() 
throws Exception {
+                                               closeLocalRegistry();
+                                               
releaseSnapshotOperationResources();
+                                       }
 
-               /**
-                * Restore the KV-state / ColumnFamily data for all key-groups 
referenced by the current state handle.
-                *
-                * @throws IOException
-                * @throws RocksDBException
-                */
-               private void restoreKVStateData() throws IOException, 
RocksDBException {
-                       //for all key-groups in the current state handle...
-                       for (Tuple2<Integer, Long> keyGroupOffset : 
currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
-                               int keyGroup = keyGroupOffset.f0;
+                                       private void 
releaseSnapshotOperationResources() {
+                                               // hold the db lock while 
operation on the db to guard us against async db disposal
+                                               
snapshotOperation.releaseSnapshotResources();
+                                       }
 
-                               // Check that restored key groups all belong to 
the backend
-                               
Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup),
-                                       "The key group must belong to the 
backend");
+                                       @Override
+                                       protected void stopOperation() throws 
Exception {
+                                               closeLocalRegistry();
+                                       }
 
-                               long offset = keyGroupOffset.f1;
-                               //not empty key-group?
-                               if (0L != offset) {
-                                       currentStateHandleInStream.seek(offset);
-                                       try (InputStream compressedKgIn = 
keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream))
 {
-                                               DataInputViewStreamWrapper 
compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
-                                               //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
-                                               int kvStateId = 
compressedKgInputView.readShort();
-                                               ColumnFamilyHandle handle = 
currentStateHandleKVStateColumnFamilies.get(kvStateId);
-                                               //insert all k/v pairs into DB
-                                               boolean keyGroupHasMoreKeys = 
true;
-                                               while (keyGroupHasMoreKeys) {
-                                                       byte[] key = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
-                                                       byte[] value = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
-                                                       if 
(RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
-                                                               //clear the 
signal bit in the key to make it ready for insertion again
-                                                               
RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
-                                                               
rocksDBKeyedStateBackend.db.put(handle, key, value);
-                                                               //TODO this 
could be aware of keyGroupPrefixBytes and write only one byte if possible
-                                                               kvStateId = 
RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
-                                                                       & 
compressedKgInputView.readShort();
-                                                               if 
(RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
-                                                                       
keyGroupHasMoreKeys = false;
-                                                               } else {
-                                                                       handle 
= currentStateHandleKVStateColumnFamilies.get(kvStateId);
-                                                               }
-                                                       } else {
-                                                               
rocksDBKeyedStateBackend.db.put(handle, key, value);
+                                       private void closeLocalRegistry() {
+                                               if 
(cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
+                                                       try {
+                                                               
snapshotCloseableRegistry.close();
+                                                       } catch (Exception ex) {
+                                                               LOG.warn("Error 
closing local registry", ex);
                                                        }
                                                }
                                        }
-                               }
-                       }
-               }
-       }
 
-       private static class RocksDBIncrementalRestoreOperation<T> {
+                                       @Nonnull
+                                       @Override
+                                       public SnapshotResult<KeyedStateHandle> 
performOperation() throws Exception {
+                                               long startTime = 
System.currentTimeMillis();
 
-               private final RocksDBKeyedStateBackend<T> stateBackend;
+                                               if (isStopped()) {
+                                                       throw new 
IOException("RocksDB closed.");
+                                               }
 
-               private 
RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
-                       this.stateBackend = stateBackend;
-               }
+                                               
snapshotOperation.writeDBSnapshot();
 
-               private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> readMetaData(
-                       StreamStateHandle metaStateHandle) throws Exception {
+                                               LOG.info("Asynchronous RocksDB 
snapshot ({}, asynchronous part) in thread {} took {} ms.",
+                                                       primaryStreamFactory, 
Thread.currentThread(), (System.currentTimeMillis() - startTime));
 
-                       FSDataInputStream inputStream = null;
+                                               return 
snapshotOperation.getSnapshotResultStateHandle();
+                                       }
+                               };
 
-                       try {
-                               inputStream = metaStateHandle.openInputStream();
-                               
stateBackend.cancelStreamRegistry.registerCloseable(inputStream);
+                       LOG.info("Asynchronous RocksDB snapshot ({}, 
synchronous part) in thread {} took {} ms.",
+                               primaryStreamFactory, Thread.currentThread(), 
(System.currentTimeMillis() - startTime));
+                       return AsyncStoppableTaskWithCallback.from(ioCallable);
+               }
 
-                               KeyedBackendSerializationProxy<T> 
serializationProxy =
-                                       new 
KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader);
-                               DataInputView in = new 
DataInputViewStreamWrapper(inputStream);
-                               serializationProxy.read(in);
+               private boolean isWithLocalRecovery(
+                       CheckpointType checkpointType,
+                       LocalRecoveryConfig.Loc

<TRUNCATED>

Reply via email to