rkhachatryan commented on a change in pull request #16582:
URL: https://github.com/apache/flink/pull/16582#discussion_r677217464
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1243,6 +1250,29 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE,
exception);
}
+
+ try {
+ allPreviousCheckpoints.removeAll(completedCheckpointStore.getAllCheckpoints());
+ if (!allPreviousCheckpoints.isEmpty()) {
Review comment:
> Actually, the current `CheckpointStore#addCheckpoint` is meant to be an
> atomic operation that adds the completed checkpoint and subsumes an older
> checkpoint. I think returning the subsumed checkpoint is clear for this
> operation.

To me, it's only clear that it might subsume some older checkpoints. It is
unclear whether the returned checkpoint is the first subsumed one, the latest
subsumed one, or the added one.

> If we adopt the API `long getLatestSubsumedCheckpointID()`, we will lose
> the information about when the checkpoint is subsumed, and we might also
> need to check whether the latest subsumed checkpoint ID has changed.

I think there is no requirement for the CheckpointCoordinator to report the
strictly latest ID (it will be outdated right after the RPC anyway). It only
has to be non-decreasing. If it somehow becomes greater than the added one, we
could use the smaller of the two.
Also, the Store is currently used from a single thread, so calling
`addCheckpoint` and then `getLatestSubsumed` is safe (I don't think this
concurrency model should be changed).
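
To make the suggestion concrete, here is a rough sketch of what I have in mind. It is not Flink code: `SketchCheckpointStore`, `SubsumedIdReporter`, `afterAdd`, and the `-1` "nothing subsumed yet" sentinel are all hypothetical names and conventions made up for illustration, and the store simply tracks checkpoint IDs in memory. It assumes single-threaded use, as discussed above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical in-memory store; NOT Flink's CompletedCheckpointStore. */
class SketchCheckpointStore {
    private final int maxRetained;
    private final Deque<Long> retained = new ArrayDeque<>();
    private long latestSubsumedId = -1L; // assumption: -1 means nothing subsumed yet

    SketchCheckpointStore(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    /** Adds a checkpoint ID and subsumes the oldest ones beyond the retention limit. */
    void addCheckpoint(long checkpointId) {
        retained.addLast(checkpointId);
        while (retained.size() > maxRetained) {
            latestSubsumedId = Math.max(latestSubsumedId, retained.removeFirst());
        }
    }

    /** Returns the highest subsumed ID so far, or -1 if none. */
    long getLatestSubsumedCheckpointId() {
        return latestSubsumedId;
    }
}

/** Hypothetical helper that keeps the reported subsumed ID non-decreasing. */
class SubsumedIdReporter {
    private long lastReported = -1L;

    /** Not thread-safe: relies on add-then-get running on one thread. */
    long afterAdd(SketchCheckpointStore store, long addedId) {
        store.addCheckpoint(addedId);
        // If the store's value is somehow greater than the added ID, use the smaller one.
        long candidate = Math.min(store.getLatestSubsumedCheckpointId(), addedId);
        // Never report a value lower than one already reported.
        lastReported = Math.max(lastReported, candidate);
        return lastReported;
    }
}
```

With this shape the caller does not need `addCheckpoint` to return anything, so the question of which checkpoint is returned goes away.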