This is an automated email from the ASF dual-hosted git repository.
MartijnVisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new afb5ef94dd9 [FLINK-39879][tests] Stop CheckpointAcknowledgeFailureITCase hanging on slow CI
afb5ef94dd9 is described below
commit afb5ef94dd9ffc619ff90dd663501224777592ec
Author: Martijn Visser <[email protected]>
AuthorDate: Sat Jun 6 21:20:12 2026 +0200
[FLINK-39879][tests] Stop CheckpointAcknowledgeFailureITCase hanging on slow CI
The test waited, without any timeout, on a future that is completed only once
the keyed state has been updated. The tiny pekko.ask.timeout (load-bearing for
the AskTimeoutException assertion) applies to every RPC, so on slow CI it can
fail the job before the state is updated. In that case the future never
completes and the surefire fork hangs until the CI watchdog kills it.
Propagate a terminal job failure into the awaited future so the test fails
fast with the real cause, and add @Timeout(5, MINUTES) as the hard anti-hang
guard. The product-side follow-up is tracked as FLINK-39738.
Generated-by: Claude Opus 4.8 (1M context)
---
.../CheckpointAcknowledgeFailureITCase.java | 29 +++++++++++++++++++---
1 file changed, 25 insertions(+), 4 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAcknowledgeFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAcknowledgeFailureITCase.java
index e55337177f9..70213a1ce77 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAcknowledgeFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAcknowledgeFailureITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
@@ -39,11 +40,13 @@ import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import static
org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTING_TIMEOUT;
import static
org.apache.flink.configuration.CheckpointingOptions.CHECKPOINT_STORAGE;
@@ -100,6 +103,7 @@ public class CheckpointAcknowledgeFailureITCase extends TestLogger {
* reason is exceeding pekko framesize.
*/
@Test
+ @Timeout(value = 5, unit = TimeUnit.MINUTES)
void testCheckpointAckFailure(@InjectMiniCluster MiniCluster cluster)
throws Exception {
SharedReference<CompletableFuture<Object>> stateUpdatedFuture =
sharedObjects.add(new CompletableFuture<>());
@@ -107,15 +111,32 @@ public class CheckpointAcknowledgeFailureITCase extends TestLogger {
cfg.set(CHECKPOINTING_TIMEOUT, CHECKPOINT_TIMEOUT);
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(cfg);
- JobID jobID = executeJobAsync(env, stateUpdatedFuture);
- stateUpdatedFuture.get().join();
+ JobClient jobClient = executeJobAsync(env, stateUpdatedFuture);
+ JobID jobID = jobClient.getJobID();
+
+ // The tiny pekko.ask.timeout that forces the oversized ACK to time out also applies to
+ // every other RPC, so on a slow machine the job can fail before the state is updated and
+ // stateUpdatedFuture would never complete. Propagate a terminal job failure into it so the
+ // wait fails fast with the real cause; @Timeout above is the hard guard against any hang.
+ jobClient
+ .getJobExecutionResult()
+ .whenComplete(
+ (result, throwable) -> {
+ if (throwable != null) {
+ stateUpdatedFuture
+ .get()
+ .completeExceptionally(
+
ExceptionUtils.stripCompletionException(throwable));
+ }
+ });
+ stateUpdatedFuture.get().get();
assertThatThrownBy(() -> cluster.triggerCheckpoint(jobID).get())
.hasCauseInstanceOf(CheckpointException.class)
.matches(CheckpointAcknowledgeFailureITCase::hasAskTimeoutException);
}
- private static JobID executeJobAsync(
+ private static JobClient executeJobAsync(
StreamExecutionEnvironment env,
SharedReference<CompletableFuture<Object>> stateUpdated)
throws Exception {
env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE)
@@ -140,7 +161,7 @@ public class CheckpointAcknowledgeFailureITCase extends TestLogger {
}
})
.sinkTo(new DiscardingSink<>());
- return env.executeAsync().getJobID();
+ return env.executeAsync();
}
private static byte[] buildState() {