GitHub user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r168193098
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
@@ -336,1697 +386,1982 @@ public int getKeyGroupPrefixBytes() {
	 * @param streamFactory The factory that we can use for writing our state to streams.
	 * @param checkpointOptions Options for how to perform this checkpoint.
	 * @return Future to the state handle of the snapshot data.
-	 * @throws Exception
+	 * @throws Exception indicating a problem in the synchronous part of the checkpoint.
	 */
	@Override
-	public RunnableFuture<KeyedStateHandle> snapshot(
+	public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
		final long checkpointId,
		final long timestamp,
		final CheckpointStreamFactory streamFactory,
		CheckpointOptions checkpointOptions) throws Exception {

-		if (checkpointOptions.getCheckpointType() != CheckpointType.SAVEPOINT &&
-			enableIncrementalCheckpointing) {
-			return snapshotIncrementally(checkpointId, timestamp, streamFactory);
-		} else {
-			return snapshotFully(checkpointId, timestamp, streamFactory);
-		}
+		return snapshotStrategy.performSnapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
	}

-	private RunnableFuture<KeyedStateHandle> snapshotIncrementally(
-		final long checkpointId,
-		final long checkpointTimestamp,
-		final CheckpointStreamFactory checkpointStreamFactory) throws Exception {
-
-		if (db == null) {
-			throw new IOException("RocksDB closed.");
-		}
+	@Override
+	public void restore(StateObjectCollection<KeyedStateHandle> restoreState) throws Exception {
+		LOG.info("Initializing RocksDB keyed state backend from snapshot.");

-		if (kvStateInformation.isEmpty()) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.",
-					checkpointTimestamp);
-			}
-			return DoneFuture.nullValue();
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Restoring snapshot from state handles: {}.", restoreState);
		}

-		final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
-			new RocksDBIncrementalSnapshotOperation<>(
-				this,
-				checkpointStreamFactory,
-				checkpointId,
-				checkpointTimestamp);
+		// clear all meta data
+		kvStateInformation.clear();
+		restoredKvStateMetaInfos.clear();

		try {
-			snapshotOperation.takeSnapshot();
-		} catch (Exception e) {
-			snapshotOperation.stop();
-			snapshotOperation.releaseResources(true);
-			throw e;
-		}
-
-		return new FutureTask<KeyedStateHandle>(
-			new Callable<KeyedStateHandle>() {
-				@Override
-				public KeyedStateHandle call() throws Exception {
-					return snapshotOperation.materializeSnapshot();
+			if (restoreState == null || restoreState.isEmpty()) {
+				createDB();
+			} else {
+				KeyedStateHandle firstStateHandle = restoreState.iterator().next();
+				if (firstStateHandle instanceof IncrementalKeyedStateHandle
+					|| firstStateHandle instanceof IncrementalLocalKeyedStateHandle) {
+					RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+					restoreOperation.restore(restoreState);
+				} else {
+					RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this);
+					restoreOperation.doRestore(restoreState);
				}
			}
-		) {
-			@Override
-			public boolean cancel(boolean mayInterruptIfRunning) {
-				snapshotOperation.stop();
-				return super.cancel(mayInterruptIfRunning);
-			}
-
-			@Override
-			protected void done() {
-				snapshotOperation.releaseResources(isCancelled());
-			}
-		};
+		} catch (Exception ex) {
+			dispose();
+			throw ex;
+		}
	}
-	private RunnableFuture<KeyedStateHandle> snapshotFully(
-		final long checkpointId,
-		final long timestamp,
-		final CheckpointStreamFactory streamFactory) throws Exception {
-
-		long startTime = System.currentTimeMillis();
-		final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry();
-
-		final RocksDBFullSnapshotOperation<K> snapshotOperation;
-
-		if (kvStateInformation.isEmpty()) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", timestamp);
-			}
+	@Override
+	public void notifyCheckpointComplete(long completedCheckpointId) {

-			return DoneFuture.nullValue();
+		if (!enableIncrementalCheckpointing) {
+			return;
		}

-		snapshotOperation = new RocksDBFullSnapshotOperation<>(this, streamFactory, snapshotCloseableRegistry);
-		snapshotOperation.takeDBSnapShot(checkpointId, timestamp);
-
-		// implementation of the async IO operation, based on FutureTask
-		AbstractAsyncCallableWithResources<KeyedStateHandle> ioCallable =
-			new AbstractAsyncCallableWithResources<KeyedStateHandle>() {
-
-				@Override
-				protected void acquireResources() throws Exception {
-					cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
-					snapshotOperation.openCheckpointStream();
-				}
+		synchronized (materializedSstFiles) {

-				@Override
-				protected void releaseResources() throws Exception {
-					closeLocalRegistry();
-					releaseSnapshotOperationResources();
-				}
+			if (completedCheckpointId < lastCompletedCheckpointId) {
+				return;
+			}

-				private void releaseSnapshotOperationResources() {
-					// hold the db lock while operation on the db to guard us against async db disposal
-					snapshotOperation.releaseSnapshotResources();
-				}
+			materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId);

-				@Override
-				protected void stopOperation() throws Exception {
-					closeLocalRegistry();
-				}
+			lastCompletedCheckpointId = completedCheckpointId;
+		}
+	}

-				private void closeLocalRegistry() {
-					if (cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
-						try {
-							snapshotCloseableRegistry.close();
-						} catch (Exception ex) {
-							LOG.warn("Error closing local registry", ex);
-						}
-					}
-				}
+	private void createDB() throws IOException {
+		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
+		this.db = openDB(instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles);
+		this.defaultColumnFamily = columnFamilyHandles.get(0);
+	}

-				@Override
-				public KeyGroupsStateHandle performOperation() throws Exception {
-					long startTime = System.currentTimeMillis();
+	private RocksDB openDB(
+		String path,
+		List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
+		List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException {

-					if (isStopped()) {
-						throw new IOException("RocksDB closed.");
-					}
+		List<ColumnFamilyDescriptor> columnFamilyDescriptors =
+			new ArrayList<>(1 + stateColumnFamilyDescriptors.size());

-					snapshotOperation.writeDBSnapshot();
+		// we add the required descriptor for the default CF in FIRST position, see
+		// https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families
+		columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnOptions));
+		columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);

-					LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.",
-						streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime));
+		RocksDB dbRef;

-					return snapshotOperation.getSnapshotResultStateHandle();
-				}
-			};
+		try {
+			dbRef = RocksDB.open(
+				Preconditions.checkNotNull(dbOptions),
+				Preconditions.checkNotNull(path),
+				columnFamilyDescriptors,
+				stateColumnFamilyHandles);
+		} catch (RocksDBException e) {
+			throw new IOException("Error while opening RocksDB instance.", e);
+		}

-		LOG.info("Asynchronous RocksDB snapshot ({}, synchronous part) in thread {} took {} ms.",
-			streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime));
+		// requested + default CF
+		Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
+			"Not all requested column family handles have been created");

-		return AsyncStoppableTaskWithCallback.from(ioCallable);
+		return dbRef;
	}
	/**
-	 * Encapsulates the process to perform a snapshot of a RocksDBKeyedStateBackend.
+	 * Encapsulates the process of restoring a RocksDBKeyedStateBackend from a full snapshot.
	 */
-	static final class RocksDBFullSnapshotOperation<K> {
-
-		static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
-		static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
-
-		private final RocksDBKeyedStateBackend<K> stateBackend;
-		private final KeyGroupRangeOffsets keyGroupRangeOffsets;
-		private final CheckpointStreamFactory checkpointStreamFactory;
-		private final CloseableRegistry snapshotCloseableRegistry;
-		private final ResourceGuard.Lease dbLease;
-
-		private long checkpointId;
-		private long checkpointTimeStamp;
-
-		private Snapshot snapshot;
-		private ReadOptions readOptions;
-		private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
-
-		private CheckpointStreamFactory.CheckpointStateOutputStream outStream;
-		private DataOutputView outputView;
+	private static final class RocksDBFullRestoreOperation<K> {

-		RocksDBFullSnapshotOperation(
-			RocksDBKeyedStateBackend<K> stateBackend,
-			CheckpointStreamFactory checkpointStreamFactory,
-			CloseableRegistry registry) throws IOException {
+		private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend;

-			this.stateBackend = stateBackend;
-			this.checkpointStreamFactory = checkpointStreamFactory;
-			this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(stateBackend.keyGroupRange);
-			this.snapshotCloseableRegistry = registry;
-			this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource();
-		}
+		/** Current key-groups state handle from which we restore key-groups. */
+		private KeyGroupsStateHandle currentKeyGroupsStateHandle;
+		/** Current input stream we obtained from currentKeyGroupsStateHandle. */
+		private FSDataInputStream currentStateHandleInStream;
+		/** Current data input view that wraps currentStateHandleInStream. */
+		private DataInputView currentStateHandleInView;
+		/** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle. */
+		private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies;
+		/** The compression decorator that was used for writing the state, as determined by the meta data. */
+		private StreamCompressionDecorator keygroupStreamCompressionDecorator;

		/**
-		 * 1) Create a snapshot object from RocksDB.
+		 * Creates a restore operation object for the given state backend instance.
		 *
-		 * @param checkpointId id of the checkpoint for which we take the snapshot
-		 * @param checkpointTimeStamp timestamp of the checkpoint for which we take the snapshot
+		 * @param rocksDBKeyedStateBackend the state backend into which we restore
		 */
-		public void takeDBSnapShot(long checkpointId, long checkpointTimeStamp) {
-			Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!");
-			this.kvStateIterators = new ArrayList<>(stateBackend.kvStateInformation.size());
-			this.checkpointId = checkpointId;
-			this.checkpointTimeStamp = checkpointTimeStamp;
-			this.snapshot = stateBackend.db.getSnapshot();
+		public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) {
+			this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend);
		}

		/**
-		 * 2) Open CheckpointStateOutputStream through the checkpointStreamFactory into which we will write.
+		 * Restores all key-groups data that is referenced by the passed state handles.
		 *
-		 * @throws Exception
+		 * @param keyedStateHandles List of all key groups state handles that shall be restored.
		 */
-		public void openCheckpointStream() throws Exception {
-			Preconditions.checkArgument(outStream == null, "Output stream for snapshot is already set.");
-			outStream = checkpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
-			snapshotCloseableRegistry.registerCloseable(outStream);
-			outputView = new DataOutputViewStreamWrapper(outStream);
-		}
+		public void doRestore(Collection<KeyedStateHandle> keyedStateHandles)
+			throws IOException, StateMigrationException, RocksDBException {

-		/**
-		 * 3) Write the actual data from RocksDB from the time we took the snapshot object in (1).
-		 *
-		 * @throws IOException
-		 */
-		public void writeDBSnapshot() throws IOException, InterruptedException {
+			rocksDBKeyedStateBackend.createDB();

-			if (null == snapshot) {
-				throw new IOException("No snapshot available. Might be released due to cancellation.");
-			}
+			for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
+				if (keyedStateHandle != null) {

-			Preconditions.checkNotNull(outStream, "No output stream to write snapshot.");
-			writeKVStateMetaData();
-			writeKVStateData();
+					if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
+						throw new IllegalStateException("Unexpected state handle type, " +
+							"expected: " + KeyGroupsStateHandle.class +
+							", but found: " + keyedStateHandle.getClass());
+					}
+					this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
+					restoreKeyGroupsInStateHandle();
+				}
+			}
		}
		/**
-		 * 4) Returns a state handle to the snapshot after the snapshot procedure is completed and null before.
-		 *
-		 * @return state handle to the completed snapshot
+		 * Restore one key groups state handle.
		 */
-		public KeyGroupsStateHandle getSnapshotResultStateHandle() throws IOException {
-
-			if (snapshotCloseableRegistry.unregisterCloseable(outStream)) {
-
-				StreamStateHandle stateHandle = outStream.closeAndGetHandle();
-				outStream = null;
-
-				if (stateHandle != null) {
-					return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+		private void restoreKeyGroupsInStateHandle()
+			throws IOException, StateMigrationException, RocksDBException {
+			try {
+				currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream();
+				rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
+				currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream);
+				restoreKVStateMetaData();
+				restoreKVStateData();
+			} finally {
+				if (rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) {
+					IOUtils.closeQuietly(currentStateHandleInStream);
				}
			}
-			return null;
		}

		/**
-		 * 5) Release the snapshot object for RocksDB and clean up.
+		 * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle.
+		 *
+		 * @throws IOException
+		 * @throws ClassNotFoundException
+		 * @throws RocksDBException
		 */
-		public void releaseSnapshotResources() {
+		private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException {

-			outStream = null;
+			KeyedBackendSerializationProxy<K> serializationProxy =
+				new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);

-			if (null != kvStateIterators) {
-				for (Tuple2<RocksIterator, Integer> kvStateIterator : kvStateIterators) {
-					IOUtils.closeQuietly(kvStateIterator.f0);
-				}
-				kvStateIterators = null;
-			}
+			serializationProxy.read(currentStateHandleInView);

-			if (null != snapshot) {
-				if (null != stateBackend.db) {
-					stateBackend.db.releaseSnapshot(snapshot);
-				}
-				IOUtils.closeQuietly(snapshot);
-				snapshot = null;
-			}
+			// check for key serializer compatibility; this also reconfigures the
+			// key serializer to be compatible, if it is required and is possible
+			if (CompatibilityUtil.resolveCompatibilityResult(
+				serializationProxy.getKeySerializer(),
+				UnloadableDummyTypeSerializer.class,
+				serializationProxy.getKeySerializerConfigSnapshot(),
+				rocksDBKeyedStateBackend.keySerializer)
+				.isRequiresMigration()) {

-			if (null != readOptions) {
-				IOUtils.closeQuietly(readOptions);
-				readOptions = null;
+				// TODO replace with state migration; note that key hash codes need to remain the same after migration
+				throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
+					"Aborting now since state migration is currently not available");
			}

-			this.dbLease.close();
-		}
-
-		private void writeKVStateMetaData() throws IOException {
+			this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ?
+				SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;

-			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> metaInfoSnapshots =
-				new ArrayList<>(stateBackend.kvStateInformation.size());
+			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
+				serializationProxy.getStateMetaInfoSnapshots();
+			currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size());
+			//rocksDBKeyedStateBackend.restoredKvStateMetaInfos = new HashMap<>(restoredMetaInfos.size());

-			int kvStateId = 0;
-			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> column :
-				stateBackend.kvStateInformation.entrySet()) {
+			for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) {

-				metaInfoSnapshots.add(column.getValue().f1.snapshot());
+				Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn =
+					rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());

-				//retrieve iterator for this k/v states
-				readOptions = new ReadOptions();
-				readOptions.setSnapshot(snapshot);
+				if (registeredColumn == null) {
+					byte[] nameBytes = restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);

-				kvStateIterators.add(
-					new Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), kvStateId));
+					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
+						nameBytes,
+						rocksDBKeyedStateBackend.columnOptions);

-				++kvStateId;
-			}
+					RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
+						new RegisteredKeyedBackendStateMetaInfo<>(
+							restoredMetaInfo.getStateType(),
+							restoredMetaInfo.getName(),
+							restoredMetaInfo.getNamespaceSerializer(),
+							restoredMetaInfo.getStateSerializer());

-			KeyedBackendSerializationProxy<K> serializationProxy =
-				new KeyedBackendSerializationProxy<>(
-					stateBackend.getKeySerializer(),
-					metaInfoSnapshots,
-					!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, stateBackend.keyGroupCompressionDecorator));
+					rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);

-			serializationProxy.write(outputView);
-		}
+					ColumnFamilyHandle columnFamily = rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);

-		private void writeKVStateData() throws IOException, InterruptedException {
+					registeredColumn = new Tuple2<>(columnFamily, stateMetaInfo);
+					rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), registeredColumn);

-			byte[] previousKey = null;
-			byte[] previousValue = null;
-			OutputStream kgOutStream = null;
-			DataOutputView kgOutView = null;
+				} else {
+					// TODO with eager state registration in place, check here for serializer migration strategies
+				}
+				currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0);
+			}
+		}
-			try {
-				// Here we transfer ownership of RocksIterators to the RocksDBMergeIterator
-				try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator(
-					kvStateIterators, stateBackend.keyGroupPrefixBytes)) {
+		/**
+		 * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle.
+		 *
+		 * @throws IOException
+		 * @throws RocksDBException
+		 */
+		private void restoreKVStateData() throws IOException, RocksDBException {
+			//for all key-groups in the current state handle...
+			for (Tuple2<Integer, Long> keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
+				int keyGroup = keyGroupOffset.f0;

-					// handover complete, null out to prevent double close
-					kvStateIterators = null;
+				// Check that restored key groups all belong to the backend
+				Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup),
+					"The key group must belong to the backend");

-					//preamble: setup with first key-group as our lookahead
-					if (mergeIterator.isValid()) {
-						//begin first key-group by recording the offset
-						keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos());
-						//write the k/v-state id as metadata
-						kgOutStream = stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
-						kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
+				long offset = keyGroupOffset.f1;
+				//not empty key-group?
+				if (0L != offset) {
+					currentStateHandleInStream.seek(offset);
+					try (InputStream compressedKgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) {
+						DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
						//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-						kgOutView.writeShort(mergeIterator.kvStateId());
-						previousKey = mergeIterator.key();
-						previousValue = mergeIterator.value();
-						mergeIterator.next();
+						int kvStateId = compressedKgInputView.readShort();
+						ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
+						//insert all k/v pairs into DB
+						boolean keyGroupHasMoreKeys = true;
+						while (keyGroupHasMoreKeys) {
+							byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
+							byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
+							if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
+								//clear the signal bit in the key to make it ready for insertion again
+								RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
+								rocksDBKeyedStateBackend.db.put(handle, key, value);
+								//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+								kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
+									& compressedKgInputView.readShort();
+								if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
+									keyGroupHasMoreKeys = false;
+								} else {
+									handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
+								}
+							} else {
+								rocksDBKeyedStateBackend.db.put(handle, key, value);
+							}
+						}
					}
+				}
+			}
+		}
+	}
-				//main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets.
-				while (mergeIterator.isValid()) {
+	/**
+	 * Encapsulates the process of restoring a RocksDBKeyedStateBackend from an incremental snapshot.
+	 */
+	private static class RocksDBIncrementalRestoreOperation<T> {

-					assert (!hasMetaDataFollowsFlag(previousKey));
+		private final RocksDBKeyedStateBackend<T> stateBackend;

-					//set signal in first key byte that meta data will follow in the stream after this k/v pair
-					if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
+		private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
+			this.stateBackend = stateBackend;
+		}

-						//be cooperative and check for interruption from time to time in the hot loop
-						checkInterrupted();
+		/**
+		 * Root method that branches for different implementations of {@link KeyedStateHandle}.
+		 */
+		void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {

-						setMetaDataFollowsFlagInKey(previousKey);
-					}
+			boolean hasExtraKeys = (restoreStateHandles.size() > 1 ||
+				!Objects.equals(restoreStateHandles.iterator().next().getKeyGroupRange(), stateBackend.keyGroupRange));

-					writeKeyValuePair(previousKey, previousValue, kgOutView);
+			if (hasExtraKeys) {
+				stateBackend.createDB();
+			}

-					//write meta data if we have to
-					if (mergeIterator.isNewKeyGroup()) {
-						//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-						kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
-						// this will just close the outer stream
-						kgOutStream.close();
-						//begin new key-group
-						keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos());
-						//write the kev-state
-						//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-						kgOutStream = stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
-						kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
-						kgOutView.writeShort(mergeIterator.kvStateId());
-					} else if (mergeIterator.isNewKeyValueState()) {
-						//write the k/v-state
-						//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-						kgOutView.writeShort(mergeIterator.kvStateId());
-					}
+			for (KeyedStateHandle rawStateHandle : restoreStateHandles) {

-					//request next k/v pair
-					previousKey = mergeIterator.key();
-					previousValue = mergeIterator.value();
-					mergeIterator.next();
-				}
+				if (rawStateHandle instanceof IncrementalKeyedStateHandle) {
+					restoreInstance((IncrementalKeyedStateHandle) rawStateHandle, hasExtraKeys);
+				} else if (rawStateHandle instanceof IncrementalLocalKeyedStateHandle) {
+					Preconditions.checkState(!hasExtraKeys, "Cannot recover from local state after rescaling.");
+					restoreInstance((IncrementalLocalKeyedStateHandle) rawStateHandle);
+				} else {
+					throw new IllegalStateException("Unexpected state handle type, " +
+						"expected " + IncrementalKeyedStateHandle.class +
+						", but found " + rawStateHandle.getClass());
				}
+			}
+		}

-				//epilogue: write last key-group
-				if (previousKey != null) {
-					assert (!hasMetaDataFollowsFlag(previousKey));
-					setMetaDataFollowsFlagInKey(previousKey);
-					writeKeyValuePair(previousKey, previousValue, kgOutView);
-					//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-					kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
-					// this will just close the outer stream
-					kgOutStream.close();
-					kgOutStream = null;
-				}
+		/**
+		 * Recovery from remote incremental state.
+		 */
+		private void restoreInstance(
+			IncrementalKeyedStateHandle restoreStateHandle,
+			boolean hasExtraKeys) throws Exception {
+
+			// read state data
+			Path temporaryRestoreInstancePath = new Path(
+				stateBackend.instanceBasePath.getAbsolutePath(),
+				UUID.randomUUID().toString());
+
+			try {
+
+				transferAllStateDataToDirectory(restoreStateHandle, temporaryRestoreInstancePath);
+				// read meta data
+				List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots =
+					readMetaData(restoreStateHandle.getMetaStateHandle());
+
+				List<ColumnFamilyDescriptor> columnFamilyDescriptors =
+					createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);
+
+				if (hasExtraKeys) {
+					restoreKeyGroupsShardWithTemporaryHelperInstance(
+						temporaryRestoreInstancePath,
+						columnFamilyDescriptors,
+						stateMetaInfoSnapshots);
+				} else {
+
+					// since we transferred all remote state to a local directory, we can use the same code as for
+					// local recovery.
+					IncrementalLocalKeyedStateHandle localKeyedStateHandle = new IncrementalLocalKeyedStateHandle(
+						restoreStateHandle.getBackendIdentifier(),
+						restoreStateHandle.getCheckpointId(),
+						new DirectoryStateHandle(temporaryRestoreInstancePath),
+						restoreStateHandle.getKeyGroupRange(),
+						restoreStateHandle.getMetaStateHandle(),
+						restoreStateHandle.getSharedState().keySet());
+
+					restoreLocalStateIntoFullInstance(
+						localKeyedStateHandle,
+						columnFamilyDescriptors,
+						stateMetaInfoSnapshots);
+				}
			} finally {
-				// this will just close the outer stream
-				IOUtils.closeQuietly(kgOutStream);
+				FileSystem restoreFileSystem = temporaryRestoreInstancePath.getFileSystem();
+				if (restoreFileSystem.exists(temporaryRestoreInstancePath)) {
+					restoreFileSystem.delete(temporaryRestoreInstancePath, true);
+				}
			}
		}
-		private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException {
-			BytePrimitiveArraySerializer.INSTANCE.serialize(key, out);
-			BytePrimitiveArraySerializer.INSTANCE.serialize(value, out);
-		}
+		/**
+		 * Recovery from local incremental state.
+		 */
+		private void restoreInstance(IncrementalLocalKeyedStateHandle localKeyedStateHandle) throws Exception {
+			// read meta data
+			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots =
+				readMetaData(localKeyedStateHandle.getMetaDataState());

-		static void setMetaDataFollowsFlagInKey(byte[] key) {
-			key[0] |= FIRST_BIT_IN_BYTE_MASK;
-		}
+			List<ColumnFamilyDescriptor> columnFamilyDescriptors =
+				createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);

-		static void clearMetaDataFollowsFlag(byte[] key) {
-			key[0] &= (~RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
+			restoreLocalStateIntoFullInstance(
+				localKeyedStateHandle,
+				columnFamilyDescriptors,
+				stateMetaInfoSnapshots);
		}

-		static boolean hasMetaDataFollowsFlag(byte[] key) {
-			return 0 != (key[0] & RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
-		}
+		/**
+		 * This method recreates and registers all {@link ColumnFamilyDescriptor} from Flink's state meta data snapshot.
+		 */
+		private List<ColumnFamilyDescriptor> createAndRegisterColumnFamilyDescriptors(
+			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) {

-		private static void checkInterrupted() throws InterruptedException {
-			if (Thread.currentThread().isInterrupted()) {
-				throw new InterruptedException("RocksDB snapshot interrupted.");
+			List<ColumnFamilyDescriptor> columnFamilyDescriptors =
+				new ArrayList<>(1 + stateMetaInfoSnapshots.size());
+
+			for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
+
+				ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
+					stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
+					stateBackend.columnOptions);
+
+				columnFamilyDescriptors.add(columnFamilyDescriptor);
+				stateBackend.restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(), stateMetaInfoSnapshot);
			}
+			return columnFamilyDescriptors;
		}
-	}
-	private static final class RocksDBIncrementalSnapshotOperation<K> {
+		/**
+		 * This method implements the core of the restore logic that unifies how local and remote state are recovered.
+		 */
+		private void restoreLocalStateIntoFullInstance(
+			IncrementalLocalKeyedStateHandle restoreStateHandle,
+			List<ColumnFamilyDescriptor> columnFamilyDescriptors,
+			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) throws Exception {
+			// pick up again the old backend id, so the we can reference existing state
+			stateBackend.backendUID = restoreStateHandle.getBackendIdentifier();
+
+			LOG.debug("Restoring keyed backend uid in operator {} from incremental snapshot to {}.",
+				stateBackend.operatorIdentifier, stateBackend.backendUID);
+
+			// create hard links in the instance directory
+			if (!stateBackend.instanceRocksDBPath.mkdirs()) {
+				throw new IOException("Could not create RocksDB data directory.");
+			}

-		/** The backend which we snapshot. */
-		private final RocksDBKeyedStateBackend<K> stateBackend;
+			Path restoreSourcePath = restoreStateHandle.getDirectoryStateHandle().getDirectory();
+			restoreInstanceDirectoryFromPath(restoreSourcePath);

-		/** Stream factory that creates the outpus streams to DFS. */
-		private final CheckpointStreamFactory checkpointStreamFactory;
+			List<ColumnFamilyHandle> columnFamilyHandles =
+				new ArrayList<>(1 + columnFamilyDescriptors.size());

-		/** Id for the current checkpoint. */
-		private final long checkpointId;
+			stateBackend.db = stateBackend.openDB(
+				stateBackend.instanceRocksDBPath.getAbsolutePath(),
+				columnFamilyDescriptors, columnFamilyHandles);

-		/** Timestamp for the current checkpoint. */
-		private final long checkpointTimestamp;
+			// extract and store the default column family which is located at the first index
+			stateBackend.defaultColumnFamily = columnFamilyHandles.remove(0);

-		/** All sst files that were part of the last previously completed checkpoint. */
-		private Set<StateHandleID> baseSstFiles;
+			for (int i = 0; i < columnFamilyDescriptors.size(); ++i) {
+				RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);

-		/** The state meta data. */
-		private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = new ArrayList<>();
+				ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
+				RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
+					new RegisteredKeyedBackendStateMetaInfo<>(
+						stateMetaInfoSnapshot.getStateType(),
+						stateMetaInfoSnapshot.getName(),
+						stateMetaInfoSnapshot.getNamespaceSerializer(),
+						stateMetaInfoSnapshot.getStateSerializer());

-		private FileSystem backupFileSystem;
-		private Path backupPath;
+				stateBackend.kvStateInformation.put(
+					stateMetaInfoSnapshot.getName(),
+					new Tuple2<>(columnFamilyHandle, stateMetaInfo));
+			}

-		// Registry for all opened i/o streams
-		private final CloseableRegistry closeableRegistry = new CloseableRegistry();
+			// use the restore sst files as the base for succeeding checkpoints
+			synchronized (stateBackend.materializedSstFiles) {
+				stateBackend.materializedSstFiles.put(
+					restoreStateHandle.getCheckpointId(),
+					restoreStateHandle.getSharedStateHandleIDs());
+			}

-		// new sst files since the last completed checkpoint
-		private final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
+			stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId();
+		}

-		// handles to the misc files in the current snapshot
-		private final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();
+		/**
+		 * This recreates the new working directory of the recovered RocksDB instance and links/copies the contents from
+		 * a local state.
+		 */
+		private void restoreInstanceDirectoryFromPath(Path source) throws IOException {

-		// This lease protects from concurrent disposal of the native rocksdb instance.
-		private final ResourceGuard.Lease dbLease;
+			FileSystem fileSystem = source.getFileSystem();

-		private StreamStateHandle metaStateHandle = null;
+			final FileStatus[] fileStatuses = fileSystem.listStatus(source);

-		private RocksDBIncrementalSnapshotOperation(
-			RocksDBKeyedStateBackend<K> stateBackend,
-			CheckpointStreamFactory checkpointStreamFactory,
-			long checkpointId,
-			long checkpointTimestamp) throws IOException {
+			if (fileStatuses == null) {
+				throw new IOException("Cannot list file statues. Directory " + source + " does not exist.");
+			}

-			this.stateBackend = stateBackend;
-			this.checkpointStreamFactory = checkpointStreamFactory;
-			this.checkpointId = checkpointId;
-			this.checkpointTimestamp = checkpointTimestamp;
-			this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource();
+			for (FileStatus fileStatus : fileStatuses) {
+				final Path filePath = fileStatus.getPath();
+				final String fileName = filePath.getName();
+				File restoreFile = new File(source.getPath(), fileName);
+				File targetFile = new File(stateBackend.instanceRocksDBPath.getPath(), fileName);
+				if (fileName.endsWith(SST_FILE_SUFFIX)) {
+					// hardlink'ing the immutable sst-files.
+					Files.createLink(targetFile.toPath(), restoreFile.toPath());
+				} else {
+					// true copy for all other files.
+					Files.copy(restoreFile.toPath(), targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+				}
+			}
		}
-		private StreamStateHandle materializeStateData(Path filePath) throws Exception {
+		/**
+		 * Reads Flink's state meta data file from the state handle.
+		 */
+		private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> readMetaData(
+			StreamStateHandle metaStateHandle) throws Exception {
+
			FSDataInputStream inputStream = null;
-			CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
+
			try {
-				final byte[] buffer = new byte[8 * 1024];
-
-				FileSystem backupFileSystem = backupPath.getFileSystem();
-				inputStream = backupFileSystem.open(filePath);
-				closeableRegistry.registerCloseable(inputStream);
-
-				outputStream = checkpointStreamFactory
-					.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
-				closeableRegistry.registerCloseable(outputStream);
-
-				while (true) {
-					int numBytes = inputStream.read(buffer);
+				inputStream = metaStateHandle.openInputStream();
+				stateBackend.cancelStreamRegistry.registerCloseable(inputStream);

-					if (numBytes == -1) {
-						break;
-					}
+				KeyedBackendSerializationProxy<T> serializationProxy =
+					new KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader);
+				DataInputView in = new DataInputViewStreamWrapper(inputStream);
+				serializationProxy.read(in);

-					outputStream.write(buffer, 0, numBytes);
-				}
+				// check for key serializer compatibility; this also reconfigures the
+				// key serializer to be compatible, if it is required and is possible
+				if (CompatibilityUtil.resolveCompatibilityResult(
+					serializationProxy.getKeySerializer(),
+					UnloadableDummyTypeSerializer.class,
+					serializationProxy.getKeySerializerConfigSnapshot(),
+					stateBackend.keySerializer)
+					.isRequiresMigration()) {

-				StreamStateHandle result = null;
-				if (closeableRegistry.unregisterCloseable(outputStream)) {
-					result = outputStream.closeAndGetHandle();
-					outputStream = null;
+					// TODO replace with state migration; note that key hash codes need to remain the same after migration
+					throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
+						"Aborting now since state migration is currently not available");
				}
-				return result;
+				return serializationProxy.getStateMetaInfoSnapshots();
			} finally {
-				if (inputStream != null && closeableRegistry.unregisterCloseable(inputStream)) {
+				if (stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
					inputStream.close();
				}
-
-				if (outputStream != null && closeableRegistry.unregisterCloseable(outputStream)) {
-					outputStream.close();
-				}
			}
		}
-		private StreamStateHandle materializeMetaData() throws Exception {
-			CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
-
-			try {
-				outputStream = checkpointStreamFactory
-					.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
-				closeableRegistry.registerCloseable(outputStream);
+		private void transferAllStateDataToDirectory(
+			IncrementalKeyedStateHandle restoreStateHandle,
+			Path dest) throws IOException {

-				//no need for compression scheme support because sst-files are already compressed
-				KeyedBackendSerializationProxy<K> serializationProxy =
-					new KeyedBackendSerializationProxy<>(
-						stateBackend.keySerializer,
-						stateMetaInfoSnapshots,
-						false);
+			final Map<StateHandleID, StreamStateHandle> sstFiles =
+				restoreStateHandle.getSharedState();
+			final Map<StateHandleID, StreamStateHandle> miscFiles =
+				restoreStateHandle.getPrivateState();

-				DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
+			transferAllDataFromStateHandles(sstFiles, dest);
+			transferAllDataFromStateHandles(miscFiles, dest);
+		}

-				serializationProxy.write(out);
+		/**
+		 * Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
+		 * {@link StateHandleID}.
+		 */
+		private void transferAllDataFromStateHandles(
+			Map<StateHandleID, StreamStateHandle> stateHandleMap,
+			Path restoreInstancePath) throws IOException {

-				StreamStateHandle result = null;
-				if (closeableRegistry.unregisterCloseable(outputStream)) {
-					result = outputStream.closeAndGetHandle();
-					outputStream = null;
-				}
-				return result;
-			} finally {
-				if (outputStream != null) {
-					if (closeableRegistry.unregisterCloseable(outputStream)) {
-						outputStream.close();
-					}
-				}
+			for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
+				StateHandleID stateHandleID = entry.getKey();
+				StreamStateHandle remoteFileHandle = entry.getValue();
+				copyStateDataHandleData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle);
			}
		}

-		void takeSnapshot() throws Exception {
+		/**
+		 * Copies the file from a single state handle to the given path.
+		 */
+		private void copyStateDataHandleData(
+			Path restoreFilePath,
+			StreamStateHandle remoteFileHandle) throws IOException {

-			final long lastCompletedCheckpoint;
+			FileSystem restoreFileSystem = restoreFilePath.getFileSystem();

-			// use the last completed checkpoint as the comparison base.
-			synchronized (stateBackend.materializedSstFiles) {
-				lastCompletedCheckpoint = stateBackend.lastCompletedCheckpointId;
-				baseSstFiles = stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
-			}
+			FSDataInputStream inputStream = null;
+			FSDataOutputStream outputStream = null;

-			LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " +
-				"assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
+			try {
+				inputStream = remoteFileHandle.openInputStream();
+				stateBackend.cancelStreamRegistry.registerCloseable(inputStream);

-			// save meta data
-			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
-				: stateBackend.kvStateInformation.entrySet()) {
-				stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
-			}
+				outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
+				stateBackend.cancelStreamRegistry.registerCloseable(outputStream);

-			// save state data
-			backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
+				byte[] buffer = new byte[8 * 1024];
+				while (true) {
+					int numBytes = inputStream.read(buffer);
+					if (numBytes == -1) {
+						break;
+					}

-			LOG.trace("Local RocksDB checkpoint goes to backup path {}.", backupPath);
+					outputStream.write(buffer, 0, numBytes);
+				}
+			} finally {
+				if (stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
+					inputStream.close();
+				}

-			backupFileSystem = backupPath.getFileSystem();
-			if (backupFileSystem.exists(backupPath)) {
-				throw new IllegalStateException("Unexpected existence of the backup directory.");
+				if (stateBackend.cancelStreamRegistry.unregisterCloseable(outputStream)) {
+					outputStream.close();
+				}
			}
-
-			// create hard links of living files in the checkpoint path
-			Checkpoint checkpoint = Checkpoint.create(stateBackend.db);
-			checkpoint.createCheckpoint(backupPath.getPath());
		}
-		KeyedStateHandle materializeSnapshot() throws Exception {
+		/**
+		 * In case of rescaling, this method creates a temporary RocksDB instance for a key-groups shard. All contents
+		 * from the temporary instance are copied into the real restore instance and then the temporary instance is
+		 * discarded.
+		 */
+		private void restoreKeyGroupsShardWithTemporaryHelperInstance(
+			Path restoreInstancePath,
+			List<ColumnFamilyDescriptor> columnFamilyDescriptors,
+			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) throws Exception {

-			stateBackend.cancelStreamRegistry.registerCloseable(closeableRegistry);
+			List<ColumnFamilyHandle> columnFamilyHandles =
+				new ArrayList<>(1 + columnFamilyDescriptors.size());

-			// write meta data
-			metaStateHandle = materializeMetaData();
+			try (RocksDB restoreDb = stateBackend.openDB(
+				restoreInstancePath.getPath(),
+				columnFamilyDescriptors,
+				columnFamilyHandles)) {

-			// write state data
-			Preconditions.checkState(backupFileSystem.exists(backupPath));
+				final ColumnFamilyHandle defaultColumnFamily = columnFamilyHandles.remove(0);

-			FileStatus[] fileStatuses = backupFileSystem.listStatus(backupPath);
-			if (fileStatuses != null) {
-				for (FileStatus fileStatus : fileStatuses) {
-					final Path filePath = fileStatus.getPath();
-					final String fileName = filePath.getName();
-					final StateHandleID stateHandleID = new StateHandleID(fileName);
+				Preconditions.checkState(columnFamilyHandles.size() == columnFamilyDescriptors.size());

-					if (fileName.endsWith(SST_FILE_SUFFIX)) {
-						final boolean existsAlready =
-							baseSstFiles != null && baseSstFiles.contains(stateHandleID);
+				try {
+					for (int i = 0; i < columnFamilyDescriptors.size(); ++i) {
+						ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
+						ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptors.get(i);
+						RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);

-						if (existsAlready) {
-							// we introduce a placeholder state handle, that is replaced with the
-							// original from the shared state registry (created from a previous checkpoint)
-							sstFiles.put(
-								stateHandleID,
-								new PlaceholderStreamStateHandle());
-						} else {
-							sstFiles.put(stateHandleID, materializeStateData(filePath));
-						}
-					} else {
-						StreamStateHandle fileHandle = materializeStateData(filePath);
-						miscFiles.put(stateHandleID, fileHandle);
-					}
-				}
-			}
+						Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry =
+							stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());

-			synchronized (stateBackend.materializedSstFiles) {
-				stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
-			}
+						if (null == registeredStateMetaInfoEntry) {

-			return new IncrementalKeyedStateHandle(
-				stateBackend.backendUID,
-				stateBackend.keyGroupRange,
-				checkpointId,
-				sstFiles,
-				miscFiles,
-				metaStateHandle);
-		}
+							RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
+								new RegisteredKeyedBackendStateMetaInfo<>(
+									stateMetaInfoSnapshot.getStateType(),
+									stateMetaInfoSnapshot.getName(),
+									stateMetaInfoSnapshot.getNamespaceSerializer(),
+									stateMetaInfoSnapshot.getStateSerializer());

-		void stop() {
+							registeredStateMetaInfoEntry =
+								new Tuple2<>(
+									stateBackend.db.createColumnFamily(columnFamilyDescriptor),
+									stateMetaInfo);

-			if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
-				try {
-					closeableRegistry.close();
-				} catch (IOException e) {
-					LOG.warn("Could not properly close io streams.", e);
-				}
-			}
-		}
+							stateBackend.kvStateInformation.put(
+								stateMetaInfoSnapshot.getName(),
+								registeredStateMetaInfoEntry);
+						}

-		void releaseResources(boolean canceled) {
+						ColumnFamilyHandle targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0;

-			dbLease.close();
+						try (RocksIterator iterator = restoreDb.newIterator(columnFamilyHandle)) {

-			if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
-				try {
-					closeableRegistry.close();
-				} catch (IOException e) {
-					LOG.warn("Exception on closing registry.", e);
-				}
-			}
+							int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
+							byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
+							for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
+								startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
+							}

-			if (backupPath != null) {
-				try {
-					if (backupFileSystem.exists(backupPath)) {
+							iterator.seek(startKeyGroupPrefixBytes);

-						LOG.trace("Deleting local RocksDB backup path {}.", backupPath);
-						backupFileSystem.delete(backupPath, true);
-					}
-				} catch (Exception e) {
-					LOG.warn("Could not properly delete the checkpoint directory.", e);
-				}
-			}
+							while (iterator.isValid()) {

-			if (canceled) {
-				Collection<StateObject> statesToDiscard =
-					new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
+								int keyGroup = 0;
+								for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
+									keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
+								}

-				statesToDiscard.add(metaStateHandle);
-				statesToDiscard.addAll(miscFiles.values());
-				statesToDiscard.addAll(sstFiles.values());
+								if (stateBackend.keyGroupRange.contains(keyGroup)) {
+									stateBackend.db.put(targetColumnFamilyHandle,
+										iterator.key(), iterator.value());
+								}

-				try {
-					StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
-				} catch (Exception e) {
-					LOG.warn("Could not properly discard states.", e);
+								iterator.next();
+							}
+						} // releases native iterator resources
+					}
+				} finally {
+
+					//release native tmp db column family resources
+					IOUtils.closeQuietly(defaultColumnFamily);
+
+					for (ColumnFamilyHandle flinkColumnFamilyHandle : columnFamilyHandles) {
+						IOUtils.closeQuietly(flinkColumnFamilyHandle);
+					}
				}
-			}
+			} // releases native tmp db resources
		}
	}
-	@Override
-	public void restore(Collection<KeyedStateHandle> restoreState) throws Exception {
-		LOG.info("Initializing RocksDB keyed state backend from snapshot.");
+	// ------------------------------------------------------------------------
+	//  State factories
+	// ------------------------------------------------------------------------

-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Restoring snapshot from state handles: {}.", restoreState);
-		}
+	/**
+	 * Creates a column family handle for use with a k/v state. When restoring from a snapshot
+	 * we don't restore the individual k/v states, just the global RocksDB database and the
+	 * list of column families. When a k/v state is first requested we check here whether we
+	 * already have a column family for that and return it or create a new one if it doesn't exist.
+	 *
+	 * <p>This also checks whether the {@link StateDescriptor} for a state matches the one
+	 * that we checkpointed, i.e. is already in the map of column families.
+	 */
+	@SuppressWarnings("rawtypes, unchecked")
+	protected <N, S> ColumnFamilyHandle getColumnFamily(
+		StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException {

-		// clear all meta data
-		kvStateInformation.clear();
-		restoredKvStateMetaInfos.clear();
+		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
+			kvStateInformation.get(descriptor.getName());

-		try {
-			if (restoreState == null || restoreState.isEmpty()) {
-				createDB();
-			} else if (restoreState.iterator().next() instanceof IncrementalKeyedStateHandle) {
-				RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
-				restoreOperation.restore(restoreState);
-			} else {
-				RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this);
-				restoreOperation.doRestore(restoreState);
+		RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
+			descriptor.getType(),
+			descriptor.getName(),
+			namespaceSerializer,
+			descriptor.getSerializer());
+
+		if (stateInfo != null) {
+			// TODO with eager registration in place, these checks should be moved to restore()
+
+			RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo =
+				(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) restoredKvStateMetaInfos.get(descriptor.getName());
+
+			Preconditions.checkState(
+				Objects.equals(newMetaInfo.getName(), restoredMetaInfo.getName()),
+				"Incompatible state names. " +
+					"Was [" + restoredMetaInfo.getName() + "], " +
+					"registered with [" + newMetaInfo.getName() + "].");
+
+			if (!Objects.equals(newMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)
+				&& !Objects.equals(restoredMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)) {
+
+				Preconditions.checkState(
+					newMetaInfo.getStateType() == restoredMetaInfo.getStateType(),
+					"Incompatible state types. " +
+						"Was [" + restoredMetaInfo.getStateType() + "], " +
+						"registered with [" + newMetaInfo.getStateType() + "].");
			}
-		} catch (Exception ex) {
-			dispose();
-			throw ex;
+
+			// check compatibility results to determine if state migration is required
+			CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
+				restoredMetaInfo.getNamespaceSerializer(),
+				null,
+				restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
+				newMetaInfo.getNamespaceSerializer());
+
+			CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
+				restoredMetaInfo.getStateSerializer(),
+				UnloadableDummyTypeSerializer.class,
+				restoredMetaInfo.getStateSerializerConfigSnapshot(),
+				newMetaInfo.getStateSerializer());
+
+			if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) {
+				// TODO state migration currently isn't possible.
+				throw new StateMigrationException("State migration isn't supported, yet.");
+			} else {
+				stateInfo.f1 = newMetaInfo;
+				return stateInfo.f0;
+			}
+		}
+
+		byte[] nameBytes = descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
+		Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),
+			"The chosen state name 'default' collides with the name of the default column family!");
+
+		ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(nameBytes, columnOptions);
+
+		final ColumnFamilyHandle columnFamily;
+
+		try {
+			columnFamily = db.createColumnFamily(columnDescriptor);
+		} catch (RocksDBException e) {
+			throw new IOException("Error creating ColumnFamilyHandle.", e);
		}
+
+		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> tuple =
+			new Tuple2<>(columnFamily, newMetaInfo);
+		Map rawAccess = kvStateInformation;
+		rawAccess.put(descriptor.getName(), tuple);
+		return columnFamily;
	}
	@Override
-	public void notifyCheckpointComplete(long completedCheckpointId) {
-		synchronized (materializedSstFiles) {
-			if (completedCheckpointId < lastCompletedCheckpointId) {
-				return;
-			}
+	protected <N, T> InternalValueState<N, T> createValueState(
+		TypeSerializer<N> namespaceSerializer,
+		ValueStateDescriptor<T> stateDesc) throws Exception {

-			materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId);
+		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);

-			lastCompletedCheckpointId = completedCheckpointId;
-		}
+		return new RocksDBValueState<>(columnFamily, namespaceSerializer, stateDesc, this);
	}

-	private void createDB() throws IOException {
-		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
-		this.db = openDB(instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles);
-		this.defaultColumnFamily = columnFamilyHandles.get(0);
+	@Override
+	protected <N, T> InternalListState<N, T> createListState(
+		TypeSerializer<N> namespaceSerializer,
+		ListStateDescriptor<T> stateDesc) throws Exception {
+
+		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
+
+		return new RocksDBListState<>(columnFamily, namespaceSerializer, stateDesc, this);
	}

-	private RocksDB openDB(
-		String path,
-		List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
-		List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException {
+	@Override
+	protected <N, T> InternalReducingState<N, T> createReducingState(
+		TypeSerializer<N> namespaceSerializer,
+		ReducingStateDescriptor<T> stateDesc) throws Exception {

-		List<ColumnFamilyDescriptor> columnFamilyDescriptors =
-			new ArrayList<>(1 + stateColumnFamilyDescriptors.size());
+		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);

-		columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
+		return new RocksDBReducingState<>(columnFamily, namespaceSerializer, stateDesc, this);
+	}

-		// we add the required descriptor for the default CF in last position.
-		columnFamilyDescriptors.add(new ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions));
+	@Override
+	protected <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState(
+		TypeSerializer<N> namespaceSerializer,
+		AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {

-		RocksDB dbRef;
+		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
+		return new RocksDBAggregatingState<>(columnFamily, namespaceSerializer, stateDesc, this);
+	}

-		try {
-			dbRef = RocksDB.open(
-				Preconditions.checkNotNull(dbOptions),
-				Preconditions.checkNotNull(path),
-				columnFamilyDescriptors,
-				stateColumnFamilyHandles);
-		} catch (RocksDBException e) {
-			throw new IOException("Error while opening RocksDB instance.", e);
-		}
+	@Override
+	protected <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(
+		TypeSerializer<N> namespaceSerializer,
+		FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {

-		// requested + default CF
-		Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
-			"Not all requested column family handles have been created");
+		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);

-		return dbRef;
+		return new RocksDBFoldingState<>(columnFamily, namespaceSerializer, stateDesc, this);
+	}
+
+	@Override
+	protected <N, UK, UV> InternalMapState<N, UK, UV> createMapState(
+		TypeSerializer<N> namespaceSerializer,
+		MapStateDescriptor<UK, UV> stateDesc) throws Exception {
+
+		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
+
+		return new RocksDBMapState<>(columnFamily, namespaceSerializer, stateDesc, this);
	}
	/**
-	 * Encapsulates the process of restoring a RocksDBKeyedStateBackend from a snapshot.
+	 * Only visible for testing, DO NOT USE.
	 */
-	static final class RocksDBFullRestoreOperation<K> {
+	public File getInstanceBasePath() {
+		return instanceBasePath;
+	}

-		private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend;
+	@Override
+	public boolean supportsAsynchronousSnapshots() {
+		return true;
+	}

-		/** Current key-groups state handle from which we restore key-groups. */
-		private KeyGroupsStateHandle currentKeyGroupsStateHandle;
-		/** Current input stream we obtained from currentKeyGroupsStateHandle. */
-		private FSDataInputStream currentStateHandleInStream;
-		/** Current data input view that wraps currentStateHandleInStream. */
-		private DataInputView currentStateHandleInView;
-		/** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle. */
-		private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies;
-		/** The compression decorator that was used for writing the state, as determined by the meta data. */
-		private StreamCompressionDecorator keygroupStreamCompressionDecorator;
+	@VisibleForTesting
+	@SuppressWarnings("unchecked")
+	@Override
+	public int numStateEntries() {
+		int count = 0;

-		/**
-		 * Creates a restore operation object for the given state backend instance.
-		 *
-		 * @param rocksDBKeyedStateBackend the state backend into which we restore
-		 */
-		public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) {
-			this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend);
+		for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column : kvStateInformation.values()) {
+			try (RocksIterator rocksIterator = db.newIterator(column.f0)) {
+				rocksIterator.seekToFirst();
+
+				while (rocksIterator.isValid()) {
+					count++;
+					rocksIterator.next();
+				}
+			}
+		}
+
+		return count;
+	}
+
+	/**
+	 * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups.
+	 * The resulting iteration sequence is ordered by (key-group, kv-state).
+	 */
+	@VisibleForTesting
+	static final class RocksDBMergeIterator implements AutoCloseable {
+
+		private final PriorityQueue<MergeIterator> heap;
+		private final int keyGroupPrefixByteCount;
+		private boolean newKeyGroup;
+		private boolean newKVState;
+		private boolean valid;
+
+		private MergeIterator currentSubIterator;
+
+		private static final List<Comparator<MergeIterator>> COMPARATORS;
+
+		static {
+			int maxBytes = 4;
+			COMPARATORS = new ArrayList<>(maxBytes);
+			for (int i = 0; i < maxBytes; ++i) {
+				final int currentBytes = i;
+				COMPARATORS.add(new Comparator<MergeIterator>() {
+					@Override
+					public int compare(MergeIterator o1, MergeIterator o2) {
+						int arrayCmpRes = compareKeyGroupsForByteArrays(
+							o1.currentKey, o2.currentKey, currentBytes);
+						return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
+					}
+				});
+			}
+		}
+
+		RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> kvStateIterators, final int keyGroupPrefixByteCount) {
+			Preconditions.checkNotNull(kvStateIterators);
+			this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
+
+			Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount);
+
+			if (kvStateIterators.size() > 0) {
+				PriorityQueue<MergeIterator> iteratorPriorityQueue =
+					new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
+
+				for (Tuple2<RocksIterator, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
+					final RocksIterator rocksIterator = rocksIteratorWithKVStateId.f0;
+					rocksIterator.seekToFirst();
+					if (rocksIterator.isValid()) {
+						iteratorPriorityQueue.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
+					} else {
+						IOUtils.closeQuietly(rocksIterator);
+					}
+				}
+
+				kvStateIterators.clear();
+
+				this.heap = iteratorPriorityQueue;
+				this.valid = !heap.isEmpty();
+				this.currentSubIterator = heap.poll();
+			} else {
+				// creating a PriorityQueue of size 0 results in an exception.
+				this.heap = null;
+				this.valid = false;
+			}
+
+			this.newKeyGroup = true;
+			this.newKVState = true;
		}

		/**
-		 * Restores all key-groups data that is referenced by the passed state handles.
-		 *
-		 * @param keyedStateHandles List of all key groups state handles that shall be restored.
+		 * Advance the iterator. Should only be called if {@link #isValid()} returned true. Valid can only chance after
+		 * calls to {@link #next()}.
		 */
-		public void doRestore(Collection<KeyedStateHandle> keyedStateHandles)
-			throws IOException, StateMigrationException, RocksDBException {
+		public void next() {
+			newKeyGroup = false;
+			newKVState = false;

-			rocksDBKeyedStateBackend.createDB();
+			final RocksIterator rocksIterator = currentSubIterator.getIterator();
+			rocksIterator.next();

-			for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
-				if (keyedStateHandle != null) {
+			byte[] oldKey = currentSubIterator.getCurrentKey();
+			if (rocksIterator.isValid()) {
+				currentSubIterator.currentKey = rocksIterator.key();

-					if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
-						throw new IllegalStateException("Unexpected state handle type, " +
-							"expected: " + KeyGroupsStateHandle.class +
-							", but found: " + keyedStateHandle.getClass());
-					}
-					this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
-					restoreKeyGroupsInStateHandle();
+				if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) {
+					heap.offer(currentSubIterator);
+					currentSubIterator
--- End diff ---
This should also be removed when `canceled` is true.
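
If this refers to the entry that the incremental snapshot registers in `materializedSstFiles`, a minimal sketch of the suggested cleanup could look like the following (illustration only, not code from this PR; `stateBackend`, `checkpointId`, `dbLease` and the file maps are fields as in the `RocksDBIncrementalSnapshotOperation` that appears in this diff):

	void releaseResources(boolean canceled) {

		dbLease.close();

		if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
			try {
				closeableRegistry.close();
			} catch (IOException e) {
				LOG.warn("Exception on closing registry.", e);
			}
		}

		if (canceled) {
			// suggested addition: a canceled snapshot must not stay registered
			// as a materialization base for later incremental checkpoints
			synchronized (stateBackend.materializedSstFiles) {
				stateBackend.materializedSstFiles.remove(checkpointId);
			}

			Collection<StateObject> statesToDiscard =
				new ArrayList<>(1 + miscFiles.size() + sstFiles.size());

			statesToDiscard.add(metaStateHandle);
			statesToDiscard.addAll(miscFiles.values());
			statesToDiscard.addAll(sstFiles.values());

			try {
				StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
			} catch (Exception e) {
				LOG.warn("Could not properly discard states.", e);
			}
		}
	}

Without this, a later checkpoint could pick the canceled checkpoint's id out of `materializedSstFiles` and reference sst files that were never successfully registered.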
---