XComp commented on a change in pull request #13458:
URL: https://github.com/apache/flink/pull/13458#discussion_r494339617
##########
File path: flink-runtime-web/src/test/resources/rest_api_v1.snapshot
##########
@@ -1254,6 +1263,9 @@
"num_acknowledged_subtasks" : {
"type" : "integer"
},
+ "check_point_type" : {
Review comment:
You shouldn't edit this file manually. Instead, you can run
`org.apache.flink.runtime.rest.compatibility.RestAPIStabilityTest#testDispatcherRestAPIStability`
with `-Dgenerate-rest-snapshot` to generate the file from sources and commit
it. That's going to be easier and less error-prone than editing the file
manually. It will also make
`org.apache.flink.runtime.rest.compatibility.RestAPIStabilityTest` succeed
again (it currently fails) after an API change. :-)
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -113,23 +116,27 @@
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
private final int numAckSubtasks;
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE)
+ private final CheckpointType checkPointType;
Review comment:
```suggestion
private final CheckpointType checkpointType;
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -113,23 +116,27 @@
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
private final int numAckSubtasks;
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE)
+ private final CheckpointType checkPointType;
+
@JsonProperty(FIELD_NAME_TASKS)
@JsonSerialize(keyUsing = JobVertexIDKeySerializer.class)
private final Map<JobVertexID, TaskCheckpointStatistics>
checkpointStatisticsPerTask;
@JsonCreator
private CheckpointStatistics(
- @JsonProperty(FIELD_NAME_ID) long id,
- @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus
status,
- @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean
savepoint,
- @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long
triggerTimestamp,
- @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long
latestAckTimestamp,
- @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
- @JsonProperty(FIELD_NAME_DURATION) long duration,
- @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long
alignmentBuffered,
- @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
- @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int
numAckSubtasks,
- @JsonDeserialize(keyUsing =
JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS)
Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
+ @JsonProperty(FIELD_NAME_ID) long id,
+ @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
+ @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
+ @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long
triggerTimestamp,
+ @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long
latestAckTimestamp,
+ @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
+ @JsonProperty(FIELD_NAME_DURATION) long duration,
+ @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long
alignmentBuffered,
+ @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
+ @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) CheckpointType
checkPointType,
Review comment:
```suggestion
@JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) CheckpointType
checkpointType,
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -477,6 +497,7 @@ public PendingCheckpointStatistics(
@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long
alignmentBuffered,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int
numAckSubtasks,
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE)
CheckpointType checkPointType,
Review comment:
```suggestion
@JsonProperty(FIELD_NAME_CHECKPOINT_TYPE)
CheckpointType checkpointType,
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -489,6 +510,7 @@ public PendingCheckpointStatistics(
alignmentBuffered,
numSubtasks,
numAckSubtasks,
+ checkPointType,
Review comment:
```suggestion
checkpointType,
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -140,6 +147,7 @@ private CheckpointStatistics(
this.alignmentBuffered = alignmentBuffered;
this.numSubtasks = numSubtasks;
this.numAckSubtasks = numAckSubtasks;
+ this.checkPointType = checkPointType;
Review comment:
```suggestion
this.checkpointType = checkpointType;
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -423,6 +442,7 @@ public FailedCheckpointStatistics(
alignmentBuffered,
numSubtasks,
numAckSubtasks,
+ checkPointType,
Review comment:
```suggestion
checkpointType,
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -330,6 +346,7 @@ public CompletedCheckpointStatistics(
@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long
alignmentBuffered,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int
numAckSubtasks,
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE)
CheckpointType checkPointType,
Review comment:
```suggestion
@JsonProperty(FIELD_NAME_CHECKPOINT_TYPE)
CheckpointType checkpointType,
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -203,12 +215,13 @@ public boolean equals(Object o) {
numSubtasks == that.numSubtasks &&
numAckSubtasks == that.numAckSubtasks &&
status == that.status &&
+ Objects.equals(checkPointType, that.checkPointType) &&
Objects.equals(checkpointStatisticsPerTask,
that.checkpointStatisticsPerTask);
}
@Override
public int hashCode() {
- return Objects.hash(id, status, savepoint, triggerTimestamp,
latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks,
numAckSubtasks, checkpointStatisticsPerTask);
+ return Objects.hash(id, status, savepoint, triggerTimestamp,
latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks,
numAckSubtasks, checkPointType, checkpointStatisticsPerTask);
Review comment:
```suggestion
return Objects.hash(id, status, savepoint, triggerTimestamp,
latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks,
numAckSubtasks, checkpointType, checkpointStatisticsPerTask);
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -113,23 +116,27 @@
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
private final int numAckSubtasks;
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE)
+ private final CheckpointType checkPointType;
+
@JsonProperty(FIELD_NAME_TASKS)
@JsonSerialize(keyUsing = JobVertexIDKeySerializer.class)
private final Map<JobVertexID, TaskCheckpointStatistics>
checkpointStatisticsPerTask;
@JsonCreator
private CheckpointStatistics(
- @JsonProperty(FIELD_NAME_ID) long id,
- @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus
status,
- @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean
savepoint,
- @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long
triggerTimestamp,
- @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long
latestAckTimestamp,
- @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
- @JsonProperty(FIELD_NAME_DURATION) long duration,
- @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long
alignmentBuffered,
- @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
- @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int
numAckSubtasks,
- @JsonDeserialize(keyUsing =
JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS)
Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
+ @JsonProperty(FIELD_NAME_ID) long id,
+ @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
+ @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
+ @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long
triggerTimestamp,
+ @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long
latestAckTimestamp,
+ @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
+ @JsonProperty(FIELD_NAME_DURATION) long duration,
+ @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long
alignmentBuffered,
+ @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
+ @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE) CheckpointType
checkPointType,
+ @JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class)
@JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics>
checkpointStatisticsPerTask) {
Review comment:
Formatting changed. Please revert the format change and only commit code
changes.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
##########
@@ -27,6 +28,8 @@
import java.util.HashMap;
import java.util.Map;
+
+
Review comment:
Are those accidentally added?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -179,6 +187,10 @@ public int getNumAckSubtasks() {
return numAckSubtasks;
}
+ public CheckpointType getCheckPointType() {
+ return checkPointType;
Review comment:
```suggestion
public CheckpointType getCheckpointType() {
return checkpointType;
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -409,6 +427,7 @@ public FailedCheckpointStatistics(
@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long
alignmentBuffered,
@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int
numAckSubtasks,
+ @JsonProperty(FIELD_NAME_CHECKPOINT_TYPE)
CheckpointType checkPointType,
Review comment:
```suggestion
@JsonProperty(FIELD_NAME_CHECKPOINT_TYPE)
CheckpointType checkpointType,
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
##########
@@ -203,12 +215,13 @@ public boolean equals(Object o) {
numSubtasks == that.numSubtasks &&
numAckSubtasks == that.numAckSubtasks &&
status == that.status &&
+ Objects.equals(checkPointType, that.checkPointType) &&
Review comment:
```suggestion
Objects.equals(checkpointType, that.checkpointType) &&
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org