This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 906f3244a3febf3bfe5221290d4cd3ad4746765f Author: Anton Kalashnikov <[email protected]> AuthorDate: Wed Feb 23 16:55:31 2022 +0100 [FLINK-25958][runtime] Report completed statistic only after the completed checkpoint will be added to checkpoint store --- .../runtime/checkpoint/CheckpointCoordinator.java | 31 ++++++++++++++++++++-- .../runtime/checkpoint/PendingCheckpoint.java | 18 +------------ .../checkpoint/CheckpointCoordinatorTest.java | 21 +++++++++++++-- .../runtime/checkpoint/PendingCheckpointTest.java | 5 ++-- 4 files changed, 51 insertions(+), 24 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 5248538..5deee51 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -1256,11 +1256,39 @@ public class CheckpointCoordinator { } else { lastSubsumed = null; } + + reportCompletedCheckpoint(completedCheckpoint); } finally { pendingCheckpoints.remove(checkpointId); scheduleTriggerRequest(); } + cleanupAfterCompletedCheckpoint( + pendingCheckpoint, checkpointId, completedCheckpoint, lastSubsumed, props); + } + + private void reportCompletedCheckpoint(CompletedCheckpoint completedCheckpoint) { + CompletedCheckpointStats completedCheckpointStats = completedCheckpoint.getStatistic(); + if (completedCheckpointStats != null) { + LOG.trace( + "Checkpoint {} size: {}Kb, duration: {}ms", + completedCheckpoint.getCheckpointID(), + completedCheckpointStats.getStateSize() == 0 + ? 0 + : completedCheckpointStats.getStateSize() / 1024, + completedCheckpointStats.getEndToEndDuration()); + // Finalize the statsCallback and give the completed checkpoint a + // callback for discards. 
+ statsTracker.reportCompletedCheckpoint(completedCheckpointStats); + } + } + + private void cleanupAfterCompletedCheckpoint( + PendingCheckpoint pendingCheckpoint, + long checkpointId, + CompletedCheckpoint completedCheckpoint, + CompletedCheckpoint lastSubsumed, + CheckpointProperties props) { // remember recent checkpoint id for debugging purposes rememberRecentCheckpointId(checkpointId); @@ -1313,8 +1341,7 @@ public class CheckpointCoordinator { pendingCheckpoint.finalizeCheckpoint( checkpointsCleaner, this::scheduleTriggerRequest, - executor, - statsTracker); + executor); failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointID()); return completedCheckpoint; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 76c6a44..08cd23f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -303,10 +303,7 @@ public class PendingCheckpoint implements Checkpoint { } public CompletedCheckpoint finalizeCheckpoint( - CheckpointsCleaner checkpointsCleaner, - Runnable postCleanup, - Executor executor, - CheckpointStatsTracker statsTracker) + CheckpointsCleaner checkpointsCleaner, Runnable postCleanup, Executor executor) throws IOException { synchronized (lock) { @@ -342,19 +339,6 @@ public class PendingCheckpoint implements Checkpoint { finalizedLocation, toCompletedCheckpointStats(finalizedLocation)); - CompletedCheckpointStats completedCheckpointStats = completed.getStatistic(); - if (completedCheckpointStats != null) { - LOG.trace( - "Checkpoint {} size: {}Kb, duration: {}ms", - checkpointId, - completedCheckpointStats.getStateSize() == 0 - ? 
0 - : completedCheckpointStats.getStateSize() / 1024, - completedCheckpointStats.getEndToEndDuration()); - - statsTracker.reportCompletedCheckpoint(completedCheckpointStats); - } - onCompletionPromise.complete(completed); // mark this pending checkpoint as disposed, but do NOT drop the state diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 2cfa183..df8d606 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -1988,9 +1988,20 @@ public class CheckpointCoordinatorTest extends TestLogger { ExecutionAttemptID attemptID1 = vertex1.getCurrentExecutionAttempt().getAttemptId(); ExecutionAttemptID attemptID2 = vertex2.getCurrentExecutionAttempt().getAttemptId(); - + CheckpointStatsTracker statsTracker = + new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup()); // set up the coordinator and validate the initial state - CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(graph); + CheckpointCoordinator checkpointCoordinator = + new CheckpointCoordinatorBuilder() + .setExecutionGraph(graph) + .setCheckpointCoordinatorConfiguration( + CheckpointCoordinatorConfiguration.builder() + .setAlignedCheckpointTimeout(Long.MAX_VALUE) + .setMaxConcurrentCheckpoints(Integer.MAX_VALUE) + .build()) + .setTimer(manuallyTriggeredScheduledExecutor) + .setCheckpointStatsTracker(statsTracker) + .build(); assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints()); assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()); @@ -2083,6 +2094,12 @@ public class CheckpointCoordinatorTest extends TestLogger { assertEquals(pending.getCheckpointId(), success.getCheckpointID()); assertEquals(2, 
success.getOperatorStates().size()); + AbstractCheckpointStats actualStats = + statsTracker.createSnapshot().getHistory().getCheckpointById(checkpointId); + + assertEquals(checkpointId, actualStats.getCheckpointId()); + assertEquals(CheckpointStatsStatus.COMPLETED, actualStats.getStatus()); + checkpointCoordinator.shutdown(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index 294c6fa..799d743 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -217,8 +217,7 @@ public class PendingCheckpointTest { assertFalse(future.isDone()); pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics()); assertTrue(pending.areTasksFullyAcknowledged()); - pending.finalizeCheckpoint( - new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null); + pending.finalizeCheckpoint(new CheckpointsCleaner(), () -> {}, Executors.directExecutor()); assertTrue(future.isDone()); // Finalize (missing ACKs) @@ -228,7 +227,7 @@ public class PendingCheckpointTest { assertFalse(future.isDone()); try { pending.finalizeCheckpoint( - new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null); + new CheckpointsCleaner(), () -> {}, Executors.directExecutor()); fail("Did not throw expected Exception"); } catch (IllegalStateException ignored) { // Expected
