StefanRRichter commented on code in PR #24031:
URL: https://github.com/apache/flink/pull/24031#discussion_r1481300226
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java:
##########
@@ -310,98 +322,275 @@ private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandl
                .map(StateHandleDownloadSpec::createLocalStateHandleForDownloadedState)
                .forEach(localKeyedStateHandles::add);
+        // Transfer remaining key-groups from temporary instance into base DB
+        byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
+        CompositeKeySerializationUtils.serializeKeyGroup(
+                keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);
+
+        byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
+        CompositeKeySerializationUtils.serializeKeyGroup(
+                keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);
+
+        try {
+            // Process all state downloads
+            transferRemoteStateToLocalDirectory(allDownloadSpecs);
+
+            if (useIngestDbRestoreMode) {
+                // Optimized path for merging multiple handles with Ingest/Clip
+                rescaleClipIngestDB(
+                        localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
+            } else {
+                // Optimized path for single handle and legacy path for merging multiple handles.
+                rescaleCopyFromTemporaryInstance(
+                        localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
+            }
+
+        } finally {
+            // Cleanup all download directories
+            allDownloadSpecs.stream()
+                    .map(StateHandleDownloadSpec::getDownloadDestination)
+                    .forEach(this::cleanUpPathQuietly);
+        }
+    }
+
+    private void rescaleCopyFromTemporaryInstance(
+            Collection<IncrementalLocalKeyedStateHandle> localKeyedStateHandles,
+            byte[] startKeyGroupPrefixBytes,
+            byte[] stopKeyGroupPrefixBytes)
+            throws Exception {
+
        // Choose the best state handle for the initial DB
        final IncrementalLocalKeyedStateHandle selectedInitialHandle =
                RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
                        localKeyedStateHandles, keyGroupRange, overlapFractionThreshold);
+
        Preconditions.checkNotNull(selectedInitialHandle);
+
        // Remove the selected handle from the list so that we don't restore it twice.
        localKeyedStateHandles.remove(selectedInitialHandle);
+        // Init the base DB instance with the initial state
+        initBaseDBForRescaling(selectedInitialHandle);
+
+        try (RocksDBWriteBatchWrapper writeBatchWrapper =
+                new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), writeBatchSize)) {
+            for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) {
+                try (RestoredDBInstance tmpRestoreDBInfo =
+                        restoreTempDBInstanceFromLocalState(stateHandle)) {
+                    copyTempDbIntoBaseDb(
+                            tmpRestoreDBInfo,
+                            writeBatchWrapper,
+                            startKeyGroupPrefixBytes,
+                            stopKeyGroupPrefixBytes);
+                }
+            }
+        }
+    }
+
+    private void copyTempDbIntoBaseDb(
+            RestoredDBInstance tmpRestoreDBInfo,
+            RocksDBWriteBatchWrapper writeBatchWrapper,
+            byte[] startKeyGroupPrefixBytes,
+            byte[] stopKeyGroupPrefixBytes)
+            throws Exception {
+
+        logger.info(
+                "Starting copy of state handle {} to base DB for rescaling.",
+                tmpRestoreDBInfo.srcStateHandle);
+
+        List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors =
+                tmpRestoreDBInfo.columnFamilyDescriptors;
+        List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;
+
+        // iterating only the requested descriptors automatically skips the default
+        // column family handle
+        for (int descIdx = 0; descIdx < tmpColumnFamilyDescriptors.size(); ++descIdx) {
+            ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(descIdx);
+
+            ColumnFamilyHandle targetColumnFamilyHandle =
+                    this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
+                                    null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(descIdx))
+                            .columnFamilyHandle;
+
+            try (RocksIteratorWrapper iterator =
+                    RocksDBOperationUtils.getRocksIterator(
+                            tmpRestoreDBInfo.db,
+                            tmpColumnFamilyHandle,
+                            tmpRestoreDBInfo.readOptions)) {
+
+                iterator.seek(startKeyGroupPrefixBytes);
+
+                while (iterator.isValid()) {
+
+                    if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(
+                            iterator.key(), stopKeyGroupPrefixBytes)) {
+                        writeBatchWrapper.put(
+                                targetColumnFamilyHandle, iterator.key(), iterator.value());
+                    } else {
+                        // Since the iterator will visit the record according to the sorted
+                        // order, we can just break here.
+                        break;
+                    }
+
+                    iterator.next();
+                }
+            } // releases native iterator resources
+        }
+        logger.info(
+                "Finished copy of state handle {} to base DB for rescaling.",
+                tmpRestoreDBInfo.srcStateHandle);
+    }
+
+    /**
+     * Recovery from multiple incremental states with rescaling. For rescaling, this method
+     * creates a temporary RocksDB instance for a key-groups shard. All contents from the
+     * temporary instance are copied into the real restore instance and then the temporary
+     * instance is discarded.
+     */
+    private void rescaleClipIngestDB(
+            Collection<IncrementalLocalKeyedStateHandle> localKeyedStateHandles,
+            byte[] startKeyGroupPrefixBytes,
+            byte[] stopKeyGroupPrefixBytes)
+            throws Exception {
+
+        final Path absoluteInstanceBasePath = instanceBasePath.getAbsoluteFile().toPath();
+        final Path exportCfBasePath = absoluteInstanceBasePath.resolve("export-cfs");
+        Files.createDirectories(exportCfBasePath);
+
+        final Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>>
+                exportedColumnFamilyMetaData = new HashMap<>();
+
+        final List<IncrementalLocalKeyedStateHandle> restoreHandles;
+        if (localKeyedStateHandles instanceof ArrayList) {
+            restoreHandles = (List<IncrementalLocalKeyedStateHandle>) localKeyedStateHandles;
+        } else {
+            restoreHandles = new ArrayList<>(localKeyedStateHandles);
+        }
+
+        // This buffer holds open temporary RocksDB instances that we create from individual
+        // handles and acts as a lookahead buffer, so that we don't need to immediately give up
+        // on import/export when we encounter a database that exceeds the proclaimed key-groups
+        // range of its source state handle.
+        final ArrayDeque<RestoredDBInstance> tmpInstanceBuffer =
+                new ArrayDeque<>(incrementalRestoreInstanceBufferSize);
+
        try {
-            // Process all state downloads
-            transferRemoteStateToLocalDirectory(allDownloadSpecs);
+            int restoreIndex = 0;
+            // Initially, we skip the last handle for the case that we don't find any temporary
+            // DB that can be imported and must resort to the copy approach. Then we can open
+            // the base DB from the skipped handle and save one copy.
+            int skipLastHandle = 1;
+            while (restoreIndex < (restoreHandles.size() - skipLastHandle)
+                    && tmpInstanceBuffer.size() < incrementalRestoreInstanceBufferSize) {
-            // Init the base DB instance with the initial state
-            initBaseDBForRescaling(selectedInitialHandle);
+                IncrementalLocalKeyedStateHandle stateHandle = restoreHandles.get(restoreIndex++);
-            // Transfer remaining key-groups from temporary instance into base DB
-            byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
-            CompositeKeySerializationUtils.serializeKeyGroup(
-                    keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);
+                RestoredDBInstance tmpRestoreDBInfo =
+                        restoreTempDBInstanceFromLocalState(stateHandle);
-            byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
-            CompositeKeySerializationUtils.serializeKeyGroup(
-                    keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);
+                tmpInstanceBuffer.add(tmpRestoreDBInfo);
-            // Insert all remaining state through creating temporary RocksDB instances
-            for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) {
-                logger.info(
-                        "Starting to restore from state handle: {} with rescaling.", stateHandle);
+                List<ColumnFamilyHandle> tmpColumnFamilyHandles =
+                        tmpRestoreDBInfo.columnFamilyHandles;
+
+                // Check if the data in all SST files referenced in the handle is within the
+                // proclaimed key-groups range of the handle.
+                if (RocksDBIncrementalCheckpointUtils.isSstDataInKeyGroupRange(
+                        tmpRestoreDBInfo.db, keyGroupPrefixBytes, stateHandle.getKeyGroupRange())) {
-                try (RestoredDBInstance tmpRestoreDBInfo =
-                                restoreTempDBInstanceFromLocalState(stateHandle);
-                        RocksDBWriteBatchWrapper writeBatchWrapper =
-                                new RocksDBWriteBatchWrapper(
-                                        this.rocksHandle.getDb(), writeBatchSize)) {
-
-                    List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors =
-                            tmpRestoreDBInfo.columnFamilyDescriptors;
-                    List<ColumnFamilyHandle> tmpColumnFamilyHandles =
-                            tmpRestoreDBInfo.columnFamilyHandles;
-
-                    // iterating only the requested descriptors automatically skips the default
-                    // column family handle
-                    for (int descIdx = 0; descIdx < tmpColumnFamilyDescriptors.size(); ++descIdx) {
-                        ColumnFamilyHandle tmpColumnFamilyHandle =
-                                tmpColumnFamilyHandles.get(descIdx);
-
-                        ColumnFamilyHandle targetColumnFamilyHandle =
-                                this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
-                                                null,
-                                                tmpRestoreDBInfo.stateMetaInfoSnapshots.get(
-                                                        descIdx))
-                                        .columnFamilyHandle;
-
-                        try (RocksIteratorWrapper iterator =
-                                RocksDBOperationUtils.getRocksIterator(
-                                        tmpRestoreDBInfo.db,
-                                        tmpColumnFamilyHandle,
-                                        tmpRestoreDBInfo.readOptions)) {
-
-                            iterator.seek(startKeyGroupPrefixBytes);
-
-                            while (iterator.isValid()) {
-
-                                if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(
-                                        iterator.key(), stopKeyGroupPrefixBytes)) {
-                                    writeBatchWrapper.put(
-                                            targetColumnFamilyHandle,
-                                            iterator.key(),
-                                            iterator.value());
-                                } else {
-                                    // Since the iterator will visit the record according to the
-                                    // sorted order, we can just break here.
-                                    break;
-                                }
-
-                                iterator.next();
-                            }
-                        } // releases native iterator resources
-                    }
                    logger.info(
-                            "Finished restoring from state handle: {} with rescaling.",
+                            "Exporting state handle {} for rescaling with Clip/Ingest DB.",
                            stateHandle);
+                    // Use Range delete to clip the temp db to the target range of the backend
+                    RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
Review Comment:
I changed the code accordingly. It didn't work at first because the import
code didn't add the ColumnFamilyHandles to the DbHandle. Now it seems to work :)
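
For readers following the Clip/Ingest discussion, here is a minimal sketch of what that
path boils down to on the raw RocksDB Java API. The class and helpers below
(ClipIngestSketch, clipToKeyGroupRange, exportAndImport) are hypothetical illustrations,
not the PR's code, and they assume the export/import bindings
(Checkpoint#exportColumnFamily, RocksDB#createColumnFamilyWithImport) available in recent
rocksdbjni/FRocksDB builds. The final comment marks the handle-registration step whose
omission caused the failure described in the comment above.

    import java.util.Arrays;
    import org.rocksdb.Checkpoint;
    import org.rocksdb.ColumnFamilyDescriptor;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.ExportImportFilesMetaData;
    import org.rocksdb.ImportColumnFamilyOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    /** Hypothetical sketch of the clip -> export -> import sequence. */
    public final class ClipIngestSketch {

        /** Range-delete everything outside [startPrefix, stopPrefix) in the column family. */
        static void clipToKeyGroupRange(
                RocksDB tmpDb, ColumnFamilyHandle cf, byte[] startPrefix, byte[] stopPrefix)
                throws RocksDBException {
            byte[] minKey = new byte[startPrefix.length]; // all 0x00, lowest prefix
            byte[] maxKey = new byte[stopPrefix.length];
            Arrays.fill(maxKey, (byte) 0xFF); // simplified upper bound of the prefix space
            tmpDb.deleteRange(cf, minKey, startPrefix); // drop key-groups below the range
            tmpDb.deleteRange(cf, stopPrefix, maxKey); // drop key-groups from the stop prefix on
        }

        /** Export the clipped column family and import its SST files into the base DB. */
        static ColumnFamilyHandle exportAndImport(
                RocksDB tmpDb,
                ColumnFamilyHandle tmpCf,
                RocksDB baseDb,
                ColumnFamilyDescriptor targetDescriptor,
                String exportDir)
                throws RocksDBException {
            try (Checkpoint checkpoint = Checkpoint.create(tmpDb);
                    ExportImportFilesMetaData metaData =
                            checkpoint.exportColumnFamily(tmpCf, exportDir);
                    ImportColumnFamilyOptions importOptions =
                            new ImportColumnFamilyOptions().setMoveFiles(true)) {
                // The returned handle must also be registered with the backend's column
                // family bookkeeping (the "DbHandle" above); otherwise later state
                // accesses won't see the imported column family.
                return baseDb.createColumnFamilyWithImport(
                        targetDescriptor, importOptions, metaData);
            }
        }
    }

Compared to the iterator/write-batch copy path, this moves SST files wholesale instead of
rewriting every key-value pair, which is where the restore speedup comes from.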