pnowojski commented on a change in pull request #17331:
URL: https://github.com/apache/flink/pull/17331#discussion_r714192215



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
##########
@@ -62,36 +115,45 @@ public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback fa
      *     the failure happens before the checkpoint id generation. In this case, it will be
      *     specified a negative latest generated checkpoint id as a special flag.
      */
-    public void handleJobLevelCheckpointException(
-            CheckpointException exception, long checkpointId) {
-        handleCheckpointException(exception, checkpointId, failureCallback::failJob);
+    void handleJobLevelCheckpointException(
+            CheckpointProperties checkpointProperties,
+            CheckpointException exception,
+            long checkpointId) {
+        if (!checkpointProperties.isSavepoint()) {
+            checkFailureAgainstCounter(exception, checkpointId, failureCallback::failJob);
+        }
     }
 
     /**
      * Handle task level checkpoint exception with a handler callback.
      *
      * @param exception the checkpoint exception.
-     * @param checkpointId the failed checkpoint id used to count the continuous failure number
+     * @param pendingCheckpoint the failed checkpoint used to count the continuous failure number
      *     based on checkpoint id sequence. In trigger phase, we may not get the checkpoint id when
      *     the failure happens before the checkpoint id generation. In this case, it will be
      *     specified a negative latest generated checkpoint id as a special flag.
      * @param executionAttemptID the execution attempt id, as a safe guard.

Review comment:
       nit: the javadoc `@param` tags and the method parameters are out of order
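   
   For illustration, a sketch of the reordered javadoc, with the `@param` tags following
the new parameter list (the summary line and the `checkpointProperties` description are
assumed wordings, and the `checkpointId` text is borrowed from the analogous javadoc
elsewhere in this class, not verified against the PR):
   
   ```
   /**
    * Handles a job level checkpoint exception. (summary wording assumed)
    *
    * @param checkpointProperties the checkpoint properties. (wording assumed)
    * @param exception the checkpoint exception.
    * @param checkpointId the failed checkpoint id used to count the continuous failure number
    *     based on checkpoint id sequence. In trigger phase, we may not get the checkpoint id when
    *     the failure happens before the checkpoint id generation. In this case, it will be
    *     specified a negative latest generated checkpoint id as a special flag.
    */
   void handleJobLevelCheckpointException(
           CheckpointProperties checkpointProperties,
           CheckpointException exception,
           long checkpointId) {
       if (!checkpointProperties.isSavepoint()) {
           checkFailureAgainstCounter(exception, checkpointId, failureCallback::failJob);
       }
   }
   ```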

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
##########
@@ -17,105 +17,137 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.io.IOException;
+
 import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_EXPIRED;
+import static org.apache.flink.runtime.checkpoint.CheckpointProperties.forCheckpoint;
+import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
 import static org.junit.Assert.assertEquals;
 
 /** Tests for the checkpoint failure manager. */
 public class CheckpointFailureManagerTest extends TestLogger {
 
     @Test
-    public void testIgnoresPastCheckpoints() {
+    public void testIgnoresPastCheckpoints() throws IOException, JobException {
         TestFailJobCallback callback = new TestFailJobCallback();
         CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback);
+        CheckpointProperties checkpointProp = forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION);

Review comment:
       nit: rename to `checkpointProperties` (here and in the other places)

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
##########
@@ -53,6 +56,56 @@ public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback fa
         this.countedCheckpointIds = ConcurrentHashMap.newKeySet();
     }
 
+    /**
+     * Failures on JM:
+     *
+     * <ul>
+     *   <li>all checkpoints - go against failure counter.
+     *   <li>any savepoints - don’t do anything, manual action, the failover will not help anyway.
+     * </ul>
+     *
+     * <p>Failures on TM:
+     *
+     * <ul>
+     *   <li>all checkpoints - go against failure counter (failover might help and we want to notify
+     *       users).
+     *   <li>sync savepoints - we must always fail, otherwise we risk deadlock when the job
+     *       cancelation waiting for finishing savepoint which never happens.
+     *   <li>non sync savepoints - go against failure counter (failover might help solve the
+     *       problem).
+     * </ul>
+     *
+     * @param pendingCheckpoint the failed checkpoint if it was initialized already.
+     * @param checkpointProperties the checkpoint properties in order to determinate which handle
+     *     strategy can be used.
+     * @param exception the checkpoint exception.
+     * @param executionAttemptID the execution attempt id, as a safe guard.
+     */
+    public void handleCheckpointException(
+            @Nullable PendingCheckpoint pendingCheckpoint,
+            CheckpointProperties checkpointProperties,
+            CheckpointException exception,
+            @Nullable ExecutionAttemptID executionAttemptID) {
+        if (isJMFailure(pendingCheckpoint, exception, executionAttemptID)) {
+            handleJobLevelCheckpointException(
+                    checkpointProperties,
+                    exception,
+                    pendingCheckpoint == null
+                            ? UNKNOWN_CHECKPOINT_ID
+                            : pendingCheckpoint.getCheckpointID());
+        } else {
+            handleTaskLevelCheckpointException(pendingCheckpoint, exception, executionAttemptID);
+        }
+    }
+
+    /** Check if the exception occurs on the job manager side or not. */
+    private boolean isJMFailure(

Review comment:
       nit: please no abbreviations; also, the javadoc here is probably redundant?
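   
   (For instance, `isJobManagerFailure(...)` instead of `isJMFailure(...)`; the expanded
name is just an illustrative suggestion, not something taken from the PR.)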

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
##########
@@ -170,22 +233,7 @@ private void clearCount() {
         countedCheckpointIds.clear();
     }
 
-    /**
-     * Fails the whole job graph in case an in-progress synchronous savepoint is discarded.
-     *
-     * <p>If the checkpoint was cancelled at the checkpoint coordinator, i.e. before the synchronous
-     * savepoint barrier was sent to the tasks, then we do not cancel the job as we do not risk
-     * having a deadlock.
-     *
-     * @param cause The reason why the job is cancelled.
-     */
-    void handleSynchronousSavepointFailure(final Throwable cause) {
-        if (!isPreFlightFailure(cause)) {
-            failureCallback.failJob(cause);
-        }
-    }
-
-    private static boolean isPreFlightFailure(final Throwable cause) {
+    private static boolean isJMThrowable(final Throwable cause) {

Review comment:
       `isPreFlightFailure` makes more sense in this case, I think, for the sake of
consistency with `CheckpointFailureReason#isPreFlight`. At least as long as we want to
keep the `CheckpointFailureReason#isPreFlight` name; I actually think it might be a good
idea to rename that one to `isJobManagerFailure()`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
##########
@@ -53,6 +56,56 @@ public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback fa
         this.countedCheckpointIds = ConcurrentHashMap.newKeySet();
     }
 
+    /**
+     * Failures on JM:
+     *
+     * <ul>
+     *   <li>all checkpoints - go against failure counter.
+     *   <li>any savepoints - don’t do anything, manual action, the failover will not help anyway.
+     * </ul>
+     *
+     * <p>Failures on TM:
+     *
+     * <ul>
+     *   <li>all checkpoints - go against failure counter (failover might help and we want to notify
+     *       users).
+     *   <li>sync savepoints - we must always fail, otherwise we risk deadlock when the job
+     *       cancelation waiting for finishing savepoint which never happens.
+     *   <li>non sync savepoints - go against failure counter (failover might help solve the
+     *       problem).
+     * </ul>
+     *
+     * @param pendingCheckpoint the failed checkpoint if it was initialized already.
+     * @param checkpointProperties the checkpoint properties in order to determinate which handle
+     *     strategy can be used.
+     * @param exception the checkpoint exception.
+     * @param executionAttemptID the execution attempt id, as a safe guard.
+     */
+    public void handleCheckpointException(
+            @Nullable PendingCheckpoint pendingCheckpoint,
+            CheckpointProperties checkpointProperties,
+            CheckpointException exception,
+            @Nullable ExecutionAttemptID executionAttemptID) {
+        if (isJMFailure(pendingCheckpoint, exception, executionAttemptID)) {
+            handleJobLevelCheckpointException(
+                    checkpointProperties,
+                    exception,
+                    pendingCheckpoint == null
+                            ? UNKNOWN_CHECKPOINT_ID
+                            : pendingCheckpoint.getCheckpointID());
+        } else {
+            handleTaskLevelCheckpointException(pendingCheckpoint, exception, executionAttemptID);
+        }
+    }
+
+    /** Check if the exception occurs on the job manager side or not. */
+    private boolean isJMFailure(
+            @Nullable PendingCheckpoint pendingCheckpoint,
+            CheckpointException exception,
+            @Nullable ExecutionAttemptID executionAttemptID) {
+        return pendingCheckpoint == null || isJMThrowable(exception) || executionAttemptID == null;

Review comment:
       Hmmm, since we have `CheckpointFailureReason#isPreFlight`, that begs the question:
why do we even need the other conditions?
   
   About `pendingCheckpoint == null` (that's the new feature), I would actually try to
get rid of it and make sure that `CheckpointFailureReason#isPreFlight` returns true for
every case where we could expect `pendingCheckpoint == null` (by going through all of the
paths leading to it). I'm not sure how big of an effort that would be.
   
   About `executionAttemptID == null`, I would do the same, but that's riskier, as it
could cause a regression very late in the release process, so I would keep it as it is,
with a `TODO` for the future. And maybe, immediately after fixing this bug, follow up on
it and try to get rid of it as well?
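   
   A minimal sketch of how that could end up looking, purely for illustration (the types
below are simplified stand-ins, not the real Flink classes, and whether `isPreFlight()`
can truly cover every `pendingCheckpoint == null` path is exactly the open question
above):
   
   ```
   // Simplified stand-in for CheckpointFailureReason; the real enum lives in Flink.
   class FailureReasonStub {
       private final boolean preFlight;

       FailureReasonStub(boolean preFlight) {
           this.preFlight = preFlight;
       }

       boolean isPreFlight() {
           return preFlight;
       }
   }

   class FailureDispatchSketch {

       /**
        * Hypothetical dispatch once pendingCheckpoint == null is guaranteed to imply a
        * pre-flight reason: only the reason and the executionAttemptId safeguard remain.
        */
       static boolean isJobManagerFailure(FailureReasonStub reason, Object executionAttemptId) {
           // TODO (per the review): try to drop the executionAttemptId null check in a
           // follow-up, once task-level failures are known to always carry an attempt id.
           return reason.isPreFlight() || executionAttemptId == null;
       }

       public static void main(String[] args) {
           // Pre-flight (job-manager-side) failure: job-level handling even with an attempt id.
           System.out.println(isJobManagerFailure(new FailureReasonStub(true), new Object()));  // true
           // Task-side failure with a known attempt id: goes down the task-level path.
           System.out.println(isJobManagerFailure(new FailureReasonStub(false), new Object())); // false
       }
   }
   ```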

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
##########
@@ -53,6 +56,56 @@ public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback fa
         this.countedCheckpointIds = ConcurrentHashMap.newKeySet();
     }
 
+    /**
+     * Failures on JM:
+     *
+     * <ul>
+     *   <li>all checkpoints - go against failure counter.
+     *   <li>any savepoints - don’t do anything, manual action, the failover will not help anyway.
+     * </ul>
+     *
+     * <p>Failures on TM:
+     *
+     * <ul>
+     *   <li>all checkpoints - go against failure counter (failover might help and we want to notify
+     *       users).
+     *   <li>sync savepoints - we must always fail, otherwise we risk deadlock when the job
+     *       cancelation waiting for finishing savepoint which never happens.
+     *   <li>non sync savepoints - go against failure counter (failover might help solve the
+     *       problem).
+     * </ul>
+     *
+     * @param pendingCheckpoint the failed checkpoint if it was initialized already.
+     * @param checkpointProperties the checkpoint properties in order to determinate which handle
+     *     strategy can be used.
+     * @param exception the checkpoint exception.
+     * @param executionAttemptID the execution attempt id, as a safe guard.
+     */
+    public void handleCheckpointException(
+            @Nullable PendingCheckpoint pendingCheckpoint,
+            CheckpointProperties checkpointProperties,
+            CheckpointException exception,
+            @Nullable ExecutionAttemptID executionAttemptID) {
+        if (isJMFailure(pendingCheckpoint, exception, executionAttemptID)) {
+            handleJobLevelCheckpointException(
+                    checkpointProperties,
+                    exception,
+                    pendingCheckpoint == null
+                            ? UNKNOWN_CHECKPOINT_ID
+                            : pendingCheckpoint.getCheckpointID());
+        } else {
+            handleTaskLevelCheckpointException(pendingCheckpoint, exception, executionAttemptID);

Review comment:
       ```
   handleTaskLevelCheckpointException(checkNotNull(pendingCheckpoint), exception, checkNotNull(executionAttemptID));
   ```
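   
   (This assumes a static import of `org.apache.flink.util.Preconditions.checkNotNull`;
the point, presumably, is that on the task-level branch both arguments are expected to be
non-null, so failing fast there makes that assumption explicit.)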



