liuml07 commented on code in PR #27466:
URL: https://github.com/apache/flink/pull/27466#discussion_r2722365092


##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java:
##########
@@ -161,24 +163,34 @@ private void buildGraph(StreamExecutionEnvironment env) {
                 .sinkTo(new DiscardingSink<>());
     }
 
+    /**
+     * Trigger checkpoints until the first failing checkpoint. The exception 
should come from {@link
+     * CheckpointStateOutputStream#close()} which should get called by the 
channel state writer
+     * after catching an exception in {@link 
CheckpointStateOutputStream#closeAndGetHandle()}. In
+     * some cases on writing checkpoints, only {@link 
CheckpointStateOutputStream#close()} will be
+     * called, so `failOnClose` has to be checked here.
+     */
     private void triggerFailingCheckpoint(
-            JobID jobID, Class<TestException> expectedException, MiniCluster 
miniCluster)
+            JobID jobID,
+            Class<TestException> expectedException,
+            SharedReference<AtomicBoolean> failOnCloseRef,
+            MiniCluster miniCluster)
             throws InterruptedException, ExecutionException {
-        while (true) {
+        boolean foundCheckpointFailure = false;
+        do {
             Optional<Throwable> cpFailure =
                     miniCluster
                             .triggerCheckpoint(jobID)
                             .thenApply(ign -> Optional.empty())
                             .handle((ign, err) -> Optional.ofNullable(err))
                             .get();
-            if (!cpFailure.isPresent()) {
-                Thread.sleep(50); // trigger again - in case of no channel 
data was written
-            } else if (isCausedBy(cpFailure.get(), expectedException)) {
-                return;
-            } else {
-                rethrow(cpFailure.get());
+
+            if (cpFailure.isPresent()) {
+                if (isCausedBy(cpFailure.get(), expectedException)) {
+                    foundCheckpointFailure = true;
+                }

Review Comment:
   If it's not caused by expected exception, shall we still rethrow to ail fast 
with actual error ? 



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java:
##########
@@ -161,24 +163,34 @@ private void buildGraph(StreamExecutionEnvironment env) {
                 .sinkTo(new DiscardingSink<>());
     }
 
+    /**
+     * Trigger checkpoints until the first failing checkpoint. The exception 
should come from {@link
+     * CheckpointStateOutputStream#close()} which should get called by the 
channel state writer
+     * after catching an exception in {@link 
CheckpointStateOutputStream#closeAndGetHandle()}. In
+     * some cases on writing checkpoints, only {@link 
CheckpointStateOutputStream#close()} will be
+     * called, so `failOnClose` has to be checked here.
+     */
     private void triggerFailingCheckpoint(
-            JobID jobID, Class<TestException> expectedException, MiniCluster 
miniCluster)
+            JobID jobID,
+            Class<TestException> expectedException,
+            SharedReference<AtomicBoolean> failOnCloseRef,
+            MiniCluster miniCluster)
             throws InterruptedException, ExecutionException {
-        while (true) {
+        boolean foundCheckpointFailure = false;
+        do {
             Optional<Throwable> cpFailure =
                     miniCluster
                             .triggerCheckpoint(jobID)
                             .thenApply(ign -> Optional.empty())
                             .handle((ign, err) -> Optional.ofNullable(err))
                             .get();
-            if (!cpFailure.isPresent()) {
-                Thread.sleep(50); // trigger again - in case of no channel 
data was written
-            } else if (isCausedBy(cpFailure.get(), expectedException)) {
-                return;
-            } else {
-                rethrow(cpFailure.get());
+
+            if (cpFailure.isPresent()) {
+                if (isCausedBy(cpFailure.get(), expectedException)) {
+                    foundCheckpointFailure = true;
+                }
             }
-        }
+        } while (!foundCheckpointFailure || failOnCloseRef.get().get());

Review Comment:
   fair enough



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to