This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 268a30802edf0c7d1be61e29d47ba5343dfa8a49 Author: Stefan Richter <srich...@confluent.io> AuthorDate: Mon Feb 12 13:35:40 2024 +0100 [FLINK-31238] Deactivate parts of the code until new FRocksDB release is available. Then this commit should be reverted. --- .../flink-statebackend-rocksdb/pom.xml | 4 +- .../state/RocksDBIncrementalCheckpointUtils.java | 15 ++------ .../streaming/state/RocksDBOperationUtils.java | 43 ++-------------------- .../streaming/state/restore/RocksDBHandle.java | 4 +- .../RocksDBIncrementalRestoreOperation.java | 16 +++++--- 5 files changed, 22 insertions(+), 60 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/pom.xml b/flink-state-backends/flink-statebackend-rocksdb/pom.xml index 592d29df673..1a39fc84b7f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/pom.xml +++ b/flink-state-backends/flink-statebackend-rocksdb/pom.xml @@ -61,9 +61,9 @@ under the License. </dependency> <dependency> - <groupId>io.github.fredia</groupId> + <groupId>com.ververica</groupId> <artifactId>frocksdbjni</artifactId> - <version>8.6.7-ververica-test-1.0</version> + <version>6.20.3-ververica-2.0</version> </dependency> <!-- test dependencies --> diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java index 2e7342d4e0f..99b97ef5164 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java @@ -20,17 +20,11 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.runtime.state.CompositeKeySerializationUtils; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; -import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; -import org.apache.flink.util.IOUtils; -import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.RunnableWithException; import org.apache.flink.shaded.guava31.com.google.common.primitives.UnsignedBytes; -import org.rocksdb.Checkpoint; import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.CompactRangeOptions; -import org.rocksdb.ExportImportFilesMetaData; import org.rocksdb.LiveFileMetaData; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; @@ -39,16 +33,11 @@ import javax.annotation.Nonnegative; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.io.File; -import java.nio.file.Path; -import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Optional; -import java.util.UUID; /** Utils for RocksDB Incremental Checkpoint. */ public class RocksDBIncrementalCheckpointUtils { @@ -218,6 +207,7 @@ public class RocksDBIncrementalCheckpointUtils { return Optional.of( () -> { + /* try (CompactRangeOptions compactionOptions = new CompactRangeOptions() .setExclusiveManualCompaction(true) @@ -251,6 +241,7 @@ public class RocksDBIncrementalCheckpointUtils { } } } + */ }); } @@ -321,6 +312,7 @@ public class RocksDBIncrementalCheckpointUtils { * @param resultOutput output parameter for the metadata of the export. * @throws RocksDBException on problems inside RocksDB. */ + /* public static void exportColumnFamilies( RocksDB db, List<ColumnFamilyHandle> columnFamilyHandles, @@ -357,6 +349,7 @@ public class RocksDBIncrementalCheckpointUtils { } } } + */ /** check whether the bytes is before prefixBytes in the character order. */ public static boolean beforeThePrefixBytes(@Nonnull byte[] bytes, @Nonnull byte[] prefixBytes) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java index 1f7bcf5ff1a..c3a9549ccd8 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java @@ -31,8 +31,6 @@ import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; -import org.rocksdb.ExportImportFilesMetaData; -import org.rocksdb.ImportColumnFamilyOptions; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; @@ -44,7 +42,6 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -120,17 +117,13 @@ public class RocksDBOperationUtils { * * <p>Creates the column family for the state. Sets TTL compaction filter if {@code * ttlCompactFiltersManager} is not {@code null}. - * - * @param importFilesMetaData if not empty, we import the files specified in the metadata to the - * column family. */ public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo( RegisteredStateMetaInfoBase metaInfoBase, RocksDB db, Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, - @Nullable Long writeBufferManagerCapacity, - List<ExportImportFilesMetaData> importFilesMetaData) { + @Nullable Long writeBufferManagerCapacity) { ColumnFamilyDescriptor columnFamilyDescriptor = createColumnFamilyDescriptor( @@ -141,8 +134,7 @@ public class RocksDBOperationUtils { final ColumnFamilyHandle columnFamilyHandle; try { - columnFamilyHandle = - createColumnFamily(columnFamilyDescriptor, db, importFilesMetaData); + columnFamilyHandle = createColumnFamily(columnFamilyDescriptor, db); } catch (Exception ex) { IOUtils.closeQuietly(columnFamilyDescriptor.getOptions()); throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", ex); @@ -151,21 +143,6 @@ public class RocksDBOperationUtils { return new RocksDBKeyedStateBackend.RocksDbKvStateInfo(columnFamilyHandle, metaInfoBase); } - public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo( - RegisteredStateMetaInfoBase metaInfoBase, - RocksDB db, - Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, - @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, - @Nullable Long writeBufferManagerCapacity) { - return createStateInfo( - metaInfoBase, - db, - columnFamilyOptionsFactory, - ttlCompactFiltersManager, - writeBufferManagerCapacity, - Collections.emptyList()); - } - /** * Creates a column descriptor for a state column family. * @@ -253,20 +230,8 @@ public class RocksDBOperationUtils { } private static ColumnFamilyHandle createColumnFamily( - ColumnFamilyDescriptor columnDescriptor, - RocksDB db, - List<ExportImportFilesMetaData> importFilesMetaData) - throws RocksDBException { - - if (importFilesMetaData.isEmpty()) { - return db.createColumnFamily(columnDescriptor); - } else { - try (ImportColumnFamilyOptions importColumnFamilyOptions = - new ImportColumnFamilyOptions().setMoveFiles(true)) { - return db.createColumnFamilyWithImport( - columnDescriptor, importColumnFamilyOptions, importFilesMetaData); - } - } + ColumnFamilyDescriptor columnDescriptor, RocksDB db) throws RocksDBException { + return db.createColumnFamily(columnDescriptor); } public static void addColumnFamilyOptionsToCloseLater( diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java index a2ed8f1e33f..25fbb95bc79 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java @@ -28,13 +28,11 @@ import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; import org.apache.flink.util.FileUtils; import org.apache.flink.util.IOUtils; -import org.apache.flink.util.Preconditions; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; -import org.rocksdb.ExportImportFilesMetaData; import org.rocksdb.RocksDB; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -193,6 +191,7 @@ class RocksDBHandle implements AutoCloseable { * @param stateMetaInfo info about the state to create. * @param cfMetaDataList the data to import. */ + /* void registerStateColumnFamilyHandleWithImport( RegisteredStateMetaInfoBase stateMetaInfo, List<ExportImportFilesMetaData> cfMetaDataList) { @@ -213,6 +212,7 @@ class RocksDBHandle implements AutoCloseable { columnFamilyHandles.add(stateInfo.columnFamilyHandle); } + */ /** * This recreates the new working directory of the recovered RocksDB instance and links/copies diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java index 0a5e9744c29..4d09b50b44e 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java @@ -58,7 +58,6 @@ import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; -import org.rocksdb.ExportImportFilesMetaData; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; @@ -71,11 +70,9 @@ import javax.annotation.Nonnull; import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -86,7 +83,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Function; -import java.util.stream.Collectors; import static org.apache.flink.runtime.state.StateUtil.unexpectedStateHandleException; @@ -167,8 +163,10 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper this.keyGroupPrefixBytes = keyGroupPrefixBytes; this.keySerializerProvider = keySerializerProvider; this.userCodeClassLoader = userCodeClassLoader; - this.useIngestDbRestoreMode = useIngestDbRestoreMode; - this.asyncCompactAfterRescale = asyncCompactAfterRescale; + // this.useIngestDbRestoreMode = useIngestDbRestoreMode; + // this.asyncCompactAfterRescale = asyncCompactAfterRescale; + this.useIngestDbRestoreMode = false; + this.asyncCompactAfterRescale = false; } /** Root method that branches for different implementations of {@link KeyedStateHandle}. */ @@ -408,6 +406,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper byte[] stopKeyGroupPrefixBytes) throws Exception { + /* final Path absolutInstanceBasePath = instanceBasePath.getAbsoluteFile().toPath(); final Path exportCfBasePath = absolutInstanceBasePath.resolve("export-cfs"); Files.createDirectories(exportCfBasePath); @@ -447,6 +446,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper // Cleanup export base directory cleanUpPathQuietly(exportCfBasePath); } + */ } /** @@ -462,6 +462,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper * @return the total key-groups range of the exported data. * @throws Exception on any export error. */ + /* private KeyGroupRange exportColumnFamiliesWithSstDataInKeyGroupsRange( Path exportCfBasePath, List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles, @@ -542,6 +543,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper ? new KeyGroupRange(minExportKeyGroup, maxExportKeyGroup) : KeyGroupRange.EMPTY_KEY_GROUP_RANGE; } + */ /** * Helper method that merges the data from multiple state handles into the restoring base DB by @@ -594,6 +596,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper * @param exportKeyGroupRange the total key-groups range of the exported data. * @throws Exception on import error. */ + /* private void initBaseDBFromColumnFamilyImports( Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>> exportedColumnFamilyMetaData, @@ -622,6 +625,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper keyGroupRange, operatorIdentifier); } + */ /** * Restores the checkpointing status and state for this backend. This can only be done if the