This is an automated email from the ASF dual-hosted git repository.

MartijnVisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new afb5ef94dd9 [FLINK-39879][tests] Stop CheckpointAcknowledgeFailureITCase hanging on slow CI
afb5ef94dd9 is described below

commit afb5ef94dd9ffc619ff90dd663501224777592ec
Author: Martijn Visser <[email protected]>
AuthorDate: Sat Jun 6 21:20:12 2026 +0200

    [FLINK-39879][tests] Stop CheckpointAcknowledgeFailureITCase hanging on slow CI
    
    The test waited on an unbounded future that never completes when the tiny
    pekko.ask.timeout (load-bearing for the AskTimeoutException assertion) fails
    the job before the keyed state is updated, hanging the surefire fork until
    the CI watchdog kills it. Propagate a terminal job failure into the wait so
    the test fails fast with the real cause, and add @Timeout(5, MINUTES) as the
    hard anti-hang guard. The product-side follow-up is tracked as FLINK-39738.
    
    Generated-by: Claude Opus 4.8 (1M context)
---
 .../CheckpointAcknowledgeFailureITCase.java        | 29 +++++++++++++++++++---
 1 file changed, 25 insertions(+), 4 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAcknowledgeFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAcknowledgeFailureITCase.java
index e55337177f9..70213a1ce77 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAcknowledgeFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAcknowledgeFailureITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
@@ -39,11 +40,13 @@ import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.time.Duration;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTING_TIMEOUT;
 import static 
org.apache.flink.configuration.CheckpointingOptions.CHECKPOINT_STORAGE;
@@ -100,6 +103,7 @@ public class CheckpointAcknowledgeFailureITCase extends 
TestLogger {
      * reason is exceeding pekko framesize.
      */
     @Test
+    @Timeout(value = 5, unit = TimeUnit.MINUTES)
     void testCheckpointAckFailure(@InjectMiniCluster MiniCluster cluster) 
throws Exception {
         SharedReference<CompletableFuture<Object>> stateUpdatedFuture =
                 sharedObjects.add(new CompletableFuture<>());
@@ -107,15 +111,32 @@ public class CheckpointAcknowledgeFailureITCase extends 
TestLogger {
         cfg.set(CHECKPOINTING_TIMEOUT, CHECKPOINT_TIMEOUT);
         final StreamExecutionEnvironment env =
                 StreamExecutionEnvironment.getExecutionEnvironment(cfg);
-        JobID jobID = executeJobAsync(env, stateUpdatedFuture);
-        stateUpdatedFuture.get().join();
+        JobClient jobClient = executeJobAsync(env, stateUpdatedFuture);
+        JobID jobID = jobClient.getJobID();
+
+        // The tiny pekko.ask.timeout that forces the oversized ACK to time out also applies to
+        // every other RPC, so on a slow machine the job can fail before the state is updated and
+        // stateUpdatedFuture would never complete. Propagate a terminal job failure into it so the
+        // wait fails fast with the real cause; @Timeout above is the hard guard against any hang.
+        jobClient
+                .getJobExecutionResult()
+                .whenComplete(
+                        (result, throwable) -> {
+                            if (throwable != null) {
+                                stateUpdatedFuture
+                                        .get()
+                                        .completeExceptionally(
+                                                
ExceptionUtils.stripCompletionException(throwable));
+                            }
+                        });
+        stateUpdatedFuture.get().get();
 
         assertThatThrownBy(() -> cluster.triggerCheckpoint(jobID).get())
                 .hasCauseInstanceOf(CheckpointException.class)
                 
.matches(CheckpointAcknowledgeFailureITCase::hasAskTimeoutException);
     }
 
-    private static JobID executeJobAsync(
+    private static JobClient executeJobAsync(
             StreamExecutionEnvironment env, 
SharedReference<CompletableFuture<Object>> stateUpdated)
             throws Exception {
         env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE)
@@ -140,7 +161,7 @@ public class CheckpointAcknowledgeFailureITCase extends 
TestLogger {
                             }
                         })
                 .sinkTo(new DiscardingSink<>());
-        return env.executeAsync().getJobID();
+        return env.executeAsync();
     }
 
     private static byte[] buildState() {

Reply via email to