This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0cde81ab266 [FLINK-35581] Remove comments from the code related to
ingestDB (#25263)
0cde81ab266 is described below
commit 0cde81ab26638f77ce4c71ed51812829dce97730
Author: mayuehappy <[email protected]>
AuthorDate: Fri Aug 30 21:31:58 2024 +0800
[FLINK-35581] Remove comments from the code related to ingestDB (#25263)
---
.../state/RocksDBIncrementalCheckpointUtils.java | 13 ++++---
.../streaming/state/RocksDBOperationUtils.java | 41 ++++++++++++++++++++--
.../streaming/state/restore/RocksDBHandle.java | 7 ++--
.../RocksDBIncrementalRestoreOperation.java | 23 ++++++------
4 files changed, 62 insertions(+), 22 deletions(-)
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
index 4a3374b4119..3147f01747a 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
@@ -20,12 +20,17 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
+import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.RunnableWithException;
import
org.apache.flink.shaded.guava32.com.google.common.primitives.UnsignedBytes;
+import org.rocksdb.Checkpoint;
import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.CompactRangeOptions;
+import org.rocksdb.ExportImportFilesMetaData;
import org.rocksdb.LiveFileMetaData;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
@@ -36,13 +41,17 @@ import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.io.File;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.UUID;
import java.util.stream.Collectors;
/** Utils for RocksDB Incremental Checkpoint. */
@@ -254,7 +263,6 @@ public class RocksDBIncrementalCheckpointUtils {
return Optional.of(
() -> {
- /*
try (CompactRangeOptions compactionOptions =
new CompactRangeOptions()
.setExclusiveManualCompaction(true)
@@ -288,7 +296,6 @@ public class RocksDBIncrementalCheckpointUtils {
}
}
}
- */
});
}
@@ -359,7 +366,6 @@ public class RocksDBIncrementalCheckpointUtils {
* @param resultOutput output parameter for the metadata of the export.
* @throws RocksDBException on problems inside RocksDB.
*/
- /*
public static void exportColumnFamilies(
RocksDB db,
List<ColumnFamilyHandle> columnFamilyHandles,
@@ -396,7 +402,6 @@ public class RocksDBIncrementalCheckpointUtils {
}
}
}
- */
/** check whether the bytes is before prefixBytes in the character order.
*/
public static boolean beforeThePrefixBytes(@Nonnull byte[] bytes, @Nonnull
byte[] prefixBytes) {
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
index acd45f59012..4750969d050 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
@@ -33,6 +33,8 @@ import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
+import org.rocksdb.ExportImportFilesMetaData;
+import org.rocksdb.ImportColumnFamilyOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
@@ -44,6 +46,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@@ -119,6 +122,9 @@ public class RocksDBOperationUtils {
*
* <p>Creates the column family for the state. Sets TTL compaction filter
if {@code
* ttlCompactFiltersManager} is not {@code null}.
+ *
+ * @param importFilesMetaData if not empty, we import the files specified
in the metadata to the
+ * column family.
*/
public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo(
RegisteredStateMetaInfoBase metaInfoBase,
@@ -126,6 +132,7 @@ public class RocksDBOperationUtils {
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
@Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
@Nullable Long writeBufferManagerCapacity,
+ List<ExportImportFilesMetaData> importFilesMetaData,
ICloseableRegistry cancelStreamRegistryForRestore) {
ColumnFamilyDescriptor columnFamilyDescriptor =
@@ -138,7 +145,11 @@ public class RocksDBOperationUtils {
final ColumnFamilyHandle columnFamilyHandle;
try {
columnFamilyHandle =
- createColumnFamily(columnFamilyDescriptor, db,
cancelStreamRegistryForRestore);
+ createColumnFamily(
+ columnFamilyDescriptor,
+ db,
+ importFilesMetaData,
+ cancelStreamRegistryForRestore);
} catch (Exception ex) {
IOUtils.closeQuietly(columnFamilyDescriptor.getOptions());
throw new FlinkRuntimeException("Error creating
ColumnFamilyHandle.", ex);
@@ -147,6 +158,23 @@ public class RocksDBOperationUtils {
return new
RocksDBKeyedStateBackend.RocksDbKvStateInfo(columnFamilyHandle, metaInfoBase);
}
+ public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo(
+ RegisteredStateMetaInfoBase metaInfoBase,
+ RocksDB db,
+ Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
+ @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
+ @Nullable Long writeBufferManagerCapacity,
+ ICloseableRegistry cancelStreamRegistryForRestore) {
+ return createStateInfo(
+ metaInfoBase,
+ db,
+ columnFamilyOptionsFactory,
+ ttlCompactFiltersManager,
+ writeBufferManagerCapacity,
+ Collections.emptyList(),
+ cancelStreamRegistryForRestore);
+ }
+
/**
* Creates a column descriptor for a state column family.
*
@@ -236,6 +264,7 @@ public class RocksDBOperationUtils {
private static ColumnFamilyHandle createColumnFamily(
ColumnFamilyDescriptor columnDescriptor,
RocksDB db,
+ List<ExportImportFilesMetaData> importFilesMetaData,
ICloseableRegistry cancelStreamRegistryForRestore)
throws RocksDBException, InterruptedException {
if (Thread.currentThread().isInterrupted()) {
@@ -246,7 +275,15 @@ public class RocksDBOperationUtils {
throw new CancelTaskException("The stream was closed, aborting
recovery");
}
- return db.createColumnFamily(columnDescriptor);
+ if (importFilesMetaData.isEmpty()) {
+ return db.createColumnFamily(columnDescriptor);
+ } else {
+ try (ImportColumnFamilyOptions importColumnFamilyOptions =
+ new ImportColumnFamilyOptions().setMoveFiles(true)) {
+ return db.createColumnFamilyWithImport(
+ columnDescriptor, importColumnFamilyOptions,
importFilesMetaData);
+ }
+ }
}
public static void addColumnFamilyOptionsToCloseLater(
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
index ae58adc83b0..0453c1eea49 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
@@ -29,11 +29,13 @@ import
org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
+import org.rocksdb.ExportImportFilesMetaData;
import org.rocksdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,7 +58,8 @@ import static
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUti
/**
* Utility for creating a RocksDB instance either from scratch or from
restored local state. This
- * will also register {@link RocksDbKvStateInfo} when using {@link
#openDB(List, List, Path)}.
+ * will also register {@link RocksDbKvStateInfo} when using {@link
#openDB(List, List, Path,
+ * ICloseableRegistry)}.
*/
class RocksDBHandle implements AutoCloseable {
@@ -198,7 +201,6 @@ class RocksDBHandle implements AutoCloseable {
* @param stateMetaInfo info about the state to create.
* @param cfMetaDataList the data to import.
*/
- /*
void registerStateColumnFamilyHandleWithImport(
RegisteredStateMetaInfoBase stateMetaInfo,
List<ExportImportFilesMetaData> cfMetaDataList,
@@ -221,7 +223,6 @@ class RocksDBHandle implements AutoCloseable {
columnFamilyHandles.add(stateInfo.columnFamilyHandle);
}
- */
/**
* This recreates the new working directory of the recovered RocksDB
instance and links/copies
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index 48ffe29bca8..ae4d7b8ed0e 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -59,6 +59,7 @@ import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
+import org.rocksdb.ExportImportFilesMetaData;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
@@ -72,9 +73,11 @@ import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -85,6 +88,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
+import java.util.stream.Collectors;
import static org.apache.flink.core.fs.ICloseableRegistry.asCloseable;
import static
org.apache.flink.runtime.metrics.MetricNames.DOWNLOAD_STATE_DURATION;
@@ -183,10 +187,8 @@ public class RocksDBIncrementalRestoreOperation<K>
implements RocksDBRestoreOper
this.keyGroupPrefixBytes = keyGroupPrefixBytes;
this.keySerializerProvider = keySerializerProvider;
this.userCodeClassLoader = userCodeClassLoader;
- // this.useIngestDbRestoreMode = useIngestDbRestoreMode;
- // this.asyncCompactAfterRescale = asyncCompactAfterRescale;
- this.useIngestDbRestoreMode = false;
- this.asyncCompactAfterRescale = false;
+ this.useIngestDbRestoreMode = useIngestDbRestoreMode;
+ this.asyncCompactAfterRescale = asyncCompactAfterRescale;
this.useDeleteFilesInRange = useDeleteFilesInRange;
this.ioExecutor = ioExecutor;
}
@@ -453,7 +455,6 @@ public class RocksDBIncrementalRestoreOperation<K>
implements RocksDBRestoreOper
byte[] stopKeyGroupPrefixBytes)
throws Exception {
- /*
final Path absolutInstanceBasePath =
instanceBasePath.getAbsoluteFile().toPath();
final Path exportCfBasePath =
absolutInstanceBasePath.resolve("export-cfs");
Files.createDirectories(exportCfBasePath);
@@ -493,7 +494,6 @@ public class RocksDBIncrementalRestoreOperation<K>
implements RocksDBRestoreOper
// Cleanup export base directory
cleanUpPathQuietly(exportCfBasePath);
}
- */
}
/**
@@ -509,7 +509,6 @@ public class RocksDBIncrementalRestoreOperation<K>
implements RocksDBRestoreOper
* @return the total key-groups range of the exported data.
* @throws Exception on any export error.
*/
- /*
private KeyGroupRange exportColumnFamiliesWithSstDataInKeyGroupsRange(
Path exportCfBasePath,
List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles,
@@ -590,7 +589,6 @@ public class RocksDBIncrementalRestoreOperation<K>
implements RocksDBRestoreOper
? new KeyGroupRange(minExportKeyGroup, maxExportKeyGroup)
: KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
}
- */
/**
* Helper method that merges the data from multiple state handles into the
restoring base DB by
@@ -640,7 +638,6 @@ public class RocksDBIncrementalRestoreOperation<K>
implements RocksDBRestoreOper
* @param exportKeyGroupRange the total key-groups range of the exported
data.
* @throws Exception on import error.
*/
- /*
private void initBaseDBFromColumnFamilyImports(
Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>>
exportedColumnFamilyMetaData,
@@ -653,10 +650,10 @@ public class RocksDBIncrementalRestoreOperation<K>
implements RocksDBRestoreOper
keyGroupRange,
operatorIdentifier);
rocksHandle.openDB();
- for (Map.Entry<RegisteredStateMetaInfoBase.Key,
List<ExportImportFilesMetaData>> entry :
+ for (Map.Entry<RegisteredStateMetaInfoBase,
List<ExportImportFilesMetaData>> entry :
exportedColumnFamilyMetaData.entrySet()) {
rocksHandle.registerStateColumnFamilyHandleWithImport(
- entry.getKey(), entry.getValue(),
cancelStreamRegistryForRestore);
+ entry.getKey(), entry.getValue(), cancelStreamRegistry);
}
// Use Range delete to clip the temp db to the target range of the
backend
@@ -665,14 +662,14 @@ public class RocksDBIncrementalRestoreOperation<K>
implements RocksDBRestoreOper
rocksHandle.getColumnFamilyHandles(),
keyGroupRange,
exportKeyGroupRange,
- keyGroupPrefixBytes);
+ keyGroupPrefixBytes,
+ useDeleteFilesInRange);
logger.info(
"Completed importing exported state handles for backend with
range {} in operator {} using Clip/Ingest DB.",
keyGroupRange,
operatorIdentifier);
}
- */
/**
* Restores the checkpointing status and state for this backend. This can
only be done if the