AHeise commented on code in PR #25353:
URL: https://github.com/apache/flink/pull/25353#discussion_r1771313021
##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java:
##########
@@ -254,4 +249,9 @@ private CheckpointCommittableManagerImpl<CommT>
getCheckpointCommittables(
this.checkpointCommittables.get(committable.getCheckpointIdOrEOI());
return checkNotNull(committables, "Unknown checkpoint for %s",
committable);
}
+
+ /** Removes all metadata about checkpoints of which all committables are
fully committed. */
+ public void cleanFinished() {
Review Comment:
I moved it out for two reasons:
* It was previously cleaned in #getCheckpointCommittablesUpTo as a
side-effect. Side-effects are usually hard to test and require a stronger
mental model. (I was asking myself at some point: Wait, when do we actually
clean up in the map?). We could solve it by actually renaming the method to
drain, however:
* It always cleans up shifted by one checkpoint. So in the happy path, I get
the committables for checkpoint 4 and as a side-effect clean up the
committables of checkpoint 3. That's counter-intuitive.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]