http://git-wip-us.apache.org/repos/asf/flink/blob/1619fa8a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 344255f..0cb2792 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -52,19 +52,25 @@ import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider; import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.DirectoryStateHandle; import org.apache.flink.runtime.state.DoneFuture; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.LocalRecoveryConfig; +import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider; import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator; +import org.apache.flink.runtime.state.SnapshotDirectory; import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.SnapshotStrategy; import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; @@ -77,12 +83,14 @@ import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.runtime.state.internal.InternalReducingState; import org.apache.flink.runtime.state.internal.InternalValueState; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.ResourceGuard; import org.apache.flink.util.StateMigrationException; +import org.apache.flink.util.function.SupplierWithException; import org.rocksdb.Checkpoint; import org.rocksdb.ColumnFamilyDescriptor; @@ -104,6 +112,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Files; +import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -122,7 +131,6 @@ import java.util.Spliterator; import java.util.Spliterators; import java.util.TreeMap; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; import java.util.concurrent.RunnableFuture; import java.util.stream.Stream; @@ -217,6 +225,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { /** The configuration of local recovery. */ private final LocalRecoveryConfig localRecoveryConfig; + /** The snapshot strategy, e.g., if we use full or incremental checkpoints, local state, and so on. */ + private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> snapshotStrategy; + public RocksDBKeyedStateBackend( String operatorIdentifier, ClassLoader userCodeClassLoader, @@ -248,26 +259,41 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { this.instanceBasePath = Preconditions.checkNotNull(instanceBasePath); this.instanceRocksDBPath = new File(instanceBasePath, "db"); - if (instanceBasePath.exists()) { + checkAndCreateDirectory(instanceBasePath); + + if (instanceRocksDBPath.exists()) { // Clear the base directory when the backend is created // in case something crashed and the backend never reached dispose() cleanInstanceBasePath(); } - if (!instanceBasePath.mkdirs()) { - throw new IOException( - String.format("Could not create RocksDB data directory at %s.", instanceBasePath.getAbsolutePath())); - } - this.localRecoveryConfig = Preconditions.checkNotNull(localRecoveryConfig); this.keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1; this.kvStateInformation = new HashMap<>(); this.restoredKvStateMetaInfos = new HashMap<>(); this.materializedSstFiles = new TreeMap<>(); this.backendUID = UUID.randomUUID(); + + this.snapshotStrategy = enableIncrementalCheckpointing ? + new IncrementalSnapshotStrategy() : + new FullSnapshotStrategy(); + LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID); } + private static void checkAndCreateDirectory(File directory) throws IOException { + if (directory.exists()) { + if (!directory.isDirectory()) { + throw new IOException("Not a directory: " + directory); + } + } else { + if (!directory.mkdirs()) { + throw new IOException( + String.format("Could not create RocksDB data directory at %s.", directory)); + } + } + } + @Override public <N> Stream<K> getKeys(String state, N namespace) { Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> columnInfo = kvStateInformation.get(state); @@ -294,7 +320,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { RocksIterator iterator = db.newIterator(columnInfo.f0); iterator.seekToFirst(); - final RocksIteratorWrapper<K> iteratorWrapper = new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes, + final RocksIteratorForKeysWrapper<K> iteratorWrapper = new RocksIteratorForKeysWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes, ambiguousKeyPossible, nameSpaceBytes); Stream<K> targetStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper, Spliterator.ORDERED), false); @@ -381,1743 +407,2053 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { final CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) throws Exception { - if (checkpointOptions.getCheckpointType() != CheckpointType.SAVEPOINT && - enableIncrementalCheckpointing) { - return snapshotIncrementally(checkpointId, timestamp, streamFactory); - } else { - return snapshotFully(checkpointId, timestamp, streamFactory); - } + return snapshotStrategy.performSnapshot(checkpointId, timestamp, streamFactory, checkpointOptions); } - private RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotIncrementally( - final long checkpointId, - final long checkpointTimestamp, - final CheckpointStreamFactory checkpointStreamFactory) throws Exception { - - if (db == null) { - throw new IOException("RocksDB closed."); - } + @Override + public void restore(Collection<KeyedStateHandle> restoreState) throws Exception { + LOG.info("Initializing RocksDB keyed state backend."); - if (kvStateInformation.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", - checkpointTimestamp); - } - return DoneFuture.of(SnapshotResult.empty()); + if (LOG.isDebugEnabled()) { + LOG.debug("Restoring snapshot from state handles: {}.", restoreState); } - final RocksDBIncrementalSnapshotOperation<K> snapshotOperation = - new RocksDBIncrementalSnapshotOperation<>( - this, - checkpointStreamFactory, - checkpointId, - checkpointTimestamp); + // clear all meta data + kvStateInformation.clear(); + restoredKvStateMetaInfos.clear(); try { - snapshotOperation.takeSnapshot(); - } catch (Exception e) { - snapshotOperation.stop(); - snapshotOperation.releaseResources(true); - throw e; - } - - return new FutureTask<SnapshotResult<KeyedStateHandle>>( - new Callable<SnapshotResult<KeyedStateHandle>>() { - @Override - public SnapshotResult<KeyedStateHandle> call() throws Exception { - KeyedStateHandle keyedStateHandle = snapshotOperation.materializeSnapshot(); - return SnapshotResult.of(keyedStateHandle); + if (restoreState == null || restoreState.isEmpty()) { + createDB(); + } else { + KeyedStateHandle firstStateHandle = restoreState.iterator().next(); + if (firstStateHandle instanceof IncrementalKeyedStateHandle + || firstStateHandle instanceof IncrementalLocalKeyedStateHandle) { + RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this); + restoreOperation.restore(restoreState); + } else { + RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this); + restoreOperation.doRestore(restoreState); } } - ) { - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - snapshotOperation.stop(); - return super.cancel(mayInterruptIfRunning); - } - - @Override - protected void done() { - snapshotOperation.releaseResources(isCancelled()); - } - }; + } catch (Exception ex) { + dispose(); + throw ex; + } } - private RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotFully( - final long checkpointId, - final long timestamp, - final CheckpointStreamFactory streamFactory) throws Exception { - - long startTime = System.currentTimeMillis(); - final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry(); - - final RocksDBFullSnapshotOperation<K> snapshotOperation; - - if (kvStateInformation.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", timestamp); - } + @Override + public void notifyCheckpointComplete(long completedCheckpointId) { - return DoneFuture.of(SnapshotResult.empty()); + if (!enableIncrementalCheckpointing) { + return; } - snapshotOperation = new RocksDBFullSnapshotOperation<>(this, streamFactory, snapshotCloseableRegistry); - snapshotOperation.takeDBSnapShot(checkpointId, timestamp); - - // implementation of the async IO operation, based on FutureTask - AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable = - new AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() { + synchronized (materializedSstFiles) { - @Override - protected void acquireResources() throws Exception { - cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry); - snapshotOperation.openCheckpointStream(); - } + if (completedCheckpointId < lastCompletedCheckpointId) { + return; + } - @Override - protected void releaseResources() throws Exception { - closeLocalRegistry(); - releaseSnapshotOperationResources(); - } + materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId); - private void releaseSnapshotOperationResources() { - // hold the db lock while operation on the db to guard us against async db disposal - snapshotOperation.releaseSnapshotResources(); - } + lastCompletedCheckpointId = completedCheckpointId; + } + } - @Override - protected void stopOperation() throws Exception { - closeLocalRegistry(); - } + private void createDB() throws IOException { + List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1); + this.db = openDB(instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles); + this.defaultColumnFamily = columnFamilyHandles.get(0); + } - private void closeLocalRegistry() { - if (cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) { - try { - snapshotCloseableRegistry.close(); - } catch (Exception ex) { - LOG.warn("Error closing local registry", ex); - } - } - } + private RocksDB openDB( + String path, + List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors, + List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException { - @Override - public SnapshotResult<KeyedStateHandle> performOperation() throws Exception { - long startTime = System.currentTimeMillis(); + List<ColumnFamilyDescriptor> columnFamilyDescriptors = + new ArrayList<>(1 + stateColumnFamilyDescriptors.size()); - if (isStopped()) { - throw new IOException("RocksDB closed."); - } + columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors); - snapshotOperation.writeDBSnapshot(); + // we add the required descriptor for the default CF in last position. + columnFamilyDescriptors.add(new ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions)); - LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.", - streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime)); + RocksDB dbRef; - KeyGroupsStateHandle snapshotResultStateHandle = snapshotOperation.getSnapshotResultStateHandle(); - return SnapshotResult.of(snapshotResultStateHandle); - } - }; + try { + dbRef = RocksDB.open( + Preconditions.checkNotNull(dbOptions), + Preconditions.checkNotNull(path), + columnFamilyDescriptors, + stateColumnFamilyHandles); + } catch (RocksDBException e) { + throw new IOException("Error while opening RocksDB instance.", e); + } - LOG.info("Asynchronous RocksDB snapshot ({}, synchronous part) in thread {} took {} ms.", - streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime)); + // requested + default CF + Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(), + "Not all requested column family handles have been created"); - return AsyncStoppableTaskWithCallback.from(ioCallable); + return dbRef; } /** - * Encapsulates the process to perform a snapshot of a RocksDBKeyedStateBackend. + * Encapsulates the process of restoring a RocksDBKeyedStateBackend from a full snapshot. */ - static final class RocksDBFullSnapshotOperation<K> { - - static final int FIRST_BIT_IN_BYTE_MASK = 0x80; - static final int END_OF_KEY_GROUP_MARK = 0xFFFF; - - private final RocksDBKeyedStateBackend<K> stateBackend; - private final KeyGroupRangeOffsets keyGroupRangeOffsets; - private final CheckpointStreamFactory checkpointStreamFactory; - private final CloseableRegistry snapshotCloseableRegistry; - private final ResourceGuard.Lease dbLease; - - private long checkpointId; - private long checkpointTimeStamp; + private static final class RocksDBFullRestoreOperation<K> { - private Snapshot snapshot; - private ReadOptions readOptions; - private List<Tuple2<RocksIterator, Integer>> kvStateIterators; - - private CheckpointStreamFactory.CheckpointStateOutputStream outStream; - private DataOutputView outputView; - - RocksDBFullSnapshotOperation( - RocksDBKeyedStateBackend<K> stateBackend, - CheckpointStreamFactory checkpointStreamFactory, - CloseableRegistry registry) throws IOException { + private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend; - this.stateBackend = stateBackend; - this.checkpointStreamFactory = checkpointStreamFactory; - this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(stateBackend.keyGroupRange); - this.snapshotCloseableRegistry = registry; - this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource(); - } + /** Current key-groups state handle from which we restore key-groups. */ + private KeyGroupsStateHandle currentKeyGroupsStateHandle; + /** Current input stream we obtained from currentKeyGroupsStateHandle. */ + private FSDataInputStream currentStateHandleInStream; + /** Current data input view that wraps currentStateHandleInStream. */ + private DataInputView currentStateHandleInView; + /** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle. */ + private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies; + /** The compression decorator that was used for writing the state, as determined by the meta data. */ + private StreamCompressionDecorator keygroupStreamCompressionDecorator; /** - * 1) Create a snapshot object from RocksDB. + * Creates a restore operation object for the given state backend instance. * - * @param checkpointId id of the checkpoint for which we take the snapshot - * @param checkpointTimeStamp timestamp of the checkpoint for which we take the snapshot + * @param rocksDBKeyedStateBackend the state backend into which we restore */ - public void takeDBSnapShot(long checkpointId, long checkpointTimeStamp) { - Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!"); - this.kvStateIterators = new ArrayList<>(stateBackend.kvStateInformation.size()); - this.checkpointId = checkpointId; - this.checkpointTimeStamp = checkpointTimeStamp; - this.snapshot = stateBackend.db.getSnapshot(); + public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) { + this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend); } /** - * 2) Open CheckpointStateOutputStream through the checkpointStreamFactory into which we will write. + * Restores all key-groups data that is referenced by the passed state handles. * - * @throws Exception + * @param keyedStateHandles List of all key groups state handles that shall be restored. */ - public void openCheckpointStream() throws Exception { - Preconditions.checkArgument(outStream == null, "Output stream for snapshot is already set."); - outStream = checkpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE); - snapshotCloseableRegistry.registerCloseable(outStream); - outputView = new DataOutputViewStreamWrapper(outStream); - } + public void doRestore(Collection<KeyedStateHandle> keyedStateHandles) + throws IOException, StateMigrationException, RocksDBException { - /** - * 3) Write the actual data from RocksDB from the time we took the snapshot object in (1). - * - * @throws IOException - */ - public void writeDBSnapshot() throws IOException, InterruptedException { + rocksDBKeyedStateBackend.createDB(); - if (null == snapshot) { - throw new IOException("No snapshot available. Might be released due to cancellation."); - } + for (KeyedStateHandle keyedStateHandle : keyedStateHandles) { + if (keyedStateHandle != null) { - Preconditions.checkNotNull(outStream, "No output stream to write snapshot."); - writeKVStateMetaData(); - writeKVStateData(); + if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) { + throw new IllegalStateException("Unexpected state handle type, " + + "expected: " + KeyGroupsStateHandle.class + + ", but found: " + keyedStateHandle.getClass()); + } + this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle; + restoreKeyGroupsInStateHandle(); + } + } } /** - * 4) Returns a state handle to the snapshot after the snapshot procedure is completed and null before. - * - * @return state handle to the completed snapshot + * Restore one key groups state handle. */ - public KeyGroupsStateHandle getSnapshotResultStateHandle() throws IOException { - - if (snapshotCloseableRegistry.unregisterCloseable(outStream)) { - - StreamStateHandle stateHandle = outStream.closeAndGetHandle(); - outStream = null; - - if (stateHandle != null) { - return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle); + private void restoreKeyGroupsInStateHandle() + throws IOException, StateMigrationException, RocksDBException { + try { + currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream(); + rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream); + currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream); + restoreKVStateMetaData(); + restoreKVStateData(); + } finally { + if (rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) { + IOUtils.closeQuietly(currentStateHandleInStream); } } - return null; } /** - * 5) Release the snapshot object for RocksDB and clean up. + * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws RocksDBException */ - public void releaseSnapshotResources() { + private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException { - outStream = null; + KeyedBackendSerializationProxy<K> serializationProxy = + new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader); - if (null != kvStateIterators) { - for (Tuple2<RocksIterator, Integer> kvStateIterator : kvStateIterators) { - IOUtils.closeQuietly(kvStateIterator.f0); - } - kvStateIterators = null; - } + serializationProxy.read(currentStateHandleInView); - if (null != snapshot) { - if (null != stateBackend.db) { - stateBackend.db.releaseSnapshot(snapshot); - } - IOUtils.closeQuietly(snapshot); - snapshot = null; - } + // check for key serializer compatibility; this also reconfigures the + // key serializer to be compatible, if it is required and is possible + if (CompatibilityUtil.resolveCompatibilityResult( + serializationProxy.getKeySerializer(), + UnloadableDummyTypeSerializer.class, + serializationProxy.getKeySerializerConfigSnapshot(), + rocksDBKeyedStateBackend.keySerializer) + .isRequiresMigration()) { - if (null != readOptions) { - IOUtils.closeQuietly(readOptions); - readOptions = null; + // TODO replace with state migration; note that key hash codes need to remain the same after migration + throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " + + "Aborting now since state migration is currently not available"); } - this.dbLease.close(); - } + this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ? + SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE; - private void writeKVStateMetaData() throws IOException { + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos = + serializationProxy.getStateMetaInfoSnapshots(); + currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size()); + //rocksDBKeyedStateBackend.restoredKvStateMetaInfos = new HashMap<>(restoredMetaInfos.size()); - List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> metaInfoSnapshots = - new ArrayList<>(stateBackend.kvStateInformation.size()); + for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) { - int kvStateId = 0; - for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> column : - stateBackend.kvStateInformation.entrySet()) { + Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn = + rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName()); - metaInfoSnapshots.add(column.getValue().f1.snapshot()); + if (registeredColumn == null) { + byte[] nameBytes = restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET); - //retrieve iterator for this k/v states - readOptions = new ReadOptions(); - readOptions.setSnapshot(snapshot); - - kvStateIterators.add( - new Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), kvStateId)); + ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor( + nameBytes, + rocksDBKeyedStateBackend.columnOptions); - ++kvStateId; - } + RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo = + new RegisteredKeyedBackendStateMetaInfo<>( + restoredMetaInfo.getStateType(), + restoredMetaInfo.getName(), + restoredMetaInfo.getNamespaceSerializer(), + restoredMetaInfo.getStateSerializer()); - KeyedBackendSerializationProxy<K> serializationProxy = - new KeyedBackendSerializationProxy<>( - stateBackend.getKeySerializer(), - metaInfoSnapshots, - !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, stateBackend.keyGroupCompressionDecorator)); + rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo); - serializationProxy.write(outputView); - } + ColumnFamilyHandle columnFamily = rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor); - private void writeKVStateData() throws IOException, InterruptedException { + registeredColumn = new Tuple2<>(columnFamily, stateMetaInfo); + rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), registeredColumn); - byte[] previousKey = null; - byte[] previousValue = null; - OutputStream kgOutStream = null; - DataOutputView kgOutView = null; + } else { + // TODO with eager state registration in place, check here for serializer migration strategies + } + currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0); + } + } - try { - // Here we transfer ownership of RocksIterators to the RocksDBMergeIterator - try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator( - kvStateIterators, stateBackend.keyGroupPrefixBytes)) { + /** + * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle. + * + * @throws IOException + * @throws RocksDBException + */ + private void restoreKVStateData() throws IOException, RocksDBException { + //for all key-groups in the current state handle... + for (Tuple2<Integer, Long> keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) { + int keyGroup = keyGroupOffset.f0; - // handover complete, null out to prevent double close - kvStateIterators = null; + // Check that restored key groups all belong to the backend + Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup), + "The key group must belong to the backend"); - //preamble: setup with first key-group as our lookahead - if (mergeIterator.isValid()) { - //begin first key-group by recording the offset - keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos()); - //write the k/v-state id as metadata - kgOutStream = stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream); - kgOutView = new DataOutputViewStreamWrapper(kgOutStream); + long offset = keyGroupOffset.f1; + //not empty key-group? + if (0L != offset) { + currentStateHandleInStream.seek(offset); + try (InputStream compressedKgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) { + DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn); //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutView.writeShort(mergeIterator.kvStateId()); - previousKey = mergeIterator.key(); - previousValue = mergeIterator.value(); - mergeIterator.next(); + int kvStateId = compressedKgInputView.readShort(); + ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId); + //insert all k/v pairs into DB + boolean keyGroupHasMoreKeys = true; + while (keyGroupHasMoreKeys) { + byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView); + byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView); + if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) { + //clear the signal bit in the key to make it ready for insertion again + RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key); + rocksDBKeyedStateBackend.db.put(handle, key, value); + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK + & compressedKgInputView.readShort(); + if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) { + keyGroupHasMoreKeys = false; + } else { + handle = currentStateHandleKVStateColumnFamilies.get(kvStateId); + } + } else { + rocksDBKeyedStateBackend.db.put(handle, key, value); + } + } } + } + } + } + } - //main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets. - while (mergeIterator.isValid()) { - - assert (!hasMetaDataFollowsFlag(previousKey)); + /** + * Encapsulates the process of restoring a RocksDBKeyedStateBackend from an incremental snapshot. + */ + private static class RocksDBIncrementalRestoreOperation<T> { - //set signal in first key byte that meta data will follow in the stream after this k/v pair - if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) { + private final RocksDBKeyedStateBackend<T> stateBackend; - //be cooperative and check for interruption from time to time in the hot loop - checkInterrupted(); + private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) { + this.stateBackend = stateBackend; + } - setMetaDataFollowsFlagInKey(previousKey); - } + /** + * Root method that branches for different implementations of {@link KeyedStateHandle}. + */ + void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception { - writeKeyValuePair(previousKey, previousValue, kgOutView); + boolean hasExtraKeys = (restoreStateHandles.size() > 1 || + !Objects.equals(restoreStateHandles.iterator().next().getKeyGroupRange(), stateBackend.keyGroupRange)); - //write meta data if we have to - if (mergeIterator.isNewKeyGroup()) { - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutView.writeShort(END_OF_KEY_GROUP_MARK); - // this will just close the outer stream - kgOutStream.close(); - //begin new key-group - keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos()); - //write the kev-state - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutStream = stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream); - kgOutView = new DataOutputViewStreamWrapper(kgOutStream); - kgOutView.writeShort(mergeIterator.kvStateId()); - } else if (mergeIterator.isNewKeyValueState()) { - //write the k/v-state - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutView.writeShort(mergeIterator.kvStateId()); - } + if (hasExtraKeys) { + stateBackend.createDB(); + } - //request next k/v pair - previousKey = mergeIterator.key(); - previousValue = mergeIterator.value(); - mergeIterator.next(); - } - } + for (KeyedStateHandle rawStateHandle : restoreStateHandles) { - //epilogue: write last key-group - if (previousKey != null) { - assert (!hasMetaDataFollowsFlag(previousKey)); - setMetaDataFollowsFlagInKey(previousKey); - writeKeyValuePair(previousKey, previousValue, kgOutView); - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutView.writeShort(END_OF_KEY_GROUP_MARK); - // this will just close the outer stream - kgOutStream.close(); - kgOutStream = null; + if (rawStateHandle instanceof IncrementalKeyedStateHandle) { + restoreInstance((IncrementalKeyedStateHandle) rawStateHandle, hasExtraKeys); + } else if (rawStateHandle instanceof IncrementalLocalKeyedStateHandle) { + Preconditions.checkState(!hasExtraKeys, "Cannot recover from local state after rescaling."); + restoreInstance((IncrementalLocalKeyedStateHandle) rawStateHandle); + } else { + throw new IllegalStateException("Unexpected state handle type, " + + "expected " + IncrementalKeyedStateHandle.class + + ", but found " + rawStateHandle.getClass()); } - - } finally { - // this will just close the outer stream - IOUtils.closeQuietly(kgOutStream); } } - private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException { - BytePrimitiveArraySerializer.INSTANCE.serialize(key, out); - BytePrimitiveArraySerializer.INSTANCE.serialize(value, out); - } + /** + * Recovery from remote incremental state. + */ + private void restoreInstance( + IncrementalKeyedStateHandle restoreStateHandle, + boolean hasExtraKeys) throws Exception { - static void setMetaDataFollowsFlagInKey(byte[] key) { - key[0] |= FIRST_BIT_IN_BYTE_MASK; - } + // read state data + Path temporaryRestoreInstancePath = new Path( + stateBackend.instanceBasePath.getAbsolutePath(), + UUID.randomUUID().toString()); - static void clearMetaDataFollowsFlag(byte[] key) { - key[0] &= (~RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK); - } + try { - static boolean hasMetaDataFollowsFlag(byte[] key) { - return 0 != (key[0] & RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK); - } + transferAllStateDataToDirectory(restoreStateHandle, temporaryRestoreInstancePath); - private static void checkInterrupted() throws InterruptedException { - if (Thread.currentThread().isInterrupted()) { - throw new InterruptedException("RocksDB snapshot interrupted."); + // read meta data + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = + readMetaData(restoreStateHandle.getMetaStateHandle()); + + List<ColumnFamilyDescriptor> columnFamilyDescriptors = + createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots); + + if (hasExtraKeys) { + restoreKeyGroupsShardWithTemporaryHelperInstance( + temporaryRestoreInstancePath, + columnFamilyDescriptors, + stateMetaInfoSnapshots); + } else { + + // since we transferred all remote state to a local directory, we can use the same code as for + // local recovery. + IncrementalLocalKeyedStateHandle localKeyedStateHandle = new IncrementalLocalKeyedStateHandle( + restoreStateHandle.getBackendIdentifier(), + restoreStateHandle.getCheckpointId(), + new DirectoryStateHandle(temporaryRestoreInstancePath), + restoreStateHandle.getKeyGroupRange(), + restoreStateHandle.getMetaStateHandle(), + restoreStateHandle.getSharedState().keySet()); + + restoreLocalStateIntoFullInstance( + localKeyedStateHandle, + columnFamilyDescriptors, + stateMetaInfoSnapshots); + } + } finally { + FileSystem restoreFileSystem = temporaryRestoreInstancePath.getFileSystem(); + if (restoreFileSystem.exists(temporaryRestoreInstancePath)) { + restoreFileSystem.delete(temporaryRestoreInstancePath, true); + } } } - } - private static final class RocksDBIncrementalSnapshotOperation<K> { + /** + * Recovery from local incremental state. + */ + private void restoreInstance(IncrementalLocalKeyedStateHandle localKeyedStateHandle) throws Exception { + // read meta data + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = + readMetaData(localKeyedStateHandle.getMetaDataState()); - /** The backend which we snapshot. */ - private final RocksDBKeyedStateBackend<K> stateBackend; + List<ColumnFamilyDescriptor> columnFamilyDescriptors = + createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots); - /** Stream factory that creates the outpus streams to DFS. */ - private final CheckpointStreamFactory checkpointStreamFactory; + restoreLocalStateIntoFullInstance( + localKeyedStateHandle, + columnFamilyDescriptors, + stateMetaInfoSnapshots); + } - /** Id for the current checkpoint. */ - private final long checkpointId; + /** + * This method recreates and registers all {@link ColumnFamilyDescriptor} from Flink's state meta data snapshot. + */ + private List<ColumnFamilyDescriptor> createAndRegisterColumnFamilyDescriptors( + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) { - /** Timestamp for the current checkpoint. */ - private final long checkpointTimestamp; + List<ColumnFamilyDescriptor> columnFamilyDescriptors = + new ArrayList<>(1 + stateMetaInfoSnapshots.size()); - /** All sst files that were part of the last previously completed checkpoint. */ - private Set<StateHandleID> baseSstFiles; + for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot : stateMetaInfoSnapshots) { - /** The state meta data. */ - private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = new ArrayList<>(); + ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor( + stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET), + stateBackend.columnOptions); + + columnFamilyDescriptors.add(columnFamilyDescriptor); + stateBackend.restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(), stateMetaInfoSnapshot); + } + return columnFamilyDescriptors; + } + + /** + * This method implements the core of the restore logic that unifies how local and remote state are recovered. + */ + private void restoreLocalStateIntoFullInstance( + IncrementalLocalKeyedStateHandle restoreStateHandle, + List<ColumnFamilyDescriptor> columnFamilyDescriptors, + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) throws Exception { + // pick up again the old backend id, so the we can reference existing state + stateBackend.backendUID = restoreStateHandle.getBackendIdentifier(); + + LOG.debug("Restoring keyed backend uid in operator {} from incremental snapshot to {}.", + stateBackend.operatorIdentifier, stateBackend.backendUID); + + // create hard links in the instance directory + if (!stateBackend.instanceRocksDBPath.mkdirs()) { + throw new IOException("Could not create RocksDB data directory."); + } - /** Local filesystem for the RocksDB backup. */ - private FileSystem backupFileSystem; + Path restoreSourcePath = restoreStateHandle.getDirectoryStateHandle().getDirectory(); + restoreInstanceDirectoryFromPath(restoreSourcePath); - /** Local path for the RocksDB backup. */ - private Path backupPath; + List<ColumnFamilyHandle> columnFamilyHandles = + new ArrayList<>(1 + columnFamilyDescriptors.size()); - // Registry for all opened i/o streams - private final CloseableRegistry closeableRegistry = new CloseableRegistry(); + stateBackend.db = stateBackend.openDB( + stateBackend.instanceRocksDBPath.getAbsolutePath(), + columnFamilyDescriptors, columnFamilyHandles); - // new sst files since the last completed checkpoint - private final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>(); + // extract and store the default column family which is located at the last index + stateBackend.defaultColumnFamily = columnFamilyHandles.remove(columnFamilyHandles.size() - 1); - // handles to the misc files in the current snapshot - private final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>(); + for (int i = 0; i < columnFamilyDescriptors.size(); ++i) { + RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i); - // This lease protects from concurrent disposal of the native rocksdb instance. - private final ResourceGuard.Lease dbLease; + ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i); + RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo = + new RegisteredKeyedBackendStateMetaInfo<>( + stateMetaInfoSnapshot.getStateType(), + stateMetaInfoSnapshot.getName(), + stateMetaInfoSnapshot.getNamespaceSerializer(), + stateMetaInfoSnapshot.getStateSerializer()); - private StreamStateHandle metaStateHandle = null; + stateBackend.kvStateInformation.put( + stateMetaInfoSnapshot.getName(), + new Tuple2<>(columnFamilyHandle, stateMetaInfo)); + } - private RocksDBIncrementalSnapshotOperation( - RocksDBKeyedStateBackend<K> stateBackend, - CheckpointStreamFactory checkpointStreamFactory, - long checkpointId, - long checkpointTimestamp) throws IOException { + // use the restore sst files as the base for succeeding checkpoints + synchronized (stateBackend.materializedSstFiles) { + stateBackend.materializedSstFiles.put( + restoreStateHandle.getCheckpointId(), + restoreStateHandle.getSharedStateHandleIDs()); + } - this.stateBackend = stateBackend; - this.checkpointStreamFactory = checkpointStreamFactory; - this.checkpointId = checkpointId; - this.checkpointTimestamp = checkpointTimestamp; - this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource(); + stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId(); } - private StreamStateHandle materializeStateData(Path filePath) throws Exception { - FSDataInputStream inputStream = null; - CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; + /** + * This recreates the new working directory of the recovered RocksDB instance and links/copies the contents from + * a local state. + */ + private void restoreInstanceDirectoryFromPath(Path source) throws IOException { - try { - final byte[] buffer = new byte[8 * 1024]; + FileSystem fileSystem = source.getFileSystem(); - FileSystem backupFileSystem = backupPath.getFileSystem(); - inputStream = backupFileSystem.open(filePath); - closeableRegistry.registerCloseable(inputStream); + final FileStatus[] fileStatuses = fileSystem.listStatus(source); - outputStream = checkpointStreamFactory - .createCheckpointStateOutputStream(CheckpointedStateScope.SHARED); - closeableRegistry.registerCloseable(outputStream); - - while (true) { - int numBytes = inputStream.read(buffer); - - if (numBytes == -1) { - break; - } - - outputStream.write(buffer, 0, numBytes); - } - - StreamStateHandle result = null; - if (closeableRegistry.unregisterCloseable(outputStream)) { - result = outputStream.closeAndGetHandle(); - outputStream = null; - } - return result; - - } finally { - - if (closeableRegistry.unregisterCloseable(inputStream)) { - inputStream.close(); - } + if (fileStatuses == null) { + throw new IOException("Cannot list file statues. Directory " + source + " does not exist."); + } - if (closeableRegistry.unregisterCloseable(outputStream)) { - outputStream.close(); + for (FileStatus fileStatus : fileStatuses) { + final Path filePath = fileStatus.getPath(); + final String fileName = filePath.getName(); + File restoreFile = new File(source.getPath(), fileName); + File targetFile = new File(stateBackend.instanceRocksDBPath.getPath(), fileName); + if (fileName.endsWith(SST_FILE_SUFFIX)) { + // hardlink'ing the immutable sst-files. + Files.createLink(targetFile.toPath(), restoreFile.toPath()); + } else { + // true copy for all other files. + Files.copy(restoreFile.toPath(), targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING); } } } - private StreamStateHandle materializeMetaData() throws Exception { - CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; + /** + * Reads Flink's state meta data file from the state handle. + */ + private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> readMetaData( + StreamStateHandle metaStateHandle) throws Exception { - try { - outputStream = checkpointStreamFactory - .createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE); - closeableRegistry.registerCloseable(outputStream); + FSDataInputStream inputStream = null; - //no need for compression scheme support because sst-files are already compressed - KeyedBackendSerializationProxy<K> serializationProxy = - new KeyedBackendSerializationProxy<>( - stateBackend.keySerializer, - stateMetaInfoSnapshots, - false); + try { + inputStream = metaStateHandle.openInputStream(); + stateBackend.cancelStreamRegistry.registerCloseable(inputStream); - DataOutputView out = new DataOutputViewStreamWrapper(outputStream); + KeyedBackendSerializationProxy<T> serializationProxy = + new KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader); + DataInputView in = new DataInputViewStreamWrapper(inputStream); + serializationProxy.read(in); - serializationProxy.write(out); + // check for key serializer compatibility; this also reconfigures the + // key serializer to be compatible, if it is required and is possible + if (CompatibilityUtil.resolveCompatibilityResult( + serializationProxy.getKeySerializer(), + UnloadableDummyTypeSerializer.class, + serializationProxy.getKeySerializerConfigSnapshot(), + stateBackend.keySerializer) + .isRequiresMigration()) { - StreamStateHandle result = null; - if (closeableRegistry.unregisterCloseable(outputStream)) { - result = outputStream.closeAndGetHandle(); - outputStream = null; + // TODO replace with state migration; note that key hash codes need to remain the same after migration + throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " + + "Aborting now since state migration is currently not available"); } - return result; + + return serializationProxy.getStateMetaInfoSnapshots(); } finally { - if (outputStream != null) { - if (closeableRegistry.unregisterCloseable(outputStream)) { - outputStream.close(); - } + if (stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) { + inputStream.close(); } } } - void takeSnapshot() throws Exception { - - final long lastCompletedCheckpoint; - - // use the last completed checkpoint as the comparison base. - synchronized (stateBackend.materializedSstFiles) { - lastCompletedCheckpoint = stateBackend.lastCompletedCheckpointId; - baseSstFiles = stateBackend.materializedSstFiles.get(lastCompletedCheckpoint); - } - - LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " + - "assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles); + private void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest) throws IOException { - // save meta data - for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry - : stateBackend.kvStateInformation.entrySet()) { - stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot()); - } + final Map<StateHandleID, StreamStateHandle> sstFiles = + restoreStateHandle.getSharedState(); + final Map<StateHandleID, StreamStateHandle> miscFiles = + restoreStateHandle.getPrivateState(); - // save state data - backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId); + transferAllDataFromStateHandles(sstFiles, dest); + transferAllDataFromStateHandles(miscFiles, dest); + } - LOG.trace("Local RocksDB checkpoint goes to backup path {}.", backupPath); + /** + * Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their + * {@link StateHandleID}. + */ + private void transferAllDataFromStateHandles( + Map<StateHandleID, StreamStateHandle> stateHandleMap, + Path restoreInstancePath) throws IOException { - backupFileSystem = backupPath.getFileSystem(); - if (backupFileSystem.exists(backupPath)) { - throw new IllegalStateException("Unexpected existence of the backup directory."); + for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) { + StateHandleID stateHandleID = entry.getKey(); + StreamStateHandle remoteFileHandle = entry.getValue(); + copyStateDataHandleData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle); } - - // create hard links of living files in the checkpoint path - Checkpoint checkpoint = Checkpoint.create(stateBackend.db); - checkpoint.createCheckpoint(backupPath.getPath()); } - KeyedStateHandle materializeSnapshot() throws Exception { - - stateBackend.cancelStreamRegistry.registerCloseable(closeableRegistry); + /** + * Copies the file from a single state handle to the given path. + */ + private void copyStateDataHandleData( + Path restoreFilePath, + StreamStateHandle remoteFileHandle) throws IOException { - // write meta data - metaStateHandle = materializeMetaData(); + FileSystem restoreFileSystem = restoreFilePath.getFileSystem(); - // write state data - Preconditions.checkState(backupFileSystem.exists(backupPath)); + FSDataInputStream inputStream = null; + FSDataOutputStream outputStream = null; - FileStatus[] fileStatuses = backupFileSystem.listStatus(backupPath); - if (fileStatuses != null) { - for (FileStatus fileStatus : fileStatuses) { - final Path filePath = fileStatus.getPath(); - final String fileName = filePath.getName(); - final StateHandleID stateHandleID = new StateHandleID(fileName); + try { + inputStream = remoteFileHandle.openInputStream(); + stateBackend.cancelStreamRegistry.registerCloseable(inputStream); - if (fileName.endsWith(SST_FILE_SUFFIX)) { - final boolean existsAlready = - baseSstFiles != null && baseSstFiles.contains(stateHandleID); + outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE); + stateBackend.cancelStreamRegistry.registerCloseable(outputStream); - if (existsAlready) { - // we introduce a placeholder state handle, that is replaced with the - // original from the shared state registry (created from a previous checkpoint) - sstFiles.put( - stateHandleID, - new PlaceholderStreamStateHandle()); - } else { - sstFiles.put(stateHandleID, materializeStateData(filePath)); - } - } else { - StreamStateHandle fileHandle = materializeStateData(filePath); - miscFiles.put(stateHandleID, fileHandle); + byte[] buffer = new byte[8 * 1024]; + while (true) { + int numBytes = inputStream.read(buffer); + if (numBytes == -1) { + break; } - } - } - - synchronized (stateBackend.materializedSstFiles) { - stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet()); - } - - return new IncrementalKeyedStateHandle( - stateBackend.backendUID, - stateBackend.keyGroupRange, - checkpointId, - sstFiles, - miscFiles, - metaStateHandle); - } - void stop() { + outputStream.write(buffer, 0, numBytes); + } + } finally { + if (stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) { + inputStream.close(); + } - if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) { - try { - closeableRegistry.close(); - } catch (IOException e) { - LOG.warn("Could not properly close io streams.", e); + if (stateBackend.cancelStreamRegistry.unregisterCloseable(outputStream)) { + outputStream.close(); } } } - void releaseResources(boolean canceled) { + /** + * In case of rescaling, this method creates a temporary RocksDB instance for a key-groups shard. All contents + * from the temporary instance are copied into the real restore instance and then the temporary instance is + * discarded. + */ + private void restoreKeyGroupsShardWithTemporaryHelperInstance( + Path restoreInstancePath, + List<ColumnFamilyDescriptor> columnFamilyDescriptors, + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) throws Exception { - dbLease.close(); + List<ColumnFamilyHandle> columnFamilyHandles = + new ArrayList<>(1 + columnFamilyDescriptors.size()); - if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) { - try { - closeableRegistry.close(); - } catch (IOException e) { - LOG.warn("Exception on closing registry.", e); - } - } + try (RocksDB restoreDb = stateBackend.openDB( + restoreInstancePath.getPath(), + columnFamilyDescriptors, + columnFamilyHandles)) { - if (backupPath != null) { try { - if (backupFileSystem.exists(backupPath)) { - - LOG.trace("Deleting local RocksDB backup path {}.", backupPath); - backupFileSystem.delete(backupPath, true); - } - } catch (Exception e) { - LOG.warn("Could not properly delete the checkpoint directory.", e); - } - } + // iterating only the requested descriptors automatically skips the default column family handle + for (int i = 0; i < columnFamilyDescriptors.size(); ++i) { + ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i); + ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptors.get(i); + RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i); - if (canceled) { - Collection<StateObject> statesToDiscard = - new ArrayList<>(1 + miscFiles.size() + sstFiles.size()); + Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry = + stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName()); - statesToDiscard.add(metaStateHandle); - statesToDiscard.addAll(miscFiles.values()); - statesToDiscard.addAll(sstFiles.values()); + if (null == registeredStateMetaInfoEntry) { - try { - StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard); - } catch (Exception e) { - LOG.warn("Could not properly discard states.", e); - } - } - } - } + RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo = + new RegisteredKeyedBackendStateMetaInfo<>( + stateMetaInfoSnapshot.getStateType(), + stateMetaInfoSnapshot.getName(), + stateMetaInfoSnapshot.getNamespaceSerializer(), + stateMetaInfoSnapshot.getStateSerializer()); - @Override - public void restore(Collection<KeyedStateHandle> restoreState) throws Exception { - LOG.info("Initializing RocksDB keyed state backend from snapshot."); + registeredStateMetaInfoEntry = + new Tuple2<>( + stateBackend.db.createColumnFamily(columnFamilyDescriptor), + stateMetaInfo); - if (LOG.isDebugEnabled()) { - LOG.debug("Restoring snapshot from state handles: {}.", restoreState); - } + stateBackend.kvStateInformation.put( + stateMetaInfoSnapshot.getName(), + registeredStateMetaInfoEntry); + } - // clear all meta data - kvStateInformation.clear(); - restoredKvStateMetaInfos.clear(); + ColumnFamilyHandle targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0; - try { - if (restoreState == null || restoreState.isEmpty()) { - createDB(); - } else if (restoreState.iterator().next() instanceof IncrementalKeyedStateHandle) { - RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this); - restoreOperation.restore(restoreState); - } else { - RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this); - restoreOperation.doRestore(restoreState); - } - } catch (Exception ex) { - dispose(); - throw ex; - } - } + try (RocksIterator iterator = restoreDb.newIterator(columnFamilyHandle)) { - @Override - public void notifyCheckpointComplete(long completedCheckpointId) { + int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup(); + byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes]; + for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { + startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE)); + } - if (!enableIncrementalCheckpointing) { - return; - } + iterator.seek(startKeyGroupPrefixBytes); - synchronized (materializedSstFiles) { + while (iterator.isValid()) { - if (completedCheckpointId < lastCompletedCheckpointId) { - return; - } + int keyGroup = 0; + for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { + keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j]; + } - materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId); + if (stateBackend.keyGroupRange.contains(keyGroup)) { + stateBackend.db.put(targetColumnFamilyHandle, + iterator.key(), iterator.value()); + } - lastCompletedCheckpointId = completedCheckpointId; + iterator.next(); + } + } // releases native iterator resources + } + } finally { + //release native tmp db column family resources + for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { + IOUtils.closeQuietly(columnFamilyHandle); + } + } + } // releases native tmp db resources } } - private void createDB() throws IOException { - List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1); - this.db = openDB(instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles); - this.defaultColumnFamily = columnFamilyHandles.get(0); - } - - private RocksDB openDB( - String path, - List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors, - List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException { + // ------------------------------------------------------------------------ + // State factories + // ------------------------------------------------------------------------ - List<ColumnFamilyDescriptor> columnFamilyDescriptors = - new ArrayList<>(1 + stateColumnFamilyDescriptors.size()); + /** + * Creates a column family handle for use with a k/v state. When restoring from a snapshot + * we don't restore the individual k/v states, just the global RocksDB database and the + * list of column families. When a k/v state is first requested we check here whether we + * already have a column family for that and return it or create a new one if it doesn't exist. + * + * <p>This also checks whether the {@link StateDescriptor} for a state matches the one + * that we checkpointed, i.e. is already in the map of column families. + */ + @SuppressWarnings("rawtypes, unchecked") + protected <N, S> ColumnFamilyHandle getColumnFamily( + StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException { - columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors); + Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo = + kvStateInformation.get(descriptor.getName()); - // we add the required descriptor for the default CF in last position. - columnFamilyDescriptors.add(new ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions)); + RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>( + descriptor.getType(), + descriptor.getName(), + namespaceSerializer, + descriptor.getSerializer()); - RocksDB dbRef; + if (stateInfo != null) { + // TODO with eager registration in place, these checks should be moved to restore() + + RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo = + (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) restoredKvStateMetaInfos.get(descriptor.getName()); + + Preconditions.checkState( + Objects.equals(newMetaInfo.getName(), restoredMetaInfo.getName()), + "Incompatible state names. " + + "Was [" + restoredMetaInfo.getName() + "], " + + "registered with [" + newMetaInfo.getName() + "]."); + + if (!Objects.equals(newMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN) + && !Objects.equals(restoredMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)) { + + Preconditions.checkState( + newMetaInfo.getStateType() == restoredMetaInfo.getStateType(), + "Incompatible state types. " + + "Was [" + restoredMetaInfo.getStateType() + "], " + + "registered with [" + newMetaInfo.getStateType() + "]."); + } + + // check compatibility results to determine if state migration is required + CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfo.getNamespaceSerializer(), + null, + restoredMetaInfo.getNamespaceSerializerConfigSnapshot(), + newMetaInfo.getNamespaceSerializer()); + + CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfo.getStateSerializer(), + UnloadableDummyTypeSerializer.class, + restoredMetaInfo.getStateSerializerConfigSnapshot(), + newMetaInfo.getStateSerializer()); + + if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) { + // TODO state migration currently isn't possible. + throw new StateMigrationException("State migration isn't supported, yet."); + } else { + stateInfo.f1 = newMetaInfo; + return stateInfo.f0; + } + } + + byte[] nameBytes = descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET); + Preconditions.checkState(!Arrays.equals(DEFAULT_COLUMN_FAMILY_NAME_BYTES, nameBytes), + "The chosen state name 'default' collides with the name of the default column family!"); + + ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(nameBytes, columnOptions); + + final ColumnFamilyHandle columnFamily; try { - dbRef = RocksDB.open( - Preconditions.checkNotNull(dbOptions), - Preconditions.checkNotNull(path), - columnFamilyDescriptors, - stateColumnFamilyHandles); + columnFamily = db.createColumnFamily(columnDescriptor); } catch (RocksDBException e) { - throw new IOException("Error while opening RocksDB instance.", e); + throw new IOException("Error creating ColumnFamilyHandle.", e); } - // requested + default CF - Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(), - "Not all requested column family handles have been created"); + Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> tuple = + new Tuple2<>(columnFamily, newMetaInfo); + Map rawAccess = kvStateInformation; + rawAccess.put(descriptor.getName(), tuple); + return columnFamily; + } - return dbRef; + @Override + protected <N, T> InternalValueState<N, T> createValueState( + TypeSerializer<N> namespaceSerializer, + ValueStateDescriptor<T> stateDesc) throws Exception { + + ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); + + return new RocksDBValueState<>(columnFamily, namespaceSerializer, stateDesc, this); + } + + @Override + protected <N, T> InternalListState<N, T> createListState( + TypeSerializer<N> namespaceSerializer, + ListStateDescriptor<T> stateDesc) throws Exception { + + ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); + + return new RocksDBListState<>(columnFamily, namespaceSerializer, stateDesc, this); + } + + @Override + protected <N, T> InternalReducingState<N, T> createReducingState( + TypeSerializer<N> namespaceSerializer, + ReducingStateDescriptor<T> stateDesc) throws Exception { + + ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); + + return new RocksDBReducingState<>(columnFamily, namespaceSerializer, stateDesc, this); + } + + @Override + protected <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState( + TypeSerializer<N> namespaceSerializer, + AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception { + + ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); + return new RocksDBAggregatingState<>(columnFamily, namespaceSerializer, stateDesc, this); + } + + @Override + protected <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState( + TypeSerializer<N> namespaceSerializer, + FoldingStateDescriptor<T, ACC> stateDesc) throws Exception { + + ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); + + return new RocksDBFoldingState<>(columnFamily, namespaceSerializer, stateDesc, this); + } + + @Override + protected <N, UK, UV> InternalMapState<N, UK, UV> createMapState( + TypeSerializer<N> namespaceSerializer, + MapStateDescriptor<UK, UV> stateDesc) throws Exception { + + ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); + + return new RocksDBMapState<>(columnFamily, namespaceSerializer, stateDesc, this); } /** - * Encapsulates the process of restoring a RocksDBKeyedStateBackend from a snapshot. + * Only visible for testing, DO NOT USE. */ - static final class RocksDBFullRestoreOperation<K> { + public File getInstanceBasePath() { + return instanceBasePath; + } - private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend; + @Override + public boolean supportsAsynchronousSnapshots() { + return true; + } - /** Current key-groups state handle from which we restore key-groups. */ - private KeyGroupsStateHandle currentKeyGroupsStateHandle; - /** Current input stream we obtained from currentKeyGroupsStateHandle. */ - private FSDataInputStream currentStateHandleInStream; - /** Current data input view that wraps currentStateHandleInStream. */ - private DataInputView currentStateHandleInView; - /** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle. */ - private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies; - /** The compression decorator that was used for writing the state, as determined by the meta data. */ - private StreamCompressionDecorator keygroupStreamCompressionDecorator; + @VisibleForTesting + @SuppressWarnings("unchecked") + @Override + public int numStateEntries() { + int count = 0; + + for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column : kvStateInformation.values()) { + try (RocksIterator rocksIterator = db.newIterator(column.f0)) { + rocksIterator.seekToFirst(); + + while (rocksIterator.isValid()) { + count++; + rocksIterator.next(); + } + } + } + + return count; + } + + + + /** + * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups. + * The resulting iteration sequence is ordered by (key-group, kv-state). + */ + @VisibleForTesting + static final class RocksDBMergeIterator implements AutoCloseable { + + private final PriorityQueue<MergeIterator> heap; + private final int keyGroupPrefixByteCount; + private boolean newKeyGroup; + private boolean newKVState; + private boolean valid; + + private MergeIterator currentSubIterator; + + private static final List<Comparator<MergeIterator>> COMPARATORS; + + static { + int maxBytes = 4; + COMPARATORS = new ArrayList<>(maxBytes); + for (int i = 0; i < maxBytes; ++i) { + final int currentBytes = i; + COMPARATORS.add(new Comparator<MergeIterator>() { + @Override + public int compare(MergeIterator o1, MergeIterator o2) { + int arrayCmpRes = compareKeyGroupsForByteArrays( + o1.currentKey, o2.currentKey, currentBytes); + return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes; + } + }); + } + } + + RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> kvStateIterators, final int keyGroupPrefixByteCount) { + Preconditions.checkNotNull(kvStateIterators); + this.keyGroupPrefixByteCount = keyGroupPrefixByteCount; + + Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount); + + if (kvStateIterators.size() > 0) { + PriorityQueue<MergeIterator> iteratorPriorityQueue = + new PriorityQueue<>(kvStateIterators.size(), iteratorComparator); + + for (Tuple2<RocksIterator, Integer> rocksIteratorWithKVStateId : kvStateIterators) { + final RocksIterator rocksIterator = rocksIteratorWithKVStateId.f0; + rocksIterator.seekToFirst(); + if (rocksIterator.isValid()) { + iteratorPriorityQueue.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1)); + } else { + IOUtils.closeQuietly(rocksIterator); + } + } + + kvStateIterators.clear(); + + this.heap = iteratorPriorityQueue; + this.valid = !heap.isEmpty(); + this.currentSubIterator = heap.poll(); + } else { + // creating a PriorityQueue of size 0 results in an exception. + this.heap = null; + this.valid = false; + } + + this.newKeyGroup = true; + this.newKVState = true; + } /** - * Creates a restore operation object for the given state backend instance. - * - * @param rocksDBKeyedStateBackend the state backend into which we restore + * Advance the iterator. Should only be called if {@link #isValid()} returned true. Valid can only chance after + * calls to {@link #next()}. */ - public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) { - this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend); + public void next() { + newKeyGroup = false; + newKVState = false; + + final RocksIterator rocksIterator = currentSubIterator.getIterator(); + rocksIterator.next(); + + byte[] oldKey = currentSubIterator.getCurrentKey(); + if (rocksIterator.isValid()) { + currentSubIterator.currentKey = rocksIterator.key(); + + if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) { + heap.offer(currentSubIterator); + currentSubIterator = heap.poll(); + newKVState = currentSubIterator.getIterator() != rocksIterator; + detectNewKeyGroup(oldKey); + } + } else { + IOUtils.closeQuietly(rocksIterator); + + if (heap.isEmpty()) { + currentSubIterator = null; + valid = false; + } else { + currentSubIterator = heap.poll(); + newKVState = true; + detectNewKeyGroup(oldKey); + } + } + } + + private boolean isDifferentKeyGroup(byte[] a, byte[] b) { + return 0 != compareKeyGroupsForByteArrays(a, b, keyGroupPrefixByteCount); + } + + private void detectNewKeyGroup(byte[] oldKey) { + if (isDifferentKeyGroup(oldKey, currentSubIterator.currentKey)) { + newKeyGroup = true; + } } /** - * Restores all key-groups data that is referenced by the passed state handles. - * - * @param keyedStateHandles List of all key groups state handles that shall be restored. + * @return key-group for the current key */ - public void doRestore(Collection<KeyedStateHandle> keyedStateHandles) - throws IOException, StateMigrationException, RocksDBException { + public int keyGroup() { + int result = 0; + //big endian decode + for (int i = 0; i < keyGroupPrefixByteCount; ++i) { + result <<= 8; + result |= (currentSubIterator.currentKey[i] & 0xFF); + } + return result; + } - rocksDBKeyedStateBackend.createDB(); + public byte[] key() { + return currentSubIterator.getCurrentKey(); + } + + public byte[] value() { + return currentSubIterator.getIterator().value(); + } + + /** + * @return Id of K/V state to which the current key belongs. + */ + public int kvStateId() { + return currentSubIterator.getKvStateId(); + } + + /** + * Indicates if current key starts a new k/v-state, i.e. belong to a different k/v-state than it's predecessor. + * @return true iff the current key belong to a different k/v-state than it's predecessor. + */ + public boolean isNewKeyValueState() { + return newKVState; + } + + /** + * Indicates if current key starts a new key-group, i.e. belong to a different key-group than it's predecessor. + * @return true iff the current key belong to a different key-group than it's predecessor. + */ + public boolean isNewKeyGroup() { + return newKeyGroup; + } + + /** + * Check if the iterator is still valid. Getters like {@link #key()}, {@link #value()}, etc. as well as + * {@link #next()} should only be called if valid returned true. Should be checked after each call to + * {@link #next()} before accessing iterator state. + * @return True iff this iterator is valid. + */ + public boolean isValid() { + return valid; + } + + private static int compareKeyGroupsForByteArrays(byte[] a, byte[] b, int len) { + for (int i = 0; i < len; ++i) { + int diff = (a[i] & 0xFF) - (b[i] & 0xFF); + if (diff != 0) { + return diff; + } + } + return 0; + } + + @Override + public void close() { + IOUtils.closeQuietly(currentSubIterator); + currentSubIterator = null; + + IOUtils.closeAllQuietly(heap); + heap.clear(); + } + } + + /** + * Wraps a RocksDB iterator to cache it's current key and assigns an id for the key/value state to the iterator. + * Used by #MergeIterator. + */ + private static final class MergeIterator implements AutoCloseable { + + /** + * @param iterator The #RocksIterator to wrap . + * @param kvStateId Id of the K/V state to which this iterator belongs. + */ + MergeIterator(RocksIterator iterator, int kvStateId) { + this.iterator = Preconditions.checkNotNull(iterator); + this.currentKey = iterator.key(); + this.kvStateId = kvStateId; + } + + private final RocksIterator iterator; + private byte[] currentKey; + private final int kvStateId; + + public byte[] getCurrentKey() { + return currentKey; + } + + public void setCurrentKey(byte[] currentKey) { + this.currentKey = currentKey; + } + + public RocksIterator getIterator() { + return iterator; + } + + public int getKvStateId() { + return kvStateId; + } + + @Override + public void close() { + IOUtils.closeQuietly(iterator); + } + } + + /** + * Adapter class to bridge between {@link RocksIterator} and {@link Iterator} to iterate over the keys. This class + * is not thread safe. + * + * @param <K> the type of the iterated objects, which are keys in RocksDB. + */ + static class RocksIteratorForKeysWrapper<K> implements Iterator<K>, AutoCloseable { + private final RocksIterator iterator; + private final String state; + private final TypeSerializer<K> keySerializer; + private final int keyGroupPrefixBytes; + private final byte[] namespaceBytes; + private final boolean ambiguousKeyPossible; + private K nextKey; + + RocksIteratorForKeysWrapper( + RocksIterator iterator, + String state, + TypeSerializer<K> keySerializer, + int keyGroupPrefixBytes, + boolean ambiguousKeyPossible, + byte[] namespaceBytes) { + this.iterator = Preconditions.checkNotNull(iterator); + this.state = Preconditions.checkNotNull(state); + this.keySerializer = Preconditions.checkNotNull(keySerializer); + this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes); + this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes); + this.nextKey = null; + this.ambiguousKeyPossible = ambiguousKeyPossible; + } + + @Override + public boolean hasNext() { + while (nextKey == null && iterator.isValid()) { + try { + byte[] key = iterator.key(); + if (isMatchingNameSpace(key)) { + ByteArrayInputStreamWithPos inputStream = + new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes); + DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(inputStream); + K value = RocksDBKeySerializationUtils.readKey( + keySerializer, + inputStream, + dataInput, + ambiguousKeyPossible); + nextKey = value; + } + iterator.next(); + } catch (IOException e) { + throw new FlinkRuntimeException("Failed to access state [" + state + "]", e); + } + } + return nextKey != null; + } + + @Override + public K next() { + if (!hasNext()) { + throw new NoSuchElementException("Failed to access state [" + state + "]"); + } - for (KeyedStateHandle keyedStateHandle : keyedStateHandles) { - if (keyedStateHandle != null) { + K tmpKey = nextKey; + nextKey = null; + return tmpKey; + } - if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) { - throw new IllegalStateException("Unexpected state handle type, " + - "expected: " + KeyGroupsStateHandle.class + - ", but found: " + keyedStateHandle.getClass()); + private boolean isMatchingNameSpace(@Nonnull byte[] key) { + final int namespaceBytesLength = namespaceBytes.length; + final int basicLength = namespaceBytesLength + keyGroupPrefixBytes; + if (key.length >= basicLength) { + for (int i = 1; i <= namespaceBytesLength; ++i) { + if (key[key.length - i] != namespaceBytes[namespaceBytesLength - i]) { + return false; } - this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle; - restoreKeyGroupsInStateHandle(); } + return true; } + return false; } - /** - * Restore one key groups state handle. - */ - private void restoreKeyGroupsInStateHandle() - throws IOException, StateMigrationException, RocksDBException { - try { - currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream(); - rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream); - currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream); - restoreKVStateMetaData(); - restoreKVStateData(); - } finally { - if (rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) { - IOUtils.closeQuietly(currentStateHandleInStream); - } - } + @Override + public void close() { + iterator.close(); } + } - /** - * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws RocksDBException - */ - private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException { + private class FullSnapshotStrategy implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> { - KeyedBackendSerializationProxy<K> serializationProxy = - new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader); + @Override + public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot( + long checkpointId, + long timestamp, + CheckpointStreamFactory primaryStreamFactory, + CheckpointOptions checkpointOptions) throws Exception { - serializationProxy.read(currentStateHandleInView); + long startTime = System.currentTimeMillis(); + final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry(); - // check for key serializer compatibility; this also reconfigures the - // key serializer to be compatible, if it is required and is possible - if (CompatibilityUtil.resolveCompatibilityResult( - serializationProxy.getKeySerializer(), - UnloadableDummyTypeSerializer.class, - serializationProxy.getKeySerializerConfigSnapshot(), - rocksDBKeyedStateBackend.keySerializer) - .isRequiresMigration()) { + if (kvStateInformation.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", + timestamp); + } - // TODO replace with state migration; note that key hash codes need to remain the same after migration - throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " + - "Aborting now since state migration is currently not available"); + return DoneFuture.of(SnapshotResult.empty()); } - this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ? - SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE; - - List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos = - serializationProxy.getStateMetaInfoSnapshots(); - currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size()); - //rocksDBKeyedStateBackend.restoredKvStateMetaInfos = new HashMap<>(restoredMetaInfos.size()); - - for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) { + final SupplierWithException<CheckpointStreamWithResultProvider, Exception> supplier = - Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn = - rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName()); + isWithLocalRecovery( + checkpointOptions.getCheckpointType(), + localRecoveryConfig.getLocalRecoveryMode()) ? - if (registeredColumn == null) { - byte[] nameBytes = restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET); + () -> CheckpointStreamWithResultProvider.createDuplicatingStream( + checkpointId, + CheckpointedStateScope.EXCLUSIVE, + primaryStreamFactory, + localRecoveryConfig.getLocalStateDirectoryProvider()) : - ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor( - nameBytes, - rocksDBKeyedStateBackend.columnOptions); + () -> CheckpointStreamWithResultProvider.createSimpleStream( + CheckpointedStateScope.EXCLUSIVE, + primaryStreamFactory); - RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo = - new RegisteredKeyedBackendStateMetaInfo<>( - restoredMetaInfo.getStateType(), - restoredMetaInfo.getName(), - restoredMetaInfo.getNamespaceSerializer(), - restoredMetaInfo.getStateSerializer()); + final RocksDBFullSnapshotOperation<K> snapshotOperation = + new RocksDBFullSnapshotOperation<>( + RocksDBKeyedStateBackend.this, + supplier, + snapshotCloseableRegistry); - rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo); + snapshotOperation.takeDBSnapShot(); - ColumnFamilyHandle columnFamily = rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor); + // implementation of the async IO operation, based on FutureTask + AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable = + new AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() { - registeredColumn = new Tuple2<>(columnFamily, stateMetaInfo); - rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), registeredColumn); + @Override + protected void acquireResources() throws Exception { + cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry); + snapshotOperation.openCheckpointStream(); + } - } else { - // TODO with eager state registration in place, check here for serializer migration strategies - } - currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0); - } - } + @Override + protected void releaseResources() throws Exception { + closeLocalRegistry(); + releaseSnapshotOperationResources(); + } - /** - * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle. - * - * @throws IOException - * @throws RocksDBException - */ - private void restoreKVStateData() throws IOException, RocksDBException { - //for all key-groups in the current state handle... - for (Tuple2<Integer, Long> keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) { - int keyGroup = keyGroupOffset.f0; + private void releaseSnapshotOperationResources() { + // hold the db lock while operation on the db to guard us against async db disposal + snapshotOperation.releaseSnapshotResources(); + } - // Check that restored key groups all belong to the backend - Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup), - "The key group must belong to the backend"); + @Override + protected void stopOperation() throws Exception { + closeLocalRegistry(); + } - long offset = keyGroupOffset.f1; - //not empty key-group? - if (0L != offset) { - currentStateHandleInStream.seek(offset); - try (InputStream compressedKgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) { - DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn); - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - int kvStateId = compressedKgInputView.readShort(); - ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId); - //insert all k/v pairs into DB - boolean keyGroupHasMoreKeys = true; - while (keyGroupHasMoreKeys) { - byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView); - byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView); - if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) { - //clear the signal bit in the key to make it ready for insertion again - RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key); - rocksDBKeyedStateBackend.db.put(handle, key, value); - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK - & compressedKgInputView.readShort(); - if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) { - keyGroupHasMoreKeys = false; - } else { - handle = currentStateHandleKVStateColumnFamilies.get(kvStateId); - } - } else { - rocksDBKeyedStateBackend.db.put(handle, key, value); + private void closeLocalRegistry() { + if (cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) { + try { + snapshotCloseableRegistry.close(); + } catch (Exception ex) { + LOG.warn("Error closing local registry", ex); } } } - } - } - } - } - private static class RocksDBIncrementalRestoreOperation<T> { + @Nonnull + @Override + public SnapshotResult<KeyedStateHandle> performOperation() throws Exception { + long startTime = System.currentTimeMillis(); - private final RocksDBKeyedStateBackend<T> stateBackend; + if (isStopped()) { + throw new IOException("RocksDB closed."); + } - private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) { - this.stateBackend = stateBackend; - } + snapshotOperation.writeDBSnapshot(); - private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> readMetaData( - StreamStateHandle metaStateHandle) throws Exception { + LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.", + primaryStreamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime)); - FSDataInputStream inputStream = null; + return snapshotOperation.getSnapshotResultStateHandle(); + } + }; - try { - inputStream = metaStateHandle.openInputStream(); - stateBackend.cancelStreamRegistry.registerCloseable(inputStream); + LOG.info("Asynchronous RocksDB snapshot ({}, synchronous part) in thread {} took {} ms.", + primaryStreamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime)); + return AsyncStoppableTaskWithCallback.from(ioCallable); + } - KeyedBackendSerializationProxy<T> serializationProxy = - new KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader); - DataInputView in = new DataInputViewStreamWrapper(inputStream); - serializationProxy.read(in); + private boolean isWithLocalRecovery( + CheckpointType checkpointType, + LocalRecoveryConfig.Loc
<TRUNCATED>
