This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c11632674a8a9636a5f73d906141af578e153d4f Author: Stefan Richter <[email protected]> AuthorDate: Thu Feb 22 12:18:02 2024 +0100 [FLINK-35580] Prevent potential JVM crashes from async compaction when RocksDB is already closed. --- .../state/RocksDBIncrementalCheckpointUtils.java | 121 +++++++++++------ .../streaming/state/RocksDBKeyedStateBackend.java | 2 +- .../state/RocksDBKeyedStateBackendBuilder.java | 13 +- .../streaming/state/RocksDBOperationUtils.java | 13 +- .../RocksDBIncrementalRestoreOperation.java | 147 +++++++++++---------- .../state/restore/RocksDBRestoreResult.java | 11 +- .../streaming/state/RocksDBRecoveryTest.java | 15 ++- 7 files changed, 194 insertions(+), 128 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java index 3147f01747a..e3be413307d 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java @@ -17,12 +17,14 @@ package org.apache.flink.contrib.streaming.state; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.state.CompositeKeySerializationUtils; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; import org.apache.flink.util.function.RunnableWithException; import org.apache.flink.shaded.guava32.com.google.common.primitives.UnsignedBytes; @@ -41,6 +43,7 @@ import javax.annotation.Nonnegative; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.Closeable; import java.io.File; import java.nio.file.Path; import java.util.ArrayList; @@ -50,7 +53,6 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; @@ -243,60 +245,93 @@ public class RocksDBIncrementalCheckpointUtils { * @param keyGroupPrefixBytes the number of bytes used to serialize the key-group prefix of keys * in the DB. * @param dbExpectedKeyGroupRange the expected key-groups range of the DB. + * @param rocksDBResourceGuard the resource guard for the given db instance. * @return runnable that performs compaction upon execution if the key-groups range is exceeded. * Otherwise, empty optional is returned. */ - public static Optional<RunnableWithException> createRangeCompactionTaskIfNeeded( + public static RunnableWithException createAsyncRangeCompactionTask( RocksDB db, Collection<ColumnFamilyHandle> columnFamilyHandles, int keyGroupPrefixBytes, - KeyGroupRange dbExpectedKeyGroupRange) { - - RangeCheckResult rangeCheckResult = - checkSstDataAgainstKeyGroupRange(db, keyGroupPrefixBytes, dbExpectedKeyGroupRange); + KeyGroupRange dbExpectedKeyGroupRange, + ResourceGuard rocksDBResourceGuard, + CloseableRegistry closeableRegistry) { + + return () -> { + logger.debug( + "Starting range check for async compaction targeting key-groups range {}.", + dbExpectedKeyGroupRange.prettyPrintInterval()); + final RangeCheckResult rangeCheckResult; + try (ResourceGuard.Lease ignored = rocksDBResourceGuard.acquireResource()) { + rangeCheckResult = + checkSstDataAgainstKeyGroupRange( + db, keyGroupPrefixBytes, dbExpectedKeyGroupRange); + } - if (rangeCheckResult.allInRange()) { - // No keys exceed the proclaimed range of the backend, so we don't need a compaction - // from this point of view. - return Optional.empty(); - } + if (rangeCheckResult.allInRange()) { + logger.debug( + "Nothing to compact in async compaction targeting key-groups range {}.", + dbExpectedKeyGroupRange.prettyPrintInterval()); + // No keys exceed the proclaimed range of the backend, so we don't need a compaction + // from this point of view. + return; + } - return Optional.of( - () -> { - try (CompactRangeOptions compactionOptions = - new CompactRangeOptions() - .setExclusiveManualCompaction(true) - .setBottommostLevelCompaction( - CompactRangeOptions.BottommostLevelCompaction - .kForceOptimized)) { - - if (!rangeCheckResult.leftInRange) { - // Compact all keys before from the expected key-groups range - for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { - db.compactRange( - columnFamilyHandle, - // TODO: change to null once this API is fixed - new byte[] {}, - rangeCheckResult.minKey, - compactionOptions); - } + try (CompactRangeOptions compactionOptions = + new CompactRangeOptions() + .setBottommostLevelCompaction( + CompactRangeOptions.BottommostLevelCompaction + .kForceOptimized)) { + + // To cancel an ongoing compaction asap, we register cancelling through the options + // with the registry + final Closeable cancelCompactionCloseable = + () -> { + logger.debug( + "Cancel request for async compaction targeting key-groups range {}.", + dbExpectedKeyGroupRange.prettyPrintInterval()); + compactionOptions.setCanceled(true); + }; + + closeableRegistry.registerCloseable(cancelCompactionCloseable); + + if (!rangeCheckResult.leftInRange) { + logger.debug( + "Compacting left interval in async compaction targeting key-groups range {}.", + dbExpectedKeyGroupRange.prettyPrintInterval()); + // Compact all keys before from the expected key-groups range + for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { + try (ResourceGuard.Lease ignored = rocksDBResourceGuard.acquireResource()) { + db.compactRange( + columnFamilyHandle, + // TODO: change to null once this API is fixed + new byte[] {}, + rangeCheckResult.minKey, + compactionOptions); } + } + } - if (!rangeCheckResult.rightInRange) { - // Compact all keys after the expected key-groups range - for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { - db.compactRange( - columnFamilyHandle, - rangeCheckResult.maxKey, - // TODO: change to null once this API is fixed - new byte[] { - (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff - }, - compactionOptions); - } + if (!rangeCheckResult.rightInRange) { + logger.debug( + "Compacting right interval in async compaction targeting key-groups range {}.", + dbExpectedKeyGroupRange.prettyPrintInterval()); + // Compact all keys after the expected key-groups range + for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { + try (ResourceGuard.Lease ignored = rocksDBResourceGuard.acquireResource()) { + db.compactRange( + columnFamilyHandle, + rangeCheckResult.maxKey, + // TODO: change to null once this API is fixed + new byte[] {(byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff}, + compactionOptions); } } - }); + } + + closeableRegistry.unregisterCloseable(cancelCompactionCloseable); + } + }; } /** diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index cef331f2d67..ba4c6570acd 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -467,7 +467,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { // disposed, as // working on the disposed object results in SEGFAULTS. if (db != null) { - IOUtils.closeQuietly(writeBatchWrapper); // Metric collection occurs on a background thread. When this method returns @@ -1018,6 +1017,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { return writeBatchSize; } + @VisibleForTesting public Optional<CompletableFuture<Void>> getAsyncCompactAfterRestoreFuture() { return Optional.ofNullable(asyncCompactAfterRestoreFuture); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index 948c1de2743..88b44462146 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -373,6 +373,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken restoreOperation = getRocksDBRestoreOperation( keyGroupPrefixBytes, + rocksDBResourceGuard, cancelStreamRegistry, kvStateInformation, registeredPQStates, @@ -381,8 +382,13 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken db = restoreResult.getDb(); defaultColumnFamilyHandle = restoreResult.getDefaultColumnFamilyHandle(); nativeMetricMonitor = restoreResult.getNativeMetricMonitor(); - asyncCompactAfterRestoreFuture = - restoreResult.getAsyncCompactAfterRestoreFuture().orElse(null); + if (ioExecutor != null) { + asyncCompactAfterRestoreFuture = + restoreResult + .getAsyncCompactTaskAfterRestore() + .map((task) -> CompletableFuture.runAsync(task, ioExecutor)) + .orElse(null); + } if (restoreOperation instanceof RocksDBIncrementalRestoreOperation) { backendUID = restoreResult.getBackendUID(); materializedSstFiles = restoreResult.getRestoredSstFiles(); @@ -431,6 +437,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken new ArrayList<>(kvStateInformation.values().size()); IOUtils.closeQuietly(cancelStreamRegistryForBackend); IOUtils.closeQuietly(writeBatchWrapper); + IOUtils.closeQuietly(rocksDBResourceGuard); RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater( columnFamilyOptions, defaultColumnFamilyHandle); IOUtils.closeQuietly(defaultColumnFamilyHandle); @@ -498,6 +505,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken private RocksDBRestoreOperation getRocksDBRestoreOperation( int keyGroupPrefixBytes, + ResourceGuard rocksDBResourceGuard, CloseableRegistry cancelStreamRegistry, LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation, LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates, @@ -521,6 +529,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken keyGroupRange, keyGroupPrefixBytes, numberOfTransferingThreads, + rocksDBResourceGuard, cancelStreamRegistry, userCodeClassLoader, kvStateInformation, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java index 4750969d050..b511bbfc620 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java @@ -142,22 +142,27 @@ public class RocksDBOperationUtils { ttlCompactFiltersManager, writeBufferManagerCapacity); - final ColumnFamilyHandle columnFamilyHandle; try { - columnFamilyHandle = + ColumnFamilyHandle columnFamilyHandle = createColumnFamily( columnFamilyDescriptor, db, importFilesMetaData, cancelStreamRegistryForRestore); + return new RocksDBKeyedStateBackend.RocksDbKvStateInfo( + columnFamilyHandle, metaInfoBase); } catch (Exception ex) { IOUtils.closeQuietly(columnFamilyDescriptor.getOptions()); throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", ex); } - - return new RocksDBKeyedStateBackend.RocksDbKvStateInfo(columnFamilyHandle, metaInfoBase); } + /** + * Create RocksDB-backed KV-state, including RocksDB ColumnFamily. + * + * @param cancelStreamRegistryForRestore {@link ICloseableRegistry#close closing} it interrupts + * KV state creation + */ public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo( RegisteredStateMetaInfoBase metaInfoBase, RocksDB db, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java index ae4d7b8ed0e..9aae10f2b5e 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java @@ -50,9 +50,9 @@ import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; import org.apache.flink.util.FileUtils; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; import org.apache.flink.util.StateMigrationException; import org.apache.flink.util.clock.SystemClock; -import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.RunnableWithException; import org.rocksdb.ColumnFamilyDescriptor; @@ -68,6 +68,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnegative; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.Closeable; import java.io.File; @@ -84,9 +85,7 @@ import java.util.Objects; import java.util.SortedMap; import java.util.TreeMap; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.function.Function; import java.util.stream.Collectors; @@ -122,6 +121,8 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper private final StateSerializerProvider<K> keySerializerProvider; private final ClassLoader userCodeClassLoader; private final CustomInitializationMetrics customInitializationMetrics; + private final ResourceGuard dbResourceGuard; + private long lastCompletedCheckpointId; private UUID backendUID; private final long writeBatchSize; @@ -142,6 +143,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper KeyGroupRange keyGroupRange, int keyGroupPrefixBytes, int numberOfTransferringThreads, + ResourceGuard dbResourceGuard, CloseableRegistry cancelStreamRegistry, ClassLoader userCodeClassLoader, Map<String, RocksDbKvStateInfo> kvStateInformation, @@ -180,6 +182,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper this.overlapFractionThreshold = overlapFractionThreshold; this.customInitializationMetrics = customInitializationMetrics; this.restoreStateHandles = restoreStateHandles; + this.dbResourceGuard = dbResourceGuard; this.cancelStreamRegistry = cancelStreamRegistry; this.keyGroupRange = keyGroupRange; this.instanceBasePath = instanceBasePath; @@ -233,57 +236,11 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper runAndReportDuration( () -> restoreFromLocalState(localKeyedStateHandles), RESTORE_STATE_DURATION); - CompletableFuture<Void> asyncCompactFuture = null; - if (asyncCompactAfterRescale) { - asyncCompactFuture = - RocksDBIncrementalCheckpointUtils.createRangeCompactionTaskIfNeeded( - rocksHandle.getDb(), - rocksHandle.getColumnFamilyHandles(), - keyGroupPrefixBytes, - keyGroupRange) - .map( - (run) -> { - RunnableWithException runWithLogging = - () -> { - long t = System.currentTimeMillis(); - logger.info( - "Starting async compaction after restore for backend {} in operator {}", - backendUID, - operatorIdentifier); - try { - runAndReportDuration( - run, - RESTORE_ASYNC_COMPACTION_DURATION); - logger.info( - "Completed async compaction after restore for backend {} in operator {} after {} ms.", - backendUID, - operatorIdentifier, - System.currentTimeMillis() - t); - } catch (Exception ex) { - logger.info( - "Failed async compaction after restore for backend {} in operator {} after {} ms.", - backendUID, - operatorIdentifier, - System.currentTimeMillis() - t, - ex); - throw ex; - } - }; - ExecutorService executorService = - Executors.newSingleThreadExecutor(); - CompletableFuture<Void> resultFuture = - FutureUtils.runAsync( - runWithLogging, executorService); - executorService.shutdown(); - return resultFuture; - }) - .orElse(null); - logger.info( - "Finished RocksDB incremental recovery in operator {} with " - + "target key-group range range {}.", - operatorIdentifier, - keyGroupRange.prettyPrintInterval()); - } + logger.info( + "Finished RocksDB incremental recovery in operator {} with " + + "target key-group range range {}.", + operatorIdentifier, + keyGroupRange.prettyPrintInterval()); return new RocksDBRestoreResult( this.rocksHandle.getDb(), @@ -292,7 +249,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper lastCompletedCheckpointId, backendUID, restoredSstFiles, - asyncCompactFuture); + createAsyncCompactionTask()); } finally { // Cleanup all download directories allDownloadSpecs.stream() @@ -301,6 +258,47 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper } } + @Nullable + private Runnable createAsyncCompactionTask() { + + if (!asyncCompactAfterRescale) { + return null; + } + + return () -> { + long t = System.currentTimeMillis(); + logger.info( + "Starting async compaction after restore for backend {} in operator {}", + backendUID, + operatorIdentifier); + try { + RunnableWithException asyncRangeCompactionTask = + RocksDBIncrementalCheckpointUtils.createAsyncRangeCompactionTask( + rocksHandle.getDb(), + rocksHandle.getColumnFamilyHandles(), + keyGroupPrefixBytes, + keyGroupRange, + dbResourceGuard, + cancelStreamRegistry); + runAndReportDuration(asyncRangeCompactionTask, RESTORE_ASYNC_COMPACTION_DURATION); + logger.info( + "Completed async compaction after restore for backend {} in operator {} after {} ms.", + backendUID, + operatorIdentifier, + System.currentTimeMillis() - t); + } catch (Throwable throwable) { + // We don't rethrow because the executing thread might have a fatal exception + // handler. + logger.info( + "Failed async compaction after restore for backend {} in operator {} after {} ms.", + backendUID, + operatorIdentifier, + System.currentTimeMillis() - t, + throwable); + } + }; + } + private void restoreFromLocalState( List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles) throws Exception { if (localKeyedStateHandles.size() == 1) { @@ -519,28 +517,40 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper logger.info( "Starting restore export for backend with range {} in operator {}.", - keyGroupRange, + keyGroupRange.prettyPrintInterval(), operatorIdentifier); int minExportKeyGroup = Integer.MAX_VALUE; int maxExportKeyGroup = Integer.MIN_VALUE; + int index = 0; for (IncrementalLocalKeyedStateHandle stateHandle : localKeyedStateHandles) { + + final String logLineSuffix = + " for state handle at index " + + index + + " with proclaimed key-group range " + + stateHandle.getKeyGroupRange().prettyPrintInterval() + + " for backend with range " + + keyGroupRange.prettyPrintInterval() + + " in operator " + + operatorIdentifier + + "."; + + logger.debug("Opening temporary database" + logLineSuffix); try (RestoredDBInstance tmpRestoreDBInfo = restoreTempDBInstanceFromLocalState(stateHandle)) { List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles; + logger.debug("Checking actual keys of sst files" + logLineSuffix); + // Check if the data in all SST files referenced in the handle is within the // proclaimed key-groups range of the handle. if (RocksDBIncrementalCheckpointUtils.isSstDataInKeyGroupRange( tmpRestoreDBInfo.db, keyGroupPrefixBytes, stateHandle.getKeyGroupRange())) { - logger.debug( - "Exporting state handle {} for backend with range {} in operator {}.", - stateHandle, - keyGroupRange, - operatorIdentifier); + logger.debug("Start exporting" + logLineSuffix); List<RegisteredStateMetaInfoBase> registeredStateMetaInfoBases = tmpRestoreDBInfo.stateMetaInfoSnapshots.stream() @@ -565,22 +575,21 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper maxExportKeyGroup, stateHandle.getKeyGroupRange().getEndKeyGroup()); - logger.debug( - "Done exporting state handle {} for backend with range {} in operator {}.", - stateHandle, - keyGroupRange, - operatorIdentifier); + logger.debug("Done exporting" + logLineSuffix); } else { // Actual key range in files exceeds proclaimed range, cannot import. We // will copy this handle using a temporary DB later. skipped.add(stateHandle); + logger.debug("Skipped export" + logLineSuffix); } } + + ++index; } logger.info( "Completed restore export for backend with range {} in operator {}. Number of Exported handles: {}. Skipped handles: {}.", - keyGroupRange, + keyGroupRange.prettyPrintInterval(), operatorIdentifier, localKeyedStateHandles.size() - skipped.size(), skipped); @@ -647,7 +656,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper // We initialize the base DB by importing all the exported data. logger.info( "Starting to import exported state handles for backend with range {} in operator {} using Clip/Ingest DB.", - keyGroupRange, + keyGroupRange.prettyPrintInterval(), operatorIdentifier); rocksHandle.openDB(); for (Map.Entry<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>> entry : @@ -667,7 +676,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper logger.info( "Completed importing exported state handles for backend with range {} in operator {} using Clip/Ingest DB.", - keyGroupRange, + keyGroupRange.prettyPrintInterval(), operatorIdentifier); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java index 1e458eb45f4..5f565b28950 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java @@ -30,7 +30,6 @@ import java.util.Collection; import java.util.Optional; import java.util.SortedMap; import java.util.UUID; -import java.util.concurrent.CompletableFuture; /** Entity holding result of RocksDB instance restore. */ public class RocksDBRestoreResult { @@ -43,7 +42,7 @@ public class RocksDBRestoreResult { private final UUID backendUID; private final SortedMap<Long, Collection<HandleAndLocalPath>> restoredSstFiles; - private final CompletableFuture<Void> asyncCompactAfterRestoreFuture; + private final Runnable asyncCompactTaskAfterRestore; public RocksDBRestoreResult( RocksDB db, @@ -52,14 +51,14 @@ public class RocksDBRestoreResult { long lastCompletedCheckpointId, UUID backendUID, SortedMap<Long, Collection<HandleAndLocalPath>> restoredSstFiles, - @Nullable CompletableFuture<Void> asyncCompactAfterRestoreFuture) { + @Nullable Runnable asyncCompactTaskAfterRestore) { this.db = db; this.defaultColumnFamilyHandle = defaultColumnFamilyHandle; this.nativeMetricMonitor = nativeMetricMonitor; this.lastCompletedCheckpointId = lastCompletedCheckpointId; this.backendUID = backendUID; this.restoredSstFiles = restoredSstFiles; - this.asyncCompactAfterRestoreFuture = asyncCompactAfterRestoreFuture; + this.asyncCompactTaskAfterRestore = asyncCompactTaskAfterRestore; } public RocksDB getDb() { @@ -86,7 +85,7 @@ public class RocksDBRestoreResult { return nativeMetricMonitor; } - public Optional<CompletableFuture<Void>> getAsyncCompactAfterRestoreFuture() { - return Optional.ofNullable(asyncCompactAfterRestoreFuture); + public Optional<Runnable> getAsyncCompactTaskAfterRestore() { + return Optional.ofNullable(asyncCompactTaskAfterRestore); } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java index ce4d93ea0be..3e55dd553a7 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java @@ -45,6 +45,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.RunnableFuture; import java.util.stream.Collectors; @@ -108,6 +110,7 @@ public class RocksDBRecoveryTest { int startParallelism, int targetParallelism, int numKeys, int updateDistance) throws Exception { + ExecutorService ioExecutor = Executors.newSingleThreadExecutor(); OUTPUT.println("Rescaling from " + startParallelism + " to " + targetParallelism + "..."); final String stateName = "TestValueState"; final int maxParallelism = startParallelism * targetParallelism; @@ -133,6 +136,7 @@ public class RocksDBRecoveryTest { Collections.emptyList()) .setEnableIncrementalCheckpointing(true) .setUseIngestDbRestoreMode(true) + .setIOExecutor(ioExecutor) .build(); valueStates.add( @@ -183,7 +187,8 @@ public class RocksDBRecoveryTest { targetParallelism, maxParallelism, startSnapshotResult, - backends); + backends, + ioExecutor); backends.forEach( backend -> @@ -216,7 +221,8 @@ public class RocksDBRecoveryTest { startParallelism, maxParallelism, rescaleSnapshotResult, - backends); + backends, + ioExecutor); count = 0; for (RocksDBKeyedStateBackend<Integer> backend : backends) { @@ -243,6 +249,7 @@ public class RocksDBRecoveryTest { for (SnapshotResult<KeyedStateHandle> snapshotResult : cleanupSnapshotResult) { snapshotResult.discardState(); } + ioExecutor.shutdown(); } } @@ -252,7 +259,8 @@ public class RocksDBRecoveryTest { int targetParallelism, int maxParallelism, List<SnapshotResult<KeyedStateHandle>> snapshotResult, - List<RocksDBKeyedStateBackend<Integer>> backendsOut) + List<RocksDBKeyedStateBackend<Integer>> backendsOut, + ExecutorService ioExecutor) throws IOException { List<KeyedStateHandle> stateHandles = @@ -290,6 +298,7 @@ public class RocksDBRecoveryTest { .setUseIngestDbRestoreMode(useIngest) .setIncrementalRestoreAsyncCompactAfterRescale(asyncCompactAfterRescale) .setRescalingUseDeleteFilesInRange(true) + .setIOExecutor(ioExecutor) .build(); long instanceTime = System.currentTimeMillis() - tInstance;
