This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fac89ba7dcceab782dec211bcdc47982b54081c9
Author: Rui Fan <[email protected]>
AuthorDate: Fri Sep 26 12:37:52 2025 +0200

    [FLINK-38415][refactor] Extract single state handle processing logic for 
testability
---
 .../restore/DistributeStateHandlerHelper.java      | 175 +++++++++++++++
 .../state/rocksdb/restore/RestoredDBInstance.java  | 162 ++++++++++++++
 .../RocksDBIncrementalRestoreOperation.java        | 235 +++++----------------
 3 files changed, 390 insertions(+), 182 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/DistributeStateHandlerHelper.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/DistributeStateHandlerHelper.java
new file mode 100644
index 00000000000..2cb8c1a708f
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/DistributeStateHandlerHelper.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.rocksdb.restore;
+
+import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.state.rocksdb.RocksDBIncrementalCheckpointUtils;
+import org.apache.flink.state.rocksdb.ttl.RocksDbTtlCompactFiltersManager;
+import org.apache.flink.types.Either;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.ExportImportFilesMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Helper class for distributing state handle data during RocksDB incremental 
restore. This class
+ * encapsulates the logic for processing a single state handle.
+ */
+public class DistributeStateHandlerHelper implements AutoCloseable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DistributeStateHandlerHelper.class);
+
+    private final IncrementalLocalKeyedStateHandle stateHandle;
+    private final RestoredDBInstance restoredDbInstance;
+    private final int keyGroupPrefixBytes;
+    private final KeyGroupRange keyGroupRange;
+    private final String operatorIdentifier;
+    private final int index;
+
+    /**
+     * Creates a helper for processing a single state handle. The database 
instance is created in
+     * the constructor to enable proper resource management and separation of 
concerns.
+     *
+     * @param stateHandle the state handle to process
+     * @param columnFamilyOptionsFactory factory for creating column family 
options
+     * @param dbOptions database options
+     * @param ttlCompactFiltersManager TTL compact filters manager (can be 
null)
+     * @param writeBufferManagerCapacity write buffer manager capacity (can be 
null)
+     * @param keyGroupPrefixBytes number of key group prefix bytes for SST 
file range checking
+     * @param keyGroupRange target key group range (for logging)
+     * @param operatorIdentifier operator identifier (for logging)
+     * @param index current processing index (for logging)
+     * @throws Exception on any database opening error
+     */
+    public DistributeStateHandlerHelper(
+            IncrementalLocalKeyedStateHandle stateHandle,
+            List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
+            Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
+            DBOptions dbOptions,
+            RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
+            Long writeBufferManagerCapacity,
+            int keyGroupPrefixBytes,
+            KeyGroupRange keyGroupRange,
+            String operatorIdentifier,
+            int index)
+            throws Exception {
+        this.stateHandle = stateHandle;
+        this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+        this.keyGroupRange = keyGroupRange;
+        this.operatorIdentifier = operatorIdentifier;
+        this.index = index;
+
+        final String logLineSuffix = createLogLineSuffix();
+
+        LOG.debug("Opening temporary database : {}", logLineSuffix);
+
+        // Open database using restored instance helper method
+        this.restoredDbInstance =
+                RestoredDBInstance.restoreTempDBInstanceFromLocalState(
+                        stateHandle,
+                        stateMetaInfoSnapshots,
+                        columnFamilyOptionsFactory,
+                        dbOptions,
+                        ttlCompactFiltersManager,
+                        writeBufferManagerCapacity);
+    }
+
+    /**
+     * Distributes state handle data by checking SST file ranges and exporting 
column families.
+     * Returns Left if successfully exported, Right if the handle was skipped.
+     *
+     * @param exportCfBasePath base path for export
+     * @param exportedColumnFamiliesOut output parameter for exported column 
families
+     * @return Either.Left containing key group range if successfully 
exported, Either.Right
+     *     containing the skipped state handle otherwise
+     * @throws Exception on any export error
+     */
+    public Either<KeyGroupRange, IncrementalLocalKeyedStateHandle> 
tryDistribute(
+            Path exportCfBasePath,
+            Map<RegisteredStateMetaInfoBase.Key, 
List<ExportImportFilesMetaData>>
+                    exportedColumnFamiliesOut)
+            throws Exception {
+
+        final String logLineSuffix = createLogLineSuffix();
+
+        List<ColumnFamilyHandle> tmpColumnFamilyHandles = 
restoredDbInstance.columnFamilyHandles;
+
+        LOG.debug("Checking actual keys of sst files {}", logLineSuffix);
+
+        // Check SST file range
+        RocksDBIncrementalCheckpointUtils.RangeCheckResult rangeCheckResult =
+                
RocksDBIncrementalCheckpointUtils.checkSstDataAgainstKeyGroupRange(
+                        restoredDbInstance.db, keyGroupPrefixBytes, 
stateHandle.getKeyGroupRange());
+
+        LOG.info("{} {}", rangeCheckResult, logLineSuffix);
+
+        if (rangeCheckResult.allInRange()) {
+            LOG.debug("Start exporting {}", logLineSuffix);
+
+            List<RegisteredStateMetaInfoBase> registeredStateMetaInfoBases =
+                    restoredDbInstance.stateMetaInfoSnapshots.stream()
+                            
.map(RegisteredStateMetaInfoBase::fromMetaInfoSnapshot)
+                            .collect(Collectors.toList());
+
+            // Export all the Column Families and store the result in 
exportedColumnFamiliesOut
+            RocksDBIncrementalCheckpointUtils.exportColumnFamilies(
+                    restoredDbInstance.db,
+                    tmpColumnFamilyHandles,
+                    registeredStateMetaInfoBases,
+                    exportCfBasePath,
+                    exportedColumnFamiliesOut);
+
+            LOG.debug("Done exporting {}", logLineSuffix);
+            return Either.Left(stateHandle.getKeyGroupRange());
+        } else {
+            LOG.debug("Skipped export {}", logLineSuffix);
+            return Either.Right(stateHandle);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        restoredDbInstance.close();
+    }
+
+    /** Creates a consistent log line suffix for logging operations. */
+    private String createLogLineSuffix() {
+        return " for state handle at index "
+                + index
+                + " with proclaimed key-group range "
+                + stateHandle.getKeyGroupRange().prettyPrintInterval()
+                + " for backend with range "
+                + keyGroupRange.prettyPrintInterval()
+                + " in operator "
+                + operatorIdentifier
+                + ".";
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RestoredDBInstance.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RestoredDBInstance.java
new file mode 100644
index 00000000000..afbe27bce9c
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RestoredDBInstance.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.rocksdb.restore;
+
+import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.state.rocksdb.RocksDBOperationUtils;
+import org.apache.flink.state.rocksdb.ttl.RocksDbTtlCompactFiltersManager;
+import org.apache.flink.util.IOUtils;
+
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+
+import javax.annotation.Nonnull;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+
+/** Restored DB instance containing all necessary handles and metadata. */
+public class RestoredDBInstance implements AutoCloseable {
+
+    @Nonnull public final RocksDB db;
+    @Nonnull public final ColumnFamilyHandle defaultColumnFamilyHandle;
+    @Nonnull public final List<ColumnFamilyHandle> columnFamilyHandles;
+    @Nonnull public final List<ColumnFamilyDescriptor> columnFamilyDescriptors;
+    @Nonnull public final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
+    public final ReadOptions readOptions;
+    public final IncrementalLocalKeyedStateHandle srcStateHandle;
+
+    public RestoredDBInstance(
+            @Nonnull RocksDB db,
+            @Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
+            @Nonnull List<ColumnFamilyDescriptor> columnFamilyDescriptors,
+            @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
+            IncrementalLocalKeyedStateHandle srcStateHandle) {
+        this.db = db;
+        this.defaultColumnFamilyHandle = columnFamilyHandles.remove(0);
+        this.columnFamilyHandles = columnFamilyHandles;
+        this.columnFamilyDescriptors = columnFamilyDescriptors;
+        this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
+        this.readOptions = new ReadOptions();
+        this.srcStateHandle = srcStateHandle;
+    }
+
+    @Override
+    public void close() {
+        List<ColumnFamilyOptions> columnFamilyOptions =
+                new ArrayList<>(columnFamilyDescriptors.size() + 1);
+        columnFamilyDescriptors.forEach((cfd) -> 
columnFamilyOptions.add(cfd.getOptions()));
+        RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(
+                columnFamilyOptions, defaultColumnFamilyHandle);
+        IOUtils.closeQuietly(defaultColumnFamilyHandle);
+        IOUtils.closeAllQuietly(columnFamilyHandles);
+        IOUtils.closeQuietly(db);
+        IOUtils.closeAllQuietly(columnFamilyOptions);
+        IOUtils.closeQuietly(readOptions);
+    }
+
+    /**
+     * Restores a RocksDB instance from local state for the given state handle.
+     *
+     * @param stateHandle the state handle to restore from
+     * @param columnFamilyOptionsFactory factory for creating column family 
options
+     * @param dbOptions database options
+     * @param ttlCompactFiltersManager TTL compact filters manager (can be 
null)
+     * @param writeBufferManagerCapacity write buffer manager capacity (can be 
null)
+     * @return restored DB instance with all necessary handles and metadata
+     * @throws Exception on any restore error
+     */
+    public static RestoredDBInstance restoreTempDBInstanceFromLocalState(
+            IncrementalLocalKeyedStateHandle stateHandle,
+            List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
+            Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
+            DBOptions dbOptions,
+            RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
+            Long writeBufferManagerCapacity)
+            throws Exception {
+
+        List<ColumnFamilyDescriptor> columnFamilyDescriptors =
+                createColumnFamilyDescriptors(
+                        stateMetaInfoSnapshots,
+                        columnFamilyOptionsFactory,
+                        ttlCompactFiltersManager,
+                        writeBufferManagerCapacity,
+                        false);
+
+        Path restoreSourcePath = 
stateHandle.getDirectoryStateHandle().getDirectory();
+
+        List<ColumnFamilyHandle> columnFamilyHandles =
+                new ArrayList<>(stateMetaInfoSnapshots.size() + 1);
+
+        RocksDB db =
+                RocksDBOperationUtils.openDB(
+                        restoreSourcePath.toString(),
+                        columnFamilyDescriptors,
+                        columnFamilyHandles,
+                        RocksDBOperationUtils.createColumnFamilyOptions(
+                                columnFamilyOptionsFactory, "default"),
+                        dbOptions);
+
+        return new RestoredDBInstance(
+                db,
+                columnFamilyHandles,
+                columnFamilyDescriptors,
+                stateMetaInfoSnapshots,
+                stateHandle);
+    }
+
+    /**
+     * This method recreates and registers all {@link ColumnFamilyDescriptor} 
from Flink's state
+     * metadata snapshot.
+     */
+    public static List<ColumnFamilyDescriptor> createColumnFamilyDescriptors(
+            List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
+            Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
+            RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
+            Long writeBufferManagerCapacity,
+            boolean registerTtlCompactFilter) {
+
+        List<ColumnFamilyDescriptor> columnFamilyDescriptors =
+                new ArrayList<>(stateMetaInfoSnapshots.size());
+
+        for (StateMetaInfoSnapshot stateMetaInfoSnapshot : 
stateMetaInfoSnapshots) {
+            RegisteredStateMetaInfoBase metaInfoBase =
+                    
RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
+
+            ColumnFamilyDescriptor columnFamilyDescriptor =
+                    RocksDBOperationUtils.createColumnFamilyDescriptor(
+                            metaInfoBase,
+                            columnFamilyOptionsFactory,
+                            registerTtlCompactFilter ? 
ttlCompactFiltersManager : null,
+                            writeBufferManagerCapacity);
+
+            columnFamilyDescriptors.add(columnFamilyDescriptor);
+        }
+        return columnFamilyDescriptors;
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RocksDBIncrementalRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RocksDBIncrementalRestoreOperation.java
index 79bc7bba201..d5a04132238 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RocksDBIncrementalRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/restore/RocksDBIncrementalRestoreOperation.java
@@ -48,6 +48,7 @@ import 
org.apache.flink.state.rocksdb.RocksDBWriteBatchWrapper;
 import org.apache.flink.state.rocksdb.RocksIteratorWrapper;
 import org.apache.flink.state.rocksdb.StateHandleDownloadSpec;
 import org.apache.flink.state.rocksdb.ttl.RocksDbTtlCompactFiltersManager;
+import org.apache.flink.types.Either;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
@@ -61,8 +62,6 @@ import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 import org.rocksdb.ExportImportFilesMetaData;
-import org.rocksdb.ReadOptions;
-import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -88,7 +87,6 @@ import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
 import static 
org.apache.flink.runtime.metrics.MetricNames.DOWNLOAD_STATE_DURATION;
 import static 
org.apache.flink.runtime.metrics.MetricNames.RESTORE_ASYNC_COMPACTION_DURATION;
@@ -492,7 +490,7 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
                             notImportableHandles);
 
             if (exportedColumnFamilyMetaData.isEmpty()) {
-                // Nothing coule be exported, so we fall back to
+                // Nothing could be exported, so we fall back to
                 // #mergeStateHandlesWithCopyFromTemporaryInstance
                 mergeStateHandlesWithCopyFromTemporaryInstance(
                         notImportableHandles, startKeyGroupPrefixBytes, 
stopKeyGroupPrefixBytes);
@@ -542,74 +540,39 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
         int minExportKeyGroup = Integer.MAX_VALUE;
         int maxExportKeyGroup = Integer.MIN_VALUE;
         int index = 0;
-        for (IncrementalLocalKeyedStateHandle stateHandle : 
localKeyedStateHandles) {
-
-            final String logLineSuffix =
-                    " for state handle at index "
-                            + index
-                            + " with proclaimed key-group range "
-                            + 
stateHandle.getKeyGroupRange().prettyPrintInterval()
-                            + " for backend with range "
-                            + keyGroupRange.prettyPrintInterval()
-                            + " in operator "
-                            + operatorIdentifier
-                            + ".";
-
-            logger.debug("Opening temporary database" + logLineSuffix);
-            try (RestoredDBInstance tmpRestoreDBInfo =
-                    restoreTempDBInstanceFromLocalState(stateHandle)) {
-
-                List<ColumnFamilyHandle> tmpColumnFamilyHandles =
-                        tmpRestoreDBInfo.columnFamilyHandles;
-
-                logger.debug("Checking actual keys of sst files" + 
logLineSuffix);
-
-                // Check if the data in all SST files referenced in the handle 
is within the
-                // proclaimed key-groups range of the handle.
-                RocksDBIncrementalCheckpointUtils.RangeCheckResult 
rangeCheckResult =
-                        
RocksDBIncrementalCheckpointUtils.checkSstDataAgainstKeyGroupRange(
-                                tmpRestoreDBInfo.db,
-                                keyGroupPrefixBytes,
-                                stateHandle.getKeyGroupRange());
-
-                logger.info("{}" + logLineSuffix, rangeCheckResult);
-
-                if (rangeCheckResult.allInRange()) {
-
-                    logger.debug("Start exporting" + logLineSuffix);
-
-                    List<RegisteredStateMetaInfoBase> 
registeredStateMetaInfoBases =
-                            tmpRestoreDBInfo.stateMetaInfoSnapshots.stream()
-                                    
.map(RegisteredStateMetaInfoBase::fromMetaInfoSnapshot)
-                                    .collect(Collectors.toList());
-
-                    // Export all the Column Families and store the result in
-                    // exportedColumnFamiliesOut
-                    RocksDBIncrementalCheckpointUtils.exportColumnFamilies(
-                            tmpRestoreDBInfo.db,
-                            tmpColumnFamilyHandles,
-                            registeredStateMetaInfoBases,
-                            exportCfBasePath,
-                            exportedColumnFamiliesOut);
 
+        for (IncrementalLocalKeyedStateHandle stateHandle : 
localKeyedStateHandles) {
+            KeyedBackendSerializationProxy<K> serializationProxy =
+                    readMetaData(stateHandle.getMetaDataStateHandle());
+            List<StateMetaInfoSnapshot> stateMetaInfoSnapshots =
+                    serializationProxy.getStateMetaInfoSnapshots();
+            // Use Helper to encapsulate single stateHandle processing
+            try (DistributeStateHandlerHelper helper =
+                    new DistributeStateHandlerHelper(
+                            stateHandle,
+                            stateMetaInfoSnapshots,
+                            rocksHandle.getColumnFamilyOptionsFactory(),
+                            rocksHandle.getDbOptions(),
+                            rocksHandle.getTtlCompactFiltersManager(),
+                            rocksHandle.getWriteBufferManagerCapacity(),
+                            keyGroupPrefixBytes,
+                            keyGroupRange,
+                            operatorIdentifier,
+                            index)) {
+
+                Either<KeyGroupRange, IncrementalLocalKeyedStateHandle> result 
=
+                        helper.tryDistribute(exportCfBasePath, 
exportedColumnFamiliesOut);
+
+                // Handle the result and collect skipped handles
+                if (result.isLeft()) {
+                    KeyGroupRange exportedRange = result.left();
                     minExportKeyGroup =
-                            Math.min(
-                                    minExportKeyGroup,
-                                    
stateHandle.getKeyGroupRange().getStartKeyGroup());
-                    maxExportKeyGroup =
-                            Math.max(
-                                    maxExportKeyGroup,
-                                    
stateHandle.getKeyGroupRange().getEndKeyGroup());
-
-                    logger.debug("Done exporting" + logLineSuffix);
+                            Math.min(minExportKeyGroup, 
exportedRange.getStartKeyGroup());
+                    maxExportKeyGroup = Math.max(maxExportKeyGroup, 
exportedRange.getEndKeyGroup());
                 } else {
-                    // Actual key range in files exceeds proclaimed range, 
cannot import. We
-                    // will copy this handle using a temporary DB later.
-                    skipped.add(stateHandle);
-                    logger.debug("Skipped export" + logLineSuffix);
+                    skipped.add(result.right());
                 }
             }
-
             ++index;
         }
 
@@ -737,7 +700,7 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
     /**
      * Restores the base DB from local state of a single state handle.
      *
-     * @param localKeyedStateHandle the state handle tor estore from.
+     * @param localKeyedStateHandle the state handle to restore from.
      * @throws Exception on any restore error.
      */
     private void restoreBaseDBFromLocalState(IncrementalLocalKeyedStateHandle 
localKeyedStateHandle)
@@ -750,7 +713,12 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
         Path restoreSourcePath = 
localKeyedStateHandle.getDirectoryStateHandle().getDirectory();
 
         this.rocksHandle.openDB(
-                createColumnFamilyDescriptors(stateMetaInfoSnapshots, true),
+                RestoredDBInstance.createColumnFamilyDescriptors(
+                        stateMetaInfoSnapshots,
+                        rocksHandle.getColumnFamilyOptionsFactory(),
+                        rocksHandle.getTtlCompactFiltersManager(),
+                        rocksHandle.getWriteBufferManagerCapacity(),
+                        true),
                 stateMetaInfoSnapshots,
                 restoreSourcePath,
                 cancelStreamRegistryForRestore);
@@ -812,10 +780,20 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
                         
cancelStreamRegistryForRestore.registerCloseableTemporarily(
                                 writeBatchWrapper.getCancelCloseable())) {
             for (IncrementalLocalKeyedStateHandle handleToCopy : toImport) {
-                try (RestoredDBInstance restoredDBInstance =
-                        restoreTempDBInstanceFromLocalState(handleToCopy)) {
+                KeyedBackendSerializationProxy<K> serializationProxy =
+                        readMetaData(handleToCopy.getMetaDataStateHandle());
+                List<StateMetaInfoSnapshot> stateMetaInfoSnapshots =
+                        serializationProxy.getStateMetaInfoSnapshots();
+                try (RestoredDBInstance restoredDbInstance =
+                        RestoredDBInstance.restoreTempDBInstanceFromLocalState(
+                                handleToCopy,
+                                stateMetaInfoSnapshots,
+                                rocksHandle.getColumnFamilyOptionsFactory(),
+                                rocksHandle.getDbOptions(),
+                                rocksHandle.getTtlCompactFiltersManager(),
+                                rocksHandle.getWriteBufferManagerCapacity())) {
                     copyTempDbIntoBaseDb(
-                            restoredDBInstance,
+                            restoredDbInstance,
                             writeBatchWrapper,
                             startKeyGroupPrefixBytes,
                             stopKeyGroupPrefixBytes);
@@ -824,7 +802,7 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
         }
 
         logger.info(
-                "Competed copying state handles for backend with range {} in 
operator {} using temporary instances.",
+                "Completed copying state handles for backend with range {} in 
operator {} using temporary instances.",
                 keyGroupRange.prettyPrintInterval(),
                 operatorIdentifier);
     }
@@ -856,8 +834,7 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
         List<ColumnFamilyHandle> tmpColumnFamilyHandles = 
tmpRestoreDBInfo.columnFamilyHandles;
 
         // iterating only the requested descriptors automatically skips the 
default
-        // column
-        // family handle
+        // column family handle
         for (int descIdx = 0; descIdx < tmpColumnFamilyDescriptors.size(); 
++descIdx) {
             ColumnFamilyHandle tmpColumnFamilyHandle = 
tmpColumnFamilyHandles.get(descIdx);
 
@@ -905,114 +882,8 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
         try {
             FileUtils.deleteDirectory(path.toFile());
         } catch (IOException ex) {
-            logger.warn("Failed to clean up path " + path, ex);
-        }
-    }
-
-    /** Entity to hold the temporary RocksDB instance created for restore. */
-    private static class RestoredDBInstance implements AutoCloseable {
-
-        @Nonnull private final RocksDB db;
-
-        @Nonnull private final ColumnFamilyHandle defaultColumnFamilyHandle;
-
-        @Nonnull private final List<ColumnFamilyHandle> columnFamilyHandles;
-
-        @Nonnull private final List<ColumnFamilyDescriptor> 
columnFamilyDescriptors;
-
-        @Nonnull private final List<StateMetaInfoSnapshot> 
stateMetaInfoSnapshots;
-
-        private final ReadOptions readOptions;
-
-        private final IncrementalLocalKeyedStateHandle srcStateHandle;
-
-        private RestoredDBInstance(
-                @Nonnull RocksDB db,
-                @Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
-                @Nonnull List<ColumnFamilyDescriptor> columnFamilyDescriptors,
-                @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
-                @Nonnull IncrementalLocalKeyedStateHandle srcStateHandle) {
-            this.db = db;
-            this.defaultColumnFamilyHandle = columnFamilyHandles.remove(0);
-            this.columnFamilyHandles = columnFamilyHandles;
-            this.columnFamilyDescriptors = columnFamilyDescriptors;
-            this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
-            this.readOptions = new ReadOptions();
-            this.srcStateHandle = srcStateHandle;
-        }
-
-        @Override
-        public void close() {
-            List<ColumnFamilyOptions> columnFamilyOptions =
-                    new ArrayList<>(columnFamilyDescriptors.size() + 1);
-            columnFamilyDescriptors.forEach((cfd) -> 
columnFamilyOptions.add(cfd.getOptions()));
-            RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(
-                    columnFamilyOptions, defaultColumnFamilyHandle);
-            IOUtils.closeQuietly(defaultColumnFamilyHandle);
-            IOUtils.closeAllQuietly(columnFamilyHandles);
-            IOUtils.closeQuietly(db);
-            IOUtils.closeAllQuietly(columnFamilyOptions);
-            IOUtils.closeQuietly(readOptions);
-        }
-    }
-
-    private RestoredDBInstance restoreTempDBInstanceFromLocalState(
-            IncrementalLocalKeyedStateHandle stateHandle) throws Exception {
-        KeyedBackendSerializationProxy<K> serializationProxy =
-                readMetaData(stateHandle.getMetaDataStateHandle());
-        // read meta data
-        List<StateMetaInfoSnapshot> stateMetaInfoSnapshots =
-                serializationProxy.getStateMetaInfoSnapshots();
-
-        List<ColumnFamilyDescriptor> columnFamilyDescriptors =
-                createColumnFamilyDescriptors(stateMetaInfoSnapshots, false);
-
-        List<ColumnFamilyHandle> columnFamilyHandles =
-                new ArrayList<>(stateMetaInfoSnapshots.size() + 1);
-
-        RocksDB restoreDb =
-                RocksDBOperationUtils.openDB(
-                        
stateHandle.getDirectoryStateHandle().getDirectory().toString(),
-                        columnFamilyDescriptors,
-                        columnFamilyHandles,
-                        RocksDBOperationUtils.createColumnFamilyOptions(
-                                
this.rocksHandle.getColumnFamilyOptionsFactory(), "default"),
-                        this.rocksHandle.getDbOptions());
-
-        return new RestoredDBInstance(
-                restoreDb,
-                columnFamilyHandles,
-                columnFamilyDescriptors,
-                stateMetaInfoSnapshots,
-                stateHandle);
-    }
-
-    /**
-     * This method recreates and registers all {@link ColumnFamilyDescriptor} 
from Flink's state
-     * metadata snapshot.
-     */
-    private List<ColumnFamilyDescriptor> createColumnFamilyDescriptors(
-            List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, boolean 
registerTtlCompactFilter) {
-
-        List<ColumnFamilyDescriptor> columnFamilyDescriptors =
-                new ArrayList<>(stateMetaInfoSnapshots.size());
-
-        for (StateMetaInfoSnapshot stateMetaInfoSnapshot : 
stateMetaInfoSnapshots) {
-            RegisteredStateMetaInfoBase metaInfoBase =
-                    
RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
-
-            ColumnFamilyDescriptor columnFamilyDescriptor =
-                    RocksDBOperationUtils.createColumnFamilyDescriptor(
-                            metaInfoBase,
-                            this.rocksHandle.getColumnFamilyOptionsFactory(),
-                            registerTtlCompactFilter
-                                    ? 
this.rocksHandle.getTtlCompactFiltersManager()
-                                    : null,
-                            this.rocksHandle.getWriteBufferManagerCapacity());
-
-            columnFamilyDescriptors.add(columnFamilyDescriptor);
+            logger.warn("Failed to clean up path {}", path, ex);
         }
-        return columnFamilyDescriptors;
     }
 
     private void runAndReportDuration(RunnableWithException runnable, String 
metricName)

Reply via email to