This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ffe353a305e677340d344d3fa45994536757e323 Author: fanrui <[email protected]> AuthorDate: Mon Feb 28 18:12:34 2022 +0800 [FLINK-26049][checkpoint] Adding CheckpointStatsTracker logic without pending checkpoint --- .../checkpoint/CheckpointFailureManager.java | 2 + .../runtime/checkpoint/CheckpointStatsCounts.java | 9 +++++ .../runtime/checkpoint/CheckpointStatsTracker.java | 15 +++++++ .../checkpoint/CheckpointCoordinatorTest.java | 46 ++++++++++++++++++++++ .../checkpoint/CheckpointStatsCountsTest.java | 7 ++++ .../checkpoint/CheckpointStatsTrackerTest.java | 9 +++++ 6 files changed, 88 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java index df429a4..0205adc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java @@ -132,6 +132,8 @@ public class CheckpointFailureManager { long failureTimestamp = System.currentTimeMillis(); statsTracker.reportFailedCheckpoint( pendingCheckpointStats.toFailedCheckpoint(failureTimestamp, exception)); + } else { + statsTracker.reportFailedCheckpointsWithoutInProgress(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java index 8d06a1b..7265bb3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java @@ -163,6 +163,15 @@ public class CheckpointStatsCounts implements Serializable { } /** + * Increments the number of failed checkpoints without in progress checkpoint. 
For example, it + * should be called when triggering a checkpoint fails before the PendingCheckpoint is created. + */ + void incrementFailedCheckpointsWithoutInProgress() { + numFailedCheckpoints++; + numTotalCheckpoints++; + } + /** * Creates a snapshot of the current state. * * @return Snapshot of the current state. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java index f10a668..7971493 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java @@ -232,6 +232,21 @@ public class CheckpointStatsTracker { } } + /** + * Callback for a checkpoint failure that has no in-progress checkpoint. For example, it should + * be called when triggering a checkpoint fails before the PendingCheckpoint is created. + */ + public void reportFailedCheckpointsWithoutInProgress() { + statsReadWriteLock.lock(); + try { + counts.incrementFailedCheckpointsWithoutInProgress(); + + dirty = true; + } finally { + statsReadWriteLock.unlock(); + } + } + public PendingCheckpointStats getPendingCheckpointStats(long checkpointId) { statsReadWriteLock.lock(); try { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index b493ef4..30af82f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -129,6 +129,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; +import static 
org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.anyList; @@ -3113,6 +3114,43 @@ public class CheckpointCoordinatorTest extends TestLogger { testTriggerCheckpoint(checkpointCoordinator, PERIODIC_SCHEDULER_SHUTDOWN); } + /** Tests that no checkpoint is triggered when CheckpointIDCounter throws an IOException. */ + @Test + public void testTriggerCheckpointWithCounterIOException() throws Exception { + // given: Checkpoint coordinator whose CheckpointIDCounter fails on getAndIncrement. + IOExceptionCheckpointIDCounter testingCounter = new IOExceptionCheckpointIDCounter(); + TestFailJobCallback failureCallback = new TestFailJobCallback(); + + CheckpointStatsTracker statsTracker = + new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup()); + + CheckpointCoordinator checkpointCoordinator = + new CheckpointCoordinatorBuilder() + .setCheckpointIDCounter(testingCounter) + .setFailureManager(new CheckpointFailureManager(0, failureCallback)) + .setTimer(manuallyTriggeredScheduledExecutor) + .setCheckpointStatsTracker(statsTracker) + .build(); + testingCounter.setOwner(checkpointCoordinator); + + // when: The checkpoint is triggered. + testTriggerCheckpoint(checkpointCoordinator, IO_EXCEPTION); + + // then: Failure manager should fail the job. + assertEquals(1, failureCallback.getInvokeCounter()); + + // then: The NumberOfFailedCheckpoints and TotalNumberOfCheckpoints should be 1. + CheckpointStatsCounts counts = statsTracker.createSnapshot().getCounts(); + assertEquals(0, counts.getNumberOfRestoredCheckpoints()); + assertEquals(1, counts.getTotalNumberOfCheckpoints()); + assertEquals(0, counts.getNumberOfInProgressCheckpoints()); + assertEquals(0, counts.getNumberOfCompletedCheckpoints()); + assertEquals(1, counts.getNumberOfFailedCheckpoints()); + + // then: The PendingCheckpoint shouldn't be created. 
+ assertNull(statsTracker.getPendingCheckpointStats(1)); + } + private void testTriggerCheckpoint( CheckpointCoordinator checkpointCoordinator, CheckpointFailureReason expectedFailureReason) @@ -3848,6 +3886,14 @@ public class CheckpointCoordinatorTest extends TestLogger { } } + private static class IOExceptionCheckpointIDCounter extends CheckpointIDCounterWithOwner { + @Override + public long getAndIncrement() throws Exception { + checkNotNull(owner); + throw new IOException("disk is error!"); + } + } + private static class IOExceptionCheckpointStorage extends JobManagerCheckpointStorage { @Override public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java index c3ffd6d..5d60810 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java @@ -72,6 +72,13 @@ public class CheckpointStatsCountsTest { assertEquals(0, counts.getNumberOfInProgressCheckpoints()); assertEquals(1, counts.getNumberOfCompletedCheckpoints()); assertEquals(1, counts.getNumberOfFailedCheckpoints()); + + counts.incrementFailedCheckpointsWithoutInProgress(); + assertEquals(1, counts.getNumberOfRestoredCheckpoints()); + assertEquals(3, counts.getTotalNumberOfCheckpoints()); + assertEquals(0, counts.getNumberOfInProgressCheckpoints()); + assertEquals(1, counts.getNumberOfCompletedCheckpoints()); + assertEquals(2, counts.getNumberOfFailedCheckpoints()); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java index 8e7b0bd..14b3a94 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java @@ -172,6 +172,15 @@ public class CheckpointStatsTrackerTest { assertEquals(2, counts.getNumberOfCompletedCheckpoints()); assertEquals(1, counts.getNumberOfFailedCheckpoints()); + tracker.reportFailedCheckpointsWithoutInProgress(); + + CheckpointStatsSnapshot snapshot1 = tracker.createSnapshot(); + counts = snapshot1.getCounts(); + assertEquals(5, counts.getTotalNumberOfCheckpoints()); + assertEquals(1, counts.getNumberOfInProgressCheckpoints()); + assertEquals(2, counts.getNumberOfCompletedCheckpoints()); + assertEquals(2, counts.getNumberOfFailedCheckpoints()); + // Summary stats CompletedCheckpointStatsSummarySnapshot summary = snapshot.getSummaryStats(); assertEquals(2, summary.getStateSizeStats().getCount());
