StefanRRichter commented on code in PR #24031:
URL: https://github.com/apache/flink/pull/24031#discussion_r1442957056
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java:
##########
@@ -414,90 +424,70 @@ private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandl
     * temporary RocksDB instance for a key-groups shard. All contents from the temporary instance
     * are copied into the real restore instance and then the temporary instance is discarded.
     */
-    private void restoreWithIngestDbMode(Collection<KeyedStateHandle> restoreStateHandles)
+    private void rescaleClipIngestDB(
+            Collection<IncrementalLocalKeyedStateHandle> localKeyedStateHandles,
+            byte[] startKeyGroupPrefixBytes,
+            byte[] stopKeyGroupPrefixBytes)
            throws Exception {
-        Preconditions.checkArgument(restoreStateHandles != null && !restoreStateHandles.isEmpty());
-
-        Map<StateHandleID, StateHandleDownloadSpec> allDownloadSpecs =
-                CollectionUtil.newHashMapWithExpectedSize(restoreStateHandles.size());
-
        final Path absolutInstanceBasePath = instanceBasePath.getAbsoluteFile().toPath();
        final Path exportCfBasePath = absolutInstanceBasePath.resolve("export-cfs");
        Files.createDirectories(exportCfBasePath);
-        // Open base db as Empty DB
-        this.rocksHandle.openDB();
-
-        // Prepare and collect all the download request to pull remote state to a local directory
-        for (KeyedStateHandle stateHandle : restoreStateHandles) {
-            if (!(stateHandle instanceof IncrementalRemoteKeyedStateHandle)) {
-                throw unexpectedStateHandleException(
-                        IncrementalRemoteKeyedStateHandle.class, stateHandle.getClass());
-            }
-            StateHandleDownloadSpec downloadRequest =
-                    new StateHandleDownloadSpec(
-                            (IncrementalRemoteKeyedStateHandle) stateHandle,
-                            absolutInstanceBasePath.resolve(UUID.randomUUID().toString()));
-            allDownloadSpecs.put(stateHandle.getStateHandleId(), downloadRequest);
-        }
-
-        // Process all state downloads
-        transferRemoteStateToLocalDirectory(allDownloadSpecs.values());
-
-        // Transfer remaining key-groups from temporary instance into base DB
-        byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
-        CompositeKeySerializationUtils.serializeKeyGroup(
-                keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);
+        final Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>>
+                columnFamilyMetaDataToImport = new HashMap<>();
-        byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
-        CompositeKeySerializationUtils.serializeKeyGroup(
-                keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);
-
-        HashMap<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>> cfMetaDataToImport =
-                new HashMap();
-
-        // Insert all remaining state through creating temporary RocksDB instances
-        for (StateHandleDownloadSpec downloadRequest : allDownloadSpecs.values()) {
-            logger.info(
-                    "Starting to restore from state handle: {} with rescaling.",
-                    downloadRequest.getStateHandle());
+        try {
+            for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) {
+                logger.info(
+                        "Starting to restore from state handle: {} with rescaling using Clip/Ingest DB.",
+                        stateHandle);
-            try (RestoredDBInstance tmpRestoreDBInfo =
-                    restoreTempDBInstanceFromDownloadedState(downloadRequest)) {
+                try (RestoredDBInstance tmpRestoreDBInfo =
+                        restoreTempDBInstanceFromLocalState(stateHandle)) {
-                List<ColumnFamilyHandle> tmpColumnFamilyHandles =
-                        tmpRestoreDBInfo.columnFamilyHandles;
+                    List<ColumnFamilyHandle> tmpColumnFamilyHandles =
+                            tmpRestoreDBInfo.columnFamilyHandles;
-                // Clip all tmp db to Range [startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes)
-                RocksDBIncrementalCheckpointUtils.clipColumnFamilies(
-                        tmpRestoreDBInfo.db,
-                        tmpColumnFamilyHandles,
-                        startKeyGroupPrefixBytes,
-                        stopKeyGroupPrefixBytes);
-
-                // Export all the Column Families
-                Map<RegisteredStateMetaInfoBase, ExportImportFilesMetaData> exportedCFAndMetaData =
-                        RocksDBIncrementalCheckpointUtils.exportColumnFamilies(
-                                tmpRestoreDBInfo.db,
-                                tmpColumnFamilyHandles,
-                                tmpRestoreDBInfo.stateMetaInfoSnapshots,
-                                exportCfBasePath);
-
-                exportedCFAndMetaData.forEach(
-                        (stateMeta, cfMetaData) -> {
-                            if (!cfMetaData.files().isEmpty()) {
-                                cfMetaDataToImport.putIfAbsent(stateMeta, new ArrayList<>());
-                                cfMetaDataToImport.get(stateMeta).add(cfMetaData);
-                            }
-                        });
-            } finally {
-                cleanUpPathQuietly(downloadRequest.getDownloadDestination());
+                    // Clip all tmp db to Range [startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes)
+                    RocksDBIncrementalCheckpointUtils.clipColumnFamilies(
+                            tmpRestoreDBInfo.db,
+                            tmpColumnFamilyHandles,
+                            startKeyGroupPrefixBytes,
+                            stopKeyGroupPrefixBytes);
+
+                    // Export all the Column Families
+                    List<Pair<RegisteredStateMetaInfoBase, ExportImportFilesMetaData>>
+                            exportedCFAndMetaData =
+                                    RocksDBIncrementalCheckpointUtils.exportColumnFamilies(
+                                            tmpRestoreDBInfo.db,
+                                            tmpColumnFamilyHandles,
+                                            tmpRestoreDBInfo.stateMetaInfoSnapshots,
+                                            exportCfBasePath);
+
+                    for (Pair<RegisteredStateMetaInfoBase, ExportImportFilesMetaData> entry :
+                            exportedCFAndMetaData) {
+                        ExportImportFilesMetaData cfMetaData = entry.getValue();
+                        // TODO: method files() doesn't exist in the RocksDB API
Review Comment:
@mayuehappy Is it correct to remove this code? I could not find a files()
method in the RocksDB API.
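
For context, here is a minimal, self-contained sketch of the per-state grouping the removed lambda performed. `CfExport` and the `String` key are stand-ins for `ExportImportFilesMetaData` and `RegisteredStateMetaInfoBase`, and the `isEmpty()` check on the exported file list plays the role of the old `!cfMetaData.files().isEmpty()` guard (whose exact accessor I could not confirm in the RocksDB Java API). It illustrates the intent, not the actual API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustrative sketch only: CfExport stands in for ExportImportFilesMetaData and the
 * String key stands in for RegisteredStateMetaInfoBase. The isEmpty() check mirrors the
 * removed `!cfMetaData.files().isEmpty()` guard.
 */
public class ExportGroupingSketch {

    /** Stand-in for ExportImportFilesMetaData: just the names of the exported SST files. */
    static final class CfExport {
        final List<String> exportedFiles;

        CfExport(List<String> exportedFiles) {
            this.exportedFiles = exportedFiles;
        }
    }

    /** Groups non-empty exports per state, mirroring the removed putIfAbsent/get pair. */
    static Map<String, List<CfExport>> groupNonEmptyExports(
            List<Map.Entry<String, CfExport>> exportedCfAndMetaData) {
        Map<String, List<CfExport>> toImport = new HashMap<>();
        for (Map.Entry<String, CfExport> entry : exportedCfAndMetaData) {
            // Skip column families whose export produced no files.
            if (!entry.getValue().exportedFiles.isEmpty()) {
                toImport.computeIfAbsent(entry.getKey(), k -> new ArrayList<>())
                        .add(entry.getValue());
            }
        }
        return toImport;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, CfExport>> exports =
                List.of(
                        Map.entry("window-contents", new CfExport(List.of("000123.sst"))),
                        Map.entry("timer-state", new CfExport(List.of())));

        // Prints [window-contents]; the empty timer-state export is filtered out.
        System.out.println(groupNonEmptyExports(exports).keySet());
    }
}
```

`computeIfAbsent` would be the idiomatic replacement for the old `putIfAbsent` + `get` pair; if the new loop still needs to skip column families whose export produced no files, a guard of this shape could be reinstated there.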
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]