This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 27f1dfdf72da19ea8cb20a353aed40323fc83c41
Author: fanrui <[email protected]>
AuthorDate: Mon Feb 28 17:45:12 2022 +0800

    [FLINK-26049][checkpoint] Move checkpoint failure logging and failed-checkpoint reporting to CheckpointFailureManager
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 29 ++-----
 .../checkpoint/CheckpointFailureManager.java       | 46 +++++++++--
 .../runtime/checkpoint/PendingCheckpoint.java      | 17 +----
 .../checkpoint/CheckpointFailureManagerTest.java   | 26 +++++++
 .../runtime/checkpoint/PendingCheckpointTest.java  | 89 ++++------------------
 5 files changed, 84 insertions(+), 123 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 384e047..3d79824 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -80,7 +80,6 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import java.util.stream.Stream;
 
@@ -177,9 +176,6 @@ public class CheckpointCoordinator {
     /** Actor that receives status updates from the execution graph this 
coordinator works for. */
     private JobStatusListener jobStatusListener;
 
-    /** The number of consecutive failed trigger attempts. */
-    private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new 
AtomicInteger(0);
-
     /** A handle to the current periodic trigger, to cancel it when necessary. 
*/
     private ScheduledFuture<?> currentPeriodicTrigger;
 
@@ -862,7 +858,6 @@ public class CheckpointCoordinator {
     /** Trigger request is successful. NOTE, it must be invoked if trigger 
request is successful. */
     private void onTriggerSuccess() {
         isTriggering = false;
-        numUnsuccessfulCheckpointsTriggers.set(0);
         executeQueuedRequest();
     }
 
@@ -911,25 +906,12 @@ public class CheckpointCoordinator {
                             
CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
 
             if (checkpoint != null && !checkpoint.isDisposed()) {
-                int numUnsuccessful = 
numUnsuccessfulCheckpointsTriggers.incrementAndGet();
-                LOG.warn(
-                        "Failed to trigger checkpoint {} for job {}. ({} 
consecutive failed attempts so far)",
-                        checkpoint.getCheckpointId(),
-                        job,
-                        numUnsuccessful,
-                        throwable);
-
                 synchronized (lock) {
                     abortPendingCheckpoint(checkpoint, cause);
                 }
             } else {
-                LOG.info(
-                        "Failed to trigger checkpoint for job {} because {}",
-                        job,
-                        throwable.getMessage());
-
                 failureManager.handleCheckpointException(
-                        checkpoint, checkpointProperties, cause, null);
+                        checkpoint, checkpointProperties, cause, null, job, 
null);
             }
         } finally {
             isTriggering = false;
@@ -1778,8 +1760,6 @@ public class CheckpointCoordinator {
             final CheckpointException reason =
                     new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SUSPEND);
             abortPendingAndQueuedCheckpoints(reason);
-
-            numUnsuccessfulCheckpointsTriggers.set(0);
         }
     }
 
