This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.1 by this push:
new b75ea980d32 [FLINK-38408][checkpoint] Complete the checkpoint
CompletableFuture after updating statistics to ensure semantic correctness and
prevent test failure
b75ea980d32 is described below
commit b75ea980d32ce75cee1ff614f8adc36b54af5976
Author: Rui Fan <[email protected]>
AuthorDate: Fri Sep 26 17:21:16 2025 +0200
[FLINK-38408][checkpoint] Complete the checkpoint CompletableFuture after
updating statistics to ensure semantic correctness and prevent test failure
---
.../runtime/checkpoint/CheckpointCoordinator.java | 2 +-
.../checkpoint/CheckpointCoordinatorTest.java | 105 +++++++++++++++++++++
2 files changed, 106 insertions(+), 1 deletion(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index e4d455a3f95..85622387daa 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1385,8 +1385,8 @@ public class CheckpointCoordinator {
lastSubsumed = null;
}
-
pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint);
reportCompletedCheckpoint(completedCheckpoint);
+
pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint);
} catch (Exception exception) {
// For robustness reasons, we need catch exception and try marking
the checkpoint
// completed.
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index a7bba5a6f63..7cd736108a6 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -119,6 +119,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -154,6 +155,8 @@ import static org.mockito.Mockito.when;
/** Tests for the checkpoint coordinator. */
class CheckpointCoordinatorTest {
+ private static final long TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15);
+
@RegisterExtension
static final TestExecutorExtension<ScheduledExecutorService>
EXECUTOR_RESOURCE =
TestingUtils.defaultExecutorExtension();
@@ -4409,4 +4412,106 @@ class CheckpointCoordinatorTest {
}
}
}
+
+ /**
+ * Tests that Checkpoint CompletableFuture completion happens after
reportCompletedCheckpoint
+ * finishes. This ensures that when external components are notified via
the CompletableFuture
+ * that a checkpoint is complete, all statistics have already been updated.
+ */
+ @Test
+ void testCompletionFutureCompletesAfterReporting() throws Exception {
+ JobVertexID jobVertexID = new JobVertexID();
+ ExecutionGraph graph =
+ new
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+ .addJobVertex(jobVertexID)
+ .build(EXECUTOR_RESOURCE.getExecutor());
+
+ ControllableCheckpointStatsTracker tracker = new
ControllableCheckpointStatsTracker();
+
+ CheckpointCoordinator coordinator =
+ new CheckpointCoordinatorBuilder()
+ .setCheckpointStatsTracker(tracker)
+ .setTimer(manuallyTriggeredScheduledExecutor)
+ .build(graph);
+
+ CompletableFuture<CompletedCheckpoint> checkpointFuture =
+ coordinator.triggerCheckpoint(false);
+ manuallyTriggeredScheduledExecutor.triggerAll();
+
+ CompletableFuture<Void> ackTask =
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ ackCheckpoint(
+ 1L,
+ coordinator,
+ jobVertexID,
+ graph,
+ handle(),
+ handle(),
+ handle());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ assertThat(tracker.getReportStartedFuture().get(TIMEOUT_SECONDS,
TimeUnit.SECONDS))
+ .as("reportCompletedCheckpoint should be started soon when
checkpoint is acked.")
+ .isNull();
+
+ for (int i = 0; i < 30; i++) {
+ assertThat(checkpointFuture)
+ .as(
+ "Checkpoint future should not complete while
reportCompletedCheckpoint is blocked")
+ .isNotDone();
+ Thread.sleep(100);
+ }
+
+ tracker.getReportBlockingFuture().complete(null);
+
+ CompletedCheckpoint result = checkpointFuture.get(TIMEOUT_SECONDS,
TimeUnit.SECONDS);
+ assertThat(result)
+ .as("Checkpoint future should complete after
reportCompletedCheckpoint finishes")
+ .isNotNull();
+
+ ackTask.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ }
+
+ /**
+ * A controllable checkpoint stats tracker for testing purposes. Allows
precise control over
+ * when reportCompletedCheckpoint() completes, enabling verification of
execution order and
+ * timing in tests.
+ */
+ private static class ControllableCheckpointStatsTracker extends
DefaultCheckpointStatsTracker {
+ private final CompletableFuture<Void> reportStartedFuture;
+ private final CompletableFuture<Void> reportBlockingFuture;
+
+ public ControllableCheckpointStatsTracker() {
+ super(
+ Integer.MAX_VALUE,
+
UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
+ this.reportStartedFuture = new CompletableFuture<>();
+ this.reportBlockingFuture = new CompletableFuture<>();
+ }
+
+ public CompletableFuture<Void> getReportStartedFuture() {
+ return reportStartedFuture;
+ }
+
+ public CompletableFuture<Void> getReportBlockingFuture() {
+ return reportBlockingFuture;
+ }
+
+ @Override
+ public void reportCompletedCheckpoint(CompletedCheckpointStats
completed) {
+ reportStartedFuture.complete(null);
+
+ try {
+ reportBlockingFuture.get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ super.reportCompletedCheckpoint(completed);
+ }
+ }
}