GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r165349688
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -363,1691 +372,1780 @@ public int getKeyGroupPrefixBytes() {
                final CheckpointStreamFactory streamFactory,
                CheckpointOptions checkpointOptions) throws Exception {
     
    -           if (checkpointOptions.getCheckpointType() != 
CheckpointOptions.CheckpointType.SAVEPOINT &&
    -                   enableIncrementalCheckpointing) {
    -                   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
    -           } else {
    -                   return snapshotFully(checkpointId, timestamp, 
streamFactory);
    -           }
    +           return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
        }
     
    -   private RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshotIncrementally(
    -           final long checkpointId,
    -           final long checkpointTimestamp,
    -           final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
    -
    -           if (db == null) {
    -                   throw new IOException("RocksDB closed.");
    -           }
    +   @Override
    +   public void restore(StateObjectCollection<KeyedStateHandle> 
restoreState) throws Exception {
    +           LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
     
    -           if (kvStateInformation.isEmpty()) {
    -                   if (LOG.isDebugEnabled()) {
    -                           LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at " +
    -                                   checkpointTimestamp + " . Returning 
null.");
    -                   }
    -                   return DoneFuture.nullValue();
    +           if (LOG.isDebugEnabled()) {
    +                   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
                }
     
    -           final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
    -                   new RocksDBIncrementalSnapshotOperation<>(
    -                           this,
    -                           checkpointStreamFactory,
    -                           checkpointId,
    -                           checkpointTimestamp);
    -
    -           snapshotOperation.takeSnapshot();
    -
    -           return new FutureTask<SnapshotResult<KeyedStateHandle>>(
    -                   () -> snapshotOperation.materializeSnapshot()
    -           ) {
    -                   @Override
    -                   public boolean cancel(boolean mayInterruptIfRunning) {
    -                           snapshotOperation.stop();
    -                           return super.cancel(mayInterruptIfRunning);
    -                   }
    +           // clear all meta data
    +           kvStateInformation.clear();
    +           restoredKvStateMetaInfos.clear();
     
    -                   @Override
    -                   protected void done() {
    -                           
snapshotOperation.releaseResources(isCancelled());
    +           try {
    +                   if (restoreState == null || restoreState.isEmpty()) {
    +                           createDB();
    +                   } else if (restoreState.iterator().next() instanceof 
IncrementalKeyedStateHandle) {
    +                           RocksDBIncrementalRestoreOperation<K> 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
    +                           restoreOperation.restore(restoreState);
    +                   } else {
    +                           RocksDBFullRestoreOperation<K> restoreOperation 
= new RocksDBFullRestoreOperation<>(this);
    +                           restoreOperation.doRestore(restoreState);
                        }
    -           };
    +           } catch (Exception ex) {
    +                   dispose();
    +                   throw ex;
    +           }
        }
     
    -   private RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotFully(
    -           final long checkpointId,
    -           final long timestamp,
    -           final CheckpointStreamFactory streamFactory) throws Exception {
    -
    -           long startTime = System.currentTimeMillis();
    -           final CloseableRegistry snapshotCloseableRegistry = new 
CloseableRegistry();
    -
    -           final RocksDBFullSnapshotOperation<K> snapshotOperation;
    -
    -           if (kvStateInformation.isEmpty()) {
    -                   if (LOG.isDebugEnabled()) {
    -                           LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at " + timestamp +
    -                                   " . Returning null.");
    -                   }
    +   @Override
    +   public void notifyCheckpointComplete(long completedCheckpointId) {
     
    -                   return DoneFuture.nullValue();
    +           if (!enableIncrementalCheckpointing) {
    +                   return;
                }
     
    -           snapshotOperation = new RocksDBFullSnapshotOperation<>(this, 
streamFactory, snapshotCloseableRegistry);
    -           snapshotOperation.takeDBSnapShot(checkpointId, timestamp);
    -
    -           // implementation of the async IO operation, based on FutureTask
    -           
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable 
=
    -                   new 
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() {
    +           synchronized (materializedSstFiles) {
     
    -                           @Override
    -                           protected void acquireResources() throws 
Exception {
    -                                   
cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
    -                                   
snapshotOperation.openCheckpointStream();
    -                           }
    +                   if (completedCheckpointId < lastCompletedCheckpointId) {
    +                           return;
    +                   }
     
    -                           @Override
    -                           protected void releaseResources() throws 
Exception {
    -                                   closeLocalRegistry();
    -                                   releaseSnapshotOperationResources();
    -                           }
    +                   materializedSstFiles.keySet().removeIf(checkpointId -> 
checkpointId < completedCheckpointId);
     
    -                           private void 
releaseSnapshotOperationResources() {
    -                                   // hold the db lock while operation on 
the db to guard us against async db disposal
    -                                   
snapshotOperation.releaseSnapshotResources();
    -                           }
    +                   lastCompletedCheckpointId = completedCheckpointId;
    +           }
    +   }
     
    -                           @Override
    -                           protected void stopOperation() throws Exception 
{
    -                                   closeLocalRegistry();
    -                           }
    +   private void createDB() throws IOException {
    +           List<ColumnFamilyHandle> columnFamilyHandles = new 
ArrayList<>(1);
    +           this.db = openDB(instanceRocksDBPath.getAbsolutePath(), 
Collections.emptyList(), columnFamilyHandles);
    +           this.defaultColumnFamily = columnFamilyHandles.get(0);
    +   }
     
    -                           private void closeLocalRegistry() {
    -                                   if 
(cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
    -                                           try {
    -                                                   
snapshotCloseableRegistry.close();
    -                                           } catch (Exception ex) {
    -                                                   LOG.warn("Error closing 
local registry", ex);
    -                                           }
    -                                   }
    -                           }
    +   private RocksDB openDB(
    +           String path,
    +           List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
    +           List<ColumnFamilyHandle> stateColumnFamilyHandles) throws 
IOException {
     
    -                           @Override
    -                           public SnapshotResult<KeyedStateHandle> 
performOperation() throws Exception {
    -                                   long startTime = 
System.currentTimeMillis();
    +           List<ColumnFamilyDescriptor> columnFamilyDescriptors =
    +                   new ArrayList<>(1 + 
stateColumnFamilyDescriptors.size());
     
    -                                   if (isStopped()) {
    -                                           throw new IOException("RocksDB 
closed.");
    -                                   }
    +           columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
     
    -                                   snapshotOperation.writeDBSnapshot();
    +           // we add the required descriptor for the default CF in last 
position.
    +           columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions));
     
    -                                   LOG.info("Asynchronous RocksDB snapshot 
({}, asynchronous part) in thread {} took {} ms.",
    -                                           streamFactory, 
Thread.currentThread(), (System.currentTimeMillis() - startTime));
    +           RocksDB dbRef;
     
    -                                   KeyGroupsStateHandle stateHandle = 
snapshotOperation.getSnapshotResultStateHandle();
    -                                   return new 
SnapshotResult<>(stateHandle, null);
    -                           }
    -                   };
    +           try {
    +                   dbRef = RocksDB.open(
    +                           Preconditions.checkNotNull(dbOptions),
    +                           Preconditions.checkNotNull(path),
    +                           columnFamilyDescriptors,
    +                           stateColumnFamilyHandles);
    +           } catch (RocksDBException e) {
    +                   throw new IOException("Error while opening RocksDB 
instance.", e);
    +           }
     
    -           LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", 
synchronous part) in thread " +
    -                   Thread.currentThread() + " took " + 
(System.currentTimeMillis() - startTime) + " ms.");
    +           // requested + default CF
    +           Preconditions.checkState(1 + 
stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
    +                   "Not all requested column family handles have been 
created");
     
    -           return AsyncStoppableTaskWithCallback.from(ioCallable);
    +           return dbRef;
        }
     
        /**
    -    * Encapsulates the process to perform a snapshot of a 
RocksDBKeyedStateBackend.
    +    * Encapsulates the process of restoring a RocksDBKeyedStateBackend 
from a full snapshot.
         */
    -   static final class RocksDBFullSnapshotOperation<K> {
    +   private static final class RocksDBFullRestoreOperation<K> {
     
    -           static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
    -           static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
    -
    -           private final RocksDBKeyedStateBackend<K> stateBackend;
    -           private final KeyGroupRangeOffsets keyGroupRangeOffsets;
    -           private final CheckpointStreamFactory checkpointStreamFactory;
    -           private final CloseableRegistry snapshotCloseableRegistry;
    -           private final ResourceGuard.Lease dbLease;
    -
    -           private long checkpointId;
    -           private long checkpointTimeStamp;
    -
    -           private Snapshot snapshot;
    -           private ReadOptions readOptions;
    -           private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
    -
    -           private CheckpointStreamFactory.CheckpointStateOutputStream 
outStream;
    -           private DataOutputView outputView;
    -
    -           RocksDBFullSnapshotOperation(
    -                   RocksDBKeyedStateBackend<K> stateBackend,
    -                   CheckpointStreamFactory checkpointStreamFactory,
    -                   CloseableRegistry registry) throws IOException {
    +           private final RocksDBKeyedStateBackend<K> 
rocksDBKeyedStateBackend;
     
    -                   this.stateBackend = stateBackend;
    -                   this.checkpointStreamFactory = checkpointStreamFactory;
    -                   this.keyGroupRangeOffsets = new 
KeyGroupRangeOffsets(stateBackend.keyGroupRange);
    -                   this.snapshotCloseableRegistry = registry;
    -                   this.dbLease = 
this.stateBackend.rocksDBResourceGuard.acquireResource();
    -           }
    +           /** Current key-groups state handle from which we restore 
key-groups. */
    +           private KeyGroupsStateHandle currentKeyGroupsStateHandle;
    +           /** Current input stream we obtained from 
currentKeyGroupsStateHandle. */
    +           private FSDataInputStream currentStateHandleInStream;
    +           /** Current data input view that wraps 
currentStateHandleInStream. */
    +           private DataInputView currentStateHandleInView;
    +           /** Current list of ColumnFamilyHandles for all column families 
we restore from currentKeyGroupsStateHandle. */
    +           private List<ColumnFamilyHandle> 
currentStateHandleKVStateColumnFamilies;
    +           /** The compression decorator that was used for writing the 
state, as determined by the meta data. */
    +           private StreamCompressionDecorator 
keygroupStreamCompressionDecorator;
     
                /**
    -            * 1) Create a snapshot object from RocksDB.
    +            * Creates a restore operation object for the given state 
backend instance.
                 *
    -            * @param checkpointId id of the checkpoint for which we take 
the snapshot
    -            * @param checkpointTimeStamp timestamp of the checkpoint for 
which we take the snapshot
    +            * @param rocksDBKeyedStateBackend the state backend into which 
we restore
                 */
    -           public void takeDBSnapShot(long checkpointId, long 
checkpointTimeStamp) {
    -                   Preconditions.checkArgument(snapshot == null, "Only one 
ongoing snapshot allowed!");
    -                   this.kvStateIterators = new 
ArrayList<>(stateBackend.kvStateInformation.size());
    -                   this.checkpointId = checkpointId;
    -                   this.checkpointTimeStamp = checkpointTimeStamp;
    -                   this.snapshot = stateBackend.db.getSnapshot();
    +           public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> 
rocksDBKeyedStateBackend) {
    +                   this.rocksDBKeyedStateBackend = 
Preconditions.checkNotNull(rocksDBKeyedStateBackend);
                }
     
                /**
    -            * 2) Open CheckpointStateOutputStream through the 
checkpointStreamFactory into which we will write.
    +            * Restores all key-groups data that is referenced by the 
passed state handles.
                 *
    -            * @throws Exception
    +            * @param keyedStateHandles List of all key groups state 
handles that shall be restored.
                 */
    -           public void openCheckpointStream() throws Exception {
    -                   Preconditions.checkArgument(outStream == null, "Output 
stream for snapshot is already set.");
    -                   outStream = 
checkpointStreamFactory.createCheckpointStateOutputStream(checkpointId, 
checkpointTimeStamp);
    -                   snapshotCloseableRegistry.registerCloseable(outStream);
    -                   outputView = new DataOutputViewStreamWrapper(outStream);
    -           }
    +           public void doRestore(Collection<KeyedStateHandle> 
keyedStateHandles)
    +                   throws IOException, StateMigrationException, 
RocksDBException {
     
    -           /**
    -            * 3) Write the actual data from RocksDB from the time we took 
the snapshot object in (1).
    -            *
    -            * @throws IOException
    -            */
    -           public void writeDBSnapshot() throws IOException, 
InterruptedException {
    +                   rocksDBKeyedStateBackend.createDB();
     
    -                   if (null == snapshot) {
    -                           throw new IOException("No snapshot available. 
Might be released due to cancellation.");
    -                   }
    +                   for (KeyedStateHandle keyedStateHandle : 
keyedStateHandles) {
    +                           if (keyedStateHandle != null) {
     
    -                   Preconditions.checkNotNull(outStream, "No output stream 
to write snapshot.");
    -                   writeKVStateMetaData();
    -                   writeKVStateData();
    +                                   if (!(keyedStateHandle instanceof 
KeyGroupsStateHandle)) {
    +                                           throw new 
IllegalStateException("Unexpected state handle type, " +
    +                                                   "expected: " + 
KeyGroupsStateHandle.class +
    +                                                   ", but found: " + 
keyedStateHandle.getClass());
    +                                   }
    +                                   this.currentKeyGroupsStateHandle = 
(KeyGroupsStateHandle) keyedStateHandle;
    +                                   restoreKeyGroupsInStateHandle();
    +                           }
    +                   }
                }
     
                /**
    -            * 4) Returns a state handle to the snapshot after the snapshot 
procedure is completed and null before.
    -            *
    -            * @return state handle to the completed snapshot
    +            * Restore one key groups state handle.
                 */
    -           public KeyGroupsStateHandle getSnapshotResultStateHandle() 
throws IOException {
    -
    -                   if 
(snapshotCloseableRegistry.unregisterCloseable(outStream)) {
    -
    -                           StreamStateHandle stateHandle = 
outStream.closeAndGetHandle();
    -                           outStream = null;
    -
    -                           if (stateHandle != null) {
    -                                   return new 
KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
    +           private void restoreKeyGroupsInStateHandle()
    +                   throws IOException, StateMigrationException, 
RocksDBException {
    +                   try {
    +                           currentStateHandleInStream = 
currentKeyGroupsStateHandle.openInputStream();
    +                           
rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
    +                           currentStateHandleInView = new 
DataInputViewStreamWrapper(currentStateHandleInStream);
    +                           restoreKVStateMetaData();
    +                           restoreKVStateData();
    +                   } finally {
    +                           if 
(rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream))
 {
    +                                   
IOUtils.closeQuietly(currentStateHandleInStream);
                                }
                        }
    -                   return null;
                }
     
                /**
    -            * 5) Release the snapshot object for RocksDB and clean up.
    +            * Restore the KV-state / ColumnFamily meta data for all 
key-groups referenced by the current state handle.
    +            *
    +            * @throws IOException
    +            * @throws ClassNotFoundException
    +            * @throws RocksDBException
                 */
    -           public void releaseSnapshotResources() {
    +           private void restoreKVStateMetaData() throws IOException, 
StateMigrationException, RocksDBException {
     
    -                   outStream = null;
    +                   KeyedBackendSerializationProxy<K> serializationProxy =
    +                           new 
KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
     
    -                   if (null != kvStateIterators) {
    -                           for (Tuple2<RocksIterator, Integer> 
kvStateIterator : kvStateIterators) {
    -                                   
IOUtils.closeQuietly(kvStateIterator.f0);
    -                           }
    -                           kvStateIterators = null;
    -                   }
    +                   serializationProxy.read(currentStateHandleInView);
     
    -                   if (null != snapshot) {
    -                           if (null != stateBackend.db) {
    -                                   
stateBackend.db.releaseSnapshot(snapshot);
    -                           }
    -                           IOUtils.closeQuietly(snapshot);
    -                           snapshot = null;
    -                   }
    +                   // check for key serializer compatibility; this also 
reconfigures the
    +                   // key serializer to be compatible, if it is required 
and is possible
    +                   if (CompatibilityUtil.resolveCompatibilityResult(
    +                           serializationProxy.getKeySerializer(),
    +                           UnloadableDummyTypeSerializer.class,
    +                           
serializationProxy.getKeySerializerConfigSnapshot(),
    +                           rocksDBKeyedStateBackend.keySerializer)
    +                           .isRequiresMigration()) {
     
    -                   if (null != readOptions) {
    -                           IOUtils.closeQuietly(readOptions);
    -                           readOptions = null;
    +                           // TODO replace with state migration; note that 
key hash codes need to remain the same after migration
    +                           throw new StateMigrationException("The new key 
serializer is not compatible to read previous keys. " +
    +                                   "Aborting now since state migration is 
currently not available");
                        }
     
    -                   this.dbLease.close();
    -           }
    -
    -           private void writeKVStateMetaData() throws IOException {
    +                   this.keygroupStreamCompressionDecorator = 
serializationProxy.isUsingKeyGroupCompression() ?
    +                           SnappyStreamCompressionDecorator.INSTANCE : 
UncompressedStreamCompressionDecorator.INSTANCE;
     
    -                   List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> metaInfoSnapshots =
    -                           new 
ArrayList<>(stateBackend.kvStateInformation.size());
    +                   List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> restoredMetaInfos =
    +                           serializationProxy.getStateMetaInfoSnapshots();
    +                   currentStateHandleKVStateColumnFamilies = new 
ArrayList<>(restoredMetaInfos.size());
    +                   //rocksDBKeyedStateBackend.restoredKvStateMetaInfos = 
new HashMap<>(restoredMetaInfos.size());
     
    -                   int kvStateId = 0;
    -                   for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> column :
    -                           stateBackend.kvStateInformation.entrySet()) {
    +                   for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> 
restoredMetaInfo : restoredMetaInfos) {
     
    -                           
metaInfoSnapshots.add(column.getValue().f1.snapshot());
    +                           Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn =
    +                                   
rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
     
    -                           //retrieve iterator for this k/v states
    -                           readOptions = new ReadOptions();
    -                           readOptions.setSnapshot(snapshot);
    +                           if (registeredColumn == null) {
    +                                   byte[] nameBytes = 
restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
     
    -                           kvStateIterators.add(
    -                                   new 
Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), 
kvStateId));
    +                                   ColumnFamilyDescriptor 
columnFamilyDescriptor = new ColumnFamilyDescriptor(
    +                                           nameBytes,
    +                                           
rocksDBKeyedStateBackend.columnOptions);
     
    -                           ++kvStateId;
    -                   }
    +                                   RegisteredKeyedBackendStateMetaInfo<?, 
?> stateMetaInfo =
    +                                           new 
RegisteredKeyedBackendStateMetaInfo<>(
    +                                                   
restoredMetaInfo.getStateType(),
    +                                                   
restoredMetaInfo.getName(),
    +                                                   
restoredMetaInfo.getNamespaceSerializer(),
    +                                                   
restoredMetaInfo.getStateSerializer());
     
    -                   KeyedBackendSerializationProxy<K> serializationProxy =
    -                           new KeyedBackendSerializationProxy<>(
    -                                   stateBackend.getKeySerializer(),
    -                                   metaInfoSnapshots,
    -                                   
!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, 
stateBackend.keyGroupCompressionDecorator));
    +                                   
rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(),
 restoredMetaInfo);
     
    -                   serializationProxy.write(outputView);
    -           }
    +                                   ColumnFamilyHandle columnFamily = 
rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
     
    -           private void writeKVStateData() throws IOException, 
InterruptedException {
    +                                   registeredColumn = new 
Tuple2<>(columnFamily, stateMetaInfo);
    +                                   
rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), 
registeredColumn);
     
    -                   byte[] previousKey = null;
    -                   byte[] previousValue = null;
    -                   OutputStream kgOutStream = null;
    -                   DataOutputView kgOutView = null;
    +                           } else {
    +                                   // TODO with eager state registration 
in place, check here for serializer migration strategies
    +                           }
    +                           
currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0);
    +                   }
    +           }
     
    -                   try {
    -                           // Here we transfer ownership of RocksIterators 
to the RocksDBMergeIterator
    -                           try (RocksDBMergeIterator mergeIterator = new 
RocksDBMergeIterator(
    -                                   kvStateIterators, 
stateBackend.keyGroupPrefixBytes)) {
    +           /**
    +            * Restore the KV-state / ColumnFamily data for all key-groups 
referenced by the current state handle.
    +            *
    +            * @throws IOException
    +            * @throws RocksDBException
    +            */
    +           private void restoreKVStateData() throws IOException, 
RocksDBException {
    +                   //for all key-groups in the current state handle...
    +                   for (Tuple2<Integer, Long> keyGroupOffset : 
currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
    +                           int keyGroup = keyGroupOffset.f0;
     
    -                                   // handover complete, null out to 
prevent double close
    -                                   kvStateIterators = null;
    +                           // Check that restored key groups all belong to 
the backend
    +                           
Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup),
    +                                   "The key group must belong to the 
backend");
     
    -                                   //preamble: setup with first key-group 
as our lookahead
    -                                   if (mergeIterator.isValid()) {
    -                                           //begin first key-group by 
recording the offset
    -                                           
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), 
outStream.getPos());
    -                                           //write the k/v-state id as 
metadata
    -                                           kgOutStream = 
stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
    -                                           kgOutView = new 
DataOutputViewStreamWrapper(kgOutStream);
    +                           long offset = keyGroupOffset.f1;
    +                           //not empty key-group?
    +                           if (0L != offset) {
    +                                   currentStateHandleInStream.seek(offset);
    +                                   try (InputStream compressedKgIn = 
keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream))
 {
    +                                           DataInputViewStreamWrapper 
compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
                                                //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
    -                                           
kgOutView.writeShort(mergeIterator.kvStateId());
    -                                           previousKey = 
mergeIterator.key();
    -                                           previousValue = 
mergeIterator.value();
    -                                           mergeIterator.next();
    +                                           int kvStateId = 
compressedKgInputView.readShort();
    +                                           ColumnFamilyHandle handle = 
currentStateHandleKVStateColumnFamilies.get(kvStateId);
    +                                           //insert all k/v pairs into DB
    +                                           boolean keyGroupHasMoreKeys = 
true;
    +                                           while (keyGroupHasMoreKeys) {
    +                                                   byte[] key = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
    +                                                   byte[] value = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
    +                                                   if 
(RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
    +                                                           //clear the 
signal bit in the key to make it ready for insertion again
    +                                                           
RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
    +                                                           
rocksDBKeyedStateBackend.db.put(handle, key, value);
    +                                                           //TODO this 
could be aware of keyGroupPrefixBytes and write only one byte if possible
    +                                                           kvStateId = 
RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
    +                                                                   & 
compressedKgInputView.readShort();
    +                                                           if 
(RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
    +                                                                   
keyGroupHasMoreKeys = false;
    +                                                           } else {
    +                                                                   handle 
= currentStateHandleKVStateColumnFamilies.get(kvStateId);
    +                                                           }
    +                                                   } else {
    +                                                           
rocksDBKeyedStateBackend.db.put(handle, key, value);
    +                                                   }
    +                                           }
                                        }
    +                           }
    +                   }
    +           }
    +   }
     
    -                                   //main loop: write k/v pairs ordered by 
(key-group, kv-state), thereby tracking key-group offsets.
    -                                   while (mergeIterator.isValid()) {
    +   /**
    +    * Encapsulates the process of restoring a RocksDBKeyedStateBackend 
from an incremental snapshot.
    +    */
    +   private static class RocksDBIncrementalRestoreOperation<T> {
     
    -                                           assert 
(!hasMetaDataFollowsFlag(previousKey));
    +           private final RocksDBKeyedStateBackend<T> stateBackend;
     
    -                                           //set signal in first key byte 
that meta data will follow in the stream after this k/v pair
    -                                           if 
(mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
    +           private 
RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
    +                   this.stateBackend = stateBackend;
    +           }
     
    -                                                   //be cooperative and 
check for interruption from time to time in the hot loop
    -                                                   checkInterrupted();
    +           private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> readMetaData(
    +                   StreamStateHandle metaStateHandle) throws Exception {
     
    -                                                   
setMetaDataFollowsFlagInKey(previousKey);
    -                                           }
    +                   FSDataInputStream inputStream = null;
     
    -                                           writeKeyValuePair(previousKey, 
previousValue, kgOutView);
    +                   try {
    +                           inputStream = metaStateHandle.openInputStream();
    +                           
stateBackend.cancelStreamRegistry.registerCloseable(inputStream);
     
    -                                           //write meta data if we have to
    -                                           if 
(mergeIterator.isNewKeyGroup()) {
    -                                                   //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
    -                                                   
kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
    -                                                   // this will just close 
the outer stream
    -                                                   kgOutStream.close();
    -                                                   //begin new key-group
    -                                                   
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), 
outStream.getPos());
    -                                                   //write the kev-state
    -                                                   //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
    -                                                   kgOutStream = 
stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
    -                                                   kgOutView = new 
DataOutputViewStreamWrapper(kgOutStream);
    -                                                   
kgOutView.writeShort(mergeIterator.kvStateId());
    -                                           } else if 
(mergeIterator.isNewKeyValueState()) {
    -                                                   //write the k/v-state
    -                                                   //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
    -                                                   
kgOutView.writeShort(mergeIterator.kvStateId());
    -                                           }
    +                           KeyedBackendSerializationProxy<T> 
serializationProxy =
    +                                   new 
KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader);
    +                           DataInputView in = new 
DataInputViewStreamWrapper(inputStream);
    +                           serializationProxy.read(in);
     
    -                                           //request next k/v pair
    -                                           previousKey = 
mergeIterator.key();
    -                                           previousValue = 
mergeIterator.value();
    -                                           mergeIterator.next();
    -                                   }
    -                           }
    +                           // check for key serializer compatibility; this 
also reconfigures the
    +                           // key serializer to be compatible, if it is 
required and is possible
    +                           if 
(CompatibilityUtil.resolveCompatibilityResult(
    +                                   serializationProxy.getKeySerializer(),
    +                                   UnloadableDummyTypeSerializer.class,
    +                                   
serializationProxy.getKeySerializerConfigSnapshot(),
    +                                   stateBackend.keySerializer)
    +                                   .isRequiresMigration()) {
     
    -                           //epilogue: write last key-group
    -                           if (previousKey != null) {
    -                                   assert 
(!hasMetaDataFollowsFlag(previousKey));
    -                                   
setMetaDataFollowsFlagInKey(previousKey);
    -                                   writeKeyValuePair(previousKey, 
previousValue, kgOutView);
    -                                   //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
    -                                   
kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
    -                                   // this will just close the outer stream
    -                                   kgOutStream.close();
    -                                   kgOutStream = null;
    +                                   // TODO replace with state migration; 
note that key hash codes need to remain the same after migration
    +                                   throw new StateMigrationException("The 
new key serializer is not compatible to read previous keys. " +
    +                                           "Aborting now since state 
migration is currently not available");
                                }
     
    +                           return 
serializationProxy.getStateMetaInfoSnapshots();
                        } finally {
    -                           // this will just close the outer stream
    -                           IOUtils.closeQuietly(kgOutStream);
    +                           if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
    +                                   inputStream.close();
    +                           }
                        }
                }
     
    -           private void writeKeyValuePair(byte[] key, byte[] value, 
DataOutputView out) throws IOException {
    -                   BytePrimitiveArraySerializer.INSTANCE.serialize(key, 
out);
    -                   BytePrimitiveArraySerializer.INSTANCE.serialize(value, 
out);
    -           }
    +           private void readStateData(
    +                   Path restoreFilePath,
    +                   StreamStateHandle remoteFileHandle) throws IOException {
     
    -           static void setMetaDataFollowsFlagInKey(byte[] key) {
    -                   key[0] |= FIRST_BIT_IN_BYTE_MASK;
    -           }
    +                   FileSystem restoreFileSystem = 
restoreFilePath.getFileSystem();
     
    -           static void clearMetaDataFollowsFlag(byte[] key) {
    -                   key[0] &= 
(~RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
    -           }
    +                   FSDataInputStream inputStream = null;
    +                   FSDataOutputStream outputStream = null;
     
    -           static boolean hasMetaDataFollowsFlag(byte[] key) {
    -                   return 0 != (key[0] & 
RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
    -           }
    +                   try {
    +                           inputStream = 
remoteFileHandle.openInputStream();
    +                           
stateBackend.cancelStreamRegistry.registerCloseable(inputStream);
     
    -           private static void checkInterrupted() throws 
InterruptedException {
    -                   if (Thread.currentThread().isInterrupted()) {
    -                           throw new InterruptedException("RocksDB 
snapshot interrupted.");
    -                   }
    -           }
    -   }
    +                           outputStream = 
restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
    +                           
stateBackend.cancelStreamRegistry.registerCloseable(outputStream);
     
    -   private static final class RocksDBIncrementalSnapshotOperation<K> {
    +                           byte[] buffer = new byte[8 * 1024];
    +                           while (true) {
    +                                   int numBytes = inputStream.read(buffer);
    +                                   if (numBytes == -1) {
    +                                           break;
    +                                   }
     
    -           /** The backend which we snapshot. */
    -           private final RocksDBKeyedStateBackend<K> stateBackend;
    +                                   outputStream.write(buffer, 0, numBytes);
    +                           }
    +                   } finally {
    +                           if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
    +                                   inputStream.close();
    +                           }
     
    -           /** Stream factory that creates the outpus streams to DFS. */
    -           private final CheckpointStreamFactory checkpointStreamFactory;
    +                           if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(outputStream)) {
    +                                   outputStream.close();
    +                           }
    +                   }
    +           }
     
    -           /** Id for the current checkpoint. */
    -           private final long checkpointId;
    +           private void restoreInstance(
    +                   IncrementalKeyedStateHandle restoreStateHandle,
    +                   boolean hasExtraKeys) throws Exception {
     
    -           /** Timestamp for the current checkpoint. */
    -           private final long checkpointTimestamp;
    +                   // read state data
    +                   Path restoreInstancePath = new Path(
    +                           stateBackend.instanceBasePath.getAbsolutePath(),
    +                           UUID.randomUUID().toString());
     
    -           /** All sst files that were part of the last previously 
completed checkpoint. */
    -           private Set<StateHandleID> baseSstFiles;
    +                   try {
    +                           final Map<StateHandleID, StreamStateHandle> 
sstFiles =
    +                                   restoreStateHandle.getSharedState();
    +                           final Map<StateHandleID, StreamStateHandle> 
miscFiles =
    +                                   restoreStateHandle.getPrivateState();
     
    -           /** The state meta data. */
    -           private final 
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots 
= new ArrayList<>();
    +                           readAllStateData(sstFiles, restoreInstancePath);
    +                           readAllStateData(miscFiles, 
restoreInstancePath);
     
    -           /** Local filesystem for the RocksDB backup. */
    -           private FileSystem backupFileSystem;
    +                           // read meta data
    +                           
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots 
=
    +                                   
readMetaData(restoreStateHandle.getMetaStateHandle());
     
    -           /** Local path for the RocksDB backup. */
    -           private Path backupPath;
    +                           List<ColumnFamilyDescriptor> 
columnFamilyDescriptors =
    +                                   new ArrayList<>(1 + 
stateMetaInfoSnapshots.size());
     
    -           // Registry for all opened i/o streams
    -           private final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
    +                           for 
(RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot : 
stateMetaInfoSnapshots) {
     
    -           // new sst files since the last completed checkpoint
    -           private final Map<StateHandleID, StreamStateHandle> sstFiles = 
new HashMap<>();
    +                                   ColumnFamilyDescriptor 
columnFamilyDescriptor = new ColumnFamilyDescriptor(
    +                                           
stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
    +                                           stateBackend.columnOptions);
     
    -           // handles to the misc files in the current snapshot
    -           private final Map<StateHandleID, StreamStateHandle> miscFiles = 
new HashMap<>();
    +                                   
columnFamilyDescriptors.add(columnFamilyDescriptor);
    +                                   
stateBackend.restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(), 
stateMetaInfoSnapshot);
    +                           }
     
    -           // This lease protects from concurrent disposal of the native 
rocksdb instance.
    -           private final ResourceGuard.Lease dbLease;
    +                           if (hasExtraKeys) {
     
    -           private StreamStateHandle metaStateHandle = null;
    +                                   List<ColumnFamilyHandle> 
columnFamilyHandles =
    +                                           new ArrayList<>(1 + 
columnFamilyDescriptors.size());
     
    -           private RocksDBIncrementalSnapshotOperation(
    -                   RocksDBKeyedStateBackend<K> stateBackend,
    -                   CheckpointStreamFactory checkpointStreamFactory,
    -                   long checkpointId,
    -                   long checkpointTimestamp) throws IOException {
    +                                   try (RocksDB restoreDb = 
stateBackend.openDB(
    +                                           restoreInstancePath.getPath(),
    +                                           columnFamilyDescriptors,
    +                                           columnFamilyHandles)) {
     
    -                   this.stateBackend = stateBackend;
    -                   this.checkpointStreamFactory = checkpointStreamFactory;
    -                   this.checkpointId = checkpointId;
    -                   this.checkpointTimestamp = checkpointTimestamp;
    -                   this.dbLease = 
this.stateBackend.rocksDBResourceGuard.acquireResource();
    -           }
    +                                           try {
    +                                                   // iterating only the 
requested descriptors automatically skips the default column family handle
    +                                                   for (int i = 0; i < 
columnFamilyDescriptors.size(); ++i) {
    +                                                           
ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
    +                                                           
ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptors.get(i);
    +                                                           
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = 
stateMetaInfoSnapshots.get(i);
     
    -           private StreamStateHandle materializeStateData(Path filePath) 
throws Exception {
    -                   FSDataInputStream inputStream = null;
    -                   CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = null;
    +                                                           
Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> 
registeredStateMetaInfoEntry =
    +                                                                   
stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
     
    -                   try {
    -                           final byte[] buffer = new byte[8 * 1024];
    +                                                           if (null == 
registeredStateMetaInfoEntry) {
     
    -                           FileSystem backupFileSystem = 
backupPath.getFileSystem();
    -                           inputStream = backupFileSystem.open(filePath);
    -                           
closeableRegistry.registerCloseable(inputStream);
    +                                                                   
RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
    +                                                                           
new RegisteredKeyedBackendStateMetaInfo<>(
    +                                                                           
        stateMetaInfoSnapshot.getStateType(),
    +                                                                           
        stateMetaInfoSnapshot.getName(),
    +                                                                           
        stateMetaInfoSnapshot.getNamespaceSerializer(),
    +                                                                           
        stateMetaInfoSnapshot.getStateSerializer());
     
    -                           outputStream = checkpointStreamFactory
    -                                   
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    -                           
closeableRegistry.registerCloseable(outputStream);
    +                                                                   
registeredStateMetaInfoEntry =
    +                                                                           
new Tuple2<>(
    +                                                                           
        stateBackend.db.createColumnFamily(columnFamilyDescriptor),
    +                                                                           
        stateMetaInfo);
     
    -                           while (true) {
    -                                   int numBytes = inputStream.read(buffer);
    +                                                                   
stateBackend.kvStateInformation.put(
    +                                                                           
stateMetaInfoSnapshot.getName(),
    +                                                                           
registeredStateMetaInfoEntry);
    +                                                           }
     
    -                                   if (numBytes == -1) {
    -                                           break;
    -                                   }
    +                                                           
ColumnFamilyHandle targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0;
     
    -                                   outputStream.write(buffer, 0, numBytes);
    -                           }
    +                                                           try 
(RocksIterator iterator = restoreDb.newIterator(columnFamilyHandle)) {
     
    -                           StreamStateHandle result = null;
    -                           if 
(closeableRegistry.unregisterCloseable(outputStream)) {
    -                                   result = 
outputStream.closeAndGetHandle();
    -                                   outputStream = null;
    -                           }
    -                           return result;
    +                                                                   int 
startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
    +                                                                   byte[] 
startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
    +                                                                   for 
(int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
    +                                                                           
startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> 
((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
    +                                                                   }
     
    -                   } finally {
    +                                                                   
iterator.seek(startKeyGroupPrefixBytes);
     
    -                           if 
(closeableRegistry.unregisterCloseable(inputStream)) {
    -                                   inputStream.close();
    -                           }
    +                                                                   while 
(iterator.isValid()) {
     
    -                           if 
(closeableRegistry.unregisterCloseable(outputStream)) {
    -                                   outputStream.close();
    -                           }
    -                   }
    -           }
    +                                                                           
int keyGroup = 0;
    +                                                                           
for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
    +                                                                           
        keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
    +                                                                           
}
     
    -           private StreamStateHandle materializeMetaData() throws 
Exception {
    -                   CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = null;
    +                                                                           
if (stateBackend.keyGroupRange.contains(keyGroup)) {
    +                                                                           
        stateBackend.db.put(targetColumnFamilyHandle,
    +                                                                           
                iterator.key(), iterator.value());
    +                                                                           
}
     
    -                   try {
    -                           outputStream = checkpointStreamFactory
    -                                   
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    -                           
closeableRegistry.registerCloseable(outputStream);
    +                                                                           
iterator.next();
    +                                                                   }
    +                                                           } // releases 
native iterator resources
    +                                                   }
    +                                           } finally {
    +                                                   //release native tmp db 
column family resources
    +                                                   for (ColumnFamilyHandle 
columnFamilyHandle : columnFamilyHandles) {
    +                                                           
IOUtils.closeQuietly(columnFamilyHandle);
    +                                                   }
    +                                           }
    +                                   } // releases native tmp db resources
    +                           } else {
    +                                   // pick up again the old backend id, so 
the we can reference existing state
    +                                   stateBackend.backendUID = 
restoreStateHandle.getBackendIdentifier();
     
    -                           //no need for compression scheme support 
because sst-files are already compressed
    -                           KeyedBackendSerializationProxy<K> 
serializationProxy =
    -                                   new KeyedBackendSerializationProxy<>(
    -                                           stateBackend.keySerializer,
    -                                           stateMetaInfoSnapshots,
    -                                           false);
    +                                   LOG.debug("Restoring keyed backend uid 
in operator {} from incremental snapshot to {}.",
    +                                           
stateBackend.operatorIdentifier, stateBackend.backendUID);
     
    -                           DataOutputView out = new 
DataOutputViewStreamWrapper(outputStream);
    +                                   // create hard links in the instance 
directory
    +                                   if 
(!stateBackend.instanceRocksDBPath.mkdirs()) {
    +                                           throw new IOException("Could 
not create RocksDB data directory.");
    +                                   }
     
    -                           serializationProxy.write(out);
    +                                   
createFileHardLinksInRestorePath(sstFiles, restoreInstancePath);
    +                                   
createFileHardLinksInRestorePath(miscFiles, restoreInstancePath);
     
    -                           StreamStateHandle result = null;
    -                           if 
(closeableRegistry.unregisterCloseable(outputStream)) {
    -                                   result = 
outputStream.closeAndGetHandle();
    -                                   outputStream = null;
    +                                   List<ColumnFamilyHandle> 
columnFamilyHandles =
    +                                           new ArrayList<>(1 + 
columnFamilyDescriptors.size());
    +
    +                                   stateBackend.db = stateBackend.openDB(
    +                                           
stateBackend.instanceRocksDBPath.getAbsolutePath(),
    +                                           columnFamilyDescriptors, 
columnFamilyHandles);
    +
    +                                   // extract and store the default column 
family which is located at the last index
    +                                   stateBackend.defaultColumnFamily = 
columnFamilyHandles.remove(columnFamilyHandles.size() - 1);
    +
    +                                   for (int i = 0; i < 
columnFamilyDescriptors.size(); ++i) {
    +                                           
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = 
stateMetaInfoSnapshots.get(i);
    +
    +                                           ColumnFamilyHandle 
columnFamilyHandle = columnFamilyHandles.get(i);
    +                                           
RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
    +                                                   new 
RegisteredKeyedBackendStateMetaInfo<>(
    +                                                           
stateMetaInfoSnapshot.getStateType(),
    +                                                           
stateMetaInfoSnapshot.getName(),
    +                                                           
stateMetaInfoSnapshot.getNamespaceSerializer(),
    +                                                           
stateMetaInfoSnapshot.getStateSerializer());
    +
    +                                           
stateBackend.kvStateInformation.put(
    +                                                   
stateMetaInfoSnapshot.getName(),
    +                                                   new 
Tuple2<>(columnFamilyHandle, stateMetaInfo));
    +                                   }
    +
    +                                   // use the restore sst files as the 
base for succeeding checkpoints
    +                                   synchronized 
(stateBackend.materializedSstFiles) {
    +                                           
stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), 
sstFiles.keySet());
    +                                   }
    +
    +                                   stateBackend.lastCompletedCheckpointId 
= restoreStateHandle.getCheckpointId();
                                }
    -                           return result;
                        } finally {
    -                           if (outputStream != null) {
    -                                   if 
(closeableRegistry.unregisterCloseable(outputStream)) {
    -                                           outputStream.close();
    -                                   }
    +                           FileSystem restoreFileSystem = 
restoreInstancePath.getFileSystem();
    +                           if 
(restoreFileSystem.exists(restoreInstancePath)) {
    +                                   
restoreFileSystem.delete(restoreInstancePath, true);
                                }
                        }
                }
     
    -           void takeSnapshot() throws Exception {
    -
    -                   final long lastCompletedCheckpoint;
    +           private void readAllStateData(
    +                   Map<StateHandleID, StreamStateHandle> stateHandleMap,
    +                   Path restoreInstancePath) throws IOException {
     
    -                   // use the last completed checkpoint as the comparison 
base.
    -                   synchronized (stateBackend.materializedSstFiles) {
    -                           lastCompletedCheckpoint = 
stateBackend.lastCompletedCheckpointId;
    -                           baseSstFiles = 
stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
    +                   for (Map.Entry<StateHandleID, StreamStateHandle> entry 
: stateHandleMap.entrySet()) {
    +                           StateHandleID stateHandleID = entry.getKey();
    +                           StreamStateHandle remoteFileHandle = 
entry.getValue();
    +                           readStateData(new Path(restoreInstancePath, 
stateHandleID.toString()), remoteFileHandle);
                        }
    +           }
     
    -                   LOG.trace("Taking incremental snapshot for checkpoint 
{}. Snapshot is based on last completed checkpoint {} " +
    -                           "assuming the following (shared) files as base: 
{}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
    +           private void createFileHardLinksInRestorePath(
    +                   Map<StateHandleID, StreamStateHandle> stateHandleMap,
    +                   Path restoreInstancePath) throws IOException {
     
    -                   // save meta data
    -                   for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
    -                           : stateBackend.kvStateInformation.entrySet()) {
    -                           
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
    +                   for (StateHandleID stateHandleID : 
stateHandleMap.keySet()) {
    +                           String newSstFileName = 
stateHandleID.toString();
    +                           File restoreFile = new 
File(restoreInstancePath.getPath(), newSstFileName);
    +                           File targetFile = new 
File(stateBackend.instanceRocksDBPath, newSstFileName);
    +                           Files.createLink(targetFile.toPath(), 
restoreFile.toPath());
                        }
    +           }
     
    -                   // save state data
    -                   backupPath = new 
Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
    +           void restore(Collection<KeyedStateHandle> restoreStateHandles) 
throws Exception {
     
    -                   LOG.trace("Local RocksDB checkpoint goes to backup path 
{}.", backupPath);
    +                   boolean hasExtraKeys = (restoreStateHandles.size() > 1 
||
    +                           
!Objects.equals(restoreStateHandles.iterator().next().getKeyGroupRange(), 
stateBackend.keyGroupRange));
     
    -                   backupFileSystem = backupPath.getFileSystem();
    -                   if (backupFileSystem.exists(backupPath)) {
    -                           throw new IllegalStateException("Unexpected 
existence of the backup directory.");
    +                   if (hasExtraKeys) {
    +                           stateBackend.createDB();
                        }
     
    -                   // create hard links of living files in the checkpoint 
path
    -                   Checkpoint checkpoint = 
Checkpoint.create(stateBackend.db);
    -                   checkpoint.createCheckpoint(backupPath.getPath());
    -           }
    -
    -           SnapshotResult<KeyedStateHandle> materializeSnapshot() throws 
Exception {
    -
    -                   
stateBackend.cancelStreamRegistry.registerCloseable(closeableRegistry);
    +                   for (KeyedStateHandle rawStateHandle : 
restoreStateHandles) {
     
    -                   // write meta data
    -                   metaStateHandle = materializeMetaData();
    -
    -                   // write state data
    -                   
Preconditions.checkState(backupFileSystem.exists(backupPath));
    -
    -                   FileStatus[] fileStatuses = 
backupFileSystem.listStatus(backupPath);
    -                   if (fileStatuses != null) {
    -                           for (FileStatus fileStatus : fileStatuses) {
    -                                   final Path filePath = 
fileStatus.getPath();
    -                                   final String fileName = 
filePath.getName();
    -                                   final StateHandleID stateHandleID = new 
StateHandleID(fileName);
    -
    -                                   if (fileName.endsWith(SST_FILE_SUFFIX)) 
{
    -                                           final boolean existsAlready =
    -                                                   baseSstFiles != null && 
baseSstFiles.contains(stateHandleID);
    -
    -                                           if (existsAlready) {
    -                                                   // we introduce a 
placeholder state handle, that is replaced with the
    -                                                   // original from the 
shared state registry (created from a previous checkpoint)
    -                                                   sstFiles.put(
    -                                                           stateHandleID,
    -                                                           new 
PlaceholderStreamStateHandle());
    -                                           } else {
    -                                                   
sstFiles.put(stateHandleID, materializeStateData(filePath));
    -                                           }
    -                                   } else {
    -                                           StreamStateHandle fileHandle = 
materializeStateData(filePath);
    -                                           miscFiles.put(stateHandleID, 
fileHandle);
    -                                   }
    +                           if (!(rawStateHandle instanceof 
IncrementalKeyedStateHandle)) {
    +                                   throw new 
IllegalStateException("Unexpected state handle type, " +
    +                                           "expected " + 
IncrementalKeyedStateHandle.class +
    +                                           ", but found " + 
rawStateHandle.getClass());
                                }
    -                   }
     
    -                   synchronized (stateBackend.materializedSstFiles) {
    -                           
stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
    +                           IncrementalKeyedStateHandle keyedStateHandle = 
(IncrementalKeyedStateHandle) rawStateHandle;
    +
    +                           restoreInstance(keyedStateHandle, hasExtraKeys);
                        }
    +           }
    +   }
     
    -                   IncrementalKeyedStateHandle incrementalKeyedStateHandle 
= new IncrementalKeyedStateHandle(
    -                           stateBackend.backendUID,
    -                           stateBackend.keyGroupRange,
    -                           checkpointId,
    -                           sstFiles,
    -                           miscFiles,
    -                           metaStateHandle);
    +   // 
------------------------------------------------------------------------
    +   //  State factories
    +   // 
------------------------------------------------------------------------
     
    -                   return new 
SnapshotResult<>(incrementalKeyedStateHandle, null);
    -           }
    +   /**
    +    * Creates a column family handle for use with a k/v state. When 
restoring from a snapshot
    +    * we don't restore the individual k/v states, just the global RocksDB 
data base and the
    +    * list of column families. When a k/v state is first requested we 
check here whether we
    +    * already have a column family for that and return it or create a new 
one if it doesn't exist.
    +    *
    +    * <p>This also checks whether the {@link StateDescriptor} for a state 
matches the one
    +    * that we checkpointed, i.e. is already in the map of column families.
    +    */
    +   @SuppressWarnings("rawtypes, unchecked")
    +   protected <N, S> ColumnFamilyHandle getColumnFamily(
    +           StateDescriptor<?, S> descriptor, TypeSerializer<N> 
namespaceSerializer) throws IOException, StateMigrationException {
     
    -           void stop() {
    +           Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
    +                   kvStateInformation.get(descriptor.getName());
     
    -                   if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
    -                           try {
    -                                   closeableRegistry.close();
    -                           } catch (IOException e) {
    -                                   LOG.warn("Could not properly close io 
streams.", e);
    -                           }
    -                   }
    -           }
    +           RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new 
RegisteredKeyedBackendStateMetaInfo<>(
    +                   descriptor.getType(),
    +                   descriptor.getName(),
    +                   namespaceSerializer,
    +                   descriptor.getSerializer());
     
    -           void releaseResources(boolean canceled) {
    +           if (stateInfo != null) {
    +                   // TODO with eager registration in place, these checks 
should be moved to restore()
     
    -                   dbLease.close();
    +                   RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> 
restoredMetaInfo =
    +                           
(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) 
restoredKvStateMetaInfos.get(descriptor.getName());
     
    -                   if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
    -                           try {
    -                                   closeableRegistry.close();
    -                           } catch (IOException e) {
    -                                   LOG.warn("Exception on closing 
registry.", e);
    -                           }
    -                   }
    +                   Preconditions.checkState(
    +                           Objects.equals(newMetaInfo.getName(), 
restoredMetaInfo.getName()),
    +                           "Incompatible state names. " +
    +                                   "Was [" + restoredMetaInfo.getName() + 
"], " +
    +                                   "registered with [" + 
newMetaInfo.getName() + "].");
     
    -                   if (backupPath != null) {
    -                           try {
    -                                   if 
(backupFileSystem.exists(backupPath)) {
    +                   if (!Objects.equals(newMetaInfo.getStateType(), 
StateDescriptor.Type.UNKNOWN)
    +                           && 
!Objects.equals(restoredMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)) 
{
     
    -                                           LOG.trace("Deleting local 
RocksDB backup path {}.", backupPath);
    -                                           
backupFileSystem.delete(backupPath, true);
    -                                   }
    -                           } catch (Exception e) {
    -                                   LOG.warn("Could not properly delete the 
checkpoint directory.", e);
    -                           }
    +                           Preconditions.checkState(
    +                                   newMetaInfo.getStateType() == 
restoredMetaInfo.getStateType(),
    +                                   "Incompatible state types. " +
    +                                           "Was [" + 
restoredMetaInfo.getStateType() + "], " +
    +                                           "registered with [" + 
newMetaInfo.getStateType() + "].");
                        }
     
    -                   if (canceled) {
    -                           Collection<StateObject> statesToDiscard =
    -                                   new ArrayList<>(1 + miscFiles.size() + 
sstFiles.size());
    +                   // check compatibility results to determine if state 
migration is required
    +                   CompatibilityResult<N> namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
    +                           restoredMetaInfo.getNamespaceSerializer(),
    +                           null,
    +                           
restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
    +                           newMetaInfo.getNamespaceSerializer());
     
    -                           statesToDiscard.add(metaStateHandle);
    -                           statesToDiscard.addAll(miscFiles.values());
    -                           statesToDiscard.addAll(sstFiles.values());
    +                   CompatibilityResult<S> stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
    +                           restoredMetaInfo.getStateSerializer(),
    +                           UnloadableDummyTypeSerializer.class,
    +                           
restoredMetaInfo.getStateSerializerConfigSnapshot(),
    +                           newMetaInfo.getStateSerializer());
     
    -                           try {
    -                                   
StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
    -                           } catch (Exception e) {
    -                                   LOG.warn("Could not properly discard 
states.", e);
    -                           }
    +                   if (namespaceCompatibility.isRequiresMigration() || 
stateCompatibility.isRequiresMigration()) {
    +                           // TODO state migration currently isn't 
possible.
    +                           throw new StateMigrationException("State 
migration isn't supported, yet.");
    +                   } else {
    +                           stateInfo.f1 = newMetaInfo;
    +                           return stateInfo.f0;
                        }
                }
    -   }
     
    -   @Override
    -   public void restore(StateObjectCollection<KeyedStateHandle> 
restoreState) throws Exception {
    -           LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
    +           byte[] nameBytes = 
descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
    +           
Preconditions.checkState(!Arrays.equals(DEFAULT_COLUMN_FAMILY_NAME_BYTES, 
nameBytes),
    +                   "The chosen state name 'default' collides with the name 
of the default column family!");
     
    -           if (LOG.isDebugEnabled()) {
    -                   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
    -           }
    +           ColumnFamilyDescriptor columnDescriptor = new 
ColumnFamilyDescriptor(nameBytes, columnOptions);
     
    -           // clear all meta data
    -           kvStateInformation.clear();
    -           restoredKvStateMetaInfos.clear();
    +           final ColumnFamilyHandle columnFamily;
     
                try {
    -                   if (restoreState == null || restoreState.isEmpty()) {
    -                           createDB();
    -                   } else if (restoreState.iterator().next() instanceof 
IncrementalKeyedStateHandle) {
    -                           RocksDBIncrementalRestoreOperation<K> 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
    -                           restoreOperation.restore(restoreState);
    -                   } else {
    -                           RocksDBFullRestoreOperation<K> restoreOperation 
= new RocksDBFullRestoreOperation<>(this);
    -                           restoreOperation.doRestore(restoreState);
    -                   }
    -           } catch (Exception ex) {
    -                   dispose();
    -                   throw ex;
    +                   columnFamily = db.createColumnFamily(columnDescriptor);
    +           } catch (RocksDBException e) {
    +                   throw new IOException("Error creating 
ColumnFamilyHandle.", e);
                }
    +
    +           Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<N, S>> tuple =
    +                   new Tuple2<>(columnFamily, newMetaInfo);
    +           Map rawAccess = kvStateInformation;
    +           rawAccess.put(descriptor.getName(), tuple);
    +           return columnFamily;
        }
     
        @Override
    -   public void notifyCheckpointComplete(long completedCheckpointId) {
    -
    -           if (!enableIncrementalCheckpointing) {
    -                   return;
    -           }
    +   protected <N, T> InternalValueState<N, T> createValueState(
    +           TypeSerializer<N> namespaceSerializer,
    +           ValueStateDescriptor<T> stateDesc) throws Exception {
     
    -           synchronized (materializedSstFiles) {
    +           ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
     
    -                   if (completedCheckpointId < lastCompletedCheckpointId) {
    -                           return;
    -                   }
    +           return new RocksDBValueState<>(columnFamily, 
namespaceSerializer,  stateDesc, this);
    +   }
     
    -                   materializedSstFiles.keySet().removeIf(checkpointId -> 
checkpointId < completedCheckpointId);
    +   @Override
    +   protected <N, T> InternalListState<N, T> createListState(
    +           TypeSerializer<N> namespaceSerializer,
    +           ListStateDescriptor<T> stateDesc) throws Exception {
     
    -                   lastCompletedCheckpointId = completedCheckpointId;
    -           }
    -   }
    +           ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
     
    -   private void createDB() throws IOException {
    -           List<ColumnFamilyHandle> columnFamilyHandles = new 
ArrayList<>(1);
    -           this.db = openDB(instanceRocksDBPath.getAbsolutePath(), 
Collections.emptyList(), columnFamilyHandles);
    -           this.defaultColumnFamily = columnFamilyHandles.get(0);
    +           return new RocksDBListState<>(columnFamily, 
namespaceSerializer, stateDesc, this);
        }
     
    -   private RocksDB openDB(
    -           String path,
    -           List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
    -           List<ColumnFamilyHandle> stateColumnFamilyHandles) throws 
IOException {
    +   @Override
    +   protected <N, T> InternalReducingState<N, T> createReducingState(
    +           TypeSerializer<N> namespaceSerializer,
    +           ReducingStateDescriptor<T> stateDesc) throws Exception {
     
    -           List<ColumnFamilyDescriptor> columnFamilyDescriptors =
    -                   new ArrayList<>(1 + 
stateColumnFamilyDescriptors.size());
    +           ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
     
    -           columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
    +           return new RocksDBReducingState<>(columnFamily, 
namespaceSerializer,  stateDesc, this);
    +   }
     
    -           // we add the required descriptor for the default CF in last 
position.
    -           columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions));
    +   @Override
    +   protected <N, T, ACC, R> InternalAggregatingState<N, T, R> 
createAggregatingState(
    +           TypeSerializer<N> namespaceSerializer,
    +           AggregatingStateDescriptor<T, ACC, R> stateDesc) throws 
Exception {
     
    -           RocksDB dbRef;
    +           ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
    +           return new RocksDBAggregatingState<>(columnFamily, 
namespaceSerializer, stateDesc, this);
    +   }
     
    -           try {
    -                   dbRef = RocksDB.open(
    -                           Preconditions.checkNotNull(dbOptions),
    -                           Preconditions.checkNotNull(path),
    -                           columnFamilyDescriptors,
    -                           stateColumnFamilyHandles);
    -           } catch (RocksDBException e) {
    -                   throw new IOException("Error while opening RocksDB 
instance.", e);
    -           }
    +   @Override
    +   protected <N, T, ACC> InternalFoldingState<N, T, ACC> 
createFoldingState(
    +           TypeSerializer<N> namespaceSerializer,
    +           FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
     
    -           // requested + default CF
    -           Preconditions.checkState(1 + 
stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
    -                   "Not all requested column family handles have been 
created");
    +           ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
     
    -           return dbRef;
    +           return new RocksDBFoldingState<>(columnFamily, 
namespaceSerializer, stateDesc, this);
        }
     
    -   /**
    -    * Encapsulates the process of restoring a RocksDBKeyedStateBackend 
from a snapshot.
    -    */
    -   static final class RocksDBFullRestoreOperation<K> {
    -
    -           private final RocksDBKeyedStateBackend<K> 
rocksDBKeyedStateBackend;
    +   @Override
    +   protected <N, UK, UV> InternalMapState<N, UK, UV> createMapState(
    +           TypeSerializer<N> namespaceSerializer,
    +           MapStateDescriptor<UK, UV> stateDesc) throws Exception {
     
    -           /** Current key-groups state handle from which we restore 
key-groups. */
    -           private KeyGroupsStateHandle currentKeyGroupsStateHandle;
    -           /** Current input stream we obtained from 
currentKeyGroupsStateHandle. */
    -           private FSDataInputStream currentStateHandleInStream;
    -           /** Current data input view that wraps 
currentStateHandleInStream. */
    -           private DataInputView currentStateHandleInView;
    -           /** Current list of ColumnFamilyHandles for all column families 
we restore from currentKeyGroupsStateHandle. */
    -           private List<ColumnFamilyHandle> 
currentStateHandleKVStateColumnFamilies;
    -           /** The compression decorator that was used for writing the 
state, as determined by the meta data. */
    -           private StreamCompressionDecorator 
keygroupStreamCompressionDecorator;
    +           ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
     
    -           /**
    -            * Creates a restore operation object for the given state 
backend instance.
    -            *
    -            * @param rocksDBKeyedStateBackend the state backend into which 
we restore
    -            */
    -           public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> 
rocksDBKeyedStateBackend) {
    -                   this.rocksDBKeyedStateBackend = 
Preconditions.checkNotNull(rocksDBKeyedStateBackend);
    +           return new RocksDBMapState<>(columnFamily, namespaceSerializer, 
stateDesc, this);
    +   }
    +
    +   /**
    +    * Only visible for testing, DO NOT USE.
    +    */
    +   public File getInstanceBasePath() {
    +           return instanceBasePath;
    +   }
    +
    +   @Override
    +   public boolean supportsAsynchronousSnapshots() {
    +           return true;
    +   }
    +
    +   @VisibleForTesting
    +   @SuppressWarnings("unchecked")
    +   @Override
    +   public int numStateEntries() {
    +           int count = 0;
    +
    +           for (Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> column : 
kvStateInformation.values()) {
    +                   try (RocksIterator rocksIterator = 
db.newIterator(column.f0)) {
    +                           rocksIterator.seekToFirst();
    +
    +                           while (rocksIterator.isValid()) {
    +                                   count++;
    +                                   rocksIterator.next();
    +                           }
    +                   }
                }
     
    -           /**
    -            * Restores all key-groups data that is referenced by the 
passed state handles.
    -            *
    -            * @param keyedStateHandles List of all key groups state 
handles that shall be restored.
    -            */
    -           public void doRestore(Collection<KeyedStateHandle> 
keyedStateHandles)
    -                   throws IOException, StateMigrationException, 
RocksDBException {
    +           return count;
    +   }
     
    -                   rocksDBKeyedStateBackend.createDB();
     
    -                   for (KeyedStateHandle keyedStateHandle : 
keyedStateHandles) {
    -                           if (keyedStateHandle != null) {
     
    -                                   if (!(keyedStateHandle instanceof 
KeyGroupsStateHandle)) {
    -                                           throw new 
IllegalStateException("Unexpected state handle type, " +
    -                                                   "expected: " + 
KeyGroupsStateHandle.class +
    -                                                   ", but found: " + 
keyedStateHandle.getClass());
    +   /**
    +    * Iterator that merges multiple RocksDB iterators to partition all 
states into contiguous key-groups.
    +    * The resulting iteration sequence is ordered by (key-group, kv-state).
    +    */
    +   @VisibleForTesting
    +   static final class RocksDBMergeIterator implements AutoCloseable {
    +
    +           private final PriorityQueue<MergeIterator> heap;
    +           private final int keyGroupPrefixByteCount;
    +           private boolean newKeyGroup;
    +           private boolean newKVState;
    +           private boolean valid;
    +
    +           private MergeIterator currentSubIterator;
    +
    +           private static final List<Comparator<MergeIterator>> 
COMPARATORS;
    +
    +           static {
    +                   int maxBytes = 4;
    +                   COMPARATORS = new ArrayList<>(maxBytes);
    +                   for (int i = 0; i < maxBytes; ++i) {
    +                           final int currentBytes = i;
    +                           COMPARATORS.add(new Comparator<MergeIterator>() 
{
    +                                   @Override
    +                                   public int compare(MergeIterator o1, 
MergeIterator o2) {
    +                                           int arrayCmpRes = 
compareKeyGroupsForByteArrays(
    +                                                   o1.currentKey, 
o2.currentKey, currentBytes);
    +                                           return arrayCmpRes == 0 ? 
o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
                                        }
    -                                   this.currentKeyGroupsStateHandle = 
(KeyGroupsStateHandle) keyedStateHandle;
    -                                   restoreKeyGroupsInStateHandle();
    -                           }
    +                           });
                        }
                }
     
    -           /**
    -            * Restore one key groups state handle.
    -            */
    -           private void restoreKeyGroupsInStateHandle()
    -                   throws IOException, StateMigrationException, 
RocksDBException {
    -                   try {
    -                           currentStateHandleInStream = 
currentKeyGroupsStateHandle.openInputStream();
    -                           
rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
    -                           currentStateHandleInView = new 
DataInputViewStreamWrapper(currentStateHandleInStream);
    -                           restoreKVStateMetaData();
    -                           restoreKVStateData();
    -                   } finally {
    -                           if 
(rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream))
 {
    -                                   
IOUtils.closeQuietly(currentStateHandleInStream);
    +           RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> 
kvStateIterators, final int keyGroupPrefixByteCount) {
    +                   Preconditions.checkNotNull(kvStateIterators);
    +                   this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
    +
    +                   Comparator<MergeIterator> iteratorComparator = 
COMPARATORS.get(keyGroupPrefixByteCount);
    +
    +                   if (kvStateIterators.size() > 0) {
    +                           PriorityQueue<MergeIterator> 
iteratorPriorityQueue =
    +                                   new 
PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
    +
    +                           for (Tuple2<RocksIterator, Integer> 
rocksIteratorWithKVStateId : kvStateIterators) {
    +                                   final RocksIterator rocksIterator = 
rocksIteratorWithKVStateId.f0;
    +                                   rocksIterator.seekToFirst();
    +                                   if (rocksIterator.isValid()) {
    +                                           iteratorPriorityQueue.offer(new 
MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
    +                                   } else {
    +                                           
IOUtils.closeQuietly(rocksIterator);
    +                                   }
                                }
    +
    +                           kvStateIterators.clear();
    +
    +                           this.heap = iteratorPriorityQueue;
    +                           this.valid = !heap.isEmpty();
    +                           this.currentSubIterator = heap.poll();
    +                   } else {
    +                           // creating a PriorityQueue of size 0 results 
in an exception.
    +                           this.heap = null;
    +                           this.valid = false;
                        }
    +
    +                   this.newKeyGroup = true;
    +                   this.newKVState = true;
                }
     
                /**
    -            * Restore the KV-state / ColumnFamily meta data for all 
key-groups referenced by the current state handle.
    -            *
    -            * @throws IOException
    -            * @throws ClassNotFoundException
    -            * @throws RocksDBException
    +            * Advance the iterator. Should only be called if {@link 
#isValid()} returned true. Valid can only change after
    +            * calls to {@link #next()}.
                 */
    -           private void restoreKVStateMetaData() throws IOException, 
StateMigrationException, RocksDBException {
    +           public void next() {
    +                   newKeyGroup = false;
    +                   newKVState = false;
     
    -                   KeyedBackendSerializationProxy<K> serializationProxy =
    -                           new 
KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
    +                   final RocksIterator rocksIterator = 
currentSubIterator.getIterator();
    +                   rocksIterator.next();
     
    -                   serializationProxy.read(currentStateHandleInView);
    +                   byte[] oldKey = currentSubIterator.getCurrentKey();
    +                   if (rocksIterator.isValid()) {
    +                           currentSubIterator.currentKey = 
rocksIterator.key();
     
    -                   // check for key serializer compatibility; this also 
reconfigures the
    -                   // key serializer to be compatible, if it is required 
and is possible
    -                   if (CompatibilityUtil.resolveCompatibilityResult(
    -                           serializationProxy.getKeySerializer(),
    -                           UnloadableDummyTypeSerializer.class,
    -                           
serializationProxy.getKeySerializerConfigSnapshot(),
    -                           rocksDBKeyedStateBackend.keySerializer)
    -                           .isRequiresMigration()) {
    +                           if (isDifferentKeyGroup(oldKey, 
currentSubIterator.getCurrentKey())) {
    +                                   heap.offer(currentSubIterator);
    +                                   currentSubIterator = heap.poll();
    +                                   newKVState = 
currentSubIterator.getIterator() != rocksIterator;
    +                                   detectNewKeyGroup(oldKey);
    +                           }
    +                   } else {
    +                           IOUtils.closeQuietly(rocksIterator);
     
    -                           // TODO replace with state migration; note that 
key hash codes need to remain the same after migration
    -                           throw new StateMigrationException("The new key 
serializer is not compatible to read previous keys. " +
    -                                   "Aborting now since state migration is 
currently not available");
    +                           if (heap.isEmpty()) {
    +                                   currentSubIterator = null;
    +                                   valid = false;
    +                           } else {
    +                                   currentSubIterator = heap.poll();
    +                                   newKVState = true;
    +                                   detectNewKeyGroup(oldKey);
    +                           }
                        }
    +           }
     
    -                   this.keygroupStreamCompressionDecorator = 
serializationProxy.isUsingKeyGroupCompression() ?
    -                           SnappyStreamCompressionDecorator.INSTANCE : 
UncompressedStreamCompressionDecorator.INSTANCE;
    -
    -                   List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> restoredMetaInfos =
    -                           serializationProxy.getStateMetaInfoSnapshots();
    -                   currentStateHandleKVStateColumnFamilies = new 
ArrayList<>(restoredMetaInfos.size());
    -                   //rocksDBKeyedStateBackend.restoredKvStateMetaInfos = 
new HashMap<>(restoredMetaInfos.size());
    +           private boolean isDifferentKeyGroup(byte[] a, byte[] b) {
    +                   return 0 != compareKeyGroupsForByteArrays(a, b, 
keyGroupPrefixByteCount);
    +           }
     
    -                   for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> 
restoredMetaInfo : restoredMetaInfos) {
    +           private void detectNewKeyGroup(byte[] oldKey) {
    +                   if (isDifferentKeyGroup(oldKey, 
currentSubIterator.currentKey)) {
    +                           newKeyGroup = true;
    +                   }
    +           }
     
    -                           Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn =
    -                                   
rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
    +           /**
    +            * Returns the key-group for the current key.
    +            * @return key-group for the current key
    +            */
    +           public int keyGroup() {
    +                   int result = 0;
    +                   //big endian decode
    +                   for (int i = 0; i < keyGroupPrefixByteCount; ++i) {
    +                           result <<= 8;
    +                           result |= (currentSubIterator.currentKey[i] & 
0xFF);
    +                   }
    +                   return result;
    +           }
     
    -                           if (registeredColumn == null) {
    -                                   byte[] nameBytes = 
restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
    +           public byte[] key() {
    +                   return currentSubIterator.getCurrentKey();
    +           }
     
    -                                   ColumnFamilyDescriptor 
columnFamilyDescriptor = new ColumnFamilyDescriptor(
    -                                           nameBytes,
    -                                           
rocksDBKeyedStateBackend.columnOptions);
    +           public byte[] value() {
    +                   return currentSubIterator.getIterator().value();
    +           }
     
    -                                   RegisteredKeyedBackendStateMetaInfo<?, 
?> stateMetaInfo =
    -                                           new 
RegisteredKeyedBackendStateMetaInfo<>(
    -                                                   
restoredMetaInfo.getStateType(),
    -                                                   
restoredMetaInfo.getName(),
    -                                                   
restoredMetaInfo.getNamespaceSerializer(),
    -                                                   
restoredMetaInfo.getStateSerializer());
    +           /**
    +            * Returns the Id of the k/v state to which the current key 
belongs.
    +            * @return Id of K/V state to which the current key belongs.
    +            */
    +           public int kvStateId() {
    +                   return currentSubIterator.getKvStateId();
    +           }
     
    -                                   
rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restored
    --- End diff --
    
    I skimmed over the changes and I assume that you've mostly moved the code 
from one place to another without major changes. But it's really hard to spot 
any real changes because they are mixed in with the code movement.


---

Reply via email to