This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c11632674a8a9636a5f73d906141af578e153d4f
Author: Stefan Richter <[email protected]>
AuthorDate: Thu Feb 22 12:18:02 2024 +0100

    [FLINK-35580] Prevent potential JVM crashes from async compaction when 
RocksDB is already closed.
---
 .../state/RocksDBIncrementalCheckpointUtils.java   | 121 +++++++++++------
 .../streaming/state/RocksDBKeyedStateBackend.java  |   2 +-
 .../state/RocksDBKeyedStateBackendBuilder.java     |  13 +-
 .../streaming/state/RocksDBOperationUtils.java     |  13 +-
 .../RocksDBIncrementalRestoreOperation.java        | 147 +++++++++++----------
 .../state/restore/RocksDBRestoreResult.java        |  11 +-
 .../streaming/state/RocksDBRecoveryTest.java       |  15 ++-
 7 files changed, 194 insertions(+), 128 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
index 3147f01747a..e3be413307d 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
@@ -17,12 +17,14 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
 import org.apache.flink.util.function.RunnableWithException;
 
 import 
org.apache.flink.shaded.guava32.com.google.common.primitives.UnsignedBytes;
@@ -41,6 +43,7 @@ import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.io.Closeable;
 import java.io.File;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -50,7 +53,6 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -243,60 +245,93 @@ public class RocksDBIncrementalCheckpointUtils {
      * @param keyGroupPrefixBytes the number of bytes used to serialize the 
key-group prefix of keys
      *     in the DB.
      * @param dbExpectedKeyGroupRange the expected key-groups range of the DB.
+     * @param rocksDBResourceGuard the resource guard for the given db 
instance.
      * @return runnable that performs compaction upon execution if the 
key-groups range is exceeded.
      *     Otherwise, empty optional is returned.
      */
-    public static Optional<RunnableWithException> 
createRangeCompactionTaskIfNeeded(
+    public static RunnableWithException createAsyncRangeCompactionTask(
             RocksDB db,
             Collection<ColumnFamilyHandle> columnFamilyHandles,
             int keyGroupPrefixBytes,
-            KeyGroupRange dbExpectedKeyGroupRange) {
-
-        RangeCheckResult rangeCheckResult =
-                checkSstDataAgainstKeyGroupRange(db, keyGroupPrefixBytes, 
dbExpectedKeyGroupRange);
+            KeyGroupRange dbExpectedKeyGroupRange,
+            ResourceGuard rocksDBResourceGuard,
+            CloseableRegistry closeableRegistry) {
+
+        return () -> {
+            logger.debug(
+                    "Starting range check for async compaction targeting 
key-groups range {}.",
+                    dbExpectedKeyGroupRange.prettyPrintInterval());
+            final RangeCheckResult rangeCheckResult;
+            try (ResourceGuard.Lease ignored = 
rocksDBResourceGuard.acquireResource()) {
+                rangeCheckResult =
+                        checkSstDataAgainstKeyGroupRange(
+                                db, keyGroupPrefixBytes, 
dbExpectedKeyGroupRange);
+            }
 
-        if (rangeCheckResult.allInRange()) {
-            // No keys exceed the proclaimed range of the backend, so we don't 
need a compaction
-            // from this point of view.
-            return Optional.empty();
-        }
+            if (rangeCheckResult.allInRange()) {
+                logger.debug(
+                        "Nothing to compact in async compaction targeting 
key-groups range {}.",
+                        dbExpectedKeyGroupRange.prettyPrintInterval());
+                // No keys exceed the proclaimed range of the backend, so we 
don't need a compaction
+                // from this point of view.
+                return;
+            }
 
-        return Optional.of(
-                () -> {
-                    try (CompactRangeOptions compactionOptions =
-                            new CompactRangeOptions()
-                                    .setExclusiveManualCompaction(true)
-                                    .setBottommostLevelCompaction(
-                                            
CompactRangeOptions.BottommostLevelCompaction
-                                                    .kForceOptimized)) {
-
-                        if (!rangeCheckResult.leftInRange) {
-                            // Compact all keys before from the expected 
key-groups range
-                            for (ColumnFamilyHandle columnFamilyHandle : 
columnFamilyHandles) {
-                                db.compactRange(
-                                        columnFamilyHandle,
-                                        // TODO: change to null once this API 
is fixed
-                                        new byte[] {},
-                                        rangeCheckResult.minKey,
-                                        compactionOptions);
-                            }
+            try (CompactRangeOptions compactionOptions =
+                    new CompactRangeOptions()
+                            .setBottommostLevelCompaction(
+                                    
CompactRangeOptions.BottommostLevelCompaction
+                                            .kForceOptimized)) {
+
+                // To cancel an ongoing compaction asap, we register 
cancelling through the options
+                // with the registry
+                final Closeable cancelCompactionCloseable =
+                        () -> {
+                            logger.debug(
+                                    "Cancel request for async compaction 
targeting key-groups range {}.",
+                                    
dbExpectedKeyGroupRange.prettyPrintInterval());
+                            compactionOptions.setCanceled(true);
+                        };
+
+                closeableRegistry.registerCloseable(cancelCompactionCloseable);
+
+                if (!rangeCheckResult.leftInRange) {
+                    logger.debug(
+                            "Compacting left interval in async compaction 
targeting key-groups range {}.",
+                            dbExpectedKeyGroupRange.prettyPrintInterval());
+                    // Compact all keys before the expected key-groups range
+                    for (ColumnFamilyHandle columnFamilyHandle : 
columnFamilyHandles) {
+                        try (ResourceGuard.Lease ignored = 
rocksDBResourceGuard.acquireResource()) {
+                            db.compactRange(
+                                    columnFamilyHandle,
+                                    // TODO: change to null once this API is 
fixed
+                                    new byte[] {},
+                                    rangeCheckResult.minKey,
+                                    compactionOptions);
                         }
+                    }
+                }
 
-                        if (!rangeCheckResult.rightInRange) {
-                            // Compact all keys after the expected key-groups 
range
-                            for (ColumnFamilyHandle columnFamilyHandle : 
columnFamilyHandles) {
-                                db.compactRange(
-                                        columnFamilyHandle,
-                                        rangeCheckResult.maxKey,
-                                        // TODO: change to null once this API 
is fixed
-                                        new byte[] {
-                                            (byte) 0xff, (byte) 0xff, (byte) 
0xff, (byte) 0xff
-                                        },
-                                        compactionOptions);
-                            }
+                if (!rangeCheckResult.rightInRange) {
+                    logger.debug(
+                            "Compacting right interval in async compaction 
targeting key-groups range {}.",
+                            dbExpectedKeyGroupRange.prettyPrintInterval());
+                    // Compact all keys after the expected key-groups range
+                    for (ColumnFamilyHandle columnFamilyHandle : 
columnFamilyHandles) {
+                        try (ResourceGuard.Lease ignored = 
rocksDBResourceGuard.acquireResource()) {
+                            db.compactRange(
+                                    columnFamilyHandle,
+                                    rangeCheckResult.maxKey,
+                                    // TODO: change to null once this API is 
fixed
+                                    new byte[] {(byte) 0xff, (byte) 0xff, 
(byte) 0xff, (byte) 0xff},
+                                    compactionOptions);
                         }
                     }
-                });
+                }
+
+                
closeableRegistry.unregisterCloseable(cancelCompactionCloseable);
+            }
+        };
     }
 
     /**
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index cef331f2d67..ba4c6570acd 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -467,7 +467,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
         // disposed, as
         // working on the disposed object results in SEGFAULTS.
         if (db != null) {
-
             IOUtils.closeQuietly(writeBatchWrapper);
 
             // Metric collection occurs on a background thread. When this 
method returns
@@ -1018,6 +1017,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
         return writeBatchSize;
     }
 
+    @VisibleForTesting
     public Optional<CompletableFuture<Void>> 
getAsyncCompactAfterRestoreFuture() {
         return Optional.ofNullable(asyncCompactAfterRestoreFuture);
     }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 948c1de2743..88b44462146 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -373,6 +373,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                 restoreOperation =
                         getRocksDBRestoreOperation(
                                 keyGroupPrefixBytes,
+                                rocksDBResourceGuard,
                                 cancelStreamRegistry,
                                 kvStateInformation,
                                 registeredPQStates,
@@ -381,8 +382,13 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                 db = restoreResult.getDb();
                 defaultColumnFamilyHandle = 
restoreResult.getDefaultColumnFamilyHandle();
                 nativeMetricMonitor = restoreResult.getNativeMetricMonitor();
-                asyncCompactAfterRestoreFuture =
-                        
restoreResult.getAsyncCompactAfterRestoreFuture().orElse(null);
+                if (ioExecutor != null) {
+                    asyncCompactAfterRestoreFuture =
+                            restoreResult
+                                    .getAsyncCompactTaskAfterRestore()
+                                    .map((task) -> 
CompletableFuture.runAsync(task, ioExecutor))
+                                    .orElse(null);
+                }
                 if (restoreOperation instanceof 
RocksDBIncrementalRestoreOperation) {
                     backendUID = restoreResult.getBackendUID();
                     materializedSstFiles = restoreResult.getRestoredSstFiles();
@@ -431,6 +437,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                     new ArrayList<>(kvStateInformation.values().size());
             IOUtils.closeQuietly(cancelStreamRegistryForBackend);
             IOUtils.closeQuietly(writeBatchWrapper);
+            IOUtils.closeQuietly(rocksDBResourceGuard);
             RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(
                     columnFamilyOptions, defaultColumnFamilyHandle);
             IOUtils.closeQuietly(defaultColumnFamilyHandle);
@@ -498,6 +505,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
 
     private RocksDBRestoreOperation getRocksDBRestoreOperation(
             int keyGroupPrefixBytes,
+            ResourceGuard rocksDBResourceGuard,
             CloseableRegistry cancelStreamRegistry,
             LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> 
kvStateInformation,
             LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> 
registeredPQStates,
@@ -521,6 +529,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                     keyGroupRange,
                     keyGroupPrefixBytes,
                     numberOfTransferingThreads,
+                    rocksDBResourceGuard,
                     cancelStreamRegistry,
                     userCodeClassLoader,
                     kvStateInformation,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
index 4750969d050..b511bbfc620 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
@@ -142,22 +142,27 @@ public class RocksDBOperationUtils {
                         ttlCompactFiltersManager,
                         writeBufferManagerCapacity);
 
-        final ColumnFamilyHandle columnFamilyHandle;
         try {
-            columnFamilyHandle =
+            ColumnFamilyHandle columnFamilyHandle =
                     createColumnFamily(
                             columnFamilyDescriptor,
                             db,
                             importFilesMetaData,
                             cancelStreamRegistryForRestore);
+            return new RocksDBKeyedStateBackend.RocksDbKvStateInfo(
+                    columnFamilyHandle, metaInfoBase);
         } catch (Exception ex) {
             IOUtils.closeQuietly(columnFamilyDescriptor.getOptions());
             throw new FlinkRuntimeException("Error creating 
ColumnFamilyHandle.", ex);
         }
-
-        return new 
RocksDBKeyedStateBackend.RocksDbKvStateInfo(columnFamilyHandle, metaInfoBase);
     }
 
+    /**
+     * Create RocksDB-backed KV-state, including RocksDB ColumnFamily.
+     *
+     * @param cancelStreamRegistryForRestore {@link ICloseableRegistry#close 
closing} it interrupts
+     *     KV state creation
+     */
     public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo(
             RegisteredStateMetaInfoBase metaInfoBase,
             RocksDB db,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index ae4d7b8ed0e..9aae10f2b5e 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -50,9 +50,9 @@ import 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
 import org.apache.flink.util.StateMigrationException;
 import org.apache.flink.util.clock.SystemClock;
-import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.RunnableWithException;
 
 import org.rocksdb.ColumnFamilyDescriptor;
@@ -68,6 +68,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.io.Closeable;
 import java.io.File;
@@ -84,9 +85,7 @@ import java.util.Objects;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -122,6 +121,8 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
     private final StateSerializerProvider<K> keySerializerProvider;
     private final ClassLoader userCodeClassLoader;
     private final CustomInitializationMetrics customInitializationMetrics;
+    private final ResourceGuard dbResourceGuard;
+
     private long lastCompletedCheckpointId;
     private UUID backendUID;
     private final long writeBatchSize;
@@ -142,6 +143,7 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
             KeyGroupRange keyGroupRange,
             int keyGroupPrefixBytes,
             int numberOfTransferringThreads,
+            ResourceGuard dbResourceGuard,
             CloseableRegistry cancelStreamRegistry,
             ClassLoader userCodeClassLoader,
             Map<String, RocksDbKvStateInfo> kvStateInformation,
@@ -180,6 +182,7 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
         this.overlapFractionThreshold = overlapFractionThreshold;
         this.customInitializationMetrics = customInitializationMetrics;
         this.restoreStateHandles = restoreStateHandles;
+        this.dbResourceGuard = dbResourceGuard;
         this.cancelStreamRegistry = cancelStreamRegistry;
         this.keyGroupRange = keyGroupRange;
         this.instanceBasePath = instanceBasePath;
@@ -233,57 +236,11 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
             runAndReportDuration(
                     () -> restoreFromLocalState(localKeyedStateHandles), 
RESTORE_STATE_DURATION);
 
-            CompletableFuture<Void> asyncCompactFuture = null;
-            if (asyncCompactAfterRescale) {
-                asyncCompactFuture =
-                        
RocksDBIncrementalCheckpointUtils.createRangeCompactionTaskIfNeeded(
-                                        rocksHandle.getDb(),
-                                        rocksHandle.getColumnFamilyHandles(),
-                                        keyGroupPrefixBytes,
-                                        keyGroupRange)
-                                .map(
-                                        (run) -> {
-                                            RunnableWithException 
runWithLogging =
-                                                    () -> {
-                                                        long t = 
System.currentTimeMillis();
-                                                        logger.info(
-                                                                "Starting 
async compaction after restore for backend {} in operator {}",
-                                                                backendUID,
-                                                                
operatorIdentifier);
-                                                        try {
-                                                            
runAndReportDuration(
-                                                                    run,
-                                                                    
RESTORE_ASYNC_COMPACTION_DURATION);
-                                                            logger.info(
-                                                                    "Completed 
async compaction after restore for backend {} in operator {} after {} ms.",
-                                                                    backendUID,
-                                                                    
operatorIdentifier,
-                                                                    
System.currentTimeMillis() - t);
-                                                        } catch (Exception ex) 
{
-                                                            logger.info(
-                                                                    "Failed 
async compaction after restore for backend {} in operator {} after {} ms.",
-                                                                    backendUID,
-                                                                    
operatorIdentifier,
-                                                                    
System.currentTimeMillis() - t,
-                                                                    ex);
-                                                            throw ex;
-                                                        }
-                                                    };
-                                            ExecutorService executorService =
-                                                    
Executors.newSingleThreadExecutor();
-                                            CompletableFuture<Void> 
resultFuture =
-                                                    FutureUtils.runAsync(
-                                                            runWithLogging, 
executorService);
-                                            executorService.shutdown();
-                                            return resultFuture;
-                                        })
-                                .orElse(null);
-                logger.info(
-                        "Finished RocksDB incremental recovery in operator {} 
with "
-                                + "target key-group range range {}.",
-                        operatorIdentifier,
-                        keyGroupRange.prettyPrintInterval());
-            }
+            logger.info(
+                    "Finished RocksDB incremental recovery in operator {} with 
"
+                            + "target key-group range {}.",
+                    operatorIdentifier,
+                    keyGroupRange.prettyPrintInterval());
 
             return new RocksDBRestoreResult(
                     this.rocksHandle.getDb(),
@@ -292,7 +249,7 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
                     lastCompletedCheckpointId,
                     backendUID,
                     restoredSstFiles,
-                    asyncCompactFuture);
+                    createAsyncCompactionTask());
         } finally {
             // Cleanup all download directories
             allDownloadSpecs.stream()
@@ -301,6 +258,47 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
         }
     }
 
+    @Nullable
+    private Runnable createAsyncCompactionTask() {
+
+        if (!asyncCompactAfterRescale) {
+            return null;
+        }
+
+        return () -> {
+            long t = System.currentTimeMillis();
+            logger.info(
+                    "Starting async compaction after restore for backend {} in 
operator {}",
+                    backendUID,
+                    operatorIdentifier);
+            try {
+                RunnableWithException asyncRangeCompactionTask =
+                        
RocksDBIncrementalCheckpointUtils.createAsyncRangeCompactionTask(
+                                rocksHandle.getDb(),
+                                rocksHandle.getColumnFamilyHandles(),
+                                keyGroupPrefixBytes,
+                                keyGroupRange,
+                                dbResourceGuard,
+                                cancelStreamRegistry);
+                runAndReportDuration(asyncRangeCompactionTask, 
RESTORE_ASYNC_COMPACTION_DURATION);
+                logger.info(
+                        "Completed async compaction after restore for backend 
{} in operator {} after {} ms.",
+                        backendUID,
+                        operatorIdentifier,
+                        System.currentTimeMillis() - t);
+            } catch (Throwable throwable) {
+                // We don't rethrow because the executing thread might have a 
fatal exception
+                // handler.
+                logger.info(
+                        "Failed async compaction after restore for backend {} 
in operator {} after {} ms.",
+                        backendUID,
+                        operatorIdentifier,
+                        System.currentTimeMillis() - t,
+                        throwable);
+            }
+        };
+    }
+
     private void restoreFromLocalState(
             List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles) 
throws Exception {
         if (localKeyedStateHandles.size() == 1) {
@@ -519,28 +517,40 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
 
         logger.info(
                 "Starting restore export for backend with range {} in operator 
{}.",
-                keyGroupRange,
+                keyGroupRange.prettyPrintInterval(),
                 operatorIdentifier);
 
         int minExportKeyGroup = Integer.MAX_VALUE;
         int maxExportKeyGroup = Integer.MIN_VALUE;
+        int index = 0;
         for (IncrementalLocalKeyedStateHandle stateHandle : 
localKeyedStateHandles) {
+
+            final String logLineSuffix =
+                    " for state handle at index "
+                            + index
+                            + " with proclaimed key-group range "
+                            + 
stateHandle.getKeyGroupRange().prettyPrintInterval()
+                            + " for backend with range "
+                            + keyGroupRange.prettyPrintInterval()
+                            + " in operator "
+                            + operatorIdentifier
+                            + ".";
+
+            logger.debug("Opening temporary database" + logLineSuffix);
             try (RestoredDBInstance tmpRestoreDBInfo =
                     restoreTempDBInstanceFromLocalState(stateHandle)) {
 
                 List<ColumnFamilyHandle> tmpColumnFamilyHandles =
                         tmpRestoreDBInfo.columnFamilyHandles;
 
+                logger.debug("Checking actual keys of sst files" + 
logLineSuffix);
+
                 // Check if the data in all SST files referenced in the handle 
is within the
                 // proclaimed key-groups range of the handle.
                 if (RocksDBIncrementalCheckpointUtils.isSstDataInKeyGroupRange(
                         tmpRestoreDBInfo.db, keyGroupPrefixBytes, 
stateHandle.getKeyGroupRange())) {
 
-                    logger.debug(
-                            "Exporting state handle {} for backend with range 
{} in operator {}.",
-                            stateHandle,
-                            keyGroupRange,
-                            operatorIdentifier);
+                    logger.debug("Start exporting" + logLineSuffix);
 
                     List<RegisteredStateMetaInfoBase> 
registeredStateMetaInfoBases =
                             tmpRestoreDBInfo.stateMetaInfoSnapshots.stream()
@@ -565,22 +575,21 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
                                     maxExportKeyGroup,
                                     
stateHandle.getKeyGroupRange().getEndKeyGroup());
 
-                    logger.debug(
-                            "Done exporting state handle {} for backend with 
range {} in operator {}.",
-                            stateHandle,
-                            keyGroupRange,
-                            operatorIdentifier);
+                    logger.debug("Done exporting" + logLineSuffix);
                 } else {
                     // Actual key range in files exceeds proclaimed range, 
cannot import. We
                     // will copy this handle using a temporary DB later.
                     skipped.add(stateHandle);
+                    logger.debug("Skipped export" + logLineSuffix);
                 }
             }
+
+            ++index;
         }
 
         logger.info(
                 "Completed restore export for backend with range {} in 
operator {}. Number of Exported handles: {}. Skipped handles: {}.",
-                keyGroupRange,
+                keyGroupRange.prettyPrintInterval(),
                 operatorIdentifier,
                 localKeyedStateHandles.size() - skipped.size(),
                 skipped);
@@ -647,7 +656,7 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
         // We initialize the base DB by importing all the exported data.
         logger.info(
                 "Starting to import exported state handles for backend with 
range {} in operator {} using Clip/Ingest DB.",
-                keyGroupRange,
+                keyGroupRange.prettyPrintInterval(),
                 operatorIdentifier);
         rocksHandle.openDB();
         for (Map.Entry<RegisteredStateMetaInfoBase, 
List<ExportImportFilesMetaData>> entry :
@@ -667,7 +676,7 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
 
         logger.info(
                 "Completed importing exported state handles for backend with 
range {} in operator {} using Clip/Ingest DB.",
-                keyGroupRange,
+                keyGroupRange.prettyPrintInterval(),
                 operatorIdentifier);
     }
 
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java
index 1e458eb45f4..5f565b28950 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java
@@ -30,7 +30,6 @@ import java.util.Collection;
 import java.util.Optional;
 import java.util.SortedMap;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 
 /** Entity holding result of RocksDB instance restore. */
 public class RocksDBRestoreResult {
@@ -43,7 +42,7 @@ public class RocksDBRestoreResult {
     private final UUID backendUID;
     private final SortedMap<Long, Collection<HandleAndLocalPath>> 
restoredSstFiles;
 
-    private final CompletableFuture<Void> asyncCompactAfterRestoreFuture;
+    private final Runnable asyncCompactTaskAfterRestore;
 
     public RocksDBRestoreResult(
             RocksDB db,
@@ -52,14 +51,14 @@ public class RocksDBRestoreResult {
             long lastCompletedCheckpointId,
             UUID backendUID,
             SortedMap<Long, Collection<HandleAndLocalPath>> restoredSstFiles,
-            @Nullable CompletableFuture<Void> asyncCompactAfterRestoreFuture) {
+            @Nullable Runnable asyncCompactTaskAfterRestore) {
         this.db = db;
         this.defaultColumnFamilyHandle = defaultColumnFamilyHandle;
         this.nativeMetricMonitor = nativeMetricMonitor;
         this.lastCompletedCheckpointId = lastCompletedCheckpointId;
         this.backendUID = backendUID;
         this.restoredSstFiles = restoredSstFiles;
-        this.asyncCompactAfterRestoreFuture = asyncCompactAfterRestoreFuture;
+        this.asyncCompactTaskAfterRestore = asyncCompactTaskAfterRestore;
     }
 
     public RocksDB getDb() {
@@ -86,7 +85,7 @@ public class RocksDBRestoreResult {
         return nativeMetricMonitor;
     }
 
-    public Optional<CompletableFuture<Void>> 
getAsyncCompactAfterRestoreFuture() {
-        return Optional.ofNullable(asyncCompactAfterRestoreFuture);
+    public Optional<Runnable> getAsyncCompactTaskAfterRestore() {
+        return Optional.ofNullable(asyncCompactTaskAfterRestore);
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java
index ce4d93ea0be..3e55dd553a7 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java
@@ -45,6 +45,8 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.RunnableFuture;
 import java.util.stream.Collectors;
 
@@ -108,6 +110,7 @@ public class RocksDBRecoveryTest {
             int startParallelism, int targetParallelism, int numKeys, int 
updateDistance)
             throws Exception {
 
+        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
         OUTPUT.println("Rescaling from " + startParallelism + " to " + 
targetParallelism + "...");
         final String stateName = "TestValueState";
         final int maxParallelism = startParallelism * targetParallelism;
@@ -133,6 +136,7 @@ public class RocksDBRecoveryTest {
                                             Collections.emptyList())
                                     .setEnableIncrementalCheckpointing(true)
                                     .setUseIngestDbRestoreMode(true)
+                                    .setIOExecutor(ioExecutor)
                                     .build();
 
                     valueStates.add(
@@ -183,7 +187,8 @@ public class RocksDBRecoveryTest {
                             targetParallelism,
                             maxParallelism,
                             startSnapshotResult,
-                            backends);
+                            backends,
+                            ioExecutor);
 
                     backends.forEach(
                             backend ->
@@ -216,7 +221,8 @@ public class RocksDBRecoveryTest {
                             startParallelism,
                             maxParallelism,
                             rescaleSnapshotResult,
-                            backends);
+                            backends,
+                            ioExecutor);
 
                     count = 0;
                     for (RocksDBKeyedStateBackend<Integer> backend : backends) 
{
@@ -243,6 +249,7 @@ public class RocksDBRecoveryTest {
             for (SnapshotResult<KeyedStateHandle> snapshotResult : 
cleanupSnapshotResult) {
                 snapshotResult.discardState();
             }
+            ioExecutor.shutdown();
         }
     }
 
@@ -252,7 +259,8 @@ public class RocksDBRecoveryTest {
             int targetParallelism,
             int maxParallelism,
             List<SnapshotResult<KeyedStateHandle>> snapshotResult,
-            List<RocksDBKeyedStateBackend<Integer>> backendsOut)
+            List<RocksDBKeyedStateBackend<Integer>> backendsOut,
+            ExecutorService ioExecutor)
             throws IOException {
 
         List<KeyedStateHandle> stateHandles =
@@ -290,6 +298,7 @@ public class RocksDBRecoveryTest {
                             .setUseIngestDbRestoreMode(useIngest)
                             
.setIncrementalRestoreAsyncCompactAfterRescale(asyncCompactAfterRescale)
                             .setRescalingUseDeleteFilesInRange(true)
+                            .setIOExecutor(ioExecutor)
                             .build();
 
             long instanceTime = System.currentTimeMillis() - tInstance;


Reply via email to