This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 39afe944840e37a4d0290d99e91849d20e60e1c3
Author: fanrui <[email protected]>
AuthorDate: Mon Feb 28 18:12:34 2022 +0800

    [FLINK-26049][checkpoint] Adding CheckpointStatsTracker logic without 
pending checkpoint
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |  5 ++-
 .../checkpoint/CheckpointFailureManager.java       |  9 +++-
 .../runtime/checkpoint/CheckpointStatsCounts.java  |  9 ++++
 .../runtime/checkpoint/CheckpointStatsTracker.java | 15 +++++++
 .../checkpoint/CheckpointCoordinatorTest.java      | 49 ++++++++++++++++++++++
 .../checkpoint/CheckpointFailureManagerTest.java   | 22 ++++++++--
 .../checkpoint/CheckpointStatsCountsTest.java      |  7 ++++
 .../checkpoint/CheckpointStatsTrackerTest.java     |  9 ++++
 8 files changed, 118 insertions(+), 7 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 3d79824..e74ad24 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -911,7 +911,7 @@ public class CheckpointCoordinator {
                 }
             } else {
                 failureManager.handleCheckpointException(
-                        checkpoint, checkpointProperties, cause, null, job, 
null);
+                        checkpoint, checkpointProperties, cause, null, job, 
null, statsTracker);
             }
         } finally {
             isTriggering = false;
@@ -1942,7 +1942,8 @@ public class CheckpointCoordinator {
                         exception,
                         executionAttemptID,
                         job,
-                        getStatsCallback(pendingCheckpoint));
+                        getStatsCallback(pendingCheckpoint),
+                        statsTracker);
             } finally {
                 sendAbortedMessages(
                         
pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 7b30034..f77d1a6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -88,6 +88,7 @@ public class CheckpointFailureManager {
      * @param executionAttemptID the execution attempt id, as a safe guard.
      * @param job the JobID.
      * @param pendingCheckpointStats the pending checkpoint statistics.
+     * @param statsTracker the tracker for checkpoint statistics.
      */
     public void handleCheckpointException(
             @Nullable PendingCheckpoint pendingCheckpoint,
@@ -95,12 +96,13 @@ public class CheckpointFailureManager {
             CheckpointException exception,
             @Nullable ExecutionAttemptID executionAttemptID,
             JobID job,
-            @Nullable PendingCheckpointStats pendingCheckpointStats) {
+            @Nullable PendingCheckpointStats pendingCheckpointStats,
+            CheckpointStatsTracker statsTracker) {
         long checkpointId =
                 pendingCheckpoint == null
                         ? UNKNOWN_CHECKPOINT_ID
                         : pendingCheckpoint.getCheckpointID();
-        updateStatsAfterCheckpointFailed(pendingCheckpointStats, exception);
+        updateStatsAfterCheckpointFailed(pendingCheckpointStats, statsTracker, 
exception);
 
         LOG.warn(
                 "Failed to trigger checkpoint {} for job {}. ({} consecutive 
failed attempts so far)",
@@ -124,10 +126,13 @@ public class CheckpointFailureManager {
      */
     private void updateStatsAfterCheckpointFailed(
             @Nullable PendingCheckpointStats pendingCheckpointStats,
+            CheckpointStatsTracker statsTracker,
             CheckpointException exception) {
         if (pendingCheckpointStats != null) {
             long failureTimestamp = System.currentTimeMillis();
             pendingCheckpointStats.reportFailedCheckpoint(failureTimestamp, 
exception);
+        } else {
+            statsTracker.reportFailedCheckpointsWithoutInProgress();
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
index 8d06a1b..7265bb3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
@@ -163,6 +163,15 @@ public class CheckpointStatsCounts implements Serializable 
{
     }
 
     /**
+     * Increments the number of failed checkpoints without an in-progress checkpoint. For example,
+     * it should be called when checkpoint triggering fails before a PendingCheckpoint is created.
+     */
+    void incrementFailedCheckpointsWithoutInProgress() {
+        numFailedCheckpoints++;
+        numTotalCheckpoints++;
+    }
+
+    /**
      * Creates a snapshot of the current state.
      *
      * @return Snapshot of the current state.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index 8c4ac97..afcd9e0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -255,6 +255,21 @@ public class CheckpointStatsTracker {
         }
     }
 
+    /**
+     * Callback when a checkpoint fails without an in-progress checkpoint. For example, it should
+     * be called when checkpoint triggering fails before a PendingCheckpoint is created.
+     */
+    public void reportFailedCheckpointsWithoutInProgress() {
+        statsReadWriteLock.lock();
+        try {
+            counts.incrementFailedCheckpointsWithoutInProgress();
+
+            dirty = true;
+        } finally {
+            statsReadWriteLock.unlock();
+        }
+    }
+
     public PendingCheckpointStats getPendingCheckpointStats(long checkpointId) 
{
         statsReadWriteLock.lock();
         try {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index f6f50b9..f8a0005 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -120,6 +120,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -3068,6 +3069,46 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
         testTriggerCheckpoint(checkpointCoordinator, 
PERIODIC_SCHEDULER_SHUTDOWN);
     }
 
+    /** Tests that no checkpoint is triggered when CheckpointIDCounter throws an IOException. */
+    @Test
+    public void testTriggerCheckpointWithCounterIOException() throws Exception 
{
+        // given: Checkpoint coordinator which fails on getCheckpointId.
+        IOExceptionCheckpointIDCounter testingCounter = new 
IOExceptionCheckpointIDCounter();
+        TestFailJobCallback failureCallback = new TestFailJobCallback();
+
+        CheckpointStatsTracker statsTracker =
+                new CheckpointStatsTracker(
+                        Integer.MAX_VALUE,
+                        mock(CheckpointCoordinatorConfiguration.class),
+                        new UnregisteredMetricsGroup());
+
+        CheckpointCoordinator checkpointCoordinator =
+                new CheckpointCoordinatorBuilder()
+                        .setCheckpointIDCounter(testingCounter)
+                        .setFailureManager(new CheckpointFailureManager(0, 
failureCallback))
+                        .setTimer(manuallyTriggeredScheduledExecutor)
+                        .setCheckpointStatsTracker(statsTracker)
+                        .build();
+        testingCounter.setOwner(checkpointCoordinator);
+
+        // when: The checkpoint is triggered.
+        testTriggerCheckpoint(checkpointCoordinator, IO_EXCEPTION);
+
+        // then: Failure manager should fail the job.
+        assertEquals(1, failureCallback.getInvokeCounter());
+
+        // then: The NumberOfFailedCheckpoints and TotalNumberOfCheckpoints 
should be 1.
+        CheckpointStatsCounts counts = 
statsTracker.createSnapshot().getCounts();
+        assertEquals(0, counts.getNumberOfRestoredCheckpoints());
+        assertEquals(1, counts.getTotalNumberOfCheckpoints());
+        assertEquals(0, counts.getNumberOfInProgressCheckpoints());
+        assertEquals(0, counts.getNumberOfCompletedCheckpoints());
+        assertEquals(1, counts.getNumberOfFailedCheckpoints());
+
+        // then: The PendingCheckpoint shouldn't be created.
+        assertNull(statsTracker.getPendingCheckpointStats(1));
+    }
+
     private void testTriggerCheckpoint(
             CheckpointCoordinator checkpointCoordinator,
             CheckpointFailureReason expectedFailureReason)
@@ -3776,6 +3817,14 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
         }
     }
 
+    private static class IOExceptionCheckpointIDCounter extends 
CheckpointIDCounterWithOwner {
+        @Override
+        public long getAndIncrement() throws Exception {
+            checkNotNull(owner);
+            throw new IOException("disk is error!");
+        }
+    }
+
     private static class IOExceptionCheckpointStorage extends 
JobManagerCheckpointStorage {
         @Override
         public CheckpointStorageAccess createCheckpointStorage(JobID jobId) 
throws IOException {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
index e027ed9..243e3e1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java
@@ -167,7 +167,8 @@ public class CheckpointFailureManagerTest extends 
TestLogger {
                 new CheckpointFailureManager(2, new TestFailJobCallback());
 
         PendingCheckpoint pendingCheckpoint = mock(PendingCheckpoint.class);
-        PendingCheckpointStats callback = mock(PendingCheckpointStats.class);
+        PendingCheckpointStats pendingCheckpointCallback = 
mock(PendingCheckpointStats.class);
+        CheckpointStatsTracker statsTracker = 
mock(CheckpointStatsTracker.class);
 
         failureManager.handleCheckpointException(
                 pendingCheckpoint,
@@ -175,8 +176,23 @@ public class CheckpointFailureManagerTest extends 
TestLogger {
                 new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_SUBSUMED, null),
                 null,
                 new JobID(),
-                callback);
-        verify(callback, times(1)).reportFailedCheckpoint(anyLong(), 
any(Exception.class));
+                pendingCheckpointCallback,
+                statsTracker);
+        verify(pendingCheckpointCallback, times(1))
+                .reportFailedCheckpoint(anyLong(), any(Exception.class));
+        verify(statsTracker, 
times(0)).reportFailedCheckpointsWithoutInProgress();
+
+        failureManager.handleCheckpointException(
+                null,
+                checkpointProperties,
+                new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_SUBSUMED, null),
+                null,
+                new JobID(),
+                null,
+                statsTracker);
+        verify(pendingCheckpointCallback, times(1))
+                .reportFailedCheckpoint(anyLong(), any(Exception.class));
+        verify(statsTracker, 
times(1)).reportFailedCheckpointsWithoutInProgress();
     }
 
     /** A failure handler callback for testing. */
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
index c3ffd6d..5d60810 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
@@ -72,6 +72,13 @@ public class CheckpointStatsCountsTest {
         assertEquals(0, counts.getNumberOfInProgressCheckpoints());
         assertEquals(1, counts.getNumberOfCompletedCheckpoints());
         assertEquals(1, counts.getNumberOfFailedCheckpoints());
+
+        counts.incrementFailedCheckpointsWithoutInProgress();
+        assertEquals(1, counts.getNumberOfRestoredCheckpoints());
+        assertEquals(3, counts.getTotalNumberOfCheckpoints());
+        assertEquals(0, counts.getNumberOfInProgressCheckpoints());
+        assertEquals(1, counts.getNumberOfCompletedCheckpoints());
+        assertEquals(2, counts.getNumberOfFailedCheckpoints());
     }
 
     /**
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index 0e97958..433d4bc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -205,6 +205,15 @@ public class CheckpointStatsTrackerTest {
         assertEquals(2, counts.getNumberOfCompletedCheckpoints());
         assertEquals(1, counts.getNumberOfFailedCheckpoints());
 
+        tracker.reportFailedCheckpointsWithoutInProgress();
+
+        CheckpointStatsSnapshot snapshot1 = tracker.createSnapshot();
+        counts = snapshot1.getCounts();
+        assertEquals(5, counts.getTotalNumberOfCheckpoints());
+        assertEquals(1, counts.getNumberOfInProgressCheckpoints());
+        assertEquals(2, counts.getNumberOfCompletedCheckpoints());
+        assertEquals(2, counts.getNumberOfFailedCheckpoints());
+
         // Summary stats
         CompletedCheckpointStatsSummarySnapshot summary = 
snapshot.getSummaryStats();
         assertEquals(2, summary.getStateSizeStats().getCount());

Reply via email to