This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.19 by this push:
     new 8b289000f65 [FLINK-34200][test] Fix the bug that 
AutoRescalingITCase.testCheckpointRescalingInKeyedState fails occasionally
8b289000f65 is described below

commit 8b289000f65aef39f48d6cc6ddb817208a62fdee
Author: Rui Fan <[email protected]>
AuthorDate: Thu Feb 1 21:07:40 2024 +0800

    [FLINK-34200][test] Fix the bug that 
AutoRescalingITCase.testCheckpointRescalingInKeyedState fails occasionally
---
 .../flink/runtime/testutils/CommonTestUtils.java   | 67 +++++++++++-----------
 .../test/checkpointing/AutoRescalingITCase.java    | 14 +++--
 2 files changed, 44 insertions(+), 37 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index da016150500..0720197e911 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -343,49 +343,52 @@ public class CommonTestUtils {
 
     /** Wait for (at least) the given number of successful checkpoints. */
     public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, 
int numCheckpoints)
-            throws Exception, FlinkJobNotFoundException {
-        waitUntilCondition(
-                () -> {
-                    AccessExecutionGraph graph = 
miniCluster.getExecutionGraph(jobID).get();
-                    if (Optional.ofNullable(graph.getCheckpointStatsSnapshot())
-                            .filter(
-                                    st ->
-                                            
st.getCounts().getNumberOfCompletedCheckpoints()
-                                                    >= numCheckpoints)
-                            .isPresent()) {
-                        return true;
-                    } else if (graph.getState().isGloballyTerminalState()) {
-                        checkState(
-                                graph.getFailureInfo() != null,
-                                "Job terminated before taking required %s 
checkpoints: %s",
-                                numCheckpoints,
-                                graph.getState());
-                        throw graph.getFailureInfo().getException();
-                    } else {
+            throws Exception {
+        waitForCheckpoints(
+                jobID,
+                miniCluster,
+                checkpointStatsSnapshot ->
+                        checkpointStatsSnapshot != null
+                                && checkpointStatsSnapshot
+                                                .getCounts()
+                                                
.getNumberOfCompletedCheckpoints()
+                                        >= numCheckpoints);
+    }
+
+    /**
+     * Wait for a new completed checkpoint, the new checkpoint must be 
triggered after
+     * waitForNewCheckpoint is called.
+     */
+    public static void waitForNewCheckpoint(JobID jobID, MiniCluster 
miniCluster) throws Exception {
+        final long startTime = System.currentTimeMillis();
+        waitForCheckpoints(
+                jobID,
+                miniCluster,
+                checkpointStatsSnapshot -> {
+                    if (checkpointStatsSnapshot == null) {
                         return false;
                     }
+                    final CompletedCheckpointStats latestCompletedCheckpoint =
+                            
checkpointStatsSnapshot.getHistory().getLatestCompletedCheckpoint();
+                    return latestCompletedCheckpoint != null
+                            && latestCompletedCheckpoint.getTriggerTimestamp() 
> startTime;
                 });
     }
 
-    /** Wait for on more completed checkpoint. */
-    public static void waitForOneMoreCheckpoint(JobID jobID, MiniCluster 
miniCluster)
+    // Wait for CheckpointStatsSnapshot to meet the condition.
+    private static void waitForCheckpoints(
+            JobID jobId, MiniCluster miniCluster, 
Predicate<CheckpointStatsSnapshot> condition)
             throws Exception {
-        final long[] currentCheckpoint = new long[] {-1L};
         waitUntilCondition(
                 () -> {
-                    AccessExecutionGraph graph = 
miniCluster.getExecutionGraph(jobID).get();
-                    CheckpointStatsSnapshot snapshot = 
graph.getCheckpointStatsSnapshot();
-                    if (snapshot != null) {
-                        long currentCount = 
snapshot.getCounts().getNumberOfCompletedCheckpoints();
-                        if (currentCheckpoint[0] < 0L) {
-                            currentCheckpoint[0] = currentCount;
-                        } else {
-                            return currentCount > currentCheckpoint[0];
-                        }
+                    final AccessExecutionGraph graph = 
miniCluster.getExecutionGraph(jobId).get();
+                    final CheckpointStatsSnapshot snapshot = 
graph.getCheckpointStatsSnapshot();
+                    if (condition.test(snapshot)) {
+                        return true;
                     } else if (graph.getState().isGloballyTerminalState()) {
                         checkState(
                                 graph.getFailureInfo() != null,
-                                "Job terminated before taking required 
checkpoint.",
+                                "Job terminated (state=%s) before completing 
the requested checkpoint(s).",
                                 graph.getState());
                         throw graph.getFailureInfo().getException();
                     }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
index 691688ae602..eab82b1c727 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
@@ -86,7 +86,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
-import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForOneMoreCheckpoint;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForNewCheckpoint;
 import static 
org.apache.flink.test.scheduling.UpdateJobResourceRequirementsITCase.waitForAvailableSlots;
 import static 
org.apache.flink.test.scheduling.UpdateJobResourceRequirementsITCase.waitForRunningTasks;
 import static org.junit.Assert.assertEquals;
@@ -261,7 +261,11 @@ public class AutoRescalingITCase extends TestLogger {
 
             waitForAllTaskRunning(cluster.getMiniCluster(), 
jobGraph.getJobID(), false);
 
-            waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+            // We need to wait for a checkpoint to be completed that was 
triggered after all the
+            // data was processed. That ensures the entire data being flushed 
out of the Operator's
+            // network buffers to avoid reprocessing test data twice after the 
restore (see
+            // FLINK-34200).
+            waitForNewCheckpoint(jobID, cluster.getMiniCluster());
 
             SubtaskIndexSource.SOURCE_LATCH.reset();
 
@@ -328,7 +332,7 @@ public class AutoRescalingITCase extends TestLogger {
             // wait until the operator handles some data
             StateSourceBase.workStartedLatch.await();
 
-            waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+            waitForNewCheckpoint(jobID, cluster.getMiniCluster());
 
             JobResourceRequirements.Builder builder = 
JobResourceRequirements.newBuilder();
             for (JobVertex vertex : jobGraph.getVertices()) {
@@ -411,7 +415,7 @@ public class AutoRescalingITCase extends TestLogger {
             // clear the CollectionSink set for the restarted job
             CollectionSink.clearElementsSet();
 
-            waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+            waitForNewCheckpoint(jobID, cluster.getMiniCluster());
 
             SubtaskIndexSource.SOURCE_LATCH.reset();
 
@@ -513,7 +517,7 @@ public class AutoRescalingITCase extends TestLogger {
         // wait until the operator handles some data
         StateSourceBase.workStartedLatch.await();
 
-        waitForOneMoreCheckpoint(jobID, cluster.getMiniCluster());
+        waitForNewCheckpoint(jobID, cluster.getMiniCluster());
 
         JobResourceRequirements.Builder builder = 
JobResourceRequirements.newBuilder();
         for (JobVertex vertex : jobGraph.getVertices()) {

Reply via email to