This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5222a25540db7f6f6b7292ffea3086f4b7986e04
Author: Arvid Heise <[email protected]>
AuthorDate: Wed Oct 23 17:38:00 2024 +0200

    [FLINK-36455] Fix PendingCommittable metric in sink

    We can only set the gauge once.

    (cherry picked from commit 21c344ccf8c622ccbfc4eada3c65f034fe9d4f25)
---
 .../committables/CheckpointCommittableManagerImpl.java | 14 +++++++-------
 .../operators/sink/committables/CommittableCollector.java | 7 +++++++
 2 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
index 00c3b7f65d4..da4491cda61 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
@@ -30,12 +30,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -142,13 +143,13 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa
     @Override
     public void commit(Committer<CommT> committer, int maxRetries)
             throws IOException, InterruptedException {
-        Collection<CommitRequestImpl<CommT>> requests = getPendingRequests();
+        Collection<CommitRequestImpl<CommT>> requests =
+                getPendingRequests().collect(Collectors.toList());
         for (int retry = 0; !requests.isEmpty() && retry <= maxRetries; retry++) {
             requests.forEach(CommitRequestImpl::setSelected);
-            committer.commit(new ArrayList<>(requests));
+            committer.commit(Collections.unmodifiableCollection(requests));
             requests.forEach(CommitRequestImpl::setCommittedIfNoError);
             requests = requests.stream().filter(r -> !r.isFinished()).collect(Collectors.toList());
-            metricGroup.setCurrentPendingCommittablesGauge(requests::size);
         }
         if (!requests.isEmpty()) {
             throw new IOException(
@@ -165,11 +166,10 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa
                 .collect(Collectors.toList());
     }
 
-    Collection<CommitRequestImpl<CommT>> getPendingRequests() {
+    Stream<CommitRequestImpl<CommT>> getPendingRequests() {
         return subtasksCommittableManagers.values().stream()
                 .peek(this::assertReceivedAll)
-                .flatMap(SubtaskCommittableManager::getPendingRequests)
-                .collect(Collectors.toList());
+                .flatMap(SubtaskCommittableManager::getPendingRequests);
     }
 
     /**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
index 098de7f186e..4e49d73279e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
@@ -66,6 +66,13 @@ public class CommittableCollector<CommT> {
             SinkCommitterMetricGroup metricGroup) {
         this.checkpointCommittables = new TreeMap<>(checkNotNull(checkpointCommittables));
         this.metricGroup = metricGroup;
+        this.metricGroup.setCurrentPendingCommittablesGauge(this::getNumPending);
+    }
+
+    private int getNumPending() {
+        return checkpointCommittables.values().stream()
+                .mapToInt(m -> (int) m.getPendingRequests().count())
+                .sum();
     }
 
     /**
