This is an automated email from the ASF dual-hosted git repository.
RocMarshal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0156b330949 [FLINK-39918][tests] Stop operator-restore tests hanging
on unexpected job state (#28408)
0156b330949 is described below
commit 0156b330949f22d118c00ef7dc8026b1e22ad217
Author: Martijn Visser <[email protected]>
AuthorDate: Sat Jun 13 11:08:06 2026 +0200
[FLINK-39918][tests] Stop operator-restore tests hanging on unexpected job
state (#28408)
AbstractOperatorRestoreTestBase waits for one specific job status against
a ~2.7h deadline, so if the job instead reaches a different globally terminal
state (e.g. FAILED), the surefire fork hangs until the CI watchdog kills it,
leaving no usable error. Instead, wait via a shared helper that fails fast,
reporting the unexpected terminal state, rather than retrying until the deadline.
Generated-by: Claude Opus 4.8 (1M context)
---
.../restore/AbstractOperatorRestoreTestBase.java | 60 +++++++++++++---------
1 file changed, 37 insertions(+), 23 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 43a0f861e37..f69723f0676 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -19,6 +19,7 @@
package org.apache.flink.test.state.operator.restore;
import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.program.ClusterClient;
@@ -63,6 +64,7 @@ import java.nio.file.Paths;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@@ -187,15 +189,7 @@ public abstract class AbstractOperatorRestoreTestBase
implements MigrationTest {
clusterClient.submitJob(jobToMigrate).get();
- CompletableFuture<JobStatus> jobRunningFuture =
- FutureUtils.retrySuccessfulWithDelay(
- () ->
clusterClient.getJobStatus(jobToMigrate.getJobID()),
- Duration.ofMillis(50),
- deadline,
- (jobStatus) -> jobStatus == JobStatus.RUNNING,
- scheduledExecutor);
- assertThat(jobRunningFuture.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS))
- .isEqualTo(JobStatus.RUNNING);
+ waitForJobStatus(clusterClient, jobToMigrate.getJobID(), deadline,
JobStatus.RUNNING);
// Trigger savepoint
String savepointPath = null;
@@ -224,15 +218,7 @@ public abstract class AbstractOperatorRestoreTestBase
implements MigrationTest {
assertThat(savepointPath).as("Could not take savepoint.").isNotNull();
- CompletableFuture<JobStatus> jobCanceledFuture =
- FutureUtils.retrySuccessfulWithDelay(
- () ->
clusterClient.getJobStatus(jobToMigrate.getJobID()),
- Duration.ofMillis(50),
- deadline,
- (jobStatus) -> jobStatus == JobStatus.CANCELED,
- scheduledExecutor);
- assertThat(jobCanceledFuture.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS))
- .isEqualTo(JobStatus.CANCELED);
+ waitForJobStatus(clusterClient, jobToMigrate.getJobID(), deadline,
JobStatus.CANCELED);
return savepointPath;
}
@@ -247,15 +233,43 @@ public abstract class AbstractOperatorRestoreTestBase
implements MigrationTest {
clusterClient.submitJob(jobToRestore).get();
- CompletableFuture<JobStatus> jobStatusFuture =
+ waitForJobStatus(clusterClient, jobToRestore.getJobID(), deadline,
JobStatus.FINISHED);
+ }
+
+ /**
+ * Waits until the job reaches {@code targetStatus}, failing fast if the
job reaches a globally
+ * terminal state that is not the target (for example {@code FAILED} when
waiting for {@code
+ * FINISHED}) instead of retrying until the deadline.
+ */
+ private void waitForJobStatus(
+ ClusterClient<?> clusterClient, JobID jobId, Deadline deadline,
JobStatus targetStatus)
+ throws Exception {
+ final CompletableFuture<JobStatus> statusFuture =
FutureUtils.retrySuccessfulWithDelay(
- () ->
clusterClient.getJobStatus(jobToRestore.getJobID()),
+ () ->
+ clusterClient
+ .getJobStatus(jobId)
+ .thenApply(
+ jobStatus -> {
+ if (jobStatus !=
targetStatus
+ && jobStatus
+
.isGloballyTerminalState()) {
+ throw new
CompletionException(
+ new
IllegalStateException(
+
String.format(
+
"Job %s reached terminal state %s while waiting for %s.",
+
jobId,
+
jobStatus,
+
targetStatus)));
+ }
+ return jobStatus;
+ }),
Duration.ofMillis(50),
deadline,
- (jobStatus) -> jobStatus == JobStatus.FINISHED,
+ jobStatus -> jobStatus == targetStatus,
scheduledExecutor);
- assertThat(jobStatusFuture.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS))
- .isEqualTo(JobStatus.FINISHED);
+ assertThat(statusFuture.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS))
+ .isEqualTo(targetStatus);
}
private JobGraph createJobGraph(ExecutionMode mode) {