This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 268a30802edf0c7d1be61e29d47ba5343dfa8a49
Author: Stefan Richter <srich...@confluent.io>
AuthorDate: Mon Feb 12 13:35:40 2024 +0100

    [FLINK-31238] Deactivate parts of the code until new FRocksDB release is available.
    
    Then this commit should be reverted.
---
 .../flink-statebackend-rocksdb/pom.xml             |  4 +-
 .../state/RocksDBIncrementalCheckpointUtils.java   | 15 ++------
 .../streaming/state/RocksDBOperationUtils.java     | 43 ++--------------------
 .../streaming/state/restore/RocksDBHandle.java     |  4 +-
 .../RocksDBIncrementalRestoreOperation.java        | 16 +++++---
 5 files changed, 22 insertions(+), 60 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/pom.xml b/flink-state-backends/flink-statebackend-rocksdb/pom.xml
index 592d29df673..1a39fc84b7f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/pom.xml
+++ b/flink-state-backends/flink-statebackend-rocksdb/pom.xml
@@ -61,9 +61,9 @@ under the License.
                </dependency>
 
                <dependency>
-                       <groupId>io.github.fredia</groupId>
+                       <groupId>com.ververica</groupId>
                        <artifactId>frocksdbjni</artifactId>
-                       <version>8.6.7-ververica-test-1.0</version>
+                       <version>6.20.3-ververica-2.0</version>
                </dependency>
 
                <!-- test dependencies -->
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
index 2e7342d4e0f..99b97ef5164 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
@@ -20,17 +20,11 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
-import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.RunnableWithException;
 
 import 
org.apache.flink.shaded.guava31.com.google.common.primitives.UnsignedBytes;
 
-import org.rocksdb.Checkpoint;
 import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.CompactRangeOptions;
-import org.rocksdb.ExportImportFilesMetaData;
 import org.rocksdb.LiveFileMetaData;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
