This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 5b8ea81f11d [FLINK-29969][checkpoint] Show the root cause when
exceeded checkpoint tolerable failure threshold
5b8ea81f11d is described below
commit 5b8ea81f11df1d094b6331de6cb6f824e5401bcd
Author: 1996fanrui <[email protected]>
AuthorDate: Thu Nov 10 10:59:52 2022 +0800
[FLINK-29969][checkpoint] Show the root cause when exceeded checkpoint
tolerable failure threshold
---
.../flink/runtime/checkpoint/CheckpointFailureManager.java | 9 +++++++--
.../flink/test/checkpointing/CheckpointFailureManagerITCase.java | 5 ++++-
2 files changed, 11 insertions(+), 3 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 8db1fe307a1..61404496529 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -203,8 +203,13 @@ public class CheckpointFailureManager {
checkFailureCounter(exception, checkpointId);
if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
clearCount();
- errorHandler.accept(
- new FlinkRuntimeException(EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE));
+ String exceptionMessage =
+ String.format(
+ "%s The latest checkpoint failed due to %s, view the Checkpoint History tab"
+ + " or the Job Manager log to find out why continuous checkpoints failed.",
+ EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE,
+ exception.getCheckpointFailureReason().message());
+ errorHandler.accept(new FlinkRuntimeException(exceptionMessage));
}
}
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
index 27f6d78501f..7745f6ef18c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
@@ -264,7 +264,10 @@ public class CheckpointFailureManagerITCase extends TestLogger {
private boolean isCheckpointFailure(JobExecutionException jobException) {
return ExceptionUtils.findThrowable(jobException, FlinkRuntimeException.class)
- .filter(ex -> ex.getMessage().equals(EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE))
+ .filter(
+ ex ->
+ ex.getMessage()
+ .startsWith(EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE))
.isPresent();
}
}