pnowojski commented on a change in pull request #17331:
URL: https://github.com/apache/flink/pull/17331#discussion_r713977586



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
##########
@@ -53,6 +56,49 @@ public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback fa
         this.countedCheckpointIds = ConcurrentHashMap.newKeySet();
     }
 
+    /**
+     * Failures on JM:
+     *
+     * <ul>
+     *   <li>all checkpoints - go against failure counter.
+     *   <li>any savepoints - don’t do anything, manual action, the failover will not help anyway.
+     * </ul>
+     *
+     * <p>Failures on TM:
+     *
+     * <ul>
+     *   <li>all checkpoints - go against failure counter (failover might help and we want to notify
+     *       users).
+     *   <li>sync savepoints - we must always fail, otherwise we risk deadlock when the job
+     *       cancelation waiting for finishing savepoint which never happens.
+     *   <li>non sync savepoints - go against failure counter (failover might help solve the
+     *       problem).
+     * </ul>
+     *
+     * @param pendingCheckpoint the failed checkpoint if it was initialized already.
+     * @param checkpointProperties the checkpoint properties in order to determinate which handle
+     *     strategy can be used.
+     * @param exception the checkpoint exception.
+     * @param executionAttemptID the execution attempt id, as a safe guard.
+     */
+    public void handleCheckpointException(
+            @Nullable PendingCheckpoint pendingCheckpoint,
+            CheckpointProperties checkpointProperties,
+            CheckpointException exception,
+            @Nullable ExecutionAttemptID executionAttemptID) {
+        if (checkpointProperties.isSavepoint() && checkpointProperties.isSynchronous()) {
+            handleSynchronousSavepointFailure(exception);
+        } else if (executionAttemptID != null && pendingCheckpoint != null) {
+            handleTaskLevelCheckpointException(
+                    exception, pendingCheckpoint.getCheckpointId(), executionAttemptID);
+        } else if (pendingCheckpoint != null) {
+            handleJobLevelCheckpointException(exception, pendingCheckpoint.getCheckpointId());
+        } else if (!checkpointProperties.isSavepoint()) {
+            handleJobLevelCheckpointException(exception);
+        }

Review comment:
       I think it's quite difficult to follow the logic/conditions here. Could we rewrite it to mimic the javadoc logic? First split on TM vs. JM failure, then on whether it is a checkpoint, a savepoint, or a sync savepoint. We could also rename the methods called from here to things like:
   ```
   checkFailureAgainstCounter();
   failJob();
   ```
   ?
   
   For example, as it stands, I was quite confused about where we handle
   > <li>any savepoints - don’t do anything, manual action, the failover will not help anyway.
   
   and it's actually not in this method, but one level deeper.
   
   If this method grows too large, we could always split it:
   
   ```
   if (isTMFailure()) {
     handleTaskLevelCheckpointException(...);
   } else {
     handleJobLevelCheckpointException(...);
   }
   ```
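
   To make the suggestion a bit more concrete, here is a rough, self-contained sketch of a dispatch that follows the javadoc table directly: TM vs. JM first, then checkpoint / savepoint / sync savepoint. All names in it (`CheckpointFailureDispatchSketch`, `Kind`, `Action`, `decide`) are hypothetical and do not exist in Flink; the real method would call the renamed handlers instead of returning an enum. Note that it encodes the javadoc as written, so a sync savepoint failing on the JM is ignored here, which differs from the current code in the diff.

   ```java
   /** Hypothetical sketch only; none of these names are part of Flink. */
   final class CheckpointFailureDispatchSketch {

       /** Assumed classification of the snapshot that failed. */
       enum Kind { CHECKPOINT, SAVEPOINT, SYNC_SAVEPOINT }

       /** Assumed outcomes, mirroring the three behaviours listed in the javadoc. */
       enum Action { CHECK_AGAINST_COUNTER, FAIL_JOB, IGNORE }

       /** Splits on TM/JM first, then on the snapshot kind. */
       static Action decide(boolean isTmFailure, Kind kind) {
           return isTmFailure ? decideForTaskManager(kind) : decideForJobManager(kind);
       }

       /** "Failures on JM" section of the javadoc. */
       private static Action decideForJobManager(Kind kind) {
           switch (kind) {
               case CHECKPOINT:
                   return Action.CHECK_AGAINST_COUNTER;
               case SAVEPOINT:
               case SYNC_SAVEPOINT:
                   // Manual action; a failover would not help.
                   return Action.IGNORE;
               default:
                   throw new IllegalArgumentException("Unknown kind: " + kind);
           }
       }

       /** "Failures on TM" section of the javadoc. */
       private static Action decideForTaskManager(Kind kind) {
           switch (kind) {
               case CHECKPOINT:
               case SAVEPOINT:
                   // A failover might help, so count the failure.
                   return Action.CHECK_AGAINST_COUNTER;
               case SYNC_SAVEPOINT:
                   // Must always fail, otherwise job cancellation may wait forever.
                   return Action.FAIL_JOB;
               default:
                   throw new IllegalArgumentException("Unknown kind: " + kind);
           }
       }
   }
   ```

   With this shape, each `<li>` in the javadoc maps to exactly one `case`, so the "any savepoints - don't do anything" rule is visible at this level rather than one level deeper.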