@@ -39,16 +33,11 @@ import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import java.io.File;
-import java.nio.file.Path;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.UUID;
 
 /** Utils for RocksDB Incremental Checkpoint. */
 public class RocksDBIncrementalCheckpointUtils {
@@ -218,6 +207,7 @@ public class RocksDBIncrementalCheckpointUtils {
 
         return Optional.of(
                 () -> {
+                    /*
                     try (CompactRangeOptions compactionOptions =
                             new CompactRangeOptions()
                                     .setExclusiveManualCompaction(true)
@@ -251,6 +241,7 @@ public class RocksDBIncrementalCheckpointUtils {
                             }
                         }
                     }
+                     */
                 });
     }
 
@@ -321,6 +312,7 @@ public class RocksDBIncrementalCheckpointUtils {
      * @param resultOutput output parameter for the metadata of the export.
      * @throws RocksDBException on problems inside RocksDB.
      */
+    /*
     public static void exportColumnFamilies(
             RocksDB db,
             List<ColumnFamilyHandle> columnFamilyHandles,
@@ -357,6 +349,7 @@ public class RocksDBIncrementalCheckpointUtils {
             }
         }
     }
+    */
 
     /** check whether the bytes is before prefixBytes in the character order. 
*/
     public static boolean beforeThePrefixBytes(@Nonnull byte[] bytes, @Nonnull 
byte[] prefixBytes) {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
index 1f7bcf5ff1a..c3a9549ccd8 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
@@ -31,8 +31,6 @@ import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.ExportImportFilesMetaData;
-import org.rocksdb.ImportColumnFamilyOptions;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
@@ -44,7 +42,6 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -120,17 +117,13 @@ public class RocksDBOperationUtils {
      *
      * <p>Creates the column family for the state. Sets TTL compaction filter 
if {@code
      * ttlCompactFiltersManager} is not {@code null}.
-     *
-     * @param importFilesMetaData if not empty, we import the files specified 
in the metadata to the
-     *     column family.
      */
     public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo(
             RegisteredStateMetaInfoBase metaInfoBase,
             RocksDB db,
             Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
             @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
-            @Nullable Long writeBufferManagerCapacity,
-            List<ExportImportFilesMetaData> importFilesMetaData) {
+            @Nullable Long writeBufferManagerCapacity) {
 
         ColumnFamilyDescriptor columnFamilyDescriptor =
                 createColumnFamilyDescriptor(
@@ -141,8 +134,7 @@ public class RocksDBOperationUtils {
 
         final ColumnFamilyHandle columnFamilyHandle;
         try {
-            columnFamilyHandle =
-                    createColumnFamily(columnFamilyDescriptor, db, 
importFilesMetaData);
+            columnFamilyHandle = createColumnFamily(columnFamilyDescriptor, 
db);
         } catch (Exception ex) {
             IOUtils.closeQuietly(columnFamilyDescriptor.getOptions());
             throw new FlinkRuntimeException("Error creating 
ColumnFamilyHandle.", ex);
@@ -151,21 +143,6 @@ public class RocksDBOperationUtils {
         return new 
RocksDBKeyedStateBackend.RocksDbKvStateInfo(columnFamilyHandle, metaInfoBase);
     }
 
-    public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo(
-            RegisteredStateMetaInfoBase metaInfoBase,
-            RocksDB db,
-            Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
-            @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
-            @Nullable Long writeBufferManagerCapacity) {
-        return createStateInfo(
-                metaInfoBase,
-                db,
-                columnFamilyOptionsFactory,
-                ttlCompactFiltersManager,
-                writeBufferManagerCapacity,
-                Collections.emptyList());
-    }
-
     /**
      * Creates a column descriptor for a state column family.
      *
@@ -253,20 +230,8 @@ public class RocksDBOperationUtils {
     }
 
     private static ColumnFamilyHandle createColumnFamily(
-            ColumnFamilyDescriptor columnDescriptor,
-            RocksDB db,
-            List<ExportImportFilesMetaData> importFilesMetaData)
-            throws RocksDBException {
-
-        if (importFilesMetaData.isEmpty()) {
-            return db.createColumnFamily(columnDescriptor);
-        } else {
-            try (ImportColumnFamilyOptions importColumnFamilyOptions =
-                    new ImportColumnFamilyOptions().setMoveFiles(true)) {
-                return db.createColumnFamilyWithImport(
-                        columnDescriptor, importColumnFamilyOptions, 
importFilesMetaData);
-            }
-        }
+            ColumnFamilyDescriptor columnDescriptor, RocksDB db) throws 
RocksDBException {
+        return db.createColumnFamily(columnDescriptor);
     }
 
     public static void addColumnFamilyOptionsToCloseLater(
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
index a2ed8f1e33f..25fbb95bc79 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
@@ -28,13 +28,11 @@ import 
org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
 
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.ExportImportFilesMetaData;
 import org.rocksdb.RocksDB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -193,6 +191,7 @@ class RocksDBHandle implements AutoCloseable {
      * @param stateMetaInfo info about the state to create.
      * @param cfMetaDataList the data to import.
      */
+    /*
     void registerStateColumnFamilyHandleWithImport(
             RegisteredStateMetaInfoBase stateMetaInfo,
             List<ExportImportFilesMetaData> cfMetaDataList) {
@@ -213,6 +212,7 @@ class RocksDBHandle implements AutoCloseable {
 
         columnFamilyHandles.add(stateInfo.columnFamilyHandle);
     }
+    */
 
     /**
      * This recreates the new working directory of the recovered RocksDB 
instance and links/copies
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index 0a5e9744c29..4d09b50b44e 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -58,7 +58,6 @@ import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.ExportImportFilesMetaData;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
@@ -71,11 +70,9 @@ import javax.annotation.Nonnull;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -86,7 +83,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
 import static 
org.apache.flink.runtime.state.StateUtil.unexpectedStateHandleException;
 
@@ -167,8 +163,10 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
         this.keyGroupPrefixBytes = keyGroupPrefixBytes;
         this.keySerializerProvider = keySerializerProvider;
         this.userCodeClassLoader = userCodeClassLoader;
-        this.useIngestDbRestoreMode = useIngestDbRestoreMode;
-        this.asyncCompactAfterRescale = asyncCompactAfterRescale;
+        //        this.useIngestDbRestoreMode = useIngestDbRestoreMode;
+        //        this.asyncCompactAfterRescale = asyncCompactAfterRescale;
+        this.useIngestDbRestoreMode = false;
+        this.asyncCompactAfterRescale = false;
     }
 
     /** Root method that branches for different implementations of {@link 
KeyedStateHandle}. */
@@ -408,6 +406,7 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
             byte[] stopKeyGroupPrefixBytes)
             throws Exception {
 
+        /*
         final Path absolutInstanceBasePath = 
instanceBasePath.getAbsoluteFile().toPath();
         final Path exportCfBasePath = 
absolutInstanceBasePath.resolve("export-cfs");
         Files.createDirectories(exportCfBasePath);
@@ -447,6 +446,7 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
             // Cleanup export base directory
             cleanUpPathQuietly(exportCfBasePath);
         }
+        */
     }
 
     /**
@@ -462,6 +462,7 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
      * @return the total key-groups range of the exported data.
      * @throws Exception on any export error.
      */
+    /*
     private KeyGroupRange exportColumnFamiliesWithSstDataInKeyGroupsRange(
             Path exportCfBasePath,
             List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles,
@@ -542,6 +543,7 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
                 ? new KeyGroupRange(minExportKeyGroup, maxExportKeyGroup)
                 : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
     }
+     */
 
     /**
      * Helper method that merges the data from multiple state handles into the 
restoring base DB by
@@ -594,6 +596,7 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
      * @param exportKeyGroupRange the total key-groups range of the exported 
data.
      * @throws Exception on import error.
      */
+    /*
     private void initBaseDBFromColumnFamilyImports(
             Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>>
                     exportedColumnFamilyMetaData,
@@ -622,6 +625,7 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
                 keyGroupRange,
                 operatorIdentifier);
     }
+    */
 
     /**
      * Restores the checkpointing status and state for this backend. This can 
only be done if the

Reply via email to