This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 33e3b75bb74dad5f75ad0bea788906e24c3df4e8 Author: fanrui <[email protected]> AuthorDate: Mon Feb 28 17:45:12 2022 +0800 [FLINK-26049][checkpoint] Moving checkpoint failure log and report failed checkpoint to CheckpointFailureManager --- .../runtime/checkpoint/CheckpointCoordinator.java | 27 +++--------- .../checkpoint/CheckpointFailureManager.java | 50 +++++++++++++++++++--- .../runtime/checkpoint/PendingCheckpoint.java | 14 ------ .../runtime/checkpoint/PendingCheckpointTest.java | 7 --- 4 files changed, 48 insertions(+), 50 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 4efff80..2efc034 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -79,7 +79,6 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Predicate; import java.util.stream.Stream; @@ -177,9 +176,6 @@ public class CheckpointCoordinator { /** Actor that receives status updates from the execution graph this coordinator works for. */ private JobStatusListener jobStatusListener; - /** The number of consecutive failed trigger attempts. */ - private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0); - /** A handle to the current periodic trigger, to cancel it when necessary. */ private ScheduledFuture<?> currentPeriodicTrigger; @@ -886,7 +882,6 @@ public class CheckpointCoordinator { /** Trigger request is successful. NOTE, it must be invoked if trigger request is successful. 
*/ private void onTriggerSuccess() { isTriggering = false; - numUnsuccessfulCheckpointsTriggers.set(0); executeQueuedRequest(); } @@ -935,25 +930,12 @@ public class CheckpointCoordinator { CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable); if (checkpoint != null && !checkpoint.isDisposed()) { - int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet(); - LOG.warn( - "Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)", - checkpoint.getCheckpointId(), - job, - numUnsuccessful, - throwable); - synchronized (lock) { abortPendingCheckpoint(checkpoint, cause); } } else { - LOG.info( - "Failed to trigger checkpoint for job {} because {}", - job, - throwable.getMessage()); - failureManager.handleCheckpointException( - checkpoint, checkpointProperties, cause, null); + checkpoint, checkpointProperties, cause, null, job, null, statsTracker); } } finally { isTriggering = false; @@ -1917,8 +1899,6 @@ public class CheckpointCoordinator { final CheckpointException reason = new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SUSPEND); abortPendingAndQueuedCheckpoints(reason); - - numUnsuccessfulCheckpointsTriggers.set(0); } } @@ -2098,7 +2078,10 @@ public class CheckpointCoordinator { pendingCheckpoint, pendingCheckpoint.getProps(), exception, - executionAttemptID); + executionAttemptID, + job, + getStatsCallback(pendingCheckpoint), + statsTracker); } finally { sendAbortedMessages( pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java index 8ccccc6..df429a4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java @@ -17,10 +17,14 @@ package 
org.apache.flink.runtime.checkpoint; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import java.util.Set; @@ -34,6 +38,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** The checkpoint failure manager which centralized manage checkpoint failure processing logic. */ public class CheckpointFailureManager { + private static final Logger LOG = LoggerFactory.getLogger(CheckpointFailureManager.class); + public static final int UNLIMITED_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE; public static final String EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE = "Exceeded checkpoint tolerable failure threshold."; @@ -80,25 +86,55 @@ public class CheckpointFailureManager { * strategy can be used. * @param exception the checkpoint exception. * @param executionAttemptID the execution attempt id, as a safe guard. + * @param job the JobID. + * @param pendingCheckpointStats the pending checkpoint statistics. + * @param statsTracker the tracker for checkpoint statistics. */ public void handleCheckpointException( @Nullable PendingCheckpoint pendingCheckpoint, CheckpointProperties checkpointProperties, CheckpointException exception, - @Nullable ExecutionAttemptID executionAttemptID) { + @Nullable ExecutionAttemptID executionAttemptID, + JobID job, + @Nullable PendingCheckpointStats pendingCheckpointStats, + CheckpointStatsTracker statsTracker) { + long checkpointId = + pendingCheckpoint == null + ? UNKNOWN_CHECKPOINT_ID + : pendingCheckpoint.getCheckpointID(); + updateStatsAfterCheckpointFailed(pendingCheckpointStats, statsTracker, exception); + + LOG.warn( + "Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)", + checkpointId == UNKNOWN_CHECKPOINT_ID ? 
"UNKNOWN_CHECKPOINT_ID" : checkpointId, + job, + continuousFailureCounter.get(), + exception); if (isJobManagerFailure(exception, executionAttemptID)) { - handleJobLevelCheckpointException( - checkpointProperties, - exception, - pendingCheckpoint == null - ? UNKNOWN_CHECKPOINT_ID - : pendingCheckpoint.getCheckpointID()); + handleJobLevelCheckpointException(checkpointProperties, exception, checkpointId); } else { handleTaskLevelCheckpointException( checkNotNull(pendingCheckpoint), exception, checkNotNull(executionAttemptID)); } } + /** + * Updates the checkpoint statistics after a checkpoint has failed. + * + * @param pendingCheckpointStats the pending checkpoint statistics. + * @param exception the checkpoint exception. + */ + private void updateStatsAfterCheckpointFailed( + @Nullable PendingCheckpointStats pendingCheckpointStats, + CheckpointStatsTracker statsTracker, + CheckpointException exception) { + if (pendingCheckpointStats != null) { + long failureTimestamp = System.currentTimeMillis(); + statsTracker.reportFailedCheckpoint( + pendingCheckpointStats.toFailedCheckpoint(failureTimestamp, exception)); + } + } + private boolean isJobManagerFailure( CheckpointException exception, @Nullable ExecutionAttemptID executionAttemptID) { // TODO: Try to get rid of checking nullability of executionAttemptID because false value of diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 9229024..b4bd8ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -544,7 +544,6 @@ public class PendingCheckpoint implements Checkpoint { try { failureCause = new CheckpointException(reason, cause); onCompletionPromise.completeExceptionally(failureCause); - reportFailedCheckpoint(statsTracker, failureCause);
assertAbortSubsumedForced(reason); } finally { dispose(true, checkpointsCleaner, postCleanup, executor); @@ -596,19 +595,6 @@ public class PendingCheckpoint implements Checkpoint { } } - /** - * Reports a failed checkpoint with the given optional cause. - * - * @param cause The failure cause or <code>null</code>. - */ - private void reportFailedCheckpoint(CheckpointStatsTracker statsTracker, Exception cause) { - // to prevent null-pointers from concurrent modification, copy reference onto stack - if (pendingCheckpointStats != null) { - statsTracker.reportFailedCheckpoint( - pendingCheckpointStats.toFailedCheckpoint(System.currentTimeMillis(), cause)); - } - } - // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index a0b07b0..392bd0e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -661,13 +661,6 @@ public class PendingCheckpointTest { } private void abort(PendingCheckpoint checkpoint, CheckpointFailureReason reason) { - abort(checkpoint, reason, null); - } - - private void abort( - PendingCheckpoint checkpoint, - CheckpointFailureReason reason, - PendingCheckpointStats statsCallback) { checkpoint.abort( reason, null, new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null); }
