tillrohrmann commented on a change in pull request #9364: [FLINK-13593][checkpointing] Prevent failing the wrong execution attempt in CheckpointFailureManager
URL: https://github.com/apache/flink/pull/9364#discussion_r312356499
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##########
 @@ -50,15 +51,46 @@ public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback fa
        }
 
        /**
-        * Handle checkpoint exception with a handler callback.
+        * Handle job level checkpoint exception with a handler callback.
         *
         * @param exception the checkpoint exception.
         * @param checkpointId the failed checkpoint id used to count the continuous failure number based on
         *                     checkpoint id sequence. In trigger phase, we may not get the checkpoint id when the failure
         *                     happens before the checkpoint id generation. In this case, it will be specified a negative
         *                      latest generated checkpoint id as a special flag.
         */
-       public void handleCheckpointException(CheckpointException exception, long checkpointId) {
+       public void handleJobLevelCheckpointException(CheckpointException exception, long checkpointId) {
+               checkFailureCounter(exception, checkpointId);
+               if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
+                       clearCount();
+                       failureCallback.failJob(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold."));
+               }
+       }
+
+       /**
+        * Handle task level checkpoint exception with a handler callback.
+        *
+        * @param exception the checkpoint exception.
+        * @param checkpointId the failed checkpoint id used to count the continuous failure number based on
+        *                     checkpoint id sequence. In trigger phase, we may not get the checkpoint id when the failure
+        *                     happens before the checkpoint id generation. In this case, it will be specified a negative
+        *                      latest generated checkpoint id as a special flag.
+        * @param executionAttemptID the execution attempt id, as a safe guard.
+        */
+       public void handleTaskLevelCheckpointException(
+                       CheckpointException exception,
+                       long checkpointId,
+                       ExecutionAttemptID executionAttemptID) {
+               checkFailureCounter(exception, checkpointId);
+               if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
+                       clearCount();
+                       failureCallback.failJobDueToTaskFailure(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold."), executionAttemptID);
 
 Review comment:
   This is unrelated to this PR, but I think we should not use a 
`RuntimeException` (here `FlinkRuntimeException`) for this. Exceeding the 
tolerable failure threshold is a recoverable condition, so the exception 
should be a checked one.
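
As a rough illustration of the suggestion, the sketch below reports the threshold failure with a checked exception instead of an unchecked one. It is not the Flink implementation: `CheckpointThresholdExceededException`, `FailJobCallbackSketch`, and `FailureManagerSketch` are hypothetical names introduced only for this example, and the counter logic is simplified (no checkpoint id handling).

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical checked exception (extends Exception, not RuntimeException).
// The name is an assumption for illustration; it is not a Flink class.
class CheckpointThresholdExceededException extends Exception {
    CheckpointThresholdExceededException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for the failure callback used in the diff.
interface FailJobCallbackSketch {
    void failJob(Throwable cause);
}

// Simplified stand-in for the failure manager: counts continuous failures
// and notifies the callback once the tolerable number is exceeded.
class FailureManagerSketch {
    private final int tolerableFailures;
    private final FailJobCallbackSketch callback;
    private final AtomicInteger continuousFailures = new AtomicInteger();

    FailureManagerSketch(int tolerableFailures, FailJobCallbackSketch callback) {
        this.tolerableFailures = tolerableFailures;
        this.callback = callback;
    }

    void onCheckpointFailure() {
        if (continuousFailures.incrementAndGet() > tolerableFailures) {
            continuousFailures.set(0);
            // A checked exception signals that the condition is recoverable.
            callback.failJob(new CheckpointThresholdExceededException(
                "Exceeded checkpoint tolerable failure threshold."));
        }
    }

    void onCheckpointSuccess() {
        // A successful checkpoint resets the continuous failure count.
        continuousFailures.set(0);
    }
}
```

With a tolerable failure number of 1, the first failure is absorbed and the second one reaches the callback carrying the checked exception. Because the callback only receives a `Throwable`, swapping the exception type would not change its signature in this sketch.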

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
