This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push:
new 87f898ec800 [FLINK-38408][checkpoint] Complete the checkpoint CompletableFuture after updating statistics to ensure semantic correctness and prevent test failure
87f898ec800 is described below
commit 87f898ec800c53e1c996634ca0a1f64bc79ecfca
Author: Rui Fan <[email protected]>
AuthorDate: Fri Sep 26 17:21:16 2025 +0200
[FLINK-38408][checkpoint] Complete the checkpoint CompletableFuture after updating statistics to ensure semantic correctness and prevent test failure
---
.../runtime/checkpoint/CheckpointCoordinator.java | 2 +-
.../checkpoint/CheckpointCoordinatorTest.java | 107 +++++++++++++++++++++
2 files changed, 108 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index c05efb10b48..8750f1fbf97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1384,8 +1384,8 @@ public class CheckpointCoordinator {
lastSubsumed = null;
}
-
pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint);
reportCompletedCheckpoint(completedCheckpoint);
+
pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint);
} catch (Exception exception) {
// For robustness reasons, we need catch exception and try marking
the checkpoint
// completed.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index de7419e18e4..c2d2144a1fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -48,6 +48,7 @@ import
org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.state.CheckpointStorage;
@@ -120,6 +121,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -155,6 +157,8 @@ import static org.mockito.Mockito.when;
/** Tests for the checkpoint coordinator. */
class CheckpointCoordinatorTest {
+ private static final long TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15);
+
@RegisterExtension
static final TestExecutorExtension<ScheduledExecutorService>
EXECUTOR_RESOURCE =
TestingUtils.defaultExecutorExtension();
@@ -4403,4 +4407,107 @@ class CheckpointCoordinatorTest {
}
}
}
+
+    /**
+     * Tests that Checkpoint CompletableFuture completion happens after reportCompletedCheckpoint
+     * finishes. This ensures that when external components are notified via the CompletableFuture
+     * that a checkpoint is complete, all statistics have already been updated.
+     */
+    @Test
+    void testCompletionFutureCompletesAfterReporting() throws Exception {
+        JobVertexID jobVertexID = new JobVertexID();
+        ExecutionGraph graph =
+                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        ControllableCheckpointStatsTracker tracker = new ControllableCheckpointStatsTracker();
+
+        CheckpointCoordinator coordinator =
+                new CheckpointCoordinatorBuilder()
+                        .setCheckpointStatsTracker(tracker)
+                        .setTimer(manuallyTriggeredScheduledExecutor)
+                        .build(graph);
+
+        CompletableFuture<CompletedCheckpoint> checkpointFuture =
+                coordinator.triggerCheckpoint(false);
+        manuallyTriggeredScheduledExecutor.triggerAll();
+
+        CompletableFuture<Void> ackTask =
+                CompletableFuture.runAsync(
+                        () -> {
+                            try {
+                                ackCheckpoint(
+                                        1L,
+                                        coordinator,
+                                        jobVertexID,
+                                        graph,
+                                        handle(),
+                                        handle(),
+                                        handle());
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        assertThat(tracker.getReportStartedFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
+                .as("reportCompletedCheckpoint should be started soon when checkpoint is acked.")
+                .isNull();
+
+        for (int i = 0; i < 30; i++) {
+            assertThat(checkpointFuture)
+                    .as(
+                            "Checkpoint future should not complete while reportCompletedCheckpoint is blocked")
+                    .isNotDone();
+            Thread.sleep(100);
+        }
+
+        tracker.getReportBlockingFuture().complete(null);
+
+        CompletedCheckpoint result = checkpointFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        assertThat(result)
+                .as("Checkpoint future should complete after reportCompletedCheckpoint finishes")
+                .isNotNull();
+
+        ackTask.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+    /**
+     * A controllable checkpoint stats tracker for testing purposes. Allows precise control over
+     * when reportCompletedCheckpoint() completes, enabling verification of execution order and
+     * timing in tests.
+     */
+    private static class ControllableCheckpointStatsTracker extends CheckpointStatsTracker {
+        private final CompletableFuture<Void> reportStartedFuture;
+        private final CompletableFuture<Void> reportBlockingFuture;
+
+        public ControllableCheckpointStatsTracker() {
+            super(
+                    Integer.MAX_VALUE,
+                    UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(),
+                    JobID.generate());
+            this.reportStartedFuture = new CompletableFuture<>();
+            this.reportBlockingFuture = new CompletableFuture<>();
+        }
+
+        public CompletableFuture<Void> getReportStartedFuture() {
+            return reportStartedFuture;
+        }
+
+        public CompletableFuture<Void> getReportBlockingFuture() {
+            return reportBlockingFuture;
+        }
+
+        @Override
+        public void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
+            reportStartedFuture.complete(null);
+
+            try {
+                reportBlockingFuture.get();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            super.reportCompletedCheckpoint(completed);
+        }
+    }
}