liuml07 commented on code in PR #27466:
URL: https://github.com/apache/flink/pull/27466#discussion_r2722365092
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java:
##########
@@ -161,24 +163,34 @@ private void buildGraph(StreamExecutionEnvironment env) {
.sinkTo(new DiscardingSink<>());
}
+ /**
+ * Trigger checkpoints until the first failing checkpoint. The exception should come from {@link
+ * CheckpointStateOutputStream#close()} which should get called by the channel state writer
+ * after catching an exception in {@link CheckpointStateOutputStream#closeAndGetHandle()}. In
+ * some cases on writing checkpoints, only {@link CheckpointStateOutputStream#close()} will be
+ * called, so `failOnClose` has to be checked here.
+ */
private void triggerFailingCheckpoint(
- JobID jobID, Class<TestException> expectedException, MiniCluster miniCluster)
+ JobID jobID,
+ Class<TestException> expectedException,
+ SharedReference<AtomicBoolean> failOnCloseRef,
+ MiniCluster miniCluster)
throws InterruptedException, ExecutionException {
- while (true) {
+ boolean foundCheckpointFailure = false;
+ do {
Optional<Throwable> cpFailure =
miniCluster
.triggerCheckpoint(jobID)
.thenApply(ign -> Optional.empty())
.handle((ign, err) -> Optional.ofNullable(err))
.get();
- if (!cpFailure.isPresent()) {
- Thread.sleep(50); // trigger again - in case of no channel data was written
- } else if (isCausedBy(cpFailure.get(), expectedException)) {
- return;
- } else {
- rethrow(cpFailure.get());
+
+ if (cpFailure.isPresent()) {
+ if (isCausedBy(cpFailure.get(), expectedException)) {
+ foundCheckpointFailure = true;
+ }
Review Comment:
If it's not caused by the expected exception, should we still rethrow to fail fast
with the actual error?
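To make the suggestion concrete, here is a minimal, self-contained sketch of the loop with the rethrow kept. It does not use Flink: `FailFastCheckpointLoop`, `triggerUntilExpectedFailure`, and the `Supplier`-based trigger are hypothetical stand-ins for the test method and `miniCluster.triggerCheckpoint(jobID)`. `isCausedBy` here is a simplified cause-chain walk rather than the test's helper, and the sketch uses `join()` instead of `get()` only to avoid checked exceptions. That the flag is reset by the failing close is also an assumption of the simulation.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class FailFastCheckpointLoop {

    /** Hypothetical stand-in for the test's expected exception type. */
    public static class TestException extends RuntimeException {
        public TestException(String message) {
            super(message);
        }
    }

    /** Simplified cause-chain check (assumption: the real helper behaves similarly). */
    public static boolean isCausedBy(Throwable error, Class<? extends Throwable> expected) {
        for (Throwable cur = error; cur != null; cur = cur.getCause()) {
            if (expected.isInstance(cur)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Triggers until the expected failure is seen and failOnClose has been reset.
     * Any other failure is rethrown immediately instead of being silently retried.
     * Returns the number of trigger attempts.
     */
    public static int triggerUntilExpectedFailure(
            Supplier<CompletableFuture<Long>> trigger,
            Class<? extends Throwable> expected,
            AtomicBoolean failOnClose) {
        int attempts = 0;
        boolean foundCheckpointFailure = false;
        do {
            attempts++;
            // handle() maps both outcomes to a value, so join() itself never throws here.
            Optional<Throwable> cpFailure =
                    trigger.get().handle((ign, err) -> Optional.ofNullable(err)).join();
            if (cpFailure.isPresent()) {
                if (isCausedBy(cpFailure.get(), expected)) {
                    foundCheckpointFailure = true;
                } else {
                    // Fail fast: surface the actual error rather than looping on.
                    throw new IllegalStateException(
                            "Unexpected checkpoint failure", cpFailure.get());
                }
            }
        } while (!foundCheckpointFailure || failOnClose.get());
        return attempts;
    }

    public static void main(String[] args) {
        AtomicBoolean failOnClose = new AtomicBoolean(true);
        AtomicInteger calls = new AtomicInteger();
        // Simulated trigger: first call succeeds, second fails with TestException
        // and resets the flag, as the failing stream is assumed to do in the test.
        Supplier<CompletableFuture<Long>> trigger = () -> {
            CompletableFuture<Long> result = new CompletableFuture<>();
            if (calls.incrementAndGet() < 2) {
                result.complete(1L);
            } else {
                failOnClose.set(false);
                result.completeExceptionally(new TestException("close failed"));
            }
            return result;
        };
        int attempts = triggerUntilExpectedFailure(trigger, TestException.class, failOnClose);
        System.out.println("attempts = " + attempts);
    }
}
```

Without the `else` branch, an unrelated failure (for example a declined checkpoint for some other reason) would just cause another trigger, and the test could spin until it times out instead of reporting the real cause.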
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java:
##########
@@ -161,24 +163,34 @@ private void buildGraph(StreamExecutionEnvironment env) {
.sinkTo(new DiscardingSink<>());
}
+ /**
+ * Trigger checkpoints until the first failing checkpoint. The exception should come from {@link
+ * CheckpointStateOutputStream#close()} which should get called by the channel state writer
+ * after catching an exception in {@link CheckpointStateOutputStream#closeAndGetHandle()}. In
+ * some cases on writing checkpoints, only {@link CheckpointStateOutputStream#close()} will be
+ * called, so `failOnClose` has to be checked here.
+ */
private void triggerFailingCheckpoint(
- JobID jobID, Class<TestException> expectedException, MiniCluster miniCluster)
+ JobID jobID,
+ Class<TestException> expectedException,
+ SharedReference<AtomicBoolean> failOnCloseRef,
+ MiniCluster miniCluster)
throws InterruptedException, ExecutionException {
- while (true) {
+ boolean foundCheckpointFailure = false;
+ do {
Optional<Throwable> cpFailure =
miniCluster
.triggerCheckpoint(jobID)
.thenApply(ign -> Optional.empty())
.handle((ign, err) -> Optional.ofNullable(err))
.get();
- if (!cpFailure.isPresent()) {
- Thread.sleep(50); // trigger again - in case of no channel data was written
- } else if (isCausedBy(cpFailure.get(), expectedException)) {
- return;
- } else {
- rethrow(cpFailure.get());
+
+ if (cpFailure.isPresent()) {
+ if (isCausedBy(cpFailure.get(), expectedException)) {
+ foundCheckpointFailure = true;
+ }
}
- }
+ } while (!foundCheckpointFailure || failOnCloseRef.get().get());
Review Comment:
fair enough