Myasuka commented on a change in pull request #16582:
URL: https://github.com/apache/flink/pull/16582#discussion_r677193306



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1243,6 +1250,29 @@ private void completePendingCheckpoint(PendingCheckpoint 
pendingCheckpoint)
                         CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE,
                         exception);
             }
+
+            try {
+                
allPreviousCheckpoints.removeAll(completedCheckpointStore.getAllCheckpoints());
+                if (!allPreviousCheckpoints.isEmpty()) {

Review comment:
       Actually, current `CheckpointStore#addCheckpoint` has the meaning that 
this is an atomic operation which will add the completed checkpoint and 
subsumed an older checkpoint. I think return the subsumed checkpoint is clearly 
for this operation.
   If we adopt the API `long getLatestSubsumedCheckpointID()`, we will miss 
some information that when the checkpoint is subsumed and we might also need to 
check whether the latest subsumed checkpoint ID has been changed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to