This is an automated email from the ASF dual-hosted git repository.

RocMarshal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0156b330949 [FLINK-39918][tests] Stop operator-restore tests hanging on unexpected job state (#28408)
0156b330949 is described below

commit 0156b330949f22d118c00ef7dc8026b1e22ad217
Author: Martijn Visser <[email protected]>
AuthorDate: Sat Jun 13 11:08:06 2026 +0200

    [FLINK-39918][tests] Stop operator-restore tests hanging on unexpected job state (#28408)
    
    AbstractOperatorRestoreTestBase waits for one specific job status against
    a ~2.7h deadline, so a job reaching a different globally terminal state
    (e.g. FAILED) hangs the surefire fork until the CI watchdog kills it
    without a usable error. Wait via a shared helper that fails fast with the
    unexpected terminal state instead of retrying until the deadline.
    
    Generated-by: Claude Opus 4.8 (1M context)
---
 .../restore/AbstractOperatorRestoreTestBase.java   | 60 +++++++++++++---------
 1 file changed, 37 insertions(+), 23 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 43a0f861e37..f69723f0676 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.test.state.operator.restore;
 
 import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.client.program.ClusterClient;
@@ -63,6 +64,7 @@ import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
@@ -187,15 +189,7 @@ public abstract class AbstractOperatorRestoreTestBase implements MigrationTest {
 
         clusterClient.submitJob(jobToMigrate).get();
 
-        CompletableFuture<JobStatus> jobRunningFuture =
-                FutureUtils.retrySuccessfulWithDelay(
-                        () -> 
clusterClient.getJobStatus(jobToMigrate.getJobID()),
-                        Duration.ofMillis(50),
-                        deadline,
-                        (jobStatus) -> jobStatus == JobStatus.RUNNING,
-                        scheduledExecutor);
-        assertThat(jobRunningFuture.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS))
-                .isEqualTo(JobStatus.RUNNING);
+        waitForJobStatus(clusterClient, jobToMigrate.getJobID(), deadline, 
JobStatus.RUNNING);
 
         // Trigger savepoint
         String savepointPath = null;
@@ -224,15 +218,7 @@ public abstract class AbstractOperatorRestoreTestBase implements MigrationTest {
 
         assertThat(savepointPath).as("Could not take savepoint.").isNotNull();
 
-        CompletableFuture<JobStatus> jobCanceledFuture =
-                FutureUtils.retrySuccessfulWithDelay(
-                        () -> 
clusterClient.getJobStatus(jobToMigrate.getJobID()),
-                        Duration.ofMillis(50),
-                        deadline,
-                        (jobStatus) -> jobStatus == JobStatus.CANCELED,
-                        scheduledExecutor);
-        assertThat(jobCanceledFuture.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS))
-                .isEqualTo(JobStatus.CANCELED);
+        waitForJobStatus(clusterClient, jobToMigrate.getJobID(), deadline, 
JobStatus.CANCELED);
 
         return savepointPath;
     }
@@ -247,15 +233,43 @@ public abstract class AbstractOperatorRestoreTestBase implements MigrationTest {
 
         clusterClient.submitJob(jobToRestore).get();
 
-        CompletableFuture<JobStatus> jobStatusFuture =
+        waitForJobStatus(clusterClient, jobToRestore.getJobID(), deadline, 
JobStatus.FINISHED);
+    }
+
+    /**
+     * Waits until the job reaches {@code targetStatus}, failing fast if the job reaches a globally
+     * terminal state that is not the target (for example {@code FAILED} when waiting for {@code
+     * FINISHED}) instead of retrying until the deadline.
+     */
+    private void waitForJobStatus(
+            ClusterClient<?> clusterClient, JobID jobId, Deadline deadline, 
JobStatus targetStatus)
+            throws Exception {
+        final CompletableFuture<JobStatus> statusFuture =
                 FutureUtils.retrySuccessfulWithDelay(
-                        () -> 
clusterClient.getJobStatus(jobToRestore.getJobID()),
+                        () ->
+                                clusterClient
+                                        .getJobStatus(jobId)
+                                        .thenApply(
+                                                jobStatus -> {
+                                                    if (jobStatus != 
targetStatus
+                                                            && jobStatus
+                                                                    
.isGloballyTerminalState()) {
+                                                        throw new 
CompletionException(
+                                                                new 
IllegalStateException(
+                                                                        
String.format(
+                                                                               
 "Job %s reached terminal state %s while waiting for %s.",
+                                                                               
 jobId,
+                                                                               
 jobStatus,
+                                                                               
 targetStatus)));
+                                                    }
+                                                    return jobStatus;
+                                                }),
                         Duration.ofMillis(50),
                         deadline,
-                        (jobStatus) -> jobStatus == JobStatus.FINISHED,
+                        jobStatus -> jobStatus == targetStatus,
                         scheduledExecutor);
-        assertThat(jobStatusFuture.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS))
-                .isEqualTo(JobStatus.FINISHED);
+        assertThat(statusFuture.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS))
+                .isEqualTo(targetStatus);
     }
 
     private JobGraph createJobGraph(ExecutionMode mode) {

Reply via email to