This is an automated email from the ASF dual-hosted git repository.
rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b2a342c [FLINK-18851][runtime-web] Add checkpoint type to checkpoint
history
b2a342c is described below
commit b2a342c6a6ef154ed3c1a44826ce2be14e538386
Author: gm7y8 <[email protected]>
AuthorDate: Tue Oct 20 02:41:28 2020 -0700
[FLINK-18851][runtime-web] Add checkpoint type to checkpoint history
---
docs/_includes/generated/rest_v1_dispatcher.html | 16 ++++++++++++
.../src/test/resources/rest_api_v1.snapshot | 16 ++++++++++++
.../src/app/interfaces/job-checkpoint.ts | 3 +++
.../detail/job-checkpoints-detail.component.html | 2 ++
.../detail/job-checkpoints-detail.component.ts | 29 +++++++++++++++++++---
.../messages/checkpoints/CheckpointStatistics.java | 23 +++++++++++++++++
.../checkpoints/CheckpointingStatisticsTest.java | 5 ++++
7 files changed, 90 insertions(+), 4 deletions(-)
diff --git a/docs/_includes/generated/rest_v1_dispatcher.html
b/docs/_includes/generated/rest_v1_dispatcher.html
index 4d3f646..a494e9b 100644
--- a/docs/_includes/generated/rest_v1_dispatcher.html
+++ b/docs/_includes/generated/rest_v1_dispatcher.html
@@ -1489,6 +1489,10 @@ Using 'curl' you can upload a jar via 'curl -X POST -H
"Expect:" -F "jarfile=@pa
"alignment_buffered" : {
"type" : "integer"
},
+ "checkpoint_type" : {
+ "type" : "string",
+ "enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
+ },
"end_to_end_duration" : {
"type" : "integer"
},
@@ -1544,6 +1548,10 @@ Using 'curl' you can upload a jar via 'curl -X POST -H
"Expect:" -F "jarfile=@pa
"alignment_buffered" : {
"type" : "integer"
},
+ "checkpoint_type" : {
+ "type" : "string",
+ "enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
+ },
"discarded" : {
"type" : "boolean"
},
@@ -1633,6 +1641,10 @@ Using 'curl' you can upload a jar via 'curl -X POST -H
"Expect:" -F "jarfile=@pa
"alignment_buffered" : {
"type" : "integer"
},
+ "checkpoint_type" : {
+ "type" : "string",
+ "enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
+ },
"end_to_end_duration" : {
"type" : "integer"
},
@@ -1882,6 +1894,10 @@ Using 'curl' you can upload a jar via 'curl -X POST -H
"Expect:" -F "jarfile=@pa
"alignment_buffered" : {
"type" : "integer"
},
+ "checkpoint_type" : {
+ "type" : "string",
+ "enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
+ },
"end_to_end_duration" : {
"type" : "integer"
},
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index c6180ce..764e0b5 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -995,6 +995,10 @@
"num_acknowledged_subtasks" : {
"type" : "integer"
},
+ "checkpoint_type" : {
+ "type" : "string",
+ "enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
+ },
"tasks" : {
"type" : "object",
"additionalProperties" : {
@@ -1088,6 +1092,10 @@
"num_acknowledged_subtasks" : {
"type" : "integer"
},
+ "checkpoint_type" : {
+ "type" : "string",
+ "enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
+ },
"tasks" : {
"type" : "object",
"additionalProperties" : {
@@ -1166,6 +1174,10 @@
"num_acknowledged_subtasks" : {
"type" : "integer"
},
+ "checkpoint_type" : {
+ "type" : "string",
+ "enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
+ },
"tasks" : {
"type" : "object",
"additionalProperties" : {
@@ -1292,6 +1304,10 @@
"num_acknowledged_subtasks" : {
"type" : "integer"
},
+ "checkpoint_type" : {
+ "type" : "string",
+ "enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
+ },
"tasks" : {
"type" : "object",
"additionalProperties" : {
diff --git
a/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts
b/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts
index ac0dbf8..5766557 100644
--- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts
+++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-checkpoint.ts
@@ -91,6 +91,7 @@ export interface CheckPointCompletedStatisticsInterface {
tasks: CheckPointTaskStatisticsInterface;
external_path: string;
discarded: boolean;
+ checkpoint_type: string;
}
export interface CheckPointTaskStatisticsInterface {
@@ -114,6 +115,7 @@ export interface CheckPointConfigInterface {
enabled: boolean;
delete_on_cancellation: boolean;
};
+ unaligned_checkpoints: boolean;
}
export interface CheckPointDetailInterface {
@@ -130,6 +132,7 @@ export interface CheckPointDetailInterface {
failure_message?: string;
num_subtasks: number;
num_acknowledged_subtasks: number;
+ checkpoint_type: string;
tasks: Array<{
[ taskId: string ]: {
id: number;
diff --git
a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html
b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html
index c8f9b91..4a1331c 100644
---
a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html
+++
b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html
@@ -21,6 +21,8 @@
<strong>Path:</strong> {{checkPointDetail?.external_path || '-'}}
<nz-divider nzType="vertical"></nz-divider>
<strong>Discarded:</strong> {{checkPointDetail?.discarded || '-'}}
+ <nz-divider nzType="vertical"></nz-divider>
+ <strong>Checkpoint Type:</strong> {{checkPointType}}
<ng-container *ngIf="checkPointDetail?.failure_message">
<nz-divider nzType="vertical"></nz-divider>
<strong>Failure Message:</strong> {{checkPointDetail?.failure_message ||
'-' }}
diff --git
a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.ts
b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.ts
index 05c3316..1580dc7 100644
---
a/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.ts
+++
b/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.ts
@@ -21,8 +21,10 @@ import {
CheckPointCompletedStatisticsInterface,
CheckPointDetailInterface,
JobDetailCorrectInterface,
- VerticesItemInterface
+ VerticesItemInterface,
+ CheckPointConfigInterface
} from 'interfaces';
+import { forkJoin } from 'rxjs';
import { first } from 'rxjs/operators';
import { JobService } from 'services';
@@ -35,6 +37,7 @@ import { JobService } from 'services';
export class JobCheckpointsDetailComponent implements OnInit {
innerCheckPoint: CheckPointCompletedStatisticsInterface;
jobDetail: JobDetailCorrectInterface;
+ checkPointType: string;
@Input()
set checkPoint(value) {
@@ -47,6 +50,7 @@ export class JobCheckpointsDetailComponent implements OnInit {
}
checkPointDetail: CheckPointDetailInterface;
+ checkPointConfig: CheckPointConfigInterface;
listOfVertex: VerticesItemInterface[] = [];
isLoading = true;
@@ -57,9 +61,26 @@ export class JobCheckpointsDetailComponent implements OnInit
{
refresh() {
this.isLoading = true;
if (this.jobDetail && this.jobDetail.jid) {
- this.jobService.loadCheckpointDetails(this.jobDetail.jid,
this.checkPoint.id).subscribe(
- data => {
- this.checkPointDetail = data;
+ forkJoin([
+ this.jobService.loadCheckpointConfig(this.jobDetail.jid),
+ this.jobService.loadCheckpointDetails(this.jobDetail.jid,
this.checkPoint.id)
+ ]).subscribe(
+ ([config, detail]) => {
+ this.checkPointConfig = config;
+ this.checkPointDetail = detail;
+ if (this.checkPointDetail.checkpoint_type === 'CHECKPOINT') {
+ if (this.checkPointConfig.unaligned_checkpoints) {
+ this.checkPointType = 'unaligned checkpoint';
+ } else {
+ this.checkPointType = 'aligned checkpoint';
+ }
+ } else if (this.checkPointDetail.checkpoint_type ===
'SYNC_SAVEPOINT') {
+ this.checkPointType = 'savepoint on cancel';
+ } else if (this.checkPointDetail.checkpoint_type === 'SAVEPOINT') {
+ this.checkPointType = 'savepoint';
+ } else {
+ this.checkPointType = '-';
+ }
this.isLoading = false;
this.cdr.markForCheck();
},
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
index 888c1f5..237a84d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.messages.checkpoints;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
@@ -87,6 +88,8 @@ public class CheckpointStatistics implements ResponseBody {
public static final String FIELD_NAME_TASKS = "tasks";
+ public static final String FIELD_NAME_CHECKPOINT_TYPE =
"checkpoint_type";
+
@JsonProperty(FIELD_NAME_ID)
private final long id;
@@ -123,6 +126,9 @@ public class CheckpointStatistics implements ResponseBody {
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
private final int numAckSubtasks;
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE)
+ private final CheckpointType checkpointType;
+
@JsonProperty(FIELD_NAME_TASKS)
@JsonSerialize(keyUsing = JobVertexIDKeySerializer.class)
private final Map<JobVertexID, TaskCheckpointStatistics>
checkpointStatisticsPerTask;
@@ -141,6 +147,7 @@ public class CheckpointStatistics implements ResponseBody {
@JsonProperty(FIELD_NAME_PERSISTED_DATA) long
persistedData,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int
numAckSubtasks,
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE)
CheckpointType checkpointType,
@JsonDeserialize(keyUsing =
JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS)
Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
this.id = id;
this.status = Preconditions.checkNotNull(status);
@@ -154,6 +161,7 @@ public class CheckpointStatistics implements ResponseBody {
this.persistedData = persistedData;
this.numSubtasks = numSubtasks;
this.numAckSubtasks = numAckSubtasks;
+ this.checkpointType =
Preconditions.checkNotNull(checkpointType);
this.checkpointStatisticsPerTask =
Preconditions.checkNotNull(checkpointStatisticsPerTask);
}
@@ -193,6 +201,10 @@ public class CheckpointStatistics implements ResponseBody {
return numAckSubtasks;
}
+ public CheckpointType getCheckpointType() {
+ return checkpointType;
+ }
+
@Nullable
public Map<JobVertexID, TaskCheckpointStatistics>
getCheckpointStatisticsPerTask() {
return checkpointStatisticsPerTask;
@@ -219,6 +231,7 @@ public class CheckpointStatistics implements ResponseBody {
numSubtasks == that.numSubtasks &&
numAckSubtasks == that.numAckSubtasks &&
status == that.status &&
+ Objects.equals(checkpointType, that.checkpointType) &&
Objects.equals(checkpointStatisticsPerTask,
that.checkpointStatisticsPerTask);
}
@@ -237,6 +250,7 @@ public class CheckpointStatistics implements ResponseBody {
persistedData,
numSubtasks,
numAckSubtasks,
+ checkpointType,
checkpointStatisticsPerTask);
}
@@ -289,6 +303,7 @@ public class CheckpointStatistics implements ResponseBody {
completedCheckpointStats.getPersistedData(),
completedCheckpointStats.getNumberOfSubtasks(),
completedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
+
completedCheckpointStats.getProperties().getCheckpointType(),
checkpointStatisticsPerTask,
completedCheckpointStats.getExternalPath(),
completedCheckpointStats.isDiscarded());
@@ -308,6 +323,7 @@ public class CheckpointStatistics implements ResponseBody {
failedCheckpointStats.getPersistedData(),
failedCheckpointStats.getNumberOfSubtasks(),
failedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
+
failedCheckpointStats.getProperties().getCheckpointType(),
checkpointStatisticsPerTask,
failedCheckpointStats.getFailureTimestamp(),
failedCheckpointStats.getFailureMessage());
@@ -327,6 +343,7 @@ public class CheckpointStatistics implements ResponseBody {
pendingCheckpointStats.getPersistedData(),
pendingCheckpointStats.getNumberOfSubtasks(),
pendingCheckpointStats.getNumberOfAcknowledgedSubtasks(),
+
pendingCheckpointStats.getProperties().getCheckpointType(),
checkpointStatisticsPerTask
);
} else {
@@ -369,6 +386,7 @@ public class CheckpointStatistics implements ResponseBody {
@JsonProperty(FIELD_NAME_PERSISTED_DATA) long
persistedData,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int
numAckSubtasks,
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE)
CheckpointType checkpointType,
@JsonDeserialize(keyUsing =
JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS)
Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
@JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable
String externalPath,
@JsonProperty(FIELD_NAME_DISCARDED) boolean discarded) {
@@ -385,6 +403,7 @@ public class CheckpointStatistics implements ResponseBody {
persistedData,
numSubtasks,
numAckSubtasks,
+ checkpointType,
checkpointingStatisticsPerTask);
this.externalPath = externalPath;
@@ -452,6 +471,7 @@ public class CheckpointStatistics implements ResponseBody {
@JsonProperty(FIELD_NAME_PERSISTED_DATA) long
persistedData,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int
numAckSubtasks,
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE)
CheckpointType checkpointType,
@JsonDeserialize(keyUsing =
JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS)
Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
@JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) long
failureTimestamp,
@JsonProperty(FIELD_NAME_FAILURE_MESSAGE) @Nullable
String failureMessage) {
@@ -468,6 +488,7 @@ public class CheckpointStatistics implements ResponseBody {
persistedData,
numSubtasks,
numAckSubtasks,
+ checkpointType,
checkpointingStatisticsPerTask);
this.failureTimestamp = failureTimestamp;
@@ -524,6 +545,7 @@ public class CheckpointStatistics implements ResponseBody {
@JsonProperty(FIELD_NAME_PERSISTED_DATA) long
persistedData,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int
numAckSubtasks,
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE)
CheckpointType checkpointType,
@JsonDeserialize(keyUsing =
JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS)
Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask) {
super(
id,
@@ -538,6 +560,7 @@ public class CheckpointStatistics implements ResponseBody {
persistedData,
numSubtasks,
numAckSubtasks,
+ checkpointType,
checkpointingStatisticsPerTask);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
index 3684b00..a55bc15 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.rest.messages.checkpoints;
import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
@@ -90,6 +91,7 @@ public class CheckpointingStatisticsTest extends
RestResponseMarshallingTestBase
44L,
10,
10,
+ CheckpointType.CHECKPOINT,
Collections.emptyMap(),
null,
false);
@@ -107,6 +109,7 @@ public class CheckpointingStatisticsTest extends
RestResponseMarshallingTestBase
4244L,
9,
9,
+ CheckpointType.SAVEPOINT,
checkpointStatisticsPerTask,
"externalPath",
false);
@@ -124,6 +127,7 @@ public class CheckpointingStatisticsTest extends
RestResponseMarshallingTestBase
22L,
11,
9,
+ CheckpointType.CHECKPOINT,
Collections.emptyMap(),
100L,
"Test failure");
@@ -147,6 +151,7 @@ public class CheckpointingStatisticsTest extends
RestResponseMarshallingTestBase
16L,
10,
10,
+ CheckpointType.CHECKPOINT,
Collections.emptyMap()
);