akalash commented on a change in pull request #17331:
URL: https://github.com/apache/flink/pull/17331#discussion_r714462624
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
##########
@@ -53,6 +56,56 @@ public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback fa
this.countedCheckpointIds = ConcurrentHashMap.newKeySet();
}
+ /**
+ * Failures on JM:
+ *
+ * <ul>
+ * <li>all checkpoints - go against failure counter.
+ * <li>any savepoints - do nothing: a savepoint is a manual action, so a failover will not help anyway.
+ * </ul>
+ *
+ * <p>Failures on TM:
+ *
+ * <ul>
+ * <li>all checkpoints - go against failure counter (failover might help and we want to notify
+ * users).
+ * <li>sync savepoints - we must always fail, otherwise we risk a deadlock where the job
+ * cancellation waits for a savepoint that never finishes.
+ * <li>non-sync savepoints - go against failure counter (failover might help solve the
+ * problem).
+ * </ul>
+ *
+ * @param pendingCheckpoint the failed checkpoint if it was initialized already.
+ * @param checkpointProperties the checkpoint properties in order to determine which handling
+ * strategy can be used.
+ * @param exception the checkpoint exception.
+ * @param executionAttemptID the execution attempt id, as a safeguard.
+ */
+ public void handleCheckpointException(
+ @Nullable PendingCheckpoint pendingCheckpoint,
+ CheckpointProperties checkpointProperties,
+ CheckpointException exception,
+ @Nullable ExecutionAttemptID executionAttemptID) {
+ if (isJMFailure(pendingCheckpoint, exception, executionAttemptID)) {
+ handleJobLevelCheckpointException(
+ checkpointProperties,
+ exception,
+ pendingCheckpoint == null
+ ? UNKNOWN_CHECKPOINT_ID
+ : pendingCheckpoint.getCheckpointID());
+ } else {
+ handleTaskLevelCheckpointException(pendingCheckpoint, exception, executionAttemptID);
+ }
+ }
+
+ /** Check if the exception occurs on the job manager side or not. */
+ private boolean isJMFailure(
+ @Nullable PendingCheckpoint pendingCheckpoint,
+ CheckpointException exception,
+ @Nullable ExecutionAttemptID executionAttemptID) {
+ return pendingCheckpoint == null || isJMThrowable(exception) || executionAttemptID == null;
Review comment:
I agree that `CheckpointFailureReason#isPreFlight` should be enough; the extra checks were just additional protection.
I removed `pendingCheckpoint == null` because we can indeed rely on the `isPreFlight` method, but `executionAttemptID == null` is more complicated, so I left it as you proposed.
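The routing rules from the Javadoc, combined with the reduced JM-side check discussed above, can be sketched as a small standalone class. This is a hypothetical model, not Flink code: `SnapshotType`, `Action`, `isJobManagerFailure` and `route` are illustrative names, the boolean `reasonIsPreFlight` stands in for `CheckpointFailureReason#isPreFlight`, and the attempt id is a plain `String` rather than an `ExecutionAttemptID`. It also assumes the JM-side check ends up as "pre-flight reason or no attempt id"; the comment does not show the final code.

```java
/**
 * Hypothetical, simplified model of how a checkpoint failure could be routed.
 * Not Flink code: every name here is illustrative.
 */
public class CheckpointFailureRoutingSketch {

    /** Hypothetical stand-in for what CheckpointProperties says about the failed snapshot. */
    enum SnapshotType {
        CHECKPOINT,
        SAVEPOINT,
        SYNC_SAVEPOINT
    }

    /** What happens to one failure. */
    enum Action {
        COUNT_FAILURE, // goes against the tolerable-failure counter
        IGNORE,        // nothing happens
        FAIL_JOB       // the job fails immediately
    }

    /**
     * Assumed form of the reduced JM-side check: the failure reason is "pre-flight"
     * (a boolean standing in for CheckpointFailureReason#isPreFlight), or no execution
     * attempt is attached to the failure.
     */
    static boolean isJobManagerFailure(boolean reasonIsPreFlight, String executionAttemptId) {
        return reasonIsPreFlight || executionAttemptId == null;
    }

    static Action route(SnapshotType type, boolean reasonIsPreFlight, String executionAttemptId) {
        if (isJobManagerFailure(reasonIsPreFlight, executionAttemptId)) {
            // JM side: checkpoints count against the counter; any savepoint is ignored,
            // because it is a manual action and a failover would not help.
            return type == SnapshotType.CHECKPOINT ? Action.COUNT_FAILURE : Action.IGNORE;
        }
        // TM side.
        switch (type) {
            case SYNC_SAVEPOINT:
                // Always fail, otherwise job cancellation could wait forever for a
                // savepoint that never finishes.
                return Action.FAIL_JOB;
            default:
                // Checkpoints and non-sync savepoints: a failover might help, so count it.
                return Action.COUNT_FAILURE;
        }
    }

    public static void main(String[] args) {
        // TM-side failure of a sync savepoint.
        System.out.println(route(SnapshotType.SYNC_SAVEPOINT, false, "attempt-1")); // FAIL_JOB
        // The same savepoint failing on the JM side.
        System.out.println(route(SnapshotType.SYNC_SAVEPOINT, true, null)); // IGNORE
        // Not pre-flight, but no attempt id: still treated as a JM failure here.
        System.out.println(route(SnapshotType.CHECKPOINT, false, null)); // COUNT_FAILURE
    }
}
```

In this sketch, keeping the `executionAttemptId == null` branch means a failure without an attempt id is never routed to the task-level path, even when its reason is not pre-flight; the third call in `main` exercises exactly that case.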
--
This is an automated message from the Apache Git Service.