This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 01ed7dbbde9f9bc8c3dc54e063c81fbe7de7f86b Author: 1996fanrui <[email protected]> AuthorDate: Thu Apr 27 19:03:58 2023 +0800 [FLINK-31959][checkpoint] Correct the unaligned checkpoint type at checkpoint level --- .../src/test/resources/rest_api_v1.snapshot | 8 ++-- .../detail/job-checkpoints-detail.component.ts | 8 ++-- .../checkpoint/AbstractCheckpointStats.java | 3 ++ .../checkpoint/CompletedCheckpointStats.java | 12 ++++++ .../runtime/checkpoint/FailedCheckpointStats.java | 3 ++ .../runtime/checkpoint/PendingCheckpointStats.java | 15 +++++++ .../messages/checkpoints/CheckpointStatistics.java | 22 +++++++--- .../CompletedCheckpointStatsSummaryTest.java | 10 ++++- .../checkpoint/CompletedCheckpointTest.java | 3 ++ .../checkpoint/FailedCheckpointStatsTest.java | 3 ++ .../checkpoint/PendingCheckpointStatsTest.java | 27 ++++++++---- .../checkpoints/CheckpointingStatisticsTest.java | 50 +++++++++++++++++++--- 12 files changed, 133 insertions(+), 31 deletions(-) diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index e66b6f35fe7..075879621ba 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -1206,7 +1206,7 @@ }, "checkpoint_type" : { "type" : "string", - "enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ] + "enum" : [ "CHECKPOINT", "UNALIGNED_CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ] }, "tasks" : { "type" : "object", @@ -1312,7 +1312,7 @@ }, "checkpoint_type" : { "type" : "string", - "enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ] + "enum" : [ "CHECKPOINT", "UNALIGNED_CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ] }, "tasks" : { "type" : "object", @@ -1400,7 +1400,7 @@ }, "checkpoint_type" : { "type" : "string", - "enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ] + "enum" : [ "CHECKPOINT", "UNALIGNED_CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ] }, "tasks" : { "type" : "object", @@ -1594,7 +1594,7 @@ }, "checkpoint_type" : { "type" : "string", - "enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ] + "enum" : [ "CHECKPOINT", "UNALIGNED_CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ] }, "tasks" : { "type" : "object", diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.ts index 9ebf07bff5d..6652c833cd5 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.ts @@ -115,11 +115,9 @@ export class JobCheckpointsDetailComponent implements OnInit, OnDestroy { this.checkPointConfig = config; this.checkPointDetail = detail; if (this.checkPointDetail.checkpoint_type === 'CHECKPOINT') { - if (this.checkPointConfig.unaligned_checkpoints) { - this.checkPointType = 'unaligned checkpoint'; - } else { - this.checkPointType = 'aligned checkpoint'; - } + this.checkPointType = 'aligned checkpoint'; + } else if (this.checkPointDetail.checkpoint_type === 'UNALIGNED_CHECKPOINT') { + this.checkPointType = 'unaligned checkpoint'; } else if (this.checkPointDetail.checkpoint_type === 'SYNC_SAVEPOINT') { this.checkPointType = 'savepoint on cancel'; } else if (this.checkPointDetail.checkpoint_type === 'SAVEPOINT') { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java index d27e6c75417..2a3b1d241cb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java @@ -99,6 +99,9 @@ public abstract class AbstractCheckpointStats implements Serializable { /** @return the total number of persisted bytes during the checkpoint. */ public abstract long getPersistedData(); + /** @return whether the checkpoint is unaligned. */ + public abstract boolean isUnalignedCheckpoint(); + /** * Returns the latest acknowledged subtask stats or <code>null</code> if none was acknowledged * yet. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java index 06cc8a66b9e..1a6888ff0ec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java @@ -48,6 +48,8 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats { private final long persistedData; + private final boolean unalignedCheckpoint; + /** The latest acknowledged subtask stats. */ private final SubtaskStateStats latestAcknowledgedSubtask; @@ -67,6 +69,7 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats { long stateSize, long processedData, long persistedData, + boolean unalignedCheckpoint, SubtaskStateStats latestAcknowledgedSubtask, String externalPointer) { this( @@ -80,6 +83,7 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats { stateSize, processedData, persistedData, + unalignedCheckpoint, latestAcknowledgedSubtask, externalPointer); } @@ -98,6 +102,7 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats { * @param stateSize Total checkpoint state size over all subtasks. * @param processedData Processed data during the checkpoint. * @param persistedData Persisted data during the checkpoint. + * @param unalignedCheckpoint Whether the checkpoint is unaligned. * @param latestAcknowledgedSubtask The latest acknowledged subtask stats. * @param externalPointer Optional external path if persisted externally. */ @@ -112,6 +117,7 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats { long stateSize, long processedData, long persistedData, + boolean unalignedCheckpoint, SubtaskStateStats latestAcknowledgedSubtask, String externalPointer) { @@ -124,6 +130,7 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats { this.stateSize = stateSize; this.processedData = processedData; this.persistedData = persistedData; + this.unalignedCheckpoint = unalignedCheckpoint; this.latestAcknowledgedSubtask = checkNotNull(latestAcknowledgedSubtask); this.externalPointer = externalPointer; } @@ -158,6 +165,11 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats { return persistedData; } + @Override + public boolean isUnalignedCheckpoint() { + return unalignedCheckpoint; + } + @Override @Nullable public SubtaskStateStats getLatestAcknowledgedSubtaskStats() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java index 43a2a1d0e01..3f63ec22226 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java @@ -55,6 +55,7 @@ public class FailedCheckpointStats extends PendingCheckpointStats { * @param stateSize Total checkpoint state size over all subtasks. * @param processedData Processed data during the checkpoint. * @param persistedData Persisted data during the checkpoint. + * @param unalignedCheckpoint Whether the checkpoint is unaligned. * @param failureTimestamp Timestamp when this checkpoint failed. * @param latestAcknowledgedSubtask The latest acknowledged subtask stats or <code>null</code>. * @param cause Cause of the checkpoint failure or <code>null</code>. @@ -70,6 +71,7 @@ public class FailedCheckpointStats extends PendingCheckpointStats { long stateSize, long processedData, long persistedData, + boolean unalignedCheckpoint, long failureTimestamp, @Nullable SubtaskStateStats latestAcknowledgedSubtask, @Nullable Throwable cause) { @@ -85,6 +87,7 @@ public class FailedCheckpointStats extends PendingCheckpointStats { stateSize, processedData, persistedData, + unalignedCheckpoint, latestAcknowledgedSubtask); checkArgument(numAcknowledgedSubtasks >= 0, "Negative number of ACKs"); this.failureTimestamp = failureTimestamp; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java index a8d74f5e581..70cee1ffd42 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java @@ -56,6 +56,8 @@ public class PendingCheckpointStats extends AbstractCheckpointStats { private volatile long currentPersistedData; + private volatile boolean unalignedCheckpoint; + /** Stats of the latest acknowledged subtask. */ private volatile SubtaskStateStats latestAcknowledgedSubtask; @@ -110,6 +112,7 @@ public class PendingCheckpointStats extends AbstractCheckpointStats { 0, 0, 0, + false, null); } @@ -124,6 +127,7 @@ public class PendingCheckpointStats extends AbstractCheckpointStats { long currentStateSize, long processedData, long persistedData, + boolean unalignedCheckpoint, @Nullable SubtaskStateStats latestAcknowledgedSubtask) { super(checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats); @@ -131,6 +135,7 @@ public class PendingCheckpointStats extends AbstractCheckpointStats { this.currentStateSize = currentStateSize; this.currentProcessedData = processedData; this.currentPersistedData = persistedData; + this.unalignedCheckpoint = unalignedCheckpoint; this.latestAcknowledgedSubtask = latestAcknowledgedSubtask; this.currentNumAcknowledgedSubtasks = acknowledgedSubtaskCount; } @@ -165,6 +170,11 @@ public class PendingCheckpointStats extends AbstractCheckpointStats { return currentPersistedData; } + @Override + public boolean isUnalignedCheckpoint() { + return unalignedCheckpoint; + } + @Override public SubtaskStateStats getLatestAcknowledgedSubtaskStats() { return latestAcknowledgedSubtask; @@ -202,6 +212,9 @@ public class PendingCheckpointStats extends AbstractCheckpointStats { if (persistedData > 0) { currentPersistedData += persistedData; } + + unalignedCheckpoint |= subtask.getUnalignedCheckpoint(); + return true; } else { return false; @@ -220,6 +233,7 @@ public class PendingCheckpointStats extends AbstractCheckpointStats { currentStateSize, currentProcessedData, currentPersistedData, + unalignedCheckpoint, latestAcknowledgedSubtask, externalPointer); } @@ -242,6 +256,7 @@ public class PendingCheckpointStats extends AbstractCheckpointStats { currentStateSize, currentProcessedData, currentPersistedData, + unalignedCheckpoint, failureTimestamp, latestAcknowledgedSubtask, cause); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java index 2c29cdcaeb9..a8a05ea5f9d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java @@ -347,7 +347,8 @@ public class CheckpointStatistics implements ResponseBody { completedCheckpointStats.getNumberOfSubtasks(), completedCheckpointStats.getNumberOfAcknowledgedSubtasks(), RestAPICheckpointType.valueOf( - completedCheckpointStats.getProperties().getCheckpointType()), + completedCheckpointStats.getProperties().getCheckpointType(), + completedCheckpointStats.isUnalignedCheckpoint()), checkpointStatisticsPerTask, completedCheckpointStats.getExternalPath(), completedCheckpointStats.isDiscarded()); @@ -371,7 +372,8 @@ public class CheckpointStatistics implements ResponseBody { failedCheckpointStats.getNumberOfSubtasks(), failedCheckpointStats.getNumberOfAcknowledgedSubtasks(), RestAPICheckpointType.valueOf( - failedCheckpointStats.getProperties().getCheckpointType()), + failedCheckpointStats.getProperties().getCheckpointType(), + failedCheckpointStats.isUnalignedCheckpoint()), checkpointStatisticsPerTask, failedCheckpointStats.getFailureTimestamp(), failedCheckpointStats.getFailureMessage()); @@ -395,7 +397,8 @@ public class CheckpointStatistics implements ResponseBody { pendingCheckpointStats.getNumberOfSubtasks(), pendingCheckpointStats.getNumberOfAcknowledgedSubtasks(), RestAPICheckpointType.valueOf( - pendingCheckpointStats.getProperties().getCheckpointType()), + pendingCheckpointStats.getProperties().getCheckpointType(), + pendingCheckpointStats.isUnalignedCheckpoint()), checkpointStatisticsPerTask); } else { throw new IllegalArgumentException( @@ -411,16 +414,23 @@ public class CheckpointStatistics implements ResponseBody { */ enum RestAPICheckpointType { CHECKPOINT, + UNALIGNED_CHECKPOINT, SAVEPOINT, SYNC_SAVEPOINT; - public static RestAPICheckpointType valueOf(SnapshotType checkpointType) { + public static RestAPICheckpointType valueOf( + SnapshotType checkpointType, boolean isUnalignedCheckpoint) { if (checkpointType.isSavepoint()) { + Preconditions.checkArgument( + !isUnalignedCheckpoint, + "Currently the savepoint doesn't support unaligned checkpoint."); SavepointType savepointType = (SavepointType) checkpointType; return savepointType.isSynchronous() ? SYNC_SAVEPOINT : SAVEPOINT; - } else { - return CHECKPOINT; } + if (isUnalignedCheckpoint) { + return UNALIGNED_CHECKPOINT; + } + return CHECKPOINT; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java index 4ae0e14163b..ad36a2e955d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java @@ -42,6 +42,7 @@ public class CompletedCheckpointStatsSummaryTest { long stateSize = Integer.MAX_VALUE + 17787L; long processedData = Integer.MAX_VALUE + 123123L; long persistedData = Integer.MAX_VALUE + 42L; + boolean unalignedCheckpoint = true; CompletedCheckpointStatsSummary summary = new CompletedCheckpointStatsSummary(); assertThat(summary.getStateSizeStats().getCount()).isZero(); @@ -59,7 +60,8 @@ public class CompletedCheckpointStatsSummaryTest { ackTimestamp + i, stateSize + i, processedData + i, - persistedData + i); + persistedData + i, + unalignedCheckpoint); summary.updateSummary(completed); @@ -93,7 +95,8 @@ public class CompletedCheckpointStatsSummaryTest { long ackTimestamp, long stateSize, long processedData, - long persistedData) { + long persistedData, + boolean unalignedCheckpoint) { SubtaskStateStats latest = mock(SubtaskStateStats.class); when(latest.getAckTimestamp()).thenReturn(ackTimestamp); @@ -113,6 +116,7 @@ public class CompletedCheckpointStatsSummaryTest { stateSize, processedData, persistedData, + unalignedCheckpoint, latest, null); } @@ -123,6 +127,7 @@ public class CompletedCheckpointStatsSummaryTest { int stateSize = 100; int processedData = 200; int persistedData = 300; + boolean unalignedCheckpoint = true; long triggerTimestamp = 1234; long lastAck = triggerTimestamp + 123; @@ -138,6 +143,7 @@ public class CompletedCheckpointStatsSummaryTest { stateSize, processedData, persistedData, + unalignedCheckpoint, new SubtaskStateStats(0, lastAck), "")); CompletedCheckpointStatsSummarySnapshot snapshot = summary.createSnapshot(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java index 2f6b7259cf2..6170834f684 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java @@ -368,6 +368,7 @@ public class CompletedCheckpointTest { 1, 1, 1, + true, mock(SubtaskStateStats.class), null); CompletedCheckpoint completed = @@ -408,6 +409,7 @@ public class CompletedCheckpointTest { 123129837912L, 42L, 44L, + true, new SubtaskStateStats( 123, 213123, 123123, 123123, 0, 0, 0, 0, 0, 0, false, true), null); @@ -424,6 +426,7 @@ public class CompletedCheckpointTest { assertThat(copy.getStateSize()).isEqualTo(completed.getStateSize()); assertThat(copy.getProcessedData()).isEqualTo(completed.getProcessedData()); assertThat(copy.getPersistedData()).isEqualTo(completed.getPersistedData()); + assertThat(copy.isUnalignedCheckpoint()).isEqualTo(completed.isUnalignedCheckpoint()); assertThat(copy.getLatestAcknowledgedSubtaskStats().getSubtaskIndex()) .isEqualTo(completed.getLatestAcknowledgedSubtaskStats().getSubtaskIndex()); assertThat(copy.getStatus()).isEqualTo(completed.getStatus()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java index 28beabc3e05..38cc9c850ba 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java @@ -57,6 +57,7 @@ public class FailedCheckpointStatsTest { 0, 0, 0, + false, failureTimestamp, null, null); @@ -87,6 +88,7 @@ public class FailedCheckpointStatsTest { 190890123, 4242, 4444, + true, failureTimestamp, null, new NotSerializableException("message")); @@ -103,6 +105,7 @@ public class FailedCheckpointStatsTest { assertThat(copy.getStateSize()).isEqualTo(failed.getStateSize()); assertThat(copy.getProcessedData()).isEqualTo(failed.getProcessedData()); assertThat(copy.getPersistedData()).isEqualTo(failed.getPersistedData()); + assertThat(copy.isUnalignedCheckpoint()).isEqualTo(failed.isUnalignedCheckpoint()); assertThat(copy.getLatestAcknowledgedSubtaskStats()) .isEqualTo(failed.getLatestAcknowledgedSubtaskStats()); assertThat(copy.getStatus()).isEqualTo(failed.getStatus()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java index 33d3313c767..65d5a11a0fb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java @@ -68,17 +68,19 @@ public class PendingCheckpointStatsTest { assertThat(pending.getTaskStateStats(new JobVertexID())).isNull(); // Report subtasks and check getters - assertThat(pending.reportSubtaskStats(new JobVertexID(), createSubtaskStats(0))).isFalse(); + assertThat(pending.reportSubtaskStats(new JobVertexID(), createSubtaskStats(0, false))) + .isFalse(); long stateSize = 0; // Report 1st task for (int i = 0; i < task1.getNumberOfSubtasks(); i++) { - SubtaskStateStats subtask = createSubtaskStats(i); + SubtaskStateStats subtask = createSubtaskStats(i, false); stateSize += subtask.getStateSize(); pending.reportSubtaskStats(task1.getJobVertexId(), subtask); + assertThat(pending.isUnalignedCheckpoint()).isFalse(); assertThat(pending.getLatestAcknowledgedSubtaskStats()).isEqualTo(subtask); assertThat(pending.getLatestAckTimestamp()).isEqualTo(subtask.getAckTimestamp()); assertThat(pending.getEndToEndDuration()) @@ -92,11 +94,12 @@ public class PendingCheckpointStatsTest { // Report 2nd task for (int i = 0; i < task2.getNumberOfSubtasks(); i++) { - SubtaskStateStats subtask = createSubtaskStats(i); + SubtaskStateStats subtask = createSubtaskStats(i, true); stateSize += subtask.getStateSize(); pending.reportSubtaskStats(task2.getJobVertexId(), subtask); + assertThat(pending.isUnalignedCheckpoint()).isTrue(); assertThat(pending.getLatestAcknowledgedSubtaskStats()).isEqualTo(subtask); assertThat(pending.getLatestAckTimestamp()).isEqualTo(subtask.getAckTimestamp()); assertThat(pending.getEndToEndDuration()) @@ -131,11 +134,13 @@ public class PendingCheckpointStatsTest { // Report subtasks for (int i = 0; i < task1.getNumberOfSubtasks(); i++) { - pending.reportSubtaskStats(task1.getJobVertexId(), createSubtaskStats(i)); + pending.reportSubtaskStats(task1.getJobVertexId(), createSubtaskStats(i, false)); + assertThat(pending.isUnalignedCheckpoint()).isFalse(); } for (int i = 0; i < task2.getNumberOfSubtasks(); i++) { - pending.reportSubtaskStats(task2.getJobVertexId(), createSubtaskStats(i)); + pending.reportSubtaskStats(task2.getJobVertexId(), createSubtaskStats(i, true)); + assertThat(pending.isUnalignedCheckpoint()).isTrue(); } // Report completed @@ -164,6 +169,7 @@ public class PendingCheckpointStatsTest { assertThat(completed.getLatestAckTimestamp()).isEqualTo(pending.getLatestAckTimestamp()); assertThat(completed.getEndToEndDuration()).isEqualTo(pending.getEndToEndDuration()); assertThat(completed.getStateSize()).isEqualTo(pending.getStateSize()); + assertThat(completed.isUnalignedCheckpoint()).isTrue(); assertThat(completed.getTaskStateStats(task1.getJobVertexId())).isEqualTo(task1); assertThat(completed.getTaskStateStats(task2.getJobVertexId())).isEqualTo(task2); } @@ -192,11 +198,13 @@ public class PendingCheckpointStatsTest { // Report subtasks for (int i = 0; i < task1.getNumberOfSubtasks(); i++) { - pending.reportSubtaskStats(task1.getJobVertexId(), createSubtaskStats(i)); + pending.reportSubtaskStats(task1.getJobVertexId(), createSubtaskStats(i, false)); + assertThat(pending.isUnalignedCheckpoint()).isFalse(); } for (int i = 0; i < task2.getNumberOfSubtasks(); i++) { - pending.reportSubtaskStats(task2.getJobVertexId(), createSubtaskStats(i)); + pending.reportSubtaskStats(task2.getJobVertexId(), createSubtaskStats(i, true)); + assertThat(pending.isUnalignedCheckpoint()).isTrue(); } // Report failed @@ -223,6 +231,7 @@ public class PendingCheckpointStatsTest { assertThat(failed.getLatestAckTimestamp()).isEqualTo(pending.getLatestAckTimestamp()); assertThat(failed.getEndToEndDuration()).isEqualTo(failureTimestamp - triggerTimestamp); assertThat(failed.getStateSize()).isEqualTo(pending.getStateSize()); + assertThat(failed.isUnalignedCheckpoint()).isTrue(); assertThat(failed.getTaskStateStats(task1.getJobVertexId())).isEqualTo(task1); assertThat(failed.getTaskStateStats(task2.getJobVertexId())).isEqualTo(task2); } @@ -262,7 +271,7 @@ public class PendingCheckpointStatsTest { // ------------------------------------------------------------------------ - private SubtaskStateStats createSubtaskStats(int index) { + private SubtaskStateStats createSubtaskStats(int index, boolean unalignedCheckpoint) { return new SubtaskStateStats( index, Integer.MAX_VALUE + (long) index, @@ -274,7 +283,7 @@ public class PendingCheckpointStatsTest { Integer.MAX_VALUE + (long) index, Integer.MAX_VALUE + (long) index, Integer.MAX_VALUE + (long) index, - false, + unalignedCheckpoint, true); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java index e2cab889af9..3afc8daac3a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java @@ -26,11 +26,16 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics.RestAPICheckpointType; +import org.junit.jupiter.api.Test; + import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + /** Tests for {@link CheckpointingStatistics}. */ public class CheckpointingStatisticsTest extends RestResponseMarshallingTestBase<CheckpointingStatistics> { @@ -82,7 +87,7 @@ public class CheckpointingStatisticsTest 44L, 10, 10, - RestAPICheckpointType.valueOf(CheckpointType.CHECKPOINT), + RestAPICheckpointType.valueOf(CheckpointType.CHECKPOINT, true), Collections.emptyMap(), null, false); @@ -100,11 +105,11 @@ public class CheckpointingStatisticsTest 1L, 0L, 31337L, - 4244L, + 0L, 9, 9, RestAPICheckpointType.valueOf( - SavepointType.savepoint(SavepointFormatType.CANONICAL)), + SavepointType.savepoint(SavepointFormatType.CANONICAL), false), checkpointStatisticsPerTask, "externalPath", false); @@ -125,7 +130,7 @@ public class CheckpointingStatisticsTest 22L, 11, 9, - RestAPICheckpointType.valueOf(CheckpointType.CHECKPOINT), + RestAPICheckpointType.valueOf(CheckpointType.CHECKPOINT, true), Collections.emptyMap(), 100L, "Test failure"); @@ -149,7 +154,7 @@ public class CheckpointingStatisticsTest 16L, 10, 10, - RestAPICheckpointType.valueOf(CheckpointType.CHECKPOINT), + RestAPICheckpointType.valueOf(CheckpointType.CHECKPOINT, true), Collections.emptyMap()); final CheckpointingStatistics.LatestCheckpoints latestCheckpoints = @@ -162,4 +167,39 @@ public class CheckpointingStatisticsTest latestCheckpoints, Arrays.asList(completed, savepoint, failed, pending)); } + + @Test + void testRestAPICheckpointType() { + // Test for aligned checkpoint + assertThat(RestAPICheckpointType.valueOf(CheckpointType.CHECKPOINT, false)) + .isEqualTo(RestAPICheckpointType.CHECKPOINT); + assertThat(RestAPICheckpointType.valueOf(CheckpointType.FULL_CHECKPOINT, false)) + .isEqualTo(RestAPICheckpointType.CHECKPOINT); + + // Test for unaligned checkpoint + assertThat(RestAPICheckpointType.valueOf(CheckpointType.CHECKPOINT, true)) + .isEqualTo(RestAPICheckpointType.UNALIGNED_CHECKPOINT); + assertThat(RestAPICheckpointType.valueOf(CheckpointType.FULL_CHECKPOINT, true)) + .isEqualTo(RestAPICheckpointType.UNALIGNED_CHECKPOINT); + + // Test for savepoint + assertThat( + RestAPICheckpointType.valueOf( + SavepointType.savepoint(SavepointFormatType.NATIVE), false)) + .isEqualTo(RestAPICheckpointType.SAVEPOINT); + assertThat( + RestAPICheckpointType.valueOf( + SavepointType.suspend(SavepointFormatType.NATIVE), false)) + .isEqualTo(RestAPICheckpointType.SYNC_SAVEPOINT); + assertThat( + RestAPICheckpointType.valueOf( + SavepointType.terminate(SavepointFormatType.NATIVE), false)) + .isEqualTo(RestAPICheckpointType.SYNC_SAVEPOINT); + + assertThatThrownBy( + () -> + RestAPICheckpointType.valueOf( + SavepointType.terminate(SavepointFormatType.NATIVE), true)) + .isInstanceOf(IllegalArgumentException.class); + } }
