gm7y8 commented on a change in pull request #13458:
URL: https://github.com/apache/flink/pull/13458#discussion_r494056973
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -113,6 +114,9 @@
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
private final int numAckSubtasks;
+ @JsonProperty(FIELD_NAME_CHECK_POINT_TYPE)
+ private final String checkPointType;
Review comment:
done
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -83,6 +82,8 @@
public static final String FIELD_NAME_TASKS = "tasks";
+ public static final String FIELD_NAME_CHECK_POINT_TYPE =
"check_point_type";
Review comment:
done
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -18,6 +18,14 @@
package org.apache.flink.runtime.rest.messages.checkpoints;
+import java.util.Collection;
Review comment:
done
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -18,32 +18,22 @@
package org.apache.flink.runtime.rest.messages.checkpoints;
-import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
-import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
-import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
-import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.checkpoint.*;
Review comment:
done
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
##########
@@ -82,6 +82,7 @@ protected CheckpointingStatistics getTestResponseInstance()
throws Exception {
0L,
10,
10,
+ "Checkpoint",
Review comment:
done
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
##########
@@ -97,6 +98,7 @@ protected CheckpointingStatistics getTestResponseInstance()
throws Exception {
0L,
9,
9,
+ "Savepoint",
Review comment:
done
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
##########
@@ -112,6 +114,7 @@ protected CheckpointingStatistics getTestResponseInstance()
throws Exception {
0L,
11,
9,
+ "Checkpoint",
Review comment:
done
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
##########
@@ -133,6 +136,7 @@ protected CheckpointingStatistics getTestResponseInstance()
throws Exception {
0L,
10,
10,
+ "Checkpoint",
Review comment:
done
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
##########
@@ -18,15 +18,15 @@
package org.apache.flink.runtime.rest.messages.checkpoints;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
-
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
Review comment:
done
##########
File path: flink-runtime-web/src/test/resources/rest_api_v1.snapshot
##########
@@ -1254,6 +1263,9 @@
"num_acknowledged_subtasks" : {
"type" : "integer"
},
+ "check_point_type" : {
Review comment:
done. Should these steps be documented in the developer guide
(https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html)?
Let me know if I can add them here.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -113,23 +116,27 @@
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
private final int numAckSubtasks;
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE)
+ private final CheckpointType checkPointType;
Review comment:
done
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -113,23 +116,27 @@
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
private final int numAckSubtasks;
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE)
+ private final CheckpointType checkPointType;
+
@JsonProperty(FIELD_NAME_TASKS)
@JsonSerialize(keyUsing = JobVertexIDKeySerializer.class)
private final Map<JobVertexID, TaskCheckpointStatistics>
checkpointStatisticsPerTask;
@JsonCreator
private CheckpointStatistics(
- @JsonProperty(FIELD_NAME_ID) long id,
- @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus
status,
- @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean
savepoint,
- @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long
triggerTimestamp,
- @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long
latestAckTimestamp,
- @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
- @JsonProperty(FIELD_NAME_DURATION) long duration,
- @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long
alignmentBuffered,
- @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
- @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int
numAckSubtasks,
- @JsonDeserialize(keyUsing =
JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS)
Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
+ @JsonProperty(FIELD_NAME_ID) long id,
+ @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
+ @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
+ @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long
triggerTimestamp,
+ @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long
latestAckTimestamp,
+ @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
+ @JsonProperty(FIELD_NAME_DURATION) long duration,
+ @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long
alignmentBuffered,
+ @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
+ @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) CheckpointType
checkPointType,
+ @JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class)
@JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics>
checkpointStatisticsPerTask) {
Review comment:
done
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -113,23 +116,27 @@
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
private final int numAckSubtasks;
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE)
+ private final CheckpointType checkPointType;
+
@JsonProperty(FIELD_NAME_TASKS)
@JsonSerialize(keyUsing = JobVertexIDKeySerializer.class)
private final Map<JobVertexID, TaskCheckpointStatistics>
checkpointStatisticsPerTask;
@JsonCreator
private CheckpointStatistics(
- @JsonProperty(FIELD_NAME_ID) long id,
- @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus
status,
- @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean
savepoint,
- @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long
triggerTimestamp,
- @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long
latestAckTimestamp,
- @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
- @JsonProperty(FIELD_NAME_DURATION) long duration,
- @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long
alignmentBuffered,
- @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
- @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int
numAckSubtasks,
- @JsonDeserialize(keyUsing =
JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS)
Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
+ @JsonProperty(FIELD_NAME_ID) long id,
+ @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
+ @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
+ @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long
triggerTimestamp,
+ @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long
latestAckTimestamp,
+ @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
+ @JsonProperty(FIELD_NAME_DURATION) long duration,
+ @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long
alignmentBuffered,
+ @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
+ @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) CheckpointType
checkPointType,
Review comment:
done
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -140,6 +147,7 @@ private CheckpointStatistics(
this.alignmentBuffered = alignmentBuffered;
this.numSubtasks = numSubtasks;
this.numAckSubtasks = numAckSubtasks;
+ this.checkPointType = checkPointType;
Review comment:
done
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -179,6 +187,10 @@ public int getNumAckSubtasks() {
return numAckSubtasks;
}
+ public CheckpointType getCheckPointType() {
+ return checkPointType;
Review comment:
done
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -203,12 +215,13 @@ public boolean equals(Object o) {
numSubtasks == that.numSubtasks &&
numAckSubtasks == that.numAckSubtasks &&
status == that.status &&
+ Objects.equals(checkPointType, that.checkPointType) &&
Review comment:
done
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -203,12 +215,13 @@ public boolean equals(Object o) {
numSubtasks == that.numSubtasks &&
numAckSubtasks == that.numAckSubtasks &&
status == that.status &&
+ Objects.equals(checkPointType, that.checkPointType) &&
Objects.equals(checkpointStatisticsPerTask,
that.checkpointStatisticsPerTask);
}
@Override
public int hashCode() {
- return Objects.hash(id, status, savepoint, triggerTimestamp,
latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks,
numAckSubtasks, checkpointStatisticsPerTask);
+ return Objects.hash(id, status, savepoint, triggerTimestamp,
latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks,
numAckSubtasks, checkPointType, checkpointStatisticsPerTask);
Review comment:
done
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -330,6 +346,7 @@ public CompletedCheckpointStatistics(
@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long
alignmentBuffered,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int
numAckSubtasks,
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE)
CheckpointType checkPointType,
Review comment:
done
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -409,6 +427,7 @@ public FailedCheckpointStatistics(
@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long
alignmentBuffered,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int
numAckSubtasks,
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE)
CheckpointType checkPointType,
Review comment:
done
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -423,6 +442,7 @@ public FailedCheckpointStatistics(
alignmentBuffered,
numSubtasks,
numAckSubtasks,
+ checkPointType,
Review comment:
done
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -477,6 +497,7 @@ public PendingCheckpointStatistics(
@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long
alignmentBuffered,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int
numAckSubtasks,
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE)
CheckpointType checkPointType,
Review comment:
done
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -489,6 +510,7 @@ public PendingCheckpointStatistics(
alignmentBuffered,
numSubtasks,
numAckSubtasks,
+ checkPointType,
Review comment:
done
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
##########
@@ -27,6 +28,8 @@
import java.util.HashMap;
import java.util.Map;
+
+
Review comment:
Yes, I believe so; I removed these empty lines.
##########
File path: flink-runtime-web/src/test/resources/rest_api_v1.snapshot
##########
@@ -1254,6 +1263,9 @@
"num_acknowledged_subtasks" : {
"type" : "integer"
},
+ "check_point_type" : {
Review comment:
done. Should these steps be documented in the developer guide
(https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html)?
Let me know if I can commit them here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]