carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code
URL: https://github.com/apache/flink/pull/7674#discussion_r257748314
##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
##########
@@ -510,831 +505,92 @@ public WriteOptions getWriteOptions() {
 		return snapshotRunner;
 	}
 
-	@Override
-	public void restore(Collection<KeyedStateHandle> restoreState) throws Exception {
-
-		LOG.info("Initializing RocksDB keyed state backend.");
-
-		// clear all meta data
-		kvStateInformation.clear();
-
-		try {
-			RocksDBIncrementalRestoreOperation<K> incrementalRestoreOperation = null;
-			if (restoreState == null || restoreState.isEmpty()) {
-				createDB();
-			} else {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Restoring snapshot from state handles: {}, will use {} thread(s) to download files from DFS.", restoreState, numberOfTransferingThreads);
-				}
-
-				KeyedStateHandle firstStateHandle = restoreState.iterator().next();
-				if (firstStateHandle instanceof IncrementalKeyedStateHandle
-					|| firstStateHandle instanceof IncrementalLocalKeyedStateHandle) {
-					incrementalRestoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
-					incrementalRestoreOperation.restore(restoreState);
-				} else {
-					RocksDBFullRestoreOperation<K> fullRestoreOperation = new RocksDBFullRestoreOperation<>(this);
-					fullRestoreOperation.doRestore(restoreState);
-				}
-			}
-
-			// it is important that we only create the key builder after the restore, and not before;
-			// restore operations may reconfigure the key serializer, so accessing the key serializer
-			// only now we can be certain that the key serializer used in the builder is final.
-			this.sharedRocksKeyBuilder = new RocksDBSerializedCompositeKeyBuilder<>(
-				getKeySerializer(),
+	private RocksDBRestoreOperation<K> getRocksDBRestoreOperation(int keyGroupPrefixBytes,
+			CloseableRegistry cancelStreamRegistry,
+			LinkedHashMap<String, StateColumnFamilyHandle> kvStateInformation,
+			RocksDBNativeMetricMonitor nativeMetricMonitor,
+			Collection<KeyedStateHandle> restoreState) {
+		File instanceRocksDBPath = new File(instanceBasePath, DB_INSTANCE_DIR_STRING);
+		if (restoreState == null || restoreState.isEmpty()) {
+			return new RocksDBNoneRestoreOperation<>(
+				keyGroupRange,
 				keyGroupPrefixBytes,
-				32);
-
-			initializeSnapshotStrategy(incrementalRestoreOperation);
-		} catch (Exception ex) {
-			dispose();
-			throw ex;
-		}
-	}
-
-	@VisibleForTesting
-	void initializeSnapshotStrategy(@Nullable RocksDBIncrementalRestoreOperation<K> incrementalRestoreOperation) {
-
-		this.savepointSnapshotStrategy =
-			new RocksFullSnapshotStrategy<>(
-				db,
-				rocksDBResourceGuard,
-				getKeySerializer(),
+				numberOfTransferingThreads,
+				cancelStreamRegistry,
+				userCodeClassLoader,
 				kvStateInformation,
+				keySerializerProvider,
+				instanceBasePath,
+				instanceRocksDBPath,
+				dbOptions,
+				columnOptions,
+				nativeMetricMonitor);
+		}
+		KeyedStateHandle firstStateHandle = restoreState.iterator().next();
+		boolean isIncrementalStateHandle = (firstStateHandle instanceof IncrementalKeyedStateHandle)
+			|| (firstStateHandle instanceof IncrementalLocalKeyedStateHandle);
+		if (isIncrementalStateHandle) {
+			return new RocksDBIncrementalRestoreOperation<>(
+				operatorIdentifier,
 				keyGroupRange,
 				keyGroupPrefixBytes,
-				localRecoveryConfig,
+				numberOfTransferingThreads,
 				cancelStreamRegistry,
-				keyGroupCompressionDecorator);
-
-		if (enableIncrementalCheckpointing) {
-			final UUID backendUID;
-			final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
-			final long lastCompletedCheckpointId;
-
-			if (incrementalRestoreOperation == null) {
-				backendUID = UUID.randomUUID();
-				materializedSstFiles = new TreeMap<>();
-				lastCompletedCheckpointId = -1L;
-			} else {
-				backendUID = Preconditions.checkNotNull(incrementalRestoreOperation.getRestoredBackendUID());
-				materializedSstFiles = Preconditions.checkNotNull(incrementalRestoreOperation.getRestoredSstFiles());
-				lastCompletedCheckpointId = incrementalRestoreOperation.getLastCompletedCheckpointId();
-				Preconditions.checkState(lastCompletedCheckpointId >= 0L);
-			}
-			// TODO eventually we might want to separate savepoint and snapshot strategy, i.e. having 2 strategies.
-			this.checkpointSnapshotStrategy = new RocksIncrementalSnapshotStrategy<>(
-				db,
-				rocksDBResourceGuard,
-				getKeySerializer(),
+				userCodeClassLoader,
 				kvStateInformation,
+				keySerializerProvider,
+				instanceBasePath,
+				instanceRocksDBPath,
+				dbOptions,
+				columnOptions,
+				nativeMetricMonitor);
+		} else {
+			return new RocksDBFullRestoreOperation<>(
 				keyGroupRange,
 				keyGroupPrefixBytes,
-				localRecoveryConfig,
+				numberOfTransferingThreads,
 				cancelStreamRegistry,
+				userCodeClassLoader,
+				kvStateInformation,
+				keySerializerProvider,
 				instanceBasePath,
-				backendUID,
-				materializedSstFiles,
-				lastCompletedCheckpointId,
-				numberOfTransferingThreads);
-		} else {
-			this.checkpointSnapshotStrategy = savepointSnapshotStrategy;
+				instanceRocksDBPath,
+				dbOptions,
+				columnOptions,
+				nativeMetricMonitor);
 		}
 	}
 	@Override
-	public void notifyCheckpointComplete(long completedCheckpointId) throws Exception {
-
-		if (checkpointSnapshotStrategy != null) {
-			checkpointSnapshotStrategy.notifyCheckpointComplete(completedCheckpointId);
-		}
+	public void restore(Collection<KeyedStateHandle> restoreState) throws Exception {
 
-		if (savepointSnapshotStrategy != null) {
-			savepointSnapshotStrategy.notifyCheckpointComplete(completedCheckpointId);
+		if (restoreState == null) {
+			return;
 		}
-	}
-
-	private void createDB() throws IOException {
-		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
-		this.db = openDB(instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles);
-		this.writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions);
-		this.defaultColumnFamily = columnFamilyHandles.get(0);
-	}
-
-	private RocksDB openDB(
-		String path,
-		List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
-		List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException {
-
-		List<ColumnFamilyDescriptor> columnFamilyDescriptors =
-			new ArrayList<>(1 + stateColumnFamilyDescriptors.size());
-
-		// we add the required descriptor for the default CF in FIRST position, see
-		// https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families
-		columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnOptions));
-		columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
-
-		RocksDB dbRef;
+		LOG.info("Restoring RocksDB keyed state backend.");
 
 		try {
-			dbRef = RocksDB.open(
-				Preconditions.checkNotNull(dbOptions),
-				Preconditions.checkNotNull(path),
-				columnFamilyDescriptors,
-				stateColumnFamilyHandles);
-		} catch (RocksDBException e) {
-			throw new IOException("Error while opening RocksDB instance.", e);
-		}
-
-		// requested + default CF
-		Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
-			"Not all requested column family handles have been created");
-
-		if (this.metricOptions.isEnabled()) {
-			this.nativeMetricMonitor = new RocksDBNativeMetricMonitor(
-				dbRef,
-				metricOptions,
-				metricGroup
-			);
-		}
-
-		return dbRef;
-	}
-
-	/**
-	 * Encapsulates the process of restoring a RocksDBKeyedStateBackend from a full snapshot.
-	 */
-	private static final class RocksDBFullRestoreOperation<K> {
-
-		private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend;
-
-		/** Current key-groups state handle from which we restore key-groups. */
-		private KeyGroupsStateHandle currentKeyGroupsStateHandle;
-		/** Current input stream we obtained from currentKeyGroupsStateHandle. */
-		private FSDataInputStream currentStateHandleInStream;
-		/** Current data input view that wraps currentStateHandleInStream. */
-		private DataInputView currentStateHandleInView;
-		/** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle. */
-		private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies;
-		/** The compression decorator that was used for writing the state, as determined by the meta data. */
-		private StreamCompressionDecorator keygroupStreamCompressionDecorator;
-
-		private boolean isKeySerializerCompatibilityChecked;
-
-		/**
-		 * Creates a restore operation object for the given state backend instance.
-		 *
-		 * @param rocksDBKeyedStateBackend the state backend into which we restore
-		 */
-		RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) {
-			this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend);
-		}
-
-		/**
-		 * Restores all key-groups data that is referenced by the passed state handles.
-		 *
-		 * @param keyedStateHandles List of all key groups state handles that shall be restored.
-		 */
-		void doRestore(Collection<KeyedStateHandle> keyedStateHandles)
-			throws IOException, StateMigrationException, RocksDBException {
-
-			rocksDBKeyedStateBackend.createDB();
-
-			for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
-				if (keyedStateHandle != null) {
-
-					if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
-						throw new IllegalStateException("Unexpected state handle type, " +
-							"expected: " + KeyGroupsStateHandle.class +
-							", but found: " + keyedStateHandle.getClass());
-					}
-					this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
-					restoreKeyGroupsInStateHandle();
-				}
-			}
-		}
-
-		/**
-		 * Restore one key groups state handle.
-		 */
-		private void restoreKeyGroupsInStateHandle()
-			throws IOException, StateMigrationException, RocksDBException {
-			try {
-				currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream();
-				rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
-				currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream);
-				restoreKVStateMetaData();
-				restoreKVStateData();
-			} finally {
-				if (rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) {
-					IOUtils.closeQuietly(currentStateHandleInStream);
-				}
-			}
-		}
-
-		/**
-		 * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle.
-		 */
-		private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException {
-
-			// isSerializerPresenceRequired flag is set to false, since for the RocksDB state backend,
-			// deserialization of state happens lazily during runtime; we depend on the fact
-			// that the new serializer for states could be compatible, and therefore the restore can continue
-			// without old serializers required to be present.
-			KeyedBackendSerializationProxy<K> serializationProxy =
-				new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
-
-			serializationProxy.read(currentStateHandleInView);
-
-			if (!isKeySerializerCompatibilityChecked) {
-				// check for key serializer compatibility; this also reconfigures the
-				// key serializer to be compatible, if it is required and is possible
-				TypeSerializerSchemaCompatibility<K> keySerializerSchemaCompat =
-					rocksDBKeyedStateBackend.checkKeySerializerSchemaCompatibility(serializationProxy.getKeySerializerSnapshot());
-				if (keySerializerSchemaCompat.isCompatibleAfterMigration() || keySerializerSchemaCompat.isIncompatible()) {
-					throw new StateMigrationException("The new key serializer must be compatible.");
-				}
-
-				isKeySerializerCompatibilityChecked = true;
-			}
-
-			this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ?
-				SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
-
-			List<StateMetaInfoSnapshot> restoredMetaInfos =
-				serializationProxy.getStateMetaInfoSnapshots();
-			currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size());
-
-			for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) {
-
-				Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> registeredColumn =
-					rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
-
-				if (registeredColumn == null) {
-					byte[] nameBytes = restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
-
-					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
-						nameBytes,
-						rocksDBKeyedStateBackend.columnOptions);
-
-					ColumnFamilyHandle columnFamily = rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
-
-					// create a meta info for the state on restore;
-					// this allows us to retain the state in future snapshots even if it wasn't accessed
-					RegisteredStateMetaInfoBase stateMetaInfo =
-						RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(restoredMetaInfo);
-					registeredColumn = new Tuple2<>(columnFamily, stateMetaInfo);
-					rocksDBKeyedStateBackend.kvStateInformation.put(restoredMetaInfo.getName(), registeredColumn);
-
-				} else {
-					// TODO with eager state registration in place, check here for serializer migration strategies
-				}
-				currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0);
-			}
-		}
-
-		/**
-		 * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle.
-		 */
-		private void restoreKVStateData() throws IOException, RocksDBException {
-			//for all key-groups in the current state handle...
-			try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(rocksDBKeyedStateBackend.db)) {
-				for (Tuple2<Integer, Long> keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
-					int keyGroup = keyGroupOffset.f0;
-
-					// Check that restored key groups all belong to the backend
-					Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup),
-						"The key group must belong to the backend");
-
-					long offset = keyGroupOffset.f1;
-					//not empty key-group?
-					if (0L != offset) {
-						currentStateHandleInStream.seek(offset);
-						try (InputStream compressedKgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) {
-							DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
-							//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-							int kvStateId = compressedKgInputView.readShort();
-							ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
-							//insert all k/v pairs into DB
-							boolean keyGroupHasMoreKeys = true;
-							while (keyGroupHasMoreKeys) {
-								byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
-								byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
-								if (hasMetaDataFollowsFlag(key)) {
-									//clear the signal bit in the key to make it ready for insertion again
-									clearMetaDataFollowsFlag(key);
-									writeBatchWrapper.put(handle, key, value);
-									//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-									kvStateId = END_OF_KEY_GROUP_MARK
-										& compressedKgInputView.readShort();
-									if (END_OF_KEY_GROUP_MARK == kvStateId) {
-										keyGroupHasMoreKeys = false;
-									} else {
-										handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
-									}
-								} else {
-									writeBatchWrapper.put(handle, key, value);
-								}
-							}
-						}
-					}
-				}
-			}
-		}
+		RocksDBRestoreOperation<K> restoreOperation = getRocksDBRestoreOperation(
Review comment:
For the current code, yes, I agree; my question is really whether we need to reserve it for future use, although I don't know exactly what the requirement would be. Or maybe we could simply add it back whenever the need emerges; that won't be hard anyway (smile).
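
For readers skimming the hunk above: the heart of the refactoring is that the backend now picks a dedicated restore operation based on the kind of state handle it is asked to restore from (none, incremental, or full). Below is a minimal, self-contained sketch of that dispatch; the types in it (StateHandle, IncrementalHandle, RestoreOperation, RestoreDispatchSketch) are hypothetical stand-ins for illustration, not the real Flink classes, and the real constructors in the diff receive the backend's resources (key-group range, serializer provider, DB options, etc.) instead of these placeholders.

import java.util.Collection;

// Hypothetical sketch of the restore-operation dispatch shown in the hunk above.
final class RestoreDispatchSketch {

	// stand-ins for Flink's state handle hierarchy
	interface StateHandle {}
	static final class FullSnapshotHandle implements StateHandle {}
	static final class IncrementalHandle implements StateHandle {}

	// stand-in for RocksDBRestoreOperation
	interface RestoreOperation {
		void restore() throws Exception;
	}

	static RestoreOperation select(Collection<? extends StateHandle> restoreState) {
		if (restoreState == null || restoreState.isEmpty()) {
			// no previous state: corresponds to RocksDBNoneRestoreOperation (just open an empty DB)
			return () -> { };
		}
		StateHandle first = restoreState.iterator().next();
		if (first instanceof IncrementalHandle) {
			// corresponds to RocksDBIncrementalRestoreOperation (reuse previously uploaded SST files)
			return () -> { };
		}
		// anything else (e.g. FullSnapshotHandle) falls back to RocksDBFullRestoreOperation,
		// which re-inserts the serialized key/value pairs key group by key group
		return () -> { };
	}
}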
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services