This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch release-2.0 in repository https://gitbox.apache.org/repos/asf/flink.git
commit fac89ba7dcceab782dec211bcdc47982b54081c9 Author: Rui Fan <[email protected]> AuthorDate: Fri Sep 26 12:37:52 2025 +0200 [FLINK-38415][refactor] Extract single state handle processing logic for testability --- .../restore/DistributeStateHandlerHelper.java | 175 +++++++++++++++ .../state/rocksdb/restore/RestoredDBInstance.java | 162 ++++++++++++++ .../RocksDBIncrementalRestoreOperation.java | 235 +++++---------------- 3 files changed, 390 insertions(+), 182 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/DistributeStateHandlerHelper.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/DistributeStateHandlerHelper.java new file mode 100644 index 00000000000..2cb8c1a708f --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/DistributeStateHandlerHelper.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.rocksdb.restore; + +import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; +import org.apache.flink.state.rocksdb.RocksDBIncrementalCheckpointUtils; +import org.apache.flink.state.rocksdb.ttl.RocksDbTtlCompactFiltersManager; +import org.apache.flink.types.Either; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; +import org.rocksdb.ExportImportFilesMetaData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Helper class for distributing state handle data during RocksDB incremental restore. This class + * encapsulates the logic for processing a single state handle. + */ +public class DistributeStateHandlerHelper implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(DistributeStateHandlerHelper.class); + + private final IncrementalLocalKeyedStateHandle stateHandle; + private final RestoredDBInstance restoredDbInstance; + private final int keyGroupPrefixBytes; + private final KeyGroupRange keyGroupRange; + private final String operatorIdentifier; + private final int index; + + /** + * Creates a helper for processing a single state handle. The database instance is created in + * the constructor to enable proper resource management and separation of concerns. + * + * @param stateHandle the state handle to process + * @param columnFamilyOptionsFactory factory for creating column family options + * @param dbOptions database options + * @param ttlCompactFiltersManager TTL compact filters manager (can be null) + * @param writeBufferManagerCapacity write buffer manager capacity (can be null) + * @param keyGroupPrefixBytes number of key group prefix bytes for SST file range checking + * @param keyGroupRange target key group range (for logging) + * @param operatorIdentifier operator identifier (for logging) + * @param index current processing index (for logging) + * @throws Exception on any database opening error + */ + public DistributeStateHandlerHelper( + IncrementalLocalKeyedStateHandle stateHandle, + List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, + Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, + DBOptions dbOptions, + RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, + Long writeBufferManagerCapacity, + int keyGroupPrefixBytes, + KeyGroupRange keyGroupRange, + String operatorIdentifier, + int index) + throws Exception { + this.stateHandle = stateHandle; + this.keyGroupPrefixBytes = keyGroupPrefixBytes; + this.keyGroupRange = keyGroupRange; + this.operatorIdentifier = operatorIdentifier; + this.index = index; + + final String logLineSuffix = createLogLineSuffix(); + + LOG.debug("Opening temporary database : {}", logLineSuffix); + + // Open database using restored instance helper method + this.restoredDbInstance = + RestoredDBInstance.restoreTempDBInstanceFromLocalState( + stateHandle, + stateMetaInfoSnapshots, + columnFamilyOptionsFactory, + dbOptions, + ttlCompactFiltersManager, + writeBufferManagerCapacity); + } + + /** + * Distributes state handle data by checking SST file ranges and exporting column families. + * Returns Left if successfully exported, Right if the handle was skipped. + * + * @param exportCfBasePath base path for export + * @param exportedColumnFamiliesOut output parameter for exported column families + * @return Either.Left containing key group range if successfully exported, Either.Right + * containing the skipped state handle otherwise + * @throws Exception on any export error + */ + public Either<KeyGroupRange, IncrementalLocalKeyedStateHandle> tryDistribute( + Path exportCfBasePath, + Map<RegisteredStateMetaInfoBase.Key, List<ExportImportFilesMetaData>> + exportedColumnFamiliesOut) + throws Exception { + + final String logLineSuffix = createLogLineSuffix(); + + List<ColumnFamilyHandle> tmpColumnFamilyHandles = restoredDbInstance.columnFamilyHandles; + + LOG.debug("Checking actual keys of sst files {}", logLineSuffix); + + // Check SST file range + RocksDBIncrementalCheckpointUtils.RangeCheckResult rangeCheckResult = + RocksDBIncrementalCheckpointUtils.checkSstDataAgainstKeyGroupRange( + restoredDbInstance.db, keyGroupPrefixBytes, stateHandle.getKeyGroupRange()); + + LOG.info("{} {}", rangeCheckResult, logLineSuffix); + + if (rangeCheckResult.allInRange()) { + LOG.debug("Start exporting {}", logLineSuffix); + + List<RegisteredStateMetaInfoBase> registeredStateMetaInfoBases = + restoredDbInstance.stateMetaInfoSnapshots.stream() + .map(RegisteredStateMetaInfoBase::fromMetaInfoSnapshot) + .collect(Collectors.toList()); + + // Export all the Column Families and store the result in exportedColumnFamiliesOut + RocksDBIncrementalCheckpointUtils.exportColumnFamilies( + restoredDbInstance.db, + tmpColumnFamilyHandles, + registeredStateMetaInfoBases, + exportCfBasePath, + exportedColumnFamiliesOut); + + LOG.debug("Done exporting {}", logLineSuffix); + return Either.Left(stateHandle.getKeyGroupRange()); + } else { + LOG.debug("Skipped export {}", logLineSuffix); + return Either.Right(stateHandle); + } + } + + @Override + public void close() throws Exception { + restoredDbInstance.close(); + } + + /** Creates a consistent log line suffix for logging operations. */ + private String createLogLineSuffix() { + return " for state handle at index " + + index + + " with proclaimed key-group range " + + stateHandle.getKeyGroupRange().prettyPrintInterval() + + " for backend with range " + + keyGroupRange.prettyPrintInterval() + + " in operator " + + operatorIdentifier + + "."; + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RestoredDBInstance.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RestoredDBInstance.java new file mode 100644 index 00000000000..afbe27bce9c --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RestoredDBInstance.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.rocksdb.restore; + +import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle; +import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; +import org.apache.flink.state.rocksdb.RocksDBOperationUtils; +import org.apache.flink.state.rocksdb.ttl.RocksDbTtlCompactFiltersManager; +import org.apache.flink.util.IOUtils; + +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; + +import javax.annotation.Nonnull; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; + +/** Restored DB instance containing all necessary handles and metadata. */ +public class RestoredDBInstance implements AutoCloseable { + + @Nonnull public final RocksDB db; + @Nonnull public final ColumnFamilyHandle defaultColumnFamilyHandle; + @Nonnull public final List<ColumnFamilyHandle> columnFamilyHandles; + @Nonnull public final List<ColumnFamilyDescriptor> columnFamilyDescriptors; + @Nonnull public final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots; + public final ReadOptions readOptions; + public final IncrementalLocalKeyedStateHandle srcStateHandle; + + public RestoredDBInstance( + @Nonnull RocksDB db, + @Nonnull List<ColumnFamilyHandle> columnFamilyHandles, + @Nonnull List<ColumnFamilyDescriptor> columnFamilyDescriptors, + @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, + IncrementalLocalKeyedStateHandle srcStateHandle) { + this.db = db; + this.defaultColumnFamilyHandle = columnFamilyHandles.remove(0); + this.columnFamilyHandles = columnFamilyHandles; + this.columnFamilyDescriptors = columnFamilyDescriptors; + this.stateMetaInfoSnapshots = stateMetaInfoSnapshots; + this.readOptions = new ReadOptions(); + this.srcStateHandle = srcStateHandle; + } + + @Override + public void close() { + List<ColumnFamilyOptions> columnFamilyOptions = + new ArrayList<>(columnFamilyDescriptors.size() + 1); + columnFamilyDescriptors.forEach((cfd) -> columnFamilyOptions.add(cfd.getOptions())); + RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater( + columnFamilyOptions, defaultColumnFamilyHandle); + IOUtils.closeQuietly(defaultColumnFamilyHandle); + IOUtils.closeAllQuietly(columnFamilyHandles); + IOUtils.closeQuietly(db); + IOUtils.closeAllQuietly(columnFamilyOptions); + IOUtils.closeQuietly(readOptions); + } + + /** + * Restores a RocksDB instance from local state for the given state handle. + * + * @param stateHandle the state handle to restore from + * @param columnFamilyOptionsFactory factory for creating column family options + * @param dbOptions database options + * @param ttlCompactFiltersManager TTL compact filters manager (can be null) + * @param writeBufferManagerCapacity write buffer manager capacity (can be null) + * @return restored DB instance with all necessary handles and metadata + * @throws Exception on any restore error + */ + public static RestoredDBInstance restoreTempDBInstanceFromLocalState( + IncrementalLocalKeyedStateHandle stateHandle, + List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, + Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, + DBOptions dbOptions, + RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, + Long writeBufferManagerCapacity) + throws Exception { + + List<ColumnFamilyDescriptor> columnFamilyDescriptors = + createColumnFamilyDescriptors( + stateMetaInfoSnapshots, + columnFamilyOptionsFactory, + ttlCompactFiltersManager, + writeBufferManagerCapacity, + false); + + Path restoreSourcePath = stateHandle.getDirectoryStateHandle().getDirectory(); + + List<ColumnFamilyHandle> columnFamilyHandles = + new ArrayList<>(stateMetaInfoSnapshots.size() + 1); + + RocksDB db = + RocksDBOperationUtils.openDB( + restoreSourcePath.toString(), + columnFamilyDescriptors, + columnFamilyHandles, + RocksDBOperationUtils.createColumnFamilyOptions( + columnFamilyOptionsFactory, "default"), + dbOptions); + + return new RestoredDBInstance( + db, + columnFamilyHandles, + columnFamilyDescriptors, + stateMetaInfoSnapshots, + stateHandle); + } + + /** + * This method recreates and registers all {@link ColumnFamilyDescriptor} from Flink's state + * metadata snapshot. + */ + public static List<ColumnFamilyDescriptor> createColumnFamilyDescriptors( + List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, + Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, + RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, + Long writeBufferManagerCapacity, + boolean registerTtlCompactFilter) { + + List<ColumnFamilyDescriptor> columnFamilyDescriptors = + new ArrayList<>(stateMetaInfoSnapshots.size()); + + for (StateMetaInfoSnapshot stateMetaInfoSnapshot : stateMetaInfoSnapshots) { + RegisteredStateMetaInfoBase metaInfoBase = + RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot); + + ColumnFamilyDescriptor columnFamilyDescriptor = + RocksDBOperationUtils.createColumnFamilyDescriptor( + metaInfoBase, + columnFamilyOptionsFactory, + registerTtlCompactFilter ? ttlCompactFiltersManager : null, + writeBufferManagerCapacity); + + columnFamilyDescriptors.add(columnFamilyDescriptor); + } + return columnFamilyDescriptors; + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RocksDBIncrementalRestoreOperation.java index 79bc7bba201..d5a04132238 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RocksDBIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RocksDBIncrementalRestoreOperation.java @@ -48,6 +48,7 @@ import org.apache.flink.state.rocksdb.RocksDBWriteBatchWrapper; import org.apache.flink.state.rocksdb.RocksIteratorWrapper; import org.apache.flink.state.rocksdb.StateHandleDownloadSpec; import org.apache.flink.state.rocksdb.ttl.RocksDbTtlCompactFiltersManager; +import org.apache.flink.types.Either; import org.apache.flink.util.FileUtils; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; @@ -61,8 +62,6 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; import org.rocksdb.ExportImportFilesMetaData; -import org.rocksdb.ReadOptions; -import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,7 +87,6 @@ import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.function.Function; -import java.util.stream.Collectors; import static org.apache.flink.runtime.metrics.MetricNames.DOWNLOAD_STATE_DURATION; import static org.apache.flink.runtime.metrics.MetricNames.RESTORE_ASYNC_COMPACTION_DURATION; @@ -492,7 +490,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper notImportableHandles); if (exportedColumnFamilyMetaData.isEmpty()) { - // Nothing coule be exported, so we fall back to + // Nothing could be exported, so we fall back to // #mergeStateHandlesWithCopyFromTemporaryInstance mergeStateHandlesWithCopyFromTemporaryInstance( notImportableHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes); @@ -542,74 +540,39 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper int minExportKeyGroup = Integer.MAX_VALUE; int maxExportKeyGroup = Integer.MIN_VALUE; int index = 0; - for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) { - - final String logLineSuffix = - " for state handle at index " - + index - + " with proclaimed key-group range " - + stateHandle.getKeyGroupRange().prettyPrintInterval() - + " for backend with range " - + keyGroupRange.prettyPrintInterval() - + " in operator " - + operatorIdentifier - + "."; - - logger.debug("Opening temporary database" + logLineSuffix); - try (RestoredDBInstance tmpRestoreDBInfo = - restoreTempDBInstanceFromLocalState(stateHandle)) { - - List<ColumnFamilyHandle> tmpColumnFamilyHandles = - tmpRestoreDBInfo.columnFamilyHandles; - - logger.debug("Checking actual keys of sst files" + logLineSuffix); - - // Check if the data in all SST files referenced in the handle is within the - // proclaimed key-groups range of the handle. - RocksDBIncrementalCheckpointUtils.RangeCheckResult rangeCheckResult = - RocksDBIncrementalCheckpointUtils.checkSstDataAgainstKeyGroupRange( - tmpRestoreDBInfo.db, - keyGroupPrefixBytes, - stateHandle.getKeyGroupRange()); - - logger.info("{}" + logLineSuffix, rangeCheckResult); - - if (rangeCheckResult.allInRange()) { - - logger.debug("Start exporting" + logLineSuffix); - - List<RegisteredStateMetaInfoBase> registeredStateMetaInfoBases = - tmpRestoreDBInfo.stateMetaInfoSnapshots.stream() - .map(RegisteredStateMetaInfoBase::fromMetaInfoSnapshot) - .collect(Collectors.toList()); - - // Export all the Column Families and store the result in - // exportedColumnFamiliesOut - RocksDBIncrementalCheckpointUtils.exportColumnFamilies( - tmpRestoreDBInfo.db, - tmpColumnFamilyHandles, - registeredStateMetaInfoBases, - exportCfBasePath, - exportedColumnFamiliesOut); + for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) { + KeyedBackendSerializationProxy<K> serializationProxy = + readMetaData(stateHandle.getMetaDataStateHandle()); + List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = + serializationProxy.getStateMetaInfoSnapshots(); + // Use Helper to encapsulate single stateHandle processing + try (DistributeStateHandlerHelper helper = + new DistributeStateHandlerHelper( + stateHandle, + stateMetaInfoSnapshots, + rocksHandle.getColumnFamilyOptionsFactory(), + rocksHandle.getDbOptions(), + rocksHandle.getTtlCompactFiltersManager(), + rocksHandle.getWriteBufferManagerCapacity(), + keyGroupPrefixBytes, + keyGroupRange, + operatorIdentifier, + index)) { + + Either<KeyGroupRange, IncrementalLocalKeyedStateHandle> result = + helper.tryDistribute(exportCfBasePath, exportedColumnFamiliesOut); + + // Handle the result and collect skipped handles + if (result.isLeft()) { + KeyGroupRange exportedRange = result.left(); minExportKeyGroup = - Math.min( - minExportKeyGroup, - stateHandle.getKeyGroupRange().getStartKeyGroup()); - maxExportKeyGroup = - Math.max( - maxExportKeyGroup, - stateHandle.getKeyGroupRange().getEndKeyGroup()); - - logger.debug("Done exporting" + logLineSuffix); + Math.min(minExportKeyGroup, exportedRange.getStartKeyGroup()); + maxExportKeyGroup = Math.max(maxExportKeyGroup, exportedRange.getEndKeyGroup()); } else { - // Actual key range in files exceeds proclaimed range, cannot import. We - // will copy this handle using a temporary DB later. - skipped.add(stateHandle); - logger.debug("Skipped export" + logLineSuffix); + skipped.add(result.right()); } } - ++index; } @@ -737,7 +700,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper /** * Restores the base DB from local state of a single state handle. * - * @param localKeyedStateHandle the state handle tor estore from. + * @param localKeyedStateHandle the state handle to restore from. * @throws Exception on any restore error. */ private void restoreBaseDBFromLocalState(IncrementalLocalKeyedStateHandle localKeyedStateHandle) @@ -750,7 +713,12 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper Path restoreSourcePath = localKeyedStateHandle.getDirectoryStateHandle().getDirectory(); this.rocksHandle.openDB( - createColumnFamilyDescriptors(stateMetaInfoSnapshots, true), + RestoredDBInstance.createColumnFamilyDescriptors( + stateMetaInfoSnapshots, + rocksHandle.getColumnFamilyOptionsFactory(), + rocksHandle.getTtlCompactFiltersManager(), + rocksHandle.getWriteBufferManagerCapacity(), + true), stateMetaInfoSnapshots, restoreSourcePath, cancelStreamRegistryForRestore); @@ -812,10 +780,20 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper cancelStreamRegistryForRestore.registerCloseableTemporarily( writeBatchWrapper.getCancelCloseable())) { for (IncrementalLocalKeyedStateHandle handleToCopy : toImport) { - try (RestoredDBInstance restoredDBInstance = - restoreTempDBInstanceFromLocalState(handleToCopy)) { + KeyedBackendSerializationProxy<K> serializationProxy = + readMetaData(handleToCopy.getMetaDataStateHandle()); + List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = + serializationProxy.getStateMetaInfoSnapshots(); + try (RestoredDBInstance restoredDbInstance = + RestoredDBInstance.restoreTempDBInstanceFromLocalState( + handleToCopy, + stateMetaInfoSnapshots, + rocksHandle.getColumnFamilyOptionsFactory(), + rocksHandle.getDbOptions(), + rocksHandle.getTtlCompactFiltersManager(), + rocksHandle.getWriteBufferManagerCapacity())) { copyTempDbIntoBaseDb( - restoredDBInstance, + restoredDbInstance, writeBatchWrapper, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes); @@ -824,7 +802,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper } logger.info( - "Competed copying state handles for backend with range {} in operator {} using temporary instances.", + "Completed copying state handles for backend with range {} in operator {} using temporary instances.", keyGroupRange.prettyPrintInterval(), operatorIdentifier); } @@ -856,8 +834,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles; // iterating only the requested descriptors automatically skips the default - // column - // family handle + // column family handle for (int descIdx = 0; descIdx < tmpColumnFamilyDescriptors.size(); ++descIdx) { ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(descIdx); @@ -905,114 +882,8 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper try { FileUtils.deleteDirectory(path.toFile()); } catch (IOException ex) { - logger.warn("Failed to clean up path " + path, ex); - } - } - - /** Entity to hold the temporary RocksDB instance created for restore. */ - private static class RestoredDBInstance implements AutoCloseable { - - @Nonnull private final RocksDB db; - - @Nonnull private final ColumnFamilyHandle defaultColumnFamilyHandle; - - @Nonnull private final List<ColumnFamilyHandle> columnFamilyHandles; - - @Nonnull private final List<ColumnFamilyDescriptor> columnFamilyDescriptors; - - @Nonnull private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots; - - private final ReadOptions readOptions; - - private final IncrementalLocalKeyedStateHandle srcStateHandle; - - private RestoredDBInstance( - @Nonnull RocksDB db, - @Nonnull List<ColumnFamilyHandle> columnFamilyHandles, - @Nonnull List<ColumnFamilyDescriptor> columnFamilyDescriptors, - @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, - @Nonnull IncrementalLocalKeyedStateHandle srcStateHandle) { - this.db = db; - this.defaultColumnFamilyHandle = columnFamilyHandles.remove(0); - this.columnFamilyHandles = columnFamilyHandles; - this.columnFamilyDescriptors = columnFamilyDescriptors; - this.stateMetaInfoSnapshots = stateMetaInfoSnapshots; - this.readOptions = new ReadOptions(); - this.srcStateHandle = srcStateHandle; - } - - @Override - public void close() { - List<ColumnFamilyOptions> columnFamilyOptions = - new ArrayList<>(columnFamilyDescriptors.size() + 1); - columnFamilyDescriptors.forEach((cfd) -> columnFamilyOptions.add(cfd.getOptions())); - RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater( - columnFamilyOptions, defaultColumnFamilyHandle); - IOUtils.closeQuietly(defaultColumnFamilyHandle); - IOUtils.closeAllQuietly(columnFamilyHandles); - IOUtils.closeQuietly(db); - IOUtils.closeAllQuietly(columnFamilyOptions); - IOUtils.closeQuietly(readOptions); - } - } - - private RestoredDBInstance restoreTempDBInstanceFromLocalState( - IncrementalLocalKeyedStateHandle stateHandle) throws Exception { - KeyedBackendSerializationProxy<K> serializationProxy = - readMetaData(stateHandle.getMetaDataStateHandle()); - // read meta data - List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = - serializationProxy.getStateMetaInfoSnapshots(); - - List<ColumnFamilyDescriptor> columnFamilyDescriptors = - createColumnFamilyDescriptors(stateMetaInfoSnapshots, false); - - List<ColumnFamilyHandle> columnFamilyHandles = - new ArrayList<>(stateMetaInfoSnapshots.size() + 1); - - RocksDB restoreDb = - RocksDBOperationUtils.openDB( - stateHandle.getDirectoryStateHandle().getDirectory().toString(), - columnFamilyDescriptors, - columnFamilyHandles, - RocksDBOperationUtils.createColumnFamilyOptions( - this.rocksHandle.getColumnFamilyOptionsFactory(), "default"), - this.rocksHandle.getDbOptions()); - - return new RestoredDBInstance( - restoreDb, - columnFamilyHandles, - columnFamilyDescriptors, - stateMetaInfoSnapshots, - stateHandle); - } - - /** - * This method recreates and registers all {@link ColumnFamilyDescriptor} from Flink's state - * metadata snapshot. - */ - private List<ColumnFamilyDescriptor> createColumnFamilyDescriptors( - List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, boolean registerTtlCompactFilter) { - - List<ColumnFamilyDescriptor> columnFamilyDescriptors = - new ArrayList<>(stateMetaInfoSnapshots.size()); - - for (StateMetaInfoSnapshot stateMetaInfoSnapshot : stateMetaInfoSnapshots) { - RegisteredStateMetaInfoBase metaInfoBase = - RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot); - - ColumnFamilyDescriptor columnFamilyDescriptor = - RocksDBOperationUtils.createColumnFamilyDescriptor( - metaInfoBase, - this.rocksHandle.getColumnFamilyOptionsFactory(), - registerTtlCompactFilter - ? this.rocksHandle.getTtlCompactFiltersManager() - : null, - this.rocksHandle.getWriteBufferManagerCapacity()); - - columnFamilyDescriptors.add(columnFamilyDescriptor); + logger.warn("Failed to clean up path {}", path, ex); } - return columnFamilyDescriptors; } private void runAndReportDuration(RunnableWithException runnable, String metricName)
