This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 602261648db [FLINK-32347][checkpoint] Exceptions from the CompletedCheckpointStore are not registered by the CheckpointFailureManager. (#22793) 602261648db is described below commit 602261648dbc387c51ae113139486b2d1f0935e2 Author: Stefan Richter <srich...@apache.org> AuthorDate: Mon Jun 26 19:22:10 2023 +0200 [FLINK-32347][checkpoint] Exceptions from the CompletedCheckpointStore are not registered by the CheckpointFailureManager. (#22793) Currently if an error occurs while saving a completed checkpoint in the CompletedCheckpointStore, CheckpointCoordinator doesn't call CheckpointFailureManager to handle the error. Such behavior leads to the fact that errors from CompletedCheckpointStore don't increase the failed checkpoints count and 'execution.checkpointing.tolerable-failed-checkpoints' option does not limit the number of errors of this kind in any way. --- .../runtime/checkpoint/CheckpointCoordinator.java | 33 ++++---- .../checkpoint/CheckpointCoordinatorTest.java | 91 ++++++++++++++++++++++ 2 files changed, 110 insertions(+), 14 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 585a35ffb1c..41902b4af39 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -1330,6 +1330,7 @@ public class CheckpointCoordinator { } private void reportCompletedCheckpoint(CompletedCheckpoint completedCheckpoint) { + failureManager.handleCheckpointSuccess(completedCheckpoint.getCheckpointID()); CompletedCheckpointStats completedCheckpointStats = completedCheckpoint.getStatistic(); if (completedCheckpointStats != null) { LOG.trace( @@ -1403,7 +1404,6 @@ public class CheckpointCoordinator { 
pendingCheckpoint.finalizeCheckpoint( checkpointsCleaner, this::scheduleTriggerRequest, executor); - failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointID()); return completedCheckpoint; } catch (Exception e1) { // abort the current pending checkpoint if we fail to finalize the pending @@ -1467,23 +1467,28 @@ public class CheckpointCoordinator { checkpointsCleaner.cleanCheckpointOnFailedStoring(completedCheckpoint, executor); } - reportFailedCheckpoint(checkpointId, exception); + final CheckpointException checkpointException = + new CheckpointException( + "Could not complete the pending checkpoint " + checkpointId + '.', + CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, + exception); + reportFailedCheckpoint(pendingCheckpoint, checkpointException); sendAbortedMessages(tasksToAbort, checkpointId, completedCheckpoint.getTimestamp()); - throw new CheckpointException( - "Could not complete the pending checkpoint " + checkpointId + '.', - CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, - exception); + throw checkpointException; } } - private void reportFailedCheckpoint(long checkpointId, Exception exception) { - PendingCheckpointStats pendingCheckpointStats = - statsTracker.getPendingCheckpointStats(checkpointId); - if (pendingCheckpointStats != null) { - statsTracker.reportFailedCheckpoint( - pendingCheckpointStats.toFailedCheckpoint( - System.currentTimeMillis(), exception)); - } + private void reportFailedCheckpoint( + PendingCheckpoint pendingCheckpoint, CheckpointException exception) { + + failureManager.handleCheckpointException( + pendingCheckpoint, + pendingCheckpoint.getProps(), + exception, + null, + job, + getStatsCallback(pendingCheckpoint), + statsTracker); } void scheduleTriggerRequest() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 660671dfa8f..93d565644cb 
100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -58,6 +58,7 @@ import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.OperatorStreamStateHandle; import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.SharedStateRegistryImpl; import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TestingStreamStateHandle; @@ -123,6 +124,7 @@ import static java.util.Collections.singletonMap; import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION; import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED; import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_EXPIRED; +import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE; import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.IO_EXCEPTION; import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN; import static org.apache.flink.runtime.checkpoint.CheckpointStoreUtil.INVALID_CHECKPOINT_ID; @@ -3167,6 +3169,95 @@ class CheckpointCoordinatorTest extends TestLogger { assertThat(statsTracker.getPendingCheckpointStats(1)).isNull(); } + /** + * This test checks that an exception that occurs while trying to store a {@link + * CompletedCheckpoint} in the {@link CompletedCheckpointStore} is properly reported to the + * {@link CheckpointFailureManager}, see FLINK-32347. 
+ */ + @Test + void testExceptionInStoringCompletedCheckpointIsReportedToFailureManager() throws Exception { + JobVertexID jobVertexID = new JobVertexID(); + ExecutionGraph graph = + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(jobVertexID) + .setTransitToRunning(false) + .build(EXECUTOR_RESOURCE.getExecutor()); + + ExecutionVertex task = graph.getJobVertex(jobVertexID).getTaskVertices()[0]; + task.getCurrentExecutionAttempt().transitionState(ExecutionState.RUNNING); + + CheckpointIDCounterWithOwner testingCounter = new CheckpointIDCounterWithOwner(); + TestFailJobCallback failureCallback = new TestFailJobCallback(); + + CheckpointStatsTracker statsTracker = + new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup()); + + final String exceptionMsg = "Test store exception."; + try (SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl()) { + // Prepare a store that fails when the coordinator stores a checkpoint. 
+ TestingCompletedCheckpointStore testingCompletedCheckpointStore = + TestingCompletedCheckpointStore.builder() + .withGetSharedStateRegistrySupplier(() -> sharedStateRegistry) + .withAddCheckpointAndSubsumeOldestOneFunction( + (cp, cleaner, runnable) -> { + throw new RuntimeException(exceptionMsg); + }) + .build(); + + CheckpointCoordinator checkpointCoordinator = + new CheckpointCoordinatorBuilder() + .setCheckpointIDCounter(testingCounter) + .setFailureManager(new CheckpointFailureManager(0, failureCallback)) + .setTimer(manuallyTriggeredScheduledExecutor) + .setCheckpointStatsTracker(statsTracker) + .setCompletedCheckpointStore(testingCompletedCheckpointStore) + .build(graph); + testingCounter.setOwner(checkpointCoordinator); + + checkpointCoordinator.triggerCheckpoint(false); + manuallyTriggeredScheduledExecutor.triggerAll(); + + PendingCheckpoint pendingCheckpoint = + checkpointCoordinator + .getPendingCheckpoints() + .entrySet() + .iterator() + .next() + .getValue(); + + try { + checkpointCoordinator.receiveAcknowledgeMessage( + new AcknowledgeCheckpoint( + pendingCheckpoint.getJobId(), + task.getCurrentExecutionAttempt().getAttemptId(), + pendingCheckpoint.getCheckpointID(), + new CheckpointMetrics(), + new TaskStateSnapshot()), + "localhost"); + + fail("Exception is expected here"); + } catch (CheckpointException cpex) { + assertThat(cpex.getCheckpointFailureReason()) + .isEqualTo(FINALIZE_CHECKPOINT_FAILURE); + assertThat(cpex.getCause().getMessage()).isEqualTo(exceptionMsg); + } + + // then: Failure manager should fail the job. + assertThat(failureCallback.getInvokeCounter()).isOne(); + + // then: The NumberOfFailedCheckpoints and TotalNumberOfCheckpoints should be 1. 
+ CheckpointStatsCounts counts = statsTracker.createSnapshot().getCounts(); + assertThat(counts.getNumberOfRestoredCheckpoints()).isZero(); + assertThat(counts.getTotalNumberOfCheckpoints()).isOne(); + assertThat(counts.getNumberOfInProgressCheckpoints()).isZero(); + assertThat(counts.getNumberOfCompletedCheckpoints()).isZero(); + assertThat(counts.getNumberOfFailedCheckpoints()).isOne(); + + // then: The PendingCheckpoint should already exist. + assertThat(statsTracker.getPendingCheckpointStats(1)).isNotNull(); + } + } + private void testTriggerCheckpoint( CheckpointCoordinator checkpointCoordinator, CheckpointFailureReason expectedFailureReason)