AHeise commented on code in PR #25353:
URL: https://github.com/apache/flink/pull/25353#discussion_r1771313021


##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java:
##########
@@ -254,4 +249,9 @@ private CheckpointCommittableManagerImpl<CommT> getCheckpointCommittables(
                 this.checkpointCommittables.get(committable.getCheckpointIdOrEOI());
         return checkNotNull(committables, "Unknown checkpoint for %s", committable);
     }
+
+    /** Removes all metadata about checkpoints of which all committables are fully committed. */
+    public void cleanFinished() {

Review Comment:
   I moved it out for two reasons:
   * It was previously cleaned in #getCheckpointCommittablesUpTo as a side-effect. Side-effects are usually hard to test and require a stronger mental model. (At some point I was asking myself: when do we actually clean up the map?) We could solve that by renaming the method to drain, however:
   * It always cleans up shifted by one checkpoint. So in the happy path, I get the committables for checkpoint 4 and, as a side-effect, clean up the committables of checkpoint 3. That's counter-intuitive.
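
To illustrate the two points above, here is a minimal sketch of the separation I have in mind. It is not the actual `CommittableCollector`: the class `CleanupSketch`, its pending-count map, and the methods `add`, `checkpointsUpTo`, `commitAll`, and `trackedCheckpoints` are hypothetical stand-ins. Only the name `cleanFinished` comes from this PR. The query does not mutate anything, and the cleanup is an explicit call that can run right after the commit instead of one checkpoint later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical, simplified stand-in for the collector; not Flink's actual class. */
public class CleanupSketch {
    // checkpoint id -> number of committables that are still pending (assumed bookkeeping)
    private final TreeMap<Long, Integer> pendingByCheckpoint = new TreeMap<>();

    /** Registers committables for a checkpoint. */
    public void add(long checkpointId, int committables) {
        pendingByCheckpoint.merge(checkpointId, committables, Integer::sum);
    }

    /** Pure query: lists tracked checkpoints up to and including the given id, removing nothing. */
    public List<Long> checkpointsUpTo(long checkpointId) {
        return new ArrayList<>(pendingByCheckpoint.headMap(checkpointId, true).keySet());
    }

    /** Marks every committable of the given checkpoint as committed. */
    public void commitAll(long checkpointId) {
        pendingByCheckpoint.computeIfPresent(checkpointId, (id, pending) -> 0);
    }

    /** Explicit cleanup: drops checkpoints whose committables are all committed. */
    public void cleanFinished() {
        pendingByCheckpoint.values().removeIf(pending -> pending == 0);
    }

    /** Number of checkpoints that still have metadata in the map. */
    public int trackedCheckpoints() {
        return pendingByCheckpoint.size();
    }

    public static void main(String[] args) {
        CleanupSketch collector = new CleanupSketch();
        collector.add(3L, 2);
        collector.add(4L, 1);

        collector.commitAll(3L);
        // Querying does not clean anything up as a side-effect.
        System.out.println(collector.checkpointsUpTo(4L));
        // Cleanup happens when asked for, in the same step as the commit.
        collector.cleanFinished();
        System.out.println(collector.checkpointsUpTo(4L));
    }
}
```

With this split, a test can assert on `cleanFinished` directly, without having to fetch the committables of the next checkpoint just to trigger the cleanup of the previous one.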



