This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f1e355437eb8ae5a0cdad0caccac26faa5c022c6
Author: Stefan Richter <srich...@confluent.io>
AuthorDate: Mon Jan 15 13:22:41 2024 +0100

    [FLINK-34134] Minor cleanup of unused parameters in RocksDB backend.
---
 .../streaming/state/RocksDBKeyedStateBackendBuilder.java     |  5 -----
 .../state/snapshot/RocksIncrementalSnapshotStrategy.java     |  9 ++++++---
 .../state/snapshot/RocksIncrementalSnapshotStrategyTest.java | 11 +++++------
 3 files changed, 11 insertions(+), 14 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index f33ce8f0fba..3f67a8b5f85 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -358,10 +358,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
             // init snapshot strategy after db is assured to be initialized
             checkpointStrategy =
                     initializeSavepointAndCheckpointStrategies(
-                            cancelStreamRegistryForBackend,
                             rocksDBResourceGuard,
                             kvStateInformation,
-                            registeredPQStates,
                             keyGroupPrefixBytes,
                             db,
                             backendUID,
@@ -523,10 +521,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
     }

     private RocksDBSnapshotStrategyBase<K, ?> initializeSavepointAndCheckpointStrategies(
-            CloseableRegistry cancelStreamRegistry,
             ResourceGuard rocksDBResourceGuard,
             LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
-            LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
             int keyGroupPrefixBytes,
             RocksDB db,
             UUID backendUID,
@@ -547,7 +543,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                             keyGroupRange,
                             keyGroupPrefixBytes,
                             localRecoveryConfig,
-                            cancelStreamRegistry,
                             instanceBasePath,
                             backendUID,
                             materializedSstFiles,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
index 9eb66a5ee8c..436a0f2ec1c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
@@ -97,7 +97,6 @@ public class RocksIncrementalSnapshotStrategy<K>
             @Nonnull KeyGroupRange keyGroupRange,
             @Nonnegative int keyGroupPrefixBytes,
             @Nonnull LocalRecoveryConfig localRecoveryConfig,
-            @Nonnull CloseableRegistry cancelStreamRegistry,
             @Nonnull File instanceBasePath,
             @Nonnull UUID backendUID,
             @Nonnull SortedMap<Long, Collection<HandleAndLocalPath>> uploadedStateHandles,
@@ -358,7 +357,9 @@ public class RocksIncrementalSnapshotStrategy<K>
                                 snapshotCloseableRegistry,
                                 tmpResourcesRegistry);
                 uploadedSize +=
-                        sstFilesUploadResult.stream().mapToLong(e -> e.getStateSize()).sum();
+                        sstFilesUploadResult.stream()
+                                .mapToLong(HandleAndLocalPath::getStateSize)
+                                .sum();
                 sstFiles.addAll(sstFilesUploadResult);

                 List<HandleAndLocalPath> miscFilesUploadResult =
@@ -369,7 +370,9 @@ public class RocksIncrementalSnapshotStrategy<K>
                                 snapshotCloseableRegistry,
                                 tmpResourcesRegistry);
                 uploadedSize +=
-                        miscFilesUploadResult.stream().mapToLong(e -> e.getStateSize()).sum();
+                        miscFilesUploadResult.stream()
+                                .mapToLong(HandleAndLocalPath::getStateSize)
+                                .sum();
                 miscFiles.addAll(miscFilesUploadResult);

                 synchronized (uploadedSstFiles) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
index c4c399d5ff8..c4cb7058421 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
@@ -70,8 +70,8 @@ class RocksIncrementalSnapshotStrategyTest {
     void testCheckpointIsIncremental() throws Exception {

         try (CloseableRegistry closeableRegistry = new CloseableRegistry();
-                RocksIncrementalSnapshotStrategy checkpointSnapshotStrategy =
-                        createSnapshotStrategy(closeableRegistry)) {
+                RocksIncrementalSnapshotStrategy<?> checkpointSnapshotStrategy =
+                        createSnapshotStrategy()) {
             FsCheckpointStreamFactory checkpointStreamFactory = createFsCheckpointStreamFactory();

             // make and notify checkpoint with id 1
@@ -96,8 +96,8 @@ class RocksIncrementalSnapshotStrategyTest {
         }
     }

-    public RocksIncrementalSnapshotStrategy createSnapshotStrategy(
-            CloseableRegistry closeableRegistry) throws IOException, RocksDBException {
+    public RocksIncrementalSnapshotStrategy<?> createSnapshotStrategy()
+            throws IOException, RocksDBException {

         ColumnFamilyHandle columnFamilyHandle = rocksDBExtension.createNewColumnFamily("test");
         RocksDB rocksDB = rocksDBExtension.getRocksDB();
@@ -138,7 +138,6 @@ class RocksIncrementalSnapshotStrategyTest {
                         new KeyGroupRange(0, 1),
                         keyGroupPrefixBytes,
                         TestLocalRecoveryConfig.disabled(),
-                        closeableRegistry,
                         TempDirUtils.newFolder(tmp),
                         UUID.randomUUID(),
                         materializedSstFiles,
@@ -160,7 +159,7 @@ class RocksIncrementalSnapshotStrategyTest {

     public IncrementalRemoteKeyedStateHandle snapshot(
             long checkpointId,
-            RocksIncrementalSnapshotStrategy checkpointSnapshotStrategy,
+            RocksIncrementalSnapshotStrategy<?> checkpointSnapshotStrategy,
             FsCheckpointStreamFactory checkpointStreamFactory,
             CloseableRegistry closeableRegistry)
             throws Exception {