tillrohrmann commented on a change in pull request #9364:
[FLINK-13593][checkpointing] Prevent failing the wrong execution attempt in CheckpointFailureManager
URL: https://github.com/apache/flink/pull/9364#discussion_r312356499
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
##########
@@ -50,15 +51,46 @@ public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback fa
}
/**
- * Handle checkpoint exception with a handler callback.
+ * Handle job level checkpoint exception with a handler callback.
*
* @param exception the checkpoint exception.
* @param checkpointId the failed checkpoint id used to count the continuous failure number based on
* checkpoint id sequence. In trigger phase, we may not get the checkpoint id when the failure
* happens before the checkpoint id generation. In this case, it will be specified a negative
* latest generated checkpoint id as a special flag.
*/
- public void handleCheckpointException(CheckpointException exception, long checkpointId) {
+ public void handleJobLevelCheckpointException(CheckpointException exception, long checkpointId) {
+ checkFailureCounter(exception, checkpointId);
+ if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
+ clearCount();
+ failureCallback.failJob(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold."));
+ }
+ }
+
+ /**
+ * Handle task level checkpoint exception with a handler callback.
+ *
+ * @param exception the checkpoint exception.
+ * @param checkpointId the failed checkpoint id used to count the continuous failure number based on
+ * checkpoint id sequence. In trigger phase, we may not get the checkpoint id when the failure
+ * happens before the checkpoint id generation. In this case, it will be specified a negative
+ * latest generated checkpoint id as a special flag.
+ * @param executionAttemptID the execution attempt id, as a safe guard.
+ */
+ public void handleTaskLevelCheckpointException(
+ CheckpointException exception,
+ long checkpointId,
+ ExecutionAttemptID executionAttemptID) {
+ checkFailureCounter(exception, checkpointId);
+ if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
+ clearCount();
+ failureCallback.failJobDueToTaskFailure(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold."), executionAttemptID);
Review comment:
This is unrelated to this PR, but I think we should not throw a `RuntimeException` here. It is a recoverable exception and should therefore be a checked exception.
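A minimal, self-contained sketch of the job-level / task-level split in the diff, combined with the reviewer's suggestion of a checked exception. This is not Flink's implementation. `AttemptId` is a hypothetical stand-in for `ExecutionAttemptID`, `CheckpointToleranceExceededException` is a hypothetical checked exception, `FailJobCallback` is reduced to the two methods called in the diff, and the counter simply counts every failure. It ignores the exception and checkpoint id that the real `checkFailureCounter` receives.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified sketch (hypothetical, not Flink code) of CheckpointFailureManager's threshold handling. */
public class CheckpointFailureSketch {

    /** Hypothetical stand-in for Flink's ExecutionAttemptID. */
    static final class AttemptId {
        private final String value;

        AttemptId(String value) {
            this.value = value;
        }

        String value() {
            return value;
        }
    }

    /** Hypothetical checked exception: the job can recover from it, e.g. via a restart. */
    static class CheckpointToleranceExceededException extends Exception {
        CheckpointToleranceExceededException(String message) {
            super(message);
        }
    }

    /** Reduced to the two callback methods used in the diff. */
    interface FailJobCallback {
        void failJob(Throwable cause);

        void failJobDueToTaskFailure(Throwable cause, AttemptId failingTask);
    }

    private final int tolerableCpFailureNumber;
    private final FailJobCallback failureCallback;
    private final AtomicInteger continuousFailureCounter = new AtomicInteger(0);

    CheckpointFailureSketch(int tolerableCpFailureNumber, FailJobCallback failureCallback) {
        this.tolerableCpFailureNumber = tolerableCpFailureNumber;
        this.failureCallback = failureCallback;
    }

    /** Job-level failure: no specific task is blamed, so the whole job is failed. */
    void handleJobLevelCheckpointException() {
        if (incrementAndCheckThreshold()) {
            failureCallback.failJob(thresholdExceeded());
        }
    }

    /** Task-level failure: the attempt id is passed along so only that attempt is targeted. */
    void handleTaskLevelCheckpointException(AttemptId attemptId) {
        if (incrementAndCheckThreshold()) {
            failureCallback.failJobDueToTaskFailure(thresholdExceeded(), attemptId);
        }
    }

    /** Simplification: every failure counts; returns true once the threshold is exceeded. */
    private boolean incrementAndCheckThreshold() {
        if (continuousFailureCounter.incrementAndGet() > tolerableCpFailureNumber) {
            continuousFailureCounter.set(0); // mirrors clearCount() in the diff
            return true;
        }
        return false;
    }

    private static CheckpointToleranceExceededException thresholdExceeded() {
        return new CheckpointToleranceExceededException("Exceeded checkpoint tolerable failure threshold.");
    }

    public static void main(String[] args) {
        CheckpointFailureSketch manager = new CheckpointFailureSketch(1, new FailJobCallback() {
            @Override
            public void failJob(Throwable cause) {
                System.out.println("fail job: " + cause.getMessage());
            }

            @Override
            public void failJobDueToTaskFailure(Throwable cause, AttemptId failingTask) {
                System.out.println("fail attempt " + failingTask.value() + ": " + cause.getMessage());
            }
        });
        manager.handleTaskLevelCheckpointException(new AttemptId("attempt-1")); // 1st failure: tolerated
        manager.handleTaskLevelCheckpointException(new AttemptId("attempt-2")); // 2nd failure: exceeds threshold
    }
}
```

With a tolerance of 1, the second task-level failure crosses the threshold and only `attempt-2` is reported. Because the cause is a checked `Exception` subtype rather than a `RuntimeException`, any code that rethrows it has to declare or handle it explicitly.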
----------------------------------------------------------------
This is an automated message from the Apache Git Service.