rionmonster commented on code in PR #27598:
URL: https://github.com/apache/flink/pull/27598#discussion_r2800332655


##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java:
##########
@@ -55,16 +55,20 @@ public class CommittableCollector<CommT> {
     private final SinkCommitterMetricGroup metricGroup;
 
     public CommittableCollector(SinkCommitterMetricGroup metricGroup) {
-        this(new TreeMap<>(), metricGroup);
+        this(new TreeMap<>(), metricGroup, true);
     }
 
     /** For deep-copy. */
     CommittableCollector(
             Map<Long, CheckpointCommittableManagerImpl<CommT>> 
checkpointCommittables,
-            SinkCommitterMetricGroup metricGroup) {
+            SinkCommitterMetricGroup metricGroup,
+            boolean setCurrentPendingCommitablesGauge) {
         this.checkpointCommittables = new 
TreeMap<>(checkNotNull(checkpointCommittables));
         this.metricGroup = metricGroup;
-        
this.metricGroup.setCurrentPendingCommittablesGauge(this::getNumPending);
+
+        if (setCurrentPendingCommitablesGauge) {

Review Comment:
   I think we'll want add a unit test in `CommittableCollectorTest.java` to 
verify that `copy()` does not re-register metrics as expected. A simple metric 
group stub that counts gauge registrations with before/after assertions should 
be enough.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to