mateczagany commented on code in PR #27466:
URL: https://github.com/apache/flink/pull/27466#discussion_r2721531855


##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java:
##########
@@ -161,24 +163,34 @@ private void buildGraph(StreamExecutionEnvironment env) {
                 .sinkTo(new DiscardingSink<>());
     }
 
+    /**
+     * Trigger checkpoints until the first failing checkpoint. The exception should come from {@link
+     * CheckpointStateOutputStream#close()} which should get called by the channel state writer
+     * after catching an exception in {@link CheckpointStateOutputStream#closeAndGetHandle()}. In
+     * some cases on writing checkpoints, only {@link CheckpointStateOutputStream#close()} will be
+     * called, so `failOnClose` has to be checked here.
+     */
     private void triggerFailingCheckpoint(
-            JobID jobID, Class<TestException> expectedException, MiniCluster miniCluster)
+            JobID jobID,
+            Class<TestException> expectedException,
+            SharedReference<AtomicBoolean> failOnCloseRef,
+            MiniCluster miniCluster)
             throws InterruptedException, ExecutionException {
-        while (true) {
+        boolean foundCheckpointFailure = false;
+        do {
             Optional<Throwable> cpFailure =
                     miniCluster
                             .triggerCheckpoint(jobID)
                             .thenApply(ign -> Optional.empty())
                             .handle((ign, err) -> Optional.ofNullable(err))
                             .get();
-            if (!cpFailure.isPresent()) {
-                Thread.sleep(50); // trigger again - in case of no channel data was written
-            } else if (isCausedBy(cpFailure.get(), expectedException)) {
-                return;
-            } else {
-                rethrow(cpFailure.get());
+
+            if (cpFailure.isPresent()) {
+                if (isCausedBy(cpFailure.get(), expectedException)) {
+                    foundCheckpointFailure = true;
+                }
             }
-        }
+        } while (!foundCheckpointFailure || failOnCloseRef.get().get());

Review Comment:
   I think it's not that bad: each iteration triggers a checkpoint and then blocks on its result via `.get()`, so the loop is paced by the checkpoints themselves rather than spinning freely.
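
   To illustrate, here is a minimal, Flink-free sketch of the loop's control flow. `TriggerLoopSketch`, `triggerUntilFailure`, and the simulated trigger are hypothetical stand-ins for `MiniCluster#triggerCheckpoint` and the test's `failOnClose` flag; in particular, having the injected failure clear the flag is an assumption made for the sketch, not something taken from the PR.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class TriggerLoopSketch {

    /** Returns true if {@code t} or any of its causes is an instance of {@code expected}. */
    static boolean isCausedBy(Throwable t, Class<? extends Throwable> expected) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (expected.isInstance(cur)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Hypothetical stand-in for triggerFailingCheckpoint. Keeps triggering until the expected
     * failure has been seen AND failOnClose is false; returns the number of triggers issued.
     */
    static int triggerUntilFailure(
            Supplier<CompletableFuture<?>> trigger,
            Class<? extends Throwable> expected,
            AtomicBoolean failOnClose)
            throws InterruptedException, ExecutionException {
        int attempts = 0;
        boolean found = false;
        do {
            attempts++;
            // get() blocks until the triggered future completes, so the next trigger is only
            // issued after the previous one has produced a result.
            Optional<Throwable> failure =
                    trigger.get().handle((ign, err) -> Optional.ofNullable(err)).get();
            if (failure.isPresent() && isCausedBy(failure.get(), expected)) {
                found = true;
            }
        } while (!found || failOnClose.get());
        return attempts;
    }

    public static void main(String[] args) throws Exception {
        AtomicBoolean failOnClose = new AtomicBoolean(true);
        AtomicInteger calls = new AtomicInteger();
        // Simulated trigger: the first two "checkpoints" succeed, the third one fails.
        Supplier<CompletableFuture<?>> trigger =
                () -> {
                    CompletableFuture<Void> f = new CompletableFuture<>();
                    if (calls.incrementAndGet() < 3) {
                        f.complete(null);
                    } else {
                        // Assumption for this sketch: the injected failure also clears the flag.
                        failOnClose.set(false);
                        f.completeExceptionally(new IllegalStateException("injected close failure"));
                    }
                    return f;
                };
        int attempts = triggerUntilFailure(trigger, IllegalStateException.class, failOnClose);
        System.out.println("attempts=" + attempts); // prints "attempts=3"
    }
}
```

   The `do ... while (!found || failOnClose.get())` condition mirrors `!foundCheckpointFailure || failOnCloseRef.get().get()` in the diff: the loop only exits once the expected failure was observed and the flag is no longer set. With real checkpoints, each iteration takes as long as the triggered checkpoint does, which is why dropping the `Thread.sleep(50)` shouldn't turn this into a hot loop.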



-- 
This is an automated message from the Apache Git Service.