mayuehappy commented on code in PR #24031:
URL: https://github.com/apache/flink/pull/24031#discussion_r1481216701


##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java:
##########
@@ -310,98 +322,275 @@ private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandl
                 .map(StateHandleDownloadSpec::createLocalStateHandleForDownloadedState)
                 .forEach(localKeyedStateHandles::add);
 
+        // Transfer remaining key-groups from temporary instance into base DB
+        byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
+        CompositeKeySerializationUtils.serializeKeyGroup(
+                keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);
+
+        byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
+        CompositeKeySerializationUtils.serializeKeyGroup(
+                keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);
+
+        try {
+            // Process all state downloads
+            transferRemoteStateToLocalDirectory(allDownloadSpecs);
+
+            if (useIngestDbRestoreMode) {
+                // Optimized path for merging multiple handles with Ingest/Clip
+                rescaleClipIngestDB(
+                        localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
+            } else {
+                // Optimized path for single handle and legacy path for merging multiple handles.
+                rescaleCopyFromTemporaryInstance(
+                        localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes);
+            }
+
+        } finally {
+            // Cleanup all download directories
+            allDownloadSpecs.stream()
+                    .map(StateHandleDownloadSpec::getDownloadDestination)
+                    .forEach(this::cleanUpPathQuietly);
+        }
+    }
+
+    private void rescaleCopyFromTemporaryInstance(
+            Collection<IncrementalLocalKeyedStateHandle> localKeyedStateHandles,
+            byte[] startKeyGroupPrefixBytes,
+            byte[] stopKeyGroupPrefixBytes)
+            throws Exception {
+
         // Choose the best state handle for the initial DB
         final IncrementalLocalKeyedStateHandle selectedInitialHandle =
                 RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
                         localKeyedStateHandles, keyGroupRange, overlapFractionThreshold);
+
         Preconditions.checkNotNull(selectedInitialHandle);
+
         // Remove the selected handle from the list so that we don't restore it twice.
         localKeyedStateHandles.remove(selectedInitialHandle);
 
+        // Init the base DB instance with the initial state
+        initBaseDBForRescaling(selectedInitialHandle);
+
+        try (RocksDBWriteBatchWrapper writeBatchWrapper =
+                new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), writeBatchSize)) {
+            for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) {
+                try (RestoredDBInstance tmpRestoreDBInfo =
+                        restoreTempDBInstanceFromLocalState(stateHandle)) {
+                    copyTempDbIntoBaseDb(
+                            tmpRestoreDBInfo,
+                            writeBatchWrapper,
+                            startKeyGroupPrefixBytes,
+                            stopKeyGroupPrefixBytes);
+                }
+            }
+        }
+    }
+
+    private void copyTempDbIntoBaseDb(
+            RestoredDBInstance tmpRestoreDBInfo,
+            RocksDBWriteBatchWrapper writeBatchWrapper,
+            byte[] startKeyGroupPrefixBytes,
+            byte[] stopKeyGroupPrefixBytes)
+            throws Exception {
+
+        logger.info(
+                "Starting copy of state handle {} to base DB for rescaling.",
+                tmpRestoreDBInfo.srcStateHandle);
+
+        List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors =
+                tmpRestoreDBInfo.columnFamilyDescriptors;
+        List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;
+
+        // Iterating only the requested descriptors automatically skips the default
+        // column family handle.
+        for (int descIdx = 0; descIdx < tmpColumnFamilyDescriptors.size(); ++descIdx) {
+            ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(descIdx);
+
+            ColumnFamilyHandle targetColumnFamilyHandle =
+                    this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
+                                    null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(descIdx))
+                            .columnFamilyHandle;
+
+            try (RocksIteratorWrapper iterator =
+                    RocksDBOperationUtils.getRocksIterator(
+                            tmpRestoreDBInfo.db,
+                            tmpColumnFamilyHandle,
+                            tmpRestoreDBInfo.readOptions)) {
+
+                iterator.seek(startKeyGroupPrefixBytes);
+
+                while (iterator.isValid()) {
+
+                    if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(
+                            iterator.key(), stopKeyGroupPrefixBytes)) {
+                        writeBatchWrapper.put(
+                                targetColumnFamilyHandle, iterator.key(), iterator.value());
+                    } else {
+                        // Since the iterator visits the records in sorted order, we can
+                        // just break here.
+                        break;
+                    }
+
+                    iterator.next();
+                }
+            } // releases native iterator resources
+        }
+        logger.info(
+                "Finished copy of state handle {} to base DB for rescaling.",
+                tmpRestoreDBInfo.srcStateHandle);
+    }
+
+    /**
+     * Recovery from multiple incremental states with rescaling. For rescaling, this method
+     * creates a temporary RocksDB instance for a key-groups shard. All contents from the
+     * temporary instance are copied into the real restore instance and then the temporary
+     * instance is discarded.
+     */
+    private void rescaleClipIngestDB(
+            Collection<IncrementalLocalKeyedStateHandle> localKeyedStateHandles,
+            byte[] startKeyGroupPrefixBytes,
+            byte[] stopKeyGroupPrefixBytes)
+            throws Exception {
+
+        final Path absolutInstanceBasePath = instanceBasePath.getAbsoluteFile().toPath();
+        final Path exportCfBasePath = absolutInstanceBasePath.resolve("export-cfs");
+        Files.createDirectories(exportCfBasePath);
+
+        final Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>>
+                exportedColumnFamilyMetaData = new HashMap<>();
+
+        final List<IncrementalLocalKeyedStateHandle> restoreHandles;
+        if (localKeyedStateHandles instanceof ArrayList) {
+            restoreHandles = (List<IncrementalLocalKeyedStateHandle>) localKeyedStateHandles;
+        } else {
+            restoreHandles = new ArrayList<>(localKeyedStateHandles);
+        }
+
+        // This buffer holds open temporary RocksDB instances that we create from individual
+        // handles and acts as a lookahead buffer, so that we don't need to immediately give
+        // up on import/export when we encounter a database that exceeds the proclaimed
+        // key-groups range of its source state handle.
+        final ArrayDeque<RestoredDBInstance> tmpInstanceBuffer =
+                new ArrayDeque<>(incrementalRestoreInstanceBufferSize);
+
         try {
-            // Process all state downloads
-            transferRemoteStateToLocalDirectory(allDownloadSpecs);
+            int restoreIndex = 0;
+            // Initially, we skip the last handle for the case that we don't find any
+            // temporary DB that can be imported and must resort to the copy approach. Then
+            // we can open the base DB from the skipped handle and save one copy.
+            int skipLastHandle = 1;
+            while (restoreIndex < (restoreHandles.size() - skipLastHandle)
+                    && tmpInstanceBuffer.size() < incrementalRestoreInstanceBufferSize) {
 
-            // Init the base DB instance with the initial state
-            initBaseDBForRescaling(selectedInitialHandle);
+                IncrementalLocalKeyedStateHandle stateHandle = restoreHandles.get(restoreIndex++);
 
-            // Transfer remaining key-groups from temporary instance into base DB
-            byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
-            CompositeKeySerializationUtils.serializeKeyGroup(
-                    keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);
+                RestoredDBInstance tmpRestoreDBInfo =
+                        restoreTempDBInstanceFromLocalState(stateHandle);
 
-            byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
-            CompositeKeySerializationUtils.serializeKeyGroup(
-                    keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);
+                tmpInstanceBuffer.add(tmpRestoreDBInfo);
 
-            // Insert all remaining state through creating temporary RocksDB instances
-            for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) {
-                logger.info(
-                        "Starting to restore from state handle: {} with rescaling.", stateHandle);
+                List<ColumnFamilyHandle> tmpColumnFamilyHandles =
+                        tmpRestoreDBInfo.columnFamilyHandles;
+
+                // Check if the data in all SST files referenced in the handle is within the
+                // proclaimed key-groups range of the handle.
+                if (RocksDBIncrementalCheckpointUtils.isSstDataInKeyGroupRange(
+                        tmpRestoreDBInfo.db, keyGroupPrefixBytes, stateHandle.getKeyGroupRange())) {
 
-                try (RestoredDBInstance tmpRestoreDBInfo =
-                                restoreTempDBInstanceFromLocalState(stateHandle);
-                        RocksDBWriteBatchWrapper writeBatchWrapper =
-                                new RocksDBWriteBatchWrapper(
-                                        this.rocksHandle.getDb(), writeBatchSize)) {
-
-                    List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors =
-                            tmpRestoreDBInfo.columnFamilyDescriptors;
-                    List<ColumnFamilyHandle> tmpColumnFamilyHandles =
-                            tmpRestoreDBInfo.columnFamilyHandles;
-
-                    // iterating only the requested descriptors automatically skips the default
-                    // column
-                    // family handle
-                    for (int descIdx = 0; descIdx < tmpColumnFamilyDescriptors.size(); ++descIdx) {
-                        ColumnFamilyHandle tmpColumnFamilyHandle =
-                                tmpColumnFamilyHandles.get(descIdx);
-
-                        ColumnFamilyHandle targetColumnFamilyHandle =
-                                this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
-                                                null,
-                                                tmpRestoreDBInfo.stateMetaInfoSnapshots.get(
-                                                        descIdx))
-                                        .columnFamilyHandle;
-
-                        try (RocksIteratorWrapper iterator =
-                                RocksDBOperationUtils.getRocksIterator(
-                                        tmpRestoreDBInfo.db,
-                                        tmpColumnFamilyHandle,
-                                        tmpRestoreDBInfo.readOptions)) {
-
-                            iterator.seek(startKeyGroupPrefixBytes);
-
-                            while (iterator.isValid()) {
-
-                                if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(
-                                        iterator.key(), stopKeyGroupPrefixBytes)) {
-                                    writeBatchWrapper.put(
-                                            targetColumnFamilyHandle,
-                                            iterator.key(),
-                                            iterator.value());
-                                } else {
-                                    // Since the iterator will visit the record according to the
-                                    // sorted
-                                    // order,
-                                    // we can just break here.
-                                    break;
-                                }
-
-                                iterator.next();
-                            }
-                        } // releases native iterator resources
-                    }
                     logger.info(
-                            "Finished restoring from state handle: {} with rescaling.",
+                            "Exporting state handle {} for rescaling with Clip/Ingest DB.",
                             stateHandle);
+                    // Use range delete to clip the temp DB to the target range of the backend.
+                    RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(

Review Comment:
   Another point: if these handles do not overlap, can we first import them and clip them with deleteRange afterwards?
   Because in the current code we first deleteRange and then export, each DB will generate a new small file containing the RangeDeletion tombstone during export. So could we do the deleteRange once after importing, so that we reduce the number of small files?
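
   For illustration, a minimal sketch of the suggested ordering in RocksJava, assuming the handles' key-group ranges do not overlap (the helper class, the maxKey() sentinel, and the exact clip bounds are hypothetical, not code from this PR):

   ```java
   import org.rocksdb.ColumnFamilyHandle;
   import org.rocksdb.RocksDB;
   import org.rocksdb.RocksDBException;

   import java.util.Arrays;
   import java.util.List;

   /**
    * Sketch only: import all non-overlapping column families first, then clip the base
    * DB once with deleteRange, instead of clipping every temporary DB before export.
    */
   final class ImportThenClipSketch {

       static void clipBaseDbAfterImport(
               RocksDB baseDb,
               List<ColumnFamilyHandle> importedColumnFamilies,
               byte[] startKeyGroupPrefixBytes,
               byte[] stopKeyGroupPrefixBytes)
               throws RocksDBException {

           for (ColumnFamilyHandle columnFamily : importedColumnFamilies) {
               // Drop everything below the target key-group range; the empty key
               // sorts before all other keys.
               baseDb.deleteRange(columnFamily, new byte[0], startKeyGroupPrefixBytes);
               // Drop everything from the end of the target range upwards; maxKey()
               // is a hypothetical sentinel assumed to sort after all real keys.
               baseDb.deleteRange(columnFamily, stopKeyGroupPrefixBytes, maxKey());
           }
           // Result: one pair of range tombstones per column family in the base DB,
           // instead of one small tombstone-only SST per exported temporary DB.
       }

       private static byte[] maxKey() {
           byte[] key = new byte[16];
           Arrays.fill(key, (byte) 0xFF);
           return key;
       }
   }
   ```
   Of course this only works in the non-overlapping case; overlapping handles would still need to be clipped before the import.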



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
