1996fanrui commented on code in PR #27050:
URL: https://github.com/apache/flink/pull/27050#discussion_r2429083671


##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java:
##########
@@ -4409,4 +4410,106 @@ public boolean isDiscarded() {
             }
         }
     }
+
+    /**
+     * Tests that Checkpoint CompletableFuture completion happens after reportCompletedCheckpoint
+     * finishes. This ensures that when external components are notified via the CompletableFuture
+     * that a checkpoint is complete, all statistics have already been updated.
+     */
+    @Test
+    void testCompletionFutureCompletesAfterReporting() throws Exception {
+        JobVertexID jobVertexID = new JobVertexID();
+        ExecutionGraph graph =
+                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        ControllableCheckpointStatsTracker tracker = new ControllableCheckpointStatsTracker();
+
+        CheckpointCoordinator coordinator =
+                new CheckpointCoordinatorBuilder()
+                        .setCheckpointStatsTracker(tracker)
+                        .setTimer(manuallyTriggeredScheduledExecutor)
+                        .build(graph);
+
+        CompletableFuture<CompletedCheckpoint> checkpointFuture =
+                coordinator.triggerCheckpoint(false);
+        manuallyTriggeredScheduledExecutor.triggerAll();
+
+        CompletableFuture<Void> ackTask =
+                CompletableFuture.runAsync(
+                        () -> {
+                            try {
+                                ackCheckpoint(
+                                        1L,
+                                        coordinator,
+                                        jobVertexID,
+                                        graph,
+                                        handle(),
+                                        handle(),
+                                        handle());
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        assertThat(tracker.getReportStartedFuture().get(20, TimeUnit.SECONDS))
+                .as("reportCompletedCheckpoint should be started soon when checkpoint is acked.")
+                .isNull();
+
+        for (int i = 0; i < 30; i++) {

Review Comment:
   The purpose of this test is that if `isDone` returns true here, there must
be a bug, so the test makes developers aware of it.
   
   > how would your test distinguish between: Future wasn't complete because of 
"happens before" condition vs Future wasn't complete because VM froze and 
responsible thread was not making progress for more than 3 seconds.
   
   The current test cannot distinguish them. Here we are testing that an
unexpected case did not occur. If the VM freezes, neither the expected case
nor the unexpected case is executed, so I really do not know how to
distinguish them.
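   
   To make the discussion concrete, here is a minimal standalone sketch of the
pattern, not the actual test. It assumes the report step is blocked on a latch,
which is roughly what I would expect a controllable tracker to do. The class
name, method name, latch names, and the 10 ms polling interval are all
hypothetical and are not Flink APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the pattern under discussion; not Flink code. */
final class NegativeWindowSketch {

    /**
     * Returns true if the result future stayed incomplete while the simulated
     * report step was blocked, and completed once that step was released.
     */
    static boolean completesOnlyAfterRelease() {
        CountDownLatch reportStarted = new CountDownLatch(1);
        CountDownLatch allowReportToFinish = new CountDownLatch(1);
        CompletableFuture<String> result = new CompletableFuture<>();

        // Simulated coordinator thread: "report" first, complete the future last.
        Thread worker = new Thread(() -> {
            reportStarted.countDown();
            try {
                allowReportToFinish.await(); // the report step is held here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                result.completeExceptionally(e);
                return;
            }
            result.complete("checkpoint-1");
        });
        worker.start();

        try {
            if (!reportStarted.await(20, TimeUnit.SECONDS)) {
                return false; // the report step never started
            }
            // Observation window: the future must not be done while reporting is blocked.
            for (int i = 0; i < 30; i++) {
                if (result.isDone()) {
                    return false; // completed before reporting finished: a bug
                }
                Thread.sleep(10);
            }
            allowReportToFinish.countDown();
            return "checkpoint-1".equals(result.get(20, TimeUnit.SECONDS));
        } catch (Exception e) {
            return false;
        } finally {
            allowReportToFinish.countDown(); // never leave the worker blocked
        }
    }

    public static void main(String[] args) {
        System.out.println(completesOnlyAfterRelease());
    }
}
```

   The polling loop can only show that the future was not completed during the
window. The final `get` after releasing the latch shows that the worker was
still able to make progress, but only after the window has ended, so a thread
that was frozen during the window would look the same.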
   
   > but testing it this way you have weaker guarantees of "happens before" 
condition being actually tested.
   
   I also hope to avoid sleep in tests as much as possible. However, I haven't 
figured out how to use CompletableFuture or CountDownLatch to replace it. 
   
   Do you have any suggestions on this? I'd really appreciate a better
alternative.



