StefanRRichter commented on code in PR #24031:
URL: https://github.com/apache/flink/pull/24031#discussion_r1447132521


##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java:
##########
@@ -329,74 +332,164 @@ private void 
restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandl
             CompositeKeySerializationUtils.serializeKeyGroup(
                     keyGroupRange.getEndKeyGroup() + 1, 
stopKeyGroupPrefixBytes);
 
-            // Insert all remaining state through creating temporary RocksDB 
instances
+            rescalingRestoreFromLocalStateOperation.accept(
+                    localKeyedStateHandles, startKeyGroupPrefixBytes, 
stopKeyGroupPrefixBytes);
+        } finally {
+            // Cleanup all download directories
+            allDownloadSpecs.stream()
+                    .map(StateHandleDownloadSpec::getDownloadDestination)
+                    .forEach(this::cleanUpPathQuietly);
+        }
+    }
+
+    private void rescaleCopyFromTemporaryInstance(
+            Collection<IncrementalLocalKeyedStateHandle> 
localKeyedStateHandles,
+            byte[] startKeyGroupPrefixBytes,
+            byte[] stopKeyGroupPrefixBytes)
+            throws Exception {
+
+        // Choose the best state handle for the initial DB
+        final IncrementalLocalKeyedStateHandle selectedInitialHandle =
+                
RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
+                        localKeyedStateHandles, keyGroupRange, 
overlapFractionThreshold);
+
+        Preconditions.checkNotNull(selectedInitialHandle);
+
+        // Remove the selected handle from the list so that we don't restore 
it twice.
+        localKeyedStateHandles.remove(selectedInitialHandle);
+
+        // Init the base DB instance with the initial state
+        initBaseDBForRescaling(selectedInitialHandle);
+
+        for (IncrementalLocalKeyedStateHandle stateHandle : 
localKeyedStateHandles) {
+            logger.info("Starting to restore from state handle: {} with 
rescaling.", stateHandle);
+
+            try (RestoredDBInstance tmpRestoreDBInfo =
+                            restoreTempDBInstanceFromLocalState(stateHandle);
+                    RocksDBWriteBatchWrapper writeBatchWrapper =
+                            new RocksDBWriteBatchWrapper(
+                                    this.rocksHandle.getDb(), writeBatchSize)) 
{
+
+                List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors =
+                        tmpRestoreDBInfo.columnFamilyDescriptors;
+                List<ColumnFamilyHandle> tmpColumnFamilyHandles =
+                        tmpRestoreDBInfo.columnFamilyHandles;
+
+                // iterating only the requested descriptors automatically 
skips the default
+                // column
+                // family handle
+                for (int descIdx = 0; descIdx < 
tmpColumnFamilyDescriptors.size(); ++descIdx) {
+                    ColumnFamilyHandle tmpColumnFamilyHandle = 
tmpColumnFamilyHandles.get(descIdx);
+
+                    ColumnFamilyHandle targetColumnFamilyHandle =
+                            
this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
+                                            null,
+                                            
tmpRestoreDBInfo.stateMetaInfoSnapshots.get(descIdx))
+                                    .columnFamilyHandle;
+
+                    try (RocksIteratorWrapper iterator =
+                            RocksDBOperationUtils.getRocksIterator(
+                                    tmpRestoreDBInfo.db,
+                                    tmpColumnFamilyHandle,
+                                    tmpRestoreDBInfo.readOptions)) {
+
+                        iterator.seek(startKeyGroupPrefixBytes);
+
+                        while (iterator.isValid()) {
+
+                            if 
(RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(
+                                    iterator.key(), stopKeyGroupPrefixBytes)) {
+                                writeBatchWrapper.put(
+                                        targetColumnFamilyHandle, 
iterator.key(), iterator.value());
+                            } else {
+                                // Since the iterator will visit the record 
according to the
+                                // sorted
+                                // order,
+                                // we can just break here.
+                                break;
+                            }
+
+                            iterator.next();
+                        }
+                    } // releases native iterator resources
+                }
+                logger.info(
+                        "Finished restoring from state handle: {} with 
rescaling.", stateHandle);
+            }
+        }
+    }
+
+    /**
+     * Recovery from multi incremental states with rescaling. For rescaling, 
this method creates a
+     * temporary RocksDB instance for a key-groups shard. All contents from 
the temporary instance
+     * are copied into the real restore instance and then the temporary 
instance is discarded.
+     */
+    private void rescaleClipIngestDB(
+            Collection<IncrementalLocalKeyedStateHandle> 
localKeyedStateHandles,
+            byte[] startKeyGroupPrefixBytes,
+            byte[] stopKeyGroupPrefixBytes)
+            throws Exception {
+
+        final Path absolutInstanceBasePath = 
instanceBasePath.getAbsoluteFile().toPath();
+        final Path exportCfBasePath = 
absolutInstanceBasePath.resolve("export-cfs");

Review Comment:
   The folder is temporary, and if I recall correctly, the files will be hard-linked rather than copied.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to