StefanRRichter commented on code in PR #24031:
URL: https://github.com/apache/flink/pull/24031#discussion_r1447131761


##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java:
##########
@@ -329,74 +332,164 @@ private void 
restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandl
             CompositeKeySerializationUtils.serializeKeyGroup(
                     keyGroupRange.getEndKeyGroup() + 1, 
stopKeyGroupPrefixBytes);
 
-            // Insert all remaining state through creating temporary RocksDB 
instances
+            rescalingRestoreFromLocalStateOperation.accept(
+                    localKeyedStateHandles, startKeyGroupPrefixBytes, 
stopKeyGroupPrefixBytes);
+        } finally {
+            // Cleanup all download directories
+            allDownloadSpecs.stream()
+                    .map(StateHandleDownloadSpec::getDownloadDestination)
+                    .forEach(this::cleanUpPathQuietly);
+        }
+    }
+
+    private void rescaleCopyFromTemporaryInstance(
+            Collection<IncrementalLocalKeyedStateHandle> 
localKeyedStateHandles,
+            byte[] startKeyGroupPrefixBytes,
+            byte[] stopKeyGroupPrefixBytes)
+            throws Exception {
+
+        // Choose the best state handle for the initial DB
+        final IncrementalLocalKeyedStateHandle selectedInitialHandle =
+                
RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
+                        localKeyedStateHandles, keyGroupRange, 
overlapFractionThreshold);
+
+        Preconditions.checkNotNull(selectedInitialHandle);
+
+        // Remove the selected handle from the list so that we don't restore 
it twice.
+        localKeyedStateHandles.remove(selectedInitialHandle);
+
+        // Init the base DB instance with the initial state
+        initBaseDBForRescaling(selectedInitialHandle);
+
+        for (IncrementalLocalKeyedStateHandle stateHandle : 
localKeyedStateHandles) {
+            logger.info("Starting to restore from state handle: {} with 
rescaling.", stateHandle);
+
+            try (RestoredDBInstance tmpRestoreDBInfo =
+                            restoreTempDBInstanceFromLocalState(stateHandle);
+                    RocksDBWriteBatchWrapper writeBatchWrapper =
+                            new RocksDBWriteBatchWrapper(
+                                    this.rocksHandle.getDb(), writeBatchSize)) 
{
+
+                List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors =
+                        tmpRestoreDBInfo.columnFamilyDescriptors;
+                List<ColumnFamilyHandle> tmpColumnFamilyHandles =
+                        tmpRestoreDBInfo.columnFamilyHandles;
+
+                // iterating only the requested descriptors automatically 
skips the default
+                // column
+                // family handle
+                for (int descIdx = 0; descIdx < 
tmpColumnFamilyDescriptors.size(); ++descIdx) {
+                    ColumnFamilyHandle tmpColumnFamilyHandle = 
tmpColumnFamilyHandles.get(descIdx);
+
+                    ColumnFamilyHandle targetColumnFamilyHandle =
+                            
this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
+                                            null,
+                                            
tmpRestoreDBInfo.stateMetaInfoSnapshots.get(descIdx))
+                                    .columnFamilyHandle;
+
+                    try (RocksIteratorWrapper iterator =
+                            RocksDBOperationUtils.getRocksIterator(
+                                    tmpRestoreDBInfo.db,
+                                    tmpColumnFamilyHandle,
+                                    tmpRestoreDBInfo.readOptions)) {
+
+                        iterator.seek(startKeyGroupPrefixBytes);
+
+                        while (iterator.isValid()) {
+
+                            if 
(RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(
+                                    iterator.key(), stopKeyGroupPrefixBytes)) {
+                                writeBatchWrapper.put(
+                                        targetColumnFamilyHandle, 
iterator.key(), iterator.value());
+                            } else {
+                                // Since the iterator will visit the record 
according to the
+                                // sorted
+                                // order,
+                                // we can just break here.
+                                break;
+                            }
+
+                            iterator.next();
+                        }
+                    } // releases native iterator resources
+                }
+                logger.info(
+                        "Finished restoring from state handle: {} with 
rescaling.", stateHandle);
+            }
+        }
+    }
+
+    /**
+     * Recovery from multi incremental states with rescaling. For rescaling, 
this method creates a
+     * temporary RocksDB instance for a key-groups shard. All contents from 
the temporary instance
+     * are copied into the real restore instance and then the temporary 
instance is discarded.
+     */
+    private void rescaleClipIngestDB(
+            Collection<IncrementalLocalKeyedStateHandle> 
localKeyedStateHandles,
+            byte[] startKeyGroupPrefixBytes,
+            byte[] stopKeyGroupPrefixBytes)
+            throws Exception {
+
+        final Path absolutInstanceBasePath = 
instanceBasePath.getAbsoluteFile().toPath();
+        final Path exportCfBasePath = 
absolutInstanceBasePath.resolve("export-cfs");
+        Files.createDirectories(exportCfBasePath);
+
+        final Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>>
+                columnFamilyMetaDataToImport = new HashMap<>();
+
+        try {
             for (IncrementalLocalKeyedStateHandle stateHandle : 
localKeyedStateHandles) {
                 logger.info(
-                        "Starting to restore from state handle: {} with 
rescaling.", stateHandle);
+                        "Starting to restore from state handle: {} with 
rescaling using Clip/Ingest DB.",
+                        stateHandle);
 
                 try (RestoredDBInstance tmpRestoreDBInfo =
-                                
restoreTempDBInstanceFromLocalState(stateHandle);
-                        RocksDBWriteBatchWrapper writeBatchWrapper =
-                                new RocksDBWriteBatchWrapper(
-                                        this.rocksHandle.getDb(), 
writeBatchSize)) {
+                        restoreTempDBInstanceFromLocalState(stateHandle)) {
 
-                    List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors =
-                            tmpRestoreDBInfo.columnFamilyDescriptors;
                     List<ColumnFamilyHandle> tmpColumnFamilyHandles =
                             tmpRestoreDBInfo.columnFamilyHandles;
 
-                    // iterating only the requested descriptors automatically 
skips the default
-                    // column
-                    // family handle
-                    for (int descIdx = 0; descIdx < 
tmpColumnFamilyDescriptors.size(); ++descIdx) {
-                        ColumnFamilyHandle tmpColumnFamilyHandle =
-                                tmpColumnFamilyHandles.get(descIdx);
-
-                        ColumnFamilyHandle targetColumnFamilyHandle =
-                                
this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
-                                                null,
-                                                
tmpRestoreDBInfo.stateMetaInfoSnapshots.get(
-                                                        descIdx))
-                                        .columnFamilyHandle;
-
-                        try (RocksIteratorWrapper iterator =
-                                RocksDBOperationUtils.getRocksIterator(
-                                        tmpRestoreDBInfo.db,
-                                        tmpColumnFamilyHandle,
-                                        tmpRestoreDBInfo.readOptions)) {
-
-                            iterator.seek(startKeyGroupPrefixBytes);
-
-                            while (iterator.isValid()) {
-
-                                if 
(RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(
-                                        iterator.key(), 
stopKeyGroupPrefixBytes)) {
-                                    writeBatchWrapper.put(
-                                            targetColumnFamilyHandle,
-                                            iterator.key(),
-                                            iterator.value());
-                                } else {
-                                    // Since the iterator will visit the 
record according to the
-                                    // sorted
-                                    // order,
-                                    // we can just break here.
-                                    break;
-                                }
-
-                                iterator.next();
-                            }
-                        } // releases native iterator resources
+                    // Clip all tmp db to Range [startKeyGroupPrefixBytes, 
stopKeyGroupPrefixBytes)
+                    RocksDBIncrementalCheckpointUtils.clipColumnFamilies(
+                            tmpRestoreDBInfo.db,
+                            tmpColumnFamilyHandles,
+                            startKeyGroupPrefixBytes,
+                            stopKeyGroupPrefixBytes);
+
+                    // Export all the Column Families
+                    List<Pair<RegisteredStateMetaInfoBase, 
ExportImportFilesMetaData>>
+                            exportedCFAndMetaData =
+                                    
RocksDBIncrementalCheckpointUtils.exportColumnFamilies(
+                                            tmpRestoreDBInfo.db,
+                                            tmpColumnFamilyHandles,
+                                            
tmpRestoreDBInfo.stateMetaInfoSnapshots,
+                                            exportCfBasePath);
+
+                    for (Pair<RegisteredStateMetaInfoBase, 
ExportImportFilesMetaData> entry :
+                            exportedCFAndMetaData) {
+                        ExportImportFilesMetaData cfMetaData = 
entry.getValue();
+                        // TODO: method files() doesn't exist in the RocksDB 
API
+                        //                        if 
(cfMetaData.files().isEmpty()) {
+                        //                            continue;
+                        //                        }
+                        columnFamilyMetaDataToImport
+                                .computeIfAbsent(entry.getKey(), (k) -> new 
ArrayList<>())
+                                .add(cfMetaData);
                     }
-                    logger.info(
-                            "Finished restoring from state handle: {} with 
rescaling.",
-                            stateHandle);
                 }
+                logger.info(
+                        "Finished exporting column family from state handle: 
{} for rescaling.",
+                        stateHandle);
             }
+
+            // Open the target RocksDB and import the exported column families
+            this.rocksHandle.openDB();
+            columnFamilyMetaDataToImport.forEach(
+                    
this.rocksHandle::registerStateColumnFamilyHandleWithImport);
+            logger.info(
+                    "Finished importing exported column families into target 
DB for rescaling.");
         } finally {
-            // Cleanup all download directories
-            allDownloadSpecs.stream()
-                    .map(StateHandleDownloadSpec::getDownloadDestination)
-                    .forEach(this::cleanUpPathQuietly);
+            cleanUpPathQuietly(exportCfBasePath);

Review Comment:
   No, I think the files are hard-linked into the directory of the created 
instance into which we import them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to