Myasuka commented on a change in pull request #16582:
URL: https://github.com/apache/flink/pull/16582#discussion_r677193306
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1243,6 +1250,29 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE,
exception);
}
+
+ try {
+ allPreviousCheckpoints.removeAll(completedCheckpointStore.getAllCheckpoints());
+ if (!allPreviousCheckpoints.isEmpty()) {
Review comment:
Actually, the current `CheckpointStore#addCheckpoint` is meant to be an atomic
operation that adds the completed checkpoint and subsumes an older checkpoint.
I think returning the subsumed checkpoint fits this operation clearly.
If we adopt the API `long getLatestSubsumedCheckpointID()`, we lose the
information about when the checkpoint was subsumed, and the caller might also
need to check whether the latest subsumed checkpoint ID has changed.
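
To make the difference concrete, here is a minimal sketch with a hypothetical
`ToyCheckpointStore`. It is not Flink's real store: the class, the method
`addCheckpointAndGetSubsumed`, and the retention logic are assumptions made up
for illustration, and only the name `getLatestSubsumedCheckpointID` comes from
the proposal above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

/** Illustration only: a toy stand-in, NOT Flink's checkpoint store. */
public class SubsumeSketch {

    /** Hypothetical store that retains at most maxRetained checkpoint IDs. */
    static final class ToyCheckpointStore {
        private final Deque<Long> retained = new ArrayDeque<>();
        private final int maxRetained;
        private long latestSubsumedId = -1L; // -1 means nothing subsumed yet

        ToyCheckpointStore(int maxRetained) {
            this.maxRetained = maxRetained;
        }

        /** Option A (assumed shape): add and report what this call subsumed. */
        Optional<Long> addCheckpointAndGetSubsumed(long checkpointId) {
            retained.addLast(checkpointId);
            if (retained.size() > maxRetained) {
                latestSubsumedId = retained.removeFirst();
                return Optional.of(latestSubsumedId);
            }
            return Optional.empty();
        }

        /** Option B, part 1: add without reporting anything. */
        void addCheckpoint(long checkpointId) {
            addCheckpointAndGetSubsumed(checkpointId);
        }

        /** Option B, part 2: the getter proposed in the discussion. */
        long getLatestSubsumedCheckpointID() {
            return latestSubsumedId;
        }
    }

    public static void main(String[] args) {
        // Option A: the result is tied to the call that caused the subsumption.
        ToyCheckpointStore a = new ToyCheckpointStore(1);
        for (long id = 1; id <= 3; id++) {
            Optional<Long> subsumed = a.addCheckpointAndGetSubsumed(id);
            System.out.println("A: added " + id + ", subsumed by this call: " + subsumed);
        }

        // Option B: the caller must remember the previous ID and compare.
        ToyCheckpointStore b = new ToyCheckpointStore(1);
        long lastSeen = b.getLatestSubsumedCheckpointID();
        for (long id = 1; id <= 3; id++) {
            b.addCheckpoint(id);
            long latest = b.getLatestSubsumedCheckpointID();
            boolean changed = latest != lastSeen;
            lastSeen = latest;
            System.out.println("B: added " + id + ", latest subsumed: " + latest
                    + ", changed: " + changed);
        }
    }
}
```

With the return value, the caller learns in the same call whether something
was subsumed. With the getter, the caller has to keep `lastSeen` around and
compare it after every add, which is the extra check mentioned above.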