pnowojski commented on a change in pull request #18845:
URL: https://github.com/apache/flink/pull/18845#discussion_r811081842



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1255,6 +1255,21 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
             } else {
                 lastSubsumed = null;
             }
+
+            PendingCheckpointStats statsCallback = getStatsCallback(pendingCheckpoint);
+            if (statsCallback != null) {
+                LOG.trace(
+                        "Checkpoint {} size: {}Kb, duration: {}ms",
+                        checkpointId,
+                        statsCallback.getStateSize() == 0 ? 0 : statsCallback.getStateSize() / 1024,
+                        statsCallback.getEndToEndDuration());
+                // Finalize the statsCallback and give the completed checkpoint a
+                // callback for discards.
+                CompletedCheckpointStats.DiscardCallback discardCallback =
+                        statsCallback.reportCompletedCheckpoint(
+                                completedCheckpoint.getExternalPointer());
+                completedCheckpoint.setDiscardCallback(discardCallback);
+            }

Review comment:
       Can you extract some of the code from this method to shrink it and 
increase readability? Maybe extract this new code to a new method and the lines 
L1264:L1282 to another new method (`cleanupAfterCompletedCheckpoint`?)
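
A minimal sketch of what the first extraction could look like. The nested types are hypothetical stand-ins for `PendingCheckpointStats`, `CompletedCheckpoint` and `CompletedCheckpointStats.DiscardCallback`, the helper name `reportCompletedCheckpoint` is an assumption, and the helper returns the trace message instead of logging it so that the sketch is self-contained:

```java
// Sketch only: the nested types are hypothetical stand-ins, not the real Flink classes.
final class StatsReportingSketch {

    interface DiscardCallback {
        void notifyDiscardedCheckpoint();
    }

    interface PendingStats {
        long getStateSize();

        long getEndToEndDuration();

        DiscardCallback reportCompletedCheckpoint(String externalPointer);
    }

    static final class Completed {
        final String externalPointer;
        DiscardCallback discardCallback;

        Completed(String externalPointer) {
            this.externalPointer = externalPointer;
        }
    }

    /**
     * Extracted helper (the name is an assumption). Builds the trace message,
     * finalizes the stats and hands the discard callback to the completed
     * checkpoint. Returns null when no stats callback is available.
     */
    static String reportCompletedCheckpoint(
            long checkpointId, PendingStats stats, Completed completed) {
        if (stats == null) {
            return null;
        }
        // Integer division already yields 0 for a size of 0, so no special case is needed.
        String message =
                "Checkpoint " + checkpointId
                        + " size: " + (stats.getStateSize() / 1024) + "Kb"
                        + ", duration: " + stats.getEndToEndDuration() + "ms";
        completed.discardCallback = stats.reportCompletedCheckpoint(completed.externalPointer);
        return message;
    }
}
```

With something like this, `completePendingCheckpoint` would only need a single call in place of the added block.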

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1385,6 +1385,7 @@ private CompletedCheckpoint addCompletedCheckpointToStoreAndSubsumeOldest(
                 checkpointsCleaner.cleanCheckpointOnFailedStoring(completedCheckpoint, executor);
             }
 
+            reportFailedCheckpoint(completedCheckpoint, exception);

Review comment:
       It's not ideal that we report failures and successful completions in two 
very different places. Can't we bring the two closer together? For example, by 
adding the successful completion reporting to the 
`addCompletedCheckpointToStoreAndSubsumeOldest` method as well?
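
A rough sketch of the shape this could take, with both outcomes reported from the same method. All names here (`StoreAndReportSketch`, `Store`, `addToStoreAndReport`) are hypothetical, and a plain list of strings stands in for the stats tracker:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: hypothetical names, not the real CheckpointCoordinator code.
final class StoreAndReportSketch {

    /** Stand-in for the completed checkpoint store; adding may fail. */
    interface Store {
        void add(String checkpoint) throws Exception;
    }

    /** Stand-in for the stats tracker: records what was reported. */
    final List<String> reports = new ArrayList<>();

    /** Reports failure and success from one place, next to the store call. */
    void addToStoreAndReport(String checkpoint, Store store) throws Exception {
        try {
            store.add(checkpoint);
        } catch (Exception e) {
            reports.add("FAILED " + checkpoint + ": " + e.getMessage());
            throw e;
        }
        reports.add("COMPLETED " + checkpoint);
    }
}
```

The point of the sketch is only that the success and failure paths sit a few lines apart, so it is easy to see that every checkpoint is reported exactly once.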

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -2083,6 +2094,12 @@ public void testTriggerAndConfirmSimpleSavepoint() throws Exception {
         assertEquals(pending.getCheckpointId(), success.getCheckpointID());
         assertEquals(2, success.getOperatorStates().size());
 
+        AbstractCheckpointStats actualStats =
+                statsTracker.createSnapshot().getHistory().getCheckpointById(checkpointId);
+
+        assertEquals(checkpointId, actualStats.getCheckpointId());
+        assertEquals(CheckpointStatsStatus.COMPLETED, actualStats.getStatus());
+

Review comment:
       Does this test add actual regression coverage for the FLINK-25958 bug? 
Does it fail on the current master?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
##########
@@ -327,6 +331,9 @@ public static boolean checkpointsMatch(
      */
     void setDiscardCallback(@Nullable CompletedCheckpointStats.DiscardCallback discardCallback) {
         this.discardCallback = discardCallback;
+        if (discardCallback != null && isDiscarded) {
+            discardCallback.notifyDiscardedCheckpoint();
+        }

Review comment:
       Why do we need this change?
   1. Making the field `volatile` suggests something is fishy with the 
threading model. If I remember correctly, those classes are supposed to be 
non-thread-safe: used either from a single thread or under the 
`CheckpointCoordinator`'s lock.
   2. Triggering the callback in a setter method is also a bit strange.
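
To illustrate point 1, here is a minimal sketch of the threading model as described above: the checkpoint object keeps plain (non-`volatile`) fields and a plain setter, and the owner serializes every access under one lock. All names are hypothetical, and this is not a claim about how the real classes are written:

```java
// Sketch only: hypothetical names illustrating lock-guarded, non-thread-safe state.
final class LockGuardedSketch {

    /** Not thread safe on its own; the owner must serialize all access. */
    static final class Checkpoint {
        private Runnable discardCallback; // plain field, no volatile
        private boolean discarded;

        void setDiscardCallback(Runnable callback) {
            this.discardCallback = callback; // plain setter, no side effects
        }

        void markDiscarded() {
            discarded = true;
            if (discardCallback != null) {
                discardCallback.run();
            }
        }

        boolean isDiscarded() {
            return discarded;
        }
    }

    private final Object lock = new Object();
    private final Checkpoint checkpoint = new Checkpoint();

    /** Registers the callback under the owner's lock. */
    void registerCallback(Runnable callback) {
        synchronized (lock) {
            checkpoint.setDiscardCallback(callback);
        }
    }

    /** Discards under the same lock, so it always sees the registered callback. */
    void discard() {
        synchronized (lock) {
            checkpoint.markDiscarded();
        }
    }

    boolean isDiscarded() {
        synchronized (lock) {
            return checkpoint.isDiscarded();
        }
    }
}
```

In this shape the callback only fires if it is registered before the discard, so the ordering is the caller's responsibility rather than the setter's.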




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.