rkhachatryan commented on a change in pull request #16582:
URL: https://github.com/apache/flink/pull/16582#discussion_r678245161



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1243,6 +1250,29 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
                         CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE,
                         exception);
             }
+
+            try {
+                allPreviousCheckpoints.removeAll(completedCheckpointStore.getAllCheckpoints());
+                if (!allPreviousCheckpoints.isEmpty()) {

Review comment:
       > Yes, I agree that the returned subsumed checkpoint might be null
   
   From the (current) name, it's unclear which checkpoint is returned, even if not null.
   
   > I can't see why we need to add a separate method? 
   > who will call the original addCheckpoint method?
   
   In my opinion, it would make a more clear API.
   `CheckpointCoordinator` will call it (same as now). Let me demonstrate this: 
   
https://github.com/rkhachatryan/flink/commit/10242b7aef2f4f2db94c924f940928fce4ab0fc9#diff-2c7004a2d412c3566de5ff6fb9e6d027742328c2acad60685e47ce4ba9df0810R1286
   
   WDYT?
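For illustration, here is a minimal sketch of the kind of API clarity meant above. It uses a hypothetical, simplified store: `SketchCheckpointStore` and `addCheckpointAndGetLatestSubsumed` are made-up names, not Flink's `CompletedCheckpointStore` API and not necessarily what the linked commit does. The method name states which checkpoint is returned, and `Optional` makes the "nothing subsumed" case explicit instead of `null`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Hypothetical, simplified model of a completed-checkpoint store (NOT Flink's
// CompletedCheckpointStore). Checkpoints are represented only by their ids.
final class SketchCheckpointStore {
    private final Deque<Long> retained = new ArrayDeque<>();
    private final int maxRetained;

    SketchCheckpointStore(int maxRetained) {
        if (maxRetained < 1) {
            throw new IllegalArgumentException("maxRetained must be >= 1");
        }
        this.maxRetained = maxRetained;
    }

    /**
     * Adds a checkpoint and subsumes the oldest ones beyond the retention limit.
     * The name and the Optional return type make explicit that the result is the
     * LATEST subsumed checkpoint id, and that there may be none.
     */
    Optional<Long> addCheckpointAndGetLatestSubsumed(long checkpointId) {
        retained.addLast(checkpointId);
        Long latestSubsumed = null;
        while (retained.size() > maxRetained) {
            // Removing oldest-first means the last removed id is the latest subsumed one.
            latestSubsumed = retained.removeFirst();
        }
        return Optional.ofNullable(latestSubsumed);
    }
}
```

A caller could then write `store.addCheckpointAndGetLatestSubsumed(id).ifPresent(...)` without a separate null check.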

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
##########
@@ -67,6 +75,7 @@ public static void subsume(
             }
             // Don't break out from the loop to subsume intermediate savepoints
         }
+        return lastSubsumedCheckpoint;

Review comment:
       WDYT about [this](https://github.com/rkhachatryan/flink/commit/10242b7aef2f4f2db94c924f940928fce4ab0fc9#diff-1bc8de30a2deacb9128e6bb8c13169b3cb692beddb08e6724f1cf2b550435effR264) approach?
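A simplified model of a subsume loop that keeps iterating past savepoints and reports the last entry it removed. `Cp`, `SubsumeSketch`, and the retention rules used here (never subsume the newest entry or the latest non-savepoint checkpoint) are assumptions for illustration, not a copy of `CheckpointSubsumeHelper`.

```java
import java.util.Deque;
import java.util.Iterator;

// Hypothetical, simplified checkpoint descriptor (not Flink's CompletedCheckpoint).
final class Cp {
    final long id;
    final boolean savepoint;

    Cp(long id, boolean savepoint) {
        this.id = id;
        this.savepoint = savepoint;
    }
}

final class SubsumeSketch {
    /**
     * Removes the oldest entries until at most numRetain remain, but never the newest
     * entry nor the latest non-savepoint checkpoint (assumed retention rules).
     * Returns the last (i.e. latest) subsumed entry, or null if nothing was subsumed.
     */
    static Cp subsume(Deque<Cp> checkpoints, int numRetain) {
        Cp latest = checkpoints.peekLast();
        Cp latestNonSavepoint = null;
        for (Cp cp : checkpoints) {
            if (!cp.savepoint) {
                latestNonSavepoint = cp;
            }
        }
        Cp lastSubsumed = null;
        Iterator<Cp> it = checkpoints.iterator();
        while (checkpoints.size() > numRetain && it.hasNext()) {
            Cp next = it.next();
            if (next != latest && next != latestNonSavepoint) {
                it.remove();
                lastSubsumed = next;
            }
            // Don't break out of the loop: intermediate savepoints are subsumed too.
        }
        return lastSubsumed;
    }
}
```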

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1220,8 +1224,15 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
             Preconditions.checkState(pendingCheckpoint.isDisposed() && completedCheckpoint != null);
 
             try {
-                completedCheckpointStore.addCheckpoint(
-                        completedCheckpoint, checkpointsCleaner, this::scheduleTriggerRequest);
+                CompletedCheckpoint lastSubsumed =
+                        completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
+                                completedCheckpoint,
+                                checkpointsCleaner,
+                                this::scheduleTriggerRequest);
+                if (lastSubsumed != null && lastSubsumed.discardOnSubsume()) {

Review comment:
       > which checkpoint id should we notify?
   
   I think it should always be the latest (regardless of whether it's a savepoint or a checkpoint).
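A hypothetical sketch of that rule: the id sent in the subsumption notification is simply the highest subsumed id, with no filtering on savepoint vs. checkpoint (unlike the `discardOnSubsume()` check in the diff). `Subsumed` and `NotifySketch` are illustrative names only.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical descriptor of a subsumed checkpoint or savepoint (not a Flink type).
final class Subsumed {
    final long id;
    final boolean savepoint;

    Subsumed(long id, boolean savepoint) {
        this.id = id;
        this.savepoint = savepoint;
    }
}

final class NotifySketch {
    /** Id to notify: the latest subsumed one, whatever its type; empty if none. */
    static OptionalLong idToNotify(List<Subsumed> subsumed) {
        // The savepoint flag is deliberately ignored here.
        return subsumed.stream().mapToLong(s -> s.id).max();
    }
}
```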
   
   > I see your PR of #16575 also consider the savepoint case on TM side, was that the reason you prefer to send notification of savepoints?
   
   No, I mentioned the reasons above (the main one is generality). 
   
   Regarding the PR (#16575), the TM needs to make this distinction when discarding state anyway. This is because it may have unconfirmed savepoints (which might be aborted); at the same time, a newer checkpoint might be subsumed, which would mean that those older savepoints are subsumed as well.
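A hypothetical sketch of the TM-side bookkeeping this implies (`PendingSnapshots` is a made-up class, not Flink's task-side code): an abort discards only that one snapshot, while a subsumption notification for id N discards everything up to and including N, including older savepoints that were never confirmed.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical TM-side bookkeeping (NOT Flink's TaskExecutor or state backend code).
final class PendingSnapshots {
    // checkpoint id -> true if the snapshot belongs to a savepoint
    private final NavigableMap<Long, Boolean> pending = new TreeMap<>();

    void onSnapshotTaken(long id, boolean savepoint) {
        pending.put(id, savepoint);
    }

    /** A single snapshot (e.g. a savepoint) was aborted: only its own state is discarded. */
    void onAborted(long id) {
        pending.remove(id);
    }

    /** Snapshot {@code id} was subsumed, so all older ones (savepoints included) are too. */
    void onSubsumed(long id) {
        // headMap(id, true) is a live view; clearing it removes entries from 'pending'.
        pending.headMap(id, true).clear();
    }

    boolean isPending(long id) {
        return pending.containsKey(id);
    }

    int pendingCount() {
        return pending.size();
    }
}
```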





-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

