This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8ce679ae8476c8f2dc6d765103cf24a57b653470 Author: Arvid Heise <[email protected]> AuthorDate: Thu Sep 12 09:29:39 2024 +0200 [FLINK-25920] Turn CommittableManager#merge functional Remove the side-effect and create a new (rather cheap) instance of the managers. --- .../CheckpointCommittableManagerImpl.java | 5 ++-- .../CommittableCollectorSerializer.java | 16 ++--------- .../committables/SubtaskCommittableManager.java | 21 ++++++++------ .../SubtaskCommittableManagerTest.java | 33 +++++++++++----------- 4 files changed, 36 insertions(+), 39 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java index 7740a87da42..09a33b72651 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java @@ -147,14 +147,15 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa CheckpointCommittableManagerImpl<CommT> merge(CheckpointCommittableManagerImpl<CommT> other) { checkArgument(other.checkpointId == checkpointId); + CheckpointCommittableManagerImpl<CommT> merged = copy(); for (Map.Entry<Integer, SubtaskCommittableManager<CommT>> subtaskEntry : other.subtasksCommittableManagers.entrySet()) { - subtasksCommittableManagers.merge( + merged.subtasksCommittableManagers.merge( subtaskEntry.getKey(), subtaskEntry.getValue(), SubtaskCommittableManager::merge); } - return this; + return merged; } CheckpointCommittableManagerImpl<CommT> copy() { diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java index 0cd59305786..7bb6d769a48 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java @@ -166,19 +166,9 @@ public final class CommittableCollectorSerializer<CommT> for (SubtaskCommittableManager<CommT> subtaskCommittableManager : subtaskCommittableManagers) { - // check if we already have manager for current - // subtaskCommittableManager.getSubtaskId() if yes, - // then merge them. - SubtaskCommittableManager<CommT> mergedManager = - subtasksCommittableManagers.computeIfPresent( - subtaskId, - (key, manager) -> manager.merge(subtaskCommittableManager)); - - // This is new subtaskId, lets add the mapping. - if (mergedManager == null) { - subtasksCommittableManagers.put( - subtaskCommittableManager.getSubtaskId(), subtaskCommittableManager); - } + // merge in case we already have a manager for that subtaskId + subtasksCommittableManagers.merge( + subtaskId, subtaskCommittableManager, SubtaskCommittableManager::merge); } return new CheckpointCommittableManagerImpl<>( diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java index 381cec977f3..a459930ad68 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java @@ -39,12 +39,12 @@ import static org.apache.flink.util.Preconditions.checkState; /** Manages the committables coming from one subtask. */ class SubtaskCommittableManager<CommT> { private final Deque<CommitRequestImpl<CommT>> requests; - private int numExpectedCommittables; + private final int numExpectedCommittables; private final long checkpointId; private final int subtaskId; private int numDrained; private int numFailed; - private SinkCommitterMetricGroup metricGroup; + private final SinkCommitterMetricGroup metricGroup; SubtaskCommittableManager( int numExpectedCommittables, @@ -186,12 +186,17 @@ class SubtaskCommittableManager<CommT> { } SubtaskCommittableManager<CommT> merge(SubtaskCommittableManager<CommT> other) { - checkArgument(other.getSubtaskId() == this.getSubtaskId()); - this.numExpectedCommittables += other.numExpectedCommittables; - this.requests.addAll(other.requests); - this.numDrained += other.numDrained; - this.numFailed += other.numFailed; - return this; + checkArgument(other.getSubtaskId() == this.getSubtaskId(), "Different subtasks."); + checkArgument(other.getCheckpointId() == this.getCheckpointId(), "Different checkpoints."); + return new SubtaskCommittableManager<>( + Stream.concat(requests.stream(), other.requests.stream()) + .collect(Collectors.toList()), + numExpectedCommittables + other.numExpectedCommittables, + numDrained + other.numDrained, + numFailed + other.numFailed, + subtaskId, + checkpointId, + metricGroup); } SubtaskCommittableManager<CommT> copy() { diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManagerTest.java index 0bb4f0d3db0..dd4fdf91d6e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManagerTest.java @@ -111,21 +111,22 @@ class SubtaskCommittableManagerTest { 1, 2L, METRIC_GROUP); - subtaskCommittableManager.merge( - new SubtaskCommittableManager<>( - Arrays.asList( - new CommitRequestImpl<>(2, METRIC_GROUP), - new CommitRequestImpl<>(3, METRIC_GROUP)), - 10, - 2, - 3, - 1, - 2L, - METRIC_GROUP)); - assertThat(subtaskCommittableManager.getNumCommittables()).isEqualTo(11); - assertThat(subtaskCommittableManager.getNumDrained()).isEqualTo(3); - assertThat(subtaskCommittableManager.isFinished()).isFalse(); - assertThat(subtaskCommittableManager.getNumFailed()).isEqualTo(5); - assertThat(subtaskCommittableManager.getPendingRequests()).hasSize(3); + SubtaskCommittableManager<Integer> merged = + subtaskCommittableManager.merge( + new SubtaskCommittableManager<>( + Arrays.asList( + new CommitRequestImpl<>(2, METRIC_GROUP), + new CommitRequestImpl<>(3, METRIC_GROUP)), + 10, + 2, + 3, + 1, + 2L, + METRIC_GROUP)); + assertThat(merged.getNumCommittables()).isEqualTo(11); + assertThat(merged.getNumDrained()).isEqualTo(3); + assertThat(merged.isFinished()).isFalse(); + assertThat(merged.getNumFailed()).isEqualTo(5); + assertThat(merged.getPendingRequests()).hasSize(3); } }
