This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cde81ab266 [FLINK-35581] Remove comments from the code related to 
ingestDB (#25263)
0cde81ab266 is described below

commit 0cde81ab26638f77ce4c71ed51812829dce97730
Author: mayuehappy <[email protected]>
AuthorDate: Fri Aug 30 21:31:58 2024 +0800

    [FLINK-35581] Remove comments from the code related to ingestDB (#25263)
---
 .../state/RocksDBIncrementalCheckpointUtils.java   | 13 ++++---
 .../streaming/state/RocksDBOperationUtils.java     | 41 ++++++++++++++++++++--
 .../streaming/state/restore/RocksDBHandle.java     |  7 ++--
 .../RocksDBIncrementalRestoreOperation.java        | 23 ++++++------
 4 files changed, 62 insertions(+), 22 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
index 4a3374b4119..3147f01747a 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
@@ -20,12 +20,17 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.RunnableWithException;
 
 import 
org.apache.flink.shaded.guava32.com.google.common.primitives.UnsignedBytes;
 
+import org.rocksdb.Checkpoint;
 import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.CompactRangeOptions;
+import org.rocksdb.ExportImportFilesMetaData;
 import org.rocksdb.LiveFileMetaData;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
@@ -36,13 +41,17 @@ import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.io.File;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 /** Utils for RocksDB Incremental Checkpoint. */
@@ -254,7 +263,6 @@ public class RocksDBIncrementalCheckpointUtils {
 
         return Optional.of(
                 () -> {
-                    /*
                     try (CompactRangeOptions compactionOptions =
                             new CompactRangeOptions()
                                     .setExclusiveManualCompaction(true)
@@ -288,7 +296,6 @@ public class RocksDBIncrementalCheckpointUtils {
                             }
                         }
                     }
-                     */
                 });
     }
 
@@ -359,7 +366,6 @@ public class RocksDBIncrementalCheckpointUtils {
      * @param resultOutput output parameter for the metadata of the export.
      * @throws RocksDBException on problems inside RocksDB.
      */
-    /*
     public static void exportColumnFamilies(
             RocksDB db,
             List<ColumnFamilyHandle> columnFamilyHandles,
@@ -396,7 +402,6 @@ public class RocksDBIncrementalCheckpointUtils {
             }
         }
     }
-    */
 
     /** check whether the bytes is before prefixBytes in the character order. 
*/
     public static boolean beforeThePrefixBytes(@Nonnull byte[] bytes, @Nonnull 
byte[] prefixBytes) {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
index acd45f59012..4750969d050 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
@@ -33,6 +33,8 @@ import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ExportImportFilesMetaData;
+import org.rocksdb.ImportColumnFamilyOptions;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
@@ -44,6 +46,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -119,6 +122,9 @@ public class RocksDBOperationUtils {
      *
      * <p>Creates the column family for the state. Sets TTL compaction filter 
if {@code
      * ttlCompactFiltersManager} is not {@code null}.
+     *
+     * @param importFilesMetaData if not empty, we import the files specified 
in the metadata to the
+     *     column family.
      */
     public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo(
             RegisteredStateMetaInfoBase metaInfoBase,
@@ -126,6 +132,7 @@ public class RocksDBOperationUtils {
             Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
             @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
             @Nullable Long writeBufferManagerCapacity,
+            List<ExportImportFilesMetaData> importFilesMetaData,
             ICloseableRegistry cancelStreamRegistryForRestore) {
 
         ColumnFamilyDescriptor columnFamilyDescriptor =
@@ -138,7 +145,11 @@ public class RocksDBOperationUtils {
         final ColumnFamilyHandle columnFamilyHandle;
         try {
             columnFamilyHandle =
-                    createColumnFamily(columnFamilyDescriptor, db, 
cancelStreamRegistryForRestore);
+                    createColumnFamily(
+                            columnFamilyDescriptor,
+                            db,
+                            importFilesMetaData,
+                            cancelStreamRegistryForRestore);
         } catch (Exception ex) {
             IOUtils.closeQuietly(columnFamilyDescriptor.getOptions());
             throw new FlinkRuntimeException("Error creating 
ColumnFamilyHandle.", ex);
@@ -147,6 +158,23 @@ public class RocksDBOperationUtils {
         return new 
RocksDBKeyedStateBackend.RocksDbKvStateInfo(columnFamilyHandle, metaInfoBase);
     }
 
+    public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo(
+            RegisteredStateMetaInfoBase metaInfoBase,
+            RocksDB db,
+            Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
+            @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
+            @Nullable Long writeBufferManagerCapacity,
+            ICloseableRegistry cancelStreamRegistryForRestore) {
+        return createStateInfo(
+                metaInfoBase,
+                db,
+                columnFamilyOptionsFactory,
+                ttlCompactFiltersManager,
+                writeBufferManagerCapacity,
+                Collections.emptyList(),
+                cancelStreamRegistryForRestore);
+    }
+
     /**
      * Creates a column descriptor for a state column family.
      *
@@ -236,6 +264,7 @@ public class RocksDBOperationUtils {
     private static ColumnFamilyHandle createColumnFamily(
             ColumnFamilyDescriptor columnDescriptor,
             RocksDB db,
+            List<ExportImportFilesMetaData> importFilesMetaData,
             ICloseableRegistry cancelStreamRegistryForRestore)
             throws RocksDBException, InterruptedException {
         if (Thread.currentThread().isInterrupted()) {
@@ -246,7 +275,15 @@ public class RocksDBOperationUtils {
             throw new CancelTaskException("The stream was closed, aborting 
recovery");
         }
 
-        return db.createColumnFamily(columnDescriptor);
+        if (importFilesMetaData.isEmpty()) {
+            return db.createColumnFamily(columnDescriptor);
+        } else {
+            try (ImportColumnFamilyOptions importColumnFamilyOptions =
+                    new ImportColumnFamilyOptions().setMoveFiles(true)) {
+                return db.createColumnFamilyWithImport(
+                        columnDescriptor, importColumnFamilyOptions, 
importFilesMetaData);
+            }
+        }
     }
 
     public static void addColumnFamilyOptionsToCloseLater(
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
index ae58adc83b0..0453c1eea49 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
@@ -29,11 +29,13 @@ import 
org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
 
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ExportImportFilesMetaData;
 import org.rocksdb.RocksDB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,7 +58,8 @@ import static 
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUti
 
 /**
  * Utility for creating a RocksDB instance either from scratch or from 
restored local state. This
- * will also register {@link RocksDbKvStateInfo} when using {@link 
#openDB(List, List, Path)}.
+ * will also register {@link RocksDbKvStateInfo} when using {@link 
#openDB(List, List, Path,
+ * ICloseableRegistry)}.
  */
 class RocksDBHandle implements AutoCloseable {
 
@@ -198,7 +201,6 @@ class RocksDBHandle implements AutoCloseable {
      * @param stateMetaInfo info about the state to create.
      * @param cfMetaDataList the data to import.
      */
-    /*
     void registerStateColumnFamilyHandleWithImport(
             RegisteredStateMetaInfoBase stateMetaInfo,
             List<ExportImportFilesMetaData> cfMetaDataList,
@@ -221,7 +223,6 @@ class RocksDBHandle implements AutoCloseable {
 
         columnFamilyHandles.add(stateInfo.columnFamilyHandle);
     }
-    */
 
     /**
      * This recreates the new working directory of the recovered RocksDB 
instance and links/copies
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index 48ffe29bca8..ae4d7b8ed0e 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -59,6 +59,7 @@ import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ExportImportFilesMetaData;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
@@ -72,9 +73,11 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -85,6 +88,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.core.fs.ICloseableRegistry.asCloseable;
 import static 
org.apache.flink.runtime.metrics.MetricNames.DOWNLOAD_STATE_DURATION;
@@ -183,10 +187,8 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
         this.keyGroupPrefixBytes = keyGroupPrefixBytes;
         this.keySerializerProvider = keySerializerProvider;
         this.userCodeClassLoader = userCodeClassLoader;
-        //        this.useIngestDbRestoreMode = useIngestDbRestoreMode;
-        //        this.asyncCompactAfterRescale = asyncCompactAfterRescale;
-        this.useIngestDbRestoreMode = false;
-        this.asyncCompactAfterRescale = false;
+        this.useIngestDbRestoreMode = useIngestDbRestoreMode;
+        this.asyncCompactAfterRescale = asyncCompactAfterRescale;
         this.useDeleteFilesInRange = useDeleteFilesInRange;
         this.ioExecutor = ioExecutor;
     }
@@ -453,7 +455,6 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
             byte[] stopKeyGroupPrefixBytes)
             throws Exception {
 
-        /*
         final Path absolutInstanceBasePath = 
instanceBasePath.getAbsoluteFile().toPath();
         final Path exportCfBasePath = 
absolutInstanceBasePath.resolve("export-cfs");
         Files.createDirectories(exportCfBasePath);
@@ -493,7 +494,6 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
             // Cleanup export base directory
             cleanUpPathQuietly(exportCfBasePath);
         }
-        */
     }
 
     /**
@@ -509,7 +509,6 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
      * @return the total key-groups range of the exported data.
      * @throws Exception on any export error.
      */
-    /*
     private KeyGroupRange exportColumnFamiliesWithSstDataInKeyGroupsRange(
             Path exportCfBasePath,
             List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles,
@@ -590,7 +589,6 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
                 ? new KeyGroupRange(minExportKeyGroup, maxExportKeyGroup)
                 : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
     }
-     */
 
     /**
      * Helper method that merges the data from multiple state handles into the 
restoring base DB by
@@ -640,7 +638,6 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
      * @param exportKeyGroupRange the total key-groups range of the exported 
data.
      * @throws Exception on import error.
      */
-    /*
     private void initBaseDBFromColumnFamilyImports(
             Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>>
                     exportedColumnFamilyMetaData,
@@ -653,10 +650,10 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
                 keyGroupRange,
                 operatorIdentifier);
         rocksHandle.openDB();
-        for (Map.Entry<RegisteredStateMetaInfoBase.Key, 
List<ExportImportFilesMetaData>> entry :
+        for (Map.Entry<RegisteredStateMetaInfoBase, 
List<ExportImportFilesMetaData>> entry :
                 exportedColumnFamilyMetaData.entrySet()) {
             rocksHandle.registerStateColumnFamilyHandleWithImport(
-                    entry.getKey(), entry.getValue(), 
cancelStreamRegistryForRestore);
+                    entry.getKey(), entry.getValue(), cancelStreamRegistry);
         }
 
         // Use Range delete to clip the temp db to the target range of the 
backend
@@ -665,14 +662,14 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
                 rocksHandle.getColumnFamilyHandles(),
                 keyGroupRange,
                 exportKeyGroupRange,
-                keyGroupPrefixBytes);
+                keyGroupPrefixBytes,
+                useDeleteFilesInRange);
 
         logger.info(
                 "Completed importing exported state handles for backend with 
range {} in operator {} using Clip/Ingest DB.",
                 keyGroupRange,
                 operatorIdentifier);
     }
-    */
 
     /**
      * Restores the checkpointing status and state for this backend. This can 
only be done if the

Reply via email to