This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.1 by this push:
     new b75ea980d32 [FLINK-38408][checkpoint] Complete the checkpoint CompletableFuture after updating statistics to ensure semantic correctness and prevent test failure
b75ea980d32 is described below

commit b75ea980d32ce75cee1ff614f8adc36b54af5976
Author: Rui Fan <[email protected]>
AuthorDate: Fri Sep 26 17:21:16 2025 +0200

    [FLINK-38408][checkpoint] Complete the checkpoint CompletableFuture after updating statistics to ensure semantic correctness and prevent test failure
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |   2 +-
 .../checkpoint/CheckpointCoordinatorTest.java      | 105 +++++++++++++++++++++
 2 files changed, 106 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index e4d455a3f95..85622387daa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1385,8 +1385,8 @@ public class CheckpointCoordinator {
                 lastSubsumed = null;
             }
 
-            
pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint);
             reportCompletedCheckpoint(completedCheckpoint);
+            
pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint);
         } catch (Exception exception) {
             // For robustness reasons, we need catch exception and try marking 
the checkpoint
             // completed.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index a7bba5a6f63..7cd736108a6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -119,6 +119,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -154,6 +155,8 @@ import static org.mockito.Mockito.when;
 /** Tests for the checkpoint coordinator. */
 class CheckpointCoordinatorTest {
 
+    private static final long TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15);
+
     @RegisterExtension
     static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
             TestingUtils.defaultExecutorExtension();
@@ -4409,4 +4412,106 @@ class CheckpointCoordinatorTest {
             }
         }
     }
+
+    /**
+     * Tests that Checkpoint CompletableFuture completion happens after 
reportCompletedCheckpoint
+     * finishes. This ensures that when external components are notified via 
the CompletableFuture
+     * that a checkpoint is complete, all statistics have already been updated.
+     */
+    @Test
+    void testCompletionFutureCompletesAfterReporting() throws Exception {
+        JobVertexID jobVertexID = new JobVertexID();
+        ExecutionGraph graph =
+                new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        ControllableCheckpointStatsTracker tracker = new 
ControllableCheckpointStatsTracker();
+
+        CheckpointCoordinator coordinator =
+                new CheckpointCoordinatorBuilder()
+                        .setCheckpointStatsTracker(tracker)
+                        .setTimer(manuallyTriggeredScheduledExecutor)
+                        .build(graph);
+
+        CompletableFuture<CompletedCheckpoint> checkpointFuture =
+                coordinator.triggerCheckpoint(false);
+        manuallyTriggeredScheduledExecutor.triggerAll();
+
+        CompletableFuture<Void> ackTask =
+                CompletableFuture.runAsync(
+                        () -> {
+                            try {
+                                ackCheckpoint(
+                                        1L,
+                                        coordinator,
+                                        jobVertexID,
+                                        graph,
+                                        handle(),
+                                        handle(),
+                                        handle());
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        assertThat(tracker.getReportStartedFuture().get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS))
+                .as("reportCompletedCheckpoint should be started soon when 
checkpoint is acked.")
+                .isNull();
+
+        for (int i = 0; i < 30; i++) {
+            assertThat(checkpointFuture)
+                    .as(
+                            "Checkpoint future should not complete while 
reportCompletedCheckpoint is blocked")
+                    .isNotDone();
+            Thread.sleep(100);
+        }
+
+        tracker.getReportBlockingFuture().complete(null);
+
+        CompletedCheckpoint result = checkpointFuture.get(TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+        assertThat(result)
+                .as("Checkpoint future should complete after 
reportCompletedCheckpoint finishes")
+                .isNotNull();
+
+        ackTask.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+    /**
+     * A controllable checkpoint stats tracker for testing purposes. Allows 
precise control over
+     * when reportCompletedCheckpoint() completes, enabling verification of 
execution order and
+     * timing in tests.
+     */
+    private static class ControllableCheckpointStatsTracker extends 
DefaultCheckpointStatsTracker {
+        private final CompletableFuture<Void> reportStartedFuture;
+        private final CompletableFuture<Void> reportBlockingFuture;
+
+        public ControllableCheckpointStatsTracker() {
+            super(
+                    Integer.MAX_VALUE,
+                    
UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
+            this.reportStartedFuture = new CompletableFuture<>();
+            this.reportBlockingFuture = new CompletableFuture<>();
+        }
+
+        public CompletableFuture<Void> getReportStartedFuture() {
+            return reportStartedFuture;
+        }
+
+        public CompletableFuture<Void> getReportBlockingFuture() {
+            return reportBlockingFuture;
+        }
+
+        @Override
+        public void reportCompletedCheckpoint(CompletedCheckpointStats 
completed) {
+            reportStartedFuture.complete(null);
+
+            try {
+                reportBlockingFuture.get();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            super.reportCompletedCheckpoint(completed);
+        }
+    }
 }

Reply via email to