This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 39afe944840e37a4d0290d99e91849d20e60e1c3 Author: fanrui <[email protected]> AuthorDate: Mon Feb 28 18:12:34 2022 +0800 [FLINK-26049][checkpoint] Adding CheckpointStatsTracker logic without pending checkpoint --- .../runtime/checkpoint/CheckpointCoordinator.java | 5 ++- .../checkpoint/CheckpointFailureManager.java | 9 +++- .../runtime/checkpoint/CheckpointStatsCounts.java | 9 ++++ .../runtime/checkpoint/CheckpointStatsTracker.java | 15 +++++++ .../checkpoint/CheckpointCoordinatorTest.java | 49 ++++++++++++++++++++++ .../checkpoint/CheckpointFailureManagerTest.java | 22 ++++++++-- .../checkpoint/CheckpointStatsCountsTest.java | 7 ++++ .../checkpoint/CheckpointStatsTrackerTest.java | 9 ++++ 8 files changed, 118 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 3d79824..e74ad24 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -911,7 +911,7 @@ public class CheckpointCoordinator { } } else { failureManager.handleCheckpointException( - checkpoint, checkpointProperties, cause, null, job, null); + checkpoint, checkpointProperties, cause, null, job, null, statsTracker); } } finally { isTriggering = false; @@ -1942,7 +1942,8 @@ public class CheckpointCoordinator { exception, executionAttemptID, job, - getStatsCallback(pendingCheckpoint)); + getStatsCallback(pendingCheckpoint), + statsTracker); } finally { sendAbortedMessages( pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java index 7b30034..f77d1a6 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java @@ -88,6 +88,7 @@ public class CheckpointFailureManager { * @param executionAttemptID the execution attempt id, as a safe guard. * @param job the JobID. * @param pendingCheckpointStats the pending checkpoint statistics. + * @param statsTracker the tracker for checkpoint statistics. */ public void handleCheckpointException( @Nullable PendingCheckpoint pendingCheckpoint, @@ -95,12 +96,13 @@ public class CheckpointFailureManager { CheckpointException exception, @Nullable ExecutionAttemptID executionAttemptID, JobID job, - @Nullable PendingCheckpointStats pendingCheckpointStats) { + @Nullable PendingCheckpointStats pendingCheckpointStats, + CheckpointStatsTracker statsTracker) { long checkpointId = pendingCheckpoint == null ? UNKNOWN_CHECKPOINT_ID : pendingCheckpoint.getCheckpointID(); - updateStatsAfterCheckpointFailed(pendingCheckpointStats, exception); + updateStatsAfterCheckpointFailed(pendingCheckpointStats, statsTracker, exception); LOG.warn( "Failed to trigger checkpoint {} for job {}. 
({} consecutive failed attempts so far)", @@ -124,10 +126,13 @@ public class CheckpointFailureManager { */ private void updateStatsAfterCheckpointFailed( @Nullable PendingCheckpointStats pendingCheckpointStats, + CheckpointStatsTracker statsTracker, CheckpointException exception) { if (pendingCheckpointStats != null) { long failureTimestamp = System.currentTimeMillis(); pendingCheckpointStats.reportFailedCheckpoint(failureTimestamp, exception); + } else { + statsTracker.reportFailedCheckpointsWithoutInProgress(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java index 8d06a1b..7265bb3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java @@ -163,6 +163,15 @@ public class CheckpointStatsCounts implements Serializable { } /** + * Increments the failed and total checkpoint counts for a failure without an in-progress + * checkpoint, e.g. when triggering fails before the PendingCheckpoint is created. + */ + void incrementFailedCheckpointsWithoutInProgress() { + numFailedCheckpoints++; + numTotalCheckpoints++; + } + + /** * Creates a snapshot of the current state. * * @return Snapshot of the current state. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java index 8c4ac97..afcd9e0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java @@ -255,6 +255,21 @@ public class CheckpointStatsTracker { } } + /** + * Callback for a checkpoint failure that has no in-progress checkpoint.
For example, it should be + * called when triggering a checkpoint fails before the PendingCheckpoint is created. + */ + public void reportFailedCheckpointsWithoutInProgress() { + statsReadWriteLock.lock(); + try { + counts.incrementFailedCheckpointsWithoutInProgress(); + + dirty = true; + } finally { + statsReadWriteLock.unlock(); + } + } + public PendingCheckpointStats getPendingCheckpointStats(long checkpointId) { statsReadWriteLock.lock(); try { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index f6f50b9..f8a0005 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -120,6 +120,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.anyLong; @@ -3068,6 +3069,46 @@ public class CheckpointCoordinatorTest extends TestLogger { testTriggerCheckpoint(checkpointCoordinator, PERIODIC_SCHEDULER_SHUTDOWN); } + /** Tests that a checkpoint is not triggered when the CheckpointIDCounter throws an IOException. */ + @Test + public void testTriggerCheckpointWithCounterIOException() throws Exception { + // given: Checkpoint coordinator whose CheckpointIDCounter fails on getAndIncrement.
+ IOExceptionCheckpointIDCounter testingCounter = new IOExceptionCheckpointIDCounter(); + TestFailJobCallback failureCallback = new TestFailJobCallback(); + + CheckpointStatsTracker statsTracker = + new CheckpointStatsTracker( + Integer.MAX_VALUE, + mock(CheckpointCoordinatorConfiguration.class), + new UnregisteredMetricsGroup()); + + CheckpointCoordinator checkpointCoordinator = + new CheckpointCoordinatorBuilder() + .setCheckpointIDCounter(testingCounter) + .setFailureManager(new CheckpointFailureManager(0, failureCallback)) + .setTimer(manuallyTriggeredScheduledExecutor) + .setCheckpointStatsTracker(statsTracker) + .build(); + testingCounter.setOwner(checkpointCoordinator); + + // when: The checkpoint is triggered. + testTriggerCheckpoint(checkpointCoordinator, IO_EXCEPTION); + + // then: Failure manager should fail the job. + assertEquals(1, failureCallback.getInvokeCounter()); + + // then: The NumberOfFailedCheckpoints and TotalNumberOfCheckpoints should be 1. + CheckpointStatsCounts counts = statsTracker.createSnapshot().getCounts(); + assertEquals(0, counts.getNumberOfRestoredCheckpoints()); + assertEquals(1, counts.getTotalNumberOfCheckpoints()); + assertEquals(0, counts.getNumberOfInProgressCheckpoints()); + assertEquals(0, counts.getNumberOfCompletedCheckpoints()); + assertEquals(1, counts.getNumberOfFailedCheckpoints()); + + // then: The PendingCheckpoint shouldn't be created. 
+ assertNull(statsTracker.getPendingCheckpointStats(1)); + } + private void testTriggerCheckpoint( CheckpointCoordinator checkpointCoordinator, CheckpointFailureReason expectedFailureReason) @@ -3776,6 +3817,14 @@ public class CheckpointCoordinatorTest extends TestLogger { } } + private static class IOExceptionCheckpointIDCounter extends CheckpointIDCounterWithOwner { + @Override + public long getAndIncrement() throws Exception { + checkNotNull(owner); + throw new IOException("disk is error!"); + } + } + private static class IOExceptionCheckpointStorage extends JobManagerCheckpointStorage { @Override public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java index e027ed9..243e3e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java @@ -167,7 +167,8 @@ public class CheckpointFailureManagerTest extends TestLogger { new CheckpointFailureManager(2, new TestFailJobCallback()); PendingCheckpoint pendingCheckpoint = mock(PendingCheckpoint.class); - PendingCheckpointStats callback = mock(PendingCheckpointStats.class); + PendingCheckpointStats pendingCheckpointCallback = mock(PendingCheckpointStats.class); + CheckpointStatsTracker statsTracker = mock(CheckpointStatsTracker.class); failureManager.handleCheckpointException( pendingCheckpoint, @@ -175,8 +176,23 @@ public class CheckpointFailureManagerTest extends TestLogger { new CheckpointException(CheckpointFailureReason.CHECKPOINT_SUBSUMED, null), null, new JobID(), - callback); - verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class)); + pendingCheckpointCallback, + statsTracker); + 
verify(pendingCheckpointCallback, times(1)) + .reportFailedCheckpoint(anyLong(), any(Exception.class)); + verify(statsTracker, times(0)).reportFailedCheckpointsWithoutInProgress(); + + failureManager.handleCheckpointException( + null, + checkpointProperties, + new CheckpointException(CheckpointFailureReason.CHECKPOINT_SUBSUMED, null), + null, + new JobID(), + null, + statsTracker); + verify(pendingCheckpointCallback, times(1)) + .reportFailedCheckpoint(anyLong(), any(Exception.class)); + verify(statsTracker, times(1)).reportFailedCheckpointsWithoutInProgress(); } /** A failure handler callback for testing. */ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java index c3ffd6d..5d60810 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java @@ -72,6 +72,13 @@ public class CheckpointStatsCountsTest { assertEquals(0, counts.getNumberOfInProgressCheckpoints()); assertEquals(1, counts.getNumberOfCompletedCheckpoints()); assertEquals(1, counts.getNumberOfFailedCheckpoints()); + + counts.incrementFailedCheckpointsWithoutInProgress(); + assertEquals(1, counts.getNumberOfRestoredCheckpoints()); + assertEquals(3, counts.getTotalNumberOfCheckpoints()); + assertEquals(0, counts.getNumberOfInProgressCheckpoints()); + assertEquals(1, counts.getNumberOfCompletedCheckpoints()); + assertEquals(2, counts.getNumberOfFailedCheckpoints()); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java index 0e97958..433d4bc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java @@ -205,6 +205,15 @@ public class CheckpointStatsTrackerTest { assertEquals(2, counts.getNumberOfCompletedCheckpoints()); assertEquals(1, counts.getNumberOfFailedCheckpoints()); + tracker.reportFailedCheckpointsWithoutInProgress(); + + CheckpointStatsSnapshot snapshot1 = tracker.createSnapshot(); + counts = snapshot1.getCounts(); + assertEquals(5, counts.getTotalNumberOfCheckpoints()); + assertEquals(1, counts.getNumberOfInProgressCheckpoints()); + assertEquals(2, counts.getNumberOfCompletedCheckpoints()); + assertEquals(2, counts.getNumberOfFailedCheckpoints()); + // Summary stats CompletedCheckpointStatsSummarySnapshot summary = snapshot.getSummaryStats(); assertEquals(2, summary.getStateSizeStats().getCount());
