[
https://issues.apache.org/jira/browse/FLINK-7709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16197163#comment-16197163
]
ASF GitHub Bot commented on FLINK-7709:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4763#discussion_r143504804
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
---
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Statistics for a checkpoint.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include =
JsonTypeInfo.As.PROPERTY, property = "@class")
+@JsonSubTypes({
+ @JsonSubTypes.Type(value =
CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
+ @JsonSubTypes.Type(value =
CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class CheckpointStatistics implements ResponseBody {
+
+ public static final String FIELD_NAME_ID = "id";
+
+ public static final String FIELD_NAME_STATUS = "status";
+
+ public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
+
+ public static final String FIELD_NAME_TRIGGER_TIMESTAMP =
"trigger_timestamp";
+
+ public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP =
"latest_ack_timestamp";
+
+ public static final String FIELD_NAME_STATE_SIZE = "state_size";
+
+ public static final String FIELD_NAME_DURATION = "end_to_end_duration";
+
+ public static final String FIELD_NAME_ALIGNMENT_BUFFERED =
"alignment_buffered";
+
+ public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
+
+ public static final String FIELD_NAME_NUM_ACK_SUBTASKS =
"num_acknowledged_subtasks";
+
+ public static final String FIELD_NAME_TASKS = "tasks";
+
+ @JsonProperty(FIELD_NAME_ID)
+ private final long id;
+
+ @JsonProperty(FIELD_NAME_STATUS)
+ private final CheckpointStatsStatus status;
+
+ @JsonProperty(FIELD_NAME_IS_SAVEPOINT)
+ private final boolean savepoint;
+
+ @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
+ private final long triggerTimestamp;
+
+ @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
+ private final long latestAckTimestamp;
+
+ @JsonProperty(FIELD_NAME_STATE_SIZE)
+ private final long stateSize;
+
+ @JsonProperty(FIELD_NAME_DURATION)
+ private final long duration;
+
+ @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
+ private final long alignmentBuffered;
+
+ @JsonProperty(FIELD_NAME_NUM_SUBTASKS)
+ private final int numSubtasks;
+
+ @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
+ private final int numAckSubtasks;
+
+ @JsonProperty(FIELD_NAME_TASKS)
+ @JsonSerialize(keyUsing = JobVertexIDSerializer.class)
+ @Nullable
+ private final Map<JobVertexID, TaskCheckpointStatistics>
checkpointStatisticsPerTask;
+
+ @JsonCreator
+ private CheckpointStatistics(
+ @JsonProperty(FIELD_NAME_ID) long id,
+ @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus
status,
+ @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean
savepoint,
+ @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long
triggerTimestamp,
+ @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long
latestAckTimestamp,
+ @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
+ @JsonProperty(FIELD_NAME_DURATION) long duration,
+ @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long
alignmentBuffered,
+ @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
+ @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int
numAckSubtasks,
+ @JsonDeserialize(keyUsing =
JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) @Nullable
Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
+ this.id = id;
+ this.status = Preconditions.checkNotNull(status);
+ this.savepoint = savepoint;
+ this.triggerTimestamp = triggerTimestamp;
+ this.latestAckTimestamp = latestAckTimestamp;
+ this.stateSize = stateSize;
+ this.duration = duration;
+ this.alignmentBuffered = alignmentBuffered;
+ this.numSubtasks = numSubtasks;
+ this.numAckSubtasks = numAckSubtasks;
+ this.checkpointStatisticsPerTask = checkpointStatisticsPerTask;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public CheckpointStatsStatus getStatus() {
+ return status;
+ }
+
+ public boolean isSavepoint() {
+ return savepoint;
+ }
+
+ public long getTriggerTimestamp() {
+ return triggerTimestamp;
+ }
+
+ public long getLatestAckTimestamp() {
+ return latestAckTimestamp;
+ }
+
+ public long getStateSize() {
+ return stateSize;
+ }
+
+ public long getDuration() {
+ return duration;
+ }
+
+ public long getAlignmentBuffered() {
+ return alignmentBuffered;
+ }
+
+ public int getNumSubtasks() {
+ return numSubtasks;
+ }
+
+ public int getNumAckSubtasks() {
+ return numAckSubtasks;
+ }
+
+ @Nullable
+ public Map<JobVertexID, TaskCheckpointStatistics>
getCheckpointStatisticsPerTask() {
+ return checkpointStatisticsPerTask;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CheckpointStatistics that = (CheckpointStatistics) o;
+ return id == that.id &&
+ savepoint == that.savepoint &&
+ triggerTimestamp == that.triggerTimestamp &&
+ latestAckTimestamp == that.latestAckTimestamp &&
+ stateSize == that.stateSize &&
+ duration == that.duration &&
+ alignmentBuffered == that.alignmentBuffered &&
+ numSubtasks == that.numSubtasks &&
+ numAckSubtasks == that.numAckSubtasks &&
+ status == that.status &&
+ Objects.equals(checkpointStatisticsPerTask,
that.checkpointStatisticsPerTask);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, status, savepoint, triggerTimestamp,
latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks,
numAckSubtasks, checkpointStatisticsPerTask);
+ }
+
+ public static CheckpointStatistics
generateCheckpointStatistics(AbstractCheckpointStats checkpointStats, boolean
includeTaskCheckpointStatistics) {
+ if (checkpointStats != null) {
+
+ Map<JobVertexID, TaskCheckpointStatistics>
checkpointStatisticsPerTask;
+
+ if (includeTaskCheckpointStatistics) {
+ Collection<TaskStateStats> taskStateStats =
checkpointStats.getAllTaskStateStats();
+
+ checkpointStatisticsPerTask = new
HashMap<>(taskStateStats.size());
+
+ for (TaskStateStats taskStateStat :
taskStateStats) {
+ checkpointStatisticsPerTask.put(
+ taskStateStat.getJobVertexId(),
+ new TaskCheckpointStatistics(
+
taskStateStat.getLatestAckTimestamp(),
+
taskStateStat.getStateSize(),
+
taskStateStat.getEndToEndDuration(checkpointStats.getTriggerTimestamp()),
+
taskStateStat.getAlignmentBuffered(),
+
taskStateStat.getNumberOfSubtasks(),
+
taskStateStat.getNumberOfAcknowledgedSubtasks()));
+ }
+ } else {
+ checkpointStatisticsPerTask = null;
+ }
+
+ if (checkpointStats instanceof
CompletedCheckpointStats) {
+ final CompletedCheckpointStats
completedCheckpointStats = ((CompletedCheckpointStats) checkpointStats);
+
+ return new
CheckpointStatistics.CompletedCheckpointStatistics(
+
completedCheckpointStats.getCheckpointId(),
+ completedCheckpointStats.getStatus(),
+
completedCheckpointStats.getProperties().isSavepoint(),
+
completedCheckpointStats.getTriggerTimestamp(),
+
completedCheckpointStats.getLatestAckTimestamp(),
+ completedCheckpointStats.getStateSize(),
+
completedCheckpointStats.getEndToEndDuration(),
+
completedCheckpointStats.getAlignmentBuffered(),
+
completedCheckpointStats.getNumberOfSubtasks(),
+
completedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
+ checkpointStatisticsPerTask,
+
completedCheckpointStats.getExternalPath(),
+ completedCheckpointStats.isDiscarded());
+ } else if (checkpointStats instanceof
FailedCheckpointStats) {
+ final FailedCheckpointStats
failedCheckpointStats = ((FailedCheckpointStats) checkpointStats);
+
+ return new
CheckpointStatistics.FailedCheckpointStatistics(
+ failedCheckpointStats.getCheckpointId(),
+ failedCheckpointStats.getStatus(),
+
failedCheckpointStats.getProperties().isSavepoint(),
+
failedCheckpointStats.getTriggerTimestamp(),
+
failedCheckpointStats.getLatestAckTimestamp(),
+ failedCheckpointStats.getStateSize(),
+
failedCheckpointStats.getEndToEndDuration(),
+
failedCheckpointStats.getAlignmentBuffered(),
+
failedCheckpointStats.getNumberOfSubtasks(),
+
failedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
+ checkpointStatisticsPerTask,
+
failedCheckpointStats.getFailureTimestamp(),
+
failedCheckpointStats.getFailureMessage());
+ } else {
+ throw new IllegalArgumentException("Given
checkpoint stats object of type " + checkpointStats.getClass().getName() + "
cannot be converted.");
+ }
+ } else {
+ return null;
--- End diff --
I think it should actually work and output a serialized `null` value.
However, I this distinction is not necessary and should be better pulled out of
this method.
> Port CheckpointStatsDetailsHandler to new REST endpoint
> -------------------------------------------------------
>
> Key: FLINK-7709
> URL: https://issues.apache.org/jira/browse/FLINK-7709
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, REST, Webfrontend
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Till Rohrmann
> Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{CheckpointStatsDetailsHandler}} to new REST endpoint.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)