This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f1e355437eb8ae5a0cdad0caccac26faa5c022c6
Author: Stefan Richter <srich...@confluent.io>
AuthorDate: Mon Jan 15 13:22:41 2024 +0100

    [FLINK-34134] Minor cleanup of unused parameters in RocksDB backend.
---
 .../streaming/state/RocksDBKeyedStateBackendBuilder.java      |  5 -----
 .../state/snapshot/RocksIncrementalSnapshotStrategy.java      |  9 ++++++---
 .../state/snapshot/RocksIncrementalSnapshotStrategyTest.java  | 11 +++++------
 3 files changed, 11 insertions(+), 14 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index f33ce8f0fba..3f67a8b5f85 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -358,10 +358,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
             // init snapshot strategy after db is assured to be initialized
             checkpointStrategy =
                     initializeSavepointAndCheckpointStrategies(
-                            cancelStreamRegistryForBackend,
                             rocksDBResourceGuard,
                             kvStateInformation,
-                            registeredPQStates,
                             keyGroupPrefixBytes,
                             db,
                             backendUID,
@@ -523,10 +521,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
     }
 
     private RocksDBSnapshotStrategyBase<K, ?> initializeSavepointAndCheckpointStrategies(
-            CloseableRegistry cancelStreamRegistry,
             ResourceGuard rocksDBResourceGuard,
             LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
-            LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
             int keyGroupPrefixBytes,
             RocksDB db,
             UUID backendUID,
@@ -547,7 +543,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                             keyGroupRange,
                             keyGroupPrefixBytes,
                             localRecoveryConfig,
-                            cancelStreamRegistry,
                             instanceBasePath,
                             backendUID,
                             materializedSstFiles,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
index 9eb66a5ee8c..436a0f2ec1c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
@@ -97,7 +97,6 @@ public class RocksIncrementalSnapshotStrategy<K>
             @Nonnull KeyGroupRange keyGroupRange,
             @Nonnegative int keyGroupPrefixBytes,
             @Nonnull LocalRecoveryConfig localRecoveryConfig,
-            @Nonnull CloseableRegistry cancelStreamRegistry,
             @Nonnull File instanceBasePath,
             @Nonnull UUID backendUID,
             @Nonnull SortedMap<Long, Collection<HandleAndLocalPath>> uploadedStateHandles,
@@ -358,7 +357,9 @@ public class RocksIncrementalSnapshotStrategy<K>
                                 snapshotCloseableRegistry,
                                 tmpResourcesRegistry);
                 uploadedSize +=
-                        sstFilesUploadResult.stream().mapToLong(e -> e.getStateSize()).sum();
+                        sstFilesUploadResult.stream()
+                                .mapToLong(HandleAndLocalPath::getStateSize)
+                                .sum();
                 sstFiles.addAll(sstFilesUploadResult);
 
                 List<HandleAndLocalPath> miscFilesUploadResult =
@@ -369,7 +370,9 @@ public class RocksIncrementalSnapshotStrategy<K>
                                 snapshotCloseableRegistry,
                                 tmpResourcesRegistry);
                 uploadedSize +=
-                        miscFilesUploadResult.stream().mapToLong(e -> e.getStateSize()).sum();
+                        miscFilesUploadResult.stream()
+                                .mapToLong(HandleAndLocalPath::getStateSize)
+                                .sum();
                 miscFiles.addAll(miscFilesUploadResult);
 
                 synchronized (uploadedSstFiles) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
index c4c399d5ff8..c4cb7058421 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
@@ -70,8 +70,8 @@ class RocksIncrementalSnapshotStrategyTest {
     void testCheckpointIsIncremental() throws Exception {
 
         try (CloseableRegistry closeableRegistry = new CloseableRegistry();
-                RocksIncrementalSnapshotStrategy checkpointSnapshotStrategy =
-                        createSnapshotStrategy(closeableRegistry)) {
+                RocksIncrementalSnapshotStrategy<?> checkpointSnapshotStrategy =
+                        createSnapshotStrategy()) {
             FsCheckpointStreamFactory checkpointStreamFactory = createFsCheckpointStreamFactory();
 
             // make and notify checkpoint with id 1
@@ -96,8 +96,8 @@ class RocksIncrementalSnapshotStrategyTest {
         }
     }
 
-    public RocksIncrementalSnapshotStrategy createSnapshotStrategy(
-            CloseableRegistry closeableRegistry) throws IOException, RocksDBException {
+    public RocksIncrementalSnapshotStrategy<?> createSnapshotStrategy()
+            throws IOException, RocksDBException {
 
         ColumnFamilyHandle columnFamilyHandle = rocksDBExtension.createNewColumnFamily("test");
         RocksDB rocksDB = rocksDBExtension.getRocksDB();
@@ -138,7 +138,6 @@ class RocksIncrementalSnapshotStrategyTest {
                 new KeyGroupRange(0, 1),
                 keyGroupPrefixBytes,
                 TestLocalRecoveryConfig.disabled(),
-                closeableRegistry,
                 TempDirUtils.newFolder(tmp),
                 UUID.randomUUID(),
                 materializedSstFiles,
@@ -160,7 +159,7 @@ class RocksIncrementalSnapshotStrategyTest {
 
     public IncrementalRemoteKeyedStateHandle snapshot(
             long checkpointId,
-            RocksIncrementalSnapshotStrategy checkpointSnapshotStrategy,
+            RocksIncrementalSnapshotStrategy<?> checkpointSnapshotStrategy,
             FsCheckpointStreamFactory checkpointStreamFactory,
             CloseableRegistry closeableRegistry)
             throws Exception {

Reply via email to