GitHub user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r169638326
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -336,1697 +386,1982 @@ public int getKeyGroupPrefixBytes() {
         * @param streamFactory The factory that we can use for writing our 
state to streams.
         * @param checkpointOptions Options for how to perform this checkpoint.
         * @return Future to the state handle of the snapshot data.
    -    * @throws Exception
    +    * @throws Exception indicating a problem in the synchronous part of 
the checkpoint.
         */
        @Override
    -   public RunnableFuture<KeyedStateHandle> snapshot(
    +   public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
                final long checkpointId,
                final long timestamp,
                final CheckpointStreamFactory streamFactory,
                CheckpointOptions checkpointOptions) throws Exception {
     
    -           if (checkpointOptions.getCheckpointType() != 
CheckpointType.SAVEPOINT &&
    -                   enableIncrementalCheckpointing) {
    -                   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
    -           } else {
    -                   return snapshotFully(checkpointId, timestamp, 
streamFactory);
    -           }
    +           return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
        }
     
    -   private RunnableFuture<KeyedStateHandle> snapshotIncrementally(
    -           final long checkpointId,
    -           final long checkpointTimestamp,
    -           final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
    -
    -           if (db == null) {
    -                   throw new IOException("RocksDB closed.");
    -           }
    +   @Override
    +   public void restore(StateObjectCollection<KeyedStateHandle> 
restoreState) throws Exception {
    +           LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
     
    -           if (kvStateInformation.isEmpty()) {
    -                   if (LOG.isDebugEnabled()) {
    -                           LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
    -                                           checkpointTimestamp);
    -                   }
    -                   return DoneFuture.nullValue();
    +           if (LOG.isDebugEnabled()) {
    +                   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
                }
     
    -           final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
    -                   new RocksDBIncrementalSnapshotOperation<>(
    -                           this,
    -                           checkpointStreamFactory,
    -                           checkpointId,
    -                           checkpointTimestamp);
    +           // clear all meta data
    +           kvStateInformation.clear();
    +           restoredKvStateMetaInfos.clear();
     
                try {
    -                   snapshotOperation.takeSnapshot();
    -           } catch (Exception e) {
    -                   snapshotOperation.stop();
    -                   snapshotOperation.releaseResources(true);
    -                   throw e;
    -           }
    -
    -           return new FutureTask<KeyedStateHandle>(
    -                   new Callable<KeyedStateHandle>() {
    -                           @Override
    -                           public KeyedStateHandle call() throws Exception 
{
    -                                   return 
snapshotOperation.materializeSnapshot();
    +                   if (restoreState == null || restoreState.isEmpty()) {
    +                           createDB();
    +                   } else {
    +                           KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
    +                           if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
    +                                   || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
    +                                   RocksDBIncrementalRestoreOperation<K> 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
    +                                   restoreOperation.restore(restoreState);
    +                           } else {
    +                                   RocksDBFullRestoreOperation<K> 
restoreOperation = new RocksDBFullRestoreOperation<>(this);
    +                                   
restoreOperation.doRestore(restoreState);
                                }
                        }
    -           ) {
    -                   @Override
    -                   public boolean cancel(boolean mayInterruptIfRunning) {
    -                           snapshotOperation.stop();
    -                           return super.cancel(mayInterruptIfRunning);
    -                   }
    -
    -                   @Override
    -                   protected void done() {
    -                           
snapshotOperation.releaseResources(isCancelled());
    -                   }
    -           };
    +           } catch (Exception ex) {
    +                   dispose();
    +                   throw ex;
    +           }
        }
     
    -   private RunnableFuture<KeyedStateHandle> snapshotFully(
    -           final long checkpointId,
    -           final long timestamp,
    -           final CheckpointStreamFactory streamFactory) throws Exception {
    -
    -           long startTime = System.currentTimeMillis();
    -           final CloseableRegistry snapshotCloseableRegistry = new 
CloseableRegistry();
    -
    -           final RocksDBFullSnapshotOperation<K> snapshotOperation;
    -
    -           if (kvStateInformation.isEmpty()) {
    -                   if (LOG.isDebugEnabled()) {
    -                           LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.", timestamp);
    -                   }
    +   @Override
    +   public void notifyCheckpointComplete(long completedCheckpointId) {
     
    -                   return DoneFuture.nullValue();
    +           if (!enableIncrementalCheckpointing) {
    +                   return;
                }
     
    -           snapshotOperation = new RocksDBFullSnapshotOperation<>(this, 
streamFactory, snapshotCloseableRegistry);
    -           snapshotOperation.takeDBSnapShot(checkpointId, timestamp);
    -
    -           // implementation of the async IO operation, based on FutureTask
    -           AbstractAsyncCallableWithResources<KeyedStateHandle> ioCallable 
=
    -                   new 
AbstractAsyncCallableWithResources<KeyedStateHandle>() {
    -
    -                           @Override
    -                           protected void acquireResources() throws 
Exception {
    -                                   
cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
    -                                   
snapshotOperation.openCheckpointStream();
    -                           }
    +           synchronized (materializedSstFiles) {
     
    -                           @Override
    -                           protected void releaseResources() throws 
Exception {
    -                                   closeLocalRegistry();
    -                                   releaseSnapshotOperationResources();
    -                           }
    +                   if (completedCheckpointId < lastCompletedCheckpointId) {
    +                           return;
    +                   }
     
    -                           private void 
releaseSnapshotOperationResources() {
    -                                   // hold the db lock while operation on 
the db to guard us against async db disposal
    -                                   
snapshotOperation.releaseSnapshotResources();
    -                           }
    +                   materializedSstFiles.keySet().removeIf(checkpointId -> 
checkpointId < completedCheckpointId);
     
    -                           @Override
    -                           protected void stopOperation() throws Exception 
{
    -                                   closeLocalRegistry();
    -                           }
    +                   lastCompletedCheckpointId = completedCheckpointId;
    +           }
    +   }
     
    -                           private void closeLocalRegistry() {
    -                                   if 
(cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
    -                                           try {
    -                                                   
snapshotCloseableRegistry.close();
    -                                           } catch (Exception ex) {
    -                                                   LOG.warn("Error closing 
local registry", ex);
    -                                           }
    -                                   }
    -                           }
    +   private void createDB() throws IOException {
    +           List<ColumnFamilyHandle> columnFamilyHandles = new 
ArrayList<>(1);
    +           this.db = openDB(instanceRocksDBPath.getAbsolutePath(), 
Collections.emptyList(), columnFamilyHandles);
    +           this.defaultColumnFamily = columnFamilyHandles.get(0);
    +   }
     
    -                           @Override
    -                           public KeyGroupsStateHandle performOperation() 
throws Exception {
    -                                   long startTime = 
System.currentTimeMillis();
    +   private RocksDB openDB(
    +           String path,
    +           List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
    +           List<ColumnFamilyHandle> stateColumnFamilyHandles) throws 
IOException {
     
    -                                   if (isStopped()) {
    -                                           throw new IOException("RocksDB 
closed.");
    -                                   }
    +           List<ColumnFamilyDescriptor> columnFamilyDescriptors =
    +                   new ArrayList<>(1 + 
stateColumnFamilyDescriptors.size());
     
    -                                   snapshotOperation.writeDBSnapshot();
    +           // we add the required descriptor for the default CF in FIRST 
position, see
    +           // 
https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families
    +           columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnOptions));
    +           columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
     
    -                                   LOG.info("Asynchronous RocksDB snapshot 
({}, asynchronous part) in thread {} took {} ms.",
    -                                           streamFactory, 
Thread.currentThread(), (System.currentTimeMillis() - startTime));
    +           RocksDB dbRef;
     
    -                                   return 
snapshotOperation.getSnapshotResultStateHandle();
    -                           }
    -                   };
    +           try {
    +                   dbRef = RocksDB.open(
    +                           Preconditions.checkNotNull(dbOptions),
    +                           Preconditions.checkNotNull(path),
    +                           columnFamilyDescriptors,
    +                           stateColumnFamilyHandles);
    +           } catch (RocksDBException e) {
    +                   throw new IOException("Error while opening RocksDB 
instance.", e);
    +           }
     
    -           LOG.info("Asynchronous RocksDB snapshot ({}, synchronous part) 
in thread {} took {} ms.",
    -                           streamFactory, Thread.currentThread(), 
(System.currentTimeMillis() - startTime));
    +           // requested + default CF
    +           Preconditions.checkState(1 + 
stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
    +                   "Not all requested column family handles have been 
created");
     
    -           return AsyncStoppableTaskWithCallback.from(ioCallable);
    +           return dbRef;
        }
     
        /**
    -    * Encapsulates the process to perform a snapshot of a 
RocksDBKeyedStateBackend.
    +    * Encapsulates the process of restoring a RocksDBKeyedStateBackend 
from a full snapshot.
         */
    -   static final class RocksDBFullSnapshotOperation<K> {
    -
    -           static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
    -           static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
    -
    -           private final RocksDBKeyedStateBackend<K> stateBackend;
    -           private final KeyGroupRangeOffsets keyGroupRangeOffsets;
    -           private final CheckpointStreamFactory checkpointStreamFactory;
    -           private final CloseableRegistry snapshotCloseableRegistry;
    -           private final ResourceGuard.Lease dbLease;
    -
    -           private long checkpointId;
    -           private long checkpointTimeStamp;
    -
    -           private Snapshot snapshot;
    -           private ReadOptions readOptions;
    -           private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
    -
    -           private CheckpointStreamFactory.CheckpointStateOutputStream 
outStream;
    -           private DataOutputView outputView;
    +   private static final class RocksDBFullRestoreOperation<K> {
     
    -           RocksDBFullSnapshotOperation(
    -                   RocksDBKeyedStateBackend<K> stateBackend,
    -                   CheckpointStreamFactory checkpointStreamFactory,
    -                   CloseableRegistry registry) throws IOException {
    +           private final RocksDBKeyedStateBackend<K> 
rocksDBKeyedStateBackend;
     
    -                   this.stateBackend = stateBackend;
    -                   this.checkpointStreamFactory = checkpointStreamFactory;
    -                   this.keyGroupRangeOffsets = new 
KeyGroupRangeOffsets(stateBackend.keyGroupRange);
    -                   this.snapshotCloseableRegistry = registry;
    -                   this.dbLease = 
this.stateBackend.rocksDBResourceGuard.acquireResource();
    -           }
    +           /** Current key-groups state handle from which we restore 
key-groups. */
    +           private KeyGroupsStateHandle currentKeyGroupsStateHandle;
    +           /** Current input stream we obtained from 
currentKeyGroupsStateHandle. */
    +           private FSDataInputStream currentStateHandleInStream;
    +           /** Current data input view that wraps 
currentStateHandleInStream. */
    +           private DataInputView currentStateHandleInView;
    +           /** Current list of ColumnFamilyHandles for all column families 
we restore from currentKeyGroupsStateHandle. */
    +           private List<ColumnFamilyHandle> 
currentStateHandleKVStateColumnFamilies;
    +           /** The compression decorator that was used for writing the 
state, as determined by the meta data. */
    +           private StreamCompressionDecorator 
keygroupStreamCompressionDecorator;
     
                /**
    -            * 1) Create a snapshot object from RocksDB.
    +            * Creates a restore operation object for the given state 
backend instance.
                 *
    -            * @param checkpointId id of the checkpoint for which we take 
the snapshot
    -            * @param checkpointTimeStamp timestamp of the checkpoint for 
which we take the snapshot
    +            * @param rocksDBKeyedStateBackend the state backend into which 
we restore
                 */
    -           public void takeDBSnapShot(long checkpointId, long 
checkpointTimeStamp) {
    -                   Preconditions.checkArgument(snapshot == null, "Only one 
ongoing snapshot allowed!");
    -                   this.kvStateIterators = new 
ArrayList<>(stateBackend.kvStateInformation.size());
    -                   this.checkpointId = checkpointId;
    -                   this.checkpointTimeStamp = checkpointTimeStamp;
    -                   this.snapshot = stateBackend.db.getSnapshot();
    +           public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> 
rocksDBKeyedStateBackend) {
    +                   this.rocksDBKeyedStateBackend = 
Preconditions.checkNotNull(rocksDBKeyedStateBackend);
                }
     
                /**
    -            * 2) Open CheckpointStateOutputStream through the 
checkpointStreamFactory into which we will write.
    +            * Restores all key-groups data that is referenced by the 
passed state handles.
                 *
    -            * @throws Exception
    +            * @param keyedStateHandles List of all key groups state 
handles that shall be restored.
                 */
    -           public void openCheckpointStream() throws Exception {
    -                   Preconditions.checkArgument(outStream == null, "Output 
stream for snapshot is already set.");
    -                   outStream = 
checkpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
    -                   snapshotCloseableRegistry.registerCloseable(outStream);
    -                   outputView = new DataOutputViewStreamWrapper(outStream);
    -           }
    +           public void doRestore(Collection<KeyedStateHandle> 
keyedStateHandles)
    +                   throws IOException, StateMigrationException, 
RocksDBException {
     
    -           /**
    -            * 3) Write the actual data from RocksDB from the time we took 
the snapshot object in (1).
    -            *
    -            * @throws IOException
    -            */
    -           public void writeDBSnapshot() throws IOException, 
InterruptedException {
    +                   rocksDBKeyedStateBackend.createDB();
     
    -                   if (null == snapshot) {
    -                           throw new IOException("No snapshot available. 
Might be released due to cancellation.");
    -                   }
    +                   for (KeyedStateHandle keyedStateHandle : 
keyedStateHandles) {
    +                           if (keyedStateHandle != null) {
     
    -                   Preconditions.checkNotNull(outStream, "No output stream 
to write snapshot.");
    -                   writeKVStateMetaData();
    -                   writeKVStateData();
    +                                   if (!(keyedStateHandle instanceof 
KeyGroupsStateHandle)) {
    +                                           throw new 
IllegalStateException("Unexpected state handle type, " +
    +                                                   "expected: " + 
KeyGroupsStateHandle.class +
    +                                                   ", but found: " + 
keyedStateHandle.getClass());
    +                                   }
    +                                   this.currentKeyGroupsStateHandle = 
(KeyGroupsStateHandle) keyedStateHandle;
    +                                   restoreKeyGroupsInStateHandle();
    +                           }
    +                   }
                }
     
                /**
    -            * 4) Returns a state handle to the snapshot after the snapshot 
procedure is completed and null before.
    -            *
    -            * @return state handle to the completed snapshot
    +            * Restore one key groups state handle.
                 */
    -           public KeyGroupsStateHandle getSnapshotResultStateHandle() 
throws IOException {
    -
    -                   if 
(snapshotCloseableRegistry.unregisterCloseable(outStream)) {
    -
    -                           StreamStateHandle stateHandle = 
outStream.closeAndGetHandle();
    -                           outStream = null;
    -
    -                           if (stateHandle != null) {
    -                                   return new 
KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
    +           private void restoreKeyGroupsInStateHandle()
    +                   throws IOException, StateMigrationException, 
RocksDBException {
    +                   try {
    +                           currentStateHandleInStream = 
currentKeyGroupsStateHandle.openInputStream();
    +                           
rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
    +                           currentStateHandleInView = new 
DataInputViewStreamWrapper(currentStateHandleInStream);
    +                           restoreKVStateMetaData();
    +                           restoreKVStateData();
    +                   } finally {
    +                           if 
(rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream))
 {
    +                                   
IOUtils.closeQuietly(currentStateHandleInStream);
                                }
                        }
    -                   return null;
                }
     
                /**
    -            * 5) Release the snapshot object for RocksDB and clean up.
    +            * Restore the KV-state / ColumnFamily meta data for all 
key-groups referenced by the current state handle.
    +            *
    +            * @throws IOException
    +            * @throws ClassNotFoundException
    +            * @throws RocksDBException
                 */
    -           public void releaseSnapshotResources() {
    +           private void restoreKVStateMetaData() throws IOException, 
StateMigrationException, RocksDBException {
     
    -                   outStream = null;
    +                   KeyedBackendSerializationProxy<K> serializationProxy =
    +                           new 
KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
     
    -                   if (null != kvStateIterators) {
    -                           for (Tuple2<RocksIterator, Integer> 
kvStateIterator : kvStateIterators) {
    -                                   
IOUtils.closeQuietly(kvStateIterator.f0);
    -                           }
    -                           kvStateIterators = null;
    -                   }
    +                   serializationProxy.read(currentStateHandleInView);
     
    -                   if (null != snapshot) {
    -                           if (null != stateBackend.db) {
    -                                   
stateBackend.db.releaseSnapshot(snapshot);
    -                           }
    -                           IOUtils.closeQuietly(snapshot);
    -                           snapshot = null;
    -                   }
    +                   // check for key serializer compatibility; this also 
reconfigures the
    +                   // key serializer to be compatible, if it is required 
and is possible
    +                   if (CompatibilityUtil.resolveCompatibilityResult(
    +                           serializationProxy.getKeySerializer(),
    +                           UnloadableDummyTypeSerializer.class,
    +                           
serializationProxy.getKeySerializerConfigSnapshot(),
    +                           rocksDBKeyedStateBackend.keySerializer)
    +                           .isRequiresMigration()) {
     
    -                   if (null != readOptions) {
    -                           IOUtils.closeQuietly(readOptions);
    -                           readOptions = null;
    +                           // TODO replace with state migration; note that 
key hash codes need to remain the same after migration
    +                           throw new StateMigrationException("The new key 
serializer is not compatible to read previous keys. " +
    +                                   "Aborting now since state migration is 
currently not available");
                        }
     
    -                   this.dbLease.close();
    -           }
    -
    -           private void writeKVStateMetaData() throws IOException {
    +                   this.keygroupStreamCompressionDecorator = 
serializationProxy.isUsingKeyGroupCompression() ?
    +                           SnappyStreamCompressionDecorator.INSTANCE : 
UncompressedStreamCompressionDecorator.INSTANCE;
     
    -                   List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> metaInfoSnapshots =
    -                           new 
ArrayList<>(stateBackend.kvStateInformation.size());
    +                   List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> restoredMetaInfos =
    +                           serializationProxy.getStateMetaInfoSnapshots();
    +                   currentStateHandleKVStateColumnFamilies = new 
ArrayList<>(restoredMetaInfos.size());
    +                   //rocksDBKeyedStateBackend.restoredKvStateMetaInfos = 
new HashMap<>(restoredMetaInfos.size());
     
    -                   int kvStateId = 0;
    -                   for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> column :
    -                           stateBackend.kvStateInformation.entrySet()) {
    +                   for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> 
restoredMetaInfo : restoredMetaInfos) {
     
    -                           
metaInfoSnapshots.add(column.getValue().f1.snapshot());
    +                           Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn =
    +                                   
rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
     
    -                           //retrieve iterator for this k/v states
    -                           readOptions = new ReadOptions();
    -                           readOptions.setSnapshot(snapshot);
    +                           if (registeredColumn == null) {
    +                                   byte[] nameBytes = 
restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
     
    -                           kvStateIterators.add(
    -                                   new 
Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), 
kvStateId));
    +                                   ColumnFamilyDescriptor 
columnFamilyDescriptor = new ColumnFamilyDescriptor(
    +                                           nameBytes,
    +                                           
rocksDBKeyedStateBackend.columnOptions);
     
    -                           ++kvStateId;
    -                   }
    +                                   RegisteredKeyedBackendStateMetaInfo<?, 
?> stateMetaInfo =
    +                                           new 
RegisteredKeyedBackendStateMetaInfo<>(
    +                                                   
restoredMetaInfo.getStateType(),
    +                                                   
restoredMetaInfo.getName(),
    +                                                   
restoredMetaInfo.getNamespaceSerializer(),
    +                                                   
restoredMetaInfo.getStateSerializer());
     
    -                   KeyedBackendSerializationProxy<K> serializationProxy =
    -                           new KeyedBackendSerializationProxy<>(
    -                                   stateBackend.getKeySerializer(),
    -                                   metaInfoSnapshots,
    -                                   
!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, 
stateBackend.keyGroupCompressionDecorator));
    +                                   
rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(),
 restoredMetaInfo);
     
    -                   serializationProxy.write(outputView);
    -           }
    +                                   ColumnFamilyHandle columnFamily = 
rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
     
    -           private void writeKVStateData() throws IOException, 
InterruptedException {
    +                                   registeredColumn = new 
Tuple2<>(columnFamily, stateMetaInfo);
    +                                   
rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), 
registeredColumn);
     
    -                   byte[] previousKey = null;
    -                   byte[] previousValue = null;
    -                   OutputStream kgOutStream = null;
    -                   DataOutputView kgOutView = null;
    +                           } else {
    +                                   // TODO with eager state registration 
in place, check here for serializer migration strategies
    +                           }
    +                           
currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0);
    +                   }
    +           }
     
    -                   try {
    -                           // Here we transfer ownership of RocksIterators 
to the RocksDBMergeIterator
    -                           try (RocksDBMergeIterator mergeIterator = new 
RocksDBMergeIterator(
    -                                   kvStateIterators, 
stateBackend.keyGroupPrefixBytes)) {
    +           /**
    +            * Restore the KV-state / ColumnFamily data for all key-groups 
referenced by the current state handle.
    +            *
    +            * @throws IOException
    +            * @throws RocksDBException
    +            */
    +           private void restoreKVStateData() throws IOException, 
RocksDBException {
    +                   //for all key-groups in the current state handle...
    +                   for (Tuple2<Integer, Long> keyGroupOffset : 
currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
    +                           int keyGroup = keyGroupOffset.f0;
     
    -                                   // handover complete, null out to 
prevent double close
    -                                   kvStateIterators = null;
    +                           // Check that restored key groups all belong to 
the backend
    +                           
Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup),
    +                                   "The key group must belong to the 
backend");
     
    -                                   //preamble: setup with first key-group 
as our lookahead
    -                                   if (mergeIterator.isValid()) {
    -                                           //begin first key-group by 
recording the offset
    -                                           
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), 
outStream.getPos());
    -                                           //write the k/v-state id as 
metadata
    -                                           kgOutStream = 
stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
    -                                           kgOutView = new 
DataOutputViewStreamWrapper(kgOutStream);
    +                           long offset = keyGroupOffset.f1;
    +                           //not empty key-group?
    +                           if (0L != offset) {
    +                                   currentStateHandleInStream.seek(offset);
    +                                   try (InputStream compressedKgIn = 
keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream))
 {
    +                                           DataInputViewStreamWrapper 
compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
                                                //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
    -                                           
kgOutView.writeShort(mergeIterator.kvStateId());
    -                                           previousKey = 
mergeIterator.key();
    -                                           previousValue = 
mergeIterator.value();
    -                                           mergeIterator.next();
    +                                           int kvStateId = 
compressedKgInputView.readShort();
    +                                           ColumnFamilyHandle handle = 
currentStateHandleKVStateColumnFamilies.get(kvStateId);
    +                                           //insert all k/v pairs into DB
    +                                           boolean keyGroupHasMoreKeys = 
true;
    +                                           while (keyGroupHasMoreKeys) {
    +                                                   byte[] key = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
    +                                                   byte[] value = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
    +                                                   if 
(RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
    +                                                           //clear the 
signal bit in the key to make it ready for insertion again
    +                                                           
RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
    +                                                           
rocksDBKeyedStateBackend.db.put(handle, key, value);
    +                                                           //TODO this 
could be aware of keyGroupPrefixBytes and write only one byte if possible
    +                                                           kvStateId = 
RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
    +                                                                   & 
compressedKgInputView.readShort();
    +                                                           if 
(RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
    +                                                                   
keyGroupHasMoreKeys = false;
    +                                                           } else {
    +                                                                   handle 
= currentStateHandleKVStateColumnFamilies.get(kvStateId);
    +                                                           }
    +                                                   } else {
    +                                                           
rocksDBKeyedStateBackend.db.put(handle, key, value);
    +                                                   }
    +                                           }
                                        }
    +                           }
    +                   }
    +           }
    +   }
     
    -                                   //main loop: write k/v pairs ordered by 
(key-group, kv-state), thereby tracking key-group offsets.
    -                                   while (mergeIterator.isValid()) {
    +   /**
    +    * Encapsulates the process of restoring a RocksDBKeyedStateBackend 
from an incremental snapshot.
    +    */
    +   private static class RocksDBIncrementalRestoreOperation<T> {
     
    -                                           assert 
(!hasMetaDataFollowsFlag(previousKey));
    +           private final RocksDBKeyedStateBackend<T> stateBackend;
     
    -                                           //set signal in first key byte 
that meta data will follow in the stream after this k/v pair
    -                                           if 
(mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
    +           private 
RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
    +                   this.stateBackend = stateBackend;
    +           }
     
    -                                                   //be cooperative and 
check for interruption from time to time in the hot loop
    -                                                   checkInterrupted();
    +           /**
    +            * Root method that branches for different implementations of 
{@link KeyedStateHandle}.
    +            */
    +           void restore(Collection<KeyedStateHandle> restoreStateHandles) 
throws Exception {
     
    -                                                   
setMetaDataFollowsFlagInKey(previousKey);
    -                                           }
    +                   boolean hasExtraKeys = (restoreStateHandles.size() > 1 
||
    +                           
!Objects.equals(restoreStateHandles.iterator().next().getKeyGroupRange(), 
stateBackend.keyGroupRange));
     
    -                                           writeKeyValuePair(previousKey, 
previousValue, kgOutView);
    +                   if (hasExtraKeys) {
    +                           stateBackend.createDB();
    +                   }
     
    -                                           //write meta data if we have to
    -                                           if 
(mergeIterator.isNewKeyGroup()) {
    -                                                   //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
    -                                                   
kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
    -                                                   // this will just close 
the outer stream
    -                                                   kgOutStream.close();
    -                                                   //begin new key-group
    -                                                   
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), 
outStream.getPos());
    -                                                   //write the kev-state
    -                                                   //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
    -                                                   kgOutStream = 
stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
    -                                                   kgOutView = new 
DataOutputViewStreamWrapper(kgOutStream);
    -                                                   
kgOutView.writeShort(mergeIterator.kvStateId());
    -                                           } else if 
(mergeIterator.isNewKeyValueState()) {
    -                                                   //write the k/v-state
    -                                                   //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
    -                                                   
kgOutView.writeShort(mergeIterator.kvStateId());
    -                                           }
    +                   for (KeyedStateHandle rawStateHandle : 
restoreStateHandles) {
     
    -                                           //request next k/v pair
    -                                           previousKey = 
mergeIterator.key();
    -                                           previousValue = 
mergeIterator.value();
    -                                           mergeIterator.next();
    -                                   }
    +                           if (rawStateHandle instanceof 
IncrementalKeyedStateHandle) {
    +                                   
restoreInstance((IncrementalKeyedStateHandle) rawStateHandle, hasExtraKeys);
    +                           } else if (rawStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
    +                                   Preconditions.checkState(!hasExtraKeys, 
"Cannot recover from local state after rescaling.");
    +                                   
restoreInstance((IncrementalLocalKeyedStateHandle) rawStateHandle);
    +                           } else {
    +                                   throw new 
IllegalStateException("Unexpected state handle type, " +
    +                                           "expected " + 
IncrementalKeyedStateHandle.class +
    +                                           ", but found " + 
rawStateHandle.getClass());
                                }
    +                   }
    +           }
     
    -                           //epilogue: write last key-group
    -                           if (previousKey != null) {
    -                                   assert 
(!hasMetaDataFollowsFlag(previousKey));
    -                                   
setMetaDataFollowsFlagInKey(previousKey);
    -                                   writeKeyValuePair(previousKey, 
previousValue, kgOutView);
    -                                   //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
    -                                   
kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
    -                                   // this will just close the outer stream
    -                                   kgOutStream.close();
    -                                   kgOutStream = null;
    -                           }
    +           /**
    +            * Recovery from remote incremental state.
    +            */
    +           private void restoreInstance(
    +                   IncrementalKeyedStateHandle restoreStateHandle,
    +                   boolean hasExtraKeys) throws Exception {
    +
    +                   // read state data
    +                   Path temporaryRestoreInstancePath = new Path(
    +                           stateBackend.instanceBasePath.getAbsolutePath(),
    +                           UUID.randomUUID().toString());
    +
    +                   try {
    +
    +                           
transferAllStateDataToDirectory(restoreStateHandle, 
temporaryRestoreInstancePath);
     
    +                           // read meta data
    +                           
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots 
=
    +                                   
readMetaData(restoreStateHandle.getMetaStateHandle());
    +
    +                           List<ColumnFamilyDescriptor> 
columnFamilyDescriptors =
    +                                   
createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);
    +
    +                           if (hasExtraKeys) {
    +                                   
restoreKeyGroupsShardWithTemporaryHelperInstance(
    +                                           temporaryRestoreInstancePath,
    +                                           columnFamilyDescriptors,
    +                                           stateMetaInfoSnapshots);
    +                           } else {
    +
    +                                   // since we transferred all remote 
state to a local directory, we can use the same code as for
    +                                   // local recovery.
    +                                   IncrementalLocalKeyedStateHandle 
localKeyedStateHandle = new IncrementalLocalKeyedStateHandle(
    +                                           
restoreStateHandle.getBackendIdentifier(),
    +                                           
restoreStateHandle.getCheckpointId(),
    +                                           new 
DirectoryStateHandle(temporaryRestoreInstancePath),
    +                                           
restoreStateHandle.getKeyGroupRange(),
    +                                           
restoreStateHandle.getMetaStateHandle(),
    +                                           
restoreStateHandle.getSharedState().keySet());
    +
    +                                   restoreLocalStateIntoFullInstance(
    +                                           localKeyedStateHandle,
    +                                           columnFamilyDescriptors,
    +                                           stateMetaInfoSnapshots);
    +                           }
                        } finally {
    -                           // this will just close the outer stream
    -                           IOUtils.closeQuietly(kgOutStream);
    +                           FileSystem restoreFileSystem = 
temporaryRestoreInstancePath.getFileSystem();
    +                           if 
(restoreFileSystem.exists(temporaryRestoreInstancePath)) {
    +                                   
restoreFileSystem.delete(temporaryRestoreInstancePath, true);
    +                           }
                        }
                }
     
    -           private void writeKeyValuePair(byte[] key, byte[] value, 
DataOutputView out) throws IOException {
    -                   BytePrimitiveArraySerializer.INSTANCE.serialize(key, 
out);
    -                   BytePrimitiveArraySerializer.INSTANCE.serialize(value, 
out);
    -           }
    +           /**
    +            * Recovery from local incremental state.
    +            */
    +           private void restoreInstance(IncrementalLocalKeyedStateHandle 
localKeyedStateHandle) throws Exception {
    +                   // read meta data
    +                   List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots =
    +                           
readMetaData(localKeyedStateHandle.getMetaDataState());
     
    -           static void setMetaDataFollowsFlagInKey(byte[] key) {
    -                   key[0] |= FIRST_BIT_IN_BYTE_MASK;
    -           }
    +                   List<ColumnFamilyDescriptor> columnFamilyDescriptors =
    +                           
createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);
     
    -           static void clearMetaDataFollowsFlag(byte[] key) {
    -                   key[0] &= 
(~RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
    +                   restoreLocalStateIntoFullInstance(
    +                           localKeyedStateHandle,
    +                           columnFamilyDescriptors,
    +                           stateMetaInfoSnapshots);
                }
     
    -           static boolean hasMetaDataFollowsFlag(byte[] key) {
    -                   return 0 != (key[0] & 
RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
    -           }
    +           /**
    +            * This method recreates and registers all {@link 
ColumnFamilyDescriptor} from Flink's state meta data snapshot.
    +            */
    +           private List<ColumnFamilyDescriptor> 
createAndRegisterColumnFamilyDescriptors(
    +                   List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots) {
     
    -           private static void checkInterrupted() throws 
InterruptedException {
    -                   if (Thread.currentThread().isInterrupted()) {
    -                           throw new InterruptedException("RocksDB 
snapshot interrupted.");
    +                   List<ColumnFamilyDescriptor> columnFamilyDescriptors =
    +                           new ArrayList<>(1 + 
stateMetaInfoSnapshots.size());
    +
    +                   for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> 
stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
    +
    +                           ColumnFamilyDescriptor columnFamilyDescriptor = 
new ColumnFamilyDescriptor(
    +                                   
stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
    +                                   stateBackend.columnOptions);
    +
    +                           
columnFamilyDescriptors.add(columnFamilyDescriptor);
    +                           
stateBackend.restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(), 
stateMetaInfoSnapshot);
                        }
    +                   return columnFamilyDescriptors;
                }
    -   }
     
    -   private static final class RocksDBIncrementalSnapshotOperation<K> {
    +           /**
    +            * This method implements the core of the restore logic that 
unifies how local and remote state are recovered.
    +            */
    +           private void restoreLocalStateIntoFullInstance(
    +                   IncrementalLocalKeyedStateHandle restoreStateHandle,
    +                   List<ColumnFamilyDescriptor> columnFamilyDescriptors,
    +                   List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots) throws Exception {
    +                   // pick up again the old backend id, so the we can 
reference existing state
    +                   stateBackend.backendUID = 
restoreStateHandle.getBackendIdentifier();
    +
    +                   LOG.debug("Restoring keyed backend uid in operator {} 
from incremental snapshot to {}.",
    +                           stateBackend.operatorIdentifier, 
stateBackend.backendUID);
    +
    +                   // create hard links in the instance directory
    +                   if (!stateBackend.instanceRocksDBPath.mkdirs()) {
    +                           throw new IOException("Could not create RocksDB 
data directory.");
    +                   }
     
    -           /** The backend which we snapshot. */
    -           private final RocksDBKeyedStateBackend<K> stateBackend;
    +                   Path restoreSourcePath = 
restoreStateHandle.getDirectoryStateHandle().getDirectory();
    +                   restoreInstanceDirectoryFromPath(restoreSourcePath);
     
    -           /** Stream factory that creates the outpus streams to DFS. */
    -           private final CheckpointStreamFactory checkpointStreamFactory;
    +                   List<ColumnFamilyHandle> columnFamilyHandles =
    +                           new ArrayList<>(1 + 
columnFamilyDescriptors.size());
     
    -           /** Id for the current checkpoint. */
    -           private final long checkpointId;
    +                   stateBackend.db = stateBackend.openDB(
    +                           
stateBackend.instanceRocksDBPath.getAbsolutePath(),
    +                           columnFamilyDescriptors, columnFamilyHandles);
     
    -           /** Timestamp for the current checkpoint. */
    -           private final long checkpointTimestamp;
    +                   // extract and store the default column family which is 
located at the first index
    +                   stateBackend.defaultColumnFamily = 
columnFamilyHandles.remove(0);
     
    -           /** All sst files that were part of the last previously 
completed checkpoint. */
    -           private Set<StateHandleID> baseSstFiles;
    +                   for (int i = 0; i < columnFamilyDescriptors.size(); 
++i) {
    +                           RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
     
    -           /** The state meta data. */
    -           private final 
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots 
= new ArrayList<>();
    +                           ColumnFamilyHandle columnFamilyHandle = 
columnFamilyHandles.get(i);
    +                           RegisteredKeyedBackendStateMetaInfo<?, ?> 
stateMetaInfo =
    +                                   new 
RegisteredKeyedBackendStateMetaInfo<>(
    +                                           
stateMetaInfoSnapshot.getStateType(),
    +                                           stateMetaInfoSnapshot.getName(),
    +                                           
stateMetaInfoSnapshot.getNamespaceSerializer(),
    +                                           
stateMetaInfoSnapshot.getStateSerializer());
     
    -           private FileSystem backupFileSystem;
    -           private Path backupPath;
    +                           stateBackend.kvStateInformation.put(
    +                                   stateMetaInfoSnapshot.getName(),
    +                                   new Tuple2<>(columnFamilyHandle, 
stateMetaInfo));
    +                   }
     
    -           // Registry for all opened i/o streams
    -           private final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
    +                   // use the restore sst files as the base for succeeding 
checkpoints
    +                   synchronized (stateBackend.materializedSstFiles) {
    +                           stateBackend.materializedSstFiles.put(
    +                                   restoreStateHandle.getCheckpointId(),
    +                                   
restoreStateHandle.getSharedStateHandleIDs());
    +                   }
     
    -           // new sst files since the last completed checkpoint
    -           private final Map<StateHandleID, StreamStateHandle> sstFiles = 
new HashMap<>();
    +                   stateBackend.lastCompletedCheckpointId = 
restoreStateHandle.getCheckpointId();
    +           }
     
    -           // handles to the misc files in the current snapshot
    -           private final Map<StateHandleID, StreamStateHandle> miscFiles = 
new HashMap<>();
    +           /**
    +            * This recreates the new working directory of the recovered 
RocksDB instance and links/copies the contents from
    +            * a local state.
    +            */
    +           private void restoreInstanceDirectoryFromPath(Path source) 
throws IOException {
     
    -           // This lease protects from concurrent disposal of the native 
rocksdb instance.
    -           private final ResourceGuard.Lease dbLease;
    +                   FileSystem fileSystem = source.getFileSystem();
     
    -           private StreamStateHandle metaStateHandle = null;
    +                   final FileStatus[] fileStatuses = 
fileSystem.listStatus(source);
     
    -           private RocksDBIncrementalSnapshotOperation(
    -                   RocksDBKeyedStateBackend<K> stateBackend,
    -                   CheckpointStreamFactory checkpointStreamFactory,
    -                   long checkpointId,
    -                   long checkpointTimestamp) throws IOException {
    +                   if (fileStatuses == null) {
    +                           throw new IOException("Cannot list file 
statues. Directory " + source + " does not exist.");
    +                   }
     
    -                   this.stateBackend = stateBackend;
    -                   this.checkpointStreamFactory = checkpointStreamFactory;
    -                   this.checkpointId = checkpointId;
    -                   this.checkpointTimestamp = checkpointTimestamp;
    -                   this.dbLease = 
this.stateBackend.rocksDBResourceGuard.acquireResource();
    +                   for (FileStatus fileStatus : fileStatuses) {
    +                           final Path filePath = fileStatus.getPath();
    +                           final String fileName = filePath.getName();
    +                           File restoreFile = new File(source.getPath(), 
fileName);
    +                           File targetFile = new 
File(stateBackend.instanceRocksDBPath.getPath(), fileName);
    +                           if (fileName.endsWith(SST_FILE_SUFFIX)) {
    +                                   // hardlink'ing the immutable sst-files.
    +                                   Files.createLink(targetFile.toPath(), 
restoreFile.toPath());
    +                           } else {
    +                                   // true copy for all other files.
    +                                   Files.copy(restoreFile.toPath(), 
targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
    +                           }
    +                   }
                }
     
    -           private StreamStateHandle materializeStateData(Path filePath) 
throws Exception {
    +           /**
    +            * Reads Flink's state meta data file from the state handle.
    +            */
    +           private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> readMetaData(
    +                   StreamStateHandle metaStateHandle) throws Exception {
    +
                        FSDataInputStream inputStream = null;
    -                   CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = null;
     
                        try {
    -                           final byte[] buffer = new byte[8 * 1024];
    -
    -                           FileSystem backupFileSystem = 
backupPath.getFileSystem();
    -                           inputStream = backupFileSystem.open(filePath);
    -                           
closeableRegistry.registerCloseable(inputStream);
    -
    -                           outputStream = checkpointStreamFactory
    -                                   
.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
    -                           
closeableRegistry.registerCloseable(outputStream);
    -
    -                           while (true) {
    -                                   int numBytes = inputStream.read(buffer);
    +                           inputStream = metaStateHandle.openInputStream();
    +                           
stateBackend.cancelStreamRegistry.registerCloseable(inputStream);
     
    -                                   if (numBytes == -1) {
    -                                           break;
    -                                   }
    +                           KeyedBackendSerializationProxy<T> 
serializationProxy =
    +                                   new 
KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader);
    +                           DataInputView in = new 
DataInputViewStreamWrapper(inputStream);
    +                           serializationProxy.read(in);
     
    -                                   outputStream.write(buffer, 0, numBytes);
    -                           }
    +                           // check for key serializer compatibility; this 
also reconfigures the
    +                           // key serializer to be compatible, if it is 
required and is possible
    +                           if 
(CompatibilityUtil.resolveCompatibilityResult(
    +                                   serializationProxy.getKeySerializer(),
    +                                   UnloadableDummyTypeSerializer.class,
    +                                   
serializationProxy.getKeySerializerConfigSnapshot(),
    +                                   stateBackend.keySerializer)
    +                                   .isRequiresMigration()) {
     
    -                           StreamStateHandle result = null;
    -                           if 
(closeableRegistry.unregisterCloseable(outputStream)) {
    -                                   result = 
outputStream.closeAndGetHandle();
    -                                   outputStream = null;
    +                                   // TODO replace with state migration; 
note that key hash codes need to remain the same after migration
    +                                   throw new StateMigrationException("The 
new key serializer is not compatible to read previous keys. " +
    +                                           "Aborting now since state 
migration is currently not available");
                                }
    -                           return result;
     
    +                           return 
serializationProxy.getStateMetaInfoSnapshots();
                        } finally {
    -                           if (inputStream != null && 
closeableRegistry.unregisterCloseable(inputStream)) {
    +                           if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
                                        inputStream.close();
                                }
    -
    -                           if (outputStream != null && 
closeableRegistry.unregisterCloseable(outputStream)) {
    -                                   outputStream.close();
    -                           }
                        }
                }
     
    -           private StreamStateHandle materializeMetaData() throws 
Exception {
    -                   CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = null;
    -
    -                   try {
    -                           outputStream = checkpointStreamFactory
    -                                   
.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
    -                           
closeableRegistry.registerCloseable(outputStream);
    +           private void transferAllStateDataToDirectory(
    +                   IncrementalKeyedStateHandle restoreStateHandle,
    +                   Path dest) throws IOException {
     
    -                           //no need for compression scheme support 
because sst-files are already compressed
    -                           KeyedBackendSerializationProxy<K> 
serializationProxy =
    -                                   new KeyedBackendSerializationProxy<>(
    -                                           stateBackend.keySerializer,
    -                                           stateMetaInfoSnapshots,
    -                                           false);
    +                   final Map<StateHandleID, StreamStateHandle> sstFiles =
    +                           restoreStateHandle.getSharedState();
    +                   final Map<StateHandleID, StreamStateHandle> miscFiles =
    +                           restoreStateHandle.getPrivateState();
     
    -                           DataOutputView out = new 
DataOutputViewStreamWrapper(outputStream);
    +                   transferAllDataFromStateHandles(sstFiles, dest);
    +                   transferAllDataFromStateHandles(miscFiles, dest);
    +           }
     
    -                           serializationProxy.write(out);
    +           /**
    +            * Copies all the files from the given stream state handles to 
the given path, renaming the files w.r.t. their
    +            * {@link StateHandleID}.
    +            */
    +           private void transferAllDataFromStateHandles(
    +                   Map<StateHandleID, StreamStateHandle> stateHandleMap,
    +                   Path restoreInstancePath) throws IOException {
     
    -                           StreamStateHandle result = null;
    -                           if 
(closeableRegistry.unregisterCloseable(outputStream)) {
    -                                   result = 
outputStream.closeAndGetHandle();
    -                                   outputStream = null;
    -                           }
    -                           return result;
    -                   } finally {
    -                           if (outputStream != null) {
    -                                   if 
(closeableRegistry.unregisterCloseable(outputStream)) {
    -                                           outputStream.close();
    -                                   }
    -                           }
    +                   for (Map.Entry<StateHandleID, StreamStateHandle> entry 
: stateHandleMap.entrySet()) {
    +                           StateHandleID stateHandleID = entry.getKey();
    +                           StreamStateHandle remoteFileHandle = 
entry.getValue();
    +                           copyStateDataHandleData(new 
Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle);
                        }
                }
     
    -           void takeSnapshot() throws Exception {
    +           /**
    +            * Copies the file from a single state handle to the given path.
    +            */
    +           private void copyStateDataHandleData(
    +                   Path restoreFilePath,
    +                   StreamStateHandle remoteFileHandle) throws IOException {
     
    -                   final long lastCompletedCheckpoint;
    +                   FileSystem restoreFileSystem = 
restoreFilePath.getFileSystem();
     
    -                   // use the last completed checkpoint as the comparison 
base.
    -                   synchronized (stateBackend.materializedSstFiles) {
    -                           lastCompletedCheckpoint = 
stateBackend.lastCompletedCheckpointId;
    -                           baseSstFiles = 
stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
    -                   }
    +                   FSDataInputStream inputStream = null;
    +                   FSDataOutputStream outputStream = null;
     
    -                   LOG.trace("Taking incremental snapshot for checkpoint 
{}. Snapshot is based on last completed checkpoint {} " +
    -                           "assuming the following (shared) files as base: 
{}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
    +                   try {
    +                           inputStream = 
remoteFileHandle.openInputStream();
    +                           
stateBackend.cancelStreamRegistry.registerCloseable(inputStream);
     
    -                   // save meta data
    -                   for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
    -                           : stateBackend.kvStateInformation.entrySet()) {
    -                           
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
    -                   }
    +                           outputStream = 
restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
    +                           
stateBackend.cancelStreamRegistry.registerCloseable(outputStream);
     
    -                   // save state data
    -                   backupPath = new 
Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
    +                           byte[] buffer = new byte[8 * 1024];
    +                           while (true) {
    +                                   int numBytes = inputStream.read(buffer);
    +                                   if (numBytes == -1) {
    +                                           break;
    +                                   }
     
    -                   LOG.trace("Local RocksDB checkpoint goes to backup path 
{}.", backupPath);
    +                                   outputStream.write(buffer, 0, numBytes);
    +                           }
    +                   } finally {
    +                           if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
    +                                   inputStream.close();
    +                           }
     
    -                   backupFileSystem = backupPath.getFileSystem();
    -                   if (backupFileSystem.exists(backupPath)) {
    -                           throw new IllegalStateException("Unexpected 
existence of the backup directory.");
    +                           if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(outputStream)) {
    +                                   outputStream.close();
    +                           }
                        }
    -
    -                   // create hard links of living files in the checkpoint 
path
    -                   Checkpoint checkpoint = 
Checkpoint.create(stateBackend.db);
    -                   checkpoint.createCheckpoint(backupPath.getPath());
                }
     
    -           KeyedStateHandle materializeSnapshot() throws Exception {
    +           /**
    +            * In case of rescaling, this method creates a temporary 
RocksDB instance for a key-groups shard. All contents
    +            * from the temporary instance are copied into the real restore 
instance and then the temporary instance is
    +            * discarded.
    +            */
    +           private void restoreKeyGroupsShardWithTemporaryHelperInstance(
    +                   Path restoreInstancePath,
    +                   List<ColumnFamilyDescriptor> columnFamilyDescriptors,
    +                   List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots) throws Exception {
     
    -                   
stateBackend.cancelStreamRegistry.registerCloseable(closeableRegistry);
    +                   List<ColumnFamilyHandle> columnFamilyHandles =
    +                           new ArrayList<>(1 + 
columnFamilyDescriptors.size());
     
    -                   // write meta data
    -                   metaStateHandle = materializeMetaData();
    +                   try (RocksDB restoreDb = stateBackend.openDB(
    +                           restoreInstancePath.getPath(),
    +                           columnFamilyDescriptors,
    +                           columnFamilyHandles)) {
    --- End diff --
    
    This can be optimized. If a `restoreInstance` satisfies `startKeyGroup >= backend.keygroup.startKeyGroup() && endKeyGroup <= backend.keygroup.endKeyGroup()` (i.e. its key-group range is fully contained in the backend's key-group range), we can open it directly as the target RocksDB instance and use it for the rest of the recovery. (Currently, we have to iterate over all restored RocksDB instances.)


---

Reply via email to