This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 27f1dfdf72da19ea8cb20a353aed40323fc83c41 Author: fanrui <[email protected]> AuthorDate: Mon Feb 28 17:45:12 2022 +0800 [FLINK-26049][checkpoint] Moving checkpoint failure log and report failed checkpoint to CheckpointFailureManager --- .../runtime/checkpoint/CheckpointCoordinator.java | 29 ++----- .../checkpoint/CheckpointFailureManager.java | 46 +++++++++-- .../runtime/checkpoint/PendingCheckpoint.java | 17 +---- .../checkpoint/CheckpointFailureManagerTest.java | 26 +++++++ .../runtime/checkpoint/PendingCheckpointTest.java | 89 ++++------------------ 5 files changed, 84 insertions(+), 123 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 384e047..3d79824 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -80,7 +80,6 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import java.util.stream.Stream; @@ -177,9 +176,6 @@ public class CheckpointCoordinator { /** Actor that receives status updates from the execution graph this coordinator works for. */ private JobStatusListener jobStatusListener; - /** The number of consecutive failed trigger attempts. */ - private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0); - /** A handle to the current periodic trigger, to cancel it when necessary. */ private ScheduledFuture<?> currentPeriodicTrigger; @@ -862,7 +858,6 @@ public class CheckpointCoordinator { /** Trigger request is successful. NOTE, it must be invoked if trigger request is successful. */ private void onTriggerSuccess() { isTriggering = false; - numUnsuccessfulCheckpointsTriggers.set(0); executeQueuedRequest(); } @@ -911,25 +906,12 @@ public class CheckpointCoordinator { CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable); if (checkpoint != null && !checkpoint.isDisposed()) { - int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet(); - LOG.warn( - "Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)", - checkpoint.getCheckpointId(), - job, - numUnsuccessful, - throwable); - synchronized (lock) { abortPendingCheckpoint(checkpoint, cause); } } else { - LOG.info( - "Failed to trigger checkpoint for job {} because {}", - job, - throwable.getMessage()); - failureManager.handleCheckpointException( - checkpoint, checkpointProperties, cause, null); + checkpoint, checkpointProperties, cause, null, job, null); } } finally { isTriggering = false; @@ -1778,8 +1760,6 @@ public class CheckpointCoordinator { final CheckpointException reason = new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SUSPEND); abortPendingAndQueuedCheckpoints(reason); - - numUnsuccessfulCheckpointsTriggers.set(0); } } @@ -1954,14 +1934,15 @@ public class CheckpointCoordinator { exception.getCause(), checkpointsCleaner, this::scheduleTriggerRequest, - executor, - getStatsCallback(pendingCheckpoint)); + executor); failureManager.handleCheckpointException( pendingCheckpoint, pendingCheckpoint.getProps(), exception, - executionAttemptID); + executionAttemptID, + job, + getStatsCallback(pendingCheckpoint)); } finally { sendAbortedMessages( pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java index 8ccccc6..7b30034 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java @@ -17,10 +17,14 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import java.util.Set; @@ -34,6 +38,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** The checkpoint failure manager which centralized manage checkpoint failure processing logic. */ public class CheckpointFailureManager { + private static final Logger LOG = LoggerFactory.getLogger(CheckpointFailureManager.class); + public static final int UNLIMITED_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE; public static final String EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE = "Exceeded checkpoint tolerable failure threshold."; @@ -80,25 +86,51 @@ public class CheckpointFailureManager { * strategy can be used. * @param exception the checkpoint exception. * @param executionAttemptID the execution attempt id, as a safe guard. + * @param job the JobID. + * @param pendingCheckpointStats the pending checkpoint statistics. */ public void handleCheckpointException( @Nullable PendingCheckpoint pendingCheckpoint, CheckpointProperties checkpointProperties, CheckpointException exception, - @Nullable ExecutionAttemptID executionAttemptID) { + @Nullable ExecutionAttemptID executionAttemptID, + JobID job, + @Nullable PendingCheckpointStats pendingCheckpointStats) { + long checkpointId = + pendingCheckpoint == null + ? UNKNOWN_CHECKPOINT_ID + : pendingCheckpoint.getCheckpointID(); + updateStatsAfterCheckpointFailed(pendingCheckpointStats, exception); + + LOG.warn( + "Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)", + checkpointId == UNKNOWN_CHECKPOINT_ID ? "UNKNOWN_CHECKPOINT_ID" : checkpointId, + job, + continuousFailureCounter.get(), + exception); if (isJobManagerFailure(exception, executionAttemptID)) { - handleJobLevelCheckpointException( - checkpointProperties, - exception, - pendingCheckpoint == null - ? UNKNOWN_CHECKPOINT_ID - : pendingCheckpoint.getCheckpointID()); + handleJobLevelCheckpointException(checkpointProperties, exception, checkpointId); } else { handleTaskLevelCheckpointException( checkNotNull(pendingCheckpoint), exception, checkNotNull(executionAttemptID)); } } + /** + * Updating checkpoint statistics after checkpoint failed. + * + * @param pendingCheckpointStats the pending checkpoint statistics. + * @param exception the checkpoint exception. + */ + private void updateStatsAfterCheckpointFailed( + @Nullable PendingCheckpointStats pendingCheckpointStats, + CheckpointException exception) { + if (pendingCheckpointStats != null) { + long failureTimestamp = System.currentTimeMillis(); + pendingCheckpointStats.reportFailedCheckpoint(failureTimestamp, exception); + } + } + private boolean isJobManagerFailure( CheckpointException exception, @Nullable ExecutionAttemptID executionAttemptID) { // TODO: Try to get rid of checking nullability of executionAttemptID because false value of diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 47e7a32..94ae442 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -542,12 +542,10 @@ public class PendingCheckpoint implements Checkpoint { @Nullable Throwable cause, CheckpointsCleaner checkpointsCleaner, Runnable postCleanup, - Executor executor, - PendingCheckpointStats statsCallback) { + Executor executor) { try { failureCause = new CheckpointException(reason, cause); onCompletionPromise.completeExceptionally(failureCause); - reportFailedCheckpoint(failureCause, statsCallback); assertAbortSubsumedForced(reason); } finally { dispose(true, checkpointsCleaner, postCleanup, executor); @@ -627,19 +625,6 @@ public class PendingCheckpoint implements Checkpoint { } } - /** - * Reports a failed checkpoint with the given optional cause. - * - * @param cause The failure cause or <code>null</code>. - */ - private void reportFailedCheckpoint(Exception cause, PendingCheckpointStats statsCallback) { - // to prevent null-pointers from concurrent modification, copy reference onto stack - if (statsCallback != null) { - long failureTimestamp = System.currentTimeMillis(); - statsCallback.reportFailedCheckpoint(failureTimestamp, cause); - } - } - // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java index 8ed9b03..e027ed9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java @@ -17,6 +17,7 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.util.TestLogger; @@ -29,6 +30,11 @@ import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKP import static org.apache.flink.runtime.checkpoint.CheckpointProperties.forCheckpoint; import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION; import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; /** Tests for the checkpoint failure manager. */ public class CheckpointFailureManagerTest extends TestLogger { @@ -153,6 +159,26 @@ public class CheckpointFailureManagerTest extends TestLogger { assertEquals(0, callback.getInvokeCounter()); } + /** Tests that the stats callbacks happen if the callback is registered. */ + @Test + public void testPendingCheckpointStatsCallbacks() throws Exception { + CheckpointProperties checkpointProperties = forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION); + CheckpointFailureManager failureManager = + new CheckpointFailureManager(2, new TestFailJobCallback()); + + PendingCheckpoint pendingCheckpoint = mock(PendingCheckpoint.class); + PendingCheckpointStats callback = mock(PendingCheckpointStats.class); + + failureManager.handleCheckpointException( + pendingCheckpoint, + checkpointProperties, + new CheckpointException(CheckpointFailureReason.CHECKPOINT_SUBSUMED, null), + null, + new JobID(), + callback); + verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class)); + } + /** A failure handler callback for testing. */ private static class TestFailJobCallback implements CheckpointFailureManager.FailJobCallback { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index 3174e2b..e7d3872 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -73,7 +73,6 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; @@ -271,70 +270,20 @@ public class PendingCheckpointTest { /** Tests that the stats callbacks happen if the callback is registered. */ @Test public void testPendingCheckpointStatsCallbacks() throws Exception { - { - // Complete successfully - PendingCheckpointStats callback = mock(PendingCheckpointStats.class); - PendingCheckpoint pending = - createPendingCheckpoint( - CheckpointProperties.forCheckpoint( - CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)); - - pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), callback); - verify(callback, times(1)) - .reportSubtaskStats(nullable(JobVertexID.class), any(SubtaskStateStats.class)); - - pending.finalizeCheckpoint( - new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), callback); - verify(callback, times(1)).reportCompletedCheckpoint(any(String.class)); - } - - { - // Fail subsumed - PendingCheckpointStats callback = mock(PendingCheckpointStats.class); - PendingCheckpoint pending = - createPendingCheckpoint( - CheckpointProperties.forCheckpoint( - CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)); - - abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED, callback); - verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class)); - } - - { - // Fail subsumed - PendingCheckpointStats callback = mock(PendingCheckpointStats.class); - PendingCheckpoint pending = - createPendingCheckpoint( - CheckpointProperties.forCheckpoint( - CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)); - - abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED, callback); - verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class)); - } - - { - // Fail subsumed - PendingCheckpointStats callback = mock(PendingCheckpointStats.class); - PendingCheckpoint pending = - createPendingCheckpoint( - CheckpointProperties.forCheckpoint( - CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)); - - abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED, callback); - verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class)); - } + // Complete successfully + PendingCheckpointStats callback = mock(PendingCheckpointStats.class); + PendingCheckpoint pending = + createPendingCheckpoint( + CheckpointProperties.forCheckpoint( + CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)); - { - // Fail subsumed - PendingCheckpointStats callback = mock(PendingCheckpointStats.class); - PendingCheckpoint pending = - createPendingCheckpoint( - CheckpointProperties.forCheckpoint( - CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)); + pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), callback); + verify(callback, times(1)) + .reportSubtaskStats(nullable(JobVertexID.class), any(SubtaskStateStats.class)); - abort(pending, CheckpointFailureReason.CHECKPOINT_EXPIRED, callback); - verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class)); - } + pending.finalizeCheckpoint( + new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), callback); + verify(callback, times(1)).reportCompletedCheckpoint(any(String.class)); } /** @@ -711,20 +660,8 @@ public class PendingCheckpointTest { } private void abort(PendingCheckpoint checkpoint, CheckpointFailureReason reason) { - abort(checkpoint, reason, null); - } - - private void abort( - PendingCheckpoint checkpoint, - CheckpointFailureReason reason, - PendingCheckpointStats statsCallback) { checkpoint.abort( - reason, - null, - new CheckpointsCleaner(), - () -> {}, - Executors.directExecutor(), - statsCallback); + reason, null, new CheckpointsCleaner(), () -> {}, Executors.directExecutor()); } private static final class QueueExecutor implements Executor {
