This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b8ea81f11d [FLINK-29969][checkpoint] Show the root cause when exceeded checkpoint tolerable failure threshold
5b8ea81f11d is described below

commit 5b8ea81f11df1d094b6331de6cb6f824e5401bcd
Author: 1996fanrui <[email protected]>
AuthorDate: Thu Nov 10 10:59:52 2022 +0800

    [FLINK-29969][checkpoint] Show the root cause when exceeded checkpoint tolerable failure threshold
---
 .../flink/runtime/checkpoint/CheckpointFailureManager.java       | 9 +++++++--
 .../flink/test/checkpointing/CheckpointFailureManagerITCase.java | 5 ++++-
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 8db1fe307a1..61404496529 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -203,8 +203,13 @@ public class CheckpointFailureManager {
             checkFailureCounter(exception, checkpointId);
             if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
                 clearCount();
-                errorHandler.accept(
-                        new 
FlinkRuntimeException(EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE));
+                String exceptionMessage =
+                        String.format(
+                                "%s The latest checkpoint failed due to %s, 
view the Checkpoint History tab"
+                                        + " or the Job Manager log to find out 
why continuous checkpoints failed.",
+                                EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE,
+                                
exception.getCheckpointFailureReason().message());
+                errorHandler.accept(new 
FlinkRuntimeException(exceptionMessage));
             }
         }
     }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
index 27f6d78501f..7745f6ef18c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
@@ -264,7 +264,10 @@ public class CheckpointFailureManagerITCase extends TestLogger {
 
     private boolean isCheckpointFailure(JobExecutionException jobException) {
         return ExceptionUtils.findThrowable(jobException, 
FlinkRuntimeException.class)
-                .filter(ex -> 
ex.getMessage().equals(EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE))
+                .filter(
+                        ex ->
+                                ex.getMessage()
+                                        
.startsWith(EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE))
                 .isPresent();
     }
 }

Reply via email to