[FLINK-8915] CheckpointingStatisticsHandler fails to return PendingCheckpointStats

This closes #5703.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31c0754e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31c0754e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31c0754e

Branch: refs/heads/master
Commit: 31c0754ee70265a1b440cbfbc21e1220e5d9718a
Parents: 096a89a
Author: yanghua <[email protected]>
Authored: Thu Mar 15 16:24:35 2018 +0800
Committer: Till Rohrmann <[email protected]>
Committed: Sun Mar 18 16:08:11 2018 +0100

----------------------------------------------------------------------
 .../checkpoints/CheckpointStatistics.java       | 77 +++++++++++++++++++-
 .../CheckpointingStatisticsTest.java            | 16 +++-
 2 files changed, 90 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/31c0754e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
index 333c016..f8aeb26 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
@@ -50,7 +51,8 @@ import java.util.Objects;
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, 
property = "@class")
 @JsonSubTypes({
        @JsonSubTypes.Type(value = 
CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
-       @JsonSubTypes.Type(value = 
CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
+       @JsonSubTypes.Type(value = 
CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed"),
+       @JsonSubTypes.Type(value = 
CheckpointStatistics.PendingCheckpointStatistics.class, name = "in_progress")})
 public class CheckpointStatistics implements ResponseBody {
 
        public static final String FIELD_NAME_ID = "id";
@@ -272,8 +274,25 @@ public class CheckpointStatistics implements ResponseBody {
                                checkpointStatisticsPerTask,
                                failedCheckpointStats.getFailureTimestamp(),
                                failedCheckpointStats.getFailureMessage());
+               } else if (checkpointStats instanceof PendingCheckpointStats) {
+                       final PendingCheckpointStats pendingCheckpointStats = 
((PendingCheckpointStats) checkpointStats);
+
+                       return new 
CheckpointStatistics.PendingCheckpointStatistics(
+                               pendingCheckpointStats.getCheckpointId(),
+                               pendingCheckpointStats.getStatus(),
+                               
pendingCheckpointStats.getProperties().isSavepoint(),
+                               pendingCheckpointStats.getTriggerTimestamp(),
+                               pendingCheckpointStats.getLatestAckTimestamp(),
+                               pendingCheckpointStats.getStateSize(),
+                               pendingCheckpointStats.getEndToEndDuration(),
+                               pendingCheckpointStats.getAlignmentBuffered(),
+                               pendingCheckpointStats.getNumberOfSubtasks(),
+                               
pendingCheckpointStats.getNumberOfAcknowledgedSubtasks(),
+                               checkpointStatisticsPerTask
+                       );
                } else {
-                       throw new IllegalArgumentException("Given checkpoint stats object of type " + checkpointStats.getClass().getName() + " cannot be converted.");
+                       throw new IllegalArgumentException("Given checkpoint stats object of type "
+                               + checkpointStats.getClass().getName() + " cannot be converted.");
                }
        }
 
@@ -438,4 +457,58 @@ public class CheckpointStatistics implements ResponseBody {
                        return Objects.hash(super.hashCode(), failureTimestamp, 
failureMessage);
                }
        }
+
+       /**
+        * Statistics for a pending checkpoint.
+        */
+       public static final class PendingCheckpointStatistics extends 
CheckpointStatistics {
+
+               @JsonCreator
+               public PendingCheckpointStatistics(
+                       @JsonProperty(FIELD_NAME_ID) long id,
+                       @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus 
status,
+                       @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean 
savepoint,
+                       @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long 
triggerTimestamp,
+                       @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long 
latestAckTimestamp,
+                       @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
+                       @JsonProperty(FIELD_NAME_DURATION) long duration,
+                       @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long 
alignmentBuffered,
+                       @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
+                       @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int 
numAckSubtasks,
+                       @JsonDeserialize(keyUsing = 
JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) 
Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask) {
+                       super(
+                               id,
+                               status,
+                               savepoint,
+                               triggerTimestamp,
+                               latestAckTimestamp,
+                               stateSize,
+                               duration,
+                               alignmentBuffered,
+                               numSubtasks,
+                               numAckSubtasks,
+                               checkpointingStatisticsPerTask);
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       if (!super.equals(o)) {
+                               return false;
+                       }
+
+                       return true;
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(super.hashCode());
+               }
+
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/31c0754e/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
index 562418e..8f25a59 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
@@ -122,6 +122,20 @@ public class CheckpointingStatisticsTest extends 
RestResponseMarshallingTestBase
                        true,
                        "foobar");
 
+               CheckpointStatistics.PendingCheckpointStatistics pending = new 
CheckpointStatistics.PendingCheckpointStatistics(
+                       5L,
+                       CheckpointStatsStatus.IN_PROGRESS,
+                       false,
+                       42L,
+                       41L,
+                       1337L,
+                       1L,
+                       0L,
+                       10,
+                       10,
+                       Collections.emptyMap()
+               );
+
                final CheckpointingStatistics.LatestCheckpoints 
latestCheckpoints = new CheckpointingStatistics.LatestCheckpoints(
                        completed,
                        savepoint,
@@ -132,6 +146,6 @@ public class CheckpointingStatisticsTest extends 
RestResponseMarshallingTestBase
                        counts,
                        summary,
                        latestCheckpoints,
-                       Arrays.asList(completed, savepoint, failed));
+                       Arrays.asList(completed, savepoint, failed, pending));
        }
 }

Reply via email to