pnowojski commented on a change in pull request #18845:
URL: https://github.com/apache/flink/pull/18845#discussion_r811081842
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1255,6 +1255,21 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
} else {
lastSubsumed = null;
}
+
+ PendingCheckpointStats statsCallback = getStatsCallback(pendingCheckpoint);
+ if (statsCallback != null) {
+ LOG.trace(
+ "Checkpoint {} size: {}Kb, duration: {}ms",
+ checkpointId,
+ statsCallback.getStateSize() == 0 ? 0 : statsCallback.getStateSize() / 1024,
+ statsCallback.getEndToEndDuration());
+ // Finalize the statsCallback and give the completed checkpoint a
+ // callback for discards.
+ CompletedCheckpointStats.DiscardCallback discardCallback =
+ statsCallback.reportCompletedCheckpoint(
+ completedCheckpoint.getExternalPointer());
+ completedCheckpoint.setDiscardCallback(discardCallback);
+ }
Review comment:
Can you extract some of the code from this method to shrink it and
increase readability? Maybe extract this new code to a new method and the lines
L1264:L1282 to another new method (`cleanupAfterCompletedCheckpoint`?)
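
A rough sketch of the kind of extraction meant here. The types below are hypothetical stand-ins (`StatsSketch`, `CoordinatorSketch`), not the real Flink classes, and the helper name `reportCompletedCheckpoint` is only a suggestion:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for PendingCheckpointStats; only the shape matters.
class StatsSketch {
    final long stateSize;
    final long endToEndDuration;
    boolean reported;

    StatsSketch(long stateSize, long endToEndDuration) {
        this.stateSize = stateSize;
        this.endToEndDuration = endToEndDuration;
    }
}

// Hypothetical stand-in for the coordinator; the list replaces the logger.
class CoordinatorSketch {
    final List<String> log = new ArrayList<>();

    void completePendingCheckpoint(long checkpointId, StatsSketch stats) {
        // ... finalize and store the checkpoint (elided) ...
        reportCompletedCheckpoint(checkpointId, stats);
        cleanupAfterCompletedCheckpoint(checkpointId);
    }

    // The newly added stats reporting, moved into its own method.
    private void reportCompletedCheckpoint(long checkpointId, StatsSketch stats) {
        if (stats == null) {
            return;
        }
        log.add(
                "Checkpoint " + checkpointId
                        + " size: " + (stats.stateSize / 1024) + "Kb"
                        + ", duration: " + stats.endToEndDuration + "ms");
        stats.reported = true;
    }

    // The existing post-completion cleanup, moved into its own method.
    private void cleanupAfterCompletedCheckpoint(long checkpointId) {
        log.add("cleanup " + checkpointId);
    }
}
```

With this shape, `completePendingCheckpoint` reads as a short sequence of named steps.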
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1385,6 +1385,7 @@ private CompletedCheckpoint addCompletedCheckpointToStoreAndSubsumeOldest(
checkpointsCleaner.cleanCheckpointOnFailedStoring(completedCheckpoint, executor);
}
+ reportFailedCheckpoint(completedCheckpoint, exception);
Review comment:
It's not ideal that we report failures and successful completions in two
very different places. Can't we bring the two closer together? For example,
by also reporting the successful completion in the
`addCompletedCheckpointToStoreAndSubsumeOldest` method?
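
Roughly what I have in mind, as a sketch only. `StoreSketch` and `ReportingSketch` are hypothetical stand-ins, and the string reports stand in for the real stats calls:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the completed checkpoint store.
class StoreSketch {
    boolean failOnAdd;

    void add(long checkpointId) throws Exception {
        if (failOnAdd) {
            throw new Exception("store unavailable");
        }
    }
}

// Hypothetical stand-in for the coordinator: both outcomes are reported
// from the same method, next to the call that decides the outcome.
class ReportingSketch {
    final List<String> reports = new ArrayList<>();

    void addToStoreAndReport(StoreSketch store, long checkpointId) throws Exception {
        try {
            store.add(checkpointId);
        } catch (Exception e) {
            reports.add("FAILED " + checkpointId);
            throw e;
        }
        reports.add("COMPLETED " + checkpointId);
    }
}
```

A reader then only has to look at one method to see every way a checkpoint's final status gets reported.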
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -2083,6 +2094,12 @@ public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
assertEquals(pending.getCheckpointId(), success.getCheckpointID());
assertEquals(2, success.getOperatorStates().size());
+ AbstractCheckpointStats actualStats =
+ statsTracker.createSnapshot().getHistory().getCheckpointById(checkpointId);
+
+ assertEquals(checkpointId, actualStats.getCheckpointId());
+ assertEquals(CheckpointStatsStatus.COMPLETED, actualStats.getStatus());
+
Review comment:
Does this test add actual regression coverage for the FLINK-25958 bug?
Does it fail on the current master?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
##########
@@ -327,6 +331,9 @@ public static boolean checkpointsMatch(
*/
void setDiscardCallback(@Nullable CompletedCheckpointStats.DiscardCallback discardCallback) {
this.discardCallback = discardCallback;
+ if (discardCallback != null && isDiscarded) {
+ discardCallback.notifyDiscardedCheckpoint();
+ }
Review comment:
Why do we need this change?
1. Making the field `volatile` suggests that something is fishy with the
threading model. If I remember correctly, those classes are supposed to be
non-thread-safe: used either from a single thread or protected by the
`CheckpointCoordinator`'s lock.
2. Triggering the callback in a setter method is also a bit strange.
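
One possible alternative, sketched under the assumption that the callback can be created before the checkpoint becomes visible to anything that might discard it. The names below (`DiscardCallbackSketch`, `CompletedCheckpointSketch`) are hypothetical stand-ins, not the Flink classes:

```java
// Hypothetical stand-in for CompletedCheckpointStats.DiscardCallback.
interface DiscardCallbackSketch {
    void notifyDiscardedCheckpoint();
}

// Hypothetical stand-in for CompletedCheckpoint. The callback is fixed at
// construction time, so there is no setter and no need for a volatile field.
class CompletedCheckpointSketch {
    private final DiscardCallbackSketch discardCallback; // may be null
    private boolean discarded;

    CompletedCheckpointSketch(DiscardCallbackSketch discardCallback) {
        this.discardCallback = discardCallback;
    }

    // Assumed to be called from a single thread or under the coordinator's lock.
    void discard() {
        if (discarded) {
            return; // notify at most once
        }
        discarded = true;
        if (discardCallback != null) {
            discardCallback.notifyDiscardedCheckpoint();
        }
    }

    boolean isDiscarded() {
        return discarded;
    }
}
```

Here the notification happens only in `discard()`, so setting the callback never has side effects.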