rkhachatryan commented on a change in pull request #16582:
URL: https://github.com/apache/flink/pull/16582#discussion_r678245161
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1243,6 +1250,29 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE,
exception);
}
+
+ try {
+ allPreviousCheckpoints.removeAll(completedCheckpointStore.getAllCheckpoints());
+ if (!allPreviousCheckpoints.isEmpty()) {
Review comment:
> Yes, I agree that the returned subsumed checkpoint might be null
From the (current) name, it's unclear which checkpoint is returned, even if
not null.
> I can't see why we need to add a separate method?
> who will call the original addCheckpoint method?
In my opinion, it would make for a clearer API.
`CheckpointCoordinator` will call it (same as now). Let me demonstrate this:
https://github.com/rkhachatryan/flink/commit/10242b7aef2f4f2db94c924f940928fce4ab0fc9#diff-2c7004a2d412c3566de5ff6fb9e6d027742328c2acad60685e47ce4ba9df0810R1286
WDYT?
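To illustrate only the naming concern above (the linked commit remains the actual proposal), here is a minimal sketch. `SketchCheckpoint`, `SketchCheckpointStore` and `addCheckpointAndGetLatestSubsumed` are hypothetical names, not Flink API, and the sketch assumes checkpoints are added in increasing id order:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

/** Hypothetical, simplified stand-in for a completed checkpoint. */
final class SketchCheckpoint {
    final long id;

    SketchCheckpoint(long id) {
        this.id = id;
    }
}

/** Hypothetical in-memory store that retains at most maxRetained checkpoints. */
final class SketchCheckpointStore {
    private final Deque<SketchCheckpoint> retained = new ArrayDeque<>();
    private final int maxRetained;

    SketchCheckpointStore(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    /** Plain add: callers that do not care about subsumption use this one. */
    void addCheckpoint(SketchCheckpoint checkpoint) {
        addCheckpointAndGetLatestSubsumed(checkpoint);
    }

    /**
     * Adds the checkpoint and returns the latest (highest-id) checkpoint that
     * was subsumed by this call, or empty if nothing was subsumed.
     * The name says which checkpoint comes back; Optional replaces null.
     */
    Optional<SketchCheckpoint> addCheckpointAndGetLatestSubsumed(SketchCheckpoint checkpoint) {
        retained.addLast(checkpoint);
        SketchCheckpoint latestSubsumed = null;
        // Oldest entries are dropped first, so the last one dropped is the newest subsumed.
        while (retained.size() > maxRetained) {
            latestSubsumed = retained.removeFirst();
        }
        return Optional.ofNullable(latestSubsumed);
    }
}
```

With this shape the caller does not have to guess whether a returned checkpoint is the oldest or the newest one subsumed, and the "nothing subsumed" case is explicit.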
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
##########
@@ -67,6 +75,7 @@ public static void subsume(
}
// Don't break out from the loop to subsume intermediate savepoints
}
+ return lastSubsumedCheckpoint;
Review comment:
WDYT about
[this](https://github.com/rkhachatryan/flink/commit/10242b7aef2f4f2db94c924f940928fce4ab0fc9#diff-1bc8de30a2deacb9128e6bb8c13169b3cb692beddb08e6724f1cf2b550435effR264)
approach?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1220,8 +1224,15 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
Preconditions.checkState(pendingCheckpoint.isDisposed() && completedCheckpoint != null);
try {
- completedCheckpointStore.addCheckpoint(
- completedCheckpoint, checkpointsCleaner, this::scheduleTriggerRequest);
+ CompletedCheckpoint lastSubsumed =
+ completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
+ completedCheckpoint,
+ checkpointsCleaner,
+ this::scheduleTriggerRequest);
+ if (lastSubsumed != null && lastSubsumed.discardOnSubsume()) {
Review comment:
> which checkpoint id should we notify?
I think it should always be the latest (regardless of whether it's a savepoint
or a checkpoint).
> I see your PR of #16575 also consider the savepoint case on TM side, was
that the reason you prefer to send notification of savepoints?
No, I mentioned the reasons above (the main one is generality).
Regarding the PR (#16575), the TM needs to distinguish them when discarding the
state anyway. This is because it may have non-confirmed savepoints (that might be
aborted); at the same time, a newer checkpoint might be subsumed, which would mean
that those older savepoints are subsumed as well.
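A rough sketch of that TM-side situation, under stated assumptions: `SubsumedStateTracker`, `SubsumedIds`, `register` and `notifySubsumed` are hypothetical names for illustration and are not taken from #16575. The notification carries only the latest subsumed id; everything tracked at or below that id is treated as subsumed, and savepoints are reported separately so the caller can handle them differently:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical result type: ids covered by one subsumption notification. */
final class SubsumedIds {
    final List<Long> checkpoints = new ArrayList<>();
    final List<Long> savepoints = new ArrayList<>();
}

/** Hypothetical TM-side tracker of not-yet-discarded snapshot ids. */
final class SubsumedStateTracker {
    // Snapshot id -> true if it is a savepoint (possibly not yet confirmed).
    private final NavigableMap<Long, Boolean> savepointFlagById = new TreeMap<>();

    void register(long snapshotId, boolean isSavepoint) {
        savepointFlagById.put(snapshotId, isSavepoint);
    }

    int trackedCount() {
        return savepointFlagById.size();
    }

    /**
     * Handles a notification for the latest subsumed id: every tracked id at or
     * below it is considered subsumed, split by kind, and no longer tracked.
     */
    SubsumedIds notifySubsumed(long latestSubsumedId) {
        SubsumedIds result = new SubsumedIds();
        // View of all entries with id <= latestSubsumedId, in ascending order.
        NavigableMap<Long, Boolean> covered = savepointFlagById.headMap(latestSubsumedId, true);
        for (Map.Entry<Long, Boolean> entry : covered.entrySet()) {
            (entry.getValue() ? result.savepoints : result.checkpoints).add(entry.getKey());
        }
        covered.clear(); // clearing the view removes the entries from the backing map
        return result;
    }
}
```

What is done with each group (for example, which state is actually discarded) is deliberately left to the caller here, since the comment above does not specify it.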
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]