http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java deleted file mode 100644 index ade8c7a..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java +++ /dev/null @@ -1,763 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rest.messages; - -import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; -import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler; -import org.apache.flink.util.Preconditions; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonSubTypes; -import com.fasterxml.jackson.annotation.JsonTypeInfo; - -import javax.annotation.Nullable; - -import java.util.List; -import java.util.Objects; - -/** - * Response of the {@link CheckpointStatisticsHandler}. - */ -public class CheckpointStatistics implements ResponseBody { - - public static final String FIELD_NAME_COUNTS = "counts"; - - public static final String FIELD_NAME_SUMMARY = "summary"; - - public static final String FIELD_NAME_LATEST_CHECKPOINTS = "latest"; - - public static final String FIELD_NAME_HISTORY = "history"; - - @JsonProperty(FIELD_NAME_COUNTS) - private final Counts counts; - - @JsonProperty(FIELD_NAME_SUMMARY) - private final Summary summary; - - @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS) - private final LatestCheckpoints latestCheckpoints; - - @JsonProperty(FIELD_NAME_HISTORY) - private final List<BaseCheckpointStatistics> history; - - @JsonCreator - public CheckpointStatistics( - @JsonProperty(FIELD_NAME_COUNTS) Counts counts, - @JsonProperty(FIELD_NAME_SUMMARY) Summary summary, - @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS) LatestCheckpoints latestCheckpoints, - @JsonProperty(FIELD_NAME_HISTORY) List<BaseCheckpointStatistics> history) { - this.counts = Preconditions.checkNotNull(counts); - this.summary = Preconditions.checkNotNull(summary); - this.latestCheckpoints = Preconditions.checkNotNull(latestCheckpoints); - this.history = Preconditions.checkNotNull(history); - } - - public Counts getCounts() { - return counts; - } - - public Summary getSummary() { - return summary; - } - - public LatestCheckpoints getLatestCheckpoints() { - return latestCheckpoints; - } - - public List<BaseCheckpointStatistics> getHistory() { - return history; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - CheckpointStatistics that = (CheckpointStatistics) o; - return Objects.equals(counts, that.counts) && - Objects.equals(summary, that.summary) && - Objects.equals(latestCheckpoints, that.latestCheckpoints) && - Objects.equals(history, that.history); - } - - @Override - public int hashCode() { - return Objects.hash(counts, summary, latestCheckpoints, history); - } - - // ------------------------------------------------------------------ - // Inner classes - // ------------------------------------------------------------------ - - /** - * Checkpoint counts. - */ - public static final class Counts { - - public static final String FIELD_NAME_RESTORED_CHECKPOINTS = "restored"; - - public static final String FIELD_NAME_TOTAL_CHECKPOINTS = "total"; - - public static final String FIELD_NAME_IN_PROGRESS_CHECKPOINTS = "in_progress"; - - public static final String FIELD_NAME_COMPLETED_CHECKPOINTS = "completed"; - - public static final String FIELD_NAME_FAILED_CHECKPOINTS = "failed"; - - @JsonProperty(FIELD_NAME_RESTORED_CHECKPOINTS) - private final long numberRestoredCheckpoints; - - @JsonProperty(FIELD_NAME_TOTAL_CHECKPOINTS) - private final long totalNumberCheckpoints; - - @JsonProperty(FIELD_NAME_IN_PROGRESS_CHECKPOINTS) - private final int numberInProgressCheckpoints; - - @JsonProperty(FIELD_NAME_COMPLETED_CHECKPOINTS) - private final long numberCompletedCheckpoints; - - @JsonProperty(FIELD_NAME_FAILED_CHECKPOINTS) - private final long numberFailedCheckpoints; - - @JsonCreator - public Counts( - @JsonProperty(FIELD_NAME_RESTORED_CHECKPOINTS) long numberRestoredCheckpoints, - @JsonProperty(FIELD_NAME_TOTAL_CHECKPOINTS) long totalNumberCheckpoints, - @JsonProperty(FIELD_NAME_IN_PROGRESS_CHECKPOINTS) int numberInProgressCheckpoints, - @JsonProperty(FIELD_NAME_COMPLETED_CHECKPOINTS) long numberCompletedCheckpoints, - @JsonProperty(FIELD_NAME_FAILED_CHECKPOINTS) long numberFailedCheckpoints) { - this.numberRestoredCheckpoints = numberRestoredCheckpoints; - this.totalNumberCheckpoints = totalNumberCheckpoints; - this.numberInProgressCheckpoints = numberInProgressCheckpoints; - this.numberCompletedCheckpoints = numberCompletedCheckpoints; - this.numberFailedCheckpoints = numberFailedCheckpoints; - } - - public long getNumberRestoredCheckpoints() { - return numberRestoredCheckpoints; - } - - public long getTotalNumberCheckpoints() { - return totalNumberCheckpoints; - } - - public int getNumberInProgressCheckpoints() { - return numberInProgressCheckpoints; - } - - public long getNumberCompletedCheckpoints() { - return numberCompletedCheckpoints; - } - - public long getNumberFailedCheckpoints() { - return numberFailedCheckpoints; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - Counts counts = (Counts) o; - return numberRestoredCheckpoints == counts.numberRestoredCheckpoints && - totalNumberCheckpoints == counts.totalNumberCheckpoints && - numberInProgressCheckpoints == counts.numberInProgressCheckpoints && - numberCompletedCheckpoints == counts.numberCompletedCheckpoints && - numberFailedCheckpoints == counts.numberFailedCheckpoints; - } - - @Override - public int hashCode() { - return Objects.hash(numberRestoredCheckpoints, totalNumberCheckpoints, numberInProgressCheckpoints, numberCompletedCheckpoints, numberFailedCheckpoints); - } - } - - /** - * Checkpoint summary. - */ - public static final class Summary { - - public static final String FIELD_NAME_STATE_SIZE = "state_size"; - - public static final String FIELD_NAME_DURATION = "end_to_end_duration"; - - public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered"; - - @JsonProperty(FIELD_NAME_STATE_SIZE) - private final MinMaxAvgStatistics stateSize; - - @JsonProperty(FIELD_NAME_DURATION) - private final MinMaxAvgStatistics duration; - - @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) - private final MinMaxAvgStatistics alignmentBuffered; - - @JsonCreator - public Summary( - @JsonProperty(FIELD_NAME_STATE_SIZE) MinMaxAvgStatistics stateSize, - @JsonProperty(FIELD_NAME_DURATION) MinMaxAvgStatistics duration, - @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) MinMaxAvgStatistics alignmentBuffered) { - this.stateSize = stateSize; - this.duration = duration; - this.alignmentBuffered = alignmentBuffered; - } - - public MinMaxAvgStatistics getStateSize() { - return stateSize; - } - - public MinMaxAvgStatistics getDuration() { - return duration; - } - - public MinMaxAvgStatistics getAlignmentBuffered() { - return alignmentBuffered; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - Summary summary = (Summary) o; - return Objects.equals(stateSize, summary.stateSize) && - Objects.equals(duration, summary.duration) && - Objects.equals(alignmentBuffered, summary.alignmentBuffered); - } - - @Override - public int hashCode() { - return Objects.hash(stateSize, duration, alignmentBuffered); - } - } - - /** - * Minimum, maximum and average statistics. - */ - public static final class MinMaxAvgStatistics { - - public static final String FIELD_NAME_MINIMUM = "min"; - - public static final String FIELD_NAME_MAXIMUM = "max"; - - public static final String FIELD_NAME_AVERAGE = "avg"; - - @JsonProperty(FIELD_NAME_MINIMUM) - private final long minimum; - - @JsonProperty(FIELD_NAME_MAXIMUM) - private final long maximum; - - @JsonProperty(FIELD_NAME_AVERAGE) - private final long average; - - @JsonCreator - public MinMaxAvgStatistics( - @JsonProperty(FIELD_NAME_MINIMUM) long minimum, - @JsonProperty(FIELD_NAME_MAXIMUM) long maximum, - @JsonProperty(FIELD_NAME_AVERAGE) long average) { - this.minimum = minimum; - this.maximum = maximum; - this.average = average; - } - - public long getMinimum() { - return minimum; - } - - public long getMaximum() { - return maximum; - } - - public long getAverage() { - return average; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - MinMaxAvgStatistics that = (MinMaxAvgStatistics) o; - return minimum == that.minimum && - maximum == that.maximum && - average == that.average; - } - - @Override - public int hashCode() { - return Objects.hash(minimum, maximum, average); - } - } - - /** - * Statistics about the latest checkpoints. - */ - public static final class LatestCheckpoints { - - public static final String FIELD_NAME_COMPLETED = "completed"; - - public static final String FIELD_NAME_SAVEPOINT = "savepoint"; - - public static final String FIELD_NAME_FAILED = "failed"; - - public static final String FIELD_NAME_RESTORED = "restored"; - - @JsonProperty(FIELD_NAME_COMPLETED) - @Nullable - private final CompletedCheckpointStatistics completedCheckpointStatistics; - - @JsonProperty(FIELD_NAME_SAVEPOINT) - @Nullable - private final CompletedCheckpointStatistics savepointStatistics; - - @JsonProperty(FIELD_NAME_FAILED) - @Nullable - private final FailedCheckpointStatistics failedCheckpointStatistics; - - @JsonProperty(FIELD_NAME_RESTORED) - @Nullable - private final RestoredCheckpointStatistics restoredCheckpointStatistics; - - @JsonCreator - public LatestCheckpoints( - @JsonProperty(FIELD_NAME_COMPLETED) @Nullable CompletedCheckpointStatistics completedCheckpointStatistics, - @JsonProperty(FIELD_NAME_SAVEPOINT) @Nullable CompletedCheckpointStatistics savepointStatistics, - @JsonProperty(FIELD_NAME_FAILED) @Nullable FailedCheckpointStatistics failedCheckpointStatistics, - @JsonProperty(FIELD_NAME_RESTORED) @Nullable RestoredCheckpointStatistics restoredCheckpointStatistics) { - this.completedCheckpointStatistics = completedCheckpointStatistics; - this.savepointStatistics = savepointStatistics; - this.failedCheckpointStatistics = failedCheckpointStatistics; - this.restoredCheckpointStatistics = restoredCheckpointStatistics; - } - - @Nullable - public CompletedCheckpointStatistics getCompletedCheckpointStatistics() { - return completedCheckpointStatistics; - } - - @Nullable - public CompletedCheckpointStatistics getSavepointStatistics() { - return savepointStatistics; - } - - @Nullable - public FailedCheckpointStatistics getFailedCheckpointStatistics() { - return failedCheckpointStatistics; - } - - @Nullable - public RestoredCheckpointStatistics getRestoredCheckpointStatistics() { - return restoredCheckpointStatistics; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - LatestCheckpoints that = (LatestCheckpoints) o; - return Objects.equals(completedCheckpointStatistics, that.completedCheckpointStatistics) && - Objects.equals(savepointStatistics, that.savepointStatistics) && - Objects.equals(failedCheckpointStatistics, that.failedCheckpointStatistics) && - Objects.equals(restoredCheckpointStatistics, that.restoredCheckpointStatistics); - } - - @Override - public int hashCode() { - return Objects.hash(completedCheckpointStatistics, savepointStatistics, failedCheckpointStatistics, restoredCheckpointStatistics); - } - } - - /** - * Statistics for a checkpoint. - */ - @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class") - @JsonSubTypes({ - @JsonSubTypes.Type(value = CompletedCheckpointStatistics.class, name = "completed"), - @JsonSubTypes.Type(value = FailedCheckpointStatistics.class, name = "failed")}) - public static class BaseCheckpointStatistics { - - public static final String FIELD_NAME_ID = "id"; - - public static final String FIELD_NAME_STATUS = "status"; - - public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint"; - - public static final String FIELD_NAME_TRIGGER_TIMESTAMP = "trigger_timestamp"; - - public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp"; - - public static final String FIELD_NAME_STATE_SIZE = "state_size"; - - public static final String FIELD_NAME_DURATION = "end_to_end_duration"; - - public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered"; - - public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks"; - - public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks"; - - @JsonProperty(FIELD_NAME_ID) - private final long id; - - @JsonProperty(FIELD_NAME_STATUS) - private final CheckpointStatsStatus status; - - @JsonProperty(FIELD_NAME_IS_SAVEPOINT) - private final boolean savepoint; - - @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) - private final long triggerTimestamp; - - @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) - private final long latestAckTimestamp; - - @JsonProperty(FIELD_NAME_STATE_SIZE) - private final long stateSize; - - @JsonProperty(FIELD_NAME_DURATION) - private final long duration; - - @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) - private final long alignmentBuffered; - - @JsonProperty(FIELD_NAME_NUM_SUBTASKS) - private final int numSubtasks; - - @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) - private final int numAckSubtasks; - - @JsonCreator - protected BaseCheckpointStatistics( - @JsonProperty(FIELD_NAME_ID) long id, - @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status, - @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint, - @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp, - @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp, - @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize, - @JsonProperty(FIELD_NAME_DURATION) long duration, - @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered, - @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks, - @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks) { - this.id = id; - this.status = Preconditions.checkNotNull(status); - this.savepoint = savepoint; - this.triggerTimestamp = triggerTimestamp; - this.latestAckTimestamp = latestAckTimestamp; - this.stateSize = stateSize; - this.duration = duration; - this.alignmentBuffered = alignmentBuffered; - this.numSubtasks = numSubtasks; - this.numAckSubtasks = numAckSubtasks; - } - - public long getId() { - return id; - } - - public CheckpointStatsStatus getStatus() { - return status; - } - - public boolean isSavepoint() { - return savepoint; - } - - public long getTriggerTimestamp() { - return triggerTimestamp; - } - - public long getLatestAckTimestamp() { - return latestAckTimestamp; - } - - public long getStateSize() { - return stateSize; - } - - public long getDuration() { - return duration; - } - - public long getAlignmentBuffered() { - return alignmentBuffered; - } - - public int getNumSubtasks() { - return numSubtasks; - } - - public int getNumAckSubtasks() { - return numAckSubtasks; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - BaseCheckpointStatistics that = (BaseCheckpointStatistics) o; - return id == that.id && - savepoint == that.savepoint && - triggerTimestamp == that.triggerTimestamp && - latestAckTimestamp == that.latestAckTimestamp && - stateSize == that.stateSize && - duration == that.duration && - alignmentBuffered == that.alignmentBuffered && - numSubtasks == that.numSubtasks && - numAckSubtasks == that.numAckSubtasks && - status == that.status; - } - - @Override - public int hashCode() { - return Objects.hash(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks); - } - } - - /** - * Statistics for a completed checkpoint. - */ - public static final class CompletedCheckpointStatistics extends BaseCheckpointStatistics { - - public static final String FIELD_NAME_EXTERNAL_PATH = "external_path"; - - public static final String FIELD_NAME_DISCARDED = "discarded"; - - @JsonProperty(FIELD_NAME_EXTERNAL_PATH) - @Nullable - private final String externalPath; - - @JsonProperty(FIELD_NAME_DISCARDED) - private final boolean discarded; - - @JsonCreator - public CompletedCheckpointStatistics( - @JsonProperty(FIELD_NAME_ID) long id, - @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status, - @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint, - @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp, - @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp, - @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize, - @JsonProperty(FIELD_NAME_DURATION) long duration, - @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered, - @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks, - @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks, - @JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath, - @JsonProperty(FIELD_NAME_DISCARDED) boolean discarded) { - super(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks); - - this.externalPath = externalPath; - this.discarded = discarded; - } - - @Nullable - public String getExternalPath() { - return externalPath; - } - - public boolean isDiscarded() { - return discarded; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - if (!super.equals(o)) { - return false; - } - CompletedCheckpointStatistics that = (CompletedCheckpointStatistics) o; - return discarded == that.discarded && - Objects.equals(externalPath, that.externalPath); - } - - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), externalPath, discarded); - } - } - - /** - * Statistics for a failed checkpoint. - */ - public static final class FailedCheckpointStatistics extends BaseCheckpointStatistics { - - public static final String FIELD_NAME_FAILURE_TIMESTAMP = "failure_timestamp"; - - public static final String FIELD_NAME_FAILURE_MESSAGE = "failure_message"; - - @JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) - private final long failureTimestamp; - - @JsonProperty(FIELD_NAME_FAILURE_MESSAGE) - @Nullable - private final String failureMessage; - - @JsonCreator - public FailedCheckpointStatistics( - @JsonProperty(FIELD_NAME_ID) long id, - @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status, - @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint, - @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp, - @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp, - @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize, - @JsonProperty(FIELD_NAME_DURATION) long duration, - @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered, - @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks, - @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks, - @JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) long failureTimestamp, - @JsonProperty(FIELD_NAME_FAILURE_MESSAGE) @Nullable String failureMessage) { - super(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks); - - this.failureTimestamp = failureTimestamp; - this.failureMessage = failureMessage; - } - - public long getFailureTimestamp() { - return failureTimestamp; - } - - @Nullable - public String getFailureMessage() { - return failureMessage; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - if (!super.equals(o)) { - return false; - } - FailedCheckpointStatistics that = (FailedCheckpointStatistics) o; - return failureTimestamp == that.failureTimestamp && - Objects.equals(failureMessage, that.failureMessage); - } - - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), failureTimestamp, failureMessage); - } - } - - /** - * Statistics for a restored checkpoint. - */ - public static final class RestoredCheckpointStatistics { - - public static final String FIELD_NAME_ID = "id"; - - public static final String FIELD_NAME_RESTORE_TIMESTAMP = "restore_timestamp"; - - public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint"; - - public static final String FIELD_NAME_EXTERNAL_PATH = "external_path"; - - @JsonProperty(FIELD_NAME_ID) - private final long id; - - @JsonProperty(FIELD_NAME_RESTORE_TIMESTAMP) - private final long restoreTimestamp; - - @JsonProperty(FIELD_NAME_IS_SAVEPOINT) - private final boolean savepoint; - - @JsonProperty(FIELD_NAME_EXTERNAL_PATH) - @Nullable - private final String externalPath; - - @JsonCreator - public RestoredCheckpointStatistics( - @JsonProperty(FIELD_NAME_ID) long id, - @JsonProperty(FIELD_NAME_RESTORE_TIMESTAMP) long restoreTimestamp, - @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint, - @JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath) { - this.id = id; - this.restoreTimestamp = restoreTimestamp; - this.savepoint = savepoint; - this.externalPath = externalPath; - } - - public long getId() { - return id; - } - - public long getRestoreTimestamp() { - return restoreTimestamp; - } - - public boolean isSavepoint() { - return savepoint; - } - - @Nullable - public String getExternalPath() { - return externalPath; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - RestoredCheckpointStatistics that = (RestoredCheckpointStatistics) o; - return id == that.id && - restoreTimestamp == that.restoreTimestamp && - savepoint == that.savepoint && - Objects.equals(externalPath, that.externalPath); - } - - @Override - public int hashCode() { - return Objects.hash(id, restoreTimestamp, savepoint, externalPath); - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatisticsHeaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatisticsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatisticsHeaders.java deleted file mode 100644 index b062d0d..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatisticsHeaders.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rest.messages; - -import org.apache.flink.runtime.rest.HttpMethodWrapper; -import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler; - -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; - -/** - * Message headers for the {@link CheckpointStatisticsHandler}. - */ -public class CheckpointStatisticsHeaders implements MessageHeaders<EmptyRequestBody, CheckpointStatistics, JobMessageParameters> { - - private static final CheckpointStatisticsHeaders INSTANCE = new CheckpointStatisticsHeaders(); - - public static final String URL = "/jobs/:jobid/checkpoints"; - - @Override - public Class<EmptyRequestBody> getRequestClass() { - return EmptyRequestBody.class; - } - - @Override - public Class<CheckpointStatistics> getResponseClass() { - return CheckpointStatistics.class; - } - - @Override - public HttpResponseStatus getResponseStatusCode() { - return HttpResponseStatus.OK; - } - - @Override - public JobMessageParameters getUnresolvedMessageParameters() { - return new JobMessageParameters(); - } - - @Override - public HttpMethodWrapper getHttpMethod() { - return HttpMethodWrapper.GET; - } - - @Override - public String getTargetRestEndpointURL() { - return URL; - } - - public static CheckpointStatisticsHeaders getInstance() { - return INSTANCE; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java index 9d74c95..1155892 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobMessageParameters.java @@ -26,7 +26,7 @@ import java.util.Collections; */ public class JobMessageParameters extends MessageParameters { - private final JobIDPathParameter jobPathParameter = new JobIDPathParameter(); + protected final JobIDPathParameter jobPathParameter = new JobIDPathParameter(); @Override public Collection<MessagePathParameter<?>> getPathParameters() { http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigHeaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigHeaders.java new file mode 100644 index 0000000..f0526a0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigHeaders.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Message headers for the {@link CheckpointConfigHandler}. + */ +public class CheckpointConfigHeaders implements MessageHeaders<EmptyRequestBody, CheckpointConfigInfo, JobMessageParameters> { + + private static final CheckpointConfigHeaders INSTANCE = new CheckpointConfigHeaders(); + + public static final String URL = "/jobs/:jobid/checkpoints/config"; + + private CheckpointConfigHeaders() {} + + @Override + public Class<EmptyRequestBody> getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public Class<CheckpointConfigInfo> getResponseClass() { + return CheckpointConfigInfo.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public JobMessageParameters getUnresolvedMessageParameters() { + return new JobMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static CheckpointConfigHeaders getInstance() { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java new file mode 100644 index 0000000..797d3a5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.util.Preconditions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** + * Response class of the {@link CheckpointConfigHandler}. + */ +public class CheckpointConfigInfo implements ResponseBody { + + public static final String FIELD_NAME_PROCESSING_MODE = "mode"; + + public static final String FIELD_NAME_CHECKPOINT_INTERVAL = "interval"; + + public static final String FIELD_NAME_CHECKPOINT_TIMEOUT = "timeout"; + + public static final String FIELD_NAME_CHECKPOINT_MIN_PAUSE = "min_pause"; + + public static final String FIELD_NAME_CHECKPOINT_MAX_CONCURRENT = "max_concurrent"; + + public static final String FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG = "externalization"; + + @JsonProperty(FIELD_NAME_PROCESSING_MODE) + private final ProcessingMode processingMode; + + @JsonProperty(FIELD_NAME_CHECKPOINT_INTERVAL) + private final long checkpointInterval; + + @JsonProperty(FIELD_NAME_CHECKPOINT_TIMEOUT) + private final long checkpointTimeout; + + @JsonProperty(FIELD_NAME_CHECKPOINT_MIN_PAUSE) + private final long minPauseBetweenCheckpoints; + + @JsonProperty(FIELD_NAME_CHECKPOINT_MAX_CONCURRENT) + private final long maxConcurrentCheckpoints; + + @JsonProperty(FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG) + private final ExternalizedCheckpointInfo externalizedCheckpointInfo; + + @JsonCreator + public CheckpointConfigInfo( + @JsonProperty(FIELD_NAME_PROCESSING_MODE) ProcessingMode processingMode, + @JsonProperty(FIELD_NAME_CHECKPOINT_INTERVAL) long checkpointInterval, + @JsonProperty(FIELD_NAME_CHECKPOINT_TIMEOUT) long checkpointTimeout, + @JsonProperty(FIELD_NAME_CHECKPOINT_MIN_PAUSE) long minPauseBetweenCheckpoints, + @JsonProperty(FIELD_NAME_CHECKPOINT_MAX_CONCURRENT) int maxConcurrentCheckpoints, + @JsonProperty(FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG) ExternalizedCheckpointInfo externalizedCheckpointInfo) { + this.processingMode = Preconditions.checkNotNull(processingMode); + this.checkpointInterval = checkpointInterval; + this.checkpointTimeout = checkpointTimeout; + this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints; + this.maxConcurrentCheckpoints = maxConcurrentCheckpoints; + this.externalizedCheckpointInfo = Preconditions.checkNotNull(externalizedCheckpointInfo); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CheckpointConfigInfo that = (CheckpointConfigInfo) o; + return checkpointInterval == that.checkpointInterval && + checkpointTimeout == that.checkpointTimeout && + minPauseBetweenCheckpoints == that.minPauseBetweenCheckpoints && + maxConcurrentCheckpoints == that.maxConcurrentCheckpoints && + processingMode == that.processingMode && + Objects.equals(externalizedCheckpointInfo, that.externalizedCheckpointInfo); + } + + @Override + public int hashCode() { + return Objects.hash(processingMode, checkpointInterval, checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpoints, externalizedCheckpointInfo); + } + + /** + * Contains information about the externalized checkpoint configuration. + */ + public static final class ExternalizedCheckpointInfo { + + public static final String FIELD_NAME_ENABLED = "enabled"; + + public static final String FIELD_NAME_DELETE_ON_CANCELLATION = "delete_on_cancellation"; + + @JsonProperty(FIELD_NAME_ENABLED) + private final boolean enabled; + + @JsonProperty(FIELD_NAME_DELETE_ON_CANCELLATION) + private final boolean deleteOnCancellation; + + @JsonCreator + public ExternalizedCheckpointInfo( + @JsonProperty(FIELD_NAME_ENABLED) boolean enabled, + @JsonProperty(FIELD_NAME_DELETE_ON_CANCELLATION) boolean deleteOnCancellation) { + this.enabled = enabled; + this.deleteOnCancellation = deleteOnCancellation; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ExternalizedCheckpointInfo that = (ExternalizedCheckpointInfo) o; + return enabled == that.enabled && + deleteOnCancellation == that.deleteOnCancellation; + } + + @Override + public int hashCode() { + return Objects.hash(enabled, deleteOnCancellation); + } + } + + /** + * Processing mode. + */ + public enum ProcessingMode { + AT_LEAST_ONCE, + EXACTLY_ONCE + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointIdPathParameter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointIdPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointIdPathParameter.java new file mode 100644 index 0000000..c08cc82 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointIdPathParameter.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.runtime.rest.messages.ConversionException; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; + +/** + * Path parameter for the checkpoint id of type {@link Long}. + */ +public class CheckpointIdPathParameter extends MessagePathParameter<Long> { + + public static final String KEY = "checkpointid"; + + protected CheckpointIdPathParameter() { + super(KEY); + } + + @Override + protected Long convertFromString(String value) throws ConversionException { + try { + return Long.parseLong(value); + } catch (NumberFormatException nfe) { + throw new ConversionException("Could not parse long from " + value + '.', nfe); + } + } + + @Override + protected String convertToString(Long value) { + return value.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointMessageParameters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointMessageParameters.java new file mode 100644 index 0000000..040aa87 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointMessageParameters.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; + +import java.util.Arrays; +import java.util.Collection; + +/** + * Message parameters for checkpoint related messages. + */ +public class CheckpointMessageParameters extends JobMessageParameters { + + protected final CheckpointIdPathParameter checkpointIdPathParameter = new CheckpointIdPathParameter(); + + @Override + public Collection<MessagePathParameter<?>> getPathParameters() { + return Arrays.asList(jobPathParameter, checkpointIdPathParameter); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatisticDetailsHeaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatisticDetailsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatisticDetailsHeaders.java new file mode 100644 index 0000000..3d7ba2b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatisticDetailsHeaders.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Headers for the {@link CheckpointStatisticDetailsHandler}. + */ +public class CheckpointStatisticDetailsHeaders implements MessageHeaders<EmptyRequestBody, CheckpointStatistics, CheckpointMessageParameters> { + + private static final CheckpointStatisticDetailsHeaders INSTANCE = new CheckpointStatisticDetailsHeaders(); + + public static final String URL = "/jobs/:jobid/checkpoints/:checkpointid"; + + private CheckpointStatisticDetailsHeaders() {} + + @Override + public Class<EmptyRequestBody> getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public Class<CheckpointStatistics> getResponseClass() { + return CheckpointStatistics.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public CheckpointMessageParameters getUnresolvedMessageParameters() { + return new CheckpointMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static CheckpointStatisticDetailsHeaders getInstance() { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java new file mode 100644 index 0000000..9fb1094 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java @@ -0,0 +1,537 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats; +import org.apache.flink.runtime.checkpoint.FailedCheckpointStats; +import org.apache.flink.runtime.checkpoint.TaskStateStats; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer; +import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer; +import org.apache.flink.util.Preconditions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import javax.annotation.Nullable; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Statistics for a checkpoint. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class") +@JsonSubTypes({ + @JsonSubTypes.Type(value = CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"), + @JsonSubTypes.Type(value = CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")}) +public class CheckpointStatistics implements ResponseBody { + + public static final String FIELD_NAME_ID = "id"; + + public static final String FIELD_NAME_STATUS = "status"; + + public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint"; + + public static final String FIELD_NAME_TRIGGER_TIMESTAMP = "trigger_timestamp"; + + public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp"; + + public static final String FIELD_NAME_STATE_SIZE = "state_size"; + + public static final String FIELD_NAME_DURATION = "end_to_end_duration"; + + public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered"; + + public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks"; + + public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks"; + + public static final String FIELD_NAME_TASKS = "tasks"; + + @JsonProperty(FIELD_NAME_ID) + private final long id; + + @JsonProperty(FIELD_NAME_STATUS) + private final CheckpointStatsStatus status; + + @JsonProperty(FIELD_NAME_IS_SAVEPOINT) + private final boolean savepoint; + + @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) + private final long triggerTimestamp; + + @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) + private final long latestAckTimestamp; + + @JsonProperty(FIELD_NAME_STATE_SIZE) + private final long stateSize; + + @JsonProperty(FIELD_NAME_DURATION) + private final long duration; + + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) + private final long alignmentBuffered; + + @JsonProperty(FIELD_NAME_NUM_SUBTASKS) + private final int numSubtasks; + + @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) + private final int numAckSubtasks; + + @JsonProperty(FIELD_NAME_TASKS) + @JsonSerialize(keyUsing = JobVertexIDSerializer.class) + private final Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask; + + @JsonCreator + private CheckpointStatistics( + @JsonProperty(FIELD_NAME_ID) long id, + @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status, + @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint, + @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp, + @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp, + @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize, + @JsonProperty(FIELD_NAME_DURATION) long duration, + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered, + @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks, + @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks, + @JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) { + this.id = id; + this.status = Preconditions.checkNotNull(status); + this.savepoint = savepoint; + this.triggerTimestamp = triggerTimestamp; + this.latestAckTimestamp = latestAckTimestamp; + this.stateSize = stateSize; + this.duration = duration; + this.alignmentBuffered = alignmentBuffered; + this.numSubtasks = numSubtasks; + this.numAckSubtasks = numAckSubtasks; + this.checkpointStatisticsPerTask = Preconditions.checkNotNull(checkpointStatisticsPerTask); + } + + public long getId() { + return id; + } + + public CheckpointStatsStatus getStatus() { + return status; + } + + public boolean isSavepoint() { + return savepoint; + } + + public long getTriggerTimestamp() { + return triggerTimestamp; + } + + public long getLatestAckTimestamp() { + return latestAckTimestamp; + } + + public long getStateSize() { + return stateSize; + } + + public long getDuration() { + return duration; + } + + public long getAlignmentBuffered() { + return alignmentBuffered; + } + + public int getNumSubtasks() { + return numSubtasks; + } + + public int getNumAckSubtasks() { + return numAckSubtasks; + } + + @Nullable + public Map<JobVertexID, TaskCheckpointStatistics> getCheckpointStatisticsPerTask() { + return checkpointStatisticsPerTask; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CheckpointStatistics that = (CheckpointStatistics) o; + return id == that.id && + savepoint == that.savepoint && + triggerTimestamp == that.triggerTimestamp && + latestAckTimestamp == that.latestAckTimestamp && + stateSize == that.stateSize && + duration == that.duration && + alignmentBuffered == that.alignmentBuffered && + numSubtasks == that.numSubtasks && + numAckSubtasks == that.numAckSubtasks && + status == that.status && + Objects.equals(checkpointStatisticsPerTask, that.checkpointStatisticsPerTask); + } + + @Override + public int hashCode() { + return Objects.hash(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks, checkpointStatisticsPerTask); + } + + // ------------------------------------------------------------------------- + // Static factory methods + // ------------------------------------------------------------------------- + + public static CheckpointStatistics generateCheckpointStatistics(AbstractCheckpointStats checkpointStats, boolean includeTaskCheckpointStatistics) { + Preconditions.checkNotNull(checkpointStats); + + Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask; + + if (includeTaskCheckpointStatistics) { + Collection<TaskStateStats> taskStateStats = checkpointStats.getAllTaskStateStats(); + + checkpointStatisticsPerTask = new HashMap<>(taskStateStats.size()); + + for (TaskStateStats taskStateStat : taskStateStats) { + checkpointStatisticsPerTask.put( + taskStateStat.getJobVertexId(), + new TaskCheckpointStatistics( + taskStateStat.getLatestAckTimestamp(), + taskStateStat.getStateSize(), + taskStateStat.getEndToEndDuration(checkpointStats.getTriggerTimestamp()), + taskStateStat.getAlignmentBuffered(), + taskStateStat.getNumberOfSubtasks(), + taskStateStat.getNumberOfAcknowledgedSubtasks())); + } + } else { + checkpointStatisticsPerTask = Collections.emptyMap(); + } + + if (checkpointStats instanceof CompletedCheckpointStats) { + final CompletedCheckpointStats completedCheckpointStats = ((CompletedCheckpointStats) checkpointStats); + + return new CheckpointStatistics.CompletedCheckpointStatistics( + completedCheckpointStats.getCheckpointId(), + completedCheckpointStats.getStatus(), + completedCheckpointStats.getProperties().isSavepoint(), + completedCheckpointStats.getTriggerTimestamp(), + completedCheckpointStats.getLatestAckTimestamp(), + completedCheckpointStats.getStateSize(), + completedCheckpointStats.getEndToEndDuration(), + completedCheckpointStats.getAlignmentBuffered(), + completedCheckpointStats.getNumberOfSubtasks(), + completedCheckpointStats.getNumberOfAcknowledgedSubtasks(), + checkpointStatisticsPerTask, + completedCheckpointStats.getExternalPath(), + completedCheckpointStats.isDiscarded()); + } else if (checkpointStats instanceof FailedCheckpointStats) { + final FailedCheckpointStats failedCheckpointStats = ((FailedCheckpointStats) checkpointStats); + + return new CheckpointStatistics.FailedCheckpointStatistics( + failedCheckpointStats.getCheckpointId(), + failedCheckpointStats.getStatus(), + failedCheckpointStats.getProperties().isSavepoint(), + failedCheckpointStats.getTriggerTimestamp(), + failedCheckpointStats.getLatestAckTimestamp(), + failedCheckpointStats.getStateSize(), + failedCheckpointStats.getEndToEndDuration(), + failedCheckpointStats.getAlignmentBuffered(), + failedCheckpointStats.getNumberOfSubtasks(), + failedCheckpointStats.getNumberOfAcknowledgedSubtasks(), + checkpointStatisticsPerTask, + failedCheckpointStats.getFailureTimestamp(), + failedCheckpointStats.getFailureMessage()); + } else { + throw new IllegalArgumentException("Given checkpoint stats object of type " + checkpointStats.getClass().getName() + " cannot be converted."); + } + } + + // --------------------------------------------------------------------- + // Static inner classes + // --------------------------------------------------------------------- + + /** + * Checkpoint statistics for a single task. + */ + public static final class TaskCheckpointStatistics { + + public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp"; + + public static final String FIELD_NAME_STATE_SIZE = "state_size"; + + public static final String FIELD_NAME_DURATION = "end_to_end_duration"; + + public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered"; + + public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks"; + + public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks"; + + @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) + private final long latestAckTimestamp; + + @JsonProperty(FIELD_NAME_STATE_SIZE) + private final long stateSize; + + @JsonProperty(FIELD_NAME_DURATION) + private final long duration; + + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) + private final long alignmentBuffered; + + @JsonProperty(FIELD_NAME_NUM_SUBTASKS) + private final int numSubtasks; + + @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) + private final int numAckSubtasks; + + @JsonCreator + public TaskCheckpointStatistics( + @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp, + @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize, + @JsonProperty(FIELD_NAME_DURATION) long duration, + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered, + @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks, + @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks) { + this.latestAckTimestamp = latestAckTimestamp; + this.stateSize = stateSize; + this.duration = duration; + this.alignmentBuffered = alignmentBuffered; + this.numSubtasks = numSubtasks; + this.numAckSubtasks = numAckSubtasks; + } + + public long getLatestAckTimestamp() { + return latestAckTimestamp; + } + + public long getStateSize() { + return stateSize; + } + + public long getDuration() { + return duration; + } + + public long getAlignmentBuffered() { + return alignmentBuffered; + } + + public int getNumSubtasks() { + return numSubtasks; + } + + public int getNumAckSubtasks() { + return numAckSubtasks; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TaskCheckpointStatistics that = (TaskCheckpointStatistics) o; + return latestAckTimestamp == that.latestAckTimestamp && + stateSize == that.stateSize && + duration == that.duration && + alignmentBuffered == that.alignmentBuffered && + numSubtasks == that.numSubtasks && + numAckSubtasks == that.numAckSubtasks; + } + + @Override + public int hashCode() { + return Objects.hash(latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks); + } + } + + /** + * Statistics for a completed checkpoint. + */ + public static final class CompletedCheckpointStatistics extends CheckpointStatistics { + + public static final String FIELD_NAME_EXTERNAL_PATH = "external_path"; + + public static final String FIELD_NAME_DISCARDED = "discarded"; + + @JsonProperty(FIELD_NAME_EXTERNAL_PATH) + @Nullable + private final String externalPath; + + @JsonProperty(FIELD_NAME_DISCARDED) + private final boolean discarded; + + @JsonCreator + public CompletedCheckpointStatistics( + @JsonProperty(FIELD_NAME_ID) long id, + @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status, + @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint, + @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp, + @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp, + @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize, + @JsonProperty(FIELD_NAME_DURATION) long duration, + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered, + @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks, + @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks, + @JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask, + @JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath, + @JsonProperty(FIELD_NAME_DISCARDED) boolean discarded) { + super( + id, + status, + savepoint, + triggerTimestamp, + latestAckTimestamp, + stateSize, + duration, + alignmentBuffered, + numSubtasks, + numAckSubtasks, + checkpointingStatisticsPerTask); + + this.externalPath = externalPath; + this.discarded = discarded; + } + + @Nullable + public String getExternalPath() { + return externalPath; + } + + public boolean isDiscarded() { + return discarded; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + CompletedCheckpointStatistics that = (CompletedCheckpointStatistics) o; + return discarded == that.discarded && + Objects.equals(externalPath, that.externalPath); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), externalPath, discarded); + } + } + + /** + * Statistics for a failed checkpoint. + */ + public static final class FailedCheckpointStatistics extends CheckpointStatistics { + + public static final String FIELD_NAME_FAILURE_TIMESTAMP = "failure_timestamp"; + + public static final String FIELD_NAME_FAILURE_MESSAGE = "failure_message"; + + @JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) + private final long failureTimestamp; + + @JsonProperty(FIELD_NAME_FAILURE_MESSAGE) + @Nullable + private final String failureMessage; + + @JsonCreator + public FailedCheckpointStatistics( + @JsonProperty(FIELD_NAME_ID) long id, + @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status, + @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint, + @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp, + @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp, + @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize, + @JsonProperty(FIELD_NAME_DURATION) long duration, + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered, + @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks, + @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks, + @JsonDeserialize(keyUsing = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask, + @JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) long failureTimestamp, + @JsonProperty(FIELD_NAME_FAILURE_MESSAGE) @Nullable String failureMessage) { + super( + id, + status, + savepoint, + triggerTimestamp, + latestAckTimestamp, + stateSize, + duration, + alignmentBuffered, + numSubtasks, + numAckSubtasks, + checkpointingStatisticsPerTask); + + this.failureTimestamp = failureTimestamp; + this.failureMessage = failureMessage; + } + + public long getFailureTimestamp() { + return failureTimestamp; + } + + @Nullable + public String getFailureMessage() { + return failureMessage; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + FailedCheckpointStatistics that = (FailedCheckpointStatistics) o; + return failureTimestamp == that.failureTimestamp && + Objects.equals(failureMessage, that.failureMessage); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), failureTimestamp, failureMessage); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatistics.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatistics.java new file mode 100644 index 0000000..1f00fcc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatistics.java @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.util.Preconditions; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Objects; + +/** + * Response of the {@link CheckpointingStatisticsHandler}. This class contains information about + * the checkpointing of a given job. + */ +public class CheckpointingStatistics implements ResponseBody { + + public static final String FIELD_NAME_COUNTS = "counts"; + + public static final String FIELD_NAME_SUMMARY = "summary"; + + public static final String FIELD_NAME_LATEST_CHECKPOINTS = "latest"; + + public static final String FIELD_NAME_HISTORY = "history"; + + @JsonProperty(FIELD_NAME_COUNTS) + private final Counts counts; + + @JsonProperty(FIELD_NAME_SUMMARY) + private final Summary summary; + + @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS) + private final LatestCheckpoints latestCheckpoints; + + @JsonProperty(FIELD_NAME_HISTORY) + private final List<CheckpointStatistics> history; + + @JsonCreator + public CheckpointingStatistics( + @JsonProperty(FIELD_NAME_COUNTS) Counts counts, + @JsonProperty(FIELD_NAME_SUMMARY) Summary summary, + @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS) LatestCheckpoints latestCheckpoints, + @JsonProperty(FIELD_NAME_HISTORY) List<CheckpointStatistics> history) { + this.counts = Preconditions.checkNotNull(counts); + this.summary = Preconditions.checkNotNull(summary); + this.latestCheckpoints = Preconditions.checkNotNull(latestCheckpoints); + this.history = Preconditions.checkNotNull(history); + } + + public Counts getCounts() { + return counts; + } + + public Summary getSummary() { + return summary; + } + + public LatestCheckpoints getLatestCheckpoints() { + return latestCheckpoints; + } + + public List<CheckpointStatistics> getHistory() { + return history; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CheckpointingStatistics that = (CheckpointingStatistics) o; + return Objects.equals(counts, that.counts) && + Objects.equals(summary, that.summary) && + Objects.equals(latestCheckpoints, that.latestCheckpoints) && + Objects.equals(history, that.history); + } + + @Override + public int hashCode() { + return Objects.hash(counts, summary, latestCheckpoints, history); + } + + // ------------------------------------------------------------------ + // Inner classes + // ------------------------------------------------------------------ + + /** + * Checkpoint counts. + */ + public static final class Counts { + + public static final String FIELD_NAME_RESTORED_CHECKPOINTS = "restored"; + + public static final String FIELD_NAME_TOTAL_CHECKPOINTS = "total"; + + public static final String FIELD_NAME_IN_PROGRESS_CHECKPOINTS = "in_progress"; + + public static final String FIELD_NAME_COMPLETED_CHECKPOINTS = "completed"; + + public static final String FIELD_NAME_FAILED_CHECKPOINTS = "failed"; + + @JsonProperty(FIELD_NAME_RESTORED_CHECKPOINTS) + private final long numberRestoredCheckpoints; + + @JsonProperty(FIELD_NAME_TOTAL_CHECKPOINTS) + private final long totalNumberCheckpoints; + + @JsonProperty(FIELD_NAME_IN_PROGRESS_CHECKPOINTS) + private final int numberInProgressCheckpoints; + + @JsonProperty(FIELD_NAME_COMPLETED_CHECKPOINTS) + private final long numberCompletedCheckpoints; + + @JsonProperty(FIELD_NAME_FAILED_CHECKPOINTS) + private final long numberFailedCheckpoints; + + @JsonCreator + public Counts( + @JsonProperty(FIELD_NAME_RESTORED_CHECKPOINTS) long numberRestoredCheckpoints, + @JsonProperty(FIELD_NAME_TOTAL_CHECKPOINTS) long totalNumberCheckpoints, + @JsonProperty(FIELD_NAME_IN_PROGRESS_CHECKPOINTS) int numberInProgressCheckpoints, + @JsonProperty(FIELD_NAME_COMPLETED_CHECKPOINTS) long numberCompletedCheckpoints, + @JsonProperty(FIELD_NAME_FAILED_CHECKPOINTS) long numberFailedCheckpoints) { + this.numberRestoredCheckpoints = numberRestoredCheckpoints; + this.totalNumberCheckpoints = totalNumberCheckpoints; + this.numberInProgressCheckpoints = numberInProgressCheckpoints; + this.numberCompletedCheckpoints = numberCompletedCheckpoints; + this.numberFailedCheckpoints = numberFailedCheckpoints; + } + + public long getNumberRestoredCheckpoints() { + return numberRestoredCheckpoints; + } + + public long getTotalNumberCheckpoints() { + return totalNumberCheckpoints; + } + + public int getNumberInProgressCheckpoints() { + return numberInProgressCheckpoints; + } + + public long getNumberCompletedCheckpoints() { + return numberCompletedCheckpoints; + } + + public long getNumberFailedCheckpoints() { + return numberFailedCheckpoints; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Counts counts = (Counts) o; + return numberRestoredCheckpoints == counts.numberRestoredCheckpoints && + totalNumberCheckpoints == counts.totalNumberCheckpoints && + numberInProgressCheckpoints == counts.numberInProgressCheckpoints && + numberCompletedCheckpoints == counts.numberCompletedCheckpoints && + numberFailedCheckpoints == counts.numberFailedCheckpoints; + } + + @Override + public int hashCode() { + return Objects.hash(numberRestoredCheckpoints, totalNumberCheckpoints, numberInProgressCheckpoints, numberCompletedCheckpoints, numberFailedCheckpoints); + } + } + + /** + * Checkpoint summary. + */ + public static final class Summary { + + public static final String FIELD_NAME_STATE_SIZE = "state_size"; + + public static final String FIELD_NAME_DURATION = "end_to_end_duration"; + + public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered"; + + @JsonProperty(FIELD_NAME_STATE_SIZE) + private final MinMaxAvgStatistics stateSize; + + @JsonProperty(FIELD_NAME_DURATION) + private final MinMaxAvgStatistics duration; + + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) + private final MinMaxAvgStatistics alignmentBuffered; + + @JsonCreator + public Summary( + @JsonProperty(FIELD_NAME_STATE_SIZE) MinMaxAvgStatistics stateSize, + @JsonProperty(FIELD_NAME_DURATION) MinMaxAvgStatistics duration, + @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) MinMaxAvgStatistics alignmentBuffered) { + this.stateSize = stateSize; + this.duration = duration; + this.alignmentBuffered = alignmentBuffered; + } + + public MinMaxAvgStatistics getStateSize() { + return stateSize; + } + + public MinMaxAvgStatistics getDuration() { + return duration; + } + + public MinMaxAvgStatistics getAlignmentBuffered() { + return alignmentBuffered; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Summary summary = (Summary) o; + return Objects.equals(stateSize, summary.stateSize) && + Objects.equals(duration, summary.duration) && + Objects.equals(alignmentBuffered, summary.alignmentBuffered); + } + + @Override + public int hashCode() { + return Objects.hash(stateSize, duration, alignmentBuffered); + } + } + + /** + * Minimum, maximum and average statistics. + */ + public static final class MinMaxAvgStatistics { + + public static final String FIELD_NAME_MINIMUM = "min"; + + public static final String FIELD_NAME_MAXIMUM = "max"; + + public static final String FIELD_NAME_AVERAGE = "avg"; + + @JsonProperty(FIELD_NAME_MINIMUM) + private final long minimum; + + @JsonProperty(FIELD_NAME_MAXIMUM) + private final long maximum; + + @JsonProperty(FIELD_NAME_AVERAGE) + private final long average; + + @JsonCreator + public MinMaxAvgStatistics( + @JsonProperty(FIELD_NAME_MINIMUM) long minimum, + @JsonProperty(FIELD_NAME_MAXIMUM) long maximum, + @JsonProperty(FIELD_NAME_AVERAGE) long average) { + this.minimum = minimum; + this.maximum = maximum; + this.average = average; + } + + public long getMinimum() { + return minimum; + } + + public long getMaximum() { + return maximum; + } + + public long getAverage() { + return average; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + MinMaxAvgStatistics that = (MinMaxAvgStatistics) o; + return minimum == that.minimum && + maximum == that.maximum && + average == that.average; + } + + @Override + public int hashCode() { + return Objects.hash(minimum, maximum, average); + } + } + + /** + * Statistics about the latest checkpoints. + */ + public static final class LatestCheckpoints { + + public static final String FIELD_NAME_COMPLETED = "completed"; + + public static final String FIELD_NAME_SAVEPOINT = "savepoint"; + + public static final String FIELD_NAME_FAILED = "failed"; + + public static final String FIELD_NAME_RESTORED = "restored"; + + @JsonProperty(FIELD_NAME_COMPLETED) + @Nullable + private final CheckpointStatistics.CompletedCheckpointStatistics completedCheckpointStatistics; + + @JsonProperty(FIELD_NAME_SAVEPOINT) + @Nullable + private final CheckpointStatistics.CompletedCheckpointStatistics savepointStatistics; + + @JsonProperty(FIELD_NAME_FAILED) + @Nullable + private final CheckpointStatistics.FailedCheckpointStatistics failedCheckpointStatistics; + + @JsonProperty(FIELD_NAME_RESTORED) + @Nullable + private final RestoredCheckpointStatistics restoredCheckpointStatistics; + + @JsonCreator + public LatestCheckpoints( + @JsonProperty(FIELD_NAME_COMPLETED) @Nullable CheckpointStatistics.CompletedCheckpointStatistics completedCheckpointStatistics, + @JsonProperty(FIELD_NAME_SAVEPOINT) @Nullable CheckpointStatistics.CompletedCheckpointStatistics savepointStatistics, + @JsonProperty(FIELD_NAME_FAILED) @Nullable CheckpointStatistics.FailedCheckpointStatistics failedCheckpointStatistics, + @JsonProperty(FIELD_NAME_RESTORED) @Nullable RestoredCheckpointStatistics restoredCheckpointStatistics) { + this.completedCheckpointStatistics = completedCheckpointStatistics; + this.savepointStatistics = savepointStatistics; + this.failedCheckpointStatistics = failedCheckpointStatistics; + this.restoredCheckpointStatistics = restoredCheckpointStatistics; + } + + @Nullable + public CheckpointStatistics.CompletedCheckpointStatistics getCompletedCheckpointStatistics() { + return completedCheckpointStatistics; + } + + @Nullable + public CheckpointStatistics.CompletedCheckpointStatistics getSavepointStatistics() { + return savepointStatistics; + } + + @Nullable + public CheckpointStatistics.FailedCheckpointStatistics getFailedCheckpointStatistics() { + return failedCheckpointStatistics; + } + + @Nullable + public RestoredCheckpointStatistics getRestoredCheckpointStatistics() { + return restoredCheckpointStatistics; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + LatestCheckpoints that = (LatestCheckpoints) o; + return Objects.equals(completedCheckpointStatistics, that.completedCheckpointStatistics) && + Objects.equals(savepointStatistics, that.savepointStatistics) && + Objects.equals(failedCheckpointStatistics, that.failedCheckpointStatistics) && + Objects.equals(restoredCheckpointStatistics, that.restoredCheckpointStatistics); + } + + @Override + public int hashCode() { + return Objects.hash(completedCheckpointStatistics, savepointStatistics, failedCheckpointStatistics, restoredCheckpointStatistics); + } + } + + /** + * Statistics for a restored checkpoint. + */ + public static final class RestoredCheckpointStatistics { + + public static final String FIELD_NAME_ID = "id"; + + public static final String FIELD_NAME_RESTORE_TIMESTAMP = "restore_timestamp"; + + public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint"; + + public static final String FIELD_NAME_EXTERNAL_PATH = "external_path"; + + @JsonProperty(FIELD_NAME_ID) + private final long id; + + @JsonProperty(FIELD_NAME_RESTORE_TIMESTAMP) + private final long restoreTimestamp; + + @JsonProperty(FIELD_NAME_IS_SAVEPOINT) + private final boolean savepoint; + + @JsonProperty(FIELD_NAME_EXTERNAL_PATH) + @Nullable + private final String externalPath; + + @JsonCreator + public RestoredCheckpointStatistics( + @JsonProperty(FIELD_NAME_ID) long id, + @JsonProperty(FIELD_NAME_RESTORE_TIMESTAMP) long restoreTimestamp, + @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint, + @JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath) { + this.id = id; + this.restoreTimestamp = restoreTimestamp; + this.savepoint = savepoint; + this.externalPath = externalPath; + } + + public long getId() { + return id; + } + + public long getRestoreTimestamp() { + return restoreTimestamp; + } + + public boolean isSavepoint() { + return savepoint; + } + + @Nullable + public String getExternalPath() { + return externalPath; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RestoredCheckpointStatistics that = (RestoredCheckpointStatistics) o; + return id == that.id && + restoreTimestamp == that.restoreTimestamp && + savepoint == that.savepoint && + Objects.equals(externalPath, that.externalPath); + } + + @Override + public int hashCode() { + return Objects.hash(id, restoreTimestamp, savepoint, externalPath); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsHeaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsHeaders.java new file mode 100644 index 0000000..ce809e7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsHeaders.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.checkpoints; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Message headers for the {@link CheckpointingStatisticsHandler}. + */ +public class CheckpointingStatisticsHeaders implements MessageHeaders<EmptyRequestBody, CheckpointingStatistics, JobMessageParameters> { + + private static final CheckpointingStatisticsHeaders INSTANCE = new CheckpointingStatisticsHeaders(); + + public static final String URL = "/jobs/:jobid/checkpoints"; + + @Override + public Class<EmptyRequestBody> getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public Class<CheckpointingStatistics> getResponseClass() { + return CheckpointingStatistics.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public JobMessageParameters getUnresolvedMessageParameters() { + return new JobMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static CheckpointingStatisticsHeaders getInstance() { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java new file mode 100644 index 0000000..b1f083f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDDeserializer.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.json; + +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.KeyDeserializer; + +import java.io.IOException; + +/** + * Jackson deserializer for {@link JobVertexID}. + */ +public class JobVertexIDDeserializer extends KeyDeserializer { + + @Override + public Object deserializeKey(String key, DeserializationContext ctxt) throws IOException { + return JobVertexID.fromHexString(key); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java new file mode 100644 index 0000000..f2b9859 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobVertexIDSerializer.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.json; + +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; + +import java.io.IOException; + +/** + * Jackson serializer for {@link JobVertexID}. + */ +public class JobVertexIDSerializer extends StdSerializer<JobVertexID> { + + private static final long serialVersionUID = 2970050507628933522L; + + public JobVertexIDSerializer() { + super(JobVertexID.class); + } + + @Override + public void serialize(JobVertexID value, JsonGenerator gen, SerializerProvider provider) throws IOException { + gen.writeFieldName(value.toString()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java index 04b1c55..7337772 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.legacy.checkpoints; import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java index 1eac20b..263117a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateStats; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
