This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 602261648db [FLINK-32347][checkpoint] Exceptions from the CompletedCheckpointStore are not registered by the CheckpointFailureManager. (#22793)
602261648db is described below

commit 602261648dbc387c51ae113139486b2d1f0935e2
Author: Stefan Richter <srich...@apache.org>
AuthorDate: Mon Jun 26 19:22:10 2023 +0200

    [FLINK-32347][checkpoint] Exceptions from the CompletedCheckpointStore are not registered by the CheckpointFailureManager. (#22793)
    
    Currently, if an error occurs while saving a completed checkpoint in the
    CompletedCheckpointStore, the CheckpointCoordinator does not call the
    CheckpointFailureManager to handle the error. As a result, errors from the
    CompletedCheckpointStore do not increase the failed-checkpoint count, and
    the 'execution.checkpointing.tolerable-failed-checkpoints' option does not
    limit the number of such errors in any way.
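
    For context, the option named above is the user-facing tolerable-failures
    knob. A minimal sketch of how a job would set it through the DataStream
    API (the interval and the value 3 are arbitrary example numbers):

        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 60 seconds.
        env.enableCheckpointing(60_000);
        // Programmatic equivalent of 'execution.checkpointing.tolerable-failed-checkpoints':
        // fail the job once more than 3 consecutive checkpoints have failed. With this
        // fix, errors thrown by the CompletedCheckpointStore count toward that limit too.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);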
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 33 ++++----
 .../checkpoint/CheckpointCoordinatorTest.java      | 91 ++++++++++++++++++++++
 2 files changed, 110 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 585a35ffb1c..41902b4af39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1330,6 +1330,7 @@ public class CheckpointCoordinator {
     }
 
     private void reportCompletedCheckpoint(CompletedCheckpoint completedCheckpoint) {
+        failureManager.handleCheckpointSuccess(completedCheckpoint.getCheckpointID());
         CompletedCheckpointStats completedCheckpointStats = completedCheckpoint.getStatistic();
         if (completedCheckpointStats != null) {
             LOG.trace(
@@ -1403,7 +1404,6 @@ public class CheckpointCoordinator {
                     pendingCheckpoint.finalizeCheckpoint(
                             checkpointsCleaner, this::scheduleTriggerRequest, executor);
 
-            failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointID());
             return completedCheckpoint;
         } catch (Exception e1) {
             // abort the current pending checkpoint if we fail to finalize the pending
@@ -1467,23 +1467,28 @@
                 checkpointsCleaner.cleanCheckpointOnFailedStoring(completedCheckpoint, executor);
             }
 
-            reportFailedCheckpoint(checkpointId, exception);
+            final CheckpointException checkpointException =
+                    new CheckpointException(
+                            "Could not complete the pending checkpoint " + checkpointId + '.',
+                            CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE,
+                            exception);
+            reportFailedCheckpoint(pendingCheckpoint, checkpointException);
             sendAbortedMessages(tasksToAbort, checkpointId, completedCheckpoint.getTimestamp());
-            throw new CheckpointException(
-                    "Could not complete the pending checkpoint " + checkpointId + '.',
-                    CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE,
-                    exception);
+            throw checkpointException;
         }
     }
 
-    private void reportFailedCheckpoint(long checkpointId, Exception exception) {
-        PendingCheckpointStats pendingCheckpointStats =
-                statsTracker.getPendingCheckpointStats(checkpointId);
-        if (pendingCheckpointStats != null) {
-            statsTracker.reportFailedCheckpoint(
-                    pendingCheckpointStats.toFailedCheckpoint(
-                            System.currentTimeMillis(), exception));
-        }
+    private void reportFailedCheckpoint(
+            PendingCheckpoint pendingCheckpoint, CheckpointException exception) {
+
+        failureManager.handleCheckpointException(
+                pendingCheckpoint,
+                pendingCheckpoint.getProps(),
+                exception,
+                null,
+                job,
+                getStatsCallback(pendingCheckpoint),
+                statsTracker);
     }
 
     void scheduleTriggerRequest() {
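
Net effect of the hunks above: a finalization error, including one thrown while
storing to the CompletedCheckpointStore, now flows through
failureManager.handleCheckpointException(...) rather than only being recorded
in the stats tracker, and handleCheckpointSuccess(...) moved from the finalize
path into reportCompletedCheckpoint(...), so success is only counted once the
checkpoint has actually been stored and reported. As a rough illustration, here
is a simplified, stand-alone sketch of the consecutive-failure accounting this
routes into (hypothetical names, not the real CheckpointFailureManager code):

    // Simplified model of the tolerable-failure bookkeeping.
    final class FailureCounter {
        private final int tolerableFailures; // e.g. from the checkpoint config
        private int consecutiveFailures;     // reset on every successful checkpoint

        FailureCounter(int tolerableFailures) {
            this.tolerableFailures = tolerableFailures;
        }

        void onCheckpointSuccess() {
            consecutiveFailures = 0;
        }

        void onCheckpointFailure(Runnable failJob) {
            // Before this change, store errors never reached the failure path,
            // so they could repeat indefinitely without failing the job.
            if (++consecutiveFailures > tolerableFailures) {
                failJob.run();
            }
        }
    }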
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 660671dfa8f..93d565644cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -58,6 +58,7 @@ import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistryImpl;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TestingStreamStateHandle;
@@ -123,6 +124,7 @@ import static java.util.Collections.singletonMap;
 import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION;
 import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED;
 import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_EXPIRED;
+import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE;
 import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.IO_EXCEPTION;
 import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN;
 import static org.apache.flink.runtime.checkpoint.CheckpointStoreUtil.INVALID_CHECKPOINT_ID;
@@ -3167,6 +3169,95 @@ class CheckpointCoordinatorTest extends TestLogger {
         assertThat(statsTracker.getPendingCheckpointStats(1)).isNull();
     }
 
+    /**
+     * This test checks that an exception that occurs while trying to store a {@link
+     * CompletedCheckpoint} in the {@link CompletedCheckpointStore} is properly reported to the
+     * {@link CheckpointFailureManager}, see FLINK-32347.
+     */
+    @Test
+    void testExceptionInStoringCompletedCheckpointIsReportedToFailureManager() throws Exception {
+        JobVertexID jobVertexID = new JobVertexID();
+        ExecutionGraph graph =
+                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID)
+                        .setTransitToRunning(false)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        ExecutionVertex task = graph.getJobVertex(jobVertexID).getTaskVertices()[0];
+        task.getCurrentExecutionAttempt().transitionState(ExecutionState.RUNNING);
+
+        CheckpointIDCounterWithOwner testingCounter = new CheckpointIDCounterWithOwner();
+        TestFailJobCallback failureCallback = new TestFailJobCallback();
+
+        CheckpointStatsTracker statsTracker =
+                new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
+
+        final String exceptionMsg = "Test store exception.";
+        try (SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl()) {
+            // Prepare a store that fails when the coordinator stores a checkpoint.
+            TestingCompletedCheckpointStore testingCompletedCheckpointStore =
+                    TestingCompletedCheckpointStore.builder()
+                            .withGetSharedStateRegistrySupplier(() -> sharedStateRegistry)
+                            .withAddCheckpointAndSubsumeOldestOneFunction(
+                                    (cp, cleaner, runnable) -> {
+                                        throw new RuntimeException(exceptionMsg);
+                                    })
+                            .build();
+
+            CheckpointCoordinator checkpointCoordinator =
+                    new CheckpointCoordinatorBuilder()
+                            .setCheckpointIDCounter(testingCounter)
+                            .setFailureManager(new CheckpointFailureManager(0, failureCallback))
+                            .setTimer(manuallyTriggeredScheduledExecutor)
+                            .setCheckpointStatsTracker(statsTracker)
+                            .setCompletedCheckpointStore(testingCompletedCheckpointStore)
+                            .build(graph);
+            testingCounter.setOwner(checkpointCoordinator);
+
+            checkpointCoordinator.triggerCheckpoint(false);
+            manuallyTriggeredScheduledExecutor.triggerAll();
+
+            PendingCheckpoint pendingCheckpoint =
+                    checkpointCoordinator
+                            .getPendingCheckpoints()
+                            .entrySet()
+                            .iterator()
+                            .next()
+                            .getValue();
+
+            try {
+                checkpointCoordinator.receiveAcknowledgeMessage(
+                        new AcknowledgeCheckpoint(
+                                pendingCheckpoint.getJobId(),
+                                task.getCurrentExecutionAttempt().getAttemptId(),
+                                pendingCheckpoint.getCheckpointID(),
+                                new CheckpointMetrics(),
+                                new TaskStateSnapshot()),
+                        "localhost");
+
+                fail("Exception is expected here");
+            } catch (CheckpointException cpex) {
+                assertThat(cpex.getCheckpointFailureReason())
+                        .isEqualTo(FINALIZE_CHECKPOINT_FAILURE);
+                assertThat(cpex.getCause().getMessage()).isEqualTo(exceptionMsg);
+            }
+
+            // then: Failure manager should fail the job.
+            assertThat(failureCallback.getInvokeCounter()).isOne();
+
+            // then: The NumberOfFailedCheckpoints and TotalNumberOfCheckpoints should be 1.
+            CheckpointStatsCounts counts = statsTracker.createSnapshot().getCounts();
+            assertThat(counts.getNumberOfRestoredCheckpoints()).isZero();
+            assertThat(counts.getTotalNumberOfCheckpoints()).isOne();
+            assertThat(counts.getNumberOfInProgressCheckpoints()).isZero();
+            assertThat(counts.getNumberOfCompletedCheckpoints()).isZero();
+            assertThat(counts.getNumberOfFailedCheckpoints()).isOne();
+
+            // then: The PendingCheckpoint stats should still exist.
+            assertThat(statsTracker.getPendingCheckpointStats(1)).isNotNull();
+        }
+    }
+
     private void testTriggerCheckpoint(
             CheckpointCoordinator checkpointCoordinator,
             CheckpointFailureReason expectedFailureReason)
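
For intuition on the assertions above: the test constructs its failure manager
as new CheckpointFailureManager(0, failureCallback), i.e. zero tolerable
failures, so the very first counted failure must fail the job, which is why
failureCallback.getInvokeCounter() is exactly 1. In terms of the simplified
counter sketched earlier (again hypothetical, not the real class):

    // With tolerableFailures = 0, a single failure trips the limit.
    FailureCounter counter = new FailureCounter(0);
    counter.onCheckpointFailure(() -> System.out.println("fail the job")); // runs the callback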
