This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fbfdb0e468356fe71826eb6b185ecda9bc8b1de3 Author: Anton Kalashnikov <[email protected]> AuthorDate: Wed Feb 23 16:59:58 2022 +0100 [FLINK-25958][runtime] Report failed statistic if adding of completed checkpoint to checkpoint store fails --- .../runtime/checkpoint/CheckpointCoordinator.java | 15 ++++++++--- .../CheckpointCoordinatorFailureTest.java | 31 ++++++++++++++++++++-- .../checkpoint/CheckpointCoordinatorTest.java | 10 ++++++- 3 files changed, 50 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 5deee51..475effc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -1339,9 +1339,7 @@ public class CheckpointCoordinator { try { final CompletedCheckpoint completedCheckpoint = pendingCheckpoint.finalizeCheckpoint( - checkpointsCleaner, - this::scheduleTriggerRequest, - executor); + checkpointsCleaner, this::scheduleTriggerRequest, executor); failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointID()); return completedCheckpoint; @@ -1401,6 +1399,7 @@ public class CheckpointCoordinator { checkpointsCleaner.cleanCheckpointOnFailedStoring(completedCheckpoint, executor); } + reportFailedCheckpoint(checkpointId, exception); sendAbortedMessages(tasksToAbort, checkpointId, completedCheckpoint.getTimestamp()); throw new CheckpointException( "Could not complete the pending checkpoint " + checkpointId + '.', @@ -1409,6 +1408,16 @@ public class CheckpointCoordinator { } } + private void reportFailedCheckpoint(long checkpointId, Exception exception) { + PendingCheckpointStats pendingCheckpointStats = + statsTracker.getPendingCheckpointStats(checkpointId); + if (pendingCheckpointStats != null) { + statsTracker.reportFailedCheckpoint( + pendingCheckpointStats.toFailedCheckpoint( + System.currentTimeMillis(), exception)); + } + } + void scheduleTriggerRequest() { synchronized (lock) { if (isShutdown()) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index c02ee16..3674515 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo; @@ -49,6 +50,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.emptyList; +import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.assertStatsMetrics; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -204,6 +206,8 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { final CompletedCheckpointStore completedCheckpointStore = new FailingCompletedCheckpointStore(failure); + CheckpointStatsTracker statsTracker = + new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup()); final AtomicInteger cleanupCallCount = new AtomicInteger(0); final CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorBuilder() @@ -226,14 +230,27 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { }) .setCompletedCheckpointStore(completedCheckpointStore) .setTimer(manuallyTriggeredScheduledExecutor) + .setCheckpointStatsTracker(statsTracker) .build(); checkpointCoordinator.triggerCheckpoint(false); manuallyTriggeredScheduledExecutor.triggerAll(); - + CheckpointMetrics expectedReportedMetrics = + new CheckpointMetricsBuilder() + .setTotalBytesPersisted(18) + .setBytesPersistedOfThisCheckpoint(18) + .setBytesProcessedDuringAlignment(19) + .setAsyncDurationMillis(20) + .setAlignmentDurationNanos(123 * 1_000_000) + .setCheckpointStartDelayNanos(567 * 1_000_000) + .build(); try { checkpointCoordinator.receiveAcknowledgeMessage( new AcknowledgeCheckpoint( - graph.getJobID(), attemptId, checkpointIDCounter.getLast()), + graph.getJobID(), + attemptId, + checkpointIDCounter.getLast(), + expectedReportedMetrics, + new TaskStateSnapshot()), "unknown location"); fail("CheckpointException should have been thrown."); } catch (CheckpointException e) { @@ -242,6 +259,16 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { is(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE)); } + AbstractCheckpointStats actualStats = + statsTracker + .createSnapshot() + .getHistory() + .getCheckpointById(checkpointIDCounter.getLast()); + + assertEquals(checkpointIDCounter.getLast(), actualStats.getCheckpointId()); + assertEquals(CheckpointStatsStatus.FAILED, actualStats.getStatus()); + assertStatsMetrics(vertex.getJobvertexId(), 0, expectedReportedMetrics, actualStats); + assertThat(cleanupCallCount.get(), is(expectedCleanupCalls)); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index df8d606..3c09945 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -299,8 +299,16 @@ public class CheckpointCoordinatorTest extends TestLogger { AbstractCheckpointStats actual) { assertEquals(checkpointId, actual.getCheckpointId()); assertEquals(CheckpointStatsStatus.FAILED, actual.getStatus()); - assertEquals(expected.getTotalBytesPersisted(), actual.getStateSize()); assertEquals(0, actual.getNumberOfAcknowledgedSubtasks()); + assertStatsMetrics(jobVertexID, subtasIdx, expected, actual); + } + + public static void assertStatsMetrics( + JobVertexID jobVertexID, + int subtasIdx, + CheckpointMetrics expected, + AbstractCheckpointStats actual) { + assertEquals(expected.getTotalBytesPersisted(), actual.getStateSize()); SubtaskStateStats taskStats = actual.getAllTaskStateStats().stream() .filter(s -> s.getJobVertexId().equals(jobVertexID))
