This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5495a96a38663dc04753cb6db0c100aa2bc7d297 Author: Stefan Richter <srich...@confluent.io> AuthorDate: Fri Jan 5 15:32:32 2024 +0100 [FLINK-31238] Use IngestDB to speed up Rocksdb rescaling recovery (part 2) --- .../state/AbstractKeyedStateBackendBuilder.java | 2 +- .../state/EmbeddedRocksDBStateBackend.java | 43 +- .../state/RocksDBConfigurableOptions.java | 16 +- .../state/RocksDBIncrementalCheckpointUtils.java | 307 ++++++- .../streaming/state/RocksDBKeyedStateBackend.java | 13 +- .../state/RocksDBKeyedStateBackendBuilder.java | 20 +- .../streaming/state/RocksDBOperationUtils.java | 62 +- .../streaming/state/RocksDBStateDownloader.java | 4 +- .../state/restore/RocksDBFullRestoreOperation.java | 1 + .../streaming/state/restore/RocksDBHandle.java | 24 +- .../RocksDBHeapTimersFullRestoreOperation.java | 1 + .../RocksDBIncrementalRestoreOperation.java | 878 ++++++++++++++------- .../state/restore/RocksDBNoneRestoreOperation.java | 1 + .../state/restore/RocksDBRestoreResult.java | 14 +- .../state/EmbeddedRocksDBStateBackendTest.java | 49 +- .../streaming/state/RocksDBRecoveryTest.java | 379 +++++++++ .../state/RocksDBStateBackendConfigTest.java | 43 + .../contrib/streaming/state/RocksDBTestUtils.java | 35 +- .../RocksIncrementalCheckpointRescalingTest.java | 3 +- .../test/checkpointing/AutoRescalingITCase.java | 14 +- 20 files changed, 1482 insertions(+), 427 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java index 00da919b381..c8fe9ef4652 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java @@ -34,7 +34,7 @@ import java.util.Collection; /** An abstract base implementation of the {@link StateBackendBuilder} interface. 
*/ public abstract class AbstractKeyedStateBackendBuilder<K> - implements StateBackendBuilder<AbstractKeyedStateBackend, BackendBuildingException> { + implements StateBackendBuilder<AbstractKeyedStateBackend<K>, BackendBuildingException> { protected final Logger logger = LoggerFactory.getLogger(getClass()); protected final TaskKvStateRegistry kvStateRegistry; diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java index 5fa5c68acf1..0c5e748772e 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java @@ -70,6 +70,7 @@ import java.util.UUID; import java.util.function.Supplier; import static org.apache.flink.configuration.description.TextElement.text; +import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.WRITE_BATCH_SIZE; @@ -108,8 +109,6 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke private static final double UNDEFINED_OVERLAP_FRACTION_THRESHOLD = -1; - private static final boolean UNDEFINED_USE_INGEST_DB_RESTORE_MODE = false; - // ------------------------------------------------------------------------ // -- configuration values, set in the application / configuration @@ -171,9 +170,19 @@ 
public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke * The threshold of the overlap fraction between the handle's key-group range and target * key-group range. */ - private double overlapFractionThreshold; + private final double overlapFractionThreshold; + + /** + * Whether we use the optimized Ingest/Clip DB method for rescaling RocksDB incremental + * checkpoints. + */ + private final TernaryBoolean useIngestDbRestoreMode; - private boolean useIngestDbRestoreMode; + /** + * Whether we trigger an async compaction after restores for which we detect state in the + * database (including tombstones) that exceed the proclaimed key-groups range of the backend. + */ + private final TernaryBoolean incrementalRestoreAsyncCompactAfterRescale; /** Factory for Write Buffer Manager and Block Cache. */ private RocksDBMemoryFactory rocksDBMemoryFactory; @@ -207,7 +216,8 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke this.overlapFractionThreshold = UNDEFINED_OVERLAP_FRACTION_THRESHOLD; this.rocksDBMemoryFactory = RocksDBMemoryFactory.DEFAULT; this.priorityQueueConfig = new RocksDBPriorityQueueConfig(); - this.useIngestDbRestoreMode = UNDEFINED_USE_INGEST_DB_RESTORE_MODE; + this.useIngestDbRestoreMode = TernaryBoolean.UNDEFINED; + this.incrementalRestoreAsyncCompactAfterRescale = TernaryBoolean.UNDEFINED; } /** @@ -302,10 +312,16 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke overlapFractionThreshold >= 0 && this.overlapFractionThreshold <= 1, "Overlap fraction threshold of restoring should be between 0 and 1"); + incrementalRestoreAsyncCompactAfterRescale = + original.incrementalRestoreAsyncCompactAfterRescale == TernaryBoolean.UNDEFINED + ? 
TernaryBoolean.fromBoxedBoolean( + config.get(INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE)) + : original.incrementalRestoreAsyncCompactAfterRescale; + useIngestDbRestoreMode = - original.useIngestDbRestoreMode == UNDEFINED_USE_INGEST_DB_RESTORE_MODE - ? config.get(USE_INGEST_DB_RESTORE_MODE) - : original.useIngestDbRestoreMode; + original.useIngestDbRestoreMode == TernaryBoolean.UNDEFINED + ? TernaryBoolean.fromBoxedBoolean(config.get(USE_INGEST_DB_RESTORE_MODE)) + : TernaryBoolean.fromBoolean(original.getUseIngestDbRestoreMode()); this.rocksDBMemoryFactory = original.rocksDBMemoryFactory; } @@ -478,6 +494,8 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke resourceContainer.getMemoryWatcherOptions(nativeMetricOptions)) .setWriteBatchSize(getWriteBatchSize()) .setOverlapFractionThreshold(getOverlapFractionThreshold()) + .setIncrementalRestoreAsyncCompactAfterRescale( + getIncrementalRestoreAsyncCompactAfterRescale()) .setUseIngestDbRestoreMode(getUseIngestDbRestoreMode()); return builder.build(); } @@ -817,10 +835,13 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke : overlapFractionThreshold; } + boolean getIncrementalRestoreAsyncCompactAfterRescale() { + return incrementalRestoreAsyncCompactAfterRescale.getOrDefault( + INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE.defaultValue()); + } + boolean getUseIngestDbRestoreMode() { - return useIngestDbRestoreMode == UNDEFINED_USE_INGEST_DB_RESTORE_MODE - ? 
USE_INGEST_DB_RESTORE_MODE.defaultValue() - : useIngestDbRestoreMode; + return useIngestDbRestoreMode.getOrDefault(USE_INGEST_DB_RESTORE_MODE.defaultValue()); } // ------------------------------------------------------------------------ diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java index 255182f3502..8acf8af11fa 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java @@ -312,9 +312,17 @@ public class RocksDBConfigurableOptions implements Serializable { public static final ConfigOption<Boolean> USE_INGEST_DB_RESTORE_MODE = key("state.backend.rocksdb.use-ingest-db-restore-mode") .booleanType() - .defaultValue(false) + .defaultValue(Boolean.FALSE) + .withDescription( + "A recovery mode that directly clips and ingests multiple DBs during state recovery if the keys" + + " in the SST files does not exceed the declared key-group range."); + + public static final ConfigOption<Boolean> INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE = + key("state.backend.rocksdb.incremental-restore-async-compact-after-rescale") + .booleanType() + .defaultValue(Boolean.FALSE) .withDescription( - "A recovery mode that directly clips and ingests multiple DBs during state recovery. 
"); + "If true, an async compaction of RocksDB is started after every restore after which we detect keys (including tombstones) in the database that are outside the key-groups range of the backend."); static final ConfigOption<?>[] CANDIDATE_CONFIGS = new ConfigOption<?>[] { @@ -341,7 +349,9 @@ public class RocksDBConfigurableOptions implements Serializable { USE_BLOOM_FILTER, BLOOM_FILTER_BITS_PER_KEY, BLOOM_FILTER_BLOCK_BASED_MODE, - RESTORE_OVERLAP_FRACTION_THRESHOLD + RESTORE_OVERLAP_FRACTION_THRESHOLD, + USE_INGEST_DB_RESTORE_MODE, + INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE }; private static final Set<ConfigOption<?>> POSITIVE_INT_CONFIG_SET = diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java index 4fed7e72e4d..2e7342d4e0f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java @@ -21,11 +21,17 @@ import org.apache.flink.runtime.state.CompositeKeySerializationUtils; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; -import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.RunnableWithException; + +import org.apache.flink.shaded.guava31.com.google.common.primitives.UnsignedBytes; import org.rocksdb.Checkpoint; import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.CompactRangeOptions; import 
org.rocksdb.ExportImportFilesMetaData; +import org.rocksdb.LiveFileMetaData; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; @@ -33,18 +39,22 @@ import javax.annotation.Nonnegative; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.File; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; -import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; /** Utils for RocksDB Incremental Checkpoint. */ public class RocksDBIncrementalCheckpointUtils { + /** - * Evaluates state handle's "score" regarding to the target range when choosing the best state + * Evaluates state handle's "score" regarding the target range when choosing the best state * handle to init the initial db for recovery, if the overlap fraction is less than * overlapFractionThreshold, then just return {@code Score.MIN} to mean the handle has no chance * to be the initial handle. @@ -93,9 +103,11 @@ public class RocksDBIncrementalCheckpointUtils { } @Override - public int compareTo(@Nonnull Score other) { - return Comparator.comparing(Score::getIntersectGroupRange) - .thenComparing(Score::getOverlapFraction) + public int compareTo(@Nullable Score other) { + return Comparator.nullsFirst( + Comparator.comparing(Score::getIntersectGroupRange) + .thenComparing(Score::getIntersectGroupRange) + .thenComparing(Score::getOverlapFraction)) .compare(this, other); } } @@ -163,51 +175,187 @@ public class RocksDBIncrementalCheckpointUtils { } /** - * Clip the entries in the CF according to the range [begin_key, end_key). Any entries outside - * this range will be completely deleted (including tombstones). + * Returns true, if all entries in the sst files of the given DB is strictly within the expected + * key-group range for the DB. * - * @param db the target need to be clipped. 
- * @param columnFamilyHandles the column family need to be clipped. - * @param beginKeyBytes the begin key bytes - * @param endKeyBytes the end key bytes + * @param db the DB to check. + * @param dbExpectedKeyGroupRange the expected key-groups range of the DB. + * @param keyGroupPrefixBytes the number of bytes used to serialize the key-group prefix of keys + * in the DB. + */ + public static boolean isSstDataInKeyGroupRange( + RocksDB db, int keyGroupPrefixBytes, KeyGroupRange dbExpectedKeyGroupRange) { + return checkSstDataAgainstKeyGroupRange(db, keyGroupPrefixBytes, dbExpectedKeyGroupRange) + .allInRange(); + } + + /** + * Returns a range compaction task as runnable if any data in the SST files of the given DB + * exceeds the proclaimed key-group range. + * + * @param db the DB to check and compact if needed. + * @param columnFamilyHandles list of column families to check. + * @param keyGroupPrefixBytes the number of bytes used to serialize the key-group prefix of keys + * in the DB. + * @param dbExpectedKeyGroupRange the expected key-groups range of the DB. + * @return runnable that performs compaction upon execution if the key-groups range is exceeded. + * Otherwise, empty optional is returned. 
*/ - public static void clipColumnFamilies( + public static Optional<RunnableWithException> createRangeCompactionTaskIfNeeded( RocksDB db, - List<ColumnFamilyHandle> columnFamilyHandles, - byte[] beginKeyBytes, - byte[] endKeyBytes) - throws RocksDBException { + Collection<ColumnFamilyHandle> columnFamilyHandles, + int keyGroupPrefixBytes, + KeyGroupRange dbExpectedKeyGroupRange) { - for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { - db.clipColumnFamily(columnFamilyHandle, beginKeyBytes, endKeyBytes); + RangeCheckResult rangeCheckResult = + checkSstDataAgainstKeyGroupRange(db, keyGroupPrefixBytes, dbExpectedKeyGroupRange); + + if (rangeCheckResult.allInRange()) { + // No keys exceed the proclaimed range of the backend, so we don't need a compaction + // from this point of view. + return Optional.empty(); + } + + return Optional.of( + () -> { + try (CompactRangeOptions compactionOptions = + new CompactRangeOptions() + .setExclusiveManualCompaction(true) + .setBottommostLevelCompaction( + CompactRangeOptions.BottommostLevelCompaction + .kForceOptimized)) { + + if (!rangeCheckResult.leftInRange) { + // Compact all keys before the expected key-groups range + for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { + db.compactRange( + columnFamilyHandle, + // TODO: change to null once this API is fixed + new byte[] {}, + rangeCheckResult.minKey, + compactionOptions); + } + } + + if (!rangeCheckResult.rightInRange) { + // Compact all keys after the expected key-groups range + for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { + db.compactRange( + columnFamilyHandle, + rangeCheckResult.maxKey, + // TODO: change to null once this API is fixed + new byte[] { + (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff + }, + compactionOptions); + } + } + } + }); + } + + /** + * Checks data in the SST files of the given DB for keys that exceed either the lower or upper + * bound of the proclaimed key-groups range of the DB.
+ * + * @param db the DB to check. + * @param keyGroupPrefixBytes the number of bytes used to serialize the key-group prefix of keys + * in the DB. + * @param dbExpectedKeyGroupRange the expected key-groups range of the DB. + * @return the check result with detailed info about lower and upper bound violations. + */ + private static RangeCheckResult checkSstDataAgainstKeyGroupRange( + RocksDB db, int keyGroupPrefixBytes, KeyGroupRange dbExpectedKeyGroupRange) { + final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes]; + final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes]; + + CompositeKeySerializationUtils.serializeKeyGroup( + dbExpectedKeyGroupRange.getStartKeyGroup(), beginKeyGroupBytes); + + CompositeKeySerializationUtils.serializeKeyGroup( + dbExpectedKeyGroupRange.getEndKeyGroup() + 1, endKeyGroupBytes); + + KeyRange dbKeyRange = getDBKeyRange(db); + Comparator<byte[]> comparator = UnsignedBytes.lexicographicalComparator(); + return RangeCheckResult.of( + comparator.compare(dbKeyRange.minKey, beginKeyGroupBytes) >= 0, + comparator.compare(dbKeyRange.maxKey, endKeyGroupBytes) < 0, + beginKeyGroupBytes, + endKeyGroupBytes); + } + + /** Returns a pair of minimum and maximum key in the sst files of the given database. 
*/ + private static KeyRange getDBKeyRange(RocksDB db) { + final Comparator<byte[]> comparator = UnsignedBytes.lexicographicalComparator(); + final List<LiveFileMetaData> liveFilesMetaData = db.getLiveFilesMetaData(); + + if (liveFilesMetaData.isEmpty()) { + return KeyRange.EMPTY; + } + + Iterator<LiveFileMetaData> liveFileMetaDataIterator = liveFilesMetaData.iterator(); + LiveFileMetaData fileMetaData = liveFileMetaDataIterator.next(); + byte[] smallestKey = fileMetaData.smallestKey(); + byte[] largestKey = fileMetaData.largestKey(); + while (liveFileMetaDataIterator.hasNext()) { + fileMetaData = liveFileMetaDataIterator.next(); + byte[] sstSmallestKey = fileMetaData.smallestKey(); + byte[] sstLargestKey = fileMetaData.largestKey(); + if (comparator.compare(sstSmallestKey, smallestKey) < 0) { + smallestKey = sstSmallestKey; + } + if (comparator.compare(sstLargestKey, largestKey) > 0) { + largestKey = sstLargestKey; + } } + return KeyRange.of(smallestKey, largestKey); } - public static Map<RegisteredStateMetaInfoBase, ExportImportFilesMetaData> exportColumnFamilies( + /** + * Exports the data of the given column families in the given DB. + * + * @param db the DB to export from. + * @param columnFamilyHandles the column families to export. + * @param registeredStateMetaInfoBases meta information about the registered states in the DB. + * @param exportBasePath the path to which the export files go. + * @param resultOutput output parameter for the metadata of the export. + * @throws RocksDBException on problems inside RocksDB. 
+ */ + public static void exportColumnFamilies( RocksDB db, List<ColumnFamilyHandle> columnFamilyHandles, - List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, - Path exportBasePath) + List<RegisteredStateMetaInfoBase> registeredStateMetaInfoBases, + Path exportBasePath, + Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>> resultOutput) throws RocksDBException { - HashMap<RegisteredStateMetaInfoBase, ExportImportFilesMetaData> cfMetaInfoAndData = - new HashMap<>(); + Preconditions.checkArgument( + columnFamilyHandles.size() == registeredStateMetaInfoBases.size(), + "Lists are aligned by index and must be of the same size!"); + try (final Checkpoint checkpoint = Checkpoint.create(db)) { for (int i = 0; i < columnFamilyHandles.size(); i++) { - StateMetaInfoSnapshot metaInfoSnapshot = stateMetaInfoSnapshots.get(i); - - RegisteredStateMetaInfoBase stateMetaInfo = - RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(metaInfoSnapshot); + RegisteredStateMetaInfoBase stateMetaInfo = registeredStateMetaInfoBases.get(i); - ExportImportFilesMetaData cfMetaData = + Path subPath = exportBasePath.resolve(UUID.randomUUID().toString()); + ExportImportFilesMetaData exportedColumnFamilyMetaData = checkpoint.exportColumnFamily( - columnFamilyHandles.get(i), - exportBasePath.resolve(UUID.randomUUID().toString()).toString()); - cfMetaInfoAndData.put(stateMetaInfo, cfMetaData); + columnFamilyHandles.get(i), subPath.toString()); + + File[] exportedSstFiles = + subPath.toFile() + .listFiles((file, name) -> name.toLowerCase().endsWith(".sst")); + + if (exportedSstFiles != null && exportedSstFiles.length > 0) { + resultOutput + .computeIfAbsent(stateMetaInfo, (key) -> new ArrayList<>()) + .add(exportedColumnFamilyMetaData); + } else { + // Close unused empty export result + IOUtils.closeQuietly(exportedColumnFamilyMetaData); + } } } - - return cfMetaInfoAndData; } /** check whether the bytes is before prefixBytes in the character order. 
*/ @@ -228,26 +376,107 @@ public class RocksDBIncrementalCheckpointUtils { * * @param restoreStateHandles The candidate state handles. * @param targetKeyGroupRange The target key group range. + * @param overlapFractionThreshold configured threshold for overlap. * @return The best candidate or null if no candidate was a good fit. + * @param <T> the generic parameter type of the state handles. */ @Nullable public static <T extends KeyedStateHandle> T chooseTheBestStateHandleForInitial( - @Nonnull Collection<T> restoreStateHandles, + @Nonnull List<T> restoreStateHandles, + @Nonnull KeyGroupRange targetKeyGroupRange, + double overlapFractionThreshold) { + + int pos = + findTheBestStateHandleForInitial( + restoreStateHandles, targetKeyGroupRange, overlapFractionThreshold); + return pos >= 0 ? restoreStateHandles.get(pos) : null; + } + + /** + * Choose the best state handle according to the {@link #stateHandleEvaluator(KeyedStateHandle, + * KeyGroupRange, double)} to init the initial db from the given list and returns its index. + * + * @param restoreStateHandles The candidate state handles. + * @param targetKeyGroupRange The target key group range. + * @param overlapFractionThreshold configured threshold for overlap. + * @return the index of the best candidate handle in the list or -1 if the list was empty. + * @param <T> the generic parameter type of the state handles.
+ */ + public static <T extends KeyedStateHandle> int findTheBestStateHandleForInitial( + @Nonnull List<T> restoreStateHandles, @Nonnull KeyGroupRange targetKeyGroupRange, double overlapFractionThreshold) { - T bestStateHandle = null; + if (restoreStateHandles.isEmpty()) { + return -1; + } + + // Shortcut for a common case (scale out) + if (restoreStateHandles.size() == 1) { + return 0; + } + + int currentPos = 0; + int bestHandlePos = 0; Score bestScore = Score.MIN; for (T rawStateHandle : restoreStateHandles) { Score handleScore = stateHandleEvaluator( rawStateHandle, targetKeyGroupRange, overlapFractionThreshold); - if (bestStateHandle == null || handleScore.compareTo(bestScore) > 0) { - bestStateHandle = rawStateHandle; + if (handleScore.compareTo(bestScore) > 0) { + bestHandlePos = currentPos; bestScore = handleScore; } + ++currentPos; + } + return bestHandlePos; + } + + /** Helper class that defines a key-range in RocksDB as byte arrays for min and max key. */ + private static final class KeyRange { + static final KeyRange EMPTY = KeyRange.of(new byte[0], new byte[0]); + + final byte[] minKey; + final byte[] maxKey; + + private KeyRange(byte[] minKey, byte[] maxKey) { + this.minKey = minKey; + this.maxKey = maxKey; + } + + static KeyRange of(byte[] minKey, byte[] maxKey) { + return new KeyRange(minKey, maxKey); } + } - return bestStateHandle; + /** + * Helper class that represents the result of a range check of the actual keys in a RocksDB + * instance against the proclaimed key-group range of the instance. In short, this checks if the + * instance contains any keys (or tombstones for keys) that don't belong in the instance's + * key-groups range.
+ */ + private static final class RangeCheckResult { + private final byte[] minKey; + + private final byte[] maxKey; + final boolean leftInRange; + final boolean rightInRange; + + private RangeCheckResult( + boolean leftInRange, boolean rightInRange, byte[] minKey, byte[] maxKey) { + this.leftInRange = leftInRange; + this.rightInRange = rightInRange; + this.minKey = minKey; + this.maxKey = maxKey; + } + + boolean allInRange() { + return leftInRange && rightInRange; + } + + static RangeCheckResult of( + boolean leftInRange, boolean rightInRange, byte[] minKey, byte[] maxKey) { + return new RangeCheckResult(leftInRange, rightInRange, minKey, maxKey); + } } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index c91b63963f6..8e531ecebf8 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -80,6 +80,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnegative; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; @@ -88,8 +89,10 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Spliterator; import java.util.Spliterators; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.RunnableFuture; import java.util.function.Function; import java.util.stream.Collectors; @@ -259,6 +262,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private final RocksDbTtlCompactFiltersManager 
ttlCompactFiltersManager; + @Nullable private final CompletableFuture<Void> asyncCompactAfterRestoreFuture; + public RocksDBKeyedStateBackend( ClassLoader userCodeClassLoader, File instanceBasePath, @@ -284,7 +289,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { PriorityQueueSetFactory priorityQueueFactory, RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, InternalKeyContext<K> keyContext, - @Nonnegative long writeBatchSize) { + @Nonnegative long writeBatchSize, + @Nullable CompletableFuture<Void> asyncCompactFuture) { super( kvStateRegistry, @@ -321,6 +327,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { this.nativeMetricMonitor = nativeMetricMonitor; this.sharedRocksKeyBuilder = sharedRocksKeyBuilder; this.priorityQueueFactory = priorityQueueFactory; + this.asyncCompactAfterRestoreFuture = asyncCompactFuture; if (priorityQueueFactory instanceof HeapPriorityQueueSetFactory) { this.heapPriorityQueuesManager = new HeapPriorityQueuesManager( @@ -994,4 +1001,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { long getWriteBatchSize() { return writeBatchSize; } + + public Optional<CompletableFuture<Void>> getAsyncCompactAfterRestoreFuture() { + return Optional.ofNullable(asyncCompactAfterRestoreFuture); + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index 9bfb8238b85..623f9c497ba 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -74,8 +74,10 @@ import 
java.util.Map; import java.util.SortedMap; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.function.Function; +import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE; import static org.apache.flink.util.Preconditions.checkArgument; @@ -125,6 +127,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken RocksDBConfigurableOptions.WRITE_BATCH_SIZE.defaultValue().getBytes(); private RocksDB injectedTestDB; // for testing + private boolean incrementalRestoreAsyncCompactAfterRescale = + INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE.defaultValue(); private double overlapFractionThreshold = RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue(); private boolean useIngestDbRestoreMode = USE_INGEST_DB_RESTORE_MODE.defaultValue(); private ColumnFamilyHandle injectedDefaultColumnFamilyHandle; // for testing @@ -271,6 +275,13 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken return this; } + RocksDBKeyedStateBackendBuilder<K> setIncrementalRestoreAsyncCompactAfterRescale( + boolean incrementalRestoreAsyncCompactAfterRescale) { + this.incrementalRestoreAsyncCompactAfterRescale = + incrementalRestoreAsyncCompactAfterRescale; + return this; + } + RocksDBKeyedStateBackendBuilder<K> setUseIngestDbRestoreMode(boolean useIngestDbRestoreMode) { this.useIngestDbRestoreMode = useIngestDbRestoreMode; return this; @@ -303,6 +314,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken new LinkedHashMap<>(); RocksDB db = null; RocksDBRestoreOperation restoreOperation = null; + CompletableFuture<Void> asyncCompactAfterRestoreFuture = null; 
RocksDbTtlCompactFiltersManager ttlCompactFiltersManager = new RocksDbTtlCompactFiltersManager(ttlTimeProvider); @@ -341,6 +353,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken db = restoreResult.getDb(); defaultColumnFamilyHandle = restoreResult.getDefaultColumnFamilyHandle(); nativeMetricMonitor = restoreResult.getNativeMetricMonitor(); + asyncCompactAfterRestoreFuture = + restoreResult.getAsyncCompactAfterRestoreFuture().orElse(null); if (restoreOperation instanceof RocksDBIncrementalRestoreOperation) { backendUID = restoreResult.getBackendUID(); materializedSstFiles = restoreResult.getRestoredSstFiles(); @@ -446,7 +460,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken priorityQueueFactory, ttlCompactFiltersManager, keyContext, - writeBatchSize); + writeBatchSize, + asyncCompactAfterRestoreFuture); } private RocksDBRestoreOperation getRocksDBRestoreOperation( @@ -490,7 +505,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken writeBatchSize, optionsContainer.getWriteBufferManagerCapacity(), overlapFractionThreshold, - useIngestDbRestoreMode); + useIngestDbRestoreMode, + incrementalRestoreAsyncCompactAfterRescale); } else if (priorityQueueConfig.getPriorityQueueStateType() == EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP) { return new RocksDBHeapTimersFullRestoreOperation<>( diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java index ccb435dffc7..1f7bcf5ff1a 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java 
@@ -44,6 +44,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -119,6 +120,9 @@ public class RocksDBOperationUtils { * * <p>Creates the column family for the state. Sets TTL compaction filter if {@code * ttlCompactFiltersManager} is not {@code null}. + * + * @param importFilesMetaData if not empty, we import the files specified in the metadata to the + * column family. */ public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo( RegisteredStateMetaInfoBase metaInfoBase, @@ -126,7 +130,7 @@ public class RocksDBOperationUtils { Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, @Nullable Long writeBufferManagerCapacity, - List<ExportImportFilesMetaData> cfMetaDataList) { + List<ExportImportFilesMetaData> importFilesMetaData) { ColumnFamilyDescriptor columnFamilyDescriptor = createColumnFamilyDescriptor( @@ -135,10 +139,15 @@ public class RocksDBOperationUtils { ttlCompactFiltersManager, writeBufferManagerCapacity); - ColumnFamilyHandle columnFamilyHandle = - cfMetaDataList == null - ? 
createColumnFamily(columnFamilyDescriptor, db) - : createColumnFamilyWithImport(columnFamilyDescriptor, db, cfMetaDataList); + final ColumnFamilyHandle columnFamilyHandle; + try { + columnFamilyHandle = + createColumnFamily(columnFamilyDescriptor, db, importFilesMetaData); + } catch (Exception ex) { + IOUtils.closeQuietly(columnFamilyDescriptor.getOptions()); + throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", ex); + } + return new RocksDBKeyedStateBackend.RocksDbKvStateInfo(columnFamilyHandle, metaInfoBase); } @@ -154,7 +163,7 @@ public class RocksDBOperationUtils { columnFamilyOptionsFactory, ttlCompactFiltersManager, writeBufferManagerCapacity, - null); + Collections.emptyList()); } /** @@ -168,15 +177,17 @@ public class RocksDBOperationUtils { @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, @Nullable Long writeBufferManagerCapacity) { + byte[] nameBytes = metaInfoBase.getName().getBytes(ConfigConstants.DEFAULT_CHARSET); + Preconditions.checkState( + !Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes), + "The chosen state name 'default' collides with the name of the default column family!"); + ColumnFamilyOptions options = createColumnFamilyOptions(columnFamilyOptionsFactory, metaInfoBase.getName()); + if (ttlCompactFiltersManager != null) { ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(metaInfoBase, options); } - byte[] nameBytes = metaInfoBase.getName().getBytes(ConfigConstants.DEFAULT_CHARSET); - Preconditions.checkState( - !Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes), - "The chosen state name 'default' collides with the name of the default column family!"); if (writeBufferManagerCapacity != null) { // It'd be great to perform the check earlier, e.g. when creating write buffer manager. 
@@ -203,8 +214,7 @@ public class RocksDBOperationUtils { * @return true if sanity check passes, false otherwise */ static boolean sanityCheckArenaBlockSize( - long writeBufferSize, long arenaBlockSizeConfigured, long writeBufferManagerCapacity) - throws IllegalStateException { + long writeBufferSize, long arenaBlockSizeConfigured, long writeBufferManagerCapacity) { long defaultArenaBlockSize = RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize); @@ -243,25 +253,19 @@ public class RocksDBOperationUtils { } private static ColumnFamilyHandle createColumnFamily( - ColumnFamilyDescriptor columnDescriptor, RocksDB db) { - try { - return db.createColumnFamily(columnDescriptor); - } catch (RocksDBException e) { - IOUtils.closeQuietly(columnDescriptor.getOptions()); - throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", e); - } - } - - private static ColumnFamilyHandle createColumnFamilyWithImport( ColumnFamilyDescriptor columnDescriptor, RocksDB db, - List<ExportImportFilesMetaData> metaDataList) { - try { - return db.createColumnFamilyWithImport( - columnDescriptor, new ImportColumnFamilyOptions(), metaDataList); - } catch (RocksDBException e) { - IOUtils.closeQuietly(columnDescriptor.getOptions()); - throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", e); + List<ExportImportFilesMetaData> importFilesMetaData) + throws RocksDBException { + + if (importFilesMetaData.isEmpty()) { + return db.createColumnFamily(columnDescriptor); + } else { + try (ImportColumnFamilyOptions importColumnFamilyOptions = + new ImportColumnFamilyOptions().setMoveFiles(true)) { + return db.createColumnFamilyWithImport( + columnDescriptor, importColumnFamilyOptions, importFilesMetaData); + } } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java index af1694c66e7..f7ea297d761 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java @@ -29,8 +29,6 @@ import org.apache.flink.util.clock.SystemClock; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.ThrowingRunnable; -import org.apache.flink.shaded.guava31.com.google.common.collect.Streams; - import java.io.IOException; import java.io.OutputStream; import java.nio.file.Files; @@ -108,7 +106,7 @@ public class RocksDBStateDownloader extends RocksDBStateDataTransfer { .flatMap( downloadRequest -> // Take all files from shared and private state. - Streams.concat( + Stream.concat( downloadRequest.getStateHandle().getSharedState() .stream(), downloadRequest.getStateHandle().getPrivateState() diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java index 4005adde565..228ba065e11 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java @@ -108,6 +108,7 @@ public class RocksDBFullRestoreOperation<K> implements RocksDBRestoreOperation { this.rocksHandle.getNativeMetricMonitor(), -1, null, + null, null); } diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java index b5d3c7efc74..a2ed8f1e33f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java @@ -187,18 +187,19 @@ class RocksDBHandle implements AutoCloseable { return registeredStateMetaInfoEntry; } - RocksDbKvStateInfo registerStateColumnFamilyHandleWithImport( + /** + * Registers a new column family and imports data from the given export. + * + * @param stateMetaInfo info about the state to create. + * @param cfMetaDataList the data to import. + */ + void registerStateColumnFamilyHandleWithImport( RegisteredStateMetaInfoBase stateMetaInfo, List<ExportImportFilesMetaData> cfMetaDataList) { - RocksDbKvStateInfo registeredStateMetaInfoEntry = - kvStateInformation.get(stateMetaInfo.getName()); - if (registeredStateMetaInfoEntry != null) { - System.out.println("test"); - } - Preconditions.checkState(registeredStateMetaInfoEntry == null); + Preconditions.checkState(!kvStateInformation.containsKey(stateMetaInfo.getName())); - registeredStateMetaInfoEntry = + RocksDbKvStateInfo stateInfo = RocksDBOperationUtils.createStateInfo( stateMetaInfo, db, @@ -208,12 +209,9 @@ class RocksDBHandle implements AutoCloseable { cfMetaDataList); RocksDBOperationUtils.registerKvStateInformation( - kvStateInformation, - nativeMetricMonitor, - stateMetaInfo.getName(), - registeredStateMetaInfoEntry); + kvStateInformation, nativeMetricMonitor, stateMetaInfo.getName(), stateInfo); - return registeredStateMetaInfoEntry; + columnFamilyHandles.add(stateInfo.columnFamilyHandle); } /** diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHeapTimersFullRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHeapTimersFullRestoreOperation.java index 0c859e88f6b..649aa572a62 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHeapTimersFullRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHeapTimersFullRestoreOperation.java @@ -138,6 +138,7 @@ public class RocksDBHeapTimersFullRestoreOperation<K> implements RocksDBRestoreO this.rocksHandle.getNativeMetricMonitor(), -1, null, + null, null); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java index 01559ea55d8..0a5e9744c29 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java @@ -51,7 +51,8 @@ import org.apache.flink.util.FileUtils; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StateMigrationException; -import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.concurrent.FutureUtils; +import org.apache.flink.util.function.RunnableWithException; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; @@ -74,7 +75,6 @@ import 
java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -82,7 +82,11 @@ import java.util.Objects; import java.util.SortedMap; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.Function; +import java.util.stream.Collectors; import static org.apache.flink.runtime.state.StateUtil.unexpectedStateHandleException; @@ -111,7 +115,9 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper private boolean isKeySerializerCompatibilityChecked; - private ThrowingConsumer<Collection<KeyedStateHandle>, Exception> rescalingRestoreOperation; + private final boolean useIngestDbRestoreMode; + + private final boolean asyncCompactAfterRescale; public RocksDBIncrementalRestoreOperation( String operatorIdentifier, @@ -134,7 +140,8 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper @Nonnegative long writeBatchSize, Long writeBufferManagerCapacity, double overlapFractionThreshold, - boolean useIngestDbRestoreMode) { + boolean useIngestDbRestoreMode, + boolean asyncCompactAfterRescale) { this.rocksHandle = new RocksDBHandle( kvStateInformation, @@ -160,8 +167,8 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper this.keyGroupPrefixBytes = keyGroupPrefixBytes; this.keySerializerProvider = keySerializerProvider; this.userCodeClassLoader = userCodeClassLoader; - this.rescalingRestoreOperation = - useIngestDbRestoreMode ? this::restoreWithIngestDbMode : this::restoreWithRescaling; + this.useIngestDbRestoreMode = useIngestDbRestoreMode; + this.asyncCompactAfterRescale = asyncCompactAfterRescale; } /** Root method that branches for different implementations of {@link KeyedStateHandle}. 
*/ @@ -172,53 +179,457 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper return null; } - final KeyedStateHandle theFirstStateHandle = restoreStateHandles.iterator().next(); + final List<StateHandleDownloadSpec> allDownloadSpecs = + new ArrayList<>(restoreStateHandles.size()); + + final List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles = + new ArrayList<>(restoreStateHandles.size()); - boolean isRescaling = - (restoreStateHandles.size() > 1 - || !Objects.equals(theFirstStateHandle.getKeyGroupRange(), keyGroupRange)); + final Path absolutInstanceBasePath = instanceBasePath.getAbsoluteFile().toPath(); - if (isRescaling) { - rescalingRestoreOperation.accept(restoreStateHandles); - } else { - restoreWithoutRescaling(theFirstStateHandle); + try { + prepareStateHandleDownloadsToLocal( + absolutInstanceBasePath, localKeyedStateHandles, allDownloadSpecs); + + if (localKeyedStateHandles.size() == 1) { + // This happens if we don't rescale and for some scale out scenarios. + initBaseDBFromSingleStateHandle(localKeyedStateHandles.get(0)); + } else { + // This happens for all scale ins and some scale outs. 
+ restoreFromMultipleStateHandles(localKeyedStateHandles); + } + + CompletableFuture<Void> asyncCompactFuture = null; + if (asyncCompactAfterRescale) { + asyncCompactFuture = + RocksDBIncrementalCheckpointUtils.createRangeCompactionTaskIfNeeded( + rocksHandle.getDb(), + rocksHandle.getColumnFamilyHandles(), + keyGroupPrefixBytes, + keyGroupRange) + .map( + (run) -> { + RunnableWithException runWithLogging = + () -> { + long t = System.currentTimeMillis(); + logger.info( + "Starting async compaction after restore for backend {} in operator {}", + backendUID, + operatorIdentifier); + try { + run.run(); + logger.info( + "Completed async compaction after restore for backend {} in operator {} after {} ms.", + backendUID, + operatorIdentifier, + System.currentTimeMillis() - t); + } catch (Exception ex) { + logger.info( + "Failed async compaction after restore for backend {} in operator {} after {} ms.", + backendUID, + operatorIdentifier, + System.currentTimeMillis() - t, + ex); + throw ex; + } + }; + ExecutorService executorService = + Executors.newSingleThreadExecutor(); + CompletableFuture<Void> resultFuture = + FutureUtils.runAsync( + runWithLogging, executorService); + executorService.shutdown(); + return resultFuture; + }) + .orElse(null); + } + + return new RocksDBRestoreResult( + this.rocksHandle.getDb(), + this.rocksHandle.getDefaultColumnFamilyHandle(), + this.rocksHandle.getNativeMetricMonitor(), + lastCompletedCheckpointId, + backendUID, + restoredSstFiles, + asyncCompactFuture); + } finally { + // Cleanup all download directories + allDownloadSpecs.stream() + .map(StateHandleDownloadSpec::getDownloadDestination) + .forEach(this::cleanUpPathQuietly); } - return new RocksDBRestoreResult( - this.rocksHandle.getDb(), - this.rocksHandle.getDefaultColumnFamilyHandle(), - this.rocksHandle.getNativeMetricMonitor(), - lastCompletedCheckpointId, - backendUID, - restoredSstFiles); } - /** Recovery from a single remote incremental state without rescaling. 
*/ + /** + * Prepares the download of all {@link IncrementalRemoteKeyedStateHandle}s to {@link + * IncrementalLocalKeyedStateHandle}s by creating the download specs and already converting the + * handle type. + * + * @param absolutInstanceBasePath the base path of the restoring DB instance as absolute path. + * @param localKeyedStateHandlesOut the output parameter for the created {@link + * IncrementalLocalKeyedStateHandle}s. + * @param allDownloadSpecsOut output parameter for the created download specs. + * @throws Exception if an unexpected state handle type is passed as argument. + */ @SuppressWarnings("unchecked") - private void restoreWithoutRescaling(KeyedStateHandle keyedStateHandle) throws Exception { + private void prepareStateHandleDownloadsToLocal( + Path absolutInstanceBasePath, + List<IncrementalLocalKeyedStateHandle> localKeyedStateHandlesOut, + List<StateHandleDownloadSpec> allDownloadSpecsOut) + throws Exception { + // Prepare and collect all the download request to pull remote state to a local directory + for (KeyedStateHandle stateHandle : restoreStateHandles) { + if (stateHandle instanceof IncrementalRemoteKeyedStateHandle) { + StateHandleDownloadSpec downloadRequest = + new StateHandleDownloadSpec( + (IncrementalRemoteKeyedStateHandle) stateHandle, + absolutInstanceBasePath.resolve(UUID.randomUUID().toString())); + allDownloadSpecsOut.add(downloadRequest); + } else if (stateHandle instanceof IncrementalLocalKeyedStateHandle) { + localKeyedStateHandlesOut.add((IncrementalLocalKeyedStateHandle) stateHandle); + } else { + throw unexpectedStateHandleException( + new Class[] { + IncrementalRemoteKeyedStateHandle.class, + IncrementalLocalKeyedStateHandle.class + }, + stateHandle.getClass()); + } + } + + allDownloadSpecsOut.stream() + .map(StateHandleDownloadSpec::createLocalStateHandleForDownloadedState) + .forEach(localKeyedStateHandlesOut::add); + + transferRemoteStateToLocalDirectory(allDownloadSpecsOut); + } + + /** + * Initializes the base DB 
that we restore from a single local state handle. + * + * @param stateHandle the state handle to restore the base DB from. + * @throws Exception on any error during restore. + */ + private void initBaseDBFromSingleStateHandle(IncrementalLocalKeyedStateHandle stateHandle) + throws Exception { + logger.info( - "Starting to restore from state handle: {} without rescaling.", keyedStateHandle); - if (keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) { - IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle = - (IncrementalRemoteKeyedStateHandle) keyedStateHandle; - restorePreviousIncrementalFilesStatus(incrementalRemoteKeyedStateHandle); - restoreBaseDBFromRemoteState(incrementalRemoteKeyedStateHandle); - } else if (keyedStateHandle instanceof IncrementalLocalKeyedStateHandle) { - IncrementalLocalKeyedStateHandle incrementalLocalKeyedStateHandle = - (IncrementalLocalKeyedStateHandle) keyedStateHandle; - restorePreviousIncrementalFilesStatus(incrementalLocalKeyedStateHandle); - restoreBaseDBFromLocalState(incrementalLocalKeyedStateHandle); + "Starting to restore Base DB in backend with range {} in operator {} from selected state handle {}.", + keyGroupRange, + operatorIdentifier, + stateHandle); + + // Restore base DB from selected initial handle + restoreBaseDBFromLocalState(stateHandle); + + KeyGroupRange stateHandleKeyGroupRange = stateHandle.getKeyGroupRange(); + + // Check if the key-groups range has changed. + if (Objects.equals(stateHandleKeyGroupRange, keyGroupRange)) { + // This is the case if we didn't rescale, so we can restore all the info from the + // previous backend instance (backend id and incremental checkpoint history). + restorePreviousIncrementalFilesStatus(stateHandle); + } else { + // If the key-groups don't match, this was a scale out, and we need to clip the + // key-groups range of the db to the target range for this backend. 
+ try { + RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange( + this.rocksHandle.getDb(), + this.rocksHandle.getColumnFamilyHandles(), + keyGroupRange, + stateHandleKeyGroupRange, + keyGroupPrefixBytes); + } catch (RocksDBException e) { + String errMsg = "Failed to clip DB after initialization."; + logger.error(errMsg, e); + throw new BackendBuildingException(errMsg, e); + } + } + logger.info( + "Completed restoring backend with range {} in operator {} from selected state handle.", + keyGroupRange, + operatorIdentifier); + } + + /** + * Initializes the base DB that we restore from a list of multiple local state handles. + * + * @param localKeyedStateHandles the list of state handles to restore the base DB from. + * @throws Exception on any error during restore. + */ + private void restoreFromMultipleStateHandles( + List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles) throws Exception { + + logger.info( + "Starting to restore backend with range {} in operator {} from multiple state handles {} with useIngestDbRestoreMode = {}.", + keyGroupRange, + operatorIdentifier, + localKeyedStateHandles, + useIngestDbRestoreMode); + + byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes]; + CompositeKeySerializationUtils.serializeKeyGroup( + keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes); + + byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes]; + CompositeKeySerializationUtils.serializeKeyGroup( + keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes); + + if (useIngestDbRestoreMode) { + // Optimized path for merging multiple handles with Ingest/Clip + mergeStateHandlesWithClipAndIngest( + localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes); } else { - throw unexpectedStateHandleException( - new Class[] { - IncrementalRemoteKeyedStateHandle.class, - IncrementalLocalKeyedStateHandle.class - }, - keyedStateHandle.getClass()); + // Optimized path for single handle and legacy path for merging multiple 
handles. + mergeStateHandlesWithCopyFromTemporaryInstance( + localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes); + } + + logger.info( + "Completed restoring backend with range {} in operator {} from multiple state handles with useIngestDbRestoreMode = {}.", + keyGroupRange, + operatorIdentifier, + useIngestDbRestoreMode); + } + + /** + * Restores the base DB by merging multiple state handles into one. This method first checks if + * all data to import is in the expected key-groups range and then uses import/export. + * Otherwise, this method falls back to copying the data using a temporary DB. + * + * @param localKeyedStateHandles the list of state handles to restore the base DB from. + * @param startKeyGroupPrefixBytes the min/start key of the key groups range as bytes. + * @param stopKeyGroupPrefixBytes the max+1/end key of the key groups range as bytes. + * @throws Exception on any restore error. + */ + private void mergeStateHandlesWithClipAndIngest( + List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles, + byte[] startKeyGroupPrefixBytes, + byte[] stopKeyGroupPrefixBytes) + throws Exception { + + final Path absolutInstanceBasePath = instanceBasePath.getAbsoluteFile().toPath(); + final Path exportCfBasePath = absolutInstanceBasePath.resolve("export-cfs"); + Files.createDirectories(exportCfBasePath); + + final Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>> + exportedColumnFamilyMetaData = new HashMap<>(localKeyedStateHandles.size()); + + final List<IncrementalLocalKeyedStateHandle> notImportableHandles = + new ArrayList<>(localKeyedStateHandles.size()); + + try { + + KeyGroupRange exportedSstKeyGroupsRange = + exportColumnFamiliesWithSstDataInKeyGroupsRange( + exportCfBasePath, + localKeyedStateHandles, + exportedColumnFamilyMetaData, + notImportableHandles); + + if (exportedColumnFamilyMetaData.isEmpty()) { + // Nothing coule be exported, so we fall back to + // #mergeStateHandlesWithCopyFromTemporaryInstance 
+ mergeStateHandlesWithCopyFromTemporaryInstance( + notImportableHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes); + } else { + // We initialize the base DB by importing all the exported data. + initBaseDBFromColumnFamilyImports( + exportedColumnFamilyMetaData, exportedSstKeyGroupsRange); + // Copy data from handles that we couldn't directly import using temporary + // instances. + copyToBaseDBUsingTempDBs( + notImportableHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes); + } + } finally { + // Close native RocksDB objects + exportedColumnFamilyMetaData.values().forEach(IOUtils::closeAllQuietly); + // Cleanup export base directory + cleanUpPathQuietly(exportCfBasePath); + } + } + + /** + * Prepares the data for importing by exporting from temporary RocksDB instances. We can only + * import data that does not exceed the target key-groups range and skip state handles that + * exceed their range. + * + * @param exportCfBasePath the base path for the export files. + * @param localKeyedStateHandles the state handles to prepare for import. + * @param exportedColumnFamiliesOut output parameter for the metadata of completed exports. + * @param skipped output parameter for state handles that could not be exported because the data + * exceeded the proclaimed range. + * @return the total key-groups range of the exported data. + * @throws Exception on any export error. 
+ */ + private KeyGroupRange exportColumnFamiliesWithSstDataInKeyGroupsRange( + Path exportCfBasePath, + List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles, + Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>> + exportedColumnFamiliesOut, + List<IncrementalLocalKeyedStateHandle> skipped) + throws Exception { + + logger.info( + "Starting restore export for backend with range {} in operator {}.", + keyGroupRange, + operatorIdentifier); + + int minExportKeyGroup = Integer.MAX_VALUE; + int maxExportKeyGroup = Integer.MIN_VALUE; + for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) { + try (RestoredDBInstance tmpRestoreDBInfo = + restoreTempDBInstanceFromLocalState(stateHandle)) { + + List<ColumnFamilyHandle> tmpColumnFamilyHandles = + tmpRestoreDBInfo.columnFamilyHandles; + + // Check if the data in all SST files referenced in the handle is within the + // proclaimed key-groups range of the handle. + if (RocksDBIncrementalCheckpointUtils.isSstDataInKeyGroupRange( + tmpRestoreDBInfo.db, keyGroupPrefixBytes, stateHandle.getKeyGroupRange())) { + + logger.debug( + "Exporting state handle {} for backend with range {} in operator {}.", + stateHandle, + keyGroupRange, + operatorIdentifier); + + List<RegisteredStateMetaInfoBase> registeredStateMetaInfoBases = + tmpRestoreDBInfo.stateMetaInfoSnapshots.stream() + .map(RegisteredStateMetaInfoBase::fromMetaInfoSnapshot) + .collect(Collectors.toList()); + + // Export all the Column Families and store the result in + // exportedColumnFamiliesOut + RocksDBIncrementalCheckpointUtils.exportColumnFamilies( + tmpRestoreDBInfo.db, + tmpColumnFamilyHandles, + registeredStateMetaInfoBases, + exportCfBasePath, + exportedColumnFamiliesOut); + + minExportKeyGroup = + Math.min( + minExportKeyGroup, + stateHandle.getKeyGroupRange().getStartKeyGroup()); + maxExportKeyGroup = + Math.max( + maxExportKeyGroup, + stateHandle.getKeyGroupRange().getEndKeyGroup()); + + logger.debug( + "Done exporting 
state handle {} for backend with range {} in operator {}.", + stateHandle, + keyGroupRange, + operatorIdentifier); + } else { + // Actual key range in files exceeds proclaimed range, cannot import. We + // will copy this handle using a temporary DB later. + skipped.add(stateHandle); + } + } } + + logger.info( + "Completed restore export for backend with range {} in operator {}. Number of Exported handles: {}. Skipped handles: {}.", + keyGroupRange, + operatorIdentifier, + localKeyedStateHandles.size() - skipped.size(), + skipped); + + return minExportKeyGroup <= maxExportKeyGroup + ? new KeyGroupRange(minExportKeyGroup, maxExportKeyGroup) + : KeyGroupRange.EMPTY_KEY_GROUP_RANGE; + } + + /** + * Helper method that merges the data from multiple state handles into the restoring base DB by + * the help of copying through temporary RocksDB instances. + * + * @param localKeyedStateHandles the state handles to merge into the base DB. + * @param startKeyGroupPrefixBytes the min/start key of the key groups range as bytes. + * @param stopKeyGroupPrefixBytes the max+1/end key of the key groups range as bytes. + * @throws Exception on any merge error. + */ + private void mergeStateHandlesWithCopyFromTemporaryInstance( + List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles, + byte[] startKeyGroupPrefixBytes, + byte[] stopKeyGroupPrefixBytes) + throws Exception { + + logger.info( + "Starting to merge state for backend with range {} in operator {} from multiple state handles using temporary instances.", + keyGroupRange, + operatorIdentifier); + + // Choose the best state handle for the initial DB + final IncrementalLocalKeyedStateHandle selectedInitialHandle = + localKeyedStateHandles.remove( + RocksDBIncrementalCheckpointUtils.findTheBestStateHandleForInitial( + localKeyedStateHandles, keyGroupRange, overlapFractionThreshold)); + + Preconditions.checkNotNull(selectedInitialHandle); + + // Remove the selected handle from the list so that we don't restore it twice. 
+ localKeyedStateHandles.remove(selectedInitialHandle); + + // Init the base DB instance with the initial state + initBaseDBFromSingleStateHandle(selectedInitialHandle); + + // Copy remaining handles using temporary RocksDB instances + copyToBaseDBUsingTempDBs( + localKeyedStateHandles, startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes); + + logger.info( + "Completed merging state for backend with range {} in operator {} from multiple state handles using temporary instances.", + keyGroupRange, + operatorIdentifier); + } + + /** + * Initializes the base DB by importing from previously exported data. + * + * @param exportedColumnFamilyMetaData the export (meta) data. + * @param exportKeyGroupRange the total key-groups range of the exported data. + * @throws Exception on import error. + */ + private void initBaseDBFromColumnFamilyImports( + Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>> + exportedColumnFamilyMetaData, + KeyGroupRange exportKeyGroupRange) + throws Exception { + + // We initialize the base DB by importing all the exported data. + logger.info( + "Starting to import exported state handles for backend with range {} in operator {} using Clip/Ingest DB.", + keyGroupRange, + operatorIdentifier); + rocksHandle.openDB(); + exportedColumnFamilyMetaData.forEach( + rocksHandle::registerStateColumnFamilyHandleWithImport); + + // Use Range delete to clip the temp db to the target range of the backend + RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange( + rocksHandle.getDb(), + rocksHandle.getColumnFamilyHandles(), + keyGroupRange, + exportKeyGroupRange, + keyGroupPrefixBytes); + logger.info( - "Finished restoring from state handle: {} without rescaling.", keyedStateHandle); + "Completed importing exported state handles for backend with range {} in operator {} using Clip/Ingest DB.", + keyGroupRange, + operatorIdentifier); } + /** + * Restores the checkpointing status and state for this backend. 
This can only be done if the + * backend was not rescaled and is therefore identical to the source backend in the previous + * run. + * + * @param localKeyedStateHandle the single state handle from which the backend is restored. + */ private void restorePreviousIncrementalFilesStatus( IncrementalKeyedStateHandle localKeyedStateHandle) { backendUID = localKeyedStateHandle.getBackendIdentifier(); @@ -226,24 +637,20 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper localKeyedStateHandle.getCheckpointId(), localKeyedStateHandle.getSharedStateHandles()); lastCompletedCheckpointId = localKeyedStateHandle.getCheckpointId(); + logger.info( + "Restored previous incremental files status in backend with range {} in operator {}: backend uuid {}, last checkpoint id {}.", + keyGroupRange, + operatorIdentifier, + backendUID, + lastCompletedCheckpointId); } - private void restoreBaseDBFromRemoteState(IncrementalRemoteKeyedStateHandle stateHandle) - throws Exception { - // used as restore source for IncrementalRemoteKeyedStateHandle - final Path tmpRestoreInstancePath = - instanceBasePath.getAbsoluteFile().toPath().resolve(UUID.randomUUID().toString()); - final StateHandleDownloadSpec downloadRequest = - new StateHandleDownloadSpec(stateHandle, tmpRestoreInstancePath); - try { - transferRemoteStateToLocalDirectory(Collections.singletonList(downloadRequest)); - restoreBaseDBFromLocalState(downloadRequest.createLocalStateHandleForDownloadedState()); - } finally { - cleanUpPathQuietly(downloadRequest.getDownloadDestination()); - } - } - - /** Restores RocksDB instance from local state. */ + /** + * Restores the base DB from local state of a single state handle. + * + * @param localKeyedStateHandle the state handle tor estore from. + * @throws Exception on any restore error. 
+ */ private void restoreBaseDBFromLocalState(IncrementalLocalKeyedStateHandle localKeyedStateHandle) throws Exception { KeyedBackendSerializationProxy<K> serializationProxy = @@ -264,268 +671,142 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper restoreSourcePath); } + /** + * Helper method to download files, as specified in the given download specs, to the local + * directory. + * + * @param downloadSpecs specifications of files to download. + * @throws Exception On any download error. + */ private void transferRemoteStateToLocalDirectory( - Collection<StateHandleDownloadSpec> downloadRequests) throws Exception { + Collection<StateHandleDownloadSpec> downloadSpecs) throws Exception { try (RocksDBStateDownloader rocksDBStateDownloader = new RocksDBStateDownloader( numberOfTransferringThreads, customInitializationMetrics)) { rocksDBStateDownloader.transferAllStateDataToDirectory( - downloadRequests, cancelStreamRegistry); - } - } - - private void cleanUpPathQuietly(@Nonnull Path path) { - try { - FileUtils.deleteDirectory(path.toFile()); - } catch (IOException ex) { - logger.warn("Failed to clean up path " + path, ex); + downloadSpecs, cancelStreamRegistry); } } /** - * Recovery from multi incremental states with rescaling. For rescaling, this method creates a - * temporary RocksDB instance for a key-groups shard. All contents from the temporary instance - * are copied into the real restore instance and then the temporary instance is discarded. + * Helper method to copy all data from the given local state handles to the base DB by using + * temporary DB instances. + * + * @param toImport the state handles to import. + * @param startKeyGroupPrefixBytes the min/start key of the key groups range as bytes. + * @param stopKeyGroupPrefixBytes the max+1/end key of the key groups range as bytes. + * @throws Exception on any copy error. 
*/ - private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) + private void copyToBaseDBUsingTempDBs( + List<IncrementalLocalKeyedStateHandle> toImport, + byte[] startKeyGroupPrefixBytes, + byte[] stopKeyGroupPrefixBytes) throws Exception { - Preconditions.checkArgument(restoreStateHandles != null && !restoreStateHandles.isEmpty()); - - final List<StateHandleDownloadSpec> allDownloadSpecs = new ArrayList<>(); - - final List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles = - new ArrayList<>(restoreStateHandles.size()); - - final Path absolutInstanceBasePath = instanceBasePath.getAbsoluteFile().toPath(); - - // Prepare and collect all the download request to pull remote state to a local directory - for (KeyedStateHandle stateHandle : restoreStateHandles) { - if (stateHandle instanceof IncrementalRemoteKeyedStateHandle) { - StateHandleDownloadSpec downloadRequest = - new StateHandleDownloadSpec( - (IncrementalRemoteKeyedStateHandle) stateHandle, - absolutInstanceBasePath.resolve(UUID.randomUUID().toString())); - allDownloadSpecs.add(downloadRequest); - } else if (stateHandle instanceof IncrementalLocalKeyedStateHandle) { - localKeyedStateHandles.add((IncrementalLocalKeyedStateHandle) stateHandle); - } else { - throw unexpectedStateHandleException( - IncrementalRemoteKeyedStateHandle.class, stateHandle.getClass()); - } + if (toImport.isEmpty()) { + return; } - allDownloadSpecs.stream() - .map(StateHandleDownloadSpec::createLocalStateHandleForDownloadedState) - .forEach(localKeyedStateHandles::add); - - // Choose the best state handle for the initial DB - final IncrementalLocalKeyedStateHandle selectedInitialHandle = - RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial( - localKeyedStateHandles, keyGroupRange, overlapFractionThreshold); - Preconditions.checkNotNull(selectedInitialHandle); - // Remove the selected handle from the list so that we don't restore it twice. 
- localKeyedStateHandles.remove(selectedInitialHandle); - - try { - // Process all state downloads - transferRemoteStateToLocalDirectory(allDownloadSpecs); - - // Init the base DB instance with the initial state - initBaseDBForRescaling(selectedInitialHandle); - - // Transfer remaining key-groups from temporary instance into base DB - byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes]; - CompositeKeySerializationUtils.serializeKeyGroup( - keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes); - - byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes]; - CompositeKeySerializationUtils.serializeKeyGroup( - keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes); - - // Insert all remaining state through creating temporary RocksDB instances - for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) { - logger.info( - "Starting to restore from state handle: {} with rescaling.", stateHandle); - - try (RestoredDBInstance tmpRestoreDBInfo = - restoreTempDBInstanceFromLocalState(stateHandle); - RocksDBWriteBatchWrapper writeBatchWrapper = - new RocksDBWriteBatchWrapper( - this.rocksHandle.getDb(), writeBatchSize)) { - - List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = - tmpRestoreDBInfo.columnFamilyDescriptors; - List<ColumnFamilyHandle> tmpColumnFamilyHandles = - tmpRestoreDBInfo.columnFamilyHandles; - - // iterating only the requested descriptors automatically skips the default - // column - // family handle - for (int descIdx = 0; descIdx < tmpColumnFamilyDescriptors.size(); ++descIdx) { - ColumnFamilyHandle tmpColumnFamilyHandle = - tmpColumnFamilyHandles.get(descIdx); - - ColumnFamilyHandle targetColumnFamilyHandle = - this.rocksHandle.getOrRegisterStateColumnFamilyHandle( - null, - tmpRestoreDBInfo.stateMetaInfoSnapshots.get( - descIdx)) - .columnFamilyHandle; - - try (RocksIteratorWrapper iterator = - RocksDBOperationUtils.getRocksIterator( - tmpRestoreDBInfo.db, - tmpColumnFamilyHandle, - 
tmpRestoreDBInfo.readOptions)) { - - iterator.seek(startKeyGroupPrefixBytes); - - while (iterator.isValid()) { - - if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes( - iterator.key(), stopKeyGroupPrefixBytes)) { - writeBatchWrapper.put( - targetColumnFamilyHandle, - iterator.key(), - iterator.value()); - } else { - // Since the iterator will visit the record according to the - // sorted - // order, - // we can just break here. - break; - } - - iterator.next(); - } - } // releases native iterator resources - } - logger.info( - "Finished restoring from state handle: {} with rescaling.", - stateHandle); + logger.info( + "Starting to copy state handles for backend with range {} in operator {} using temporary instances.", + keyGroupRange, + operatorIdentifier); + + try (RocksDBWriteBatchWrapper writeBatchWrapper = + new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), writeBatchSize)) { + for (IncrementalLocalKeyedStateHandle handleToCopy : toImport) { + try (RestoredDBInstance restoredDBInstance = + restoreTempDBInstanceFromLocalState(handleToCopy)) { + copyTempDbIntoBaseDb( + restoredDBInstance, + writeBatchWrapper, + startKeyGroupPrefixBytes, + stopKeyGroupPrefixBytes); } } - } finally { - // Cleanup all download directories - allDownloadSpecs.stream() - .map(StateHandleDownloadSpec::getDownloadDestination) - .forEach(this::cleanUpPathQuietly); } + + logger.info( + "Competed copying state handles for backend with range {} in operator {} using temporary instances.", + keyGroupRange, + operatorIdentifier); } /** - * Recovery from multi incremental states with rescaling. For rescaling, this method creates a - * temporary RocksDB instance for a key-groups shard. All contents from the temporary instance - * are copied into the real restore instance and then the temporary instance is discarded. + * Helper method tp copy all data from an open temporary DB to the base DB. + * + * @param tmpRestoreDBInfo the temporary instance. 
+ * @param writeBatchWrapper write batch wrapper for writes against the base DB. + * @param startKeyGroupPrefixBytes the min/start key of the key groups range as bytes. + * @param stopKeyGroupPrefixBytes the max+1/end key of the key groups range as bytes. + * @throws Exception on any copy error. */ - private void restoreWithIngestDbMode(Collection<KeyedStateHandle> restoreStateHandles) + private void copyTempDbIntoBaseDb( + RestoredDBInstance tmpRestoreDBInfo, + RocksDBWriteBatchWrapper writeBatchWrapper, + byte[] startKeyGroupPrefixBytes, + byte[] stopKeyGroupPrefixBytes) throws Exception { - Preconditions.checkArgument(restoreStateHandles != null && !restoreStateHandles.isEmpty()); - - Map<StateHandleID, StateHandleDownloadSpec> allDownloadSpecs = - CollectionUtil.newHashMapWithExpectedSize(restoreStateHandles.size()); - - final Path absolutInstanceBasePath = instanceBasePath.getAbsoluteFile().toPath(); - final Path exportCfBasePath = absolutInstanceBasePath.resolve("export-cfs"); - Files.createDirectories(exportCfBasePath); - - // Open base db as Empty DB - this.rocksHandle.openDB(); - - // Prepare and collect all the download request to pull remote state to a local directory - for (KeyedStateHandle stateHandle : restoreStateHandles) { - if (!(stateHandle instanceof IncrementalRemoteKeyedStateHandle)) { - throw unexpectedStateHandleException( - IncrementalRemoteKeyedStateHandle.class, stateHandle.getClass()); - } - StateHandleDownloadSpec downloadRequest = - new StateHandleDownloadSpec( - (IncrementalRemoteKeyedStateHandle) stateHandle, - absolutInstanceBasePath.resolve(UUID.randomUUID().toString())); - allDownloadSpecs.put(stateHandle.getStateHandleId(), downloadRequest); - } - - // Process all state downloads - transferRemoteStateToLocalDirectory(allDownloadSpecs.values()); - - // Transfer remaining key-groups from temporary instance into base DB - byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes]; - 
CompositeKeySerializationUtils.serializeKeyGroup( - keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes); - - byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes]; - CompositeKeySerializationUtils.serializeKeyGroup( - keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes); - - HashMap<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>> cfMetaDataToImport = - new HashMap(); - - // Insert all remaining state through creating temporary RocksDB instances - for (StateHandleDownloadSpec downloadRequest : allDownloadSpecs.values()) { - logger.info( - "Starting to restore from state handle: {} with rescaling.", - downloadRequest.getStateHandle()); - - try (RestoredDBInstance tmpRestoreDBInfo = - restoreTempDBInstanceFromDownloadedState(downloadRequest)) { - - List<ColumnFamilyHandle> tmpColumnFamilyHandles = - tmpRestoreDBInfo.columnFamilyHandles; - - // Clip all tmp db to Range [startKeyGroupPrefixBytes, stopKeyGroupPrefixBytes) - RocksDBIncrementalCheckpointUtils.clipColumnFamilies( - tmpRestoreDBInfo.db, - tmpColumnFamilyHandles, - startKeyGroupPrefixBytes, - stopKeyGroupPrefixBytes); - - // Export all the Column Families - Map<RegisteredStateMetaInfoBase, ExportImportFilesMetaData> exportedCFAndMetaData = - RocksDBIncrementalCheckpointUtils.exportColumnFamilies( - tmpRestoreDBInfo.db, - tmpColumnFamilyHandles, - tmpRestoreDBInfo.stateMetaInfoSnapshots, - exportCfBasePath); - - exportedCFAndMetaData.forEach( - (stateMeta, cfMetaData) -> { - if (!cfMetaData.files().isEmpty()) { - cfMetaDataToImport.putIfAbsent(stateMeta, new ArrayList<>()); - cfMetaDataToImport.get(stateMeta).add(cfMetaData); - } - }); - } finally { - cleanUpPathQuietly(downloadRequest.getDownloadDestination()); - } - } + logger.debug( + "Starting copy of state handle {} for backend with range {} in operator {} to base DB using temporary instance.", + tmpRestoreDBInfo.srcStateHandle, + keyGroupRange, + operatorIdentifier); + + List<ColumnFamilyDescriptor> 
tmpColumnFamilyDescriptors = + tmpRestoreDBInfo.columnFamilyDescriptors; + List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles; + + // iterating only the requested descriptors automatically skips the default + // column + // family handle + for (int descIdx = 0; descIdx < tmpColumnFamilyDescriptors.size(); ++descIdx) { + ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(descIdx); + + ColumnFamilyHandle targetColumnFamilyHandle = + this.rocksHandle.getOrRegisterStateColumnFamilyHandle( + null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(descIdx)) + .columnFamilyHandle; + + try (RocksIteratorWrapper iterator = + RocksDBOperationUtils.getRocksIterator( + tmpRestoreDBInfo.db, + tmpColumnFamilyHandle, + tmpRestoreDBInfo.readOptions)) { + + iterator.seek(startKeyGroupPrefixBytes); + + while (iterator.isValid()) { + + if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes( + iterator.key(), stopKeyGroupPrefixBytes)) { + writeBatchWrapper.put( + targetColumnFamilyHandle, iterator.key(), iterator.value()); + } else { + // Since the iterator will visit the record according to the + // sorted + // order, + // we can just break here. + break; + } - try { - cfMetaDataToImport.forEach(this.rocksHandle::registerStateColumnFamilyHandleWithImport); - } finally { - cleanUpPathQuietly(exportCfBasePath); + iterator.next(); + } + } // releases native iterator resources } + logger.debug( + "Finished copy of state handle {} for backend with range {} in operator {} using temporary instance.", + tmpRestoreDBInfo.srcStateHandle, + keyGroupRange, + operatorIdentifier); } - private void initBaseDBForRescaling(IncrementalLocalKeyedStateHandle stateHandle) - throws Exception { - - // 1. Restore base DB from selected initial handle - restoreBaseDBFromLocalState(stateHandle); - - // 2. 
Clip the base DB instance + private void cleanUpPathQuietly(@Nonnull Path path) { try { - RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange( - this.rocksHandle.getDb(), - this.rocksHandle.getColumnFamilyHandles(), - keyGroupRange, - stateHandle.getKeyGroupRange(), - keyGroupPrefixBytes); - } catch (RocksDBException e) { - String errMsg = "Failed to clip DB after initialization."; - logger.error(errMsg, e); - throw new BackendBuildingException(errMsg, e); + FileUtils.deleteDirectory(path.toFile()); + } catch (IOException ex) { + logger.warn("Failed to clean up path " + path, ex); } } @@ -544,17 +825,21 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper private final ReadOptions readOptions; + private final IncrementalLocalKeyedStateHandle srcStateHandle; + private RestoredDBInstance( @Nonnull RocksDB db, @Nonnull List<ColumnFamilyHandle> columnFamilyHandles, @Nonnull List<ColumnFamilyDescriptor> columnFamilyDescriptors, - @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) { + @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, + @Nonnull IncrementalLocalKeyedStateHandle srcStateHandle) { this.db = db; this.defaultColumnFamilyHandle = columnFamilyHandles.remove(0); this.columnFamilyHandles = columnFamilyHandles; this.columnFamilyDescriptors = columnFamilyDescriptors; this.stateMetaInfoSnapshots = stateMetaInfoSnapshots; this.readOptions = new ReadOptions(); + this.srcStateHandle = srcStateHandle; } @Override @@ -596,12 +881,16 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper this.rocksHandle.getDbOptions()); return new RestoredDBInstance( - restoreDb, columnFamilyHandles, columnFamilyDescriptors, stateMetaInfoSnapshots); + restoreDb, + columnFamilyHandles, + columnFamilyDescriptors, + stateMetaInfoSnapshots, + stateHandle); } /** * This method recreates and registers all {@link ColumnFamilyDescriptor} from Flink's state - * meta data snapshot. + * metadata snapshot. 
*/ private List<ColumnFamilyDescriptor> createColumnFamilyDescriptors( List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, boolean registerTtlCompactFilter) { @@ -612,6 +901,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper for (StateMetaInfoSnapshot stateMetaInfoSnapshot : stateMetaInfoSnapshots) { RegisteredStateMetaInfoBase metaInfoBase = RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot); + ColumnFamilyDescriptor columnFamilyDescriptor = RocksDBOperationUtils.createColumnFamilyDescriptor( metaInfoBase, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java index 4202c899c59..b12d6ff1d15 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java @@ -66,6 +66,7 @@ public class RocksDBNoneRestoreOperation<K> implements RocksDBRestoreOperation { this.rocksHandle.getNativeMetricMonitor(), -1, null, + null, null); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java index ad17b6f2769..1e458eb45f4 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java @@ 
-24,9 +24,13 @@ import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocal import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDB; +import javax.annotation.Nullable; + import java.util.Collection; +import java.util.Optional; import java.util.SortedMap; import java.util.UUID; +import java.util.concurrent.CompletableFuture; /** Entity holding result of RocksDB instance restore. */ public class RocksDBRestoreResult { @@ -39,19 +43,23 @@ public class RocksDBRestoreResult { private final UUID backendUID; private final SortedMap<Long, Collection<HandleAndLocalPath>> restoredSstFiles; + private final CompletableFuture<Void> asyncCompactAfterRestoreFuture; + public RocksDBRestoreResult( RocksDB db, ColumnFamilyHandle defaultColumnFamilyHandle, RocksDBNativeMetricMonitor nativeMetricMonitor, long lastCompletedCheckpointId, UUID backendUID, - SortedMap<Long, Collection<HandleAndLocalPath>> restoredSstFiles) { + SortedMap<Long, Collection<HandleAndLocalPath>> restoredSstFiles, + @Nullable CompletableFuture<Void> asyncCompactAfterRestoreFuture) { this.db = db; this.defaultColumnFamilyHandle = defaultColumnFamilyHandle; this.nativeMetricMonitor = nativeMetricMonitor; this.lastCompletedCheckpointId = lastCompletedCheckpointId; this.backendUID = backendUID; this.restoredSstFiles = restoredSstFiles; + this.asyncCompactAfterRestoreFuture = asyncCompactAfterRestoreFuture; } public RocksDB getDb() { @@ -77,4 +85,8 @@ public class RocksDBRestoreResult { public RocksDBNativeMetricMonitor getNativeMetricMonitor() { return nativeMetricMonitor; } + + public Optional<CompletableFuture<Void>> getAsyncCompactAfterRestoreFuture() { + return Optional.ofNullable(asyncCompactAfterRestoreFuture); + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java index 03b1a087e28..d41b166e3c9 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java @@ -86,6 +86,7 @@ import java.util.Queue; import java.util.concurrent.RunnableFuture; import java.util.stream.Collectors; +import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Matchers.any; @@ -116,7 +117,14 @@ public class EmbeddedRocksDBStateBackendTest { true, (SupplierWithException<CheckpointStorage, IOException>) - JobManagerCheckpointStorage::new + JobManagerCheckpointStorage::new, + false + }, + { + true, + (SupplierWithException<CheckpointStorage, IOException>) + JobManagerCheckpointStorage::new, + true }, { false, @@ -126,7 +134,8 @@ public class EmbeddedRocksDBStateBackendTest TempDirUtils.newFolder(tempFolder).toURI().toString(); return new FileSystemCheckpointStorage( new Path(checkpointPath), 0, -1); - } + }, + false } }); } @@ -137,6 +146,9 @@ public class EmbeddedRocksDBStateBackendTest @Parameter(value = 1) public SupplierWithException<CheckpointStorage, IOException> storageSupplier; + @Parameter(value = 2) + public boolean useIngestDB; + // Store it because we need it for the cleanup test. 
private String dbPath; private RocksDB db = null; @@ -168,6 +180,7 @@ public class EmbeddedRocksDBStateBackendTest EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(enableIncrementalCheckpointing); Configuration configuration = new Configuration(); + configuration.setBoolean(USE_INGEST_DB_RESTORE_MODE, useIngestDB); configuration.set( RocksDBOptions.TIMER_SERVICE_FACTORY, EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB); @@ -234,7 +247,8 @@ public class EmbeddedRocksDBStateBackendTest spy(db), defaultCFHandle, optionsContainer.getColumnOptions()) - .setEnableIncrementalCheckpointing(enableIncrementalCheckpointing); + .setEnableIncrementalCheckpointing(enableIncrementalCheckpointing) + .setUseIngestDbRestoreMode(useIngestDB); if (enableIncrementalCheckpointing) { rocksDBStateUploader = @@ -315,19 +329,18 @@ public class EmbeddedRocksDBStateBackendTest @TestTemplate public void testCorrectMergeOperatorSet() throws Exception { prepareRocksDB(); - final ColumnFamilyOptions columnFamilyOptions = spy(new ColumnFamilyOptions()); - RocksDBKeyedStateBackend<Integer> test = null; - try { - test = - RocksDBTestUtils.builderForTestDB( - TempDirUtils.newFolder(tempFolder), - IntSerializer.INSTANCE, - db, - defaultCFHandle, - columnFamilyOptions) - .setEnableIncrementalCheckpointing(enableIncrementalCheckpointing) - .build(); + try (ColumnFamilyOptions columnFamilyOptions = spy(new ColumnFamilyOptions()); + RocksDBKeyedStateBackend<Integer> test = + RocksDBTestUtils.builderForTestDB( + TempDirUtils.newFolder(tempFolder), + IntSerializer.INSTANCE, + db, + defaultCFHandle, + columnFamilyOptions) + .setEnableIncrementalCheckpointing(enableIncrementalCheckpointing) + .setUseIngestDbRestoreMode(useIngestDB) + .build()) { ValueStateDescriptor<String> stubState1 = new ValueStateDescriptor<>("StubState-1", StringSerializer.INSTANCE); @@ -339,12 +352,6 @@ public class EmbeddedRocksDBStateBackendTest // The default CF is pre-created so sum up to 2 times 
(once for each stub state) verify(columnFamilyOptions, Mockito.times(2)) .setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME); - } finally { - if (test != null) { - IOUtils.closeQuietly(test); - test.dispose(); - } - columnFamilyOptions.close(); } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java new file mode 100644 index 00000000000..4a233ec06ba --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory; +import org.apache.flink.testutils.junit.utils.TempDirUtils; +import org.apache.flink.util.IOUtils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.RunnableFuture; +import java.util.stream.Collectors; + +import static org.apache.flink.core.fs.Path.fromLocalFile; +import static org.apache.flink.core.fs.local.LocalFileSystem.getSharedInstance; + +/** Rescaling test and microbenchmark for RocksDB. */ +public class RocksDBRecoveryTest { + + // Assign System.out for console output. 
+ private static final PrintStream OUTPUT = + new PrintStream( + new OutputStream() { + @Override + public void write(int b) {} + }); + + @TempDir private static java.nio.file.Path tempFolder; + + @Test + public void testScaleOut_1_2() throws Exception { + testRescale(1, 2, 100_000, 10); + } + + @Test + public void testScaleOut_2_8() throws Exception { + testRescale(2, 8, 100_000, 10); + } + + @Test + public void testScaleOut_2_7() throws Exception { + testRescale(2, 7, 100_000, 10); + } + + @Test + public void testScaleIn_2_1() throws Exception { + testRescale(2, 1, 100_000, 10); + } + + @Test + public void testScaleIn_8_2() throws Exception { + testRescale(8, 2, 100_000, 10); + } + + @Test + public void testScaleIn_7_2() throws Exception { + testRescale(7, 2, 100_000, 10); + } + + @Test + public void testScaleIn_2_3() throws Exception { + testRescale(2, 3, 100_000, 10); + } + + @Test + public void testScaleIn_3_2() throws Exception { + testRescale(3, 2, 100_000, 10); + } + + public void testRescale( + int startParallelism, int targetParallelism, int numKeys, int updateDistance) + throws Exception { + + OUTPUT.println("Rescaling from " + startParallelism + " to " + targetParallelism + "..."); + final String stateName = "TestValueState"; + final int maxParallelism = startParallelism * targetParallelism; + final List<RocksDBKeyedStateBackend<Integer>> backends = new ArrayList<>(maxParallelism); + final List<SnapshotResult<KeyedStateHandle>> startSnapshotResult = new ArrayList<>(); + final List<SnapshotResult<KeyedStateHandle>> rescaleSnapshotResult = new ArrayList<>(); + final List<SnapshotResult<KeyedStateHandle>> cleanupSnapshotResult = new ArrayList<>(); + try { + final List<ValueState<Integer>> valueStates = new ArrayList<>(maxParallelism); + try { + ValueStateDescriptor<Integer> stateDescriptor = + new ValueStateDescriptor<>(stateName, IntSerializer.INSTANCE); + + for (int i = 0; i < startParallelism; ++i) { + RocksDBKeyedStateBackend<Integer> backend = + 
RocksDBTestUtils.builderForTestDefaults( + TempDirUtils.newFolder(tempFolder), + IntSerializer.INSTANCE, + maxParallelism, + KeyGroupRangeAssignment + .computeKeyGroupRangeForOperatorIndex( + maxParallelism, startParallelism, i), + Collections.emptyList()) + .setEnableIncrementalCheckpointing(true) + .setUseIngestDbRestoreMode(true) + .build(); + + valueStates.add( + backend.getOrCreateKeyedState( + VoidNamespaceSerializer.INSTANCE, stateDescriptor)); + + backends.add(backend); + } + + OUTPUT.println("Inserting " + numKeys + " keys..."); + + for (int i = 1; i <= numKeys; ++i) { + int key = i; + int index = + KeyGroupRangeAssignment.assignKeyToParallelOperator( + key, maxParallelism, startParallelism); + backends.get(index).setCurrentKey(key); + valueStates.get(index).update(i); + + if (updateDistance > 0 && i % updateDistance == 0) { + key = i - updateDistance + 1; + index = + KeyGroupRangeAssignment.assignKeyToParallelOperator( + key, maxParallelism, startParallelism); + backends.get(index).setCurrentKey(key); + valueStates.get(index).update(i); + } + } + + OUTPUT.println("Creating snapshots..."); + snapshotAllBackends(backends, startSnapshotResult); + } finally { + for (RocksDBKeyedStateBackend<Integer> backend : backends) { + IOUtils.closeQuietly(backend); + backend.dispose(); + } + valueStates.clear(); + backends.clear(); + } + + for (boolean useIngest : Arrays.asList(Boolean.TRUE, Boolean.FALSE)) { + for (boolean asyncCompact : Arrays.asList(Boolean.TRUE, Boolean.FALSE)) { + + // Rescale start -> target + rescaleAndRestoreBackends( + useIngest, + asyncCompact, + targetParallelism, + maxParallelism, + startSnapshotResult, + backends); + + backends.forEach( + backend -> + backend.getAsyncCompactAfterRestoreFuture() + .ifPresent( + future -> { + try { + future.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + })); + + snapshotAllBackends(backends, rescaleSnapshotResult); + + int count = 0; + for (RocksDBKeyedStateBackend<Integer> backend : 
backends) { + count += backend.getKeys(stateName, VoidNamespace.INSTANCE).count(); + IOUtils.closeQuietly(backend); + backend.dispose(); + } + Assertions.assertEquals(numKeys, count); + backends.clear(); + cleanupSnapshotResult.addAll(rescaleSnapshotResult); + + // Rescale reverse: target -> start + rescaleAndRestoreBackends( + useIngest, + false, + startParallelism, + maxParallelism, + rescaleSnapshotResult, + backends); + + count = 0; + for (RocksDBKeyedStateBackend<Integer> backend : backends) { + count += backend.getKeys(stateName, VoidNamespace.INSTANCE).count(); + IOUtils.closeQuietly(backend); + backend.dispose(); + } + Assertions.assertEquals(numKeys, count); + rescaleSnapshotResult.clear(); + backends.clear(); + } + } + } finally { + for (RocksDBKeyedStateBackend<Integer> backend : backends) { + IOUtils.closeQuietly(backend); + backend.dispose(); + } + for (SnapshotResult<KeyedStateHandle> snapshotResult : startSnapshotResult) { + snapshotResult.discardState(); + } + for (SnapshotResult<KeyedStateHandle> snapshotResult : rescaleSnapshotResult) { + snapshotResult.discardState(); + } + for (SnapshotResult<KeyedStateHandle> snapshotResult : cleanupSnapshotResult) { + snapshotResult.discardState(); + } + } + } + + private void rescaleAndRestoreBackends( + boolean useIngest, + boolean asyncCompactAfterRescale, + int targetParallelism, + int maxParallelism, + List<SnapshotResult<KeyedStateHandle>> snapshotResult, + List<RocksDBKeyedStateBackend<Integer>> backendsOut) + throws IOException { + + List<KeyedStateHandle> stateHandles = + extractKeyedStateHandlesFromSnapshotResult(snapshotResult); + List<KeyGroupRange> ranges = computeKeyGroupRanges(targetParallelism, maxParallelism); + List<List<KeyedStateHandle>> handlesByInstance = + computeHandlesByInstance(stateHandles, ranges, targetParallelism); + + OUTPUT.println( + "Restoring using ingestDb=" + + useIngest + + ", asyncCompact=" + + asyncCompactAfterRescale + + "... 
"); + + OUTPUT.println( + "Sum of snapshot sizes: " + + stateHandles.stream().mapToLong(StateObject::getStateSize).sum() + / (1024 * 1024) + + " MB"); + + long maxInstanceTime = Long.MIN_VALUE; + long t = System.currentTimeMillis(); + for (int i = 0; i < targetParallelism; ++i) { + List<KeyedStateHandle> instanceHandles = handlesByInstance.get(i); + long tInstance = System.currentTimeMillis(); + RocksDBKeyedStateBackend<Integer> backend = + RocksDBTestUtils.builderForTestDefaults( + TempDirUtils.newFolder(tempFolder), + IntSerializer.INSTANCE, + maxParallelism, + ranges.get(i), + instanceHandles) + .setEnableIncrementalCheckpointing(true) + .setUseIngestDbRestoreMode(useIngest) + .setIncrementalRestoreAsyncCompactAfterRescale(asyncCompactAfterRescale) + .build(); + + long instanceTime = System.currentTimeMillis() - tInstance; + if (instanceTime > maxInstanceTime) { + maxInstanceTime = instanceTime; + } + + OUTPUT.println( + " Restored instance " + + i + + " from " + + instanceHandles.size() + + " state handles" + + " time (ms): " + + instanceTime); + + backendsOut.add(backend); + } + OUTPUT.println("Total restore time (ms): " + (System.currentTimeMillis() - t)); + OUTPUT.println("Max restore time (ms): " + maxInstanceTime); + } + + private void snapshotAllBackends( + List<RocksDBKeyedStateBackend<Integer>> backends, + List<SnapshotResult<KeyedStateHandle>> snapshotResultsOut) + throws Exception { + for (int i = 0; i < backends.size(); ++i) { + RocksDBKeyedStateBackend<Integer> backend = backends.get(i); + FsCheckpointStreamFactory fsCheckpointStreamFactory = + new FsCheckpointStreamFactory( + getSharedInstance(), + fromLocalFile( + TempDirUtils.newFolder( + tempFolder, "checkpointsDir_" + UUID.randomUUID() + i)), + fromLocalFile( + TempDirUtils.newFolder( + tempFolder, "sharedStateDir_" + UUID.randomUUID() + i)), + 1, + 4096); + + RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = + backend.snapshot( + 0L, + 0L, + fsCheckpointStreamFactory, + 
CheckpointOptions.forCheckpointWithDefaultLocation()); + + snapshot.run(); + snapshotResultsOut.add(snapshot.get()); + } + } + + private List<KeyedStateHandle> extractKeyedStateHandlesFromSnapshotResult( + List<SnapshotResult<KeyedStateHandle>> snapshotResults) { + return snapshotResults.stream() + .map(SnapshotResult::getJobManagerOwnedSnapshot) + .collect(Collectors.toList()); + } + + private List<KeyGroupRange> computeKeyGroupRanges(int restoreParallelism, int maxParallelism) { + List<KeyGroupRange> ranges = new ArrayList<>(restoreParallelism); + for (int i = 0; i < restoreParallelism; ++i) { + ranges.add( + KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex( + maxParallelism, restoreParallelism, i)); + } + return ranges; + } + + private List<List<KeyedStateHandle>> computeHandlesByInstance( + List<KeyedStateHandle> stateHandles, + List<KeyGroupRange> computedRanges, + int restoreParallelism) { + List<List<KeyedStateHandle>> handlesByInstance = new ArrayList<>(restoreParallelism); + for (KeyGroupRange targetRange : computedRanges) { + List<KeyedStateHandle> handlesForTargetRange = new ArrayList<>(1); + handlesByInstance.add(handlesForTargetRange); + + for (KeyedStateHandle stateHandle : stateHandles) { + if (stateHandle.getKeyGroupRange().getIntersection(targetRange) + != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) { + handlesForTargetRange.add(stateHandle); + } + } + } + return handlesByInstance; + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index 34d27e1bb59..d54210e727e 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -872,6 +872,49 @@ public class RocksDBStateBackendConfigTest { assertTrue(0.3 == rocksDBStateBackend.getOverlapFractionThreshold()); } + @Test + public void testDefaultUseIngestDB() { + EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true); + assertEquals( + RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE.defaultValue(), + rocksDBStateBackend.getUseIngestDbRestoreMode()); + } + + @Test + public void testConfigureUseIngestDB() { + EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true); + Configuration configuration = new Configuration(); + configuration.set(RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, true); + rocksDBStateBackend = + rocksDBStateBackend.configure(configuration, getClass().getClassLoader()); + assertTrue(rocksDBStateBackend.getUseIngestDbRestoreMode()); + } + + @Test + public void testDefaultIncrementalRestoreInstanceBufferSize() { + EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true); + assertEquals( + RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE + .defaultValue(), + rocksDBStateBackend.getIncrementalRestoreAsyncCompactAfterRescale()); + } + + @Test + public void testConfigureIncrementalRestoreInstanceBufferSize() { + EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true); + Configuration configuration = new Configuration(); + boolean notDefault = + !RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE + .defaultValue(); + configuration.set( + RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE, + notDefault); + rocksDBStateBackend = + rocksDBStateBackend.configure(configuration, getClass().getClassLoader()); + assertEquals( + notDefault, 
rocksDBStateBackend.getIncrementalRestoreAsyncCompactAfterRescale()); + } + private void verifySetParameter(Runnable setter) { try { setter.run(); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java index 2fc862664ce..43111df7582 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.TestLocalRecoveryConfig; import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator; import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig; @@ -37,8 +38,11 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.RocksDB; +import javax.annotation.Nonnull; + import java.io.File; import java.io.IOException; +import java.util.Collection; import java.util.Collections; /** Test utils for the RocksDB state backend. 
*/ @@ -50,13 +54,34 @@ public final class RocksDBTestUtils { return builderForTestDefaults( instanceBasePath, keySerializer, - EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP); + 2, + new KeyGroupRange(0, 1), + Collections.emptyList()); } public static <K> RocksDBKeyedStateBackendBuilder<K> builderForTestDefaults( File instanceBasePath, TypeSerializer<K> keySerializer, - EmbeddedRocksDBStateBackend.PriorityQueueStateType queueStateType) { + int numKeyGroups, + KeyGroupRange keyGroupRange, + @Nonnull Collection<KeyedStateHandle> stateHandles) { + + return builderForTestDefaults( + instanceBasePath, + keySerializer, + EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP, + numKeyGroups, + keyGroupRange, + stateHandles); + } + + public static <K> RocksDBKeyedStateBackendBuilder<K> builderForTestDefaults( + File instanceBasePath, + TypeSerializer<K> keySerializer, + EmbeddedRocksDBStateBackend.PriorityQueueStateType queueStateType, + int numKeyGroups, + KeyGroupRange keyGroupRange, + @Nonnull Collection<KeyedStateHandle> stateHandles) { final RocksDBResourceContainer optionsContainer = new RocksDBResourceContainer(); @@ -68,8 +93,8 @@ public final class RocksDBTestUtils { stateName -> optionsContainer.getColumnOptions(), new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()), keySerializer, - 2, - new KeyGroupRange(0, 1), + numKeyGroups, + keyGroupRange, new ExecutionConfig(), TestLocalRecoveryConfig.disabled(), RocksDBPriorityQueueConfig.buildWithPriorityQueueType(queueStateType), @@ -77,7 +102,7 @@ public final class RocksDBTestUtils { LatencyTrackingStateConfig.disabled(), new UnregisteredMetricsGroup(), (key, value) -> {}, - Collections.emptyList(), + stateHandles, UncompressedStreamCompressionDecorator.INSTANCE, new CloseableRegistry()); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java index 7e253cd2cc6..38b8d3e8c92 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.StateAssignmentOperation; import org.apache.flink.runtime.state.KeyGroupRange; @@ -63,7 +64,7 @@ public class RocksIncrementalCheckpointRescalingTest extends TestLogger { private final int maxParallelism = 10; - private KeySelector<String, String> keySelector = new TestKeySelector(); + private final KeySelector<String, String> keySelector = new TestKeySelector(); private String[] records; diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java index 9b95c8aa41c..ba13faca294 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java @@ -38,6 +38,7 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.configuration.StateBackendOptions; import org.apache.flink.configuration.WebOptions; +import 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -109,23 +110,29 @@ public class AutoRescalingITCase extends TestLogger { private static final int slotsPerTaskManager = 2; private static final int totalSlots = numTaskManagers * slotsPerTaskManager; - @Parameterized.Parameters(name = "backend = {0}, buffersPerChannel = {1}") + @Parameterized.Parameters(name = "backend = {0}, buffersPerChannel = {1}, useIngestDB = {2}") public static Collection<Object[]> data() { return Arrays.asList( new Object[][] { - {"rocksdb", 0}, {"rocksdb", 2}, {"filesystem", 0}, {"filesystem", 2} + {"rocksdb", 0, false}, + {"rocksdb", 2, true}, + {"filesystem", 0, false}, + {"filesystem", 2, false} }); } - public AutoRescalingITCase(String backend, int buffersPerChannel) { + public AutoRescalingITCase(String backend, int buffersPerChannel, boolean useIngestDB) { this.backend = backend; this.buffersPerChannel = buffersPerChannel; + this.useIngestDB = useIngestDB; } private final String backend; private final int buffersPerChannel; + private final boolean useIngestDB; + private String currentBackend = null; enum OperatorCheckpointMethod { @@ -154,6 +161,7 @@ public class AutoRescalingITCase extends TestLogger { final File savepointDir = temporaryFolder.newFolder(); config.set(StateBackendOptions.STATE_BACKEND, currentBackend); + config.set(RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, useIngestDB); config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true); config.set(CheckpointingOptions.LOCAL_RECOVERY, true); config.set(