Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r165349688
--- Diff:
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
---
@@ -363,1691 +372,1780 @@ public int getKeyGroupPrefixBytes() {
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
- if (checkpointOptions.getCheckpointType() !=
CheckpointOptions.CheckpointType.SAVEPOINT &&
- enableIncrementalCheckpointing) {
- return snapshotIncrementally(checkpointId, timestamp,
streamFactory);
- } else {
- return snapshotFully(checkpointId, timestamp,
streamFactory);
- }
+ return snapshotStrategy.performSnapshot(checkpointId,
timestamp, streamFactory, checkpointOptions);
}
- private RunnableFuture<SnapshotResult<KeyedStateHandle>>
snapshotIncrementally(
- final long checkpointId,
- final long checkpointTimestamp,
- final CheckpointStreamFactory checkpointStreamFactory) throws
Exception {
-
- if (db == null) {
- throw new IOException("RocksDB closed.");
- }
+ @Override
+ public void restore(StateObjectCollection<KeyedStateHandle>
restoreState) throws Exception {
+ LOG.info("Initializing RocksDB keyed state backend from
snapshot.");
- if (kvStateInformation.isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Asynchronous RocksDB snapshot
performed on empty keyed state at " +
- checkpointTimestamp + " . Returning
null.");
- }
- return DoneFuture.nullValue();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Restoring snapshot from state handles: {}.",
restoreState);
}
- final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
- new RocksDBIncrementalSnapshotOperation<>(
- this,
- checkpointStreamFactory,
- checkpointId,
- checkpointTimestamp);
-
- snapshotOperation.takeSnapshot();
-
- return new FutureTask<SnapshotResult<KeyedStateHandle>>(
- () -> snapshotOperation.materializeSnapshot()
- ) {
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- snapshotOperation.stop();
- return super.cancel(mayInterruptIfRunning);
- }
+ // clear all meta data
+ kvStateInformation.clear();
+ restoredKvStateMetaInfos.clear();
- @Override
- protected void done() {
-
snapshotOperation.releaseResources(isCancelled());
+ try {
+ if (restoreState == null || restoreState.isEmpty()) {
+ createDB();
+ } else if (restoreState.iterator().next() instanceof
IncrementalKeyedStateHandle) {
+ RocksDBIncrementalRestoreOperation<K>
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+ restoreOperation.restore(restoreState);
+ } else {
+ RocksDBFullRestoreOperation<K> restoreOperation
= new RocksDBFullRestoreOperation<>(this);
+ restoreOperation.doRestore(restoreState);
}
- };
+ } catch (Exception ex) {
+ dispose();
+ throw ex;
+ }
}
- private RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotFully(
- final long checkpointId,
- final long timestamp,
- final CheckpointStreamFactory streamFactory) throws Exception {
-
- long startTime = System.currentTimeMillis();
- final CloseableRegistry snapshotCloseableRegistry = new
CloseableRegistry();
-
- final RocksDBFullSnapshotOperation<K> snapshotOperation;
-
- if (kvStateInformation.isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Asynchronous RocksDB snapshot
performed on empty keyed state at " + timestamp +
- " . Returning null.");
- }
+ @Override
+ public void notifyCheckpointComplete(long completedCheckpointId) {
- return DoneFuture.nullValue();
+ if (!enableIncrementalCheckpointing) {
+ return;
}
- snapshotOperation = new RocksDBFullSnapshotOperation<>(this,
streamFactory, snapshotCloseableRegistry);
- snapshotOperation.takeDBSnapShot(checkpointId, timestamp);
-
- // implementation of the async IO operation, based on FutureTask
-
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable
=
- new
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() {
+ synchronized (materializedSstFiles) {
- @Override
- protected void acquireResources() throws
Exception {
-
cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
-
snapshotOperation.openCheckpointStream();
- }
+ if (completedCheckpointId < lastCompletedCheckpointId) {
+ return;
+ }
- @Override
- protected void releaseResources() throws
Exception {
- closeLocalRegistry();
- releaseSnapshotOperationResources();
- }
+ materializedSstFiles.keySet().removeIf(checkpointId ->
checkpointId < completedCheckpointId);
- private void
releaseSnapshotOperationResources() {
- // hold the db lock while operation on
the db to guard us against async db disposal
-
snapshotOperation.releaseSnapshotResources();
- }
+ lastCompletedCheckpointId = completedCheckpointId;
+ }
+ }
- @Override
- protected void stopOperation() throws Exception
{
- closeLocalRegistry();
- }
+ private void createDB() throws IOException {
+ List<ColumnFamilyHandle> columnFamilyHandles = new
ArrayList<>(1);
+ this.db = openDB(instanceRocksDBPath.getAbsolutePath(),
Collections.emptyList(), columnFamilyHandles);
+ this.defaultColumnFamily = columnFamilyHandles.get(0);
+ }
- private void closeLocalRegistry() {
- if
(cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
- try {
-
snapshotCloseableRegistry.close();
- } catch (Exception ex) {
- LOG.warn("Error closing
local registry", ex);
- }
- }
- }
+ private RocksDB openDB(
+ String path,
+ List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
+ List<ColumnFamilyHandle> stateColumnFamilyHandles) throws
IOException {
- @Override
- public SnapshotResult<KeyedStateHandle>
performOperation() throws Exception {
- long startTime =
System.currentTimeMillis();
+ List<ColumnFamilyDescriptor> columnFamilyDescriptors =
+ new ArrayList<>(1 +
stateColumnFamilyDescriptors.size());
- if (isStopped()) {
- throw new IOException("RocksDB
closed.");
- }
+ columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
- snapshotOperation.writeDBSnapshot();
+ // we add the required descriptor for the default CF in last
position.
+ columnFamilyDescriptors.add(new
ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions));
- LOG.info("Asynchronous RocksDB snapshot
({}, asynchronous part) in thread {} took {} ms.",
- streamFactory,
Thread.currentThread(), (System.currentTimeMillis() - startTime));
+ RocksDB dbRef;
- KeyGroupsStateHandle stateHandle =
snapshotOperation.getSnapshotResultStateHandle();
- return new
SnapshotResult<>(stateHandle, null);
- }
- };
+ try {
+ dbRef = RocksDB.open(
+ Preconditions.checkNotNull(dbOptions),
+ Preconditions.checkNotNull(path),
+ columnFamilyDescriptors,
+ stateColumnFamilyHandles);
+ } catch (RocksDBException e) {
+ throw new IOException("Error while opening RocksDB
instance.", e);
+ }
- LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ",
synchronous part) in thread " +
- Thread.currentThread() + " took " +
(System.currentTimeMillis() - startTime) + " ms.");
+ // requested + default CF
+ Preconditions.checkState(1 +
stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
+ "Not all requested column family handles have been
created");
- return AsyncStoppableTaskWithCallback.from(ioCallable);
+ return dbRef;
}
/**
- * Encapsulates the process to perform a snapshot of a
RocksDBKeyedStateBackend.
+ * Encapsulates the process of restoring a RocksDBKeyedStateBackend
from a full snapshot.
*/
- static final class RocksDBFullSnapshotOperation<K> {
+ private static final class RocksDBFullRestoreOperation<K> {
- static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
- static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
-
- private final RocksDBKeyedStateBackend<K> stateBackend;
- private final KeyGroupRangeOffsets keyGroupRangeOffsets;
- private final CheckpointStreamFactory checkpointStreamFactory;
- private final CloseableRegistry snapshotCloseableRegistry;
- private final ResourceGuard.Lease dbLease;
-
- private long checkpointId;
- private long checkpointTimeStamp;
-
- private Snapshot snapshot;
- private ReadOptions readOptions;
- private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
-
- private CheckpointStreamFactory.CheckpointStateOutputStream
outStream;
- private DataOutputView outputView;
-
- RocksDBFullSnapshotOperation(
- RocksDBKeyedStateBackend<K> stateBackend,
- CheckpointStreamFactory checkpointStreamFactory,
- CloseableRegistry registry) throws IOException {
+ private final RocksDBKeyedStateBackend<K>
rocksDBKeyedStateBackend;
- this.stateBackend = stateBackend;
- this.checkpointStreamFactory = checkpointStreamFactory;
- this.keyGroupRangeOffsets = new
KeyGroupRangeOffsets(stateBackend.keyGroupRange);
- this.snapshotCloseableRegistry = registry;
- this.dbLease =
this.stateBackend.rocksDBResourceGuard.acquireResource();
- }
+ /** Current key-groups state handle from which we restore
key-groups. */
+ private KeyGroupsStateHandle currentKeyGroupsStateHandle;
+ /** Current input stream we obtained from
currentKeyGroupsStateHandle. */
+ private FSDataInputStream currentStateHandleInStream;
+ /** Current data input view that wraps
currentStateHandleInStream. */
+ private DataInputView currentStateHandleInView;
+ /** Current list of ColumnFamilyHandles for all column families
we restore from currentKeyGroupsStateHandle. */
+ private List<ColumnFamilyHandle>
currentStateHandleKVStateColumnFamilies;
+ /** The compression decorator that was used for writing the
state, as determined by the meta data. */
+ private StreamCompressionDecorator
keygroupStreamCompressionDecorator;
/**
- * 1) Create a snapshot object from RocksDB.
+ * Creates a restore operation object for the given state
backend instance.
*
- * @param checkpointId id of the checkpoint for which we take
the snapshot
- * @param checkpointTimeStamp timestamp of the checkpoint for
which we take the snapshot
+ * @param rocksDBKeyedStateBackend the state backend into which
we restore
*/
- public void takeDBSnapShot(long checkpointId, long
checkpointTimeStamp) {
- Preconditions.checkArgument(snapshot == null, "Only one
ongoing snapshot allowed!");
- this.kvStateIterators = new
ArrayList<>(stateBackend.kvStateInformation.size());
- this.checkpointId = checkpointId;
- this.checkpointTimeStamp = checkpointTimeStamp;
- this.snapshot = stateBackend.db.getSnapshot();
+ public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K>
rocksDBKeyedStateBackend) {
+ this.rocksDBKeyedStateBackend =
Preconditions.checkNotNull(rocksDBKeyedStateBackend);
}
/**
- * 2) Open CheckpointStateOutputStream through the
checkpointStreamFactory into which we will write.
+ * Restores all key-groups data that is referenced by the
passed state handles.
*
- * @throws Exception
+ * @param keyedStateHandles List of all key groups state
handles that shall be restored.
*/
- public void openCheckpointStream() throws Exception {
- Preconditions.checkArgument(outStream == null, "Output
stream for snapshot is already set.");
- outStream =
checkpointStreamFactory.createCheckpointStateOutputStream(checkpointId,
checkpointTimeStamp);
- snapshotCloseableRegistry.registerCloseable(outStream);
- outputView = new DataOutputViewStreamWrapper(outStream);
- }
+ public void doRestore(Collection<KeyedStateHandle>
keyedStateHandles)
+ throws IOException, StateMigrationException,
RocksDBException {
- /**
- * 3) Write the actual data from RocksDB from the time we took
the snapshot object in (1).
- *
- * @throws IOException
- */
- public void writeDBSnapshot() throws IOException,
InterruptedException {
+ rocksDBKeyedStateBackend.createDB();
- if (null == snapshot) {
- throw new IOException("No snapshot available.
Might be released due to cancellation.");
- }
+ for (KeyedStateHandle keyedStateHandle :
keyedStateHandles) {
+ if (keyedStateHandle != null) {
- Preconditions.checkNotNull(outStream, "No output stream
to write snapshot.");
- writeKVStateMetaData();
- writeKVStateData();
+ if (!(keyedStateHandle instanceof
KeyGroupsStateHandle)) {
+ throw new
IllegalStateException("Unexpected state handle type, " +
+ "expected: " +
KeyGroupsStateHandle.class +
+ ", but found: " +
keyedStateHandle.getClass());
+ }
+ this.currentKeyGroupsStateHandle =
(KeyGroupsStateHandle) keyedStateHandle;
+ restoreKeyGroupsInStateHandle();
+ }
+ }
}
/**
- * 4) Returns a state handle to the snapshot after the snapshot
procedure is completed and null before.
- *
- * @return state handle to the completed snapshot
+ * Restore one key groups state handle.
*/
- public KeyGroupsStateHandle getSnapshotResultStateHandle()
throws IOException {
-
- if
(snapshotCloseableRegistry.unregisterCloseable(outStream)) {
-
- StreamStateHandle stateHandle =
outStream.closeAndGetHandle();
- outStream = null;
-
- if (stateHandle != null) {
- return new
KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+ private void restoreKeyGroupsInStateHandle()
+ throws IOException, StateMigrationException,
RocksDBException {
+ try {
+ currentStateHandleInStream =
currentKeyGroupsStateHandle.openInputStream();
+
rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
+ currentStateHandleInView = new
DataInputViewStreamWrapper(currentStateHandleInStream);
+ restoreKVStateMetaData();
+ restoreKVStateData();
+ } finally {
+ if
(rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream))
{
+
IOUtils.closeQuietly(currentStateHandleInStream);
}
}
- return null;
}
/**
- * 5) Release the snapshot object for RocksDB and clean up.
+ * Restore the KV-state / ColumnFamily meta data for all
key-groups referenced by the current state handle.
+ *
+ * @throws IOException
+ * @throws StateMigrationException
+ * @throws RocksDBException
*/
- public void releaseSnapshotResources() {
+ private void restoreKVStateMetaData() throws IOException,
StateMigrationException, RocksDBException {
- outStream = null;
+ KeyedBackendSerializationProxy<K> serializationProxy =
+ new
KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
- if (null != kvStateIterators) {
- for (Tuple2<RocksIterator, Integer>
kvStateIterator : kvStateIterators) {
-
IOUtils.closeQuietly(kvStateIterator.f0);
- }
- kvStateIterators = null;
- }
+ serializationProxy.read(currentStateHandleInView);
- if (null != snapshot) {
- if (null != stateBackend.db) {
-
stateBackend.db.releaseSnapshot(snapshot);
- }
- IOUtils.closeQuietly(snapshot);
- snapshot = null;
- }
+ // check for key serializer compatibility; this also
reconfigures the
+ // key serializer to be compatible, if it is required
and is possible
+ if (CompatibilityUtil.resolveCompatibilityResult(
+ serializationProxy.getKeySerializer(),
+ UnloadableDummyTypeSerializer.class,
+
serializationProxy.getKeySerializerConfigSnapshot(),
+ rocksDBKeyedStateBackend.keySerializer)
+ .isRequiresMigration()) {
- if (null != readOptions) {
- IOUtils.closeQuietly(readOptions);
- readOptions = null;
+ // TODO replace with state migration; note that
key hash codes need to remain the same after migration
+ throw new StateMigrationException("The new key
serializer is not compatible to read previous keys. " +
+ "Aborting now since state migration is
currently not available");
}
- this.dbLease.close();
- }
-
- private void writeKVStateMetaData() throws IOException {
+ this.keygroupStreamCompressionDecorator =
serializationProxy.isUsingKeyGroupCompression() ?
+ SnappyStreamCompressionDecorator.INSTANCE :
UncompressedStreamCompressionDecorator.INSTANCE;
- List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?,
?>> metaInfoSnapshots =
- new
ArrayList<>(stateBackend.kvStateInformation.size());
+ List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?,
?>> restoredMetaInfos =
+ serializationProxy.getStateMetaInfoSnapshots();
+ currentStateHandleKVStateColumnFamilies = new
ArrayList<>(restoredMetaInfos.size());
+ //rocksDBKeyedStateBackend.restoredKvStateMetaInfos =
new HashMap<>(restoredMetaInfos.size());
- int kvStateId = 0;
- for (Map.Entry<String, Tuple2<ColumnFamilyHandle,
RegisteredKeyedBackendStateMetaInfo<?, ?>>> column :
- stateBackend.kvStateInformation.entrySet()) {
+ for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>
restoredMetaInfo : restoredMetaInfos) {
-
metaInfoSnapshots.add(column.getValue().f1.snapshot());
+ Tuple2<ColumnFamilyHandle,
RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn =
+
rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
- //retrieve iterator for this k/v states
- readOptions = new ReadOptions();
- readOptions.setSnapshot(snapshot);
+ if (registeredColumn == null) {
+ byte[] nameBytes =
restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
- kvStateIterators.add(
- new
Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions),
kvStateId));
+ ColumnFamilyDescriptor
columnFamilyDescriptor = new ColumnFamilyDescriptor(
+ nameBytes,
+
rocksDBKeyedStateBackend.columnOptions);
- ++kvStateId;
- }
+ RegisteredKeyedBackendStateMetaInfo<?,
?> stateMetaInfo =
+ new
RegisteredKeyedBackendStateMetaInfo<>(
+
restoredMetaInfo.getStateType(),
+
restoredMetaInfo.getName(),
+
restoredMetaInfo.getNamespaceSerializer(),
+
restoredMetaInfo.getStateSerializer());
- KeyedBackendSerializationProxy<K> serializationProxy =
- new KeyedBackendSerializationProxy<>(
- stateBackend.getKeySerializer(),
- metaInfoSnapshots,
-
!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE,
stateBackend.keyGroupCompressionDecorator));
+
rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(),
restoredMetaInfo);
- serializationProxy.write(outputView);
- }
+ ColumnFamilyHandle columnFamily =
rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
- private void writeKVStateData() throws IOException,
InterruptedException {
+ registeredColumn = new
Tuple2<>(columnFamily, stateMetaInfo);
+
rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(),
registeredColumn);
- byte[] previousKey = null;
- byte[] previousValue = null;
- OutputStream kgOutStream = null;
- DataOutputView kgOutView = null;
+ } else {
+ // TODO with eager state registration
in place, check here for serializer migration strategies
+ }
+
currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0);
+ }
+ }
- try {
- // Here we transfer ownership of RocksIterators
to the RocksDBMergeIterator
- try (RocksDBMergeIterator mergeIterator = new
RocksDBMergeIterator(
- kvStateIterators,
stateBackend.keyGroupPrefixBytes)) {
+ /**
+ * Restore the KV-state / ColumnFamily data for all key-groups
referenced by the current state handle.
+ *
+ * @throws IOException
+ * @throws RocksDBException
+ */
+ private void restoreKVStateData() throws IOException,
RocksDBException {
+ //for all key-groups in the current state handle...
+ for (Tuple2<Integer, Long> keyGroupOffset :
currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
+ int keyGroup = keyGroupOffset.f0;
- // handover complete, null out to
prevent double close
- kvStateIterators = null;
+ // Check that restored key groups all belong to
the backend
+
Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup),
+ "The key group must belong to the
backend");
- //preamble: setup with first key-group
as our lookahead
- if (mergeIterator.isValid()) {
- //begin first key-group by
recording the offset
-
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(),
outStream.getPos());
- //write the k/v-state id as
metadata
- kgOutStream =
stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
- kgOutView = new
DataOutputViewStreamWrapper(kgOutStream);
+ long offset = keyGroupOffset.f1;
+ //not empty key-group?
+ if (0L != offset) {
+ currentStateHandleInStream.seek(offset);
+ try (InputStream compressedKgIn =
keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream))
{
+ DataInputViewStreamWrapper
compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
//TODO this could be aware of
keyGroupPrefixBytes and write only one byte if possible
-
kgOutView.writeShort(mergeIterator.kvStateId());
- previousKey =
mergeIterator.key();
- previousValue =
mergeIterator.value();
- mergeIterator.next();
+ int kvStateId =
compressedKgInputView.readShort();
+ ColumnFamilyHandle handle =
currentStateHandleKVStateColumnFamilies.get(kvStateId);
+ //insert all k/v pairs into DB
+ boolean keyGroupHasMoreKeys =
true;
+ while (keyGroupHasMoreKeys) {
+ byte[] key =
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
+ byte[] value =
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
+ if
(RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
+ //clear the
signal bit in the key to make it ready for insertion again
+
RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
+
rocksDBKeyedStateBackend.db.put(handle, key, value);
+ //TODO this
could be aware of keyGroupPrefixBytes and write only one byte if possible
+ kvStateId =
RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
+ &
compressedKgInputView.readShort();
+ if
(RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
+
keyGroupHasMoreKeys = false;
+ } else {
+ handle
= currentStateHandleKVStateColumnFamilies.get(kvStateId);
+ }
+ } else {
+
rocksDBKeyedStateBackend.db.put(handle, key, value);
+ }
+ }
}
+ }
+ }
+ }
+ }
- //main loop: write k/v pairs ordered by
(key-group, kv-state), thereby tracking key-group offsets.
- while (mergeIterator.isValid()) {
+ /**
+ * Encapsulates the process of restoring a RocksDBKeyedStateBackend
from an incremental snapshot.
+ */
+ private static class RocksDBIncrementalRestoreOperation<T> {
- assert
(!hasMetaDataFollowsFlag(previousKey));
+ private final RocksDBKeyedStateBackend<T> stateBackend;
- //set signal in first key byte
that meta data will follow in the stream after this k/v pair
- if
(mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
+ private
RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
+ this.stateBackend = stateBackend;
+ }
- //be cooperative and
check for interruption from time to time in the hot loop
- checkInterrupted();
+ private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?,
?>> readMetaData(
+ StreamStateHandle metaStateHandle) throws Exception {
-
setMetaDataFollowsFlagInKey(previousKey);
- }
+ FSDataInputStream inputStream = null;
- writeKeyValuePair(previousKey,
previousValue, kgOutView);
+ try {
+ inputStream = metaStateHandle.openInputStream();
+
stateBackend.cancelStreamRegistry.registerCloseable(inputStream);
- //write meta data if we have to
- if
(mergeIterator.isNewKeyGroup()) {
- //TODO this could be
aware of keyGroupPrefixBytes and write only one byte if possible
-
kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
- // this will just close
the outer stream
- kgOutStream.close();
- //begin new key-group
-
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(),
outStream.getPos());
- //write the kev-state
- //TODO this could be
aware of keyGroupPrefixBytes and write only one byte if possible
- kgOutStream =
stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
- kgOutView = new
DataOutputViewStreamWrapper(kgOutStream);
-
kgOutView.writeShort(mergeIterator.kvStateId());
- } else if
(mergeIterator.isNewKeyValueState()) {
- //write the k/v-state
- //TODO this could be
aware of keyGroupPrefixBytes and write only one byte if possible
-
kgOutView.writeShort(mergeIterator.kvStateId());
- }
+ KeyedBackendSerializationProxy<T>
serializationProxy =
+ new
KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader);
+ DataInputView in = new
DataInputViewStreamWrapper(inputStream);
+ serializationProxy.read(in);
- //request next k/v pair
- previousKey =
mergeIterator.key();
- previousValue =
mergeIterator.value();
- mergeIterator.next();
- }
- }
+ // check for key serializer compatibility; this
also reconfigures the
+ // key serializer to be compatible, if it is
required and is possible
+ if
(CompatibilityUtil.resolveCompatibilityResult(
+ serializationProxy.getKeySerializer(),
+ UnloadableDummyTypeSerializer.class,
+
serializationProxy.getKeySerializerConfigSnapshot(),
+ stateBackend.keySerializer)
+ .isRequiresMigration()) {
- //epilogue: write last key-group
- if (previousKey != null) {
- assert
(!hasMetaDataFollowsFlag(previousKey));
-
setMetaDataFollowsFlagInKey(previousKey);
- writeKeyValuePair(previousKey,
previousValue, kgOutView);
- //TODO this could be aware of
keyGroupPrefixBytes and write only one byte if possible
-
kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
- // this will just close the outer stream
- kgOutStream.close();
- kgOutStream = null;
+ // TODO replace with state migration;
note that key hash codes need to remain the same after migration
+ throw new StateMigrationException("The
new key serializer is not compatible to read previous keys. " +
+ "Aborting now since state
migration is currently not available");
}
+ return
serializationProxy.getStateMetaInfoSnapshots();
} finally {
- // this will just close the outer stream
- IOUtils.closeQuietly(kgOutStream);
+ if
(stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
+ inputStream.close();
+ }
}
}
- private void writeKeyValuePair(byte[] key, byte[] value,
DataOutputView out) throws IOException {
- BytePrimitiveArraySerializer.INSTANCE.serialize(key,
out);
- BytePrimitiveArraySerializer.INSTANCE.serialize(value,
out);
- }
+ private void readStateData(
+ Path restoreFilePath,
+ StreamStateHandle remoteFileHandle) throws IOException {
- static void setMetaDataFollowsFlagInKey(byte[] key) {
- key[0] |= FIRST_BIT_IN_BYTE_MASK;
- }
+ FileSystem restoreFileSystem =
restoreFilePath.getFileSystem();
- static void clearMetaDataFollowsFlag(byte[] key) {
- key[0] &=
(~RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
- }
+ FSDataInputStream inputStream = null;
+ FSDataOutputStream outputStream = null;
- static boolean hasMetaDataFollowsFlag(byte[] key) {
- return 0 != (key[0] &
RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
- }
+ try {
+ inputStream =
remoteFileHandle.openInputStream();
+
stateBackend.cancelStreamRegistry.registerCloseable(inputStream);
- private static void checkInterrupted() throws
InterruptedException {
- if (Thread.currentThread().isInterrupted()) {
- throw new InterruptedException("RocksDB
snapshot interrupted.");
- }
- }
- }
+ outputStream =
restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
+
stateBackend.cancelStreamRegistry.registerCloseable(outputStream);
- private static final class RocksDBIncrementalSnapshotOperation<K> {
+ byte[] buffer = new byte[8 * 1024];
+ while (true) {
+ int numBytes = inputStream.read(buffer);
+ if (numBytes == -1) {
+ break;
+ }
- /** The backend which we snapshot. */
- private final RocksDBKeyedStateBackend<K> stateBackend;
+ outputStream.write(buffer, 0, numBytes);
+ }
+ } finally {
+ if
(stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
+ inputStream.close();
+ }
- /** Stream factory that creates the outpus streams to DFS. */
- private final CheckpointStreamFactory checkpointStreamFactory;
+ if
(stateBackend.cancelStreamRegistry.unregisterCloseable(outputStream)) {
+ outputStream.close();
+ }
+ }
+ }
- /** Id for the current checkpoint. */
- private final long checkpointId;
+ private void restoreInstance(
+ IncrementalKeyedStateHandle restoreStateHandle,
+ boolean hasExtraKeys) throws Exception {
- /** Timestamp for the current checkpoint. */
- private final long checkpointTimestamp;
+ // read state data
+ Path restoreInstancePath = new Path(
+ stateBackend.instanceBasePath.getAbsolutePath(),
+ UUID.randomUUID().toString());
- /** All sst files that were part of the last previously
completed checkpoint. */
- private Set<StateHandleID> baseSstFiles;
+ try {
+ final Map<StateHandleID, StreamStateHandle>
sstFiles =
+ restoreStateHandle.getSharedState();
+ final Map<StateHandleID, StreamStateHandle>
miscFiles =
+ restoreStateHandle.getPrivateState();
- /** The state meta data. */
- private final
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots
= new ArrayList<>();
+ readAllStateData(sstFiles, restoreInstancePath);
+ readAllStateData(miscFiles,
restoreInstancePath);
- /** Local filesystem for the RocksDB backup. */
- private FileSystem backupFileSystem;
+ // read meta data
+
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots
=
+
readMetaData(restoreStateHandle.getMetaStateHandle());
- /** Local path for the RocksDB backup. */
- private Path backupPath;
+ List<ColumnFamilyDescriptor>
columnFamilyDescriptors =
+ new ArrayList<>(1 +
stateMetaInfoSnapshots.size());
- // Registry for all opened i/o streams
- private final CloseableRegistry closeableRegistry = new
CloseableRegistry();
+ for
(RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot :
stateMetaInfoSnapshots) {
- // new sst files since the last completed checkpoint
- private final Map<StateHandleID, StreamStateHandle> sstFiles =
new HashMap<>();
+ ColumnFamilyDescriptor
columnFamilyDescriptor = new ColumnFamilyDescriptor(
+
stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
+ stateBackend.columnOptions);
- // handles to the misc files in the current snapshot
- private final Map<StateHandleID, StreamStateHandle> miscFiles =
new HashMap<>();
+
columnFamilyDescriptors.add(columnFamilyDescriptor);
+
stateBackend.restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(),
stateMetaInfoSnapshot);
+ }
- // This lease protects from concurrent disposal of the native
rocksdb instance.
- private final ResourceGuard.Lease dbLease;
+ if (hasExtraKeys) {
- private StreamStateHandle metaStateHandle = null;
+ List<ColumnFamilyHandle>
columnFamilyHandles =
+ new ArrayList<>(1 +
columnFamilyDescriptors.size());
- private RocksDBIncrementalSnapshotOperation(
- RocksDBKeyedStateBackend<K> stateBackend,
- CheckpointStreamFactory checkpointStreamFactory,
- long checkpointId,
- long checkpointTimestamp) throws IOException {
+ try (RocksDB restoreDb =
stateBackend.openDB(
+ restoreInstancePath.getPath(),
+ columnFamilyDescriptors,
+ columnFamilyHandles)) {
- this.stateBackend = stateBackend;
- this.checkpointStreamFactory = checkpointStreamFactory;
- this.checkpointId = checkpointId;
- this.checkpointTimestamp = checkpointTimestamp;
- this.dbLease =
this.stateBackend.rocksDBResourceGuard.acquireResource();
- }
+ try {
+ // iterating only the
requested descriptors automatically skips the default column family handle
+ for (int i = 0; i <
columnFamilyDescriptors.size(); ++i) {
+
ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
+
ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptors.get(i);
+
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot =
stateMetaInfoSnapshots.get(i);
- private StreamStateHandle materializeStateData(Path filePath)
throws Exception {
- FSDataInputStream inputStream = null;
- CheckpointStreamFactory.CheckpointStateOutputStream
outputStream = null;
+
Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>
registeredStateMetaInfoEntry =
+
stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
- try {
- final byte[] buffer = new byte[8 * 1024];
+ if (null ==
registeredStateMetaInfoEntry) {
- FileSystem backupFileSystem =
backupPath.getFileSystem();
- inputStream = backupFileSystem.open(filePath);
-
closeableRegistry.registerCloseable(inputStream);
+
RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
+
new RegisteredKeyedBackendStateMetaInfo<>(
+
stateMetaInfoSnapshot.getStateType(),
+
stateMetaInfoSnapshot.getName(),
+
stateMetaInfoSnapshot.getNamespaceSerializer(),
+
stateMetaInfoSnapshot.getStateSerializer());
- outputStream = checkpointStreamFactory
-
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
-
closeableRegistry.registerCloseable(outputStream);
+
registeredStateMetaInfoEntry =
+
new Tuple2<>(
+
stateBackend.db.createColumnFamily(columnFamilyDescriptor),
+
stateMetaInfo);
- while (true) {
- int numBytes = inputStream.read(buffer);
+
stateBackend.kvStateInformation.put(
+
stateMetaInfoSnapshot.getName(),
+
registeredStateMetaInfoEntry);
+ }
- if (numBytes == -1) {
- break;
- }
+
ColumnFamilyHandle targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0;
- outputStream.write(buffer, 0, numBytes);
- }
+ try
(RocksIterator iterator = restoreDb.newIterator(columnFamilyHandle)) {
- StreamStateHandle result = null;
- if
(closeableRegistry.unregisterCloseable(outputStream)) {
- result =
outputStream.closeAndGetHandle();
- outputStream = null;
- }
- return result;
+ int
startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
+ byte[]
startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
+ for
(int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
+
startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>>
((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
+ }
- } finally {
+
iterator.seek(startKeyGroupPrefixBytes);
- if
(closeableRegistry.unregisterCloseable(inputStream)) {
- inputStream.close();
- }
+ while
(iterator.isValid()) {
- if
(closeableRegistry.unregisterCloseable(outputStream)) {
- outputStream.close();
- }
- }
- }
+
int keyGroup = 0;
+
for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
+
keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
+
}
- private StreamStateHandle materializeMetaData() throws
Exception {
- CheckpointStreamFactory.CheckpointStateOutputStream
outputStream = null;
+
if (stateBackend.keyGroupRange.contains(keyGroup)) {
+
stateBackend.db.put(targetColumnFamilyHandle,
+
iterator.key(), iterator.value());
+
}
- try {
- outputStream = checkpointStreamFactory
-
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
-
closeableRegistry.registerCloseable(outputStream);
+
iterator.next();
+ }
+ } // releases
native iterator resources
+ }
+ } finally {
+ //release native tmp db
column family resources
+ for (ColumnFamilyHandle
columnFamilyHandle : columnFamilyHandles) {
+
IOUtils.closeQuietly(columnFamilyHandle);
+ }
+ }
+ } // releases native tmp db resources
+ } else {
+ // pick up again the old backend id, so
that we can reference existing state
+ stateBackend.backendUID =
restoreStateHandle.getBackendIdentifier();
- //no need for compression scheme support
because sst-files are already compressed
- KeyedBackendSerializationProxy<K>
serializationProxy =
- new KeyedBackendSerializationProxy<>(
- stateBackend.keySerializer,
- stateMetaInfoSnapshots,
- false);
+ LOG.debug("Restoring keyed backend uid
in operator {} from incremental snapshot to {}.",
+
stateBackend.operatorIdentifier, stateBackend.backendUID);
- DataOutputView out = new
DataOutputViewStreamWrapper(outputStream);
+ // create hard links in the instance
directory
+ if
(!stateBackend.instanceRocksDBPath.mkdirs()) {
+ throw new IOException("Could
not create RocksDB data directory.");
+ }
- serializationProxy.write(out);
+
createFileHardLinksInRestorePath(sstFiles, restoreInstancePath);
+
createFileHardLinksInRestorePath(miscFiles, restoreInstancePath);
- StreamStateHandle result = null;
- if
(closeableRegistry.unregisterCloseable(outputStream)) {
- result =
outputStream.closeAndGetHandle();
- outputStream = null;
+ List<ColumnFamilyHandle>
columnFamilyHandles =
+ new ArrayList<>(1 +
columnFamilyDescriptors.size());
+
+ stateBackend.db = stateBackend.openDB(
+
stateBackend.instanceRocksDBPath.getAbsolutePath(),
+ columnFamilyDescriptors,
columnFamilyHandles);
+
+ // extract and store the default column
family which is located at the last index
+ stateBackend.defaultColumnFamily =
columnFamilyHandles.remove(columnFamilyHandles.size() - 1);
+
+ for (int i = 0; i <
columnFamilyDescriptors.size(); ++i) {
+
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot =
stateMetaInfoSnapshots.get(i);
+
+ ColumnFamilyHandle
columnFamilyHandle = columnFamilyHandles.get(i);
+
RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
+ new
RegisteredKeyedBackendStateMetaInfo<>(
+
stateMetaInfoSnapshot.getStateType(),
+
stateMetaInfoSnapshot.getName(),
+
stateMetaInfoSnapshot.getNamespaceSerializer(),
+
stateMetaInfoSnapshot.getStateSerializer());
+
+
stateBackend.kvStateInformation.put(
+
stateMetaInfoSnapshot.getName(),
+ new
Tuple2<>(columnFamilyHandle, stateMetaInfo));
+ }
+
+ // use the restore sst files as the
base for succeeding checkpoints
+ synchronized
(stateBackend.materializedSstFiles) {
+
stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(),
sstFiles.keySet());
+ }
+
+ stateBackend.lastCompletedCheckpointId
= restoreStateHandle.getCheckpointId();
}
- return result;
} finally {
- if (outputStream != null) {
- if
(closeableRegistry.unregisterCloseable(outputStream)) {
- outputStream.close();
- }
+ FileSystem restoreFileSystem =
restoreInstancePath.getFileSystem();
+ if
(restoreFileSystem.exists(restoreInstancePath)) {
+
restoreFileSystem.delete(restoreInstancePath, true);
}
}
}
- void takeSnapshot() throws Exception {
-
- final long lastCompletedCheckpoint;
+ private void readAllStateData(
+ Map<StateHandleID, StreamStateHandle> stateHandleMap,
+ Path restoreInstancePath) throws IOException {
- // use the last completed checkpoint as the comparison
base.
- synchronized (stateBackend.materializedSstFiles) {
- lastCompletedCheckpoint =
stateBackend.lastCompletedCheckpointId;
- baseSstFiles =
stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
+ for (Map.Entry<StateHandleID, StreamStateHandle> entry
: stateHandleMap.entrySet()) {
+ StateHandleID stateHandleID = entry.getKey();
+ StreamStateHandle remoteFileHandle =
entry.getValue();
+ readStateData(new Path(restoreInstancePath,
stateHandleID.toString()), remoteFileHandle);
}
+ }
- LOG.trace("Taking incremental snapshot for checkpoint
{}. Snapshot is based on last completed checkpoint {} " +
- "assuming the following (shared) files as base:
{}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
+ private void createFileHardLinksInRestorePath(
+ Map<StateHandleID, StreamStateHandle> stateHandleMap,
+ Path restoreInstancePath) throws IOException {
- // save meta data
- for (Map.Entry<String, Tuple2<ColumnFamilyHandle,
RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
- : stateBackend.kvStateInformation.entrySet()) {
-
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
+ for (StateHandleID stateHandleID :
stateHandleMap.keySet()) {
+ String newSstFileName =
stateHandleID.toString();
+ File restoreFile = new
File(restoreInstancePath.getPath(), newSstFileName);
+ File targetFile = new
File(stateBackend.instanceRocksDBPath, newSstFileName);
+ Files.createLink(targetFile.toPath(),
restoreFile.toPath());
}
+ }
- // save state data
- backupPath = new
Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
+ void restore(Collection<KeyedStateHandle> restoreStateHandles)
throws Exception {
- LOG.trace("Local RocksDB checkpoint goes to backup path
{}.", backupPath);
+ boolean hasExtraKeys = (restoreStateHandles.size() > 1
||
+
!Objects.equals(restoreStateHandles.iterator().next().getKeyGroupRange(),
stateBackend.keyGroupRange));
- backupFileSystem = backupPath.getFileSystem();
- if (backupFileSystem.exists(backupPath)) {
- throw new IllegalStateException("Unexpected
existence of the backup directory.");
+ if (hasExtraKeys) {
+ stateBackend.createDB();
}
- // create hard links of living files in the checkpoint
path
- Checkpoint checkpoint =
Checkpoint.create(stateBackend.db);
- checkpoint.createCheckpoint(backupPath.getPath());
- }
-
- SnapshotResult<KeyedStateHandle> materializeSnapshot() throws
Exception {
-
-
stateBackend.cancelStreamRegistry.registerCloseable(closeableRegistry);
+ for (KeyedStateHandle rawStateHandle :
restoreStateHandles) {
- // write meta data
- metaStateHandle = materializeMetaData();
-
- // write state data
-
Preconditions.checkState(backupFileSystem.exists(backupPath));
-
- FileStatus[] fileStatuses =
backupFileSystem.listStatus(backupPath);
- if (fileStatuses != null) {
- for (FileStatus fileStatus : fileStatuses) {
- final Path filePath =
fileStatus.getPath();
- final String fileName =
filePath.getName();
- final StateHandleID stateHandleID = new
StateHandleID(fileName);
-
- if (fileName.endsWith(SST_FILE_SUFFIX))
{
- final boolean existsAlready =
- baseSstFiles != null &&
baseSstFiles.contains(stateHandleID);
-
- if (existsAlready) {
- // we introduce a
placeholder state handle, that is replaced with the
- // original from the
shared state registry (created from a previous checkpoint)
- sstFiles.put(
- stateHandleID,
- new
PlaceholderStreamStateHandle());
- } else {
-
sstFiles.put(stateHandleID, materializeStateData(filePath));
- }
- } else {
- StreamStateHandle fileHandle =
materializeStateData(filePath);
- miscFiles.put(stateHandleID,
fileHandle);
- }
+ if (!(rawStateHandle instanceof
IncrementalKeyedStateHandle)) {
+ throw new
IllegalStateException("Unexpected state handle type, " +
+ "expected " +
IncrementalKeyedStateHandle.class +
+ ", but found " +
rawStateHandle.getClass());
}
- }
- synchronized (stateBackend.materializedSstFiles) {
-
stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
+ IncrementalKeyedStateHandle keyedStateHandle =
(IncrementalKeyedStateHandle) rawStateHandle;
+
+ restoreInstance(keyedStateHandle, hasExtraKeys);
}
+ }
+ }
- IncrementalKeyedStateHandle incrementalKeyedStateHandle
= new IncrementalKeyedStateHandle(
- stateBackend.backendUID,
- stateBackend.keyGroupRange,
- checkpointId,
- sstFiles,
- miscFiles,
- metaStateHandle);
+ //
------------------------------------------------------------------------
+ // State factories
+ //
------------------------------------------------------------------------
- return new
SnapshotResult<>(incrementalKeyedStateHandle, null);
- }
+ /**
+ * Creates a column family handle for use with a k/v state. When
restoring from a snapshot
+ * we don't restore the individual k/v states, just the global RocksDB
data base and the
+ * list of column families. When a k/v state is first requested we
check here whether we
+ * already have a column family for that and return it or create a new
one if it doesn't exist.
+ *
+ * <p>This also checks whether the {@link StateDescriptor} for a state
matches the one
+ * that we checkpointed, i.e. is already in the map of column families.
+ */
+ @SuppressWarnings("rawtypes, unchecked")
+ protected <N, S> ColumnFamilyHandle getColumnFamily(
+ StateDescriptor<?, S> descriptor, TypeSerializer<N>
namespaceSerializer) throws IOException, StateMigrationException {
- void stop() {
+ Tuple2<ColumnFamilyHandle,
RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
+ kvStateInformation.get(descriptor.getName());
- if
(stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
- try {
- closeableRegistry.close();
- } catch (IOException e) {
- LOG.warn("Could not properly close io
streams.", e);
- }
- }
- }
+ RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new
RegisteredKeyedBackendStateMetaInfo<>(
+ descriptor.getType(),
+ descriptor.getName(),
+ namespaceSerializer,
+ descriptor.getSerializer());
- void releaseResources(boolean canceled) {
+ if (stateInfo != null) {
+ // TODO with eager registration in place, these checks
should be moved to restore()
- dbLease.close();
+ RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>
restoredMetaInfo =
+
(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>)
restoredKvStateMetaInfos.get(descriptor.getName());
- if
(stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
- try {
- closeableRegistry.close();
- } catch (IOException e) {
- LOG.warn("Exception on closing
registry.", e);
- }
- }
+ Preconditions.checkState(
+ Objects.equals(newMetaInfo.getName(),
restoredMetaInfo.getName()),
+ "Incompatible state names. " +
+ "Was [" + restoredMetaInfo.getName() +
"], " +
+ "registered with [" +
newMetaInfo.getName() + "].");
- if (backupPath != null) {
- try {
- if
(backupFileSystem.exists(backupPath)) {
+ if (!Objects.equals(newMetaInfo.getStateType(),
StateDescriptor.Type.UNKNOWN)
+ &&
!Objects.equals(restoredMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN))
{
- LOG.trace("Deleting local
RocksDB backup path {}.", backupPath);
-
backupFileSystem.delete(backupPath, true);
- }
- } catch (Exception e) {
- LOG.warn("Could not properly delete the
checkpoint directory.", e);
- }
+ Preconditions.checkState(
+ newMetaInfo.getStateType() ==
restoredMetaInfo.getStateType(),
+ "Incompatible state types. " +
+ "Was [" +
restoredMetaInfo.getStateType() + "], " +
+ "registered with [" +
newMetaInfo.getStateType() + "].");
}
- if (canceled) {
- Collection<StateObject> statesToDiscard =
- new ArrayList<>(1 + miscFiles.size() +
sstFiles.size());
+ // check compatibility results to determine if state
migration is required
+ CompatibilityResult<N> namespaceCompatibility =
CompatibilityUtil.resolveCompatibilityResult(
+ restoredMetaInfo.getNamespaceSerializer(),
+ null,
+
restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
+ newMetaInfo.getNamespaceSerializer());
- statesToDiscard.add(metaStateHandle);
- statesToDiscard.addAll(miscFiles.values());
- statesToDiscard.addAll(sstFiles.values());
+ CompatibilityResult<S> stateCompatibility =
CompatibilityUtil.resolveCompatibilityResult(
+ restoredMetaInfo.getStateSerializer(),
+ UnloadableDummyTypeSerializer.class,
+
restoredMetaInfo.getStateSerializerConfigSnapshot(),
+ newMetaInfo.getStateSerializer());
- try {
-
StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
- } catch (Exception e) {
- LOG.warn("Could not properly discard
states.", e);
- }
+ if (namespaceCompatibility.isRequiresMigration() ||
stateCompatibility.isRequiresMigration()) {
+ // TODO state migration currently isn't
possible.
+ throw new StateMigrationException("State
migration isn't supported, yet.");
+ } else {
+ stateInfo.f1 = newMetaInfo;
+ return stateInfo.f0;
}
}
- }
- @Override
- public void restore(StateObjectCollection<KeyedStateHandle>
restoreState) throws Exception {
- LOG.info("Initializing RocksDB keyed state backend from
snapshot.");
+ byte[] nameBytes =
descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
+
Preconditions.checkState(!Arrays.equals(DEFAULT_COLUMN_FAMILY_NAME_BYTES,
nameBytes),
+ "The chosen state name 'default' collides with the name
of the default column family!");
- if (LOG.isDebugEnabled()) {
- LOG.debug("Restoring snapshot from state handles: {}.",
restoreState);
- }
+ ColumnFamilyDescriptor columnDescriptor = new
ColumnFamilyDescriptor(nameBytes, columnOptions);
- // clear all meta data
- kvStateInformation.clear();
- restoredKvStateMetaInfos.clear();
+ final ColumnFamilyHandle columnFamily;
try {
- if (restoreState == null || restoreState.isEmpty()) {
- createDB();
- } else if (restoreState.iterator().next() instanceof
IncrementalKeyedStateHandle) {
- RocksDBIncrementalRestoreOperation<K>
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
- restoreOperation.restore(restoreState);
- } else {
- RocksDBFullRestoreOperation<K> restoreOperation
= new RocksDBFullRestoreOperation<>(this);
- restoreOperation.doRestore(restoreState);
- }
- } catch (Exception ex) {
- dispose();
- throw ex;
+ columnFamily = db.createColumnFamily(columnDescriptor);
+ } catch (RocksDBException e) {
+ throw new IOException("Error creating
ColumnFamilyHandle.", e);
}
+
+ Tuple2<ColumnFamilyHandle,
RegisteredKeyedBackendStateMetaInfo<N, S>> tuple =
+ new Tuple2<>(columnFamily, newMetaInfo);
+ Map rawAccess = kvStateInformation;
+ rawAccess.put(descriptor.getName(), tuple);
+ return columnFamily;
}
@Override
- public void notifyCheckpointComplete(long completedCheckpointId) {
-
- if (!enableIncrementalCheckpointing) {
- return;
- }
+ protected <N, T> InternalValueState<N, T> createValueState(
+ TypeSerializer<N> namespaceSerializer,
+ ValueStateDescriptor<T> stateDesc) throws Exception {
- synchronized (materializedSstFiles) {
+ ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc,
namespaceSerializer);
- if (completedCheckpointId < lastCompletedCheckpointId) {
- return;
- }
+ return new RocksDBValueState<>(columnFamily,
namespaceSerializer, stateDesc, this);
+ }
- materializedSstFiles.keySet().removeIf(checkpointId ->
checkpointId < completedCheckpointId);
+ @Override
+ protected <N, T> InternalListState<N, T> createListState(
+ TypeSerializer<N> namespaceSerializer,
+ ListStateDescriptor<T> stateDesc) throws Exception {
- lastCompletedCheckpointId = completedCheckpointId;
- }
- }
+ ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc,
namespaceSerializer);
- private void createDB() throws IOException {
- List<ColumnFamilyHandle> columnFamilyHandles = new
ArrayList<>(1);
- this.db = openDB(instanceRocksDBPath.getAbsolutePath(),
Collections.emptyList(), columnFamilyHandles);
- this.defaultColumnFamily = columnFamilyHandles.get(0);
+ return new RocksDBListState<>(columnFamily,
namespaceSerializer, stateDesc, this);
}
- private RocksDB openDB(
- String path,
- List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
- List<ColumnFamilyHandle> stateColumnFamilyHandles) throws
IOException {
+ @Override
+ protected <N, T> InternalReducingState<N, T> createReducingState(
+ TypeSerializer<N> namespaceSerializer,
+ ReducingStateDescriptor<T> stateDesc) throws Exception {
- List<ColumnFamilyDescriptor> columnFamilyDescriptors =
- new ArrayList<>(1 +
stateColumnFamilyDescriptors.size());
+ ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc,
namespaceSerializer);
- columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
+ return new RocksDBReducingState<>(columnFamily,
namespaceSerializer, stateDesc, this);
+ }
- // we add the required descriptor for the default CF in last
position.
- columnFamilyDescriptors.add(new
ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions));
+ @Override
+ protected <N, T, ACC, R> InternalAggregatingState<N, T, R>
createAggregatingState(
+ TypeSerializer<N> namespaceSerializer,
+ AggregatingStateDescriptor<T, ACC, R> stateDesc) throws
Exception {
- RocksDB dbRef;
+ ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc,
namespaceSerializer);
+ return new RocksDBAggregatingState<>(columnFamily,
namespaceSerializer, stateDesc, this);
+ }
- try {
- dbRef = RocksDB.open(
- Preconditions.checkNotNull(dbOptions),
- Preconditions.checkNotNull(path),
- columnFamilyDescriptors,
- stateColumnFamilyHandles);
- } catch (RocksDBException e) {
- throw new IOException("Error while opening RocksDB
instance.", e);
- }
+ @Override
+ protected <N, T, ACC> InternalFoldingState<N, T, ACC>
createFoldingState(
+ TypeSerializer<N> namespaceSerializer,
+ FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
- // requested + default CF
- Preconditions.checkState(1 +
stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
- "Not all requested column family handles have been
created");
+ ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc,
namespaceSerializer);
- return dbRef;
+ return new RocksDBFoldingState<>(columnFamily,
namespaceSerializer, stateDesc, this);
}
- /**
- * Encapsulates the process of restoring a RocksDBKeyedStateBackend
from a snapshot.
- */
- static final class RocksDBFullRestoreOperation<K> {
-
- private final RocksDBKeyedStateBackend<K>
rocksDBKeyedStateBackend;
+ @Override
+ protected <N, UK, UV> InternalMapState<N, UK, UV> createMapState(
+ TypeSerializer<N> namespaceSerializer,
+ MapStateDescriptor<UK, UV> stateDesc) throws Exception {
- /** Current key-groups state handle from which we restore
key-groups. */
- private KeyGroupsStateHandle currentKeyGroupsStateHandle;
- /** Current input stream we obtained from
currentKeyGroupsStateHandle. */
- private FSDataInputStream currentStateHandleInStream;
- /** Current data input view that wraps
currentStateHandleInStream. */
- private DataInputView currentStateHandleInView;
- /** Current list of ColumnFamilyHandles for all column families
we restore from currentKeyGroupsStateHandle. */
- private List<ColumnFamilyHandle>
currentStateHandleKVStateColumnFamilies;
- /** The compression decorator that was used for writing the
state, as determined by the meta data. */
- private StreamCompressionDecorator
keygroupStreamCompressionDecorator;
+ ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc,
namespaceSerializer);
- /**
- * Creates a restore operation object for the given state
backend instance.
- *
- * @param rocksDBKeyedStateBackend the state backend into which
we restore
- */
- public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K>
rocksDBKeyedStateBackend) {
- this.rocksDBKeyedStateBackend =
Preconditions.checkNotNull(rocksDBKeyedStateBackend);
+ return new RocksDBMapState<>(columnFamily, namespaceSerializer,
stateDesc, this);
+ }
+
+ /**
+ * Only visible for testing, DO NOT USE.
+ */
+ public File getInstanceBasePath() {
+ return instanceBasePath;
+ }
+
+ @Override
+ public boolean supportsAsynchronousSnapshots() {
+ return true;
+ }
+
+ @VisibleForTesting
+ @SuppressWarnings("unchecked")
+ @Override
+ public int numStateEntries() {
+ int count = 0;
+
+ for (Tuple2<ColumnFamilyHandle,
RegisteredKeyedBackendStateMetaInfo<?, ?>> column :
kvStateInformation.values()) {
+ try (RocksIterator rocksIterator =
db.newIterator(column.f0)) {
+ rocksIterator.seekToFirst();
+
+ while (rocksIterator.isValid()) {
+ count++;
+ rocksIterator.next();
+ }
+ }
}
- /**
- * Restores all key-groups data that is referenced by the
passed state handles.
- *
- * @param keyedStateHandles List of all key groups state
handles that shall be restored.
- */
- public void doRestore(Collection<KeyedStateHandle>
keyedStateHandles)
- throws IOException, StateMigrationException,
RocksDBException {
+ return count;
+ }
- rocksDBKeyedStateBackend.createDB();
- for (KeyedStateHandle keyedStateHandle :
keyedStateHandles) {
- if (keyedStateHandle != null) {
- if (!(keyedStateHandle instanceof
KeyGroupsStateHandle)) {
- throw new
IllegalStateException("Unexpected state handle type, " +
- "expected: " +
KeyGroupsStateHandle.class +
- ", but found: " +
keyedStateHandle.getClass());
+ /**
+ * Iterator that merges multiple RocksDB iterators to partition all
states into contiguous key-groups.
+ * The resulting iteration sequence is ordered by (key-group, kv-state).
+ */
+ @VisibleForTesting
+ static final class RocksDBMergeIterator implements AutoCloseable {
+
+ private final PriorityQueue<MergeIterator> heap;
+ private final int keyGroupPrefixByteCount;
+ private boolean newKeyGroup;
+ private boolean newKVState;
+ private boolean valid;
+
+ private MergeIterator currentSubIterator;
+
+ private static final List<Comparator<MergeIterator>>
COMPARATORS;
+
+ static {
+ int maxBytes = 4;
+ COMPARATORS = new ArrayList<>(maxBytes);
+ for (int i = 0; i < maxBytes; ++i) {
+ final int currentBytes = i;
+ COMPARATORS.add(new Comparator<MergeIterator>()
{
+ @Override
+ public int compare(MergeIterator o1,
MergeIterator o2) {
+ int arrayCmpRes =
compareKeyGroupsForByteArrays(
+ o1.currentKey,
o2.currentKey, currentBytes);
+ return arrayCmpRes == 0 ?
o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
}
- this.currentKeyGroupsStateHandle =
(KeyGroupsStateHandle) keyedStateHandle;
- restoreKeyGroupsInStateHandle();
- }
+ });
}
}
- /**
- * Restore one key groups state handle.
- */
- private void restoreKeyGroupsInStateHandle()
- throws IOException, StateMigrationException,
RocksDBException {
- try {
- currentStateHandleInStream =
currentKeyGroupsStateHandle.openInputStream();
-
rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
- currentStateHandleInView = new
DataInputViewStreamWrapper(currentStateHandleInStream);
- restoreKVStateMetaData();
- restoreKVStateData();
- } finally {
- if
(rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream))
{
-
IOUtils.closeQuietly(currentStateHandleInStream);
+ RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>>
kvStateIterators, final int keyGroupPrefixByteCount) {
+ Preconditions.checkNotNull(kvStateIterators);
+ this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
+
+ Comparator<MergeIterator> iteratorComparator =
COMPARATORS.get(keyGroupPrefixByteCount);
+
+ if (kvStateIterators.size() > 0) {
+ PriorityQueue<MergeIterator>
iteratorPriorityQueue =
+ new
PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
+
+ for (Tuple2<RocksIterator, Integer>
rocksIteratorWithKVStateId : kvStateIterators) {
+ final RocksIterator rocksIterator =
rocksIteratorWithKVStateId.f0;
+ rocksIterator.seekToFirst();
+ if (rocksIterator.isValid()) {
+ iteratorPriorityQueue.offer(new
MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
+ } else {
+
IOUtils.closeQuietly(rocksIterator);
+ }
}
+
+ kvStateIterators.clear();
+
+ this.heap = iteratorPriorityQueue;
+ this.valid = !heap.isEmpty();
+ this.currentSubIterator = heap.poll();
+ } else {
+ // creating a PriorityQueue of size 0 results
in an exception.
+ this.heap = null;
+ this.valid = false;
}
+
+ this.newKeyGroup = true;
+ this.newKVState = true;
}
/**
- * Restore the KV-state / ColumnFamily meta data for all
key-groups referenced by the current state handle.
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws RocksDBException
+ * Advance the iterator. Should only be called if {@link
#isValid()} returned true. Valid can only change after
+ * calls to {@link #next()}.
*/
- private void restoreKVStateMetaData() throws IOException,
StateMigrationException, RocksDBException {
+ public void next() {
+ newKeyGroup = false;
+ newKVState = false;
- KeyedBackendSerializationProxy<K> serializationProxy =
- new
KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
+ final RocksIterator rocksIterator =
currentSubIterator.getIterator();
+ rocksIterator.next();
- serializationProxy.read(currentStateHandleInView);
+ byte[] oldKey = currentSubIterator.getCurrentKey();
+ if (rocksIterator.isValid()) {
+ currentSubIterator.currentKey =
rocksIterator.key();
- // check for key serializer compatibility; this also
reconfigures the
- // key serializer to be compatible, if it is required
and is possible
- if (CompatibilityUtil.resolveCompatibilityResult(
- serializationProxy.getKeySerializer(),
- UnloadableDummyTypeSerializer.class,
-
serializationProxy.getKeySerializerConfigSnapshot(),
- rocksDBKeyedStateBackend.keySerializer)
- .isRequiresMigration()) {
+ if (isDifferentKeyGroup(oldKey,
currentSubIterator.getCurrentKey())) {
+ heap.offer(currentSubIterator);
+ currentSubIterator = heap.poll();
+ newKVState =
currentSubIterator.getIterator() != rocksIterator;
+ detectNewKeyGroup(oldKey);
+ }
+ } else {
+ IOUtils.closeQuietly(rocksIterator);
- // TODO replace with state migration; note that
key hash codes need to remain the same after migration
- throw new StateMigrationException("The new key
serializer is not compatible to read previous keys. " +
- "Aborting now since state migration is
currently not available");
+ if (heap.isEmpty()) {
+ currentSubIterator = null;
+ valid = false;
+ } else {
+ currentSubIterator = heap.poll();
+ newKVState = true;
+ detectNewKeyGroup(oldKey);
+ }
}
+ }
- this.keygroupStreamCompressionDecorator =
serializationProxy.isUsingKeyGroupCompression() ?
- SnappyStreamCompressionDecorator.INSTANCE :
UncompressedStreamCompressionDecorator.INSTANCE;
-
- List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?,
?>> restoredMetaInfos =
- serializationProxy.getStateMetaInfoSnapshots();
- currentStateHandleKVStateColumnFamilies = new
ArrayList<>(restoredMetaInfos.size());
- //rocksDBKeyedStateBackend.restoredKvStateMetaInfos =
new HashMap<>(restoredMetaInfos.size());
+ private boolean isDifferentKeyGroup(byte[] a, byte[] b) {
+ return 0 != compareKeyGroupsForByteArrays(a, b,
keyGroupPrefixByteCount);
+ }
- for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>
restoredMetaInfo : restoredMetaInfos) {
+ private void detectNewKeyGroup(byte[] oldKey) {
+ if (isDifferentKeyGroup(oldKey,
currentSubIterator.currentKey)) {
+ newKeyGroup = true;
+ }
+ }
- Tuple2<ColumnFamilyHandle,
RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn =
-
rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
+ /**
+ * Returns the key-group for the current key.
+ * @return key-group for the current key
+ */
+ public int keyGroup() {
+ int result = 0;
+ //big endian decode
+ for (int i = 0; i < keyGroupPrefixByteCount; ++i) {
+ result <<= 8;
+ result |= (currentSubIterator.currentKey[i] &
0xFF);
+ }
+ return result;
+ }
- if (registeredColumn == null) {
- byte[] nameBytes =
restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
+ public byte[] key() {
+ return currentSubIterator.getCurrentKey();
+ }
- ColumnFamilyDescriptor
columnFamilyDescriptor = new ColumnFamilyDescriptor(
- nameBytes,
-
rocksDBKeyedStateBackend.columnOptions);
+ public byte[] value() {
+ return currentSubIterator.getIterator().value();
+ }
- RegisteredKeyedBackendStateMetaInfo<?,
?> stateMetaInfo =
- new
RegisteredKeyedBackendStateMetaInfo<>(
-
restoredMetaInfo.getStateType(),
-
restoredMetaInfo.getName(),
-
restoredMetaInfo.getNamespaceSerializer(),
-
restoredMetaInfo.getStateSerializer());
+ /**
+ * Returns the Id of the k/v state to which the current key
belongs.
+ * @return Id of K/V state to which the current key belongs.
+ */
+ public int kvStateId() {
+ return currentSubIterator.getKvStateId();
+ }
-
rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restored
--- End diff --
I skimmed over the changes and I assume that you've mostly copied the code
from one place to another without bigger changes. But it's really hard to see
any real changes due to the combined movement of the code.
---