StefanRRichter commented on code in PR #24031:
URL: https://github.com/apache/flink/pull/24031#discussion_r1447132521
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java:
##########
@@ -329,74 +332,164 @@ private void
restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandl
CompositeKeySerializationUtils.serializeKeyGroup(
keyGroupRange.getEndKeyGroup() + 1,
stopKeyGroupPrefixBytes);
- // Insert all remaining state through creating temporary RocksDB
instances
+ rescalingRestoreFromLocalStateOperation.accept(
+ localKeyedStateHandles, startKeyGroupPrefixBytes,
stopKeyGroupPrefixBytes);
+ } finally {
+ // Cleanup all download directories
+ allDownloadSpecs.stream()
+ .map(StateHandleDownloadSpec::getDownloadDestination)
+ .forEach(this::cleanUpPathQuietly);
+ }
+ }
+
+ /**
+ * Rescaling restore that copies state record-by-record from temporary RocksDB instances.
+ *
+ * <p>The handle returned by {@code chooseTheBestStateHandleForInitial} is used to initialise
+ * the base DB via {@code initBaseDBForRescaling} and is removed from
+ * {@code localKeyedStateHandles}. Each remaining handle is opened as a temporary DB instance.
+ * For every column family in it, entries are copied into the matching column family of the
+ * base DB through a {@code RocksDBWriteBatchWrapper}. Copying starts at
+ * {@code startKeyGroupPrefixBytes} and continues while
+ * {@code beforeThePrefixBytes(key, stopKeyGroupPrefixBytes)} holds.
+ *
+ * @param localKeyedStateHandles local state handles to restore from; this collection is
+ *     modified in place (the chosen initial handle is removed from it)
+ * @param startKeyGroupPrefixBytes key-group prefix at which copying starts (iterators seek here)
+ * @param stopKeyGroupPrefixBytes key-group prefix at which copying stops; the first key that is
+ *     not before this prefix ends iteration for the current column family
+ * @throws Exception if selecting or initialising the base DB, restoring a temporary instance,
+ *     or writing to the base DB fails; the {@code checkNotNull} precondition also fails if no
+ *     initial handle could be chosen
+ */
+ private void rescaleCopyFromTemporaryInstance(
+ Collection<IncrementalLocalKeyedStateHandle>
localKeyedStateHandles,
+ byte[] startKeyGroupPrefixBytes,
+ byte[] stopKeyGroupPrefixBytes)
+ throws Exception {
+
+ // Choose the best state handle for the initial DB (selection also depends on
+ // keyGroupRange and overlapFractionThreshold).
+ final IncrementalLocalKeyedStateHandle selectedInitialHandle =
+
RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
+ localKeyedStateHandles, keyGroupRange,
overlapFractionThreshold);
+
+ // Fail fast: the copy loop below requires a base DB to write into.
+ Preconditions.checkNotNull(selectedInitialHandle);
+
+ // Remove the selected handle from the list so that we don't restore it twice.
+ // Note: this mutates the caller's collection.
+ localKeyedStateHandles.remove(selectedInitialHandle);
+
+ // Init the base DB instance with the initial state
+ initBaseDBForRescaling(selectedInitialHandle);
+
+ for (IncrementalLocalKeyedStateHandle stateHandle :
localKeyedStateHandles) {
+ logger.info("Starting to restore from state handle: {} with
rescaling.", stateHandle);
+
+ // Resources are closed in reverse declaration order: the write batch wrapper
+ // (targeting the base DB) is closed before the temporary DB instance.
+ // NOTE(review): presumably closing the wrapper flushes any buffered puts into the
+ // base DB - confirm against RocksDBWriteBatchWrapper.close().
+ try (RestoredDBInstance tmpRestoreDBInfo =
+ restoreTempDBInstanceFromLocalState(stateHandle);
+ RocksDBWriteBatchWrapper writeBatchWrapper =
+ new RocksDBWriteBatchWrapper(
+ this.rocksHandle.getDb(), writeBatchSize))
{
+
+ List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors =
+ tmpRestoreDBInfo.columnFamilyDescriptors;
+ List<ColumnFamilyHandle> tmpColumnFamilyHandles =
+ tmpRestoreDBInfo.columnFamilyHandles;
+
+ // Iterating only the requested descriptors automatically skips the default
+ // column family handle.
+ // NOTE(review): assumes columnFamilyHandles and stateMetaInfoSnapshots are
+ // index-aligned with columnFamilyDescriptors - confirm.
+ for (int descIdx = 0; descIdx <
tmpColumnFamilyDescriptors.size(); ++descIdx) {
+ ColumnFamilyHandle tmpColumnFamilyHandle =
tmpColumnFamilyHandles.get(descIdx);
+
+ // Look up (or register) the base-DB column family for this state's meta info.
+ ColumnFamilyHandle targetColumnFamilyHandle =
+
this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
+ null,
+
tmpRestoreDBInfo.stateMetaInfoSnapshots.get(descIdx))
+ .columnFamilyHandle;
+
+ try (RocksIteratorWrapper iterator =
+ RocksDBOperationUtils.getRocksIterator(
+ tmpRestoreDBInfo.db,
+ tmpColumnFamilyHandle,
+ tmpRestoreDBInfo.readOptions)) {
+
+ // Jump to the first key at or after the start key-group prefix, so
+ // entries of earlier key groups are never visited.
+ iterator.seek(startKeyGroupPrefixBytes);
+
+ while (iterator.isValid()) {
+
+ if
(RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(
+ iterator.key(), stopKeyGroupPrefixBytes)) {
+ writeBatchWrapper.put(
+ targetColumnFamilyHandle,
iterator.key(), iterator.value());
+ } else {
+ // Since the iterator visits records in sorted key order, every
+ // remaining key is also past the stop prefix; we can just break here.
+ break;
+ }
+
+ iterator.next();
+ }
+ } // releases native iterator resources
+ }
+ logger.info(
+ "Finished restoring from state handle: {} with
rescaling.", stateHandle);
+ }
+ }
+ }
+
+ /**
+ * Recovery from multi incremental states with rescaling. For rescaling,
this method creates a
+ * temporary RocksDB instance for a key-groups shard. All contents from
the temporary instance
+ * are copied into the real restore instance and then the temporary
instance is discarded.
+ */
+ private void rescaleClipIngestDB(
+ Collection<IncrementalLocalKeyedStateHandle>
localKeyedStateHandles,
+ byte[] startKeyGroupPrefixBytes,
+ byte[] stopKeyGroupPrefixBytes)
+ throws Exception {
+
+ final Path absolutInstanceBasePath =
instanceBasePath.getAbsoluteFile().toPath();
+ final Path exportCfBasePath =
absolutInstanceBasePath.resolve("export-cfs");
Review Comment:
The folder is temporary and the files will be hard-linked iirc.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]