[
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347417#comment-16347417
]
ASF GitHub Bot commented on FLINK-7856:
---------------------------------------
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5397#discussion_r165155639
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexBackPressureInfo.java
---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
+
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Response type of the {@link JobVertexBackPressureHandler}.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class JobVertexBackPressureInfo implements ResponseBody {
+
+ public static final String FIELD_NAME_STATUS = "status";
+ public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level";
+ public static final String FIELD_NAME_END_TIMESTAMP = "end-timestamp";
+ public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+ @JsonProperty(FIELD_NAME_STATUS)
+ private final VertexBackPressureStatus status;
+
+ @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+ private final VertexBackPressureLevel backpressureLevel;
+
+ @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+ private final Long endTimestamp;
+
+ @JsonProperty(FIELD_NAME_SUBTASKS)
+ protected final List<SubtaskBackPressureInfo> subtasks;
+
+ @JsonCreator
+ public JobVertexBackPressureInfo(
+ @JsonProperty(FIELD_NAME_STATUS) VertexBackPressureStatus status,
+ @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) VertexBackPressureLevel backpressureLevel,
+ @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long endTimestamp,
+ @JsonProperty(FIELD_NAME_SUBTASKS) List<SubtaskBackPressureInfo> subtasks) {
+ this.status = status;
+ this.backpressureLevel = backpressureLevel;
+ this.endTimestamp = endTimestamp;
+ this.subtasks = subtasks;
+ }
+
+ public static JobVertexBackPressureInfo deprecated() {
+ return new JobVertexBackPressureInfo(
+ VertexBackPressureStatus.DEPRECATED,
+ null,
+ null,
+ null);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ JobVertexBackPressureInfo that = (JobVertexBackPressureInfo) o;
+ return Objects.equals(status, that.status) &&
+ Objects.equals(backpressureLevel, that.backpressureLevel) &&
+ Objects.equals(endTimestamp, that.endTimestamp) &&
+ Objects.equals(subtasks, that.subtasks);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(status, backpressureLevel, endTimestamp, subtasks);
+ }
+
+ //---------------------------------------------------------------------------------
+ // Static helper classes
+ //---------------------------------------------------------------------------------
+
+ /**
+ * Nested class to encapsulate the sub tasks back pressure.
+ */
+ public static final class SubtaskBackPressureInfo {
+
+ public static final String FIELD_NAME_SUBTASK = "subtask";
+ public static final String FIELD_NAME_BACKPRESSURE_LEVEL = "backpressure-level";
+ public static final String FIELD_NAME_RATIO = "ratio";
+
+ @JsonProperty(FIELD_NAME_SUBTASK)
+ private final int subtask;
+
+ @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL)
+ private final VertexBackPressureLevel backpressureLevel;
+
+ @JsonProperty(FIELD_NAME_RATIO)
+ private final double ratio;
+
+ public SubtaskBackPressureInfo(
+ @JsonProperty(FIELD_NAME_SUBTASK) int subtask,
+ @JsonProperty(FIELD_NAME_BACKPRESSURE_LEVEL) VertexBackPressureLevel backpressureLevel,
+ @JsonProperty(FIELD_NAME_RATIO) double ratio) {
+ this.subtask = subtask;
+ this.backpressureLevel = checkNotNull(backpressureLevel);
+ this.ratio = ratio;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SubtaskBackPressureInfo that = (SubtaskBackPressureInfo) o;
+ return subtask == that.subtask &&
+ ratio == that.ratio &&
+ Objects.equals(backpressureLevel, that.backpressureLevel);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(subtask, backpressureLevel, ratio);
+ }
+ }
+
+ /**
+ * Status of vertex back-pressure.
+ */
+ public enum VertexBackPressureStatus {
+ DEPRECATED("deprecated"), OK("ok");
+
+ private String status;
+
+ VertexBackPressureStatus(String status) {
+ this.status = status;
+ }
+
+ @JsonValue
--- End diff --
Needs to be tested (the `@JsonValue` mapping on `VertexBackPressureStatus` that the diff above ends on).
> Port JobVertexBackPressureHandler to REST endpoint
> --------------------------------------------------
>
> Key: FLINK-7856
> URL: https://issues.apache.org/jira/browse/FLINK-7856
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, REST, Webfrontend
> Reporter: Fang Yong
> Assignee: Gary Yao
> Priority: Major
>
> Port JobVertexBackPressureHandler to REST endpoint
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)