This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a0ace618ad1df15c5d906d7d0ecc0f9d7f3ea685
Author: Stefan Richter <[email protected]>
AuthorDate: Thu Mar 7 17:02:13 2024 +0100

    [FLINK-35580] Register async compact task with the backend's lifecycle 
instead of the restore lifecycle.
---
 .../state/RocksDBKeyedStateBackendBuilder.java     | 13 ++++++----
 .../RocksDBIncrementalRestoreOperation.java        | 30 +++++++++++++++++-----
 2 files changed, 31 insertions(+), 12 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 88b44462146..7b312d759ad 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -330,7 +330,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
         RocksDBWriteBatchWrapper writeBatchWrapper = null;
         ColumnFamilyHandle defaultColumnFamilyHandle = null;
         RocksDBNativeMetricMonitor nativeMetricMonitor = null;
-        CloseableRegistry cancelStreamRegistryForBackend = new 
CloseableRegistry();
+        CloseableRegistry cancelRegistryForBackend = new CloseableRegistry();
         LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> 
kvStateInformation =
                 new LinkedHashMap<>();
         LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> 
registeredPQStates =
@@ -375,6 +375,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                                 keyGroupPrefixBytes,
                                 rocksDBResourceGuard,
                                 cancelStreamRegistry,
+                                cancelRegistryForBackend,
                                 kvStateInformation,
                                 registeredPQStates,
                                 ttlCompactFiltersManager);
@@ -435,7 +436,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
             // Do clean up
             List<ColumnFamilyOptions> columnFamilyOptions =
                     new ArrayList<>(kvStateInformation.values().size());
-            IOUtils.closeQuietly(cancelStreamRegistryForBackend);
+            IOUtils.closeQuietly(cancelRegistryForBackend);
             IOUtils.closeQuietly(writeBatchWrapper);
             IOUtils.closeQuietly(rocksDBResourceGuard);
             RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(
@@ -487,7 +488,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                 kvStateInformation,
                 registeredPQStates,
                 keyGroupPrefixBytes,
-                cancelStreamRegistryForBackend,
+                cancelRegistryForBackend,
                 this.keyGroupCompressionDecorator,
                 rocksDBResourceGuard,
                 checkpointStrategy,
@@ -506,7 +507,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
     private RocksDBRestoreOperation getRocksDBRestoreOperation(
             int keyGroupPrefixBytes,
             ResourceGuard rocksDBResourceGuard,
-            CloseableRegistry cancelStreamRegistry,
+            CloseableRegistry cancelStreamRegistryForRestore,
+            CloseableRegistry cancelRegistryForBackend,
             LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> 
kvStateInformation,
             LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> 
registeredPQStates,
             RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) {
@@ -530,7 +532,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                     keyGroupPrefixBytes,
                     numberOfTransferingThreads,
                     rocksDBResourceGuard,
-                    cancelStreamRegistry,
+                    cancelStreamRegistryForRestore,
+                    cancelRegistryForBackend,
                     userCodeClassLoader,
                     kvStateInformation,
                     keySerializerProvider,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index 9aae10f2b5e..5c72e78766b 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -113,7 +113,19 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
     private final SortedMap<Long, Collection<HandleAndLocalPath>> 
restoredSstFiles;
     private final RocksDBHandle rocksHandle;
     private final Collection<IncrementalKeyedStateHandle> restoreStateHandles;
-    private final CloseableRegistry cancelStreamRegistry;
+
+    /**
+     * This registry will be closed after restore and should only contain 
Closeables that are closed
+     * by the end of the restore operation.
+     */
+    private final CloseableRegistry cancelStreamRegistryForRestore;
+
+    /**
+     * This registry will only be closed when the created backend is closed 
and should be used for
+     * all Closeables that are closed at some later point after restore.
+     */
+    private final CloseableRegistry cancelRegistryForBackend;
+
     private final KeyGroupRange keyGroupRange;
     private final File instanceBasePath;
     private final int numberOfTransferringThreads;
@@ -144,7 +156,8 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
             int keyGroupPrefixBytes,
             int numberOfTransferringThreads,
             ResourceGuard dbResourceGuard,
-            CloseableRegistry cancelStreamRegistry,
+            CloseableRegistry cancelStreamRegistryForRestore,
+            CloseableRegistry cancelRegistryForBackend,
             ClassLoader userCodeClassLoader,
             Map<String, RocksDbKvStateInfo> kvStateInformation,
             StateSerializerProvider<K> keySerializerProvider,
@@ -183,7 +196,8 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
         this.customInitializationMetrics = customInitializationMetrics;
         this.restoreStateHandles = restoreStateHandles;
         this.dbResourceGuard = dbResourceGuard;
-        this.cancelStreamRegistry = cancelStreamRegistry;
+        this.cancelStreamRegistryForRestore = cancelStreamRegistryForRestore;
+        this.cancelRegistryForBackend = cancelRegistryForBackend;
         this.keyGroupRange = keyGroupRange;
         this.instanceBasePath = instanceBasePath;
         this.numberOfTransferringThreads = numberOfTransferringThreads;
@@ -279,7 +293,9 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
                                 keyGroupPrefixBytes,
                                 keyGroupRange,
                                 dbResourceGuard,
-                                cancelStreamRegistry);
+                                // This task will be owned by the backend's 
lifecycle because it
+                                // continues to exist after restore is 
completed.
+                                cancelRegistryForBackend);
                 runAndReportDuration(asyncRangeCompactionTask, 
RESTORE_ASYNC_COMPACTION_DURATION);
                 logger.info(
                         "Completed async compaction after restore for backend 
{} in operator {} after {} ms.",
@@ -742,7 +758,7 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
                         RocksDBStateDataTransferHelper.forThreadNumIfSpecified(
                                 numberOfTransferringThreads, ioExecutor))) {
             rocksDBStateDownloader.transferAllStateDataToDirectory(
-                    downloadSpecs, cancelStreamRegistry);
+                    downloadSpecs, cancelStreamRegistryForRestore);
             logger.info(
                     "Finished downloading remote state to local directory in 
operator {} for target key-group range {}.",
                     operatorIdentifier,
@@ -999,11 +1015,11 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
 
         try {
             inputStream = metaStateHandle.openInputStream();
-            cancelStreamRegistry.registerCloseable(inputStream);
+            cancelStreamRegistryForRestore.registerCloseable(inputStream);
             DataInputView in = new DataInputViewStreamWrapper(inputStream);
             return readMetaData(in);
         } finally {
-            if (cancelStreamRegistry.unregisterCloseable(inputStream)) {
+            if 
(cancelStreamRegistryForRestore.unregisterCloseable(inputStream)) {
                 inputStream.close();
             }
         }

Reply via email to