[FLINK-8915] CheckpointingStatisticsHandler fails to return PendingCheckpointStats
This closes #5703. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/341a3fd8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/341a3fd8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/341a3fd8 Branch: refs/heads/release-1.5 Commit: 341a3fd8e20a4645dd8326f5c290314c2344f728 Parents: 7739cb7 Author: yanghua <[email protected]> Authored: Thu Mar 15 16:24:35 2018 +0800 Committer: Till Rohrmann <[email protected]> Committed: Sun Mar 18 18:55:21 2018 +0100 ---------------------------------------------------------------------- .../checkpoints/CheckpointStatistics.java | 77 +++++++++++++++++++- .../CheckpointingStatisticsTest.java | 16 +++- 2 files changed, 90 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/341a3fd8/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java index 333c016..f8aeb26 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats; import org.apache.flink.runtime.checkpoint.FailedCheckpointStats; +import org.apache.flink.runtime.checkpoint.PendingCheckpointStats; import org.apache.flink.runtime.checkpoint.TaskStateStats; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.ResponseBody; @@ -50,7 +51,8 @@ import java.util.Objects; @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class") @JsonSubTypes({ @JsonSubTypes.Type(value = CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"), - @JsonSubTypes.Type(value = CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")}) + @JsonSubTypes.Type(value = CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed"), + @JsonSubTypes.Type(value = CheckpointStatistics.PendingCheckpointStatistics.class, name = "in_progress")}) public class CheckpointStatistics implements ResponseBody { public static final String FIELD_NAME_ID = "id"; @@ -272,8 +274,25 @@ public class CheckpointStatistics implements ResponseBody { checkpointStatisticsPerTask, failedCheckpointStats.getFailureTimestamp(), failedCheckpointStats.getFailureMessage()); + } else if (checkpointStats instanceof PendingCheckpointStats) { + final PendingCheckpointStats pendingCheckpointStats = ((PendingCheckpointStats) checkpointStats); + + return new CheckpointStatistics.PendingCheckpointStatistics( + pendingCheckpointStats.getCheckpointId(), + pendingCheckpointStats.getStatus(), + pendingCheckpointStats.getProperties().isSavepoint(), + pendingCheckpointStats.getTriggerTimestamp(), + pendingCheckpointStats.getLatestAckTimestamp(), + pendingCheckpointStats.getStateSize(), + pendingCheckpointStats.getEndToEndDuration(), + pendingCheckpointStats.getAlignmentBuffered(), + pendingCheckpointStats.getNumberOfSubtasks(), + pendingCheckpointStats.getNumberOfAcknowledgedSubtasks(), + checkpointStatisticsPerTask + ); } else { - throw new IllegalArgumentException("Given checkpoint stats object of type " + checkpointStats.getClass().getName() + " cannot be converted."); + throw new IllegalArgumentException("Given checkpoint stats object of type " + + checkpointStats.getClass().getName() + " cannot be converted."); } } @@ -438,4 +457,58 @@ public class CheckpointStatistics implements ResponseBody { return Objects.hash(super.hashCode(), failureTimestamp, failureMessage); } } + + /** + * Statistics for a pending checkpoint. + */ + public static final class PendingCheckpointStatistics extends CheckpointStatistics { + + @JsonCreator + public PendingCheckpointStatistics( + @JsonProperty(FIELD_NAME_ID) long id, + @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status, + @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint, + @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp, + @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp, + @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize, + @JsonProperty(FIELD_NAME_DURATION) long duration, + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered, + @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks, + @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks, + @JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask) { + super( + id, + status, + savepoint, + triggerTimestamp, + latestAckTimestamp, + stateSize, + duration, + alignmentBuffered, + numSubtasks, + numAckSubtasks, + checkpointingStatisticsPerTask); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode()); + } + + } } http://git-wip-us.apache.org/repos/asf/flink/blob/341a3fd8/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java index 562418e..8f25a59 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java @@ -122,6 +122,20 @@ public class CheckpointingStatisticsTest extends RestResponseMarshallingTestBase true, "foobar"); + CheckpointStatistics.PendingCheckpointStatistics pending = new CheckpointStatistics.PendingCheckpointStatistics( + 5L, + CheckpointStatsStatus.IN_PROGRESS, + false, + 42L, + 41L, + 1337L, + 1L, + 0L, + 10, + 10, + Collections.emptyMap() + ); + final CheckpointingStatistics.LatestCheckpoints latestCheckpoints = new CheckpointingStatistics.LatestCheckpoints( completed, savepoint, @@ -132,6 +146,6 @@ public class CheckpointingStatisticsTest extends RestResponseMarshallingTestBase counts, summary, latestCheckpoints, - Arrays.asList(completed, savepoint, failed)); + Arrays.asList(completed, savepoint, failed, pending)); } }
