http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java
deleted file mode 100644
index ade8c7a..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java
+++ /dev/null
@@ -1,763 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
-import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler;
-import org.apache.flink.util.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-
-import javax.annotation.Nullable;
-
-import java.util.List;
-import java.util.Objects;
-
-/**
- * Response of the {@link CheckpointStatisticsHandler}.
- */
-public class CheckpointStatistics implements ResponseBody {
-
-       public static final String FIELD_NAME_COUNTS = "counts";
-
-       public static final String FIELD_NAME_SUMMARY = "summary";
-
-       public static final String FIELD_NAME_LATEST_CHECKPOINTS = "latest";
-
-       public static final String FIELD_NAME_HISTORY = "history";
-
-       @JsonProperty(FIELD_NAME_COUNTS)
-       private final Counts counts;
-
-       @JsonProperty(FIELD_NAME_SUMMARY)
-       private final Summary summary;
-
-       @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS)
-       private final LatestCheckpoints latestCheckpoints;
-
-       @JsonProperty(FIELD_NAME_HISTORY)
-       private final List<BaseCheckpointStatistics> history;
-
-       @JsonCreator
-       public CheckpointStatistics(
-                       @JsonProperty(FIELD_NAME_COUNTS) Counts counts,
-                       @JsonProperty(FIELD_NAME_SUMMARY) Summary summary,
-                       @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS) 
LatestCheckpoints latestCheckpoints,
-                       @JsonProperty(FIELD_NAME_HISTORY) 
List<BaseCheckpointStatistics> history) {
-               this.counts = Preconditions.checkNotNull(counts);
-               this.summary = Preconditions.checkNotNull(summary);
-               this.latestCheckpoints = 
Preconditions.checkNotNull(latestCheckpoints);
-               this.history = Preconditions.checkNotNull(history);
-       }
-
-       public Counts getCounts() {
-               return counts;
-       }
-
-       public Summary getSummary() {
-               return summary;
-       }
-
-       public LatestCheckpoints getLatestCheckpoints() {
-               return latestCheckpoints;
-       }
-
-       public List<BaseCheckpointStatistics> getHistory() {
-               return history;
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-               CheckpointStatistics that = (CheckpointStatistics) o;
-               return Objects.equals(counts, that.counts) &&
-                       Objects.equals(summary, that.summary) &&
-                       Objects.equals(latestCheckpoints, 
that.latestCheckpoints) &&
-                       Objects.equals(history, that.history);
-       }
-
-       @Override
-       public int hashCode() {
-               return Objects.hash(counts, summary, latestCheckpoints, 
history);
-       }
-
-       // ------------------------------------------------------------------
-       // Inner classes
-       // ------------------------------------------------------------------
-
-       /**
-        * Checkpoint counts.
-        */
-       public static final class Counts {
-
-               public static final String FIELD_NAME_RESTORED_CHECKPOINTS = 
"restored";
-
-               public static final String FIELD_NAME_TOTAL_CHECKPOINTS = 
"total";
-
-               public static final String FIELD_NAME_IN_PROGRESS_CHECKPOINTS = 
"in_progress";
-
-               public static final String FIELD_NAME_COMPLETED_CHECKPOINTS = 
"completed";
-
-               public static final String FIELD_NAME_FAILED_CHECKPOINTS = 
"failed";
-
-               @JsonProperty(FIELD_NAME_RESTORED_CHECKPOINTS)
-               private final long numberRestoredCheckpoints;
-
-               @JsonProperty(FIELD_NAME_TOTAL_CHECKPOINTS)
-               private final long totalNumberCheckpoints;
-
-               @JsonProperty(FIELD_NAME_IN_PROGRESS_CHECKPOINTS)
-               private final int numberInProgressCheckpoints;
-
-               @JsonProperty(FIELD_NAME_COMPLETED_CHECKPOINTS)
-               private final long numberCompletedCheckpoints;
-
-               @JsonProperty(FIELD_NAME_FAILED_CHECKPOINTS)
-               private final long numberFailedCheckpoints;
-
-               @JsonCreator
-               public Counts(
-                               @JsonProperty(FIELD_NAME_RESTORED_CHECKPOINTS) 
long numberRestoredCheckpoints,
-                               @JsonProperty(FIELD_NAME_TOTAL_CHECKPOINTS) 
long totalNumberCheckpoints,
-                               
@JsonProperty(FIELD_NAME_IN_PROGRESS_CHECKPOINTS) int 
numberInProgressCheckpoints,
-                               @JsonProperty(FIELD_NAME_COMPLETED_CHECKPOINTS) 
long numberCompletedCheckpoints,
-                               @JsonProperty(FIELD_NAME_FAILED_CHECKPOINTS) 
long numberFailedCheckpoints) {
-                       this.numberRestoredCheckpoints = 
numberRestoredCheckpoints;
-                       this.totalNumberCheckpoints = totalNumberCheckpoints;
-                       this.numberInProgressCheckpoints = 
numberInProgressCheckpoints;
-                       this.numberCompletedCheckpoints = 
numberCompletedCheckpoints;
-                       this.numberFailedCheckpoints = numberFailedCheckpoints;
-               }
-
-               public long getNumberRestoredCheckpoints() {
-                       return numberRestoredCheckpoints;
-               }
-
-               public long getTotalNumberCheckpoints() {
-                       return totalNumberCheckpoints;
-               }
-
-               public int getNumberInProgressCheckpoints() {
-                       return numberInProgressCheckpoints;
-               }
-
-               public long getNumberCompletedCheckpoints() {
-                       return numberCompletedCheckpoints;
-               }
-
-               public long getNumberFailedCheckpoints() {
-                       return numberFailedCheckpoints;
-               }
-
-               @Override
-               public boolean equals(Object o) {
-                       if (this == o) {
-                               return true;
-                       }
-                       if (o == null || getClass() != o.getClass()) {
-                               return false;
-                       }
-                       Counts counts = (Counts) o;
-                       return numberRestoredCheckpoints == 
counts.numberRestoredCheckpoints &&
-                               totalNumberCheckpoints == 
counts.totalNumberCheckpoints &&
-                               numberInProgressCheckpoints == 
counts.numberInProgressCheckpoints &&
-                               numberCompletedCheckpoints == 
counts.numberCompletedCheckpoints &&
-                               numberFailedCheckpoints == 
counts.numberFailedCheckpoints;
-               }
-
-               @Override
-               public int hashCode() {
-                       return Objects.hash(numberRestoredCheckpoints, 
totalNumberCheckpoints, numberInProgressCheckpoints, 
numberCompletedCheckpoints, numberFailedCheckpoints);
-               }
-       }
-
-       /**
-        * Checkpoint summary.
-        */
-       public static final class Summary {
-
-               public static final String FIELD_NAME_STATE_SIZE = "state_size";
-
-               public static final String FIELD_NAME_DURATION = 
"end_to_end_duration";
-
-               public static final String FIELD_NAME_ALIGNMENT_BUFFERED = 
"alignment_buffered";
-
-               @JsonProperty(FIELD_NAME_STATE_SIZE)
-               private final MinMaxAvgStatistics stateSize;
-
-               @JsonProperty(FIELD_NAME_DURATION)
-               private final MinMaxAvgStatistics duration;
-
-               @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
-               private final MinMaxAvgStatistics alignmentBuffered;
-
-               @JsonCreator
-               public Summary(
-                               @JsonProperty(FIELD_NAME_STATE_SIZE) 
MinMaxAvgStatistics stateSize,
-                               @JsonProperty(FIELD_NAME_DURATION) 
MinMaxAvgStatistics duration,
-                               @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) 
MinMaxAvgStatistics alignmentBuffered) {
-                       this.stateSize = stateSize;
-                       this.duration = duration;
-                       this.alignmentBuffered = alignmentBuffered;
-               }
-
-               public MinMaxAvgStatistics getStateSize() {
-                       return stateSize;
-               }
-
-               public MinMaxAvgStatistics getDuration() {
-                       return duration;
-               }
-
-               public MinMaxAvgStatistics getAlignmentBuffered() {
-                       return alignmentBuffered;
-               }
-
-               @Override
-               public boolean equals(Object o) {
-                       if (this == o) {
-                               return true;
-                       }
-                       if (o == null || getClass() != o.getClass()) {
-                               return false;
-                       }
-                       Summary summary = (Summary) o;
-                       return Objects.equals(stateSize, summary.stateSize) &&
-                               Objects.equals(duration, summary.duration) &&
-                               Objects.equals(alignmentBuffered, 
summary.alignmentBuffered);
-               }
-
-               @Override
-               public int hashCode() {
-                       return Objects.hash(stateSize, duration, 
alignmentBuffered);
-               }
-       }
-
-       /**
-        * Minimum, maximum and average statistics.
-        */
-       public static final class MinMaxAvgStatistics {
-
-               public static final String FIELD_NAME_MINIMUM = "min";
-
-               public static final String FIELD_NAME_MAXIMUM = "max";
-
-               public static final String FIELD_NAME_AVERAGE = "avg";
-
-               @JsonProperty(FIELD_NAME_MINIMUM)
-               private final long minimum;
-
-               @JsonProperty(FIELD_NAME_MAXIMUM)
-               private final long maximum;
-
-               @JsonProperty(FIELD_NAME_AVERAGE)
-               private final long average;
-
-               @JsonCreator
-               public MinMaxAvgStatistics(
-                               @JsonProperty(FIELD_NAME_MINIMUM) long minimum,
-                               @JsonProperty(FIELD_NAME_MAXIMUM) long maximum,
-                               @JsonProperty(FIELD_NAME_AVERAGE) long average) 
{
-                       this.minimum = minimum;
-                       this.maximum = maximum;
-                       this.average = average;
-               }
-
-               public long getMinimum() {
-                       return minimum;
-               }
-
-               public long getMaximum() {
-                       return maximum;
-               }
-
-               public long getAverage() {
-                       return average;
-               }
-
-               @Override
-               public boolean equals(Object o) {
-                       if (this == o) {
-                               return true;
-                       }
-                       if (o == null || getClass() != o.getClass()) {
-                               return false;
-                       }
-                       MinMaxAvgStatistics that = (MinMaxAvgStatistics) o;
-                       return minimum == that.minimum &&
-                               maximum == that.maximum &&
-                               average == that.average;
-               }
-
-               @Override
-               public int hashCode() {
-                       return Objects.hash(minimum, maximum, average);
-               }
-       }
-
-       /**
-        * Statistics about the latest checkpoints.
-        */
-       public static final class LatestCheckpoints {
-
-               public static final String FIELD_NAME_COMPLETED = "completed";
-
-               public static final String FIELD_NAME_SAVEPOINT = "savepoint";
-
-               public static final String FIELD_NAME_FAILED = "failed";
-
-               public static final String FIELD_NAME_RESTORED = "restored";
-
-               @JsonProperty(FIELD_NAME_COMPLETED)
-               @Nullable
-               private final CompletedCheckpointStatistics 
completedCheckpointStatistics;
-
-               @JsonProperty(FIELD_NAME_SAVEPOINT)
-               @Nullable
-               private final CompletedCheckpointStatistics savepointStatistics;
-
-               @JsonProperty(FIELD_NAME_FAILED)
-               @Nullable
-               private final FailedCheckpointStatistics 
failedCheckpointStatistics;
-
-               @JsonProperty(FIELD_NAME_RESTORED)
-               @Nullable
-               private final RestoredCheckpointStatistics 
restoredCheckpointStatistics;
-
-               @JsonCreator
-               public LatestCheckpoints(
-                               @JsonProperty(FIELD_NAME_COMPLETED) @Nullable 
CompletedCheckpointStatistics completedCheckpointStatistics,
-                               @JsonProperty(FIELD_NAME_SAVEPOINT) @Nullable 
CompletedCheckpointStatistics savepointStatistics,
-                               @JsonProperty(FIELD_NAME_FAILED) @Nullable 
FailedCheckpointStatistics failedCheckpointStatistics,
-                               @JsonProperty(FIELD_NAME_RESTORED) @Nullable 
RestoredCheckpointStatistics restoredCheckpointStatistics) {
-                       this.completedCheckpointStatistics = 
completedCheckpointStatistics;
-                       this.savepointStatistics = savepointStatistics;
-                       this.failedCheckpointStatistics = 
failedCheckpointStatistics;
-                       this.restoredCheckpointStatistics = 
restoredCheckpointStatistics;
-               }
-
-               @Nullable
-               public CompletedCheckpointStatistics 
getCompletedCheckpointStatistics() {
-                       return completedCheckpointStatistics;
-               }
-
-               @Nullable
-               public CompletedCheckpointStatistics getSavepointStatistics() {
-                       return savepointStatistics;
-               }
-
-               @Nullable
-               public FailedCheckpointStatistics 
getFailedCheckpointStatistics() {
-                       return failedCheckpointStatistics;
-               }
-
-               @Nullable
-               public RestoredCheckpointStatistics 
getRestoredCheckpointStatistics() {
-                       return restoredCheckpointStatistics;
-               }
-
-               @Override
-               public boolean equals(Object o) {
-                       if (this == o) {
-                               return true;
-                       }
-                       if (o == null || getClass() != o.getClass()) {
-                               return false;
-                       }
-                       LatestCheckpoints that = (LatestCheckpoints) o;
-                       return Objects.equals(completedCheckpointStatistics, 
that.completedCheckpointStatistics) &&
-                               Objects.equals(savepointStatistics, 
that.savepointStatistics) &&
-                               Objects.equals(failedCheckpointStatistics, 
that.failedCheckpointStatistics) &&
-                               Objects.equals(restoredCheckpointStatistics, 
that.restoredCheckpointStatistics);
-               }
-
-               @Override
-               public int hashCode() {
-                       return Objects.hash(completedCheckpointStatistics, 
savepointStatistics, failedCheckpointStatistics, restoredCheckpointStatistics);
-               }
-       }
-
-       /**
-        * Statistics for a checkpoint.
-        */
-       @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = 
JsonTypeInfo.As.PROPERTY, property = "@class")
-       @JsonSubTypes({
-               @JsonSubTypes.Type(value = CompletedCheckpointStatistics.class, 
name = "completed"),
-               @JsonSubTypes.Type(value = FailedCheckpointStatistics.class, 
name = "failed")})
-       public static class BaseCheckpointStatistics {
-
-               public static final String FIELD_NAME_ID = "id";
-
-               public static final String FIELD_NAME_STATUS = "status";
-
-               public static final String FIELD_NAME_IS_SAVEPOINT = 
"is_savepoint";
-
-               public static final String FIELD_NAME_TRIGGER_TIMESTAMP = 
"trigger_timestamp";
-
-               public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = 
"latest_ack_timestamp";
-
-               public static final String FIELD_NAME_STATE_SIZE = "state_size";
-
-               public static final String FIELD_NAME_DURATION = 
"end_to_end_duration";
-
-               public static final String FIELD_NAME_ALIGNMENT_BUFFERED = 
"alignment_buffered";
-
-               public static final String FIELD_NAME_NUM_SUBTASKS = 
"num_subtasks";
-
-               public static final String FIELD_NAME_NUM_ACK_SUBTASKS = 
"num_acknowledged_subtasks";
-
-               @JsonProperty(FIELD_NAME_ID)
-               private final long id;
-
-               @JsonProperty(FIELD_NAME_STATUS)
-               private final CheckpointStatsStatus status;
-
-               @JsonProperty(FIELD_NAME_IS_SAVEPOINT)
-               private final boolean savepoint;
-
-               @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
-               private final long triggerTimestamp;
-
-               @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
-               private final long latestAckTimestamp;
-
-               @JsonProperty(FIELD_NAME_STATE_SIZE)
-               private final long stateSize;
-
-               @JsonProperty(FIELD_NAME_DURATION)
-               private final long duration;
-
-               @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
-               private final long alignmentBuffered;
-
-               @JsonProperty(FIELD_NAME_NUM_SUBTASKS)
-               private final int numSubtasks;
-
-               @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
-               private final int numAckSubtasks;
-
-               @JsonCreator
-               protected BaseCheckpointStatistics(
-                               @JsonProperty(FIELD_NAME_ID) long id,
-                               @JsonProperty(FIELD_NAME_STATUS) 
CheckpointStatsStatus status,
-                               @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean 
savepoint,
-                               @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) 
long triggerTimestamp,
-                               @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) 
long latestAckTimestamp,
-                               @JsonProperty(FIELD_NAME_STATE_SIZE) long 
stateSize,
-                               @JsonProperty(FIELD_NAME_DURATION) long 
duration,
-                               @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) 
long alignmentBuffered,
-                               @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int 
numSubtasks,
-                               @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int 
numAckSubtasks) {
-                       this.id = id;
-                       this.status = Preconditions.checkNotNull(status);
-                       this.savepoint = savepoint;
-                       this.triggerTimestamp = triggerTimestamp;
-                       this.latestAckTimestamp = latestAckTimestamp;
-                       this.stateSize = stateSize;
-                       this.duration = duration;
-                       this.alignmentBuffered = alignmentBuffered;
-                       this.numSubtasks = numSubtasks;
-                       this.numAckSubtasks = numAckSubtasks;
-               }
-
-               public long getId() {
-                       return id;
-               }
-
-               public CheckpointStatsStatus getStatus() {
-                       return status;
-               }
-
-               public boolean isSavepoint() {
-                       return savepoint;
-               }
-
-               public long getTriggerTimestamp() {
-                       return triggerTimestamp;
-               }
-
-               public long getLatestAckTimestamp() {
-                       return latestAckTimestamp;
-               }
-
-               public long getStateSize() {
-                       return stateSize;
-               }
-
-               public long getDuration() {
-                       return duration;
-               }
-
-               public long getAlignmentBuffered() {
-                       return alignmentBuffered;
-               }
-
-               public int getNumSubtasks() {
-                       return numSubtasks;
-               }
-
-               public int getNumAckSubtasks() {
-                       return numAckSubtasks;
-               }
-
-               @Override
-               public boolean equals(Object o) {
-                       if (this == o) {
-                               return true;
-                       }
-                       if (o == null || getClass() != o.getClass()) {
-                               return false;
-                       }
-                       BaseCheckpointStatistics that = 
(BaseCheckpointStatistics) o;
-                       return id == that.id &&
-                               savepoint == that.savepoint &&
-                               triggerTimestamp == that.triggerTimestamp &&
-                               latestAckTimestamp == that.latestAckTimestamp &&
-                               stateSize == that.stateSize &&
-                               duration == that.duration &&
-                               alignmentBuffered == that.alignmentBuffered &&
-                               numSubtasks == that.numSubtasks &&
-                               numAckSubtasks == that.numAckSubtasks &&
-                               status == that.status;
-               }
-
-               @Override
-               public int hashCode() {
-                       return Objects.hash(id, status, savepoint, 
triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, 
numSubtasks, numAckSubtasks);
-               }
-       }
-
-       /**
-        * Statistics for a completed checkpoint.
-        */
-       public static final class CompletedCheckpointStatistics extends 
BaseCheckpointStatistics {
-
-               public static final String FIELD_NAME_EXTERNAL_PATH = 
"external_path";
-
-               public static final String FIELD_NAME_DISCARDED = "discarded";
-
-               @JsonProperty(FIELD_NAME_EXTERNAL_PATH)
-               @Nullable
-               private final String externalPath;
-
-               @JsonProperty(FIELD_NAME_DISCARDED)
-               private final boolean discarded;
-
-               @JsonCreator
-               public CompletedCheckpointStatistics(
-                               @JsonProperty(FIELD_NAME_ID) long id,
-                               @JsonProperty(FIELD_NAME_STATUS) 
CheckpointStatsStatus status,
-                               @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean 
savepoint,
-                               @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) 
long triggerTimestamp,
-                               @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) 
long latestAckTimestamp,
-                               @JsonProperty(FIELD_NAME_STATE_SIZE) long 
stateSize,
-                               @JsonProperty(FIELD_NAME_DURATION) long 
duration,
-                               @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) 
long alignmentBuffered,
-                               @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int 
numSubtasks,
-                               @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int 
numAckSubtasks,
-                               @JsonProperty(FIELD_NAME_EXTERNAL_PATH) 
@Nullable String externalPath,
-                               @JsonProperty(FIELD_NAME_DISCARDED) boolean 
discarded) {
-                       super(id, status, savepoint, triggerTimestamp, 
latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, 
numAckSubtasks);
-
-                       this.externalPath = externalPath;
-                       this.discarded = discarded;
-               }
-
-               @Nullable
-               public String getExternalPath() {
-                       return externalPath;
-               }
-
-               public boolean isDiscarded() {
-                       return discarded;
-               }
-
-               @Override
-               public boolean equals(Object o) {
-                       if (this == o) {
-                               return true;
-                       }
-                       if (o == null || getClass() != o.getClass()) {
-                               return false;
-                       }
-                       if (!super.equals(o)) {
-                               return false;
-                       }
-                       CompletedCheckpointStatistics that = 
(CompletedCheckpointStatistics) o;
-                       return discarded == that.discarded &&
-                               Objects.equals(externalPath, that.externalPath);
-               }
-
-               @Override
-               public int hashCode() {
-                       return Objects.hash(super.hashCode(), externalPath, 
discarded);
-               }
-       }
-
-       /**
-        * Statistics for a failed checkpoint.
-        */
-       public static final class FailedCheckpointStatistics extends 
BaseCheckpointStatistics {
-
-               public static final String FIELD_NAME_FAILURE_TIMESTAMP = 
"failure_timestamp";
-
-               public static final String FIELD_NAME_FAILURE_MESSAGE = 
"failure_message";
-
-               @JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP)
-               private final long failureTimestamp;
-
-               @JsonProperty(FIELD_NAME_FAILURE_MESSAGE)
-               @Nullable
-               private final String failureMessage;
-
-               @JsonCreator
-               public FailedCheckpointStatistics(
-                               @JsonProperty(FIELD_NAME_ID) long id,
-                               @JsonProperty(FIELD_NAME_STATUS) 
CheckpointStatsStatus status,
-                               @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean 
savepoint,
-                               @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) 
long triggerTimestamp,
-                               @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) 
long latestAckTimestamp,
-                               @JsonProperty(FIELD_NAME_STATE_SIZE) long 
stateSize,
-                               @JsonProperty(FIELD_NAME_DURATION) long 
duration,
-                               @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) 
long alignmentBuffered,
-                               @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int 
numSubtasks,
-                               @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int 
numAckSubtasks,
-                               @JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) 
long failureTimestamp,
-                               @JsonProperty(FIELD_NAME_FAILURE_MESSAGE) 
@Nullable String failureMessage) {
-                       super(id, status, savepoint, triggerTimestamp, 
latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, 
numAckSubtasks);
-
-                       this.failureTimestamp = failureTimestamp;
-                       this.failureMessage = failureMessage;
-               }
-
-               public long getFailureTimestamp() {
-                       return failureTimestamp;
-               }
-
-               @Nullable
-               public String getFailureMessage() {
-                       return failureMessage;
-               }
-
-               @Override
-               public boolean equals(Object o) {
-                       if (this == o) {
-                               return true;
-                       }
-                       if (o == null || getClass() != o.getClass()) {
-                               return false;
-                       }
-                       if (!super.equals(o)) {
-                               return false;
-                       }
-                       FailedCheckpointStatistics that = 
(FailedCheckpointStatistics) o;
-                       return failureTimestamp == that.failureTimestamp &&
-                               Objects.equals(failureMessage, 
that.failureMessage);
-               }
-
-               @Override
-               public int hashCode() {
-                       return Objects.hash(super.hashCode(), failureTimestamp, 
failureMessage);
-               }
-       }
-
-       /**
-        * Statistics for a restored checkpoint.
-        */
-       public static final class RestoredCheckpointStatistics {
-
-               public static final String FIELD_NAME_ID = "id";
-
-               public static final String FIELD_NAME_RESTORE_TIMESTAMP = 
"restore_timestamp";
-
-               public static final String FIELD_NAME_IS_SAVEPOINT = 
"is_savepoint";
-
-               public static final String FIELD_NAME_EXTERNAL_PATH = 
"external_path";
-
-               @JsonProperty(FIELD_NAME_ID)
-               private final long id;
-
-               @JsonProperty(FIELD_NAME_RESTORE_TIMESTAMP)
-               private final long restoreTimestamp;
-
-               @JsonProperty(FIELD_NAME_IS_SAVEPOINT)
-               private final boolean savepoint;
-
-               @JsonProperty(FIELD_NAME_EXTERNAL_PATH)
-               @Nullable
-               private final String externalPath;
-
-               @JsonCreator
-               public RestoredCheckpointStatistics(
-                               @JsonProperty(FIELD_NAME_ID) long id,
-                               @JsonProperty(FIELD_NAME_RESTORE_TIMESTAMP) 
long restoreTimestamp,
-                               @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean 
savepoint,
-                               @JsonProperty(FIELD_NAME_EXTERNAL_PATH) 
@Nullable String externalPath) {
-                       this.id = id;
-                       this.restoreTimestamp = restoreTimestamp;
-                       this.savepoint = savepoint;
-                       this.externalPath = externalPath;
-               }
-
-               public long getId() {
-                       return id;
-               }
-
-               public long getRestoreTimestamp() {
-                       return restoreTimestamp;
-               }
-
-               public boolean isSavepoint() {
-                       return savepoint;
-               }
-
-               @Nullable
-               public String getExternalPath() {
-                       return externalPath;
-               }
-
-               @Override
-               public boolean equals(Object o) {
-                       if (this == o) {
-                               return true;
-                       }
-                       if (o == null || getClass() != o.getClass()) {
-                               return false;
-                       }
-                       RestoredCheckpointStatistics that = 
(RestoredCheckpointStatistics) o;
-                       return id == that.id &&
-                               restoreTimestamp == that.restoreTimestamp &&
-                               savepoint == that.savepoint &&
-                               Objects.equals(externalPath, that.externalPath);
-               }
-
-               @Override
-               public int hashCode() {
-                       return Objects.hash(id, restoreTimestamp, savepoint, 
externalPath);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatisticsHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatisticsHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatisticsHeaders.java
deleted file mode 100644
index b062d0d..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatisticsHeaders.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.runtime.rest.HttpMethodWrapper;
-import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler;
-
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-/**
- * Message headers for the {@link CheckpointStatisticsHandler}.
- */
-public class CheckpointStatisticsHeaders implements 
MessageHeaders<EmptyRequestBody, CheckpointStatistics, JobMessageParameters> {
-
-       private static final CheckpointStatisticsHeaders INSTANCE = new 
CheckpointStatisticsHeaders();
-
-       public static final String URL = "/jobs/:jobid/checkpoints";
-
-       @Override
-       public Class<EmptyRequestBody> getRequestClass() {
-               return EmptyRequestBody.class;
-       }
-
-       @Override
-       public Class<CheckpointStatistics> getResponseClass() {
-               return CheckpointStatistics.class;
-       }
-
-       @Override
-       public HttpResponseStatus getResponseStatusCode() {
-               return HttpResponseStatus.OK;
-       }
-
-       @Override
-       public JobMessageParameters getUnresolvedMessageParameters() {
-               return new JobMessageParameters();
-       }
-
-       @Override
-       public HttpMethodWrapper getHttpMethod() {
-               return HttpMethodWrapper.GET;
-       }
-
-       @Override
-       public String getTargetRestEndpointURL() {
-               return URL;
-       }
-
-       public static CheckpointStatisticsHeaders getInstance() {
-               return INSTANCE;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
index 9d74c95..1155892 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java
@@ -26,7 +26,7 @@ import java.util.Collections;
  */
 public class JobMessageParameters extends MessageParameters {
 
-       private final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
+       protected final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
 
        @Override
        public Collection<MessagePathParameter<?>> getPathParameters() {

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigHeaders.java
new file mode 100644
index 0000000..f0526a0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigHeaders.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link CheckpointConfigHandler}.
+ */
+public class CheckpointConfigHeaders implements 
MessageHeaders<EmptyRequestBody, CheckpointConfigInfo, JobMessageParameters> {
+
+       private static final CheckpointConfigHeaders INSTANCE = new 
CheckpointConfigHeaders();
+
+       public static final String URL = "/jobs/:jobid/checkpoints/config";
+
+       private CheckpointConfigHeaders() {}
+
+       @Override
+       public Class<EmptyRequestBody> getRequestClass() {
+               return EmptyRequestBody.class;
+       }
+
+       @Override
+       public Class<CheckpointConfigInfo> getResponseClass() {
+               return CheckpointConfigInfo.class;
+       }
+
+       @Override
+       public HttpResponseStatus getResponseStatusCode() {
+               return HttpResponseStatus.OK;
+       }
+
+       @Override
+       public JobMessageParameters getUnresolvedMessageParameters() {
+               return new JobMessageParameters();
+       }
+
+       @Override
+       public HttpMethodWrapper getHttpMethod() {
+               return HttpMethodWrapper.GET;
+       }
+
+       @Override
+       public String getTargetRestEndpointURL() {
+               return URL;
+       }
+
+       public static CheckpointConfigHeaders getInstance() {
+               return INSTANCE;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java
new file mode 100644
index 0000000..797d3a5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Response class of the {@link CheckpointConfigHandler}.
+ */
+public class CheckpointConfigInfo implements ResponseBody {
+
+       public static final String FIELD_NAME_PROCESSING_MODE = "mode";
+
+       public static final String FIELD_NAME_CHECKPOINT_INTERVAL = "interval";
+
+       public static final String FIELD_NAME_CHECKPOINT_TIMEOUT = "timeout";
+
+       public static final String FIELD_NAME_CHECKPOINT_MIN_PAUSE = 
"min_pause";
+
+       public static final String FIELD_NAME_CHECKPOINT_MAX_CONCURRENT = 
"max_concurrent";
+
+       public static final String FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG = 
"externalization";
+
+       @JsonProperty(FIELD_NAME_PROCESSING_MODE)
+       private final ProcessingMode processingMode;
+
+       @JsonProperty(FIELD_NAME_CHECKPOINT_INTERVAL)
+       private final long checkpointInterval;
+
+       @JsonProperty(FIELD_NAME_CHECKPOINT_TIMEOUT)
+       private final long checkpointTimeout;
+
+       @JsonProperty(FIELD_NAME_CHECKPOINT_MIN_PAUSE)
+       private final long minPauseBetweenCheckpoints;
+
+       @JsonProperty(FIELD_NAME_CHECKPOINT_MAX_CONCURRENT)
+       private final long maxConcurrentCheckpoints;
+
+       @JsonProperty(FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG)
+       private final ExternalizedCheckpointInfo externalizedCheckpointInfo;
+
+       @JsonCreator
+       public CheckpointConfigInfo(
+                       @JsonProperty(FIELD_NAME_PROCESSING_MODE) 
ProcessingMode processingMode,
+                       @JsonProperty(FIELD_NAME_CHECKPOINT_INTERVAL) long 
checkpointInterval,
+                       @JsonProperty(FIELD_NAME_CHECKPOINT_TIMEOUT) long 
checkpointTimeout,
+                       @JsonProperty(FIELD_NAME_CHECKPOINT_MIN_PAUSE) long 
minPauseBetweenCheckpoints,
+                       @JsonProperty(FIELD_NAME_CHECKPOINT_MAX_CONCURRENT) int 
maxConcurrentCheckpoints,
+                       
@JsonProperty(FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG) 
ExternalizedCheckpointInfo externalizedCheckpointInfo) {
+               this.processingMode = 
Preconditions.checkNotNull(processingMode);
+               this.checkpointInterval = checkpointInterval;
+               this.checkpointTimeout = checkpointTimeout;
+               this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+               this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
+               this.externalizedCheckpointInfo = 
Preconditions.checkNotNull(externalizedCheckpointInfo);
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               CheckpointConfigInfo that = (CheckpointConfigInfo) o;
+               return checkpointInterval == that.checkpointInterval &&
+                       checkpointTimeout == that.checkpointTimeout &&
+                       minPauseBetweenCheckpoints == 
that.minPauseBetweenCheckpoints &&
+                       maxConcurrentCheckpoints == 
that.maxConcurrentCheckpoints &&
+                       processingMode == that.processingMode &&
+                       Objects.equals(externalizedCheckpointInfo, 
that.externalizedCheckpointInfo);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(processingMode, checkpointInterval, 
checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpoints, 
externalizedCheckpointInfo);
+       }
+
+       /**
+        * Contains information about the externalized checkpoint configuration.
+        */
+       public static final class ExternalizedCheckpointInfo {
+
+               public static final String FIELD_NAME_ENABLED = "enabled";
+
+               public static final String FIELD_NAME_DELETE_ON_CANCELLATION = 
"delete_on_cancellation";
+
+               @JsonProperty(FIELD_NAME_ENABLED)
+               private final boolean enabled;
+
+               @JsonProperty(FIELD_NAME_DELETE_ON_CANCELLATION)
+               private final boolean deleteOnCancellation;
+
+               @JsonCreator
+               public ExternalizedCheckpointInfo(
+                               @JsonProperty(FIELD_NAME_ENABLED) boolean 
enabled,
+                               
@JsonProperty(FIELD_NAME_DELETE_ON_CANCELLATION) boolean deleteOnCancellation) {
+                       this.enabled = enabled;
+                       this.deleteOnCancellation = deleteOnCancellation;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       ExternalizedCheckpointInfo that = 
(ExternalizedCheckpointInfo) o;
+                       return enabled == that.enabled &&
+                               deleteOnCancellation == 
that.deleteOnCancellation;
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(enabled, deleteOnCancellation);
+               }
+       }
+
+       /**
+        * Processing mode.
+        */
+       public enum ProcessingMode {
+               AT_LEAST_ONCE,
+               EXACTLY_ONCE
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointIdPathParameter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointIdPathParameter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointIdPathParameter.java
new file mode 100644
index 0000000..c08cc82
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointIdPathParameter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.messages.ConversionException;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+
+/**
+ * Path parameter for the checkpoint id of type {@link Long}.
+ */
+public class CheckpointIdPathParameter extends MessagePathParameter<Long> {
+
+       public static final String KEY = "checkpointid";
+
+       protected CheckpointIdPathParameter() {
+               super(KEY);
+       }
+
+       @Override
+       protected Long convertFromString(String value) throws 
ConversionException {
+               try {
+                       return Long.parseLong(value);
+               } catch (NumberFormatException nfe) {
+                       throw new ConversionException("Could not parse long 
from " + value + '.', nfe);
+               }
+       }
+
+       @Override
+       protected String convertToString(Long value) {
+               return value.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointMessageParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointMessageParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointMessageParameters.java
new file mode 100644
index 0000000..040aa87
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointMessageParameters.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Message parameters for checkpoint related messages.
+ */
+public class CheckpointMessageParameters extends JobMessageParameters {
+
+       protected final CheckpointIdPathParameter checkpointIdPathParameter = 
new CheckpointIdPathParameter();
+
+       @Override
+       public Collection<MessagePathParameter<?>> getPathParameters() {
+               return Arrays.asList(jobPathParameter, 
checkpointIdPathParameter);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatisticDetailsHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatisticDetailsHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatisticDetailsHeaders.java
new file mode 100644
index 0000000..3d7ba2b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatisticDetailsHeaders.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Headers for the {@link CheckpointStatisticDetailsHandler}.
+ */
+public class CheckpointStatisticDetailsHeaders implements 
MessageHeaders<EmptyRequestBody, CheckpointStatistics, 
CheckpointMessageParameters> {
+
+       private static final CheckpointStatisticDetailsHeaders INSTANCE = new 
CheckpointStatisticDetailsHeaders();
+
+       public static final String URL = 
"/jobs/:jobid/checkpoints/:checkpointid";
+
+       private CheckpointStatisticDetailsHeaders() {}
+
+       @Override
+       public Class<EmptyRequestBody> getRequestClass() {
+               return EmptyRequestBody.class;
+       }
+
+       @Override
+       public Class<CheckpointStatistics> getResponseClass() {
+               return CheckpointStatistics.class;
+       }
+
+       @Override
+       public HttpResponseStatus getResponseStatusCode() {
+               return HttpResponseStatus.OK;
+       }
+
+       @Override
+       public CheckpointMessageParameters getUnresolvedMessageParameters() {
+               return new CheckpointMessageParameters();
+       }
+
+       @Override
+       public HttpMethodWrapper getHttpMethod() {
+               return HttpMethodWrapper.GET;
+       }
+
+       @Override
+       public String getTargetRestEndpointURL() {
+               return URL;
+       }
+
+       public static CheckpointStatisticDetailsHeaders getInstance() {
+               return INSTANCE;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
new file mode 100644
index 0000000..9fb1094
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Statistics for a checkpoint.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, 
property = "@class")
+@JsonSubTypes({
+       @JsonSubTypes.Type(value = 
CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
+       @JsonSubTypes.Type(value = 
CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
+public class CheckpointStatistics implements ResponseBody {
+
+       public static final String FIELD_NAME_ID = "id";
+
+       public static final String FIELD_NAME_STATUS = "status";
+
+       public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
+
+       public static final String FIELD_NAME_TRIGGER_TIMESTAMP = 
"trigger_timestamp";
+
+       public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = 
"latest_ack_timestamp";
+
+       public static final String FIELD_NAME_STATE_SIZE = "state_size";
+
+       public static final String FIELD_NAME_DURATION = "end_to_end_duration";
+
+       public static final String FIELD_NAME_ALIGNMENT_BUFFERED = 
"alignment_buffered";
+
+       public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
+
+       public static final String FIELD_NAME_NUM_ACK_SUBTASKS = 
"num_acknowledged_subtasks";
+
+       public static final String FIELD_NAME_TASKS = "tasks";
+
+       @JsonProperty(FIELD_NAME_ID)
+       private final long id;
+
+       @JsonProperty(FIELD_NAME_STATUS)
+       private final CheckpointStatsStatus status;
+
+       @JsonProperty(FIELD_NAME_IS_SAVEPOINT)
+       private final boolean savepoint;
+
+       @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
+       private final long triggerTimestamp;
+
+       @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
+       private final long latestAckTimestamp;
+
+       @JsonProperty(FIELD_NAME_STATE_SIZE)
+       private final long stateSize;
+
+       @JsonProperty(FIELD_NAME_DURATION)
+       private final long duration;
+
+       @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
+       private final long alignmentBuffered;
+
+       @JsonProperty(FIELD_NAME_NUM_SUBTASKS)
+       private final int numSubtasks;
+
+       @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
+       private final int numAckSubtasks;
+
+       @JsonProperty(FIELD_NAME_TASKS)
+       @JsonSerialize(keyUsing = JobVertexIDSerializer.class)
+       private final Map<JobVertexID, TaskCheckpointStatistics> 
checkpointStatisticsPerTask;
+
+       @JsonCreator
+       private CheckpointStatistics(
+                       @JsonProperty(FIELD_NAME_ID) long id,
+                       @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus 
status,
+                       @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean 
savepoint,
+                       @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long 
triggerTimestamp,
+                       @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long 
latestAckTimestamp,
+                       @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
+                       @JsonProperty(FIELD_NAME_DURATION) long duration,
+                       @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long 
alignmentBuffered,
+                       @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
+                       @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int 
numAckSubtasks,
+                       @JsonDeserialize(keyUsing = 
JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, 
TaskCheckpointStatistics> checkpointStatisticsPerTask) {
+               this.id = id;
+               this.status = Preconditions.checkNotNull(status);
+               this.savepoint = savepoint;
+               this.triggerTimestamp = triggerTimestamp;
+               this.latestAckTimestamp = latestAckTimestamp;
+               this.stateSize = stateSize;
+               this.duration = duration;
+               this.alignmentBuffered = alignmentBuffered;
+               this.numSubtasks = numSubtasks;
+               this.numAckSubtasks = numAckSubtasks;
+               this.checkpointStatisticsPerTask = 
Preconditions.checkNotNull(checkpointStatisticsPerTask);
+       }
+
+       public long getId() {
+               return id;
+       }
+
+       public CheckpointStatsStatus getStatus() {
+               return status;
+       }
+
+       public boolean isSavepoint() {
+               return savepoint;
+       }
+
+       public long getTriggerTimestamp() {
+               return triggerTimestamp;
+       }
+
+       public long getLatestAckTimestamp() {
+               return latestAckTimestamp;
+       }
+
+       public long getStateSize() {
+               return stateSize;
+       }
+
+       public long getDuration() {
+               return duration;
+       }
+
+       public long getAlignmentBuffered() {
+               return alignmentBuffered;
+       }
+
+       public int getNumSubtasks() {
+               return numSubtasks;
+       }
+
+       public int getNumAckSubtasks() {
+               return numAckSubtasks;
+       }
+
+       @Nullable
+       public Map<JobVertexID, TaskCheckpointStatistics> 
getCheckpointStatisticsPerTask() {
+               return checkpointStatisticsPerTask;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               CheckpointStatistics that = (CheckpointStatistics) o;
+               return id == that.id &&
+                       savepoint == that.savepoint &&
+                       triggerTimestamp == that.triggerTimestamp &&
+                       latestAckTimestamp == that.latestAckTimestamp &&
+                       stateSize == that.stateSize &&
+                       duration == that.duration &&
+                       alignmentBuffered == that.alignmentBuffered &&
+                       numSubtasks == that.numSubtasks &&
+                       numAckSubtasks == that.numAckSubtasks &&
+                       status == that.status &&
+                       Objects.equals(checkpointStatisticsPerTask, 
that.checkpointStatisticsPerTask);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(id, status, savepoint, triggerTimestamp, 
latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, 
numAckSubtasks, checkpointStatisticsPerTask);
+       }
+
+       // 
-------------------------------------------------------------------------
+       // Static factory methods
+       // 
-------------------------------------------------------------------------
+
+       public static CheckpointStatistics 
generateCheckpointStatistics(AbstractCheckpointStats checkpointStats, boolean 
includeTaskCheckpointStatistics) {
+               Preconditions.checkNotNull(checkpointStats);
+
+               Map<JobVertexID, TaskCheckpointStatistics> 
checkpointStatisticsPerTask;
+
+               if (includeTaskCheckpointStatistics) {
+                       Collection<TaskStateStats> taskStateStats = 
checkpointStats.getAllTaskStateStats();
+
+                       checkpointStatisticsPerTask = new 
HashMap<>(taskStateStats.size());
+
+                       for (TaskStateStats taskStateStat : taskStateStats) {
+                               checkpointStatisticsPerTask.put(
+                                       taskStateStat.getJobVertexId(),
+                                       new TaskCheckpointStatistics(
+                                               
taskStateStat.getLatestAckTimestamp(),
+                                               taskStateStat.getStateSize(),
+                                               
taskStateStat.getEndToEndDuration(checkpointStats.getTriggerTimestamp()),
+                                               
taskStateStat.getAlignmentBuffered(),
+                                               
taskStateStat.getNumberOfSubtasks(),
+                                               
taskStateStat.getNumberOfAcknowledgedSubtasks()));
+                       }
+               } else {
+                       checkpointStatisticsPerTask = Collections.emptyMap();
+               }
+
+               if (checkpointStats instanceof CompletedCheckpointStats) {
+                       final CompletedCheckpointStats completedCheckpointStats 
= ((CompletedCheckpointStats) checkpointStats);
+
+                       return new 
CheckpointStatistics.CompletedCheckpointStatistics(
+                               completedCheckpointStats.getCheckpointId(),
+                               completedCheckpointStats.getStatus(),
+                               
completedCheckpointStats.getProperties().isSavepoint(),
+                               completedCheckpointStats.getTriggerTimestamp(),
+                               
completedCheckpointStats.getLatestAckTimestamp(),
+                               completedCheckpointStats.getStateSize(),
+                               completedCheckpointStats.getEndToEndDuration(),
+                               completedCheckpointStats.getAlignmentBuffered(),
+                               completedCheckpointStats.getNumberOfSubtasks(),
+                               
completedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
+                               checkpointStatisticsPerTask,
+                               completedCheckpointStats.getExternalPath(),
+                               completedCheckpointStats.isDiscarded());
+               } else if (checkpointStats instanceof FailedCheckpointStats) {
+                       final FailedCheckpointStats failedCheckpointStats = 
((FailedCheckpointStats) checkpointStats);
+
+                       return new 
CheckpointStatistics.FailedCheckpointStatistics(
+                               failedCheckpointStats.getCheckpointId(),
+                               failedCheckpointStats.getStatus(),
+                               
failedCheckpointStats.getProperties().isSavepoint(),
+                               failedCheckpointStats.getTriggerTimestamp(),
+                               failedCheckpointStats.getLatestAckTimestamp(),
+                               failedCheckpointStats.getStateSize(),
+                               failedCheckpointStats.getEndToEndDuration(),
+                               failedCheckpointStats.getAlignmentBuffered(),
+                               failedCheckpointStats.getNumberOfSubtasks(),
+                               
failedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
+                               checkpointStatisticsPerTask,
+                               failedCheckpointStats.getFailureTimestamp(),
+                               failedCheckpointStats.getFailureMessage());
+               } else {
+                       throw new IllegalArgumentException("Given checkpoint 
stats object of type " + checkpointStats.getClass().getName() + " cannot be 
converted.");
+               }
+       }
+
+       // ---------------------------------------------------------------------
+       // Static inner classes
+       // ---------------------------------------------------------------------
+
+       /**
+        * Checkpoint statistics for a single task.
+        */
+       public static final class TaskCheckpointStatistics {
+
+               public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = 
"latest_ack_timestamp";
+
+               public static final String FIELD_NAME_STATE_SIZE = "state_size";
+
+               public static final String FIELD_NAME_DURATION = 
"end_to_end_duration";
+
+               public static final String FIELD_NAME_ALIGNMENT_BUFFERED = 
"alignment_buffered";
+
+               public static final String FIELD_NAME_NUM_SUBTASKS = 
"num_subtasks";
+
+               public static final String FIELD_NAME_NUM_ACK_SUBTASKS = 
"num_acknowledged_subtasks";
+
+               @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
+               private final long latestAckTimestamp;
+
+               @JsonProperty(FIELD_NAME_STATE_SIZE)
+               private final long stateSize;
+
+               @JsonProperty(FIELD_NAME_DURATION)
+               private final long duration;
+
+               @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
+               private final long alignmentBuffered;
+
+               @JsonProperty(FIELD_NAME_NUM_SUBTASKS)
+               private final int numSubtasks;
+
+               @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
+               private final int numAckSubtasks;
+
+               @JsonCreator
+               public TaskCheckpointStatistics(
+                       @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long 
latestAckTimestamp,
+                       @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
+                       @JsonProperty(FIELD_NAME_DURATION) long duration,
+                       @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long 
alignmentBuffered,
+                       @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
+                       @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int 
numAckSubtasks) {
+                       this.latestAckTimestamp = latestAckTimestamp;
+                       this.stateSize = stateSize;
+                       this.duration = duration;
+                       this.alignmentBuffered = alignmentBuffered;
+                       this.numSubtasks = numSubtasks;
+                       this.numAckSubtasks = numAckSubtasks;
+               }
+
+               public long getLatestAckTimestamp() {
+                       return latestAckTimestamp;
+               }
+
+               public long getStateSize() {
+                       return stateSize;
+               }
+
+               public long getDuration() {
+                       return duration;
+               }
+
+               public long getAlignmentBuffered() {
+                       return alignmentBuffered;
+               }
+
+               public int getNumSubtasks() {
+                       return numSubtasks;
+               }
+
+               public int getNumAckSubtasks() {
+                       return numAckSubtasks;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       TaskCheckpointStatistics that = 
(TaskCheckpointStatistics) o;
+                       return latestAckTimestamp == that.latestAckTimestamp &&
+                               stateSize == that.stateSize &&
+                               duration == that.duration &&
+                               alignmentBuffered == that.alignmentBuffered &&
+                               numSubtasks == that.numSubtasks &&
+                               numAckSubtasks == that.numAckSubtasks;
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(latestAckTimestamp, stateSize, 
duration, alignmentBuffered, numSubtasks, numAckSubtasks);
+               }
+       }
+
+       /**
+        * Statistics for a completed checkpoint.
+        */
+       public static final class CompletedCheckpointStatistics extends 
CheckpointStatistics {
+
+               public static final String FIELD_NAME_EXTERNAL_PATH = 
"external_path";
+
+               public static final String FIELD_NAME_DISCARDED = "discarded";
+
+               @JsonProperty(FIELD_NAME_EXTERNAL_PATH)
+               @Nullable
+               private final String externalPath;
+
+               @JsonProperty(FIELD_NAME_DISCARDED)
+               private final boolean discarded;
+
+               @JsonCreator
+               public CompletedCheckpointStatistics(
+                       @JsonProperty(FIELD_NAME_ID) long id,
+                       @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus 
status,
+                       @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean 
savepoint,
+                       @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long 
triggerTimestamp,
+                       @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long 
latestAckTimestamp,
+                       @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
+                       @JsonProperty(FIELD_NAME_DURATION) long duration,
+                       @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long 
alignmentBuffered,
+                       @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
+                       @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int 
numAckSubtasks,
+                       @JsonDeserialize(keyUsing = 
JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, 
TaskCheckpointStatistics> checkpointingStatisticsPerTask,
+                       @JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable 
String externalPath,
+                       @JsonProperty(FIELD_NAME_DISCARDED) boolean discarded) {
+                       super(
+                               id,
+                               status,
+                               savepoint,
+                               triggerTimestamp,
+                               latestAckTimestamp,
+                               stateSize,
+                               duration,
+                               alignmentBuffered,
+                               numSubtasks,
+                               numAckSubtasks,
+                               checkpointingStatisticsPerTask);
+
+                       this.externalPath = externalPath;
+                       this.discarded = discarded;
+               }
+
+               @Nullable
+               public String getExternalPath() {
+                       return externalPath;
+               }
+
+               public boolean isDiscarded() {
+                       return discarded;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       if (!super.equals(o)) {
+                               return false;
+                       }
+                       CompletedCheckpointStatistics that = 
(CompletedCheckpointStatistics) o;
+                       return discarded == that.discarded &&
+                               Objects.equals(externalPath, that.externalPath);
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(super.hashCode(), externalPath, 
discarded);
+               }
+       }
+
+       /**
+        * Statistics for a failed checkpoint.
+        */
+       public static final class FailedCheckpointStatistics extends 
CheckpointStatistics {
+
+               public static final String FIELD_NAME_FAILURE_TIMESTAMP = 
"failure_timestamp";
+
+               public static final String FIELD_NAME_FAILURE_MESSAGE = 
"failure_message";
+
+               @JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP)
+               private final long failureTimestamp;
+
+               @JsonProperty(FIELD_NAME_FAILURE_MESSAGE)
+               @Nullable
+               private final String failureMessage;
+
+               @JsonCreator
+               public FailedCheckpointStatistics(
+                       @JsonProperty(FIELD_NAME_ID) long id,
+                       @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus 
status,
+                       @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean 
savepoint,
+                       @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long 
triggerTimestamp,
+                       @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long 
latestAckTimestamp,
+                       @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
+                       @JsonProperty(FIELD_NAME_DURATION) long duration,
+                       @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long 
alignmentBuffered,
+                       @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
+                       @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int 
numAckSubtasks,
+                       @JsonDeserialize(keyUsing = 
JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, 
TaskCheckpointStatistics> checkpointingStatisticsPerTask,
+                       @JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) long 
failureTimestamp,
+                       @JsonProperty(FIELD_NAME_FAILURE_MESSAGE) @Nullable 
String failureMessage) {
+                       super(
+                               id,
+                               status,
+                               savepoint,
+                               triggerTimestamp,
+                               latestAckTimestamp,
+                               stateSize,
+                               duration,
+                               alignmentBuffered,
+                               numSubtasks,
+                               numAckSubtasks,
+                               checkpointingStatisticsPerTask);
+
+                       this.failureTimestamp = failureTimestamp;
+                       this.failureMessage = failureMessage;
+               }
+
+               public long getFailureTimestamp() {
+                       return failureTimestamp;
+               }
+
+               @Nullable
+               public String getFailureMessage() {
+                       return failureMessage;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       if (!super.equals(o)) {
+                               return false;
+                       }
+                       FailedCheckpointStatistics that = 
(FailedCheckpointStatistics) o;
+                       return failureTimestamp == that.failureTimestamp &&
+                               Objects.equals(failureMessage, 
that.failureMessage);
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(super.hashCode(), failureTimestamp, 
failureMessage);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatistics.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatistics.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatistics.java
new file mode 100644
index 0000000..1f00fcc
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatistics.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response of the {@link CheckpointingStatisticsHandler}. This class contains 
information about
+ * the checkpointing of a given job.
+ */
+public class CheckpointingStatistics implements ResponseBody {
+
+       public static final String FIELD_NAME_COUNTS = "counts";
+
+       public static final String FIELD_NAME_SUMMARY = "summary";
+
+       public static final String FIELD_NAME_LATEST_CHECKPOINTS = "latest";
+
+       public static final String FIELD_NAME_HISTORY = "history";
+
+       @JsonProperty(FIELD_NAME_COUNTS)
+       private final Counts counts;
+
+       @JsonProperty(FIELD_NAME_SUMMARY)
+       private final Summary summary;
+
+       @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS)
+       private final LatestCheckpoints latestCheckpoints;
+
+       @JsonProperty(FIELD_NAME_HISTORY)
+       private final List<CheckpointStatistics> history;
+
+       @JsonCreator
+       public CheckpointingStatistics(
+                       @JsonProperty(FIELD_NAME_COUNTS) Counts counts,
+                       @JsonProperty(FIELD_NAME_SUMMARY) Summary summary,
+                       @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS) 
LatestCheckpoints latestCheckpoints,
+                       @JsonProperty(FIELD_NAME_HISTORY) 
List<CheckpointStatistics> history) {
+               this.counts = Preconditions.checkNotNull(counts);
+               this.summary = Preconditions.checkNotNull(summary);
+               this.latestCheckpoints = 
Preconditions.checkNotNull(latestCheckpoints);
+               this.history = Preconditions.checkNotNull(history);
+       }
+
+       public Counts getCounts() {
+               return counts;
+       }
+
+       public Summary getSummary() {
+               return summary;
+       }
+
+       public LatestCheckpoints getLatestCheckpoints() {
+               return latestCheckpoints;
+       }
+
+       public List<CheckpointStatistics> getHistory() {
+               return history;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               CheckpointingStatistics that = (CheckpointingStatistics) o;
+               return Objects.equals(counts, that.counts) &&
+                       Objects.equals(summary, that.summary) &&
+                       Objects.equals(latestCheckpoints, 
that.latestCheckpoints) &&
+                       Objects.equals(history, that.history);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(counts, summary, latestCheckpoints, 
history);
+       }
+
+       // ------------------------------------------------------------------
+       // Inner classes
+       // ------------------------------------------------------------------
+
+       /**
+        * Checkpoint counts.
+        */
+       public static final class Counts {
+
+               public static final String FIELD_NAME_RESTORED_CHECKPOINTS = 
"restored";
+
+               public static final String FIELD_NAME_TOTAL_CHECKPOINTS = 
"total";
+
+               public static final String FIELD_NAME_IN_PROGRESS_CHECKPOINTS = 
"in_progress";
+
+               public static final String FIELD_NAME_COMPLETED_CHECKPOINTS = 
"completed";
+
+               public static final String FIELD_NAME_FAILED_CHECKPOINTS = 
"failed";
+
+               @JsonProperty(FIELD_NAME_RESTORED_CHECKPOINTS)
+               private final long numberRestoredCheckpoints;
+
+               @JsonProperty(FIELD_NAME_TOTAL_CHECKPOINTS)
+               private final long totalNumberCheckpoints;
+
+               @JsonProperty(FIELD_NAME_IN_PROGRESS_CHECKPOINTS)
+               private final int numberInProgressCheckpoints;
+
+               @JsonProperty(FIELD_NAME_COMPLETED_CHECKPOINTS)
+               private final long numberCompletedCheckpoints;
+
+               @JsonProperty(FIELD_NAME_FAILED_CHECKPOINTS)
+               private final long numberFailedCheckpoints;
+
+               @JsonCreator
+               public Counts(
+                               @JsonProperty(FIELD_NAME_RESTORED_CHECKPOINTS) 
long numberRestoredCheckpoints,
+                               @JsonProperty(FIELD_NAME_TOTAL_CHECKPOINTS) 
long totalNumberCheckpoints,
+                               
@JsonProperty(FIELD_NAME_IN_PROGRESS_CHECKPOINTS) int 
numberInProgressCheckpoints,
+                               @JsonProperty(FIELD_NAME_COMPLETED_CHECKPOINTS) 
long numberCompletedCheckpoints,
+                               @JsonProperty(FIELD_NAME_FAILED_CHECKPOINTS) 
long numberFailedCheckpoints) {
+                       this.numberRestoredCheckpoints = 
numberRestoredCheckpoints;
+                       this.totalNumberCheckpoints = totalNumberCheckpoints;
+                       this.numberInProgressCheckpoints = 
numberInProgressCheckpoints;
+                       this.numberCompletedCheckpoints = 
numberCompletedCheckpoints;
+                       this.numberFailedCheckpoints = numberFailedCheckpoints;
+               }
+
+               public long getNumberRestoredCheckpoints() {
+                       return numberRestoredCheckpoints;
+               }
+
+               public long getTotalNumberCheckpoints() {
+                       return totalNumberCheckpoints;
+               }
+
+               public int getNumberInProgressCheckpoints() {
+                       return numberInProgressCheckpoints;
+               }
+
+               public long getNumberCompletedCheckpoints() {
+                       return numberCompletedCheckpoints;
+               }
+
+               public long getNumberFailedCheckpoints() {
+                       return numberFailedCheckpoints;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       Counts counts = (Counts) o;
+                       return numberRestoredCheckpoints == 
counts.numberRestoredCheckpoints &&
+                               totalNumberCheckpoints == 
counts.totalNumberCheckpoints &&
+                               numberInProgressCheckpoints == 
counts.numberInProgressCheckpoints &&
+                               numberCompletedCheckpoints == 
counts.numberCompletedCheckpoints &&
+                               numberFailedCheckpoints == 
counts.numberFailedCheckpoints;
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(numberRestoredCheckpoints, 
totalNumberCheckpoints, numberInProgressCheckpoints, 
numberCompletedCheckpoints, numberFailedCheckpoints);
+               }
+       }
+
+       /**
+        * Checkpoint summary.
+        */
+       public static final class Summary {
+
+               public static final String FIELD_NAME_STATE_SIZE = "state_size";
+
+               public static final String FIELD_NAME_DURATION = 
"end_to_end_duration";
+
+               public static final String FIELD_NAME_ALIGNMENT_BUFFERED = 
"alignment_buffered";
+
+               @JsonProperty(FIELD_NAME_STATE_SIZE)
+               private final MinMaxAvgStatistics stateSize;
+
+               @JsonProperty(FIELD_NAME_DURATION)
+               private final MinMaxAvgStatistics duration;
+
+               @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
+               private final MinMaxAvgStatistics alignmentBuffered;
+
+               @JsonCreator
+               public Summary(
+                               @JsonProperty(FIELD_NAME_STATE_SIZE) 
MinMaxAvgStatistics stateSize,
+                               @JsonProperty(FIELD_NAME_DURATION) 
MinMaxAvgStatistics duration,
+                               @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) 
MinMaxAvgStatistics alignmentBuffered) {
+                       this.stateSize = stateSize;
+                       this.duration = duration;
+                       this.alignmentBuffered = alignmentBuffered;
+               }
+
+               public MinMaxAvgStatistics getStateSize() {
+                       return stateSize;
+               }
+
+               public MinMaxAvgStatistics getDuration() {
+                       return duration;
+               }
+
+               public MinMaxAvgStatistics getAlignmentBuffered() {
+                       return alignmentBuffered;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       Summary summary = (Summary) o;
+                       return Objects.equals(stateSize, summary.stateSize) &&
+                               Objects.equals(duration, summary.duration) &&
+                               Objects.equals(alignmentBuffered, 
summary.alignmentBuffered);
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(stateSize, duration, 
alignmentBuffered);
+               }
+       }
+
+       /**
+        * Minimum, maximum and average statistics.
+        */
+       public static final class MinMaxAvgStatistics {
+
+               public static final String FIELD_NAME_MINIMUM = "min";
+
+               public static final String FIELD_NAME_MAXIMUM = "max";
+
+               public static final String FIELD_NAME_AVERAGE = "avg";
+
+               @JsonProperty(FIELD_NAME_MINIMUM)
+               private final long minimum;
+
+               @JsonProperty(FIELD_NAME_MAXIMUM)
+               private final long maximum;
+
+               @JsonProperty(FIELD_NAME_AVERAGE)
+               private final long average;
+
+               @JsonCreator
+               public MinMaxAvgStatistics(
+                               @JsonProperty(FIELD_NAME_MINIMUM) long minimum,
+                               @JsonProperty(FIELD_NAME_MAXIMUM) long maximum,
+                               @JsonProperty(FIELD_NAME_AVERAGE) long average) 
{
+                       this.minimum = minimum;
+                       this.maximum = maximum;
+                       this.average = average;
+               }
+
+               public long getMinimum() {
+                       return minimum;
+               }
+
+               public long getMaximum() {
+                       return maximum;
+               }
+
+               public long getAverage() {
+                       return average;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       MinMaxAvgStatistics that = (MinMaxAvgStatistics) o;
+                       return minimum == that.minimum &&
+                               maximum == that.maximum &&
+                               average == that.average;
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(minimum, maximum, average);
+               }
+       }
+
+       /**
+        * Statistics about the latest checkpoints.
+        */
+       public static final class LatestCheckpoints {
+
+               public static final String FIELD_NAME_COMPLETED = "completed";
+
+               public static final String FIELD_NAME_SAVEPOINT = "savepoint";
+
+               public static final String FIELD_NAME_FAILED = "failed";
+
+               public static final String FIELD_NAME_RESTORED = "restored";
+
+               @JsonProperty(FIELD_NAME_COMPLETED)
+               @Nullable
+               private final 
CheckpointStatistics.CompletedCheckpointStatistics 
completedCheckpointStatistics;
+
+               @JsonProperty(FIELD_NAME_SAVEPOINT)
+               @Nullable
+               private final 
CheckpointStatistics.CompletedCheckpointStatistics savepointStatistics;
+
+               @JsonProperty(FIELD_NAME_FAILED)
+               @Nullable
+               private final CheckpointStatistics.FailedCheckpointStatistics 
failedCheckpointStatistics;
+
+               @JsonProperty(FIELD_NAME_RESTORED)
+               @Nullable
+               private final RestoredCheckpointStatistics 
restoredCheckpointStatistics;
+
+               @JsonCreator
+               public LatestCheckpoints(
+                               @JsonProperty(FIELD_NAME_COMPLETED) @Nullable 
CheckpointStatistics.CompletedCheckpointStatistics 
completedCheckpointStatistics,
+                               @JsonProperty(FIELD_NAME_SAVEPOINT) @Nullable 
CheckpointStatistics.CompletedCheckpointStatistics savepointStatistics,
+                               @JsonProperty(FIELD_NAME_FAILED) @Nullable 
CheckpointStatistics.FailedCheckpointStatistics failedCheckpointStatistics,
+                               @JsonProperty(FIELD_NAME_RESTORED) @Nullable 
RestoredCheckpointStatistics restoredCheckpointStatistics) {
+                       this.completedCheckpointStatistics = 
completedCheckpointStatistics;
+                       this.savepointStatistics = savepointStatistics;
+                       this.failedCheckpointStatistics = 
failedCheckpointStatistics;
+                       this.restoredCheckpointStatistics = 
restoredCheckpointStatistics;
+               }
+
+               @Nullable
+               public CheckpointStatistics.CompletedCheckpointStatistics 
getCompletedCheckpointStatistics() {
+                       return completedCheckpointStatistics;
+               }
+
+               @Nullable
+               public CheckpointStatistics.CompletedCheckpointStatistics 
getSavepointStatistics() {
+                       return savepointStatistics;
+               }
+
+               @Nullable
+               public CheckpointStatistics.FailedCheckpointStatistics 
getFailedCheckpointStatistics() {
+                       return failedCheckpointStatistics;
+               }
+
+               @Nullable
+               public RestoredCheckpointStatistics 
getRestoredCheckpointStatistics() {
+                       return restoredCheckpointStatistics;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       LatestCheckpoints that = (LatestCheckpoints) o;
+                       return Objects.equals(completedCheckpointStatistics, 
that.completedCheckpointStatistics) &&
+                               Objects.equals(savepointStatistics, 
that.savepointStatistics) &&
+                               Objects.equals(failedCheckpointStatistics, 
that.failedCheckpointStatistics) &&
+                               Objects.equals(restoredCheckpointStatistics, 
that.restoredCheckpointStatistics);
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(completedCheckpointStatistics, 
savepointStatistics, failedCheckpointStatistics, restoredCheckpointStatistics);
+               }
+       }
+
+       /**
+        * Statistics for a restored checkpoint.
+        */
+       public static final class RestoredCheckpointStatistics {
+
+               public static final String FIELD_NAME_ID = "id";
+
+               public static final String FIELD_NAME_RESTORE_TIMESTAMP = 
"restore_timestamp";
+
+               public static final String FIELD_NAME_IS_SAVEPOINT = 
"is_savepoint";
+
+               public static final String FIELD_NAME_EXTERNAL_PATH = 
"external_path";
+
+               @JsonProperty(FIELD_NAME_ID)
+               private final long id;
+
+               @JsonProperty(FIELD_NAME_RESTORE_TIMESTAMP)
+               private final long restoreTimestamp;
+
+               @JsonProperty(FIELD_NAME_IS_SAVEPOINT)
+               private final boolean savepoint;
+
+               @JsonProperty(FIELD_NAME_EXTERNAL_PATH)
+               @Nullable
+               private final String externalPath;
+
+               @JsonCreator
+               public RestoredCheckpointStatistics(
+                               @JsonProperty(FIELD_NAME_ID) long id,
+                               @JsonProperty(FIELD_NAME_RESTORE_TIMESTAMP) 
long restoreTimestamp,
+                               @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean 
savepoint,
+                               @JsonProperty(FIELD_NAME_EXTERNAL_PATH) 
@Nullable String externalPath) {
+                       this.id = id;
+                       this.restoreTimestamp = restoreTimestamp;
+                       this.savepoint = savepoint;
+                       this.externalPath = externalPath;
+               }
+
+               public long getId() {
+                       return id;
+               }
+
+               public long getRestoreTimestamp() {
+                       return restoreTimestamp;
+               }
+
+               public boolean isSavepoint() {
+                       return savepoint;
+               }
+
+               @Nullable
+               public String getExternalPath() {
+                       return externalPath;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       RestoredCheckpointStatistics that = 
(RestoredCheckpointStatistics) o;
+                       return id == that.id &&
+                               restoreTimestamp == that.restoreTimestamp &&
+                               savepoint == that.savepoint &&
+                               Objects.equals(externalPath, that.externalPath);
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(id, restoreTimestamp, savepoint, 
externalPath);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsHeaders.java
new file mode 100644
index 0000000..ce809e7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsHeaders.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link CheckpointingStatisticsHandler}.
+ */
+public class CheckpointingStatisticsHeaders implements 
MessageHeaders<EmptyRequestBody, CheckpointingStatistics, JobMessageParameters> 
{
+
+       private static final CheckpointingStatisticsHeaders INSTANCE = new 
CheckpointingStatisticsHeaders();
+
+       public static final String URL = "/jobs/:jobid/checkpoints";
+
+       @Override
+       public Class<EmptyRequestBody> getRequestClass() {
+               return EmptyRequestBody.class;
+       }
+
+       @Override
+       public Class<CheckpointingStatistics> getResponseClass() {
+               return CheckpointingStatistics.class;
+       }
+
+       @Override
+       public HttpResponseStatus getResponseStatusCode() {
+               return HttpResponseStatus.OK;
+       }
+
+       @Override
+       public JobMessageParameters getUnresolvedMessageParameters() {
+               return new JobMessageParameters();
+       }
+
+       @Override
+       public HttpMethodWrapper getHttpMethod() {
+               return HttpMethodWrapper.GET;
+       }
+
+       @Override
+       public String getTargetRestEndpointURL() {
+               return URL;
+       }
+
+       public static CheckpointingStatisticsHeaders getInstance() {
+               return INSTANCE;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java
new file mode 100644
index 0000000..b1f083f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.KeyDeserializer;
+
+import java.io.IOException;
+
+/**
+ * Jackson deserializer for {@link JobVertexID}.
+ */
+public class JobVertexIDDeserializer extends KeyDeserializer {
+
+       @Override
+       public Object deserializeKey(String key, DeserializationContext ctxt) 
throws IOException {
+               return JobVertexID.fromHexString(key);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java
new file mode 100644
index 0000000..f2b9859
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/**
+ * Jackson serializer for {@link JobVertexID}.
+ */
+public class JobVertexIDSerializer extends StdSerializer<JobVertexID> {
+
+       private static final long serialVersionUID = 2970050507628933522L;
+
+       public JobVertexIDSerializer() {
+               super(JobVertexID.class);
+       }
+
+       @Override
+       public void serialize(JobVertexID value, JsonGenerator gen, 
SerializerProvider provider) throws IOException {
+               gen.writeFieldName(value.toString());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java
index 04b1c55..7337772 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.runtime.rest.handler.legacy.checkpoints;
 
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
 
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
index 1eac20b..263117a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateStats;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;

Reply via email to