@@ -1954,14 +1934,15 @@ public class CheckpointCoordinator {
                         exception.getCause(),
                         checkpointsCleaner,
                         this::scheduleTriggerRequest,
-                        executor,
-                        getStatsCallback(pendingCheckpoint));
+                        executor);
 
                 failureManager.handleCheckpointException(
                         pendingCheckpoint,
                         pendingCheckpoint.getProps(),
                         exception,
-                        executionAttemptID);
+                        executionAttemptID,
+                        job,
+                        getStatsCallback(pendingCheckpoint));
             } finally {
                 sendAbortedMessages(
                         
pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 8ccccc6..7b30034 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -17,10 +17,14 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.util.Set;
@@ -34,6 +38,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /** The checkpoint failure manager which centralized manage checkpoint failure 
processing logic. */
 public class CheckpointFailureManager {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(CheckpointFailureManager.class);
+
     public static final int UNLIMITED_TOLERABLE_FAILURE_NUMBER = 
Integer.MAX_VALUE;
     public static final String EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE =
             "Exceeded checkpoint tolerable failure threshold.";
@@ -80,25 +86,62 @@ public class CheckpointFailureManager {
      *     strategy can be used.
      * @param exception the checkpoint exception.
      * @param executionAttemptID the execution attempt id, as a safe guard.
+     * @param job the ID of the job the checkpoint belongs to; only used for logging.
+     * @param pendingCheckpointStats the statistics to mark as failed, or {@code null} if none.
      */
     public void handleCheckpointException(
             @Nullable PendingCheckpoint pendingCheckpoint,
             CheckpointProperties checkpointProperties,
             CheckpointException exception,
-            @Nullable ExecutionAttemptID executionAttemptID) {
+            @Nullable ExecutionAttemptID executionAttemptID,
+            JobID job,
+            @Nullable PendingCheckpointStats pendingCheckpointStats) {
+        long checkpointId =
+                pendingCheckpoint == null
+                        ? UNKNOWN_CHECKPOINT_ID
+                        : pendingCheckpoint.getCheckpointID();
+        updateStatsAfterCheckpointFailed(pendingCheckpointStats, exception);
+
+        if (CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING.equals(
+                exception.getCheckpointFailureReason())) {
+            // Not all required tasks are running, so the checkpoint could not be triggered;
+            // log only the message at INFO rather than a WARN with a full stack trace.
+            LOG.info(
+                    "Failed to trigger checkpoint for job {} because {}",
+                    job,
+                    exception.getMessage());
+        } else {
+            // Reached both when triggering fails and when a triggered checkpoint is aborted.
+            LOG.warn(
+                    "Failed to trigger or complete checkpoint {} for job {}. ({} consecutive failed attempts so far)",
+                    checkpointId == UNKNOWN_CHECKPOINT_ID ? "UNKNOWN_CHECKPOINT_ID" : checkpointId,
+                    job,
+                    continuousFailureCounter.get(),
+                    exception);
+        }
         if (isJobManagerFailure(exception, executionAttemptID)) {
-            handleJobLevelCheckpointException(
-                    checkpointProperties,
-                    exception,
-                    pendingCheckpoint == null
-                            ? UNKNOWN_CHECKPOINT_ID
-                            : pendingCheckpoint.getCheckpointID());
+            handleJobLevelCheckpointException(checkpointProperties, exception, checkpointId);
         } else {
             handleTaskLevelCheckpointException(
                     checkNotNull(pendingCheckpoint), exception, checkNotNull(executionAttemptID));
         }
     }
 
+    /**
+     * Reports the failure to the given pending checkpoint statistics, if any are tracked.
+     *
+     * @param pendingCheckpointStats the statistics to mark as failed, or {@code null} to skip.
+     * @param exception the checkpoint exception.
+     */
+    private void updateStatsAfterCheckpointFailed(
+            @Nullable PendingCheckpointStats pendingCheckpointStats,
+            CheckpointException exception) {
+        if (pendingCheckpointStats != null) {
+            long failureTimestamp = System.currentTimeMillis();
+            pendingCheckpointStats.reportFailedCheckpoint(failureTimestamp, exception);
+        }
+    }
+
     private boolean isJobManagerFailure(
             CheckpointException exception, @Nullable ExecutionAttemptID executionAttemptID) {
         // TODO: Try to get rid of checking nullability of executionAttemptID because false value of
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 47e7a32..94ae442 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -542,12 +542,10 @@ public class PendingCheckpoint implements Checkpoint {
             @Nullable Throwable cause,
             CheckpointsCleaner checkpointsCleaner,
             Runnable postCleanup,
-            Executor executor,
-            PendingCheckpointStats statsCallback) {
+            Executor executor) {
         try {
             failureCause = new CheckpointException(reason, cause);
             onCompletionPromise.completeExceptionally(failureCause);
-            reportFailedCheckpoint(failureCause, statsCallback);
             assertAbortSubsumedForced(reason);
         } finally {
             dispose(true, checkpointsCleaner, postCleanup, executor);
@@ -627,19 +625,6 @@ public class PendingCheckpoint implements Checkpoint {
         }
     }
 
-    /**
-     * Reports a failed checkpoint with the given optional cause.
-     *
-     * @param cause The failure cause or <code>null</code>.
-     */
-    private void reportFailedCheckpoint(Exception cause, 
PendingCheckpointStats statsCallback) {
-        // to prevent null-pointers from concurrent modification, copy 
reference onto stack
-        if (statsCallback != null) {
-            long failureTimestamp = System.currentTimeMillis();
-            statsCallback.reportFailedCheckpoint(failureTimestamp, cause);
-        }
-    }
-
     // ------------------------------------------------------------------------
     //  Utilities
     // ------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
index 8ed9b03..e027ed9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.util.TestLogger;
@@ -29,6 +30,11 @@ import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKP
 import static 
org.apache.flink.runtime.checkpoint.CheckpointProperties.forCheckpoint;
 import static 
org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 /** Tests for the checkpoint failure manager. */
 public class CheckpointFailureManagerTest extends TestLogger {
@@ -153,6 +159,26 @@ public class CheckpointFailureManagerTest extends 
TestLogger {
         assertEquals(0, callback.getInvokeCounter());
     }
 
+    /** Tests that a failed checkpoint is reported to the registered stats callback. */
+    @Test
+    public void testPendingCheckpointStatsCallbacks() throws Exception {
+        CheckpointProperties checkpointProperties = 
forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION);
+        CheckpointFailureManager failureManager =
+                new CheckpointFailureManager(2, new TestFailJobCallback());
+
+        PendingCheckpoint pendingCheckpoint = mock(PendingCheckpoint.class);
+        PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
+
+        failureManager.handleCheckpointException(
+                pendingCheckpoint,
+                checkpointProperties,
+                new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_SUBSUMED, null),
+                null,
+                new JobID(),
+                callback);
+        verify(callback, times(1)).reportFailedCheckpoint(anyLong(), 
any(Exception.class));
+    }
+
     /** A failure handler callback for testing. */
     private static class TestFailJobCallback implements 
CheckpointFailureManager.FailJobCallback {
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 3174e2b..e7d3872 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -73,7 +73,6 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
@@ -271,70 +270,20 @@ public class PendingCheckpointTest {
     /** Tests that the stats callbacks happen if the callback is registered. */
     @Test
     public void testPendingCheckpointStatsCallbacks() throws Exception {
-        {
-            // Complete successfully
-            PendingCheckpointStats callback = 
mock(PendingCheckpointStats.class);
-            PendingCheckpoint pending =
-                    createPendingCheckpoint(
-                            CheckpointProperties.forCheckpoint(
-                                    
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-
-            pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), 
callback);
-            verify(callback, times(1))
-                    .reportSubtaskStats(nullable(JobVertexID.class), 
any(SubtaskStateStats.class));
-
-            pending.finalizeCheckpoint(
-                    new CheckpointsCleaner(), () -> {}, 
Executors.directExecutor(), callback);
-            verify(callback, 
times(1)).reportCompletedCheckpoint(any(String.class));
-        }
-
-        {
-            // Fail subsumed
-            PendingCheckpointStats callback = 
mock(PendingCheckpointStats.class);
-            PendingCheckpoint pending =
-                    createPendingCheckpoint(
-                            CheckpointProperties.forCheckpoint(
-                                    
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-
-            abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED, 
callback);
-            verify(callback, times(1)).reportFailedCheckpoint(anyLong(), 
any(Exception.class));
-        }
-
-        {
-            // Fail subsumed
-            PendingCheckpointStats callback = 
mock(PendingCheckpointStats.class);
-            PendingCheckpoint pending =
-                    createPendingCheckpoint(
-                            CheckpointProperties.forCheckpoint(
-                                    
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-
-            abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED, 
callback);
-            verify(callback, times(1)).reportFailedCheckpoint(anyLong(), 
any(Exception.class));
-        }
-
-        {
-            // Fail subsumed
-            PendingCheckpointStats callback = 
mock(PendingCheckpointStats.class);
-            PendingCheckpoint pending =
-                    createPendingCheckpoint(
-                            CheckpointProperties.forCheckpoint(
-                                    
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
-
-            abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED, 
callback);
-            verify(callback, times(1)).reportFailedCheckpoint(anyLong(), 
any(Exception.class));
-        }
+        // Success path only; failure stats reporting is covered by CheckpointFailureManagerTest.
+        PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
+        PendingCheckpoint pending =
+                createPendingCheckpoint(
+                        CheckpointProperties.forCheckpoint(
+                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
 
-        {
-            // Fail subsumed
-            PendingCheckpointStats callback = 
mock(PendingCheckpointStats.class);
-            PendingCheckpoint pending =
-                    createPendingCheckpoint(
-                            CheckpointProperties.forCheckpoint(
-                                    
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
+        pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), 
callback);
+        verify(callback, times(1))
+                .reportSubtaskStats(nullable(JobVertexID.class), 
any(SubtaskStateStats.class));
 
-            abort(pending, CheckpointFailureReason.CHECKPOINT_EXPIRED, 
callback);
-            verify(callback, times(1)).reportFailedCheckpoint(anyLong(), 
any(Exception.class));
-        }
+        pending.finalizeCheckpoint(
+                new CheckpointsCleaner(), () -> {}, 
Executors.directExecutor(), callback);
+        verify(callback, 
times(1)).reportCompletedCheckpoint(any(String.class));
     }
 
     /**
@@ -711,20 +660,8 @@ public class PendingCheckpointTest {
     }
 
     private void abort(PendingCheckpoint checkpoint, CheckpointFailureReason 
reason) {
-        abort(checkpoint, reason, null);
-    }
-
-    private void abort(
-            PendingCheckpoint checkpoint,
-            CheckpointFailureReason reason,
-            PendingCheckpointStats statsCallback) {
         checkpoint.abort(
-                reason,
-                null,
-                new CheckpointsCleaner(),
-                () -> {},
-                Executors.directExecutor(),
-                statsCallback);
+                reason, null, new CheckpointsCleaner(), () -> {}, 
Executors.directExecutor());
     }
 
     private static final class QueueExecutor implements Executor {

Reply via email to