This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8ce679ae8476c8f2dc6d765103cf24a57b653470
Author: Arvid Heise <[email protected]>
AuthorDate: Thu Sep 12 09:29:39 2024 +0200

    [FLINK-25920] Turn CommittableManager#merge functional
    
    Remove the side-effect and create a new (rather cheap) instance of the managers.
---
 .../CheckpointCommittableManagerImpl.java          |  5 ++--
 .../CommittableCollectorSerializer.java            | 16 ++---------
 .../committables/SubtaskCommittableManager.java    | 21 ++++++++------
 .../SubtaskCommittableManagerTest.java             | 33 +++++++++++-----------
 4 files changed, 36 insertions(+), 39 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
index 7740a87da42..09a33b72651 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
@@ -147,14 +147,15 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa
 
     CheckpointCommittableManagerImpl<CommT> merge(CheckpointCommittableManagerImpl<CommT> other) {
         checkArgument(other.checkpointId == checkpointId);
+        CheckpointCommittableManagerImpl<CommT> merged = copy();
         for (Map.Entry<Integer, SubtaskCommittableManager<CommT>> subtaskEntry :
                 other.subtasksCommittableManagers.entrySet()) {
-            subtasksCommittableManagers.merge(
+            merged.subtasksCommittableManagers.merge(
                     subtaskEntry.getKey(),
                     subtaskEntry.getValue(),
                     SubtaskCommittableManager::merge);
         }
-        return this;
+        return merged;
     }
 
     CheckpointCommittableManagerImpl<CommT> copy() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java
index 0cd59305786..7bb6d769a48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java
@@ -166,19 +166,9 @@ public final class CommittableCollectorSerializer<CommT>
             for (SubtaskCommittableManager<CommT> subtaskCommittableManager :
                     subtaskCommittableManagers) {
 
-                // check if we already have manager for current
-                // subtaskCommittableManager.getSubtaskId() if yes,
-                // then merge them.
-                SubtaskCommittableManager<CommT> mergedManager =
-                        subtasksCommittableManagers.computeIfPresent(
-                                subtaskId,
-                                (key, manager) -> manager.merge(subtaskCommittableManager));
-
-                // This is new subtaskId, lets add the mapping.
-                if (mergedManager == null) {
-                    subtasksCommittableManagers.put(
-                            subtaskCommittableManager.getSubtaskId(), subtaskCommittableManager);
-                }
+                // merge in case we already have a manager for that subtaskId
+                subtasksCommittableManagers.merge(
+                        subtaskId, subtaskCommittableManager, SubtaskCommittableManager::merge);
             }
 
             return new CheckpointCommittableManagerImpl<>(
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java
index 381cec977f3..a459930ad68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java
@@ -39,12 +39,12 @@ import static org.apache.flink.util.Preconditions.checkState;
 /** Manages the committables coming from one subtask. */
 class SubtaskCommittableManager<CommT> {
     private final Deque<CommitRequestImpl<CommT>> requests;
-    private int numExpectedCommittables;
+    private final int numExpectedCommittables;
     private final long checkpointId;
     private final int subtaskId;
     private int numDrained;
     private int numFailed;
-    private SinkCommitterMetricGroup metricGroup;
+    private final SinkCommitterMetricGroup metricGroup;
 
     SubtaskCommittableManager(
             int numExpectedCommittables,
@@ -186,12 +186,17 @@ class SubtaskCommittableManager<CommT> {
     }
 
     SubtaskCommittableManager<CommT> merge(SubtaskCommittableManager<CommT> other) {
-        checkArgument(other.getSubtaskId() == this.getSubtaskId());
-        this.numExpectedCommittables += other.numExpectedCommittables;
-        this.requests.addAll(other.requests);
-        this.numDrained += other.numDrained;
-        this.numFailed += other.numFailed;
-        return this;
+        checkArgument(other.getSubtaskId() == this.getSubtaskId(), "Different subtasks.");
+        checkArgument(other.getCheckpointId() == this.getCheckpointId(), "Different checkpoints.");
+        return new SubtaskCommittableManager<>(
+                Stream.concat(requests.stream(), other.requests.stream())
+                        .collect(Collectors.toList()),
+                numExpectedCommittables + other.numExpectedCommittables,
+                numDrained + other.numDrained,
+                numFailed + other.numFailed,
+                subtaskId,
+                checkpointId,
+                metricGroup);
     }
 
     SubtaskCommittableManager<CommT> copy() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManagerTest.java
index 0bb4f0d3db0..dd4fdf91d6e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManagerTest.java
@@ -111,21 +111,22 @@ class SubtaskCommittableManagerTest {
                         1,
                         2L,
                         METRIC_GROUP);
-        subtaskCommittableManager.merge(
-                new SubtaskCommittableManager<>(
-                        Arrays.asList(
-                                new CommitRequestImpl<>(2, METRIC_GROUP),
-                                new CommitRequestImpl<>(3, METRIC_GROUP)),
-                        10,
-                        2,
-                        3,
-                        1,
-                        2L,
-                        METRIC_GROUP));
-        assertThat(subtaskCommittableManager.getNumCommittables()).isEqualTo(11);
-        assertThat(subtaskCommittableManager.getNumDrained()).isEqualTo(3);
-        assertThat(subtaskCommittableManager.isFinished()).isFalse();
-        assertThat(subtaskCommittableManager.getNumFailed()).isEqualTo(5);
-        assertThat(subtaskCommittableManager.getPendingRequests()).hasSize(3);
+        SubtaskCommittableManager<Integer> merged =
+                subtaskCommittableManager.merge(
+                        new SubtaskCommittableManager<>(
+                                Arrays.asList(
+                                        new CommitRequestImpl<>(2, METRIC_GROUP),
+                                        new CommitRequestImpl<>(3, METRIC_GROUP)),
+                                10,
+                                2,
+                                3,
+                                1,
+                                2L,
+                                METRIC_GROUP));
+        assertThat(merged.getNumCommittables()).isEqualTo(11);
+        assertThat(merged.getNumDrained()).isEqualTo(3);
+        assertThat(merged.isFinished()).isFalse();
+        assertThat(merged.getNumFailed()).isEqualTo(5);
+        assertThat(merged.getPendingRequests()).hasSize(3);
     }
 }

Reply via email to