DeamonDev commented on code in PR #27598:
URL: https://github.com/apache/flink/pull/27598#discussion_r2803174696
##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java:
##########
@@ -55,16 +55,20 @@ public class CommittableCollector<CommT> {
private final SinkCommitterMetricGroup metricGroup;
public CommittableCollector(SinkCommitterMetricGroup metricGroup) {
- this(new TreeMap<>(), metricGroup);
+ this(new TreeMap<>(), metricGroup, true);
}
/** For deep-copy. */
CommittableCollector(
Map<Long, CheckpointCommittableManagerImpl<CommT>>
checkpointCommittables,
- SinkCommitterMetricGroup metricGroup) {
+ SinkCommitterMetricGroup metricGroup,
+ boolean setCurrentPendingCommitablesGauge) {
this.checkpointCommittables = new
TreeMap<>(checkNotNull(checkpointCommittables));
this.metricGroup = metricGroup;
-
this.metricGroup.setCurrentPendingCommittablesGauge(this::getNumPending);
+
+ if (setCurrentPendingCommitablesGauge) {
Review Comment:
I added unit test by simply using `spy` method provided by `Mockito`. I
think its more elegant than writing new wrapper with custom reference counting.
I reset Mockito's reference counters before each test, so it is thread-safe.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]