This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b2a342c  [FLINK-18851][runtime-web] Add checkpoint type to checkpoint 
history
b2a342c is described below

commit b2a342c6a6ef154ed3c1a44826ce2be14e538386
Author: gm7y8 <[email protected]>
AuthorDate: Tue Oct 20 02:41:28 2020 -0700

    [FLINK-18851][runtime-web] Add checkpoint type to checkpoint history
---
 docs/_includes/generated/rest_v1_dispatcher.html   | 16 ++++++++++++
 .../src/test/resources/rest_api_v1.snapshot        | 16 ++++++++++++
 .../src/app/interfaces/job-checkpoint.ts           |  3 +++
 .../detail/job-checkpoints-detail.component.html   |  2 ++
 .../detail/job-checkpoints-detail.component.ts     | 29 +++++++++++++++++++---
 .../messages/checkpoints/CheckpointStatistics.java | 23 +++++++++++++++++
 .../checkpoints/CheckpointingStatisticsTest.java   |  5 ++++
 7 files changed, 90 insertions(+), 4 deletions(-)

diff --git a/docs/_includes/generated/rest_v1_dispatcher.html 
b/docs/_includes/generated/rest_v1_dispatcher.html
index 4d3f646..a494e9b 100644
--- a/docs/_includes/generated/rest_v1_dispatcher.html
+++ b/docs/_includes/generated/rest_v1_dispatcher.html
@@ -1489,6 +1489,10 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=@pa
           "alignment_buffered" : {
             "type" : "integer"
           },
+          "checkpoint_type" : {
+            "type" : "string",
+            "enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
+          },
           "end_to_end_duration" : {
             "type" : "integer"
           },
@@ -1544,6 +1548,10 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=@pa
             "alignment_buffered" : {
               "type" : "integer"
             },
+            "checkpoint_type" : {
+              "type" : "string",
+              "enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
+            },
             "discarded" : {
               "type" : "boolean"
             },
@@ -1633,6 +1641,10 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=@pa
             "alignment_buffered" : {
               "type" : "integer"
             },
+            "checkpoint_type" : {
+              "type" : "string",
+              "enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
+            },
             "end_to_end_duration" : {
               "type" : "integer"
             },
@@ -1882,6 +1894,10 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=@pa
     "alignment_buffered" : {
       "type" : "integer"
     },
+    "checkpoint_type" : {
+      "type" : "string",
+      "enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
+    },
     "end_to_end_duration" : {
       "type" : "integer"
     },
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot 
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index c6180ce..764e0b5 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -995,6 +995,10 @@
                 "num_acknowledged_subtasks" : {
                   "type" : "integer"
                 },
