GitHub user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r168923464
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -336,1697 +386,1982 @@ public int getKeyGroupPrefixBytes() {
         * @param streamFactory The factory that we can use for writing our 
state to streams.
         * @param checkpointOptions Options for how to perform this checkpoint.
         * @return Future to the state handle of the snapshot data.
    -    * @throws Exception
    +    * @throws Exception indicating a problem in the synchronous part of 
the checkpoint.
         */
        @Override
    -   public RunnableFuture<KeyedStateHandle> snapshot(
    +   public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
                final long checkpointId,
                final long timestamp,
                final CheckpointStreamFactory streamFactory,
                CheckpointOptions checkpointOptions) throws Exception {
     
    -           if (checkpointOptions.getCheckpointType() != 
CheckpointType.SAVEPOINT &&
    -                   enableIncrementalCheckpointing) {
    -                   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
    -           } else {
    -                   return snapshotFully(checkpointId, timestamp, 
streamFactory);
    -           }
    +           return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
        }
     
    -   private RunnableFuture<KeyedStateHandle> snapshotIncrementally(
    -           final long checkpointId,
    -           final long checkpointTimestamp,
    -           final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
    -
    -           if (db == null) {
    -                   throw new IOException("RocksDB closed.");
    -           }
    +   @Override
    +   public void restore(StateObjectCollection<KeyedStateHandle> 
restoreState) throws Exception {
    +           LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
     
    -           if (kvStateInformation.isEmpty()) {
    -                   if (LOG.isDebugEnabled()) {
    -                           LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
    -                                           checkpointTimestamp);
    -                   }
    -                   return DoneFuture.nullValue();
    +           if (LOG.isDebugEnabled()) {
    +                   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
                }
     
    -           final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
    -                   new RocksDBIncrementalSnapshotOperation<>(
    -                           this,
    -                           checkpointStreamFactory,
    -                           checkpointId,
    -                           checkpointTimestamp);
    +           // clear all meta data
    +           kvStateInformation.clear();
    +           restoredKvStateMetaInfos.clear();
     
                try {
    -                   snapshotOperation.takeSnapshot();
    -           } catch (Exception e) {
    -                   snapshotOperation.stop();
    -                   snapshotOperation.releaseResources(true);
    -                   throw e;
    -           }
    -
    -           return new FutureTask<KeyedStateHandle>(
    -                   new Callable<KeyedStateHandle>() {
    -                           @Override
    -                           public KeyedStateHandle call() throws Exception 
{
    -                                   return 
snapshotOperation.materializeSnapshot();
    +                   if (restoreState == null || restoreState.isEmpty()) {
    +                           createDB();
    +                   } else {
    +                           KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
    +                           if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
    +                                   || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
    +                                   RocksDBIncrementalRestoreOperation<K> 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
    +                                   restoreOperation.restore(restoreState);
    +                           } else {
    +                                   RocksDBFullRestoreOperation<K> 
restoreOperation = new RocksDBFullRestoreOperation<>(this);
    +                                   
restoreOperation.doRestore(restoreState);
                                }
                        }
    -           ) {
    -                   @Override
    -                   public boolean cancel(boolean mayInterruptIfRunning) {
    -                           snapshotOperation.stop();
    -                           return super.cancel(mayInterruptIfRunning);
    -                   }
    -
    -                   @Override
    -                   protected void done() {
    -                           
snapshotOperation.releaseResources(isCancelled());
    -                   }
    -           };
    +           } catch (Exception ex) {
    +                   dispose();
    +                   throw ex;
    +           }
        }
     
    -   private RunnableFuture<KeyedStateHandle> snapshotFully(
    -           final long checkpointId,
    -           final long timestamp,
    -           final CheckpointStreamFactory streamFactory) throws Exception {
    -
    -           long startTime = System.currentTimeMillis();
    -           final CloseableRegistry snapshotCloseableRegistry = new 
CloseableRegistry();
    -
    -           final RocksDBFullSnapshotOperation<K> snapshotOperation;
    -
    -           if (kvStateInformation.isEmpty()) {
    -                   if (LOG.isDebugEnabled()) {
    -                           LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.", timestamp);
    -                   }
    +   @Override
    +   public void notifyCheckpointComplete(long completedCheckpointId) {
     
    -                   return DoneFuture.nullValue();
    +           if (!enableIncrementalCheckpointing) {
    +                   return;
                }
     
    -           snapshotOperation = new RocksDBFullSnapshotOperation<>(this, 
streamFactory, snapshotCloseableRegistry);
    -           snapshotOperation.takeDBSnapShot(checkpointId, timestamp);
    -
    -           // implementation of the async IO operation, based on FutureTask
    -           AbstractAsyncCallableWithResources<KeyedStateHandle> ioCallable 
=
    -                   new 
AbstractAsyncCallableWithResources<KeyedStateHandle>() {
    -
    -                           @Override
    -                           protected void acquireResources() throws 
Exception {
    -                                   
cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
    -                                   
snapshotOperation.openCheckpointStream();
    -                           }
    +           synchronized (materializedSstFiles) {
     
    -                           @Override
    -                           protected void releaseResources() throws 
Exception {
    -                                   closeLocalRegistry();
    -                                   releaseSnapshotOperationResources();
    -                           }
    +                   if (completedCheckpointId < lastCompletedCheckpointId) {
    +                           return;
    +                   }
     
    -                           private void 
releaseSnapshotOperationResources() {
    -                                   // hold the db lock while operation on 
the db to guard us against async db disposal
    -                                   
snapshotOperation.releaseSnapshotResources();
    -                           }
    +                   materializedSstFiles.keySet().removeIf(checkpointId -> 
checkpointId < completedCheckpointId);
     
    -                           @Override
    -                           protected void stopOperation() throws Exception 
{
    -                                   closeLocalRegistry();
    -                           }
    +                   lastCompletedCheckpointId = completedCheckpointId;
    +           }
    +   }
     
    -                           private void closeLocalRegistry() {
    -                                   if 
(cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
    -                                           try {
    -                                                   
snapshotCloseableRegistry.close();
    -                                           } catch (Exception ex) {
    -                                                   LOG.warn("Error closing 
local registry", ex);
    -                                           }
    -                                   }
    -                           }
    +   private void createDB() throws IOException {
    +           List<ColumnFamilyHandle> columnFamilyHandles = new 
ArrayList<>(1);
    +           this.db = openDB(instanceRocksDBPath.getAbsolutePath(), 
Collections.emptyList(), columnFamilyHandles);
    +           this.defaultColumnFamily = columnFamilyHandles.get(0);
    +   }
     
    -                           @Override
    -                           public KeyGroupsStateHandle performOperation() 
throws Exception {
    -                                   long startTime = 
System.currentTimeMillis();
    +   private RocksDB openDB(
    +           String path,
    +           List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
    +           List<ColumnFamilyHandle> stateColumnFamilyHandles) throws 
IOException {
     
    -                                   if (isStopped()) {
    -                                           throw new IOException("RocksDB 
closed.");
    -                                   }
    +           List<ColumnFamilyDescriptor> columnFamilyDescriptors =
    +                   new ArrayList<>(1 + 
stateColumnFamilyDescriptors.size());
     
    -                                   snapshotOperation.writeDBSnapshot();
    +           // we add the required descriptor for the default CF in FIRST 
position, see
    +           // 
https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families
    +           columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnOptions));
    +           columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
     
    -                                   LOG.info("Asynchronous RocksDB snapshot 
({}, asynchronous part) in thread {} took {} ms.",
    -                                           streamFactory, 
Thread.currentThread(), (System.currentTimeMillis() - startTime));
    +           RocksDB dbRef;
     
    -                                   return 
snapshotOperation.getSnapshotResultStateHandle();
    -                           }
    -                   };
    +           try {
    +                   dbRef = RocksDB.open(
    +                           Preconditions.checkNotNull(dbOptions),
    +                           Preconditions.checkNotNull(path),
    +                           columnFamilyDescriptors,
    +                           stateColumnFamilyHandles);
    +           } catch (RocksDBException e) {
    +                   throw new IOException("Error while opening RocksDB 
instance.", e);
    +           }
     
    -           LOG.info("Asynchronous RocksDB snapshot ({}, synchronous part) 
in thread {} took {} ms.",
    -                           streamFactory, Thread.currentThread(), 
(System.currentTimeMillis() - startTime));
    +           // requested + default CF
    +           Preconditions.checkState(1 + 
stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
    +                   "Not all requested column family handles have been 
created");
     
    -           return AsyncStoppableTaskWithCallback.from(ioCallable);
    +           return dbRef;
        }
     
        /**
    -    * Encapsulates the process to perform a snapshot of a 
RocksDBKeyedStateBackend.
    +    * Encapsulates the process of restoring a RocksDBKeyedStateBackend 
from a full snapshot.
         */
    -   static final class RocksDBFullSnapshotOperation<K> {
    -
    -           static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
    -           static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
    -
    -           private final RocksDBKeyedStateBackend<K> stateBackend;
    -           private final KeyGroupRangeOffsets keyGroupRangeOffsets;
    -           private final CheckpointStreamFactory checkpointStreamFactory;
    -           private final CloseableRegistry snapshotCloseableRegistry;
    -           private final ResourceGuard.Lease dbLease;
    -
    -           private long checkpointId;
    -           private long checkpointTimeStamp;
    -
    -           private Snapshot snapshot;
    -           private ReadOptions readOptions;
    -           private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
    -
    -           private CheckpointStreamFactory.CheckpointStateOutputStream 
outStream;
    -           private DataOutputView outputView;
    +   private static final class RocksDBFullRestoreOperation<K> {
     
    -           RocksDBFullSnapshotOperation(
    -                   RocksDBKeyedStateBackend<K> stateBackend,
    -                   CheckpointStreamFactory checkpointStreamFactory,
    -                   CloseableRegistry registry) throws IOException {
    +           private final RocksDBKeyedStateBackend<K> 
rocksDBKeyedStateBackend;
     
    -                   this.stateBackend = stateBackend;
    -                   this.checkpointStreamFactory = checkpointStreamFactory;
    -                   this.keyGroupRangeOffsets = new 
KeyGroupRangeOffsets(stateBackend.keyGroupRange);
    -                   this.snapshotCloseableRegistry = registry;
    -                   this.dbLease = 
this.stateBackend.rocksDBResourceGuard.acquireResource();
    -           }
    +           /** Current key-groups state handle from which we restore 
key-groups. */
    +           private KeyGroupsStateHandle currentKeyGroupsStateHandle;
    +           /** Current input stream we obtained from 
currentKeyGroupsStateHandle. */
    +           private FSDataInputStream currentStateHandleInStream;
    +           /** Current data input view that wraps 
currentStateHandleInStream. */
    +           private DataInputView currentStateHandleInView;
    +           /** Current list of ColumnFamilyHandles for all column families 
we restore from currentKeyGroupsStateHandle. */
    +           private List<ColumnFamilyHandle> 
currentStateHandleKVStateColumnFamilies;
    +           /** The compression decorator that was used for writing the 
state, as determined by the meta data. */
    +           private StreamCompressionDecorator 
keygroupStreamCompressionDecorator;
     
                /**
    -            * 1) Create a snapshot object from RocksDB.
    +            * Creates a restore operation object for the given state 
backend instance.
                 *
    -            * @param checkpointId id of the checkpoint for which we take 
the snapshot
    -            * @param checkpointTimeStamp timestamp of the checkpoint for 
which we take the snapshot
    +            * @param rocksDBKeyedStateBackend the state backend into which 
we restore
                 */
    -           public void takeDBSnapShot(long checkpointId, long 
checkpointTimeStamp) {
    -                   Preconditions.checkArgument(snapshot == null, "Only one 
ongoing snapshot allowed!");
    -                   this.kvStateIterators = new 
ArrayList<>(stateBackend.kvStateInformation.size());
    -                   this.checkpointId = checkpointId;
    -                   this.checkpointTimeStamp = checkpointTimeStamp;
    -                   this.snapshot = stateBackend.db.getSnapshot();
    +           public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> 
rocksDBKeyedStateBackend) {
    +                   this.rocksDBKeyedStateBackend = 
Preconditions.checkNotNull(rocksDBKeyedStateBackend);
                }
     
                /**
    -            * 2) Open CheckpointStateOutputStream through the 
checkpointStreamFactory into which we will write.
    +            * Restores all key-groups data that is referenced by the 
passed state handles.
                 *
    -            * @throws Exception
    +            * @param keyedStateHandles List of all key groups state 
handles that shall be restored.
                 */
    -           public void openCheckpointStream() throws Exception {
    -                   Preconditions.checkArgument(outStream == null, "Output 
stream for snapshot is already set.");
    -                   outStream = 
checkpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
    -                   snapshotCloseableRegistry.registerCloseable(outStream);
    -                   outputView = new DataOutputViewStreamWrapper(outStream);
    -           }
    +           public void doRestore(Collection<KeyedStateHandle> 
keyedStateHandles)
    +                   throws IOException, StateMigrationException, 
RocksDBException {
     
    -           /**
    -            * 3) Write the actual data from RocksDB from the time we took 
the snapshot object in (1).
    -            *
    -            * @throws IOException
    -            */
    -           public void writeDBSnapshot() throws IOException, 
InterruptedException {
    +                   rocksDBKeyedStateBackend.createDB();
     
    -                   if (null == snapshot) {
    -                           throw new IOException("No snapshot available. 
Might be released due to cancellation.");
    -                   }
    +                   for (KeyedStateHandle keyedStateHandle : 
keyedStateHandles) {
    +                           if (keyedStateHandle != null) {
     
    -                   Preconditions.checkNotNull(outStream, "No output stream 
to write snapshot.");
    -                   writeKVStateMetaData();
    -                   writeKVStateData();
    +                                   if (!(keyedStateHandle instanceof 
KeyGroupsStateHandle)) {
    +                                           throw new 
IllegalStateException("Unexpected state handle type, " +
    +                                                   "expected: " + 
KeyGroupsStateHandle.class +
    +                                                   ", but found: " + 
keyedStateHandle.getClass());
    +                                   }
    +                                   this.currentKeyGroupsStateHandle = 
(KeyGroupsStateHandle) keyedStateHandle;
    +                                   restoreKeyGroupsInStateHandle();
    +                           }
    +                   }
                }
     
                /**
    -            * 4) Returns a state handle to the snapshot after the snapshot 
procedure is completed and null before.
    -            *
    -            * @return state handle to the completed snapshot
    +            * Restore one key groups state handle.
                 */
    -           public KeyGroupsStateHandle getSnapshotResultStateHandle() 
throws IOException {
    -
    -                   if 
(snapshotCloseableRegistry.unregisterCloseable(outStream)) {
    -
    -                           StreamStateHandle stateHandle = 
outStream.closeAndGetHandle();
    -                           outStream = null;
    -
    -                           if (stateHandle != null) {
    -                                   return new 
KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
    +           private void restoreKeyGroupsInStateHandle()
    +                   throws IOException, StateMigrationException, 
RocksDBException {
    +                   try {
    +                           currentStateHandleInStream = 
currentKeyGroupsStateHandle.openInputStream();
    +                           
rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
    +                           currentStateHandleInView = new 
DataInputViewStreamWrapper(currentStateHandleInStream);
    +                           restoreKVStateMetaData();
    +                           restoreKVStateData();
    +                   } finally {
    +                           if 
(rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream))
 {
    +                                   
IOUtils.closeQuietly(currentStateHandleInStream);
                                }
                        }
    -                   return null;
                }
     
                /**
    -            * 5) Release the snapshot object for RocksDB and clean up.
    +            * Restore the KV-state / ColumnFamily meta data for all 
key-groups referenced by the current state handle.
    +            *
    +            * @throws IOException
    +            * @throws ClassNotFoundException
    +            * @throws RocksDBException
                 */
    -           public void releaseSnapshotResources() {
    +           private void restoreKVStateMetaData() throws IOException, 
StateMigrationException, RocksDBException {
     
    -                   outStream = null;
    +                   KeyedBackendSerializationProxy<K> serializationProxy =
    +                           new 
KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
     
    -                   if (null != kvStateIterators) {
    -                           for (Tuple2<RocksIterator, Integer> 
kvStateIterator : kvStateIterators) {
    -                                   
IOUtils.closeQuietly(kvStateIterator.f0);
    -                           }
    -                           kvStateIterators = null;
    -                   }
    +                   serializationProxy.read(currentStateHandleInView);
     
    -                   if (null != snapshot) {
    -                           if (null != stateBackend.db) {
    -                                   
stateBackend.db.releaseSnapshot(snapshot);
    -                           }
    -                           IOUtils.closeQuietly(snapshot);
    -                           snapshot = null;
    -                   }
    +                   // check for key serializer compatibility; this also 
reconfigures the
    +                   // key serializer to be compatible, if it is required 
and is possible
    +                   if (CompatibilityUtil.resolveCompatibilityResult(
    +                           serializationProxy.getKeySerializer(),
    +                           UnloadableDummyTypeSerializer.class,
    +                           
serializationProxy.getKeySerializerConfigSnapshot(),
    +                           rocksDBKeyedStateBackend.keySerializer)
    +                           .isRequiresMigration()) {
     
    -                   if (null != readOptions) {
    -                           IOUtils.closeQuietly(readOptions);
    -                           readOptions = null;
    +                           // TODO replace with state migration; note that 
key hash codes need to remain the same after migration
    +                           throw new StateMigrationException("The new key 
serializer is not compatible to read previous keys. " +
    +                                   "Aborting now since state migration is 
currently not available");
                        }
     
    -                   this.dbLease.close();
    -           }
    -
    -           private void writeKVStateMetaData() throws IOException {
    +                   this.keygroupStreamCompressionDecorator = 
serializationProxy.isUsingKeyGroupCompression() ?
    +                           SnappyStreamCompressionDecorator.INSTANCE : 
UncompressedStreamCompressionDecorator.INSTANCE;
     
    -                   List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> metaInfoSnapshots =
    -                           new 
ArrayList<>(stateBackend.kvStateInformation.size());
    +                   List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> restoredMetaInfos =
    +                           serializationProxy.getStateMetaInfoSnapshots();
    +                   currentStateHandleKVStateColumnFamilies = new 
ArrayList<>(restoredMetaInfos.size());
    +                   //rocksDBKeyedStateBackend.restoredKvStateMetaInfos = 
new HashMap<>(restoredMetaInfos.size());
     
    -                   int kvStateId = 0;
    -                   for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> column :
    -                           stateBackend.kvStateInformation.entrySet()) {
    +                   for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> 
restoredMetaInfo : restoredMetaInfos) {
     
    -                           
metaInfoSnapshots.add(column.getValue().f1.snapshot());
    +                           Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn =
    +                                   
rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
     
    -                           //retrieve iterator for this k/v states
    -                           readOptions = new ReadOptions();
    -                           readOptions.setSnapshot(snapshot);
    +                           if (registeredColumn == null) {
    +                                   byte[] nameBytes = 
restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
     
    -                           kvStateIterators.add(
    -                                   new 
Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), 
kvStateId));
    +                                   ColumnFamilyDescriptor 
columnFamilyDescriptor = new ColumnFamilyDescriptor(
    +                                           nameBytes,
    +                                           
rocksDBKeyedStateBackend.columnOptions);
     
    -                           ++kvStateId;
    -                   }
    +                                   RegisteredKeyedBackendStateMetaInfo<?, 
?> stateMetaInfo =
    +                                           new 
RegisteredKeyedBackendStateMetaInfo<>(
    +                                                   
restoredMetaInfo.getStateType(),
    +                                                   
restoredMetaInfo.getName(),
    +                                                   
restoredMetaInfo.getNamespaceSerializer(),
    +                                                   
restoredMetaInfo.getStateSerializer());
     
    -                   KeyedBackendSerializationProxy<K> serializationProxy =
    -                           new KeyedBackendSerializationProxy<>(
    -                                   stateBackend.getKeySerializer(),
    -                                   metaInfoSnapshots,
    -                                   
!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, 
stateBackend.keyGroupCompressionDecorator));
    +                                   
rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(),
 restoredMetaInfo);
     
    -                   serializationProxy.write(outputView);
    -           }
    +                                   ColumnFamilyHandle columnFamily = 
rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
     
    -           private void writeKVStateData() throws IOException, 
InterruptedException {
    +                                   registeredColumn = new 
Tuple2<>(columnFamily, stateMetaInfo);
    +                                   
rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), 
registeredColumn);
     
    -                   byte[] previousKey = null;
    -                   byte[] previousValue = null;
    -                   OutputStream kgOutStream = null;
    -                   DataOutputView kgOutView = null;
    +                           } else {
    +                                   // TODO with eager state registration 
in place, check here for serializer migration strategies
    +                           }
    +                           
currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0);
    +                   }
    +           }
     
    -                   try {
    -                           // Here we transfer ownership of RocksIterators 
to the RocksDBMergeIterator
    -                           try (RocksDBMergeIterator mergeIterator = new 
RocksDBMergeIterator(
    -                                   kvStateIterators, 
stateBackend.keyGroupPrefixBytes)) {
    +           /**
    +            * Restore the KV-state / ColumnFamily data for all key-groups 
referenced by the current state handle.
    +            *
    +            * @throws IOException
    +            * @throws RocksDBException
    +            */
    +           private void restoreKVStateData() throws IOException, 
RocksDBException {
    +                   //for all key-groups in the current state handle...
    +                   for (Tuple2<Integer, Long> keyGroupOffset : 
currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
    +                           int keyGroup = keyGroupOffset.f0;
     
    -                                   // handover complete, null out to 
prevent double close
    -                                   kvStateIterators = null;
    +                           // Check that restored key groups all belong to 
the backend
    +                           
Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup),
    +                                   "The key group must belong to the 
backend");
     
    -                                   //preamble: setup with first key-group 
as our lookahead
    -                                   if (mergeIterator.isValid()) {
    -                                           //begin first key-group by 
recording the offset
    -                                           
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), 
outStream.getPos());
    -                                           //write the k/v-state id as 
metadata
    -                                           kgOutStream = 
stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
    -                                           kgOutView = new 
DataOutputViewStreamWrapper(kgOutStream);
    +                           long offset = keyGroupOffset.f1;
    +                           //not empty key-group?
    +                           if (0L != offset) {
    +                                   currentStateHandleInStream.seek(offset);
    +                                   try (InputStream compressedKgIn = 
keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream))
 {
    +                                           DataInputViewStreamWrapper 
compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
                                                //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
    -                                           
kgOutView.writeShort(mergeIterator.kvStateId());
    -                                           previousKey = 
mergeIterator.key();
    -                                           previousValue = 
mergeIterator.value();
    -                                           mergeIterator.next();
    +                                           int kvStateId = 
compressedKgInputView.readShort();
    +                                           ColumnFamilyHandle handle = 
currentStateHandleKVStateColumnFamilies.get(kvStateId);
    +                                           //insert all k/v pairs into DB
    +                                           boolean keyGroupHasMoreKeys = 
true;
    +                                           while (keyGroupHasMoreKeys) {
    +                                                   byte[] key = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
    +                                                   byte[] value = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
    +                                                   if 
(RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
    +                                                           //clear the 
signal bit in the key to make it ready for insertion again
    +                                                           
RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
    +                                                           
rocksDBKeyedStateBackend.db.put(handle, key, value);
    +                                                           //TODO this 
could be aware of keyGroupPrefixBytes and write only one byte if possible
    +                                                           kvStateId = 
RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
    +                                                                   & 
compressedKgInputView.readShort();
    +                                                           if 
(RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
    +                                                                   
keyGroupHasMoreKeys = false;
    +                                                           } else {
    +                                                                   handle 
= currentStateHandleKVStateColumnFamilies.get(kvStateId);
    +                                                           }
    +                                                   } else {
    +                                                           
rocksDBKeyedStateBackend.db.put(handle, key, value);
    +                                                   }
    +                                           }
                                        }
    +                           }
    +                   }
    +           }
    +   }
     
    -                                   //main loop: write k/v pairs ordered by 
(key-group, kv-state), thereby tracking key-group offsets.
    -                                   while (mergeIterator.isValid()) {
    +   /**
    +    * Encapsulates the process of restoring a RocksDBKeyedStateBackend 
from an incremental snapshot.
    +    */
    +   private static class RocksDBIncrementalRestoreOperation<T> {
     
    -                                           assert 
(!hasMetaDataFollowsFlag(previousKey));
    +           private final RocksDBKeyedStateBackend<T> stateBackend;
     
    -                                           //set signal in first key byte 
that meta data will follow in the stream after this k/v pair
    -                                           if 
(mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
    +           private 
RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
    +                   this.stateBackend = stateBackend;
    +           }
     
    -                                                   //be cooperative and 
check for interruption from time to time in the hot loop
    -                                                   checkInterrupted();
    +           /**
    +            * Root method that branches for different implementations of 
{@link KeyedStateHandle}.
    +            */
    +           void restore(Collection<KeyedStateHandle> restoreStateHandles) 
throws Exception {
     
    -                                                   
setMetaDataFollowsFlagInKey(previousKey);
    -                                           }
    +                   boolean hasExtraKeys = (restoreStateHandles.size() > 1 
||
    +                           
!Objects.equals(restoreStateHandles.iterator().next().getKeyGroupRange(), 
stateBackend.keyGroupRange));
     
    -                                           writeKeyValuePair(previousKey, 
previousValue, kgOutView);
    +                   if (hasExtraKeys) {
    +                           stateBackend.createDB();
    +                   }
     
    -                                           //write meta data if we have to
    -                                           if 
(mergeIterator.isNewKeyGroup()) {
    -                                                   //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
    -                                                   
kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
    -                                                   // this will just close 
the outer stream
    -                                                   kgOutStream.close();
    -                                                   //begin new key-group
    -                                                   
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), 
outStream.getPos());
    -                                                   //write the k/v-state
    -                                                   //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
    -                                                   kgOutStream = 
stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
    -                                                   kgOutView = new 
DataOutputViewStreamWrapper(kgOutStream);
    -                                                   
kgOutView.writeShort(mergeIterator.kvStateId());
    -                                           } else if 
(mergeIterator.isNewKeyValueState()) {
    -                                                   //write the k/v-state
    -                                                   //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
    -                                                   
kgOutView.writeShort(mergeIterator.kvStateId());
    -                                           }
    +                   for (KeyedStateHandle rawStateHandle : 
restoreStateHandles) {
     
    -                                           //request next k/v pair
    -                                           previousKey = 
mergeIterator.key();
    -                                           previousValue = 
mergeIterator.value();
    -                                           mergeIterator.next();
    -                                   }
    +                           if (rawStateHandle instanceof 
IncrementalKeyedStateHandle) {
    +                                   
restoreInstance((IncrementalKeyedStateHandle) rawStateHandle, hasExtraKeys);
    +                           } else if (rawStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
    +                                   Preconditions.checkState(!hasExtraKeys, 
"Cannot recover from local state after rescaling.");
    +                                   
restoreInstance((IncrementalLocalKeyedStateHandle) rawStateHandle);
    +                           } else {
    +                                   throw new 
IllegalStateException("Unexpected state handle type, " +
    +                                           "expected " + 
IncrementalKeyedStateHandle.class +
    +                                           ", but found " + 
rawStateHandle.getClass());
                                }
    +                   }
    +           }
     
    -                           //epilogue: write last key-group
    -                           if (previousKey != null) {
    -                                   assert 
(!hasMetaDataFollowsFlag(previousKey));
    -                                   
setMetaDataFollowsFlagInKey(previousKey);
    -                                   writeKeyValuePair(previousKey, 
previousValue, kgOutView);
    -                                   //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
    -                                   
kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
    -                                   // this will just close the outer stream
    -                                   kgOutStream.close();
    -                                   kgOutStream = null;
    -                           }
    +           /**
    +            * Recovery from remote incremental state.
    +            */
    +           private void restoreInstance(
    +                   IncrementalKeyedStateHandle restoreStateHandle,
    +                   boolean hasExtraKeys) throws Exception {
    +
    +                   // read state data
    +                   Path temporaryRestoreInstancePath = new Path(
    +                           stateBackend.instanceBasePath.getAbsolutePath(),
    +                           UUID.randomUUID().toString());
    +
    +                   try {
    +
    +                           
transferAllStateDataToDirectory(restoreStateHandle, 
temporaryRestoreInstancePath);
     
    +                           // read meta data
    +                           
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots 
=
    +                                   
readMetaData(restoreStateHandle.getMetaStateHandle());
    +
    +                           List<ColumnFamilyDescriptor> 
columnFamilyDescriptors =
    +                                   
createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);
    +
    +                           if (hasExtraKeys) {
    +                                   
restoreKeyGroupsShardWithTemporaryHelperInstance(
    +                                           temporaryRestoreInstancePath,
    +                                           columnFamilyDescriptors,
    +                                           stateMetaInfoSnapshots);
    +                           } else {
    +
    +                                   // since we transferred all remote 
state to a local directory, we can use the same code as for
    +                                   // local recovery.
    +                                   IncrementalLocalKeyedStateHandle 
localKeyedStateHandle = new IncrementalLocalKeyedStateHandle(
    +                                           
restoreStateHandle.getBackendIdentifier(),
    +                                           
restoreStateHandle.getCheckpointId(),
    +                                           new 
DirectoryStateHandle(temporaryRestoreInstancePath),
    +                                           
restoreStateHandle.getKeyGroupRange(),
    +                                           
restoreStateHandle.getMetaStateHandle(),
    +                                           
restoreStateHandle.getSharedState().keySet());
    +
    +                                   restoreLocalStateIntoFullInstance(
    +                                           localKeyedStateHandle,
    +                                           columnFamilyDescriptors,
    +                                           stateMetaInfoSnapshots);
    +                           }
                        } finally {
    -                           // this will just close the outer stream
    -                           IOUtils.closeQuietly(kgOutStream);
    +                           FileSystem restoreFileSystem = 
temporaryRestoreInstancePath.getFileSystem();
    +                           if 
(restoreFileSystem.exists(temporaryRestoreInstancePath)) {
    +                                   
restoreFileSystem.delete(temporaryRestoreInstancePath, true);
    +                           }
                        }
                }
     
    -           private void writeKeyValuePair(byte[] key, byte[] value, 
DataOutputView out) throws IOException {
    -                   BytePrimitiveArraySerializer.INSTANCE.serialize(key, 
out);
    -                   BytePrimitiveArraySerializer.INSTANCE.serialize(value, 
out);
    -           }
    +           /**
    +            * Recovery from local incremental state.
    +            */
    +           private void restoreInstance(IncrementalLocalKeyedStateHandle 
localKeyedStateHandle) throws Exception {
    +                   // read meta data
    +                   List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots =
    +                           
readMetaData(localKeyedStateHandle.getMetaDataState());
     
    -           static void setMetaDataFollowsFlagInKey(byte[] key) {
    -                   key[0] |= FIRST_BIT_IN_BYTE_MASK;
    -           }
    +                   List<ColumnFamilyDescriptor> columnFamilyDescriptors =
    +                           
createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);
     
    -           static void clearMetaDataFollowsFlag(byte[] key) {
    -                   key[0] &= 
(~RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
    +                   restoreLocalStateIntoFullInstance(
    +                           localKeyedStateHandle,
    +                           columnFamilyDescriptors,
    +                           stateMetaInfoSnapshots);
                }
     
    -           static boolean hasMetaDataFollowsFlag(byte[] key) {
    -                   return 0 != (key[0] & 
RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
    -           }
    +           /**
    +            * This method recreates and registers all {@link 
ColumnFamilyDescriptor} from Flink's state meta data snapshot.
    +            */
    +           private List<ColumnFamilyDescriptor> 
createAndRegisterColumnFamilyDescriptors(
    +                   List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots) {
     
    -           private static void checkInterrupted() throws 
InterruptedException {
    -                   if (Thread.currentThread().isInterrupted()) {
    -                           throw new InterruptedException("RocksDB 
snapshot interrupted.");
    +                   List<ColumnFamilyDescriptor> columnFamilyDescriptors =
    +                           new ArrayList<>(1 + 
stateMetaInfoSnapshots.size());
    +
    +                   for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> 
stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
    +
    +                           ColumnFamilyDescriptor columnFamilyDescriptor = 
new ColumnFamilyDescriptor(
    +                                   
stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
    +                                   stateBackend.columnOptions);
    +
    +                           
columnFamilyDescriptors.add(columnFamilyDescriptor);
    +                           
stateBackend.restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(), 
stateMetaInfoSnapshot);
                        }
    +                   return columnFamilyDescriptors;
                }
    -   }
     
    -   private static final class RocksDBIncrementalSnapshotOperation<K> {
    +           /**
    +            * This method implements the core of the restore logic that 
unifies how local and remote state are recovered.
    +            */
    +           private void restoreLocalStateIntoFullInstance(
    +                   IncrementalLocalKeyedStateHandle restoreStateHandle,
    +                   List<ColumnFamilyDescriptor> columnFamilyDescriptors,
    +                   List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots) throws Exception {
    +                   // pick up again the old backend id, so that we can 
reference existing state
    +                   stateBackend.backendUID = 
restoreStateHandle.getBackendIdentifier();
    +
    +                   LOG.debug("Restoring keyed backend uid in operator {} 
from incremental snapshot to {}.",
    +                           stateBackend.operatorIdentifier, 
stateBackend.backendUID);
    +
    +                   // create hard links in the instance directory
    +                   if (!stateBackend.instanceRocksDBPath.mkdirs()) {
    +                           throw new IOException("Could not create RocksDB 
data directory.");
    +                   }
     
    -           /** The backend which we snapshot. */
    -           private final RocksDBKeyedStateBackend<K> stateBackend;
    +                   Path restoreSourcePath = 
restoreStateHandle.getDirectoryStateHandle().getDirectory();
    +                   restoreInstanceDirectoryFromPath(restoreSourcePath);
     
    -           /** Stream factory that creates the output streams to DFS. */
    -           private final CheckpointStreamFactory checkpointStreamFactory;
    +                   List<ColumnFamilyHandle> columnFamilyHandles =
    +                           new ArrayList<>(1 + 
columnFamilyDescriptors.size());
     
    -           /** Id for the current checkpoint. */
    -           private final long checkpointId;
    +                   stateBackend.db = stateBackend.openDB(
    +                           
stateBackend.instanceRocksDBPath.getAbsolutePath(),
    +                           columnFamilyDescriptors, columnFamilyHandles);
     
    -           /** Timestamp for the current checkpoint. */
    -           private final long checkpointTimestamp;
    +                   // extract and store the default column family which is 
located at the first index
    +                   stateBackend.defaultColumnFamily = 
columnFamilyHandles.remove(0);
     
    -           /** All sst files that were part of the last previously 
completed checkpoint. */
    -           private Set<StateHandleID> baseSstFiles;
    +                   for (int i = 0; i < columnFamilyDescriptors.size(); 
++i) {
    +                           RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
     
    -           /** The state meta data. */
    -           private final 
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots 
= new ArrayList<>();
    +                           ColumnFamilyHandle columnFamilyHandle = 
columnFamilyHandles.get(i);
    +                           RegisteredKeyedBackendStateMetaInfo<?, ?> 
stateMetaInfo =
    +                                   new 
RegisteredKeyedBackendStateMetaInfo<>(
    +                                           
stateMetaInfoSnapshot.getStateType(),
    +                                           stateMetaInfoSnapshot.getName(),
    +                                           
stateMetaInfoSnapshot.getNamespaceSerializer(),
    +                                           
stateMetaInfoSnapshot.getStateSerializer());
     
    -           private FileSystem backupFileSystem;
    -           private Path backupPath;
    +                           stateBackend.kvStateInformation.put(
    +                                   stateMetaInfoSnapshot.getName(),
    +                                   new Tuple2<>(columnFamilyHandle, 
stateMetaInfo));
    +                   }
     
    -           // Registry for all opened i/o streams
    -           private final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
    +                   // use the restore sst files as the base for succeeding 
checkpoints
    +                   synchronized (stateBackend.materializedSstFiles) {
    +                           stateBackend.materializedSstFiles.put(
    +                                   restoreStateHandle.getCheckpointId(),
    +                                   
restoreStateHandle.getSharedStateHandleIDs());
    +                   }
     
    -           // new sst files since the last completed checkpoint
    -           private final Map<StateHandleID, StreamStateHandle> sstFiles = 
new HashMap<>();
    +                   stateBackend.lastCompletedCheckpointId = 
restoreStateHandle.getCheckpointId();
    +           }
     
    -           // handles to the misc files in the current snapshot
    -           private final Map<StateHandleID, StreamStateHandle> miscFiles = 
new HashMap<>();
    +           /**
    +            * This recreates the new working directory of the recovered 
RocksDB instance and links/copies the contents from
    +            * a local state.
    +            */
    +           private void restoreInstanceDirectoryFromPath(Path source) 
throws IOException {
     
    -           // This lease protects from concurrent disposal of the native 
rocksdb instance.
    -           private final ResourceGuard.Lease dbLease;
    +                   FileSystem fileSystem = source.getFileSystem();
     
    -           private StreamStateHandle metaStateHandle = null;
    +                   final FileStatus[] fileStatuses = 
fileSystem.listStatus(source);
     
    -           private RocksDBIncrementalSnapshotOperation(
    -                   RocksDBKeyedStateBackend<K> stateBackend,
    -                   CheckpointStreamFactory checkpointStreamFactory,
    -                   long checkpointId,
    -                   long checkpointTimestamp) throws IOException {
    +                   if (fileStatuses == null) {
    +                           throw new IOException("Cannot list file 
statues. Directory " + source + " does not exist.");
    +                   }
     
    -                   this.stateBackend = stateBackend;
    -                   this.checkpointStreamFactory = checkpointStreamFactory;
    -                   this.checkpointId = checkpointId;
    -                   this.checkpointTimestamp = checkpointTimestamp;
    -                   this.dbLease = 
this.stateBackend.rocksDBResourceGuard.acquireResource();
    +                   for (FileStatus fileStatus : fileStatuses) {
    +                           final Path filePath = fileStatus.getPath();
    +                           final String fileName = filePath.getName();
    +                           File restoreFile = new File(source.getPath(), 
fileName);
    +                           File targetFile = new 
File(stateBackend.instanceRocksDBPath.getPath(), fileName);
    +                           if (fileName.endsWith(SST_FILE_SUFFIX)) {
    +                                   // hardlink'ing the immutable sst-files.
    +                                   Files.createLink(targetFile.toPath(), 
restoreFile.toPath());
    +                           } else {
    +                                   // true copy for all other files.
    +                                   Files.copy(restoreFile.toPath(), 
targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
    +                           }
    +                   }
                }
     
    -           private StreamStateHandle materializeStateData(Path filePath) 
throws Exception {
    +           /**
    +            * Reads Flink's state meta data file from the state handle.
    +            */
    +           private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> readMetaData(
    +                   StreamStateHandle metaStateHandle) throws Exception {
    +
                        FSDataInputStream inputStream = null;
    -                   CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = null;
     
                        try {
    -                           final byte[] buffer = new byte[8 * 1024];
    -
    -                           FileSystem backupFileSystem = 
backupPath.getFileSystem();
    -                           inputStream = backupFileSystem.open(filePath);
    -                           
closeableRegistry.registerCloseable(inputStream);
    -
    -                           outputStream = checkpointStreamFactory
    -                                   
.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
    -                           
closeableRegistry.registerCloseable(outputStream);
    -
    -                           while (true) {
    -                                   int numBytes = inputStream.read(buffer);
    +                           inputStream = metaStateHandle.openInputStream();
    +                           
stateBackend.cancelStreamRegistry.registerCloseable(inputStream);
     
    -                                   if (numBytes == -1) {
    -                                           break;
    -                                   }
    +                           KeyedBackendSerializationProxy<T> 
serializationProxy =
    +                                   new 
KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader);
    +                           DataInputView in = new 
DataInputViewStreamWrapper(inputStream);
    +                           serializationProxy.read(in);
     
    -                                   outputStream.write(buffer, 0, numBytes);
    -                           }
    +                           // check for key serializer compatibility; this 
also reconfigures the
    +                           // key serializer to be compatible, if it is 
required and is possible
    +                           if 
(CompatibilityUtil.resolveCompatibilityResult(
    +                                   serializationProxy.getKeySerializer(),
    +                                   UnloadableDummyTypeSerializer.class,
    +                                   
serializationProxy.getKeySerializerConfigSnapshot(),
    +                                   stateBackend.keySerializer)
    +                                   .isRequiresMigration()) {
     
    -                           StreamStateHandle result = null;
    -                           if 
(closeableRegistry.unregisterCloseable(outputStream)) {
    -                                   result = 
outputStream.closeAndGetHandle();
    -                                   outputStream = null;
    +                                   // TODO replace with state migration; 
note that key hash codes need to remain the same after migration
    +                                   throw new StateMigrationException("The 
new key serializer is not compatible to read previous keys. " +
    +                                           "Aborting now since state 
migration is currently not available");
                                }
    -                           return result;
     
    +                           return 
serializationProxy.getStateMetaInfoSnapshots();
                        } finally {
    -                           if (inputStream != null && 
closeableRegistry.unregisterCloseable(inputStream)) {
    +                           if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
                                        inputStream.close();
                                }
    -
    -                           if (outputStream != null && 
closeableRegistry.unregisterCloseable(outputStream)) {
    -                                   outputStream.close();
    -                           }
                        }
                }
     
    -           private StreamStateHandle materializeMetaData() throws 
Exception {
    -                   CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = null;
    -
    -                   try {
    -                           outputStream = checkpointStreamFactory
    -                                   
.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
    -                           
closeableRegistry.registerCloseable(outputStream);
    +           private void transferAllStateDataToDirectory(
    +                   IncrementalKeyedStateHandle restoreStateHandle,
    +                   Path dest) throws IOException {
     
    -                           //no need for compression scheme support 
because sst-files are already compressed
    -                           KeyedBackendSerializationProxy<K> 
serializationProxy =
    -                                   new KeyedBackendSerializationProxy<>(
    -                                           stateBackend.keySerializer,
    -                                           stateMetaInfoSnapshots,
    -                                           false);
    +                   final Map<StateHandleID, StreamStateHandle> sstFiles =
    +                           restoreStateHandle.getSharedState();
    +                   final Map<StateHandleID, StreamStateHandle> miscFiles =
    +                           restoreStateHandle.getPrivateState();
     
    -                           DataOutputView out = new 
DataOutputViewStreamWrapper(outputStream);
    +                   transferAllDataFromStateHandles(sstFiles, dest);
    +                   transferAllDataFromStateHandles(miscFiles, dest);
    +           }
     
    -                           serializationProxy.write(out);
    +           /**
    +            * Copies all the files from the given stream state handles to 
the given path, renaming the files w.r.t. their
    +            * {@link StateHandleID}.
    +            */
    +           private void transferAllDataFromStateHandles(
    +                   Map<StateHandleID, StreamStateHandle> stateHandleMap,
    +                   Path restoreInstancePath) throws IOException {
     
    -                           StreamStateHandle result = null;
    -                           if 
(closeableRegistry.unregisterCloseable(outputStream)) {
    -                                   result = 
outputStream.closeAndGetHandle();
    -                                   outputStream = null;
    -                           }
    -                           return result;
    -                   } finally {
    -                           if (outputStream != null) {
    -                                   if 
(closeableRegistry.unregisterCloseable(outputStream)) {
    -                                           outputStream.close();
    -                                   }
    -                           }
    +                   for (Map.Entry<StateHandleID, StreamStateHandle> entry 
: stateHandleMap.entrySet()) {
    +                           StateHandleID stateHandleID = entry.getKey();
    +                           StreamStateHandle remoteFileHandle = 
entry.getValue();
    +                           copyStateDataHandleData(new 
Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle);
                        }
                }
     
    -           void takeSnapshot() throws Exception {
    +           /**
    +            * Copies the file from a single state handle to the given path.
    +            */
    +           private void copyStateDataHandleData(
    +                   Path restoreFilePath,
    +                   StreamStateHandle remoteFileHandle) throws IOException {
     
    -                   final long lastCompletedCheckpoint;
    +                   FileSystem restoreFileSystem = 
restoreFilePath.getFileSystem();
     
    -                   // use the last completed checkpoint as the comparison 
base.
    -                   synchronized (stateBackend.materializedSstFiles) {
    -                           lastCompletedCheckpoint = 
stateBackend.lastCompletedCheckpointId;
    -                           baseSstFiles = 
stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
    -                   }
    +                   FSDataInputStream inputStream = null;
    +                   FSDataOutputStream outputStream = null;
     
    -                   LOG.trace("Taking incremental snapshot for checkpoint 
{}. Snapshot is based on last completed checkpoint {} " +
    -                           "assuming the following (shared) files as base: 
{}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
    +                   try {
    +                           inputStream = 
remoteFileHandle.openInputStream();
    +                           
stateBackend.cancelStreamRegistry.registerCloseable(inputStream);
     
    -                   // save meta data
    -                   for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
    -                           : stateBackend.kvStateInformation.entrySet()) {
    -                           
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
    -                   }
    +                           outputStream = 
restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
    +                           
stateBackend.cancelStreamRegistry.registerCloseable(outputStream);
     
    -                   // save state data
    -                   backupPath = new 
Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
    +                           byte[] buffer = new byte[8 * 1024];
    +                           while (true) {
    +                                   int numBytes = inputStream.read(buffer);
    +                                   if (numBytes == -1) {
    +                                           break;
    +                                   }
     
    -                   LOG.trace("Local RocksDB checkpoint goes to backup path 
{}.", backupPath);
    +                                   outputStream.write(buffer, 0, numBytes);
    +                           }
    +                   } finally {
    +                           if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
    +                                   inputStream.close();
    +                           }
     
    -                   backupFileSystem = backupPath.getFileSystem();
    -                   if (backupFileSystem.exists(backupPath)) {
    -                           throw new IllegalStateException("Unexpected 
existence of the backup directory.");
    +                           if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(outputStream)) {
    +                                   outputStream.close();
    +                           }
                        }
    -
    -                   // create hard links of living files in the checkpoint 
path
    -                   Checkpoint checkpoint = 
Checkpoint.create(stateBackend.db);
    -                   checkpoint.createCheckpoint(backupPath.getPath());
                }
     
    -           KeyedStateHandle materializeSnapshot() throws Exception {
    +           /**
    +            * In case of rescaling, this method creates a temporary 
RocksDB instance for a key-groups shard. All contents
    +            * from the temporary instance are copied into the real restore 
instance and then the temporary instance is
    +            * discarded.
    +            */
    +           private void restoreKeyGroupsShardWithTemporaryHelperInstance(
    +                   Path restoreInstancePath,
    +                   List<ColumnFamilyDescriptor> columnFamilyDescriptors,
    +                   List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots) throws Exception {
     
    -                   
stateBackend.cancelStreamRegistry.registerCloseable(closeableRegistry);
    +                   List<ColumnFamilyHandle> columnFamilyHandles =
    +                           new ArrayList<>(1 + 
columnFamilyDescriptors.size());
     
    -                   // write meta data
    -                   metaStateHandle = materializeMetaData();
    +                   try (RocksDB restoreDb = stateBackend.openDB(
    +                           restoreInstancePath.getPath(),
    +                           columnFamilyDescriptors,
    +                           columnFamilyHandles)) {
     
    -                   // write state data
    -                   
Preconditions.checkState(backupFileSystem.exists(backupPath));
    +                           final ColumnFamilyHandle defaultColumnFamily = 
columnFamilyHandles.remove(0);
     
    -                   FileStatus[] fileStatuses = 
backupFileSystem.listStatus(backupPath);
    -                   if (fileStatuses != null) {
    -                           for (FileStatus fileStatus : fileStatuses) {
    -                                   final Path filePath = 
fileStatus.getPath();
    -                                   final String fileName = 
filePath.getName();
    -                                   final StateHandleID stateHandleID = new 
StateHandleID(fileName);
    +                           
Preconditions.checkState(columnFamilyHandles.size() == 
columnFamilyDescriptors.size());
     
    -                                   if (fileName.endsWith(SST_FILE_SUFFIX)) 
{
    -                                           final boolean existsAlready =
    -                                                   baseSstFiles != null && 
baseSstFiles.contains(stateHandleID);
    +                           try {
    +                                   for (int i = 0; i < 
columnFamilyDescriptors.size(); ++i) {
    +                                           ColumnFamilyHandle 
columnFamilyHandle = columnFamilyHandles.get(i);
    +                                           ColumnFamilyDescriptor 
columnFamilyDescriptor = columnFamilyDescriptors.get(i);
    +                                           
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = 
stateMetaInfoSnapshots.get(i);
     
    -                                           if (existsAlready) {
    -                                                   // we introduce a 
placeholder state handle, that is replaced with the
    -                                                   // original from the 
shared state registry (created from a previous checkpoint)
    -                                                   sstFiles.put(
    -                                                           stateHandleID,
    -                                                           new 
PlaceholderStreamStateHandle());
    -                                           } else {
    -                                                   
sstFiles.put(stateHandleID, materializeStateData(filePath));
    -                                           }
    -                                   } else {
    -                                           StreamStateHandle fileHandle = 
materializeStateData(filePath);
    -                                           miscFiles.put(stateHandleID, 
fileHandle);
    -                                   }
    -                           }
    -                   }
    +                                           Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry =
    +                                                   
stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
     
    -                   synchronized (stateBackend.materializedSstFiles) {
    -                           
stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
    -                   }
    +                                           if (null == 
registeredStateMetaInfoEntry) {
     
    -                   return new IncrementalKeyedStateHandle(
    -                           stateBackend.backendUID,
    -                           stateBackend.keyGroupRange,
    -                           checkpointId,
    -                           sstFiles,
    -                           miscFiles,
    -                           metaStateHandle);
    -           }
    +                                                   
RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
    +                                                           new 
RegisteredKeyedBackendStateMetaInfo<>(
    +                                                                   
stateMetaInfoSnapshot.getStateType(),
    +                                                                   
stateMetaInfoSnapshot.getName(),
    +                                                                   
stateMetaInfoSnapshot.getNamespaceSerializer(),
    +                                                                   
stateMetaInfoSnapshot.getStateSerializer());
     
    -           void stop() {
    +                                                   
registeredStateMetaInfoEntry =
    +                                                           new Tuple2<>(
    +                                                                   
stateBackend.db.createColumnFamily(columnFamilyDescriptor),
    +                                                                   
stateMetaInfo);
     
    -                   if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
    -                           try {
    -                                   closeableRegistry.close();
    -                           } catch (IOException e) {
    -                                   LOG.warn("Could not properly close io 
streams.", e);
    -                           }
    -                   }
    -           }
    +                                                   
stateBackend.kvStateInformation.put(
    +                                                           
stateMetaInfoSnapshot.getName(),
    +                                                           
registeredStateMetaInfoEntry);
    +                                           }
     
    -           void releaseResources(boolean canceled) {
    +                                           ColumnFamilyHandle 
targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0;
     
    -                   dbLease.close();
    +                                           try (RocksIterator iterator = 
restoreDb.newIterator(columnFamilyHandle)) {
     
    -                   if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
    -                           try {
    -                                   closeableRegistry.close();
    -                           } catch (IOException e) {
    -                                   LOG.warn("Exception on closing 
registry.", e);
    -                           }
    -                   }
    +                                                   int startKeyGroup = 
stateBackend.getKeyGroupRange().getStartKeyGroup();
    +                                                   byte[] 
startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
    +                                                   for (int j = 0; j < 
stateBackend.keyGroupPrefixBytes; ++j) {
    +                                                           
startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> 
((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
    +                                                   }
     
    -                   if (backupPath != null) {
    -                           try {
    -                                   if 
(backupFileSystem.exists(backupPath)) {
    +                                                   
iterator.seek(startKeyGroupPrefixBytes);
     
    -                                           LOG.trace("Deleting local 
RocksDB backup path {}.", backupPath);
    -                                           
backupFileSystem.delete(backupPath, true);
    -                                   }
    -                           } catch (Exception e) {
    -                                   LOG.warn("Could not properly delete the 
checkpoint directory.", e);
    -                           }
    -                   }
    +                                                   while 
(iterator.isValid()) {
     
    -                   if (canceled) {
    -                           Collection<StateObject> statesToDiscard =
    -                                   new ArrayList<>(1 + miscFiles.size() + 
sstFiles.size());
    +                                                           int keyGroup = 
0;
    +                                                           for (int j = 0; 
j < stateBackend.keyGroupPrefixBytes; ++j) {
    +                                                                   
keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
    +                                                           }
     
    -                           statesToDiscard.add(metaStateHandle);
    -                           statesToDiscard.addAll(miscFiles.values());
    -                           statesToDiscard.addAll(sstFiles.values());
    +                                                           if 
(stateBackend.keyGroupRange.contains(keyGroup)) {
    +                                                                   
stateBackend.db.put(targetColumnFamilyHandle,
    +                                                                           
iterator.key(), iterator.value());
    +                                                           }
     
    -                           try {
    -                                   
StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
    -                           } catch (Exception e) {
    -                                   LOG.warn("Could not properly discard 
states.", e);
    +                                                           iterator.next();
    +                                                   }
    +                                           } // releases native iterator 
resources
    +                                   }
    +                           } finally {
    +
    +                                   //release native tmp db column family 
resources
    +                                   
IOUtils.closeQuietly(defaultColumnFamily);
    +
    +                                   for (ColumnFamilyHandle 
flinkColumnFamilyHandle : columnFamilyHandles) {
    +                                           
IOUtils.closeQuietly(flinkColumnFamilyHandle);
    +                                   }
                                }
    -                   }
    +                   } // releases native tmp db resources
                }
        }
     
    -   @Override
    -   public void restore(Collection<KeyedStateHandle> restoreState) throws 
Exception {
    -           LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
    +   // 
------------------------------------------------------------------------
    +   //  State factories
    +   // 
------------------------------------------------------------------------
     
    -           if (LOG.isDebugEnabled()) {
    -                   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
    -           }
    +   /**
    +    * Creates a column family handle for use with a k/v state. When 
restoring from a snapshot
    +    * we don't restore the individual k/v states, just the global RocksDB 
database and the
    +    * list of column families. When a k/v state is first requested we 
check here whether we
    +    * already have a column family for that and return it or create a new 
one if it doesn't exist.
    +    *
    +    * <p>This also checks whether the {@link StateDescriptor} for a state 
matches the one
    +    * that we checkpointed, i.e. is already in the map of column families.
    +    */
    +   @SuppressWarnings("rawtypes, unchecked")
    +   protected <N, S> ColumnFamilyHandle getColumnFamily(
    +           StateDescriptor<?, S> descriptor, TypeSerializer<N> 
namespaceSerializer) throws IOException, StateMigrationException {
     
    -           // clear all meta data
    -           kvStateInformation.clear();
    -           restoredKvStateMetaInfos.clear();
    +           Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
    +                   kvStateInformation.get(descriptor.getName());
     
    -           try {
    -                   if (restoreState == null || restoreState.isEmpty()) {
    -                           createDB();
    -                   } else if (restoreState.iterator().next() instanceof 
IncrementalKeyedStateHandle) {
    -                           RocksDBIncrementalRestoreOperation<K> 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
    -                           restoreOperation.restore(restoreState);
    -                   } else {
    -                           RocksDBFullRestoreOperation<K> restoreOperation 
= new RocksDBFullRestoreOperation<>(this);
    -                           restoreOperation.doRestore(restoreState);
    +           RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new 
RegisteredKeyedBackendStateMetaInfo<>(
    +                   descriptor.getType(),
    +                   descriptor.getName(),
    +                   namespaceSerializer,
    +                   descriptor.getSerializer());
    +
    +           if (stateInfo != null) {
    +                   // TODO with eager registration in place, these checks 
should be moved to restore()
    +
    +                   RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> 
restoredMetaInfo =
    +                           
(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) 
restoredKvStateMetaInfos.get(descriptor.getName());
    +
    +                   Preconditions.checkState(
    +                           Objects.equals(newMetaInfo.getName(), 
restoredMetaInfo.getName()),
    +                           "Incompatible state names. " +
    +                                   "Was [" + restoredMetaInfo.getName() + 
"], " +
    +                                   "registered with [" + 
newMetaInfo.getName() + "].");
    +
    +                   if (!Objects.equals(newMetaInfo.getStateType(), 
StateDescriptor.Type.UNKNOWN)
    +                           && 
!Objects.equals(restoredMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)) 
{
    +
    +                           Preconditions.checkState(
    +                                   newMetaInfo.getStateType() == 
restoredMetaInfo.getStateType(),
    +                                   "Incompatible state types. " +
    +                                           "Was [" + 
restoredMetaInfo.getStateType() + "], " +
    +                                           "registered with [" + 
newMetaInfo.getStateType() + "].");
                        }
    -           } catch (Exception ex) {
    -                   dispose();
    -                   throw ex;
    +
    +                   // check compatibility results to determine if state 
migration is required
    +                   CompatibilityResult<N> namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
    +                           restoredMetaInfo.getNamespaceSerializer(),
    +                           null,
    +                           
restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
    +                           newMetaInfo.getNamespaceSerializer());
    +
    +                   CompatibilityResult<S> stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
    +                           restoredMetaInfo.getStateSerializer(),
    +                           UnloadableDummyTypeSerializer.class,
    +                           
restoredMetaInfo.getStateSerializerConfigSnapshot(),
    +                           newMetaInfo.getStateSerializer());
    +
    +                   if (namespaceCompatibility.isRequiresMigration() || 
stateCompatibility.isRequiresMigration()) {
    +                           // TODO state migration currently isn't 
possible.
    +                           throw new StateMigrationException("State 
migration isn't supported, yet.");
    +                   } else {
    +                           stateInfo.f1 = newMetaInfo;
    +                           return stateInfo.f0;
    +                   }
    +           }
    +
    +           byte[] nameBytes = 
descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
    +           
Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, 
nameBytes),
    +                   "The chosen state name 'default' collides with the name 
of the default column family!");
    +
    +           ColumnFamilyDescriptor columnDescriptor = new 
ColumnFamilyDescriptor(nameBytes, columnOptions);
    +
    +           final ColumnFamilyHandle columnFamily;
    +
    +           try {
    +                   columnFamily = db.createColumnFamily(columnDescriptor);
    +           } catch (RocksDBException e) {
    +                   throw new IOException("Error creating 
ColumnFamilyHandle.", e);
                }
    +
    +           Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<N, S>> tuple =
    +                   new Tuple2<>(columnFamily, newMetaInfo);
    +           Map rawAccess = kvStateInformation;
    +           rawAccess.put(descriptor.getName(), tuple);
    +           return columnFamily;
        }
     
        @Override
    -   public void notifyCheckpointComplete(long completedCheckpointId) {
    -           synchronized (materializedSstFiles) {
    -                   if (completedCheckpointId < lastCompletedCheckpointId) {
    -                           return;
    -                   }
    +   protected <N, T> InternalValueState<N, T> createValueState(
    +           TypeSerializer<N> namespaceSerializer,
    +           ValueStateDescriptor<T> stateDesc) throws Exception {
     
    -                   materializedSstFiles.keySet().removeIf(checkpointId -> 
checkpointId < completedCheckpointId);
    +           ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
     
    -                   lastCompletedCheckpointId = completedCheckpointId;
    -           }
    +           return new RocksDBValueState<>(columnFamily, 
namespaceSerializer,  stateDesc, this);
        }
     
    -   private void createDB() throws IOException {
    -           List<ColumnFamilyHandle> columnFamilyHandles = new 
ArrayList<>(1);
    -           this.db = openDB(instanceRocksDBPath.getAbsolutePath(), 
Collections.emptyList(), columnFamilyHandles);
    -           this.defaultColumnFamily = columnFamilyHandles.get(0);
    +   @Override
    +   protected <N, T> InternalListState<N, T> createListState(
    +           TypeSerializer<N> namespaceSerializer,
    +           ListStateDescriptor<T> stateDesc) throws Exception {
    +
    +           ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
    +
    +           return new RocksDBListState<>(columnFamily, 
namespaceSerializer, stateDesc, this);
        }
     
    -   private RocksDB openDB(
    -           String path,
    -           List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
    -           List<ColumnFamilyHandle> stateColumnFamilyHandles) throws 
IOException {
    +   @Override
    +   protected <N, T> InternalReducingState<N, T> createReducingState(
    +           TypeSerializer<N> namespaceSerializer,
    +           ReducingStateDescriptor<T> stateDesc) throws Exception {
     
    -           List<ColumnFamilyDescriptor> columnFamilyDescriptors =
    -                   new ArrayList<>(1 + 
stateColumnFamilyDescriptors.size());
    +           ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
     
    -           columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
    +           return new RocksDBReducingState<>(columnFamily, 
namespaceSerializer,  stateDesc, this);
    +   }
     
    -           // we add the required descriptor for the default CF in last 
position.
    -           columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions));
    +   @Override
    +   protected <N, T, ACC, R> InternalAggregatingState<N, T, R> 
createAggregatingState(
    +           TypeSerializer<N> namespaceSerializer,
    +           AggregatingStateDescriptor<T, ACC, R> stateDesc) throws 
Exception {
     
    -           RocksDB dbRef;
    +           ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
    +           return new RocksDBAggregatingState<>(columnFamily, 
namespaceSerializer, stateDesc, this);
    +   }
     
    -           try {
    -                   dbRef = RocksDB.open(
    -                           Preconditions.checkNotNull(dbOptions),
    -                           Preconditions.checkNotNull(path),
    -                           columnFamilyDescriptors,
    -                           stateColumnFamilyHandles);
    -           } catch (RocksDBException e) {
    -                   throw new IOException("Error while opening RocksDB 
instance.", e);
    -           }
    +   @Override
    +   protected <N, T, ACC> InternalFoldingState<N, T, ACC> 
createFoldingState(
    +           TypeSerializer<N> namespaceSerializer,
    +           FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
     
    -           // requested + default CF
    -           Preconditions.checkState(1 + 
stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
    -                   "Not all requested column family handles have been 
created");
    +           ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
     
    -           return dbRef;
    +           return new RocksDBFoldingState<>(columnFamily, 
namespaceSerializer, stateDesc, this);
    +   }
    +
    +   @Override
    +   protected <N, UK, UV> InternalMapState<N, UK, UV> createMapState(
    +           TypeSerializer<N> namespaceSerializer,
    +           MapStateDescriptor<UK, UV> stateDesc) throws Exception {
    +
    +           ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
    +
    +           return new RocksDBMapState<>(columnFamily, namespaceSerializer, 
stateDesc, this);
        }
     
        /**
    -    * Encapsulates the process of restoring a RocksDBKeyedStateBackend 
from a snapshot.
    +    * Only visible for testing, DO NOT USE.
         */
    -   static final class RocksDBFullRestoreOperation<K> {
    +   public File getInstanceBasePath() {
    +           return instanceBasePath;
    +   }
     
    -           private final RocksDBKeyedStateBackend<K> 
rocksDBKeyedStateBackend;
    +   @Override
    +   public boolean supportsAsynchronousSnapshots() {
    +           return true;
    +   }
     
    -           /** Current key-groups state handle from which we restore 
key-groups. */
    -           private KeyGroupsStateHandle currentKeyGroupsStateHandle;
    -           /** Current input stream we obtained from 
currentKeyGroupsStateHandle. */
    -           private FSDataInputStream currentStateHandleInStream;
    -           /** Current data input view that wraps 
currentStateHandleInStream. */
    -           private DataInputView currentStateHandleInView;
    -           /** Current list of ColumnFamilyHandles for all column families 
we restore from currentKeyGroupsStateHandle. */
    -           private List<ColumnFamilyHandle> 
currentStateHandleKVStateColumnFamilies;
    -           /** The compression decorator that was used for writing the 
state, as determined by the meta data. */
    -           private StreamCompressionDecorator 
keygroupStreamCompressionDecorator;
    +   @VisibleForTesting
    +   @SuppressWarnings("unchecked")
    +   @Override
    +   public int numStateEntries() {
    +           int count = 0;
     
    -           /**
    -            * Creates a restore operation object for the given state 
backend instance.
    -            *
    -            * @param rocksDBKeyedStateBackend the state backend into which 
we restore
    -            */
    -           public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> 
rocksDBKeyedStateBackend) {
    -                   this.rocksDBKeyedStateBackend = 
Preconditions.checkNotNull(rocksDBKeyedStateBackend);
    +           for (Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> column : 
kvStateInformation.values()) {
    +                   try (RocksIterator rocksIterator = 
db.newIterator(column.f0)) {
    +                           rocksIterator.seekToFirst();
    +
    +                           while (rocksIterator.isValid()) {
    +                                   count++;
    +                                   rocksIterator.next();
    +                           }
    +                   }
    +           }
    +
    +           return count;
    +   }
    +
    +
    +
    +   /**
    +    * Iterator that merges multiple RocksDB iterators to partition all 
states into contiguous key-groups.
    +    * The resulting iteration sequence is ordered by (key-group, kv-state).
    +    */
    +   @VisibleForTesting
    +   static final class RocksDBMergeIterator implements AutoCloseable {
    +
    +           private final PriorityQueue<MergeIterator> heap;
    +           private final int keyGroupPrefixByteCount;
    +           private boolean newKeyGroup;
    +           private boolean newKVState;
    +           private boolean valid;
    +
    +           private MergeIterator currentSubIterator;
    +
    +           private static final List<Comparator<MergeIterator>> 
COMPARATORS;
    +
    +           static {
    +                   int maxBytes = 4;
    +                   COMPARATORS = new ArrayList<>(maxBytes);
    +                   for (int i = 0; i < maxBytes; ++i) {
    +                           final int currentBytes = i;
    +                           COMPARATORS.add(new Comparator<MergeIterator>() 
{
    +                                   @Override
    +                                   public int compare(MergeIterator o1, 
MergeIterator o2) {
    +                                           int arrayCmpRes = 
compareKeyGroupsForByteArrays(
    +                                                   o1.currentKey, 
o2.currentKey, currentBytes);
    +                                           return arrayCmpRes == 0 ? 
o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
    +                                   }
    +                           });
    +                   }
    +           }
    +
    +           RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> 
kvStateIterators, final int keyGroupPrefixByteCount) {
    +                   Preconditions.checkNotNull(kvStateIterators);
    +                   this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
    +
    +                   Comparator<MergeIterator> iteratorComparator = 
COMPARATORS.get(keyGroupPrefixByteCount);
    +
    +                   if (kvStateIterators.size() > 0) {
    +                           PriorityQueue<MergeIterator> 
iteratorPriorityQueue =
    +                                   new 
PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
    +
    +                           for (Tuple2<RocksIterator, Integer> 
rocksIteratorWithKVStateId : kvStateIterators) {
    +                                   final RocksIterator rocksIterator = 
rocksIteratorWithKVStateId.f0;
    +                                   rocksIterator.seekToFirst();
    +                                   if (rocksIterator.isValid()) {
    +                                           iteratorPriorityQueue.offer(new 
MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
    +                                   } else {
    +                                           
IOUtils.closeQuietly(rocksIterator);
    +                                   }
    +                           }
    +
    +                           kvStateIterators.clear();
    +
    +                           this.heap = iteratorPriorityQueue;
    +                           this.valid = !heap.isEmpty();
    +                           this.currentSubIterator = heap.poll();
    +                   } else {
    +                           // creating a PriorityQueue of size 0 results 
in an exception.
    +                           this.heap = null;
    +                           this.valid = false;
    +                   }
    +
    +                   this.newKeyGroup = true;
    +                   this.newKVState = true;
                }
     
                /**
    -            * Restores all key-groups data that is referenced by the 
passed state handles.
    -            *
    -            * @param keyedStateHandles List of all key groups state 
handles that shall be restored.
    +            * Advance the iterator. Should only be called if {@link 
#isValid()} returned true. Valid can only chance after
    +            * calls to {@link #next()}.
                 */
    -           public void doRestore(Collection<KeyedStateHandle> 
keyedStateHandles)
    -                   throws IOException, StateMigrationException, 
RocksDBException {
    +           public void next() {
    +                   newKeyGroup = false;
    +                   newKVState = false;
     
    -                   rocksDBKeyedStateBackend.createDB();
    +                   final RocksIterator rocksIterator = 
currentSubIterator.getIterator();
    +                   rocksIterator.next();
     
    -                   for (KeyedStateHandle keyedStateHandle : 
keyedStateHandles) {
    -                           if (keyedStateHandle != null) {
    +                   byte[] oldKey = currentSubIterator.getCurrentKey();
    +                   if (rocksIterator.isValid()) {
    +                           currentSubIterator.currentKey = 
rocksIterator.key();
     
    -                                   if (!(keyedStateHandle instanceof 
KeyGroupsStateHandle)) {
    -                                           throw new 
IllegalStateException("Unexpected state handle type, " +
    -                                                   "expected: " + 
KeyGroupsStateHandle.class +
    -                                                   ", but found: " + 
keyedStateHandle.getClass());
    -                                   }
    -                                   this.currentKeyGroupsStateHandle = 
(KeyGroupsStateHandle) keyedStateHandle;
    -                                   restoreKeyGroupsInStateHandle();
    +                           if (isDifferentKeyGroup(oldKey, 
currentSubIterator.getCurrentKey())) {
    +                                   heap.offer(currentSubIterator);
    +                                   currentSubIterator 
    --- End diff --
    
    I know this part is an old code block and wasn't changed in this PR, but 
it looks a bit odd to me: kvStateInformation is not a copied object, so it can be 
changed while writeKVStateMetaData() is being invoked ... If I am not wrong, this is a 
serious bug. Am I misunderstanding something?


---

Reply via email to