Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r169638326 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -336,1697 +386,1982 @@ public int getKeyGroupPrefixBytes() { * @param streamFactory The factory that we can use for writing our state to streams. * @param checkpointOptions Options for how to perform this checkpoint. * @return Future to the state handle of the snapshot data. - * @throws Exception + * @throws Exception indicating a problem in the synchronous part of the checkpoint. */ @Override - public RunnableFuture<KeyedStateHandle> snapshot( + public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot( final long checkpointId, final long timestamp, final CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) throws Exception { - if (checkpointOptions.getCheckpointType() != CheckpointType.SAVEPOINT && - enableIncrementalCheckpointing) { - return snapshotIncrementally(checkpointId, timestamp, streamFactory); - } else { - return snapshotFully(checkpointId, timestamp, streamFactory); - } + return snapshotStrategy.performSnapshot(checkpointId, timestamp, streamFactory, checkpointOptions); } - private RunnableFuture<KeyedStateHandle> snapshotIncrementally( - final long checkpointId, - final long checkpointTimestamp, - final CheckpointStreamFactory checkpointStreamFactory) throws Exception { - - if (db == null) { - throw new IOException("RocksDB closed."); - } + @Override + public void restore(StateObjectCollection<KeyedStateHandle> restoreState) throws Exception { + LOG.info("Initializing RocksDB keyed state backend from snapshot."); - if (kvStateInformation.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. 
Returning null.", - checkpointTimestamp); - } - return DoneFuture.nullValue(); + if (LOG.isDebugEnabled()) { + LOG.debug("Restoring snapshot from state handles: {}.", restoreState); } - final RocksDBIncrementalSnapshotOperation<K> snapshotOperation = - new RocksDBIncrementalSnapshotOperation<>( - this, - checkpointStreamFactory, - checkpointId, - checkpointTimestamp); + // clear all meta data + kvStateInformation.clear(); + restoredKvStateMetaInfos.clear(); try { - snapshotOperation.takeSnapshot(); - } catch (Exception e) { - snapshotOperation.stop(); - snapshotOperation.releaseResources(true); - throw e; - } - - return new FutureTask<KeyedStateHandle>( - new Callable<KeyedStateHandle>() { - @Override - public KeyedStateHandle call() throws Exception { - return snapshotOperation.materializeSnapshot(); + if (restoreState == null || restoreState.isEmpty()) { + createDB(); + } else { + KeyedStateHandle firstStateHandle = restoreState.iterator().next(); + if (firstStateHandle instanceof IncrementalKeyedStateHandle + || firstStateHandle instanceof IncrementalLocalKeyedStateHandle) { + RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this); + restoreOperation.restore(restoreState); + } else { + RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this); + restoreOperation.doRestore(restoreState); } } - ) { - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - snapshotOperation.stop(); - return super.cancel(mayInterruptIfRunning); - } - - @Override - protected void done() { - snapshotOperation.releaseResources(isCancelled()); - } - }; + } catch (Exception ex) { + dispose(); + throw ex; + } } - private RunnableFuture<KeyedStateHandle> snapshotFully( - final long checkpointId, - final long timestamp, - final CheckpointStreamFactory streamFactory) throws Exception { - - long startTime = System.currentTimeMillis(); - final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry(); - - final RocksDBFullSnapshotOperation<K> snapshotOperation; - - if (kvStateInformation.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. 
Returning null.", timestamp); - } + @Override + public void notifyCheckpointComplete(long completedCheckpointId) { - return DoneFuture.nullValue(); + if (!enableIncrementalCheckpointing) { + return; } - snapshotOperation = new RocksDBFullSnapshotOperation<>(this, streamFactory, snapshotCloseableRegistry); - snapshotOperation.takeDBSnapShot(checkpointId, timestamp); - - // implementation of the async IO operation, based on FutureTask - AbstractAsyncCallableWithResources<KeyedStateHandle> ioCallable = - new AbstractAsyncCallableWithResources<KeyedStateHandle>() { - - @Override - protected void acquireResources() throws Exception { - cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry); - snapshotOperation.openCheckpointStream(); - } + synchronized (materializedSstFiles) { - @Override - protected void releaseResources() throws Exception { - closeLocalRegistry(); - releaseSnapshotOperationResources(); - } + if (completedCheckpointId < lastCompletedCheckpointId) { + return; + } - private void releaseSnapshotOperationResources() { - // hold the db lock while operation on the db to guard us against async db disposal - snapshotOperation.releaseSnapshotResources(); - } + materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId); - @Override - protected void stopOperation() throws Exception { - closeLocalRegistry(); - } + lastCompletedCheckpointId = completedCheckpointId; + } + } - private void closeLocalRegistry() { - if (cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) { - try { - snapshotCloseableRegistry.close(); - } catch (Exception ex) { - LOG.warn("Error closing local registry", ex); - } - } - } + private void createDB() throws IOException { + List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1); + this.db = openDB(instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles); + this.defaultColumnFamily = columnFamilyHandles.get(0); + } - @Override - public KeyGroupsStateHandle performOperation() throws Exception { - long startTime = System.currentTimeMillis(); + private RocksDB openDB( + String path, + List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors, + List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException { - if (isStopped()) { - throw new IOException("RocksDB closed."); - } + List<ColumnFamilyDescriptor> columnFamilyDescriptors = + new ArrayList<>(1 + stateColumnFamilyDescriptors.size()); - snapshotOperation.writeDBSnapshot(); + // we add the required descriptor for the default CF in FIRST position, see + // https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families + columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnOptions)); + columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors); - LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.", - streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime)); + RocksDB dbRef; - return snapshotOperation.getSnapshotResultStateHandle(); - } - }; + try { + dbRef = RocksDB.open( + Preconditions.checkNotNull(dbOptions), + Preconditions.checkNotNull(path), + columnFamilyDescriptors, + stateColumnFamilyHandles); + } catch (RocksDBException e) { + throw new IOException("Error while opening RocksDB instance.", e); + } - LOG.info("Asynchronous RocksDB snapshot ({}, synchronous part) in thread {} took {} ms.", - streamFactory, Thread.currentThread(), (System.currentTimeMillis() - 
startTime)); + // requested + default CF + Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(), + "Not all requested column family handles have been created"); - return AsyncStoppableTaskWithCallback.from(ioCallable); + return dbRef; } /** - * Encapsulates the process to perform a snapshot of a RocksDBKeyedStateBackend. + * Encapsulates the process of restoring a RocksDBKeyedStateBackend from a full snapshot. */ - static final class RocksDBFullSnapshotOperation<K> { - - static final int FIRST_BIT_IN_BYTE_MASK = 0x80; - static final int END_OF_KEY_GROUP_MARK = 0xFFFF; - - private final RocksDBKeyedStateBackend<K> stateBackend; - private final KeyGroupRangeOffsets keyGroupRangeOffsets; - private final CheckpointStreamFactory checkpointStreamFactory; - private final CloseableRegistry snapshotCloseableRegistry; - private final ResourceGuard.Lease dbLease; - - private long checkpointId; - private long checkpointTimeStamp; - - private Snapshot snapshot; - private ReadOptions readOptions; - private List<Tuple2<RocksIterator, Integer>> kvStateIterators; - - private CheckpointStreamFactory.CheckpointStateOutputStream outStream; - private DataOutputView outputView; + private static final class RocksDBFullRestoreOperation<K> { - RocksDBFullSnapshotOperation( - RocksDBKeyedStateBackend<K> stateBackend, - CheckpointStreamFactory checkpointStreamFactory, - CloseableRegistry registry) throws IOException { + private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend; - this.stateBackend = stateBackend; - this.checkpointStreamFactory = checkpointStreamFactory; - this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(stateBackend.keyGroupRange); - this.snapshotCloseableRegistry = registry; - this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource(); - } + /** Current key-groups state handle from which we restore key-groups. */ + private KeyGroupsStateHandle currentKeyGroupsStateHandle; + /** Current input stream we obtained from currentKeyGroupsStateHandle. */ + private FSDataInputStream currentStateHandleInStream; + /** Current data input view that wraps currentStateHandleInStream. */ + private DataInputView currentStateHandleInView; + /** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle. */ + private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies; + /** The compression decorator that was used for writing the state, as determined by the meta data. */ + private StreamCompressionDecorator keygroupStreamCompressionDecorator; /** - * 1) Create a snapshot object from RocksDB. + * Creates a restore operation object for the given state backend instance. 
* - * @param checkpointId id of the checkpoint for which we take the snapshot - * @param checkpointTimeStamp timestamp of the checkpoint for which we take the snapshot + * @param rocksDBKeyedStateBackend the state backend into which we restore */ - public void takeDBSnapShot(long checkpointId, long checkpointTimeStamp) { - Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!"); - this.kvStateIterators = new ArrayList<>(stateBackend.kvStateInformation.size()); - this.checkpointId = checkpointId; - this.checkpointTimeStamp = checkpointTimeStamp; - this.snapshot = stateBackend.db.getSnapshot(); + public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) { + this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend); } /** - * 2) Open CheckpointStateOutputStream through the checkpointStreamFactory into which we will write. + * Restores all key-groups data that is referenced by the passed state handles. * - * @throws Exception + * @param keyedStateHandles List of all key groups state handles that shall be restored. */ - public void openCheckpointStream() throws Exception { - Preconditions.checkArgument(outStream == null, "Output stream for snapshot is already set."); - outStream = checkpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE); - snapshotCloseableRegistry.registerCloseable(outStream); - outputView = new DataOutputViewStreamWrapper(outStream); - } + public void doRestore(Collection<KeyedStateHandle> keyedStateHandles) + throws IOException, StateMigrationException, RocksDBException { - /** - * 3) Write the actual data from RocksDB from the time we took the snapshot object in (1). - * - * @throws IOException - */ - public void writeDBSnapshot() throws IOException, InterruptedException { + rocksDBKeyedStateBackend.createDB(); - if (null == snapshot) { - throw new IOException("No snapshot available. Might be released due to cancellation."); - } + for (KeyedStateHandle keyedStateHandle : keyedStateHandles) { + if (keyedStateHandle != null) { - Preconditions.checkNotNull(outStream, "No output stream to write snapshot."); - writeKVStateMetaData(); - writeKVStateData(); + if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) { + throw new IllegalStateException("Unexpected state handle type, " + + "expected: " + KeyGroupsStateHandle.class + + ", but found: " + keyedStateHandle.getClass()); + } + this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle; + restoreKeyGroupsInStateHandle(); + } + } } /** - * 4) Returns a state handle to the snapshot after the snapshot procedure is completed and null before. - * - * @return state handle to the completed snapshot + * Restore one key groups state handle. 
*/ - public KeyGroupsStateHandle getSnapshotResultStateHandle() throws IOException { - - if (snapshotCloseableRegistry.unregisterCloseable(outStream)) { - - StreamStateHandle stateHandle = outStream.closeAndGetHandle(); - outStream = null; - - if (stateHandle != null) { - return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle); + private void restoreKeyGroupsInStateHandle() + throws IOException, StateMigrationException, RocksDBException { + try { + currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream(); + rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream); + currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream); + restoreKVStateMetaData(); + restoreKVStateData(); + } finally { + if (rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) { + IOUtils.closeQuietly(currentStateHandleInStream); } } - return null; } /** - * 5) Release the snapshot object for RocksDB and clean up. + * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws RocksDBException */ - public void releaseSnapshotResources() { + private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException { - outStream = null; + KeyedBackendSerializationProxy<K> serializationProxy = + new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader); - if (null != kvStateIterators) { - for (Tuple2<RocksIterator, Integer> kvStateIterator : kvStateIterators) { - IOUtils.closeQuietly(kvStateIterator.f0); - } - kvStateIterators = null; - } + serializationProxy.read(currentStateHandleInView); - if (null != snapshot) { - if (null != stateBackend.db) { - stateBackend.db.releaseSnapshot(snapshot); - } - IOUtils.closeQuietly(snapshot); - snapshot = null; - } + // check for key serializer compatibility; this also reconfigures the + // key serializer to be compatible, if it is required and is possible + if (CompatibilityUtil.resolveCompatibilityResult( + serializationProxy.getKeySerializer(), + UnloadableDummyTypeSerializer.class, + serializationProxy.getKeySerializerConfigSnapshot(), + rocksDBKeyedStateBackend.keySerializer) + .isRequiresMigration()) { - if (null != readOptions) { - IOUtils.closeQuietly(readOptions); - readOptions = null; + // TODO replace with state migration; note that key hash codes need to remain the same after migration + throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " + + "Aborting now since state migration is currently not available"); } - this.dbLease.close(); - } - - private void writeKVStateMetaData() throws IOException { + this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ? 
+ SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE; - List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> metaInfoSnapshots = - new ArrayList<>(stateBackend.kvStateInformation.size()); + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos = + serializationProxy.getStateMetaInfoSnapshots(); + currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size()); + //rocksDBKeyedStateBackend.restoredKvStateMetaInfos = new HashMap<>(restoredMetaInfos.size()); - int kvStateId = 0; - for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> column : - stateBackend.kvStateInformation.entrySet()) { + for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) { - metaInfoSnapshots.add(column.getValue().f1.snapshot()); + Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn = + rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName()); - //retrieve iterator for this k/v states - readOptions = new ReadOptions(); - readOptions.setSnapshot(snapshot); + if (registeredColumn == null) { + byte[] nameBytes = restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET); - kvStateIterators.add( - new Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), kvStateId)); + ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor( + nameBytes, + rocksDBKeyedStateBackend.columnOptions); - ++kvStateId; - } + RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo = + new RegisteredKeyedBackendStateMetaInfo<>( + restoredMetaInfo.getStateType(), + restoredMetaInfo.getName(), + restoredMetaInfo.getNamespaceSerializer(), + restoredMetaInfo.getStateSerializer()); - KeyedBackendSerializationProxy<K> serializationProxy = - new KeyedBackendSerializationProxy<>( - stateBackend.getKeySerializer(), - metaInfoSnapshots, - !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, stateBackend.keyGroupCompressionDecorator)); + rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo); - serializationProxy.write(outputView); - } + ColumnFamilyHandle columnFamily = rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor); - private void writeKVStateData() throws IOException, InterruptedException { + registeredColumn = new Tuple2<>(columnFamily, stateMetaInfo); + rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), registeredColumn); - byte[] previousKey = null; - byte[] previousValue = null; - OutputStream kgOutStream = null; - DataOutputView kgOutView = null; + } else { + // TODO with eager state registration in place, check here for serializer migration strategies + } + currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0); + } + } - try { - // Here we transfer ownership of RocksIterators to the RocksDBMergeIterator - try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator( - kvStateIterators, stateBackend.keyGroupPrefixBytes)) { + /** + * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle. + * + * @throws IOException + * @throws RocksDBException + */ + private void restoreKVStateData() throws IOException, RocksDBException { + //for all key-groups in the current state handle... 
+ for (Tuple2<Integer, Long> keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) { + int keyGroup = keyGroupOffset.f0; - // handover complete, null out to prevent double close - kvStateIterators = null; + // Check that restored key groups all belong to the backend + Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup), + "The key group must belong to the backend"); - //preamble: setup with first key-group as our lookahead - if (mergeIterator.isValid()) { - //begin first key-group by recording the offset - keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos()); - //write the k/v-state id as metadata - kgOutStream = stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream); - kgOutView = new DataOutputViewStreamWrapper(kgOutStream); + long offset = keyGroupOffset.f1; + //not empty key-group? + if (0L != offset) { + currentStateHandleInStream.seek(offset); + try (InputStream compressedKgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) { + DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn); //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutView.writeShort(mergeIterator.kvStateId()); - previousKey = mergeIterator.key(); - previousValue = mergeIterator.value(); - mergeIterator.next(); + int kvStateId = compressedKgInputView.readShort(); + ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId); + //insert all k/v pairs into DB + boolean keyGroupHasMoreKeys = true; + while (keyGroupHasMoreKeys) { + byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView); + byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView); + if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) { + //clear the signal bit in the key to make it ready for insertion again + RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key); + rocksDBKeyedStateBackend.db.put(handle, key, value); + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK + & compressedKgInputView.readShort(); + if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) { + keyGroupHasMoreKeys = false; + } else { + handle = currentStateHandleKVStateColumnFamilies.get(kvStateId); + } + } else { + rocksDBKeyedStateBackend.db.put(handle, key, value); + } + } } + } + } + } + } - //main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets. - while (mergeIterator.isValid()) { + /** + * Encapsulates the process of restoring a RocksDBKeyedStateBackend from an incremental snapshot. + */ + private static class RocksDBIncrementalRestoreOperation<T> { - assert (!hasMetaDataFollowsFlag(previousKey)); + private final RocksDBKeyedStateBackend<T> stateBackend; - //set signal in first key byte that meta data will follow in the stream after this k/v pair - if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) { + private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) { + this.stateBackend = stateBackend; + } - //be cooperative and check for interruption from time to time in the hot loop - checkInterrupted(); + /** + * Root method that branches for different implementations of {@link KeyedStateHandle}. 
+ */ + void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception { - setMetaDataFollowsFlagInKey(previousKey); - } + boolean hasExtraKeys = (restoreStateHandles.size() > 1 || + !Objects.equals(restoreStateHandles.iterator().next().getKeyGroupRange(), stateBackend.keyGroupRange)); - writeKeyValuePair(previousKey, previousValue, kgOutView); + if (hasExtraKeys) { + stateBackend.createDB(); + } - //write meta data if we have to - if (mergeIterator.isNewKeyGroup()) { - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutView.writeShort(END_OF_KEY_GROUP_MARK); - // this will just close the outer stream - kgOutStream.close(); - //begin new key-group - keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos()); - //write the kev-state - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutStream = stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream); - kgOutView = new DataOutputViewStreamWrapper(kgOutStream); - kgOutView.writeShort(mergeIterator.kvStateId()); - } else if (mergeIterator.isNewKeyValueState()) { - //write the k/v-state - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutView.writeShort(mergeIterator.kvStateId()); - } + for (KeyedStateHandle rawStateHandle : restoreStateHandles) { - //request next k/v pair - previousKey = mergeIterator.key(); - previousValue = mergeIterator.value(); - mergeIterator.next(); - } + if (rawStateHandle instanceof IncrementalKeyedStateHandle) { + restoreInstance((IncrementalKeyedStateHandle) rawStateHandle, hasExtraKeys); + } else if (rawStateHandle instanceof IncrementalLocalKeyedStateHandle) { + Preconditions.checkState(!hasExtraKeys, "Cannot recover from local state after rescaling."); + restoreInstance((IncrementalLocalKeyedStateHandle) rawStateHandle); + } else { + throw new IllegalStateException("Unexpected state handle type, " + + "expected " + IncrementalKeyedStateHandle.class + + ", but found " + rawStateHandle.getClass()); } + } + } - //epilogue: write last key-group - if (previousKey != null) { - assert (!hasMetaDataFollowsFlag(previousKey)); - setMetaDataFollowsFlagInKey(previousKey); - writeKeyValuePair(previousKey, previousValue, kgOutView); - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutView.writeShort(END_OF_KEY_GROUP_MARK); - // this will just close the outer stream - kgOutStream.close(); - kgOutStream = null; - } + /** + * Recovery from remote incremental state. 
+ */ + private void restoreInstance( + IncrementalKeyedStateHandle restoreStateHandle, + boolean hasExtraKeys) throws Exception { + + // read state data + Path temporaryRestoreInstancePath = new Path( + stateBackend.instanceBasePath.getAbsolutePath(), + UUID.randomUUID().toString()); + + try { + + transferAllStateDataToDirectory(restoreStateHandle, temporaryRestoreInstancePath); + // read meta data + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = + readMetaData(restoreStateHandle.getMetaStateHandle()); + + List<ColumnFamilyDescriptor> columnFamilyDescriptors = + createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots); + + if (hasExtraKeys) { + restoreKeyGroupsShardWithTemporaryHelperInstance( + temporaryRestoreInstancePath, + columnFamilyDescriptors, + stateMetaInfoSnapshots); + } else { + + // since we transferred all remote state to a local directory, we can use the same code as for + // local recovery. + IncrementalLocalKeyedStateHandle localKeyedStateHandle = new IncrementalLocalKeyedStateHandle( + restoreStateHandle.getBackendIdentifier(), + restoreStateHandle.getCheckpointId(), + new DirectoryStateHandle(temporaryRestoreInstancePath), + restoreStateHandle.getKeyGroupRange(), + restoreStateHandle.getMetaStateHandle(), + restoreStateHandle.getSharedState().keySet()); + + restoreLocalStateIntoFullInstance( + localKeyedStateHandle, + columnFamilyDescriptors, + stateMetaInfoSnapshots); + } } finally { - // this will just close the outer stream - IOUtils.closeQuietly(kgOutStream); + FileSystem restoreFileSystem = temporaryRestoreInstancePath.getFileSystem(); + if (restoreFileSystem.exists(temporaryRestoreInstancePath)) { + restoreFileSystem.delete(temporaryRestoreInstancePath, true); + } } } - private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException { - BytePrimitiveArraySerializer.INSTANCE.serialize(key, out); - BytePrimitiveArraySerializer.INSTANCE.serialize(value, out); - } + /** + * Recovery from local incremental state. + */ + private void restoreInstance(IncrementalLocalKeyedStateHandle localKeyedStateHandle) throws Exception { + // read meta data + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = + readMetaData(localKeyedStateHandle.getMetaDataState()); - static void setMetaDataFollowsFlagInKey(byte[] key) { - key[0] |= FIRST_BIT_IN_BYTE_MASK; - } + List<ColumnFamilyDescriptor> columnFamilyDescriptors = + createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots); - static void clearMetaDataFollowsFlag(byte[] key) { - key[0] &= (~RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK); + restoreLocalStateIntoFullInstance( + localKeyedStateHandle, + columnFamilyDescriptors, + stateMetaInfoSnapshots); } - static boolean hasMetaDataFollowsFlag(byte[] key) { - return 0 != (key[0] & RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK); - } + /** + * This method recreates and registers all {@link ColumnFamilyDescriptor} from Flink's state meta data snapshot. 
+ */ + private List<ColumnFamilyDescriptor> createAndRegisterColumnFamilyDescriptors( + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) { - private static void checkInterrupted() throws InterruptedException { - if (Thread.currentThread().isInterrupted()) { - throw new InterruptedException("RocksDB snapshot interrupted."); + List<ColumnFamilyDescriptor> columnFamilyDescriptors = + new ArrayList<>(1 + stateMetaInfoSnapshots.size()); + + for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot : stateMetaInfoSnapshots) { + + ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor( + stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET), + stateBackend.columnOptions); + + columnFamilyDescriptors.add(columnFamilyDescriptor); + stateBackend.restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(), stateMetaInfoSnapshot); } + return columnFamilyDescriptors; } - } - private static final class RocksDBIncrementalSnapshotOperation<K> { + /** + * This method implements the core of the restore logic that unifies how local and remote state are recovered. + */ + private void restoreLocalStateIntoFullInstance( + IncrementalLocalKeyedStateHandle restoreStateHandle, + List<ColumnFamilyDescriptor> columnFamilyDescriptors, + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) throws Exception { + // pick up again the old backend id, so the we can reference existing state + stateBackend.backendUID = restoreStateHandle.getBackendIdentifier(); + + LOG.debug("Restoring keyed backend uid in operator {} from incremental snapshot to {}.", + stateBackend.operatorIdentifier, stateBackend.backendUID); + + // create hard links in the instance directory + if (!stateBackend.instanceRocksDBPath.mkdirs()) { + throw new IOException("Could not create RocksDB data directory."); + } - /** The backend which we snapshot. */ - private final RocksDBKeyedStateBackend<K> stateBackend; + Path restoreSourcePath = restoreStateHandle.getDirectoryStateHandle().getDirectory(); + restoreInstanceDirectoryFromPath(restoreSourcePath); - /** Stream factory that creates the outpus streams to DFS. */ - private final CheckpointStreamFactory checkpointStreamFactory; + List<ColumnFamilyHandle> columnFamilyHandles = + new ArrayList<>(1 + columnFamilyDescriptors.size()); - /** Id for the current checkpoint. */ - private final long checkpointId; + stateBackend.db = stateBackend.openDB( + stateBackend.instanceRocksDBPath.getAbsolutePath(), + columnFamilyDescriptors, columnFamilyHandles); - /** Timestamp for the current checkpoint. */ - private final long checkpointTimestamp; + // extract and store the default column family which is located at the first index + stateBackend.defaultColumnFamily = columnFamilyHandles.remove(0); - /** All sst files that were part of the last previously completed checkpoint. */ - private Set<StateHandleID> baseSstFiles; + for (int i = 0; i < columnFamilyDescriptors.size(); ++i) { + RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i); - /** The state meta data. 
*/ - private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = new ArrayList<>(); + ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i); + RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo = + new RegisteredKeyedBackendStateMetaInfo<>( + stateMetaInfoSnapshot.getStateType(), + stateMetaInfoSnapshot.getName(), + stateMetaInfoSnapshot.getNamespaceSerializer(), + stateMetaInfoSnapshot.getStateSerializer()); - private FileSystem backupFileSystem; - private Path backupPath; + stateBackend.kvStateInformation.put( + stateMetaInfoSnapshot.getName(), + new Tuple2<>(columnFamilyHandle, stateMetaInfo)); + } - // Registry for all opened i/o streams - private final CloseableRegistry closeableRegistry = new CloseableRegistry(); + // use the restore sst files as the base for succeeding checkpoints + synchronized (stateBackend.materializedSstFiles) { + stateBackend.materializedSstFiles.put( + restoreStateHandle.getCheckpointId(), + restoreStateHandle.getSharedStateHandleIDs()); + } - // new sst files since the last completed checkpoint - private final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>(); + stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId(); + } - // handles to the misc files in the current snapshot - private final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>(); + /** + * This recreates the new working directory of the recovered RocksDB instance and links/copies the contents from + * a local state. + */ + private void restoreInstanceDirectoryFromPath(Path source) throws IOException { - // This lease protects from concurrent disposal of the native rocksdb instance. - private final ResourceGuard.Lease dbLease; + FileSystem fileSystem = source.getFileSystem(); - private StreamStateHandle metaStateHandle = null; + final FileStatus[] fileStatuses = fileSystem.listStatus(source); - private RocksDBIncrementalSnapshotOperation( - RocksDBKeyedStateBackend<K> stateBackend, - CheckpointStreamFactory checkpointStreamFactory, - long checkpointId, - long checkpointTimestamp) throws IOException { + if (fileStatuses == null) { + throw new IOException("Cannot list file statues. Directory " + source + " does not exist."); + } - this.stateBackend = stateBackend; - this.checkpointStreamFactory = checkpointStreamFactory; - this.checkpointId = checkpointId; - this.checkpointTimestamp = checkpointTimestamp; - this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource(); + for (FileStatus fileStatus : fileStatuses) { + final Path filePath = fileStatus.getPath(); + final String fileName = filePath.getName(); + File restoreFile = new File(source.getPath(), fileName); + File targetFile = new File(stateBackend.instanceRocksDBPath.getPath(), fileName); + if (fileName.endsWith(SST_FILE_SUFFIX)) { + // hardlink'ing the immutable sst-files. + Files.createLink(targetFile.toPath(), restoreFile.toPath()); + } else { + // true copy for all other files. + Files.copy(restoreFile.toPath(), targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + } + } } - private StreamStateHandle materializeStateData(Path filePath) throws Exception { + /** + * Reads Flink's state meta data file from the state handle. 
+ */ + private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> readMetaData( + StreamStateHandle metaStateHandle) throws Exception { + FSDataInputStream inputStream = null; - CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; try { - final byte[] buffer = new byte[8 * 1024]; - - FileSystem backupFileSystem = backupPath.getFileSystem(); - inputStream = backupFileSystem.open(filePath); - closeableRegistry.registerCloseable(inputStream); - - outputStream = checkpointStreamFactory - .createCheckpointStateOutputStream(CheckpointedStateScope.SHARED); - closeableRegistry.registerCloseable(outputStream); - - while (true) { - int numBytes = inputStream.read(buffer); + inputStream = metaStateHandle.openInputStream(); + stateBackend.cancelStreamRegistry.registerCloseable(inputStream); - if (numBytes == -1) { - break; - } + KeyedBackendSerializationProxy<T> serializationProxy = + new KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader); + DataInputView in = new DataInputViewStreamWrapper(inputStream); + serializationProxy.read(in); - outputStream.write(buffer, 0, numBytes); - } + // check for key serializer compatibility; this also reconfigures the + // key serializer to be compatible, if it is required and is possible + if (CompatibilityUtil.resolveCompatibilityResult( + serializationProxy.getKeySerializer(), + UnloadableDummyTypeSerializer.class, + serializationProxy.getKeySerializerConfigSnapshot(), + stateBackend.keySerializer) + .isRequiresMigration()) { - StreamStateHandle result = null; - if (closeableRegistry.unregisterCloseable(outputStream)) { - result = outputStream.closeAndGetHandle(); - outputStream = null; + // TODO replace with state migration; note that key hash codes need to remain the same after migration + throw new StateMigrationException("The new key serializer is not compatible to read previous keys. 
" + + "Aborting now since state migration is currently not available"); } - return result; + return serializationProxy.getStateMetaInfoSnapshots(); } finally { - if (inputStream != null && closeableRegistry.unregisterCloseable(inputStream)) { + if (stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) { inputStream.close(); } - - if (outputStream != null && closeableRegistry.unregisterCloseable(outputStream)) { - outputStream.close(); - } } } - private StreamStateHandle materializeMetaData() throws Exception { - CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; - - try { - outputStream = checkpointStreamFactory - .createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE); - closeableRegistry.registerCloseable(outputStream); + private void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest) throws IOException { - //no need for compression scheme support because sst-files are already compressed - KeyedBackendSerializationProxy<K> serializationProxy = - new KeyedBackendSerializationProxy<>( - stateBackend.keySerializer, - stateMetaInfoSnapshots, - false); + final Map<StateHandleID, StreamStateHandle> sstFiles = + restoreStateHandle.getSharedState(); + final Map<StateHandleID, StreamStateHandle> miscFiles = + restoreStateHandle.getPrivateState(); - DataOutputView out = new DataOutputViewStreamWrapper(outputStream); + transferAllDataFromStateHandles(sstFiles, dest); + transferAllDataFromStateHandles(miscFiles, dest); + } - serializationProxy.write(out); + /** + * Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their + * {@link StateHandleID}. + */ + private void transferAllDataFromStateHandles( + Map<StateHandleID, StreamStateHandle> stateHandleMap, + Path restoreInstancePath) throws IOException { - StreamStateHandle result = null; - if (closeableRegistry.unregisterCloseable(outputStream)) { - result = outputStream.closeAndGetHandle(); - outputStream = null; - } - return result; - } finally { - if (outputStream != null) { - if (closeableRegistry.unregisterCloseable(outputStream)) { - outputStream.close(); - } - } + for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) { + StateHandleID stateHandleID = entry.getKey(); + StreamStateHandle remoteFileHandle = entry.getValue(); + copyStateDataHandleData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle); } } - void takeSnapshot() throws Exception { + /** + * Copies the file from a single state handle to the given path. + */ + private void copyStateDataHandleData( + Path restoreFilePath, + StreamStateHandle remoteFileHandle) throws IOException { - final long lastCompletedCheckpoint; + FileSystem restoreFileSystem = restoreFilePath.getFileSystem(); - // use the last completed checkpoint as the comparison base. - synchronized (stateBackend.materializedSstFiles) { - lastCompletedCheckpoint = stateBackend.lastCompletedCheckpointId; - baseSstFiles = stateBackend.materializedSstFiles.get(lastCompletedCheckpoint); - } + FSDataInputStream inputStream = null; + FSDataOutputStream outputStream = null; - LOG.trace("Taking incremental snapshot for checkpoint {}. 
Snapshot is based on last completed checkpoint {} " + - "assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles); + try { + inputStream = remoteFileHandle.openInputStream(); + stateBackend.cancelStreamRegistry.registerCloseable(inputStream); - // save meta data - for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry - : stateBackend.kvStateInformation.entrySet()) { - stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot()); - } + outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE); + stateBackend.cancelStreamRegistry.registerCloseable(outputStream); - // save state data - backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId); + byte[] buffer = new byte[8 * 1024]; + while (true) { + int numBytes = inputStream.read(buffer); + if (numBytes == -1) { + break; + } - LOG.trace("Local RocksDB checkpoint goes to backup path {}.", backupPath); + outputStream.write(buffer, 0, numBytes); + } + } finally { + if (stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) { + inputStream.close(); + } - backupFileSystem = backupPath.getFileSystem(); - if (backupFileSystem.exists(backupPath)) { - throw new IllegalStateException("Unexpected existence of the backup directory."); + if (stateBackend.cancelStreamRegistry.unregisterCloseable(outputStream)) { + outputStream.close(); + } } - - // create hard links of living files in the checkpoint path - Checkpoint checkpoint = Checkpoint.create(stateBackend.db); - checkpoint.createCheckpoint(backupPath.getPath()); } - KeyedStateHandle materializeSnapshot() throws Exception { + /** + * In case of rescaling, this method creates a temporary RocksDB instance for a key-groups shard. All contents + * from the temporary instance are copied into the real restore instance and then the temporary instance is + * discarded. + */ + private void restoreKeyGroupsShardWithTemporaryHelperInstance( + Path restoreInstancePath, + List<ColumnFamilyDescriptor> columnFamilyDescriptors, + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) throws Exception { - stateBackend.cancelStreamRegistry.registerCloseable(closeableRegistry); + List<ColumnFamilyHandle> columnFamilyHandles = + new ArrayList<>(1 + columnFamilyDescriptors.size()); - // write meta data - metaStateHandle = materializeMetaData(); + try (RocksDB restoreDb = stateBackend.openDB( + restoreInstancePath.getPath(), + columnFamilyDescriptors, + columnFamilyHandles)) { --- End diff -- This can be optimized. If a `restoreInstance` satisfies `startKeyGroup >= backend.keygroup.startKeyGroup() && endKeyGroup <= backend.keygroup.endKeyGroup()`, we can open it directly as the target RocksDB and use it for the rest of the recovery. (For now, we have to iterate over all restored RocksDB instances.)
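
To make the suggestion concrete, here is a minimal sketch of the containment check. It is not part of this PR: the helper class and method names are hypothetical, and it only assumes the existing `KeyedStateHandle#getKeyGroupRange()`, `KeyGroupRange#getStartKeyGroup()` and `KeyGroupRange#getEndKeyGroup()` accessors.

```java
import java.util.Collection;

import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;

// Sketch of the check suggested above; not code from the PR.
final class RestoreBaseSelection {

	/**
	 * Returns a restored state handle whose key-group range lies entirely inside the
	 * backend's key-group range, or null if no such handle exists. Such a handle could be
	 * opened directly as the target RocksDB instance, so that only the remaining handles
	 * would still have to be iterated and copied over during recovery.
	 */
	static KeyedStateHandle pickDirectlyUsableHandle(
			Collection<? extends KeyedStateHandle> restoreStateHandles,
			KeyGroupRange backendKeyGroupRange) {

		for (KeyedStateHandle handle : restoreStateHandles) {
			KeyGroupRange handleRange = handle.getKeyGroupRange();

			boolean containedInBackendRange =
				handleRange.getStartKeyGroup() >= backendKeyGroupRange.getStartKeyGroup()
					&& handleRange.getEndKeyGroup() <= backendKeyGroupRange.getEndKeyGroup();

			if (containedInBackendRange) {
				return handle;
			}
		}
		return null;
	}
}
```

If such a handle is found, the rescaling path could open its files as the working instance and copy only the remaining handles into it, instead of copying every restored instance into a freshly created DB.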
---