This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a0ace618ad1df15c5d906d7d0ecc0f9d7f3ea685 Author: Stefan Richter <[email protected]> AuthorDate: Thu Mar 7 17:02:13 2024 +0100 [FLINK-35580] Register async compact task with the backend's lifecycle instead of the restore lifecycle. --- .../state/RocksDBKeyedStateBackendBuilder.java | 13 ++++++---- .../RocksDBIncrementalRestoreOperation.java | 30 +++++++++++++++++----- 2 files changed, 31 insertions(+), 12 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index 88b44462146..7b312d759ad 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -330,7 +330,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken RocksDBWriteBatchWrapper writeBatchWrapper = null; ColumnFamilyHandle defaultColumnFamilyHandle = null; RocksDBNativeMetricMonitor nativeMetricMonitor = null; - CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry(); + CloseableRegistry cancelRegistryForBackend = new CloseableRegistry(); LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new LinkedHashMap<>(); LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates = @@ -375,6 +375,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken keyGroupPrefixBytes, rocksDBResourceGuard, cancelStreamRegistry, + cancelRegistryForBackend, kvStateInformation, registeredPQStates, ttlCompactFiltersManager); @@ -435,7 +436,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken // Do clean up List<ColumnFamilyOptions> columnFamilyOptions = new ArrayList<>(kvStateInformation.values().size()); - IOUtils.closeQuietly(cancelStreamRegistryForBackend); + IOUtils.closeQuietly(cancelRegistryForBackend); IOUtils.closeQuietly(writeBatchWrapper); IOUtils.closeQuietly(rocksDBResourceGuard); RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater( @@ -487,7 +488,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken kvStateInformation, registeredPQStates, keyGroupPrefixBytes, - cancelStreamRegistryForBackend, + cancelRegistryForBackend, this.keyGroupCompressionDecorator, rocksDBResourceGuard, checkpointStrategy, @@ -506,7 +507,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken private RocksDBRestoreOperation getRocksDBRestoreOperation( int keyGroupPrefixBytes, ResourceGuard rocksDBResourceGuard, - CloseableRegistry cancelStreamRegistry, + CloseableRegistry cancelStreamRegistryForRestore, + CloseableRegistry cancelRegistryForBackend, LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation, LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates, RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) { @@ -530,7 +532,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken keyGroupPrefixBytes, numberOfTransferingThreads, rocksDBResourceGuard, - cancelStreamRegistry, + cancelStreamRegistryForRestore, + cancelRegistryForBackend, userCodeClassLoader, kvStateInformation, keySerializerProvider, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java index 9aae10f2b5e..5c72e78766b 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java @@ -113,7 +113,19 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper private final SortedMap<Long, Collection<HandleAndLocalPath>> restoredSstFiles; private final RocksDBHandle rocksHandle; private final Collection<IncrementalKeyedStateHandle> restoreStateHandles; - private final CloseableRegistry cancelStreamRegistry; + + /** + * This registry will be closed after restore and should only contain Closeables that are closed + * by the end of the restore operation. + */ + private final CloseableRegistry cancelStreamRegistryForRestore; + + /** + * This registry will only be closed when the created backend is closed and should be used for + * all Closeables that are closed at some later point after restore. + */ + private final CloseableRegistry cancelRegistryForBackend; + private final KeyGroupRange keyGroupRange; private final File instanceBasePath; private final int numberOfTransferringThreads; @@ -144,7 +156,8 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper int keyGroupPrefixBytes, int numberOfTransferringThreads, ResourceGuard dbResourceGuard, - CloseableRegistry cancelStreamRegistry, + CloseableRegistry cancelStreamRegistryForRestore, + CloseableRegistry cancelRegistryForBackend, ClassLoader userCodeClassLoader, Map<String, RocksDbKvStateInfo> kvStateInformation, StateSerializerProvider<K> keySerializerProvider, @@ -183,7 +196,8 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper this.customInitializationMetrics = customInitializationMetrics; this.restoreStateHandles = restoreStateHandles; this.dbResourceGuard = dbResourceGuard; - this.cancelStreamRegistry = cancelStreamRegistry; + this.cancelStreamRegistryForRestore = cancelStreamRegistryForRestore; + this.cancelRegistryForBackend = cancelRegistryForBackend; this.keyGroupRange = keyGroupRange; this.instanceBasePath = instanceBasePath; this.numberOfTransferringThreads = numberOfTransferringThreads; @@ -279,7 +293,9 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper keyGroupPrefixBytes, keyGroupRange, dbResourceGuard, - cancelStreamRegistry); + // This task will be owned by the backend's lifecycle because it + // continues to exist after restore is completed. + cancelRegistryForBackend); runAndReportDuration(asyncRangeCompactionTask, RESTORE_ASYNC_COMPACTION_DURATION); logger.info( "Completed async compaction after restore for backend {} in operator {} after {} ms.", @@ -742,7 +758,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper RocksDBStateDataTransferHelper.forThreadNumIfSpecified( numberOfTransferringThreads, ioExecutor))) { rocksDBStateDownloader.transferAllStateDataToDirectory( - downloadSpecs, cancelStreamRegistry); + downloadSpecs, cancelStreamRegistryForRestore); logger.info( "Finished downloading remote state to local directory in operator {} for target key-group range {}.", operatorIdentifier, @@ -999,11 +1015,11 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper try { inputStream = metaStateHandle.openInputStream(); - cancelStreamRegistry.registerCloseable(inputStream); + cancelStreamRegistryForRestore.registerCloseable(inputStream); DataInputView in = new DataInputViewStreamWrapper(inputStream); return readMetaData(in); } finally { - if (cancelStreamRegistry.unregisterCloseable(inputStream)) { + if (cancelStreamRegistryForRestore.unregisterCloseable(inputStream)) { inputStream.close(); } }
