Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r168184825
  
    --- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
    @@ -363,1691 +372,1780 @@ public int getKeyGroupPrefixBytes() {
                final CheckpointStreamFactory streamFactory,
                CheckpointOptions checkpointOptions) throws Exception {
     
    -           if (checkpointOptions.getCheckpointType() != 
CheckpointOptions.CheckpointType.SAVEPOINT &&
    -                   enableIncrementalCheckpointing) {
    -                   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
    -           } else {
    -                   return snapshotFully(checkpointId, timestamp, 
streamFactory);
    -           }
    +           return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
        }
     
    -   private RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshotIncrementally(
    -           final long checkpointId,
    -           final long checkpointTimestamp,
    -           final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
    -
    -           if (db == null) {
    -                   throw new IOException("RocksDB closed.");
    -           }
    +   @Override
    +   public void restore(StateObjectCollection<KeyedStateHandle> 
restoreState) throws Exception {
    +           LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
     
    -           if (kvStateInformation.isEmpty()) {
    -                   if (LOG.isDebugEnabled()) {
    -                           LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at " +
    -                                   checkpointTimestamp + " . Returning 
null.");
    -                   }
    -                   return DoneFuture.nullValue();
    +           if (LOG.isDebugEnabled()) {
    +                   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
                }
     
    -           final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
    -                   new RocksDBIncrementalSnapshotOperation<>(
    -                           this,
    -                           checkpointStreamFactory,
    -                           checkpointId,
    -                           checkpointTimestamp);
    -
    -           snapshotOperation.takeSnapshot();
    -
    -           return new FutureTask<SnapshotResult<KeyedStateHandle>>(
    -                   () -> snapshotOperation.materializeSnapshot()
    -           ) {
    -                   @Override
    -                   public boolean cancel(boolean mayInterruptIfRunning) {
    -                           snapshotOperation.stop();
    -                           return super.cancel(mayInterruptIfRunning);
    -                   }
    +           // clear all meta data
    +           kvStateInformation.clear();
    +           restoredKvStateMetaInfos.clear();
     
    -                   @Override
    -                   protected void done() {
    -                           
snapshotOperation.releaseResources(isCancelled());
    +           try {
    +                   if (restoreState == null || restoreState.isEmpty()) {
    +                           createDB();
    +                   } else if (restoreState.iterator().next() instanceof 
IncrementalKeyedStateHandle) {
    +                           RocksDBIncrementalRestoreOperation<K> 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
    +                           restoreOperation.restore(restoreState);
    +                   } else {
    +                           RocksDBFullRestoreOperation<K> restoreOperation 
= new RocksDBFullRestoreOperation<>(this);
    +                           restoreOperation.doRestore(restoreState);
                        }
    -           };
    +           } catch (Exception ex) {
    +                   dispose();
    +                   throw ex;
    +           }
        }
     
    -   private RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotFully(
    -           final long checkpointId,
    -           final long timestamp,
    -           final CheckpointStreamFactory streamFactory) throws Exception {
    -
    -           long startTime = System.currentTimeMillis();
    -           final CloseableRegistry snapshotCloseableRegistry = new 
CloseableRegistry();
    -
    -           final RocksDBFullSnapshotOperation<K> snapshotOperation;
    -
    -           if (kvStateInformation.isEmpty()) {
    -                   if (LOG.isDebugEnabled()) {
    -                           LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at " + timestamp +
    -                                   " . Returning null.");
    -                   }
    +   @Override
    +   public void notifyCheckpointComplete(long completedCheckpointId) {
     
    -                   return DoneFuture.nullValue();
    +           if (!enableIncrementalCheckpointing) {
    +                   return;
                }
     
    -           snapshotOperation = new RocksDBFullSnapshotOperation<>(this, 
streamFactory, snapshotCloseableRegistry);
    -           snapshotOperation.takeDBSnapShot(checkpointId, timestamp);
    -
    -           // implementation of the async IO operation, based on FutureTask
    -           
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable 
=
    -                   new 
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() {
    +           synchronized (materializedSstFiles) {
     
    -                           @Override
    -                           protected void acquireResources() throws 
Exception {
    -                                   
cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
    -                                   
snapshotOperation.openCheckpointStream();
    -                           }
    +                   if (completedCheckpointId < lastCompletedCheckpointId) {
    +                           return;
    +                   }
     
    -                           @Override
    -                           protected void releaseResources() throws 
Exception {
    -                                   closeLocalRegistry();
    -                                   releaseSnapshotOperationResources();
    -                           }
    +                   materializedSstFiles.keySet().removeIf(checkpointId -> 
checkpointId < completedCheckpointId);
     
    -                           private void 
releaseSnapshotOperationResources() {
    -                                   // hold the db lock while operation on 
the db to guard us against async db disposal
    -                                   
snapshotOperation.releaseSnapshotResources();
    -                           }
    +                   lastCompletedCheckpointId = completedCheckpointId;
    +           }
    +   }
     
    -                           @Override
    -                           protected void stopOperation() throws Exception 
{
    -                                   closeLocalRegistry();
    -                           }
    +   private void createDB() throws IOException {
    +           List<ColumnFamilyHandle> columnFamilyHandles = new 
ArrayList<>(1);
    +           this.db = openDB(instanceRocksDBPath.getAbsolutePath(), 
Collections.emptyList(), columnFamilyHandles);
    +           this.defaultColumnFamily = columnFamilyHandles.get(0);
    +   }
     
    -                           private void closeLocalRegistry() {
    -                                   if 
(cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
    -                                           try {
    -                                                   
snapshotCloseableRegistry.close();
    -                                           } catch (Exception ex) {
    -                                                   LOG.warn("Error closing 
local registry", ex);
    -                                           }
    -                                   }
    -                           }
    +   private RocksDB openDB(
    +           String path,
    +           List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
    +           List<ColumnFamilyHandle> stateColumnFamilyHandles) throws 
IOException {
     
    -                           @Override
    -                           public SnapshotResult<KeyedStateHandle> 
performOperation() throws Exception {
    -                                   long startTime = 
System.currentTimeMillis();
    +           List<ColumnFamilyDescriptor> columnFamilyDescriptors =
    +                   new ArrayList<>(1 + 
stateColumnFamilyDescriptors.size());
     
    -                                   if (isStopped()) {
    -                                           throw new IOException("RocksDB 
closed.");
    -                                   }
    +           columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
     
    -                                   snapshotOperation.writeDBSnapshot();
    +           // we add the required descriptor for the default CF in last 
position.
    +           columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions));
     
    -                                   LOG.info("Asynchronous RocksDB snapshot 
({}, asynchronous part) in thread {} took {} ms.",
    -                                           streamFactory, 
Thread.currentThread(), (System.currentTimeMillis() - startTime));
    +           RocksDB dbRef;
     
    -                                   KeyGroupsStateHandle stateHandle = 
snapshotOperation.getSnapshotResultStateHandle();
    -                                   return new 
SnapshotResult<>(stateHandle, null);
    -                           }
    -                   };
    +           try {
    +                   dbRef = RocksDB.open(
    +                           Preconditions.checkNotNull(dbOptions),
    +                           Preconditions.checkNotNull(path),
    +                           columnFamilyDescriptors,
    +                           stateColumnFamilyHandles);
    +           } catch (RocksDBException e) {
    +                   throw new IOException("Error while opening RocksDB 
instance.", e);
    +           }
     
    -           LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", 
synchronous part) in thread " +
    -                   Thread.currentThread() + " took " + 
(System.currentTimeMillis() - startTime) + " ms.");
    +           // requested + default CF
    +           Preconditions.checkState(1 + 
stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
    +                   "Not all requested column family handles have been 
created");
     
    -           return AsyncStoppableTaskWithCallback.from(ioCallable);
    +           return dbRef;
        }
     
        /**
    -    * Encapsulates the process to perform a snapshot of a 
RocksDBKeyedStateBackend.
    +    * Encapsulates the process of restoring a RocksDBKeyedStateBackend 
from a full snapshot.
         */
    -   static final class RocksDBFullSnapshotOperation<K> {
    +   private static final class RocksDBFullRestoreOperation<K> {
     
    -           static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
    -           static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
    -
    -           private final RocksDBKeyedStateBackend<K> stateBackend;
    -           private final KeyGroupRangeOffsets keyGroupRangeOffsets;
    -           private final CheckpointStreamFactory checkpointStreamFactory;
    -           private final CloseableRegistry snapshotCloseableRegistry;
    -           private final ResourceGuard.Lease dbLease;
    -
    -           private long checkpointId;
    -           private long checkpointTimeStamp;
    -
    -           private Snapshot snapshot;
    -           private ReadOptions readOptions;
    -           private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
    -
    -           private CheckpointStreamFactory.CheckpointStateOutputStream 
outStream;
    -           private DataOutputView outputView;
    -
    -           RocksDBFullSnapshotOperation(
    -                   RocksDBKeyedStateBackend<K> stateBackend,
    -                   CheckpointStreamFactory checkpointStreamFactory,
    -                   CloseableRegistry registry) throws IOException {
    +           private final RocksDBKeyedStateBackend<K> 
rocksDBKeyedStateBackend;
     
    -                   this.stateBackend = stateBackend;
    -                   this.checkpointStreamFactory = checkpointStreamFactory;
    -                   this.keyGroupRangeOffsets = new 
KeyGroupRangeOffsets(stateBackend.keyGroupRange);
    -                   this.snapshotCloseableRegistry = registry;
    -                   this.dbLease = 
this.stateBackend.rocksDBResourceGuard.acquireResource();
    -           }
    +           /** Current key-groups state handle from which we restore 
key-groups. */
    +           private KeyGroupsStateHandle currentKeyGroupsStateHandle;
    +           /** Current input stream we obtained from 
currentKeyGroupsStateHandle. */
    +           private FSDataInputStream currentStateHandleInStream;
    +           /** Current data input view that wraps 
currentStateHandleInStream. */
    +           private DataInputView currentStateHandleInView;
    +           /** Current list of ColumnFamilyHandles for all column families 
we restore from currentKeyGroupsStateHandle. */
    +           private List<ColumnFamilyHandle> 
currentStateHandleKVStateColumnFamilies;
    +           /** The compression decorator that was used for writing the 
state, as determined by the meta data. */
    +           private StreamCompressionDecorator 
keygroupStreamCompressionDecorator;
     
                /**
    -            * 1) Create a snapshot object from RocksDB.
    +            * Creates a restore operation object for the given state 
backend instance.
                 *
    -            * @param checkpointId id of the checkpoint for which we take 
the snapshot
    -            * @param checkpointTimeStamp timestamp of the checkpoint for 
which we take the snapshot
    +            * @param rocksDBKeyedStateBackend the state backend into which 
we restore
                 */
    -           public void takeDBSnapShot(long checkpointId, long 
checkpointTimeStamp) {
    -                   Preconditions.checkArgument(snapshot == null, "Only one 
ongoing snapshot allowed!");
    -                   this.kvStateIterators = new 
ArrayList<>(stateBackend.kvStateInformation.size());
    -                   this.checkpointId = checkpointId;
    -                   this.checkpointTimeStamp = checkpointTimeStamp;
    -                   this.snapshot = stateBackend.db.getSnapshot();
    +           public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> 
rocksDBKeyedStateBackend) {
    +                   this.rocksDBKeyedStateBackend = 
Preconditions.checkNotNull(rocksDBKeyedStateBackend);
                }
     
                /**
    -            * 2) Open CheckpointStateOutputStream through the 
checkpointStreamFactory into which we will write.
    +            * Restores all key-groups data that is referenced by the 
passed state handles.
                 *
    -            * @throws Exception
    +            * @param keyedStateHandles List of all key groups state 
handles that shall be restored.
                 */
    -           public void openCheckpointStream() throws Exception {
    -                   Preconditions.checkArgument(outStream == null, "Output 
stream for snapshot is already set.");
    -                   outStream = 
checkpointStreamFactory.createCheckpointStateOutputStream(checkpointId, 
checkpointTimeStamp);
    -                   snapshotCloseableRegistry.registerCloseable(outStream);
    -                   outputView = new DataOutputViewStreamWrapper(outStream);
    -           }
    +           public void doRestore(Collection<KeyedStateHandle> 
keyedStateHandles)
    +                   throws IOException, StateMigrationException, 
RocksDBException {
     
    -           /**
    -            * 3) Write the actual data from RocksDB from the time we took 
the snapshot object in (1).
    -            *
    -            * @throws IOException
    -            */
    -           public void writeDBSnapshot() throws IOException, 
InterruptedException {
    +                   rocksDBKeyedStateBackend.createDB();
     
    -                   if (null == snapshot) {
    -                           throw new IOException("No snapshot available. 
Might be released due to cancellation.");
    -                   }
    +                   for (KeyedStateHandle keyedStateHandle : 
keyedStateHandles) {
    +                           if (keyedStateHandle != null) {
     
    -                   Preconditions.checkNotNull(outStream, "No output stream 
to write snapshot.");
    -                   writeKVStateMetaData();
    -                   writeKVStateData();
    +                                   if (!(keyedStateHandle instanceof 
KeyGroupsStateHandle)) {
    +                                           throw new 
IllegalStateException("Unexpected state handle type, " +
    +                                                   "expected: " + 
KeyGroupsStateHandle.class +
    +                                                   ", but found: " + 
keyedStateHandle.getClass());
    +                                   }
    +                                   this.currentKeyGroupsStateHandle = 
(KeyGroupsStateHandle) keyedStateHandle;
    +                                   restoreKeyGroupsInStateHandle();
    +                           }
    +                   }
                }
     
                /**
    -            * 4) Returns a state handle to the snapshot after the snapshot 
procedure is completed and null before.
    -            *
    -            * @return state handle to the completed snapshot
    +            * Restore one key groups state handle.
                 */
    -           public KeyGroupsStateHandle getSnapshotResultStateHandle() 
throws IOException {
    -
    -                   if 
(snapshotCloseableRegistry.unregisterCloseable(outStream)) {
    -
    -                           StreamStateHandle stateHandle = 
outStream.closeAndGetHandle();
    -                           outStream = null;
    -
    -                           if (stateHandle != null) {
    -                                   return new 
KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
    +           private void restoreKeyGroupsInStateHandle()
    +                   throws IOException, StateMigrationException, 
RocksDBException {
    +                   try {
    +                           currentStateHandleInStream = 
currentKeyGroupsStateHandle.openInputStream();
    +                           
rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
    +                           currentStateHandleInView = new 
DataInputViewStreamWrapper(currentStateHandleInStream);
    +                           restoreKVStateMetaData();
    +                           restoreKVStateData();
    +                   } finally {
    +                           if 
(rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream))
 {
    +                                   
IOUtils.closeQuietly(currentStateHandleInStream);
                                }
                        }
    -                   return null;
                }
     
                /**
    -            * 5) Release the snapshot object for RocksDB and clean up.
    +            * Restore the KV-state / ColumnFamily meta data for all 
key-groups referenced by the current state handle.
    +            *
    +            * @throws IOException
    +            * @throws ClassNotFoundException
    +            * @throws RocksDBException
                 */
    -           public void releaseSnapshotResources() {
    +           private void restoreKVStateMetaData() throws IOException, 
StateMigrationException, RocksDBException {
     
    -                   outStream = null;
    +                   KeyedBackendSerializationProxy<K> serializationProxy =
    +                           new 
KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
     
    -                   if (null != kvStateIterators) {
    -                           for (Tuple2<RocksIterator, Integer> 
kvStateIterator : kvStateIterators) {
    -                                   
IOUtils.closeQuietly(kvStateIterator.f0);
    -                           }
    -                           kvStateIterators = null;
    -                   }
    +                   serializationProxy.read(currentStateHandleInView);
     
    -                   if (null != snapshot) {
    -                           if (null != stateBackend.db) {
    -                                   
stateBackend.db.releaseSnapshot(snapshot);
    -                           }
    -                           IOUtils.closeQuietly(snapshot);
    -                           snapshot = null;
    -                   }
    +                   // check for key serializer compatibility; this also 
reconfigures the
    +                   // key serializer to be compatible, if it is required 
and is possible
    +                   if (CompatibilityUtil.resolveCompatibilityResult(
    +                           serializationProxy.getKeySerializer(),
    +                           UnloadableDummyTypeSerializer.class,
    +                           
serializationProxy.getKeySerializerConfigSnapshot(),
    +                           rocksDBKeyedStateBackend.keySerializer)
    +                           .isRequiresMigration()) {
     
    -                   if (null != readOptions) {
    -                           IOUtils.closeQuietly(readOptions);
    -                           readOptions = null;
    +                           // TODO replace with state migration; note that 
key hash codes need to remain the same after migration
    +                           throw new StateMigrationException("The new key 
serializer is not compatible to read previous keys. " +
    +                                   "Aborting now since state migration is 
currently not available");
                        }
     
    -                   this.dbLease.close();
    -           }
    -
    -           private void writeKVStateMetaData() throws IOException {
    +                   this.keygroupStreamCompressionDecorator = 
serializationProxy.isUsingKeyGroupCompression() ?
    +                           SnappyStreamCompressionDecorator.INSTANCE : 
UncompressedStreamCompressionDecorator.INSTANCE;
     
    -                   List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> metaInfoSnapshots =
    -                           new 
ArrayList<>(stateBackend.kvStateInformation.size());
    +                   List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> restoredMetaInfos =
    +                           serializationProxy.getStateMetaInfoSnapshots();
    +                   currentStateHandleKVStateColumnFamilies = new 
ArrayList<>(restoredMetaInfos.size());
    +                   //rocksDBKeyedStateBackend.restoredKvStateMetaInfos = 
new HashMap<>(restoredMetaInfos.size());
     
    -                   int kvStateId = 0;
    -                   for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> column :
    -                           stateBackend.kvStateInformation.entrySet()) {
    +                   for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> 
restoredMetaInfo : restoredMetaInfos) {
     
    -                           
metaInfoSnapshots.add(column.getValue().f1.snapshot());
    +                           Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn =
    +                                   
rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
     
    -                           //retrieve iterator for this k/v states
    -                           readOptions = new ReadOptions();
    -                           readOptions.setSnapshot(snapshot);
    +                           if (registeredColumn == null) {
    +                                   byte[] nameBytes = 
restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
     
    -                           kvStateIterators.add(
    -                                   new 
Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), 
kvStateId));
    +                                   ColumnFamilyDescriptor 
columnFamilyDescriptor = new ColumnFamilyDescriptor(
    +                                           nameBytes,
    +                                           
rocksDBKeyedStateBackend.columnOptions);
     
    -                           ++kvStateId;
    -                   }
    +                                   RegisteredKeyedBackendStateMetaInfo<?, 
?> stateMetaInfo =
    +                                           new 
RegisteredKeyedBackendStateMetaInfo<>(
    +                                                   
restoredMetaInfo.getStateType(),
    +                                                   
restoredMetaInfo.getName(),
    +                                                   
restoredMetaInfo.getNamespaceSerializer(),
    +                                                   
restoredMetaInfo.getStateSerializer());
     
    -                   KeyedBackendSerializationProxy<K> serializationProxy =
    -                           new KeyedBackendSerializationProxy<>(
    -                                   stateBackend.getKeySerializer(),
    -                                   metaInfoSnapshots,
    -                                   
!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, 
stateBackend.keyGroupCompressionDecorator));
    +                                   
rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(),
 restoredMetaInfo);
     
    -                   serializationProxy.write(outputView);
    -           }
    +                                   ColumnFamilyHandle columnFamily = 
rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
     
    -           private void writeKVStateData() throws IOException, 
InterruptedException {
    +                                   registeredColumn = new 
Tuple2<>(columnFamily, stateMetaInfo);
    +                                   
rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), 
registeredColumn);
     
    -                   byte[] previousKey = null;
    -                   byte[] previousValue = null;
    -                   OutputStream kgOutStream = null;
    -                   DataOutputView kgOutView = null;
    +                           } else {
    +                                   // TODO with eager state registration 
in place, check here for serializer migration strategies
    +                           }
    +                           
currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0);
    +                   }
    +           }
     
    -                   try {
    -                           // Here we transfer ownership of RocksIterators 
to the RocksDBMergeIterator
    -                           try (RocksDBMergeIterator mergeIterator = new 
RocksDBMergeIterator(
    -                                   kvStateIterators, 
stateBackend.keyGroupPrefixBytes)) {
    +           /**
    +            * Restore the KV-state / ColumnFamily data for all key-groups 
referenced by the current state handle.
    +            *
    +            * @throws IOException
    +            * @throws RocksDBException
    +            */
    +           private void restoreKVStateData() throws IOException, 
RocksDBException {
    +                   //for all key-groups in the current state handle...
    +                   for (Tuple2<Integer, Long> keyGroupOffset : 
currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
    +                           int keyGroup = keyGroupOffset.f0;
     
    -                                   // handover complete, null out to 
prevent double close
    -                                   kvStateIterators = null;
    +                           // Check that restored key groups all belong to 
the backend
    +                           
Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup),
    +                                   "The key group must belong to the 
backend");
     
    -                                   //preamble: setup with first key-group 
as our lookahead
    -                                   if (mergeIterator.isValid()) {
    -                                           //begin first key-group by 
recording the offset
    -                                           
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), 
outStream.getPos());
    -                                           //write the k/v-state id as 
metadata
    -                                           kgOutStream = 
stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
    -                                           kgOutView = new 
DataOutputViewStreamWrapper(kgOutStream);
    +                           long offset = keyGroupOffset.f1;
    +                           //not empty key-group?
    +                           if (0L != offset) {
    +                                   currentStateHandleInStream.seek(offset);
    +                                   try (InputStream compressedKgIn = 
keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream))
 {
    +                                           DataInputViewStreamWrapper 
compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
                                                //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
    -                                           
kgOutView.writeShort(mergeIterator.kvStateId());
    -                                           previousKey = 
mergeIterator.key();
    -                                           previousValue = 
mergeIterator.value();
    -                                           mergeIterator.next();
    +                                           int kvStateId = 
compressedKgInputView.readShort();
    +                                           ColumnFamilyHandle handle = 
currentStateHandleKVStateColumnFamilies.get(kvStateId);
    +                                           //insert all k/v pairs into DB
    +                                           boolean keyGroupHasMoreKeys = 
true;
    +                                           while (keyGroupHasMoreKeys) {
    +                                                   byte[] key = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
    +                                                   byte[] value = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
    +                                                   if 
(RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
    +                                                           //clear the 
signal bit in the key to make it ready for insertion again
    +                                                           
RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
    +                                                           
rocksDBKeyedStateBackend.db.put(handle, key, value);
    +                                                           //TODO this 
could be aware of keyGroupPrefixBytes and write only one byte if possible
    +                                                           kvStateId = 
RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
    +                                                                   & 
compressedKgInputView.readShort();
    +                                                           if 
(RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
    +                                                                   
keyGroupHasMoreKeys = false;
    +                                                           } else {
    +                                                                   handle 
= currentStateHandleKVStateColumnFamilies.get(kvStateId);
    +                                                           }
    +                                                   } else {
    +                                                           
rocksDBKeyedStateBackend.db.put(handle, key, value);
    +                                                   }
    +                                           }
                                        }
    +                           }
    +                   }
    +           }
    +   }
     
    -                                   //main loop: write k/v pairs ordered by 
(key-group, kv-state), thereby tracking key-group offsets.
    -                                   while (mergeIterator.isValid()) {
    +   /**
    +    * Encapsulates the process of restoring a RocksDBKeyedStateBackend 
from an incremental snapshot.
    +    */
    +   private static class RocksDBIncrementalRestoreOperation<T> {
    --- End diff --
    
    In general, I think that makes sense at this point, but I would do this in a 
different PR.


---

Reply via email to