+                "checkpoint_type" : {
+                  "type" : "string",
+                  "enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
+                },
                 "tasks" : {
                   "type" : "object",
                   "additionalProperties" : {
@@ -1088,6 +1092,10 @@
                 "num_acknowledged_subtasks" : {
                   "type" : "integer"
                 },
+                "checkpoint_type" : {
+                  "type" : "string",
+                  "enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
+                },
                 "tasks" : {
                   "type" : "object",
                   "additionalProperties" : {
@@ -1166,6 +1174,10 @@
               "num_acknowledged_subtasks" : {
                 "type" : "integer"
               },
+              "checkpoint_type" : {
+                "type" : "string",
+                "enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
+              },
               "tasks" : {
                 "type" : "object",
                 "additionalProperties" : {
@@ -1292,6 +1304,10 @@
         "num_acknowledged_subtasks" : {
           "type" : "integer"
         },
+        "checkpoint_type" : {
+          "type" : "string",
+          "enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
+        },
         "tasks" : {
           "type" : "object",
           "additionalProperties" : {
diff --git 
a/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts 
b/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts
index ac0dbf8..5766557 100644
--- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts
+++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts
@@ -91,6 +91,7 @@ export interface CheckPointCompletedStatisticsInterface {
   tasks: CheckPointTaskStatisticsInterface;
   external_path: string;
   discarded: boolean;
+  checkpoint_type: string;
 }
 
 export interface CheckPointTaskStatisticsInterface {
@@ -114,6 +115,7 @@ export interface CheckPointConfigInterface {
     enabled: boolean;
     delete_on_cancellation: boolean;
   };
+  unaligned_checkpoints: boolean;
 }
 
 export interface CheckPointDetailInterface {
@@ -130,6 +132,7 @@ export interface CheckPointDetailInterface {
   failure_message?: string;
   num_subtasks: number;
   num_acknowledged_subtasks: number;
+  checkpoint_type: string;
   tasks: Array<{
     [ taskId: string ]: {
       id: number;
diff --git 
a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html
 
b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html
index c8f9b91..4a1331c 100644
--- 
a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html
+++ 
b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html
@@ -21,6 +21,8 @@
     <strong>Path:</strong> {{checkPointDetail?.external_path || '-'}}
     <nz-divider nzType="vertical"></nz-divider>
     <strong>Discarded:</strong> {{checkPointDetail?.discarded || '-'}}
+    <nz-divider nzType="vertical"></nz-divider>
+    <strong>Checkpoint Type:</strong> {{checkPointType}}
     <ng-container *ngIf="checkPointDetail?.failure_message">
       <nz-divider nzType="vertical"></nz-divider>
       <strong>Failure Message:</strong> {{checkPointDetail?.failure_message || 
'-' }}
diff --git 
a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.ts
 
b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.ts
index 05c3316..1580dc7 100644
--- 
a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.ts
+++ 
b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.ts
@@ -21,8 +21,10 @@ import {
   CheckPointCompletedStatisticsInterface,
   CheckPointDetailInterface,
   JobDetailCorrectInterface,
-  VerticesItemInterface
+  VerticesItemInterface,
+  CheckPointConfigInterface
 } from 'interfaces';
+import { forkJoin } from 'rxjs';
 import { first } from 'rxjs/operators';
 import { JobService } from 'services';
 
@@ -35,6 +37,7 @@ import { JobService } from 'services';
 export class JobCheckpointsDetailComponent implements OnInit {
   innerCheckPoint: CheckPointCompletedStatisticsInterface;
   jobDetail: JobDetailCorrectInterface;
+  checkPointType: string;
 
   @Input()
   set checkPoint(value) {
@@ -47,6 +50,7 @@ export class JobCheckpointsDetailComponent implements OnInit {
   }
 
   checkPointDetail: CheckPointDetailInterface;
+  checkPointConfig: CheckPointConfigInterface;
   listOfVertex: VerticesItemInterface[] = [];
   isLoading = true;
 
@@ -57,9 +61,26 @@ export class JobCheckpointsDetailComponent implements OnInit 
{
   refresh() {
     this.isLoading = true;
     if (this.jobDetail && this.jobDetail.jid) {
-      this.jobService.loadCheckpointDetails(this.jobDetail.jid, 
this.checkPoint.id).subscribe(
-        data => {
-          this.checkPointDetail = data;
+      forkJoin([
+        this.jobService.loadCheckpointConfig(this.jobDetail.jid),
+        this.jobService.loadCheckpointDetails(this.jobDetail.jid, 
this.checkPoint.id)
+      ]).subscribe(
+        ([config, detail]) => {
+          this.checkPointConfig = config;
+          this.checkPointDetail = detail;
+          if (this.checkPointDetail.checkpoint_type === 'CHECKPOINT') {
+            if (this.checkPointConfig.unaligned_checkpoints) {
+              this.checkPointType = 'unaligned checkpoint';
+            } else {
+              this.checkPointType = 'aligned checkpoint';
+            }
+          } else if (this.checkPointDetail.checkpoint_type === 
'SYNC_SAVEPOINT') {
+            this.checkPointType = 'savepoint on cancel';
+          } else if (this.checkPointDetail.checkpoint_type === 'SAVEPOINT') {
+            this.checkPointType = 'savepoint';
+          } else {
+            this.checkPointType = '-';
+          }
           this.isLoading = false;
           this.cdr.markForCheck();
         },
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
index 888c1f5..237a84d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.messages.checkpoints;
 
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
@@ -87,6 +88,8 @@ public class CheckpointStatistics implements ResponseBody {
 
        public static final String FIELD_NAME_TASKS = "tasks";
 
+       public static final String FIELD_NAME_CHECKPOINT_TYPE = 
"checkpoint_type";
+
        @JsonProperty(FIELD_NAME_ID)
        private final long id;
 
@@ -123,6 +126,9 @@ public class CheckpointStatistics implements ResponseBody {
        @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
        private final int numAckSubtasks;
 
+       @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE)
+       private final CheckpointType checkpointType;
+
        @JsonProperty(FIELD_NAME_TASKS)
        @JsonSerialize(keyUsing = JobVertexIDKeySerializer.class)
        private final Map<JobVertexID, TaskCheckpointStatistics> 
checkpointStatisticsPerTask;
@@ -141,6 +147,7 @@ public class CheckpointStatistics implements ResponseBody {
                        @JsonProperty(FIELD_NAME_PERSISTED_DATA) long 
persistedData,
                        @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
                        @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int 
numAckSubtasks,
+                       @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) 
CheckpointType checkpointType,
                        @JsonDeserialize(keyUsing = 
JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) 
Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
                this.id = id;
                this.status = Preconditions.checkNotNull(status);
@@ -154,6 +161,7 @@ public class CheckpointStatistics implements ResponseBody {
                this.persistedData = persistedData;
                this.numSubtasks = numSubtasks;
                this.numAckSubtasks = numAckSubtasks;
+               this.checkpointType = 
Preconditions.checkNotNull(checkpointType);
                this.checkpointStatisticsPerTask = 
Preconditions.checkNotNull(checkpointStatisticsPerTask);
        }
 
@@ -193,6 +201,10 @@ public class CheckpointStatistics implements ResponseBody {
                return numAckSubtasks;
        }
 
+       public CheckpointType getCheckpointType() {
+               return checkpointType;
+       }
+
        @Nullable
        public Map<JobVertexID, TaskCheckpointStatistics> 
getCheckpointStatisticsPerTask() {
                return checkpointStatisticsPerTask;
@@ -219,6 +231,7 @@ public class CheckpointStatistics implements ResponseBody {
                        numSubtasks == that.numSubtasks &&
                        numAckSubtasks == that.numAckSubtasks &&
                        status == that.status &&
+                       Objects.equals(checkpointType, that.checkpointType) &&
                        Objects.equals(checkpointStatisticsPerTask, 
that.checkpointStatisticsPerTask);
        }
 
@@ -237,6 +250,7 @@ public class CheckpointStatistics implements ResponseBody {
                        persistedData,
                        numSubtasks,
                        numAckSubtasks,
+                       checkpointType,
                        checkpointStatisticsPerTask);
        }
 
@@ -289,6 +303,7 @@ public class CheckpointStatistics implements ResponseBody {
                                completedCheckpointStats.getPersistedData(),
                                completedCheckpointStats.getNumberOfSubtasks(),
                                
completedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
+                               
completedCheckpointStats.getProperties().getCheckpointType(),
                                checkpointStatisticsPerTask,
                                completedCheckpointStats.getExternalPath(),
                                completedCheckpointStats.isDiscarded());
@@ -308,6 +323,7 @@ public class CheckpointStatistics implements ResponseBody {
                                failedCheckpointStats.getPersistedData(),
                                failedCheckpointStats.getNumberOfSubtasks(),
                                
failedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
+                               
failedCheckpointStats.getProperties().getCheckpointType(),
                                checkpointStatisticsPerTask,
                                failedCheckpointStats.getFailureTimestamp(),
                                failedCheckpointStats.getFailureMessage());
@@ -327,6 +343,7 @@ public class CheckpointStatistics implements ResponseBody {
                                pendingCheckpointStats.getPersistedData(),
                                pendingCheckpointStats.getNumberOfSubtasks(),
                                
pendingCheckpointStats.getNumberOfAcknowledgedSubtasks(),
+                               
pendingCheckpointStats.getProperties().getCheckpointType(),
                                checkpointStatisticsPerTask
                        );
                } else {
@@ -369,6 +386,7 @@ public class CheckpointStatistics implements ResponseBody {
                        @JsonProperty(FIELD_NAME_PERSISTED_DATA) long 
persistedData,
                        @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
                        @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int 
numAckSubtasks,
+                       @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) 
CheckpointType checkpointType,
                        @JsonDeserialize(keyUsing = 
JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) 
Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
                        @JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable 
String externalPath,
                        @JsonProperty(FIELD_NAME_DISCARDED) boolean discarded) {
@@ -385,6 +403,7 @@ public class CheckpointStatistics implements ResponseBody {
                                persistedData,
                                numSubtasks,
                                numAckSubtasks,
+                               checkpointType,
                                checkpointingStatisticsPerTask);
 
                        this.externalPath = externalPath;
@@ -452,6 +471,7 @@ public class CheckpointStatistics implements ResponseBody {
                        @JsonProperty(FIELD_NAME_PERSISTED_DATA) long 
persistedData,
                        @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
                        @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int 
numAckSubtasks,
+                       @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) 
CheckpointType checkpointType,
                        @JsonDeserialize(keyUsing = 
JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) 
Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
                        @JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) long 
failureTimestamp,
                        @JsonProperty(FIELD_NAME_FAILURE_MESSAGE) @Nullable 
String failureMessage) {
@@ -468,6 +488,7 @@ public class CheckpointStatistics implements ResponseBody {
                                persistedData,
                                numSubtasks,
                                numAckSubtasks,
+                               checkpointType,
                                checkpointingStatisticsPerTask);
 
                        this.failureTimestamp = failureTimestamp;
@@ -524,6 +545,7 @@ public class CheckpointStatistics implements ResponseBody {
                        @JsonProperty(FIELD_NAME_PERSISTED_DATA) long 
persistedData,
                        @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
                        @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int 
numAckSubtasks,
+                       @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) 
CheckpointType checkpointType,
                        @JsonDeserialize(keyUsing = 
JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) 
Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask) {
                        super(
                                id,
@@ -538,6 +560,7 @@ public class CheckpointStatistics implements ResponseBody {
                                persistedData,
                                numSubtasks,
                                numAckSubtasks,
+                               checkpointType,
                                checkpointingStatisticsPerTask);
                }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
index 3684b00..a55bc15 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.messages.checkpoints;
 
 import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
 
@@ -90,6 +91,7 @@ public class CheckpointingStatisticsTest extends 
RestResponseMarshallingTestBase
                        44L,
                        10,
                        10,
+                       CheckpointType.CHECKPOINT,
                        Collections.emptyMap(),
                        null,
                        false);
@@ -107,6 +109,7 @@ public class CheckpointingStatisticsTest extends 
RestResponseMarshallingTestBase
                        4244L,
                        9,
                        9,
+                       CheckpointType.SAVEPOINT,
                        checkpointStatisticsPerTask,
                        "externalPath",
                        false);
@@ -124,6 +127,7 @@ public class CheckpointingStatisticsTest extends 
RestResponseMarshallingTestBase
                        22L,
                        11,
                        9,
+                       CheckpointType.CHECKPOINT,
                        Collections.emptyMap(),
                        100L,
                        "Test failure");
@@ -147,6 +151,7 @@ public class CheckpointingStatisticsTest extends 
RestResponseMarshallingTestBase
                        16L,
                        10,
                        10,
+                       CheckpointType.CHECKPOINT,
                        Collections.emptyMap()
                );
 

Reply via email to