This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 878a83cfa [CELEBORN-1750] Return struct worker resource consumption
information with RESTful api
878a83cfa is described below
commit 878a83cfa7129ab9cb27b83889b1ff1e29b9c3ba
Author: Wang, Fei <[email protected]>
AuthorDate: Sun Dec 1 19:58:01 2024 -0800
[CELEBORN-1750] Return struct worker resource consumption information with
RESTful api
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
This information is useful for the automation tool integration.
For our automation tool, it query all the worker status periodically by
calling master `/api/v1/worker`.
In decommission process, when the worker is in IDLE state, we need to check
whether there is still unreleased shuffle data on this worker so that we can
shutdown this node without user impaction.
Before, I have to call the worker `/ap1/v1/shuffles` to check that.
It is better that we can get all the information from celeborn master end,
because master is HA enabled and always reachable.
So in this PR, it returns the struct resource consumption for automation
tool integration.
### Does this PR introduce _any_ user-facing change?
No, this RESTful api has not been released.
### How was this patch tested?
GA.
Closes #2955 from turboFei/worker_info_object.
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
.../apache/celeborn/rest/v1/model/WorkerData.java | 41 ++--
.../celeborn/rest/v1/model/WorkerInfoResponse.java | 41 ++--
.../rest/v1/model/WorkerResourceConsumption.java | 242 +++++++++++++++++++++
.../src/main/openapi3/master_rest_v1.yaml | 26 ++-
.../src/main/openapi3/worker_rest_v1.yaml | 32 ++-
.../server/common/http/api/v1/ApiUtils.scala | 48 +++-
6 files changed, 371 insertions(+), 59 deletions(-)
diff --git
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerData.java
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerData.java
index 4067fae4c..09908248b 100644
---
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerData.java
+++
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerData.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.annotation.JsonValue;
import java.util.HashMap;
import java.util.Map;
+import org.apache.celeborn.rest.v1.model.WorkerResourceConsumption;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.annotation.JsonTypeName;
@@ -44,7 +45,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
WorkerData.JSON_PROPERTY_LAST_HEARTBEAT_TIMESTAMP,
WorkerData.JSON_PROPERTY_HEARTBEAT_ELAPSED_SECONDS,
WorkerData.JSON_PROPERTY_DISK_INFOS,
- WorkerData.JSON_PROPERTY_RESOURCE_CONSUMPTION,
+ WorkerData.JSON_PROPERTY_RESOURCE_CONSUMPTIONS,
WorkerData.JSON_PROPERTY_WORKER_REF,
WorkerData.JSON_PROPERTY_WORKER_STATE,
WorkerData.JSON_PROPERTY_WORKER_STATE_START_TIME
@@ -81,8 +82,8 @@ public class WorkerData {
public static final String JSON_PROPERTY_DISK_INFOS = "diskInfos";
private Map<String, String> diskInfos = new HashMap<>();
- public static final String JSON_PROPERTY_RESOURCE_CONSUMPTION =
"resourceConsumption";
- private Map<String, String> resourceConsumption = new HashMap<>();
+ public static final String JSON_PROPERTY_RESOURCE_CONSUMPTIONS =
"resourceConsumptions";
+ private Map<String, WorkerResourceConsumption> resourceConsumptions = new
HashMap<>();
public static final String JSON_PROPERTY_WORKER_REF = "workerRef";
private String workerRef;
@@ -354,37 +355,37 @@ public class WorkerData {
this.diskInfos = diskInfos;
}
- public WorkerData resourceConsumption(Map<String, String>
resourceConsumption) {
+ public WorkerData resourceConsumptions(Map<String,
WorkerResourceConsumption> resourceConsumptions) {
- this.resourceConsumption = resourceConsumption;
+ this.resourceConsumptions = resourceConsumptions;
return this;
}
- public WorkerData putResourceConsumptionItem(String key, String
resourceConsumptionItem) {
- if (this.resourceConsumption == null) {
- this.resourceConsumption = new HashMap<>();
+ public WorkerData putResourceConsumptionsItem(String key,
WorkerResourceConsumption resourceConsumptionsItem) {
+ if (this.resourceConsumptions == null) {
+ this.resourceConsumptions = new HashMap<>();
}
- this.resourceConsumption.put(key, resourceConsumptionItem);
+ this.resourceConsumptions.put(key, resourceConsumptionsItem);
return this;
}
/**
- * A map of identifier and resource consumption.
- * @return resourceConsumption
+ * A map of user identifier and resource consumption.
+ * @return resourceConsumptions
*/
@javax.annotation.Nullable
- @JsonProperty(JSON_PROPERTY_RESOURCE_CONSUMPTION)
+ @JsonProperty(JSON_PROPERTY_RESOURCE_CONSUMPTIONS)
@JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
- public Map<String, String> getResourceConsumption() {
- return resourceConsumption;
+ public Map<String, WorkerResourceConsumption> getResourceConsumptions() {
+ return resourceConsumptions;
}
- @JsonProperty(JSON_PROPERTY_RESOURCE_CONSUMPTION)
+ @JsonProperty(JSON_PROPERTY_RESOURCE_CONSUMPTIONS)
@JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
- public void setResourceConsumption(Map<String, String> resourceConsumption) {
- this.resourceConsumption = resourceConsumption;
+ public void setResourceConsumptions(Map<String, WorkerResourceConsumption>
resourceConsumptions) {
+ this.resourceConsumptions = resourceConsumptions;
}
public WorkerData workerRef(String workerRef) {
@@ -481,7 +482,7 @@ public class WorkerData {
Objects.equals(this.lastHeartbeatTimestamp,
workerData.lastHeartbeatTimestamp) &&
Objects.equals(this.heartbeatElapsedSeconds,
workerData.heartbeatElapsedSeconds) &&
Objects.equals(this.diskInfos, workerData.diskInfos) &&
- Objects.equals(this.resourceConsumption,
workerData.resourceConsumption) &&
+ Objects.equals(this.resourceConsumptions,
workerData.resourceConsumptions) &&
Objects.equals(this.workerRef, workerData.workerRef) &&
Objects.equals(this.workerState, workerData.workerState) &&
Objects.equals(this.workerStateStartTime,
workerData.workerStateStartTime);
@@ -489,7 +490,7 @@ public class WorkerData {
@Override
public int hashCode() {
- return Objects.hash(host, rpcPort, pushPort, fetchPort, replicatePort,
internalPort, slotUsed, lastHeartbeatTimestamp, heartbeatElapsedSeconds,
diskInfos, resourceConsumption, workerRef, workerState, workerStateStartTime);
+ return Objects.hash(host, rpcPort, pushPort, fetchPort, replicatePort,
internalPort, slotUsed, lastHeartbeatTimestamp, heartbeatElapsedSeconds,
diskInfos, resourceConsumptions, workerRef, workerState, workerStateStartTime);
}
@Override
@@ -506,7 +507,7 @@ public class WorkerData {
sb.append(" lastHeartbeatTimestamp:
").append(toIndentedString(lastHeartbeatTimestamp)).append("\n");
sb.append(" heartbeatElapsedSeconds:
").append(toIndentedString(heartbeatElapsedSeconds)).append("\n");
sb.append(" diskInfos:
").append(toIndentedString(diskInfos)).append("\n");
- sb.append(" resourceConsumption:
").append(toIndentedString(resourceConsumption)).append("\n");
+ sb.append(" resourceConsumptions:
").append(toIndentedString(resourceConsumptions)).append("\n");
sb.append(" workerRef:
").append(toIndentedString(workerRef)).append("\n");
sb.append(" workerState:
").append(toIndentedString(workerState)).append("\n");
sb.append(" workerStateStartTime:
").append(toIndentedString(workerStateStartTime)).append("\n");
diff --git
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerInfoResponse.java
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerInfoResponse.java
index bae5e6ebd..cdbaf9ce7 100644
---
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerInfoResponse.java
+++
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerInfoResponse.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.annotation.JsonValue;
import java.util.HashMap;
import java.util.Map;
+import org.apache.celeborn.rest.v1.model.WorkerResourceConsumption;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.annotation.JsonTypeName;
@@ -44,7 +45,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
WorkerInfoResponse.JSON_PROPERTY_LAST_HEARTBEAT_TIMESTAMP,
WorkerInfoResponse.JSON_PROPERTY_HEARTBEAT_ELAPSED_SECONDS,
WorkerInfoResponse.JSON_PROPERTY_DISK_INFOS,
- WorkerInfoResponse.JSON_PROPERTY_RESOURCE_CONSUMPTION,
+ WorkerInfoResponse.JSON_PROPERTY_RESOURCE_CONSUMPTIONS,
WorkerInfoResponse.JSON_PROPERTY_WORKER_REF,
WorkerInfoResponse.JSON_PROPERTY_WORKER_STATE,
WorkerInfoResponse.JSON_PROPERTY_WORKER_STATE_START_TIME,
@@ -84,8 +85,8 @@ public class WorkerInfoResponse {
public static final String JSON_PROPERTY_DISK_INFOS = "diskInfos";
private Map<String, String> diskInfos = new HashMap<>();
- public static final String JSON_PROPERTY_RESOURCE_CONSUMPTION =
"resourceConsumption";
- private Map<String, String> resourceConsumption = new HashMap<>();
+ public static final String JSON_PROPERTY_RESOURCE_CONSUMPTIONS =
"resourceConsumptions";
+ private Map<String, WorkerResourceConsumption> resourceConsumptions = new
HashMap<>();
public static final String JSON_PROPERTY_WORKER_REF = "workerRef";
private String workerRef;
@@ -366,37 +367,37 @@ public class WorkerInfoResponse {
this.diskInfos = diskInfos;
}
- public WorkerInfoResponse resourceConsumption(Map<String, String>
resourceConsumption) {
+ public WorkerInfoResponse resourceConsumptions(Map<String,
WorkerResourceConsumption> resourceConsumptions) {
- this.resourceConsumption = resourceConsumption;
+ this.resourceConsumptions = resourceConsumptions;
return this;
}
- public WorkerInfoResponse putResourceConsumptionItem(String key, String
resourceConsumptionItem) {
- if (this.resourceConsumption == null) {
- this.resourceConsumption = new HashMap<>();
+ public WorkerInfoResponse putResourceConsumptionsItem(String key,
WorkerResourceConsumption resourceConsumptionsItem) {
+ if (this.resourceConsumptions == null) {
+ this.resourceConsumptions = new HashMap<>();
}
- this.resourceConsumption.put(key, resourceConsumptionItem);
+ this.resourceConsumptions.put(key, resourceConsumptionsItem);
return this;
}
/**
- * A map of identifier and resource consumption.
- * @return resourceConsumption
+ * A map of user identifier and resource consumption.
+ * @return resourceConsumptions
*/
@javax.annotation.Nullable
- @JsonProperty(JSON_PROPERTY_RESOURCE_CONSUMPTION)
+ @JsonProperty(JSON_PROPERTY_RESOURCE_CONSUMPTIONS)
@JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
- public Map<String, String> getResourceConsumption() {
- return resourceConsumption;
+ public Map<String, WorkerResourceConsumption> getResourceConsumptions() {
+ return resourceConsumptions;
}
- @JsonProperty(JSON_PROPERTY_RESOURCE_CONSUMPTION)
+ @JsonProperty(JSON_PROPERTY_RESOURCE_CONSUMPTIONS)
@JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
- public void setResourceConsumption(Map<String, String> resourceConsumption) {
- this.resourceConsumption = resourceConsumption;
+ public void setResourceConsumptions(Map<String, WorkerResourceConsumption>
resourceConsumptions) {
+ this.resourceConsumptions = resourceConsumptions;
}
public WorkerInfoResponse workerRef(String workerRef) {
@@ -568,7 +569,7 @@ public class WorkerInfoResponse {
Objects.equals(this.lastHeartbeatTimestamp,
workerInfoResponse.lastHeartbeatTimestamp) &&
Objects.equals(this.heartbeatElapsedSeconds,
workerInfoResponse.heartbeatElapsedSeconds) &&
Objects.equals(this.diskInfos, workerInfoResponse.diskInfos) &&
- Objects.equals(this.resourceConsumption,
workerInfoResponse.resourceConsumption) &&
+ Objects.equals(this.resourceConsumptions,
workerInfoResponse.resourceConsumptions) &&
Objects.equals(this.workerRef, workerInfoResponse.workerRef) &&
Objects.equals(this.workerState, workerInfoResponse.workerState) &&
Objects.equals(this.workerStateStartTime,
workerInfoResponse.workerStateStartTime) &&
@@ -579,7 +580,7 @@ public class WorkerInfoResponse {
@Override
public int hashCode() {
- return Objects.hash(host, rpcPort, pushPort, fetchPort, replicatePort,
internalPort, slotUsed, lastHeartbeatTimestamp, heartbeatElapsedSeconds,
diskInfos, resourceConsumption, workerRef, workerState, workerStateStartTime,
isRegistered, isShutdown, isDecommissioning);
+ return Objects.hash(host, rpcPort, pushPort, fetchPort, replicatePort,
internalPort, slotUsed, lastHeartbeatTimestamp, heartbeatElapsedSeconds,
diskInfos, resourceConsumptions, workerRef, workerState, workerStateStartTime,
isRegistered, isShutdown, isDecommissioning);
}
@Override
@@ -596,7 +597,7 @@ public class WorkerInfoResponse {
sb.append(" lastHeartbeatTimestamp:
").append(toIndentedString(lastHeartbeatTimestamp)).append("\n");
sb.append(" heartbeatElapsedSeconds:
").append(toIndentedString(heartbeatElapsedSeconds)).append("\n");
sb.append(" diskInfos:
").append(toIndentedString(diskInfos)).append("\n");
- sb.append(" resourceConsumption:
").append(toIndentedString(resourceConsumption)).append("\n");
+ sb.append(" resourceConsumptions:
").append(toIndentedString(resourceConsumptions)).append("\n");
sb.append(" workerRef:
").append(toIndentedString(workerRef)).append("\n");
sb.append(" workerState:
").append(toIndentedString(workerState)).append("\n");
sb.append(" workerStateStartTime:
").append(toIndentedString(workerStateStartTime)).append("\n");
diff --git
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerResourceConsumption.java
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerResourceConsumption.java
new file mode 100644
index 000000000..8a1c6c7ff
--- /dev/null
+++
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerResourceConsumption.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.celeborn.rest.v1.model;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonValue;
+import java.util.HashMap;
+import java.util.Map;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * WorkerResourceConsumption
+ */
+@JsonPropertyOrder({
+ WorkerResourceConsumption.JSON_PROPERTY_DISK_BYTES_WRITTEN,
+ WorkerResourceConsumption.JSON_PROPERTY_DISK_FILE_COUNT,
+ WorkerResourceConsumption.JSON_PROPERTY_HDFS_BYTES_WRITTEN,
+ WorkerResourceConsumption.JSON_PROPERTY_HDFS_FILE_COUNT,
+ WorkerResourceConsumption.JSON_PROPERTY_SUB_RESOURCE_CONSUMPTION
+})
[email protected](value =
"org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator
version: 7.8.0")
+public class WorkerResourceConsumption {
+ public static final String JSON_PROPERTY_DISK_BYTES_WRITTEN =
"diskBytesWritten";
+ private Long diskBytesWritten;
+
+ public static final String JSON_PROPERTY_DISK_FILE_COUNT = "diskFileCount";
+ private Long diskFileCount;
+
+ public static final String JSON_PROPERTY_HDFS_BYTES_WRITTEN =
"hdfsBytesWritten";
+ private Long hdfsBytesWritten;
+
+ public static final String JSON_PROPERTY_HDFS_FILE_COUNT = "hdfsFileCount";
+ private Long hdfsFileCount;
+
+ public static final String JSON_PROPERTY_SUB_RESOURCE_CONSUMPTION =
"subResourceConsumption";
+ private Map<String, WorkerResourceConsumption> subResourceConsumption = new
HashMap<>();
+
+ public WorkerResourceConsumption() {
+ }
+
+ public WorkerResourceConsumption diskBytesWritten(Long diskBytesWritten) {
+
+ this.diskBytesWritten = diskBytesWritten;
+ return this;
+ }
+
+ /**
+ * Get diskBytesWritten
+ * @return diskBytesWritten
+ */
+ @javax.annotation.Nullable
+ @JsonProperty(JSON_PROPERTY_DISK_BYTES_WRITTEN)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+ public Long getDiskBytesWritten() {
+ return diskBytesWritten;
+ }
+
+
+ @JsonProperty(JSON_PROPERTY_DISK_BYTES_WRITTEN)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+ public void setDiskBytesWritten(Long diskBytesWritten) {
+ this.diskBytesWritten = diskBytesWritten;
+ }
+
+ public WorkerResourceConsumption diskFileCount(Long diskFileCount) {
+
+ this.diskFileCount = diskFileCount;
+ return this;
+ }
+
+ /**
+ * Get diskFileCount
+ * @return diskFileCount
+ */
+ @javax.annotation.Nullable
+ @JsonProperty(JSON_PROPERTY_DISK_FILE_COUNT)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+ public Long getDiskFileCount() {
+ return diskFileCount;
+ }
+
+
+ @JsonProperty(JSON_PROPERTY_DISK_FILE_COUNT)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+ public void setDiskFileCount(Long diskFileCount) {
+ this.diskFileCount = diskFileCount;
+ }
+
+ public WorkerResourceConsumption hdfsBytesWritten(Long hdfsBytesWritten) {
+
+ this.hdfsBytesWritten = hdfsBytesWritten;
+ return this;
+ }
+
+ /**
+ * Get hdfsBytesWritten
+ * @return hdfsBytesWritten
+ */
+ @javax.annotation.Nullable
+ @JsonProperty(JSON_PROPERTY_HDFS_BYTES_WRITTEN)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+ public Long getHdfsBytesWritten() {
+ return hdfsBytesWritten;
+ }
+
+
+ @JsonProperty(JSON_PROPERTY_HDFS_BYTES_WRITTEN)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+ public void setHdfsBytesWritten(Long hdfsBytesWritten) {
+ this.hdfsBytesWritten = hdfsBytesWritten;
+ }
+
+ public WorkerResourceConsumption hdfsFileCount(Long hdfsFileCount) {
+
+ this.hdfsFileCount = hdfsFileCount;
+ return this;
+ }
+
+ /**
+ * Get hdfsFileCount
+ * @return hdfsFileCount
+ */
+ @javax.annotation.Nullable
+ @JsonProperty(JSON_PROPERTY_HDFS_FILE_COUNT)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+ public Long getHdfsFileCount() {
+ return hdfsFileCount;
+ }
+
+
+ @JsonProperty(JSON_PROPERTY_HDFS_FILE_COUNT)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+ public void setHdfsFileCount(Long hdfsFileCount) {
+ this.hdfsFileCount = hdfsFileCount;
+ }
+
+ public WorkerResourceConsumption subResourceConsumption(Map<String,
WorkerResourceConsumption> subResourceConsumption) {
+
+ this.subResourceConsumption = subResourceConsumption;
+ return this;
+ }
+
+ public WorkerResourceConsumption putSubResourceConsumptionItem(String key,
WorkerResourceConsumption subResourceConsumptionItem) {
+ if (this.subResourceConsumption == null) {
+ this.subResourceConsumption = new HashMap<>();
+ }
+ this.subResourceConsumption.put(key, subResourceConsumptionItem);
+ return this;
+ }
+
+ /**
+ * Get subResourceConsumption
+ * @return subResourceConsumption
+ */
+ @javax.annotation.Nullable
+ @JsonProperty(JSON_PROPERTY_SUB_RESOURCE_CONSUMPTION)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+ public Map<String, WorkerResourceConsumption> getSubResourceConsumption() {
+ return subResourceConsumption;
+ }
+
+
+ @JsonProperty(JSON_PROPERTY_SUB_RESOURCE_CONSUMPTION)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+ public void setSubResourceConsumption(Map<String, WorkerResourceConsumption>
subResourceConsumption) {
+ this.subResourceConsumption = subResourceConsumption;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ WorkerResourceConsumption workerResourceConsumption =
(WorkerResourceConsumption) o;
+ return Objects.equals(this.diskBytesWritten,
workerResourceConsumption.diskBytesWritten) &&
+ Objects.equals(this.diskFileCount,
workerResourceConsumption.diskFileCount) &&
+ Objects.equals(this.hdfsBytesWritten,
workerResourceConsumption.hdfsBytesWritten) &&
+ Objects.equals(this.hdfsFileCount,
workerResourceConsumption.hdfsFileCount) &&
+ Objects.equals(this.subResourceConsumption,
workerResourceConsumption.subResourceConsumption);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(diskBytesWritten, diskFileCount, hdfsBytesWritten,
hdfsFileCount, subResourceConsumption);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("class WorkerResourceConsumption {\n");
+ sb.append(" diskBytesWritten:
").append(toIndentedString(diskBytesWritten)).append("\n");
+ sb.append(" diskFileCount:
").append(toIndentedString(diskFileCount)).append("\n");
+ sb.append(" hdfsBytesWritten:
").append(toIndentedString(hdfsBytesWritten)).append("\n");
+ sb.append(" hdfsFileCount:
").append(toIndentedString(hdfsFileCount)).append("\n");
+ sb.append(" subResourceConsumption:
").append(toIndentedString(subResourceConsumption)).append("\n");
+ sb.append("}");
+ return sb.toString();
+ }
+
+ /**
+ * Convert the given object to string with each line indented by 4 spaces
+ * (except the first line).
+ */
+ private String toIndentedString(Object o) {
+ if (o == null) {
+ return "null";
+ }
+ return o.toString().replace("\n", "\n ");
+ }
+
+}
+
diff --git a/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
b/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
index 62ee5efc9..6ba0b40b3 100644
--- a/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
+++ b/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
@@ -781,11 +781,11 @@ components:
description: A map of disk name and disk info.
additionalProperties:
type: string
- resourceConsumption:
+ resourceConsumptions:
type: object
- description: A map of identifier and resource consumption.
+ description: A map of user identifier and resource consumption.
additionalProperties:
- type: string
+ $ref: '#/components/schemas/WorkerResourceConsumption'
workerRef:
type: string
description: The reference of the worker.
@@ -803,6 +803,26 @@ components:
- fetchPort
- replicatePort
+ WorkerResourceConsumption:
+ type: object
+ properties:
+ diskBytesWritten:
+ type: integer
+ format: int64
+ diskFileCount:
+ type: integer
+ format: int64
+ hdfsBytesWritten:
+ type: integer
+ format: int64
+ hdfsFileCount:
+ type: integer
+ format: int64
+ subResourceConsumption:
+ type: object
+ additionalProperties:
+ $ref: '#/components/schemas/WorkerResourceConsumption'
+
WorkerTimestampData:
type: object
properties:
diff --git a/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml
b/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml
index 630ea5da6..76875fa60 100644
--- a/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml
+++ b/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml
@@ -384,11 +384,11 @@ components:
description: A map of disk name and disk info.
additionalProperties:
type: string
- resourceConsumption:
+ resourceConsumptions:
type: object
- description: A map of identifier and resource consumption.
+ description: A map of user identifier and resource consumption.
additionalProperties:
- type: string
+ $ref: '#/components/schemas/WorkerResourceConsumption'
workerRef:
type: string
description: The reference of the worker.
@@ -492,11 +492,11 @@ components:
description: A map of disk name and disk info.
additionalProperties:
type: string
- resourceConsumption:
+ resourceConsumptions:
type: object
- description: A map of identifier and resource consumption.
+ description: A map of user identifier and resource consumption.
additionalProperties:
- type: string
+ $ref: '#/components/schemas/WorkerResourceConsumption'
workerRef:
type: string
description: The reference of the worker.
@@ -514,6 +514,26 @@ components:
- fetchPort
- replicatePort
+ WorkerResourceConsumption:
+ type: object
+ properties:
+ diskBytesWritten:
+ type: integer
+ format: int64
+ diskFileCount:
+ type: integer
+ format: int64
+ hdfsBytesWritten:
+ type: integer
+ format: int64
+ hdfsFileCount:
+ type: integer
+ format: int64
+ subResourceConsumption:
+ type: object
+ additionalProperties:
+ $ref: '#/components/schemas/WorkerResourceConsumption'
+
WorkerTimestampData:
type: object
properties:
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiUtils.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiUtils.scala
index a4e4d2bca..d3209027b 100644
---
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiUtils.scala
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiUtils.scala
@@ -17,6 +17,8 @@
package org.apache.celeborn.server.common.http.api.v1
+import java.util
+import java.util.{Map => JMap}
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
@@ -24,6 +26,7 @@ import scala.collection.JavaConverters._
import org.apache.celeborn.common.meta.{WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.protocol.{PartitionLocation, StorageInfo}
import org.apache.celeborn.common.protocol.PbWorkerStatus.State
+import org.apache.celeborn.common.util.CollectionUtils
import org.apache.celeborn.rest.v1.model._
import org.apache.celeborn.rest.v1.model.PartitionLocationData.{ModeEnum,
StorageEnum}
@@ -37,14 +40,6 @@ object ApiUtils {
disk -> diskInfo.toString()
}.toMap -> workerInfo.usedSlots()
}
- val userResourceConsumption =
- if (workerInfo.userResourceConsumption == null) {
- Map.empty[String, String]
- } else {
- workerInfo.userResourceConsumption.asScala.map { case (user,
resourceConsumption) =>
- user.toString -> resourceConsumption.toString()
- }.toMap
- }
new WorkerData()
.host(workerInfo.host)
@@ -58,12 +53,45 @@ object ApiUtils {
.heartbeatElapsedSeconds(TimeUnit.MILLISECONDS.toSeconds(
System.currentTimeMillis() - workerInfo.lastHeartbeat))
.diskInfos(diskInfos.asJava)
- .resourceConsumption(userResourceConsumption.asJava)
+ .resourceConsumptions(workerResourceConsumptions(workerInfo))
.workerRef(Option(workerInfo.endpoint).map(_.toString).orNull)
.workerState(workerInfo.workerStatus.getState.toString)
.workerStateStartTime(workerInfo.workerStatus.getStateStartTime)
}
+ private def workerResourceConsumptions(workerInfo: WorkerInfo)
+ : JMap[String, WorkerResourceConsumption] = {
+ val workerResourceConsumptions = new util.HashMap[String,
WorkerResourceConsumption]()
+ if (CollectionUtils.isNotEmpty(workerInfo.userResourceConsumption)) {
+ workerInfo.userResourceConsumption.asScala.foreach {
+ case (userIdentifier, resourceConsumption) =>
+ val workerConsumption = new WorkerResourceConsumption()
+ .diskBytesWritten(resourceConsumption.diskBytesWritten)
+ .diskFileCount(resourceConsumption.diskFileCount)
+ .hdfsBytesWritten(resourceConsumption.hdfsBytesWritten)
+ .hdfsFileCount(resourceConsumption.hdfsFileCount)
+
+ if
(CollectionUtils.isNotEmpty(resourceConsumption.subResourceConsumptions)) {
+ val subConsumptions = new util.HashMap[String,
WorkerResourceConsumption]()
+ resourceConsumption.subResourceConsumptions.asScala.foreach {
+ case (subIdentifier, subConsumption) =>
+ subConsumptions.put(
+ subIdentifier,
+ new WorkerResourceConsumption()
+ .diskBytesWritten(subConsumption.diskBytesWritten)
+ .diskFileCount(subConsumption.diskFileCount)
+ .hdfsBytesWritten(subConsumption.hdfsBytesWritten)
+ .hdfsFileCount(subConsumption.hdfsFileCount))
+ }
+ workerConsumption.subResourceConsumption(subConsumptions)
+ }
+
+ workerResourceConsumptions.put(userIdentifier.toString,
workerConsumption)
+ }
+ }
+ workerResourceConsumptions
+ }
+
def workerInfoResponse(
workerInfo: WorkerInfo,
currentStatus: WorkerStatus,
@@ -81,7 +109,7 @@ object ApiUtils {
.lastHeartbeatTimestamp(workerData.getLastHeartbeatTimestamp)
.heartbeatElapsedSeconds(workerData.getHeartbeatElapsedSeconds)
.diskInfos(workerData.getDiskInfos)
- .resourceConsumption(workerData.getResourceConsumption)
+ .resourceConsumptions(workerData.getResourceConsumptions)
.workerRef(workerData.getWorkerRef)
.workerState(currentStatus.getState.toString)
.workerStateStartTime(currentStatus.getStateStartTime)