This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5222a25540db7f6f6b7292ffea3086f4b7986e04
Author: Arvid Heise <[email protected]>
AuthorDate: Wed Oct 23 17:38:00 2024 +0200

    [FLINK-36455] Fix PendingCommittable metric in sink
    
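    The pending-committables gauge can only be set once, so register it once in
    the CommittableCollector constructor instead of re-setting it on every
    commit attempt in CheckpointCommittableManagerImpl.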
    
    (cherry picked from commit 21c344ccf8c622ccbfc4eada3c65f034fe9d4f25)
---
 .../committables/CheckpointCommittableManagerImpl.java     | 14 +++++++-------
 .../operators/sink/committables/CommittableCollector.java  |  7 +++++++
 2 files changed, 14 insertions(+), 7 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
index 00c3b7f65d4..da4491cda61 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
@@ -30,12 +30,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -142,13 +143,13 @@ class CheckpointCommittableManagerImpl<CommT> implements 
CheckpointCommittableMa
     @Override
     public void commit(Committer<CommT> committer, int maxRetries)
             throws IOException, InterruptedException {
-        Collection<CommitRequestImpl<CommT>> requests = getPendingRequests();
+        Collection<CommitRequestImpl<CommT>> requests =
+                getPendingRequests().collect(Collectors.toList());
         for (int retry = 0; !requests.isEmpty() && retry <= maxRetries; 
retry++) {
             requests.forEach(CommitRequestImpl::setSelected);
-            committer.commit(new ArrayList<>(requests));
+            committer.commit(Collections.unmodifiableCollection(requests));
             requests.forEach(CommitRequestImpl::setCommittedIfNoError);
             requests = requests.stream().filter(r -> 
!r.isFinished()).collect(Collectors.toList());
-            metricGroup.setCurrentPendingCommittablesGauge(requests::size);
         }
         if (!requests.isEmpty()) {
             throw new IOException(
@@ -165,11 +166,10 @@ class CheckpointCommittableManagerImpl<CommT> implements 
CheckpointCommittableMa
                 .collect(Collectors.toList());
     }
 
-    Collection<CommitRequestImpl<CommT>> getPendingRequests() {
+    Stream<CommitRequestImpl<CommT>> getPendingRequests() {
         return subtasksCommittableManagers.values().stream()
                 .peek(this::assertReceivedAll)
-                .flatMap(SubtaskCommittableManager::getPendingRequests)
-                .collect(Collectors.toList());
+                .flatMap(SubtaskCommittableManager::getPendingRequests);
     }
 
     /**
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
index 098de7f186e..4e49d73279e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
@@ -66,6 +66,13 @@ public class CommittableCollector<CommT> {
             SinkCommitterMetricGroup metricGroup) {
         this.checkpointCommittables = new 
TreeMap<>(checkNotNull(checkpointCommittables));
         this.metricGroup = metricGroup;
+        
this.metricGroup.setCurrentPendingCommittablesGauge(this::getNumPending);
+    }
+
+    private int getNumPending() {
+        return checkpointCommittables.values().stream()
+                .mapToInt(m -> (int) m.getPendingRequests().count())
+                .sum();
     }
 
     /**

Reply via email to