This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 01ed7dbbde9f9bc8c3dc54e063c81fbe7de7f86b
Author: 1996fanrui <[email protected]>
AuthorDate: Thu Apr 27 19:03:58 2023 +0800

    [FLINK-31959][checkpoint] Correct the unaligned checkpoint type at 
checkpoint level
---
 .../src/test/resources/rest_api_v1.snapshot        |  8 ++--
 .../detail/job-checkpoints-detail.component.ts     |  8 ++--
 .../checkpoint/AbstractCheckpointStats.java        |  3 ++
 .../checkpoint/CompletedCheckpointStats.java       | 12 ++++++
 .../runtime/checkpoint/FailedCheckpointStats.java  |  3 ++
 .../runtime/checkpoint/PendingCheckpointStats.java | 15 +++++++
 .../messages/checkpoints/CheckpointStatistics.java | 22 +++++++---
 .../CompletedCheckpointStatsSummaryTest.java       | 10 ++++-
 .../checkpoint/CompletedCheckpointTest.java        |  3 ++
 .../checkpoint/FailedCheckpointStatsTest.java      |  3 ++
 .../checkpoint/PendingCheckpointStatsTest.java     | 27 ++++++++----
 .../checkpoints/CheckpointingStatisticsTest.java   | 50 +++++++++++++++++++---
 12 files changed, 133 insertions(+), 31 deletions(-)

diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot 
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index e66b6f35fe7..075879621ba 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -1206,7 +1206,7 @@
                 },
                 "checkpoint_type" : {
                   "type" : "string",
-                  "enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
+                  "enum" : [ "CHECKPOINT", "UNALIGNED_CHECKPOINT", 
"SAVEPOINT", "SYNC_SAVEPOINT" ]
                 },
                 "tasks" : {
                   "type" : "object",
@@ -1312,7 +1312,7 @@
                 },
                 "checkpoint_type" : {
                   "type" : "string",
-                  "enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
+                  "enum" : [ "CHECKPOINT", "UNALIGNED_CHECKPOINT", 
"SAVEPOINT", "SYNC_SAVEPOINT" ]
                 },
                 "tasks" : {
                   "type" : "object",
@@ -1400,7 +1400,7 @@
               },
               "checkpoint_type" : {
                 "type" : "string",
-                "enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
+                "enum" : [ "CHECKPOINT", "UNALIGNED_CHECKPOINT", "SAVEPOINT", 
"SYNC_SAVEPOINT" ]
               },
               "tasks" : {
                 "type" : "object",
@@ -1594,7 +1594,7 @@
         },
         "checkpoint_type" : {
           "type" : "string",
-          "enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
+          "enum" : [ "CHECKPOINT", "UNALIGNED_CHECKPOINT", "SAVEPOINT", 
"SYNC_SAVEPOINT" ]
         },
         "tasks" : {
           "type" : "object",
diff --git 
a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.ts
 
b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.ts
index 9ebf07bff5d..6652c833cd5 100644
--- 
a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.ts
+++ 
b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.ts
@@ -115,11 +115,9 @@ export class JobCheckpointsDetailComponent implements 
OnInit, OnDestroy {
             this.checkPointConfig = config;
             this.checkPointDetail = detail;
             if (this.checkPointDetail.checkpoint_type === 'CHECKPOINT') {
-              if (this.checkPointConfig.unaligned_checkpoints) {
-                this.checkPointType = 'unaligned checkpoint';
-              } else {
-                this.checkPointType = 'aligned checkpoint';
-              }
+              this.checkPointType = 'aligned checkpoint';
+            } else if (this.checkPointDetail.checkpoint_type === 
'UNALIGNED_CHECKPOINT') {
+              this.checkPointType = 'unaligned checkpoint';
             } else if (this.checkPointDetail.checkpoint_type === 
'SYNC_SAVEPOINT') {
               this.checkPointType = 'savepoint on cancel';
             } else if (this.checkPointDetail.checkpoint_type === 'SAVEPOINT') {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
index d27e6c75417..2a3b1d241cb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
@@ -99,6 +99,9 @@ public abstract class AbstractCheckpointStats implements 
Serializable {
     /** @return the total number of persisted bytes during the checkpoint. */
     public abstract long getPersistedData();
 
+    /** @return whether the checkpoint is unaligned. */
+    public abstract boolean isUnalignedCheckpoint();
+
     /**
      * Returns the latest acknowledged subtask stats or <code>null</code> if 
none was acknowledged
      * yet.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
index 06cc8a66b9e..1a6888ff0ec 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
@@ -48,6 +48,8 @@ public class CompletedCheckpointStats extends 
AbstractCheckpointStats {
 
     private final long persistedData;
 
+    private final boolean unalignedCheckpoint;
+
     /** The latest acknowledged subtask stats. */
     private final SubtaskStateStats latestAcknowledgedSubtask;
 
@@ -67,6 +69,7 @@ public class CompletedCheckpointStats extends 
AbstractCheckpointStats {
             long stateSize,
             long processedData,
             long persistedData,
+            boolean unalignedCheckpoint,
             SubtaskStateStats latestAcknowledgedSubtask,
             String externalPointer) {
         this(
@@ -80,6 +83,7 @@ public class CompletedCheckpointStats extends 
AbstractCheckpointStats {
                 stateSize,
                 processedData,
                 persistedData,
+                unalignedCheckpoint,
                 latestAcknowledgedSubtask,
                 externalPointer);
     }
@@ -98,6 +102,7 @@ public class CompletedCheckpointStats extends 
AbstractCheckpointStats {
      * @param stateSize Total checkpoint state size over all subtasks.
      * @param processedData Processed data during the checkpoint.
      * @param persistedData Persisted data during the checkpoint.
+     * @param unalignedCheckpoint Whether the checkpoint is unaligned.
      * @param latestAcknowledgedSubtask The latest acknowledged subtask stats.
      * @param externalPointer Optional external path if persisted externally.
      */
@@ -112,6 +117,7 @@ public class CompletedCheckpointStats extends 
AbstractCheckpointStats {
             long stateSize,
             long processedData,
             long persistedData,
+            boolean unalignedCheckpoint,
             SubtaskStateStats latestAcknowledgedSubtask,
             String externalPointer) {
 
@@ -124,6 +130,7 @@ public class CompletedCheckpointStats extends 
AbstractCheckpointStats {
         this.stateSize = stateSize;
         this.processedData = processedData;
         this.persistedData = persistedData;
+        this.unalignedCheckpoint = unalignedCheckpoint;
         this.latestAcknowledgedSubtask = 
checkNotNull(latestAcknowledgedSubtask);
         this.externalPointer = externalPointer;
     }
@@ -158,6 +165,11 @@ public class CompletedCheckpointStats extends 
AbstractCheckpointStats {
         return persistedData;
     }
 
+    @Override
+    public boolean isUnalignedCheckpoint() {
+        return unalignedCheckpoint;
+    }
+
     @Override
     @Nullable
     public SubtaskStateStats getLatestAcknowledgedSubtaskStats() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
index 43a2a1d0e01..3f63ec22226 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
@@ -55,6 +55,7 @@ public class FailedCheckpointStats extends 
PendingCheckpointStats {
      * @param stateSize Total checkpoint state size over all subtasks.
      * @param processedData Processed data during the checkpoint.
      * @param persistedData Persisted data during the checkpoint.
+     * @param unalignedCheckpoint Whether the checkpoint is unaligned.
      * @param failureTimestamp Timestamp when this checkpoint failed.
      * @param latestAcknowledgedSubtask The latest acknowledged subtask stats 
or <code>null</code>.
      * @param cause Cause of the checkpoint failure or <code>null</code>.
@@ -70,6 +71,7 @@ public class FailedCheckpointStats extends 
PendingCheckpointStats {
             long stateSize,
             long processedData,
             long persistedData,
+            boolean unalignedCheckpoint,
             long failureTimestamp,
             @Nullable SubtaskStateStats latestAcknowledgedSubtask,
             @Nullable Throwable cause) {
@@ -85,6 +87,7 @@ public class FailedCheckpointStats extends 
PendingCheckpointStats {
                 stateSize,
                 processedData,
                 persistedData,
+                unalignedCheckpoint,
                 latestAcknowledgedSubtask);
         checkArgument(numAcknowledgedSubtasks >= 0, "Negative number of ACKs");
         this.failureTimestamp = failureTimestamp;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
index a8d74f5e581..70cee1ffd42 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
@@ -56,6 +56,8 @@ public class PendingCheckpointStats extends 
AbstractCheckpointStats {
 
     private volatile long currentPersistedData;
 
+    private volatile boolean unalignedCheckpoint;
+
     /** Stats of the latest acknowledged subtask. */
     private volatile SubtaskStateStats latestAcknowledgedSubtask;
 
@@ -110,6 +112,7 @@ public class PendingCheckpointStats extends 
AbstractCheckpointStats {
                 0,
                 0,
                 0,
+                false,
                 null);
     }
 
@@ -124,6 +127,7 @@ public class PendingCheckpointStats extends 
AbstractCheckpointStats {
             long currentStateSize,
             long processedData,
             long persistedData,
+            boolean unalignedCheckpoint,
             @Nullable SubtaskStateStats latestAcknowledgedSubtask) {
 
         super(checkpointId, triggerTimestamp, props, totalSubtaskCount, 
taskStats);
@@ -131,6 +135,7 @@ public class PendingCheckpointStats extends 
AbstractCheckpointStats {
         this.currentStateSize = currentStateSize;
         this.currentProcessedData = processedData;
         this.currentPersistedData = persistedData;
+        this.unalignedCheckpoint = unalignedCheckpoint;
         this.latestAcknowledgedSubtask = latestAcknowledgedSubtask;
         this.currentNumAcknowledgedSubtasks = acknowledgedSubtaskCount;
     }
@@ -165,6 +170,11 @@ public class PendingCheckpointStats extends 
AbstractCheckpointStats {
         return currentPersistedData;
     }
 
+    @Override
+    public boolean isUnalignedCheckpoint() {
+        return unalignedCheckpoint;
+    }
+
     @Override
     public SubtaskStateStats getLatestAcknowledgedSubtaskStats() {
         return latestAcknowledgedSubtask;
@@ -202,6 +212,9 @@ public class PendingCheckpointStats extends 
AbstractCheckpointStats {
             if (persistedData > 0) {
                 currentPersistedData += persistedData;
             }
+
+            unalignedCheckpoint |= subtask.getUnalignedCheckpoint();
+
             return true;
         } else {
             return false;
@@ -220,6 +233,7 @@ public class PendingCheckpointStats extends 
AbstractCheckpointStats {
                 currentStateSize,
                 currentProcessedData,
                 currentPersistedData,
+                unalignedCheckpoint,
                 latestAcknowledgedSubtask,
                 externalPointer);
     }
@@ -242,6 +256,7 @@ public class PendingCheckpointStats extends 
AbstractCheckpointStats {
                 currentStateSize,
                 currentProcessedData,
                 currentPersistedData,
+                unalignedCheckpoint,
                 failureTimestamp,
                 latestAcknowledgedSubtask,
                 cause);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
index 2c29cdcaeb9..a8a05ea5f9d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
@@ -347,7 +347,8 @@ public class CheckpointStatistics implements ResponseBody {
                     completedCheckpointStats.getNumberOfSubtasks(),
                     completedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
                     RestAPICheckpointType.valueOf(
-                            
completedCheckpointStats.getProperties().getCheckpointType()),
+                            
completedCheckpointStats.getProperties().getCheckpointType(),
+                            completedCheckpointStats.isUnalignedCheckpoint()),
                     checkpointStatisticsPerTask,
                     completedCheckpointStats.getExternalPath(),
                     completedCheckpointStats.isDiscarded());
@@ -371,7 +372,8 @@ public class CheckpointStatistics implements ResponseBody {
                     failedCheckpointStats.getNumberOfSubtasks(),
                     failedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
                     RestAPICheckpointType.valueOf(
-                            
failedCheckpointStats.getProperties().getCheckpointType()),
+                            
failedCheckpointStats.getProperties().getCheckpointType(),
+                            failedCheckpointStats.isUnalignedCheckpoint()),
                     checkpointStatisticsPerTask,
                     failedCheckpointStats.getFailureTimestamp(),
                     failedCheckpointStats.getFailureMessage());
@@ -395,7 +397,8 @@ public class CheckpointStatistics implements ResponseBody {
                     pendingCheckpointStats.getNumberOfSubtasks(),
                     pendingCheckpointStats.getNumberOfAcknowledgedSubtasks(),
                     RestAPICheckpointType.valueOf(
-                            
pendingCheckpointStats.getProperties().getCheckpointType()),
+                            
pendingCheckpointStats.getProperties().getCheckpointType(),
+                            pendingCheckpointStats.isUnalignedCheckpoint()),
                     checkpointStatisticsPerTask);
         } else {
             throw new IllegalArgumentException(
@@ -411,16 +414,23 @@ public class CheckpointStatistics implements ResponseBody 
{
      */
     enum RestAPICheckpointType {
         CHECKPOINT,
+        UNALIGNED_CHECKPOINT,
         SAVEPOINT,
         SYNC_SAVEPOINT;
 
-        public static RestAPICheckpointType valueOf(SnapshotType 
checkpointType) {
+        public static RestAPICheckpointType valueOf(
+                SnapshotType checkpointType, boolean isUnalignedCheckpoint) {
             if (checkpointType.isSavepoint()) {
+                Preconditions.checkArgument(
+                        !isUnalignedCheckpoint,
+                        "Currently the savepoint doesn't support unaligned 
checkpoint.");
                 SavepointType savepointType = (SavepointType) checkpointType;
                 return savepointType.isSynchronous() ? SYNC_SAVEPOINT : 
SAVEPOINT;
-            } else {
-                return CHECKPOINT;
             }
+            if (isUnalignedCheckpoint) {
+                return UNALIGNED_CHECKPOINT;
+            }
+            return CHECKPOINT;
         }
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java
index 4ae0e14163b..ad36a2e955d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStatsSummaryTest.java
@@ -42,6 +42,7 @@ public class CompletedCheckpointStatsSummaryTest {
         long stateSize = Integer.MAX_VALUE + 17787L;
         long processedData = Integer.MAX_VALUE + 123123L;
         long persistedData = Integer.MAX_VALUE + 42L;
+        boolean unalignedCheckpoint = true;
 
         CompletedCheckpointStatsSummary summary = new 
CompletedCheckpointStatsSummary();
         assertThat(summary.getStateSizeStats().getCount()).isZero();
@@ -59,7 +60,8 @@ public class CompletedCheckpointStatsSummaryTest {
                             ackTimestamp + i,
                             stateSize + i,
                             processedData + i,
-                            persistedData + i);
+                            persistedData + i,
+                            unalignedCheckpoint);
 
             summary.updateSummary(completed);
 
@@ -93,7 +95,8 @@ public class CompletedCheckpointStatsSummaryTest {
             long ackTimestamp,
             long stateSize,
             long processedData,
-            long persistedData) {
+            long persistedData,
+            boolean unalignedCheckpoint) {
 
         SubtaskStateStats latest = mock(SubtaskStateStats.class);
         when(latest.getAckTimestamp()).thenReturn(ackTimestamp);
@@ -113,6 +116,7 @@ public class CompletedCheckpointStatsSummaryTest {
                 stateSize,
                 processedData,
                 persistedData,
+                unalignedCheckpoint,
                 latest,
                 null);
     }
@@ -123,6 +127,7 @@ public class CompletedCheckpointStatsSummaryTest {
         int stateSize = 100;
         int processedData = 200;
         int persistedData = 300;
+        boolean unalignedCheckpoint = true;
         long triggerTimestamp = 1234;
         long lastAck = triggerTimestamp + 123;
 
@@ -138,6 +143,7 @@ public class CompletedCheckpointStatsSummaryTest {
                         stateSize,
                         processedData,
                         persistedData,
+                        unalignedCheckpoint,
                         new SubtaskStateStats(0, lastAck),
                         ""));
         CompletedCheckpointStatsSummarySnapshot snapshot = 
summary.createSnapshot();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 2f6b7259cf2..6170834f684 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -368,6 +368,7 @@ public class CompletedCheckpointTest {
                         1,
                         1,
                         1,
+                        true,
                         mock(SubtaskStateStats.class),
                         null);
         CompletedCheckpoint completed =
@@ -408,6 +409,7 @@ public class CompletedCheckpointTest {
                         123129837912L,
                         42L,
                         44L,
+                        true,
                         new SubtaskStateStats(
                                 123, 213123, 123123, 123123, 0, 0, 0, 0, 0, 0, 
false, true),
                         null);
@@ -424,6 +426,7 @@ public class CompletedCheckpointTest {
         assertThat(copy.getStateSize()).isEqualTo(completed.getStateSize());
         
assertThat(copy.getProcessedData()).isEqualTo(completed.getProcessedData());
         
assertThat(copy.getPersistedData()).isEqualTo(completed.getPersistedData());
+        
assertThat(copy.isUnalignedCheckpoint()).isEqualTo(completed.isUnalignedCheckpoint());
         assertThat(copy.getLatestAcknowledgedSubtaskStats().getSubtaskIndex())
                 
.isEqualTo(completed.getLatestAcknowledgedSubtaskStats().getSubtaskIndex());
         assertThat(copy.getStatus()).isEqualTo(completed.getStatus());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
index 28beabc3e05..38cc9c850ba 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
@@ -57,6 +57,7 @@ public class FailedCheckpointStatsTest {
                         0,
                         0,
                         0,
+                        false,
                         failureTimestamp,
                         null,
                         null);
@@ -87,6 +88,7 @@ public class FailedCheckpointStatsTest {
                         190890123,
                         4242,
                         4444,
+                        true,
                         failureTimestamp,
                         null,
                         new NotSerializableException("message"));
@@ -103,6 +105,7 @@ public class FailedCheckpointStatsTest {
         assertThat(copy.getStateSize()).isEqualTo(failed.getStateSize());
         
assertThat(copy.getProcessedData()).isEqualTo(failed.getProcessedData());
         
assertThat(copy.getPersistedData()).isEqualTo(failed.getPersistedData());
+        
assertThat(copy.isUnalignedCheckpoint()).isEqualTo(failed.isUnalignedCheckpoint());
         assertThat(copy.getLatestAcknowledgedSubtaskStats())
                 .isEqualTo(failed.getLatestAcknowledgedSubtaskStats());
         assertThat(copy.getStatus()).isEqualTo(failed.getStatus());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
index 33d3313c767..65d5a11a0fb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
@@ -68,17 +68,19 @@ public class PendingCheckpointStatsTest {
         assertThat(pending.getTaskStateStats(new JobVertexID())).isNull();
 
         // Report subtasks and check getters
-        assertThat(pending.reportSubtaskStats(new JobVertexID(), 
createSubtaskStats(0))).isFalse();
+        assertThat(pending.reportSubtaskStats(new JobVertexID(), 
createSubtaskStats(0, false)))
+                .isFalse();
 
         long stateSize = 0;
 
         // Report 1st task
         for (int i = 0; i < task1.getNumberOfSubtasks(); i++) {
-            SubtaskStateStats subtask = createSubtaskStats(i);
+            SubtaskStateStats subtask = createSubtaskStats(i, false);
             stateSize += subtask.getStateSize();
 
             pending.reportSubtaskStats(task1.getJobVertexId(), subtask);
 
+            assertThat(pending.isUnalignedCheckpoint()).isFalse();
             
assertThat(pending.getLatestAcknowledgedSubtaskStats()).isEqualTo(subtask);
             
assertThat(pending.getLatestAckTimestamp()).isEqualTo(subtask.getAckTimestamp());
             assertThat(pending.getEndToEndDuration())
@@ -92,11 +94,12 @@ public class PendingCheckpointStatsTest {
 
         // Report 2nd task
         for (int i = 0; i < task2.getNumberOfSubtasks(); i++) {
-            SubtaskStateStats subtask = createSubtaskStats(i);
+            SubtaskStateStats subtask = createSubtaskStats(i, true);
             stateSize += subtask.getStateSize();
 
             pending.reportSubtaskStats(task2.getJobVertexId(), subtask);
 
+            assertThat(pending.isUnalignedCheckpoint()).isTrue();
             
assertThat(pending.getLatestAcknowledgedSubtaskStats()).isEqualTo(subtask);
             
assertThat(pending.getLatestAckTimestamp()).isEqualTo(subtask.getAckTimestamp());
             assertThat(pending.getEndToEndDuration())
@@ -131,11 +134,13 @@ public class PendingCheckpointStatsTest {
 
         // Report subtasks
         for (int i = 0; i < task1.getNumberOfSubtasks(); i++) {
-            pending.reportSubtaskStats(task1.getJobVertexId(), 
createSubtaskStats(i));
+            pending.reportSubtaskStats(task1.getJobVertexId(), 
createSubtaskStats(i, false));
+            assertThat(pending.isUnalignedCheckpoint()).isFalse();
         }
 
         for (int i = 0; i < task2.getNumberOfSubtasks(); i++) {
-            pending.reportSubtaskStats(task2.getJobVertexId(), 
createSubtaskStats(i));
+            pending.reportSubtaskStats(task2.getJobVertexId(), 
createSubtaskStats(i, true));
+            assertThat(pending.isUnalignedCheckpoint()).isTrue();
         }
 
         // Report completed
@@ -164,6 +169,7 @@ public class PendingCheckpointStatsTest {
         
assertThat(completed.getLatestAckTimestamp()).isEqualTo(pending.getLatestAckTimestamp());
         
assertThat(completed.getEndToEndDuration()).isEqualTo(pending.getEndToEndDuration());
         assertThat(completed.getStateSize()).isEqualTo(pending.getStateSize());
+        assertThat(completed.isUnalignedCheckpoint()).isTrue();
         
assertThat(completed.getTaskStateStats(task1.getJobVertexId())).isEqualTo(task1);
         
assertThat(completed.getTaskStateStats(task2.getJobVertexId())).isEqualTo(task2);
     }
@@ -192,11 +198,13 @@ public class PendingCheckpointStatsTest {
 
         // Report subtasks
         for (int i = 0; i < task1.getNumberOfSubtasks(); i++) {
-            pending.reportSubtaskStats(task1.getJobVertexId(), 
createSubtaskStats(i));
+            pending.reportSubtaskStats(task1.getJobVertexId(), 
createSubtaskStats(i, false));
+            assertThat(pending.isUnalignedCheckpoint()).isFalse();
         }
 
         for (int i = 0; i < task2.getNumberOfSubtasks(); i++) {
-            pending.reportSubtaskStats(task2.getJobVertexId(), 
createSubtaskStats(i));
+            pending.reportSubtaskStats(task2.getJobVertexId(), 
createSubtaskStats(i, true));
+            assertThat(pending.isUnalignedCheckpoint()).isTrue();
         }
 
         // Report failed
@@ -223,6 +231,7 @@ public class PendingCheckpointStatsTest {
         
assertThat(failed.getLatestAckTimestamp()).isEqualTo(pending.getLatestAckTimestamp());
         assertThat(failed.getEndToEndDuration()).isEqualTo(failureTimestamp - 
triggerTimestamp);
         assertThat(failed.getStateSize()).isEqualTo(pending.getStateSize());
+        assertThat(failed.isUnalignedCheckpoint()).isTrue();
         
assertThat(failed.getTaskStateStats(task1.getJobVertexId())).isEqualTo(task1);
         
assertThat(failed.getTaskStateStats(task2.getJobVertexId())).isEqualTo(task2);
     }
@@ -262,7 +271,7 @@ public class PendingCheckpointStatsTest {
 
     // ------------------------------------------------------------------------
 
-    private SubtaskStateStats createSubtaskStats(int index) {
+    private SubtaskStateStats createSubtaskStats(int index, boolean 
unalignedCheckpoint) {
         return new SubtaskStateStats(
                 index,
                 Integer.MAX_VALUE + (long) index,
@@ -274,7 +283,7 @@ public class PendingCheckpointStatsTest {
                 Integer.MAX_VALUE + (long) index,
                 Integer.MAX_VALUE + (long) index,
                 Integer.MAX_VALUE + (long) index,
-                false,
+                unalignedCheckpoint,
                 true);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
index e2cab889af9..3afc8daac3a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
@@ -26,11 +26,16 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
 import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics.RestAPICheckpointType;
 
+import org.junit.jupiter.api.Test;
+
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
 /** Tests for {@link CheckpointingStatistics}. */
 public class CheckpointingStatisticsTest
         extends RestResponseMarshallingTestBase<CheckpointingStatistics> {
@@ -82,7 +87,7 @@ public class CheckpointingStatisticsTest
                         44L,
                         10,
                         10,
-                        
RestAPICheckpointType.valueOf(CheckpointType.CHECKPOINT),
+                        
RestAPICheckpointType.valueOf(CheckpointType.CHECKPOINT, true),
                         Collections.emptyMap(),
                         null,
                         false);
@@ -100,11 +105,11 @@ public class CheckpointingStatisticsTest
                         1L,
                         0L,
                         31337L,
-                        4244L,
+                        0L,
                         9,
                         9,
                         RestAPICheckpointType.valueOf(
-                                
SavepointType.savepoint(SavepointFormatType.CANONICAL)),
+                                
SavepointType.savepoint(SavepointFormatType.CANONICAL), false),
                         checkpointStatisticsPerTask,
                         "externalPath",
                         false);
@@ -125,7 +130,7 @@ public class CheckpointingStatisticsTest
                         22L,
                         11,
                         9,
-                        
RestAPICheckpointType.valueOf(CheckpointType.CHECKPOINT),
+                        
RestAPICheckpointType.valueOf(CheckpointType.CHECKPOINT, true),
                         Collections.emptyMap(),
                         100L,
                         "Test failure");
@@ -149,7 +154,7 @@ public class CheckpointingStatisticsTest
                         16L,
                         10,
                         10,
-                        
RestAPICheckpointType.valueOf(CheckpointType.CHECKPOINT),
+                        
RestAPICheckpointType.valueOf(CheckpointType.CHECKPOINT, true),
                         Collections.emptyMap());
 
         final CheckpointingStatistics.LatestCheckpoints latestCheckpoints =
@@ -162,4 +167,39 @@ public class CheckpointingStatisticsTest
                 latestCheckpoints,
                 Arrays.asList(completed, savepoint, failed, pending));
     }
+
+    @Test
+    void testRestAPICheckpointType() {
+        // Test for aligned checkpoint
+        assertThat(RestAPICheckpointType.valueOf(CheckpointType.CHECKPOINT, 
false))
+                .isEqualTo(RestAPICheckpointType.CHECKPOINT);
+        
assertThat(RestAPICheckpointType.valueOf(CheckpointType.FULL_CHECKPOINT, false))
+                .isEqualTo(RestAPICheckpointType.CHECKPOINT);
+
+        // Test for unaligned checkpoint
+        assertThat(RestAPICheckpointType.valueOf(CheckpointType.CHECKPOINT, 
true))
+                .isEqualTo(RestAPICheckpointType.UNALIGNED_CHECKPOINT);
+        
assertThat(RestAPICheckpointType.valueOf(CheckpointType.FULL_CHECKPOINT, true))
+                .isEqualTo(RestAPICheckpointType.UNALIGNED_CHECKPOINT);
+
+        // Test for savepoint
+        assertThat(
+                        RestAPICheckpointType.valueOf(
+                                
SavepointType.savepoint(SavepointFormatType.NATIVE), false))
+                .isEqualTo(RestAPICheckpointType.SAVEPOINT);
+        assertThat(
+                        RestAPICheckpointType.valueOf(
+                                
SavepointType.suspend(SavepointFormatType.NATIVE), false))
+                .isEqualTo(RestAPICheckpointType.SYNC_SAVEPOINT);
+        assertThat(
+                        RestAPICheckpointType.valueOf(
+                                
SavepointType.terminate(SavepointFormatType.NATIVE), false))
+                .isEqualTo(RestAPICheckpointType.SYNC_SAVEPOINT);
+
+        assertThatThrownBy(
+                        () ->
+                                RestAPICheckpointType.valueOf(
+                                        
SavepointType.terminate(SavepointFormatType.NATIVE), true))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
 }

Reply via email to