This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new ebfa1d8cf [CELEBORN-2014] updateInterruptionNotice REST API
ebfa1d8cf is described below
commit ebfa1d8cf418d4f30f53978ff37c9199748c348e
Author: Aravind Patnam <[email protected]>
AuthorDate: Fri Jun 6 14:07:49 2025 +0800
[CELEBORN-2014] updateInterruptionNotice REST API
### What changes were proposed in this pull request?
This PR is part of [CIP17: Interruption Aware Slot
Selection](https://cwiki.apache.org/confluence/display/CELEBORN/CIP-17%3A+Interruption+Aware+Slot+Selection).
It introduces a REST API for external services to notify the master about
upcoming worker interruptions and their schedules.
### Why are the changes needed?
To notify the master of upcoming interruptions in the worker fleet.
The master can then use these notices to proactively deprioritize workers that
are expected to be interrupted sooner.
### Does this PR introduce _any_ user-facing change?
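Yes. It adds a new REST API (`PUT /api/v1/workers/updateInterruptionNotice`) and a
corresponding CLI option (`--update-interruption-notices`).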
### How was this patch tested?
Added unit tests.
Closes #3285 from akpatnam25/CELEBORN-2014.
Authored-by: Aravind Patnam <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../apache/celeborn/cli/master/MasterOptions.scala | 6 +
.../celeborn/cli/master/MasterSubcommand.scala | 2 +
.../celeborn/cli/master/MasterSubcommandImpl.scala | 57 +++++++--
.../celeborn/cli/TestCelebornCliCommands.scala | 27 ++++
common/src/main/proto/TransportMessages.proto | 1 +
.../apache/celeborn/common/meta/WorkerInfo.scala | 1 +
.../apache/celeborn/common/util/PbSerDeUtils.scala | 7 ++
.../celeborn/common/util/PbSerDeUtilsTest.scala | 1 +
docs/celeborn_cli.md | 13 +-
.../celeborn/service/deploy/master/Master.scala | 20 +++
.../deploy/master/http/api/v1/WorkerResource.scala | 19 ++-
.../apache/celeborn/rest/v1/master/WorkerApi.java | 70 +++++++++++
.../v1/model/UpdateInterruptionNoticeRequest.java | 120 ++++++++++++++++++
.../apache/celeborn/rest/v1/model/WorkerData.java | 33 ++++-
.../celeborn/rest/v1/model/WorkerInfoResponse.java | 68 +++++++++-
.../rest/v1/model/WorkerInterruptionNotice.java | 140 +++++++++++++++++++++
.../src/main/openapi3/master_rest_v1.yaml | 48 +++++++
.../src/main/openapi3/worker_rest_v1.yaml | 11 ++
.../celeborn/server/common/HttpService.scala | 3 +
.../server/common/http/api/ApiRequestContext.scala | 2 +-
.../server/common/http/api/v1/ApiUtils.scala | 3 +
21 files changed, 632 insertions(+), 20 deletions(-)
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala
index 7621a305f..28e8b4999 100644
--- a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala
+++ b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala
@@ -120,4 +120,10 @@ final class MasterOptions {
names = Array("--delete-apps"),
description = Array("Delete resource of an application."))
private[master] var deleteApps: Boolean = _
+
+ @Option(
+ names = Array("--update-interruption-notices"),
+ paramLabel = "workerId1=timestamp,workerId2=timestamp,workerId3=timestamp",
+ description = Array("Update interruption notices of workers."))
+ private[master] var updateInterruptionNotices: String = _
}
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala
index d976f0c6b..a688930ca 100644
--- a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala
+++ b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala
@@ -115,4 +115,6 @@ trait MasterSubcommand extends CliLogging {
private[master] def deleteApps: HandleResponse
+ private[master] def updateInterruptionNotices: HandleResponse
+
}
diff --git
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
index c32b95bbc..ed16c0d84 100644
---
a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
+++
b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala
@@ -20,6 +20,7 @@ package org.apache.celeborn.cli.master
import java.util
import scala.collection.JavaConverters._
+import scala.util.Try
import org.apache.commons.lang3.StringUtils
import picocli.CommandLine.{Command, ParameterException}
@@ -54,6 +55,8 @@ class MasterSubcommandImpl extends Runnable with
MasterSubcommand {
if (masterOptions.showThreadDump) log(runShowThreadDump)
if (masterOptions.reviseLostShuffles) log(reviseLostShuffles)
if (masterOptions.deleteApps) log(deleteApps)
+ if (!StringUtils.isBlank(masterOptions.updateInterruptionNotices))
+ log(updateInterruptionNotices)
if (masterOptions.addClusterAlias != null &&
masterOptions.addClusterAlias.nonEmpty)
runAddClusterAlias
if (masterOptions.removeClusterAlias != null &&
masterOptions.removeClusterAlias.nonEmpty)
@@ -185,20 +188,22 @@ class MasterSubcommandImpl extends Runnable with
MasterSubcommand {
workerIds
.trim
.split(",")
- .map(workerId => {
- val splitWorkerId = workerId.split(":")
- val host = splitWorkerId(0)
- val rpcPort = splitWorkerId(1).toInt
- val pushPort = splitWorkerId(2).toInt
- val fetchPort = splitWorkerId(3).toInt
- val replicatePort = splitWorkerId(4).toInt
- new
WorkerId().host(host).rpcPort(rpcPort).pushPort(pushPort).fetchPort(
- fetchPort).replicatePort(replicatePort)
- })
+ .map(toWorkerId)
.toList
.asJava
}
+ private[master] def toWorkerId(workerIdString: String): WorkerId = {
+ val splitWorkerId = workerIdString.split(":")
+ val host = splitWorkerId(0)
+ val rpcPort = splitWorkerId(1).toInt
+ val pushPort = splitWorkerId(2).toInt
+ val fetchPort = splitWorkerId(3).toInt
+ val replicatePort = splitWorkerId(4).toInt
+ new WorkerId().host(host).rpcPort(rpcPort).pushPort(pushPort).fetchPort(
+ fetchPort).replicatePort(replicatePort)
+ }
+
private[master] def runShowConf: ConfResponse =
confApi.getConf(commonOptions.getAuthHeader)
private[master] def runShowDynamicConf: DynamicConfigResponse =
@@ -264,4 +269,36 @@ class MasterSubcommandImpl extends Runnable with
MasterSubcommand {
val request = new DeleteAppsRequest().apps(appIds)
applicationApi.deleteApps(request, commonOptions.getAuthHeader)
}
+
+ override private[master] def updateInterruptionNotices: HandleResponse = {
+ val workerInterruptionNotices = masterOptions.updateInterruptionNotices
+ .split(",")
+ .toList
+ .map { pair =>
+ val parts = pair.split("=", 2)
+ if (parts.length != 2) {
+ throw new ParameterException(
+ spec.commandLine(),
+ s"Invalid format for interruption notice: '$pair'. Expected
format: workerId=timestamp")
+ }
+ val workerIdStr = parts(0)
+ val timestampStr = parts(1)
+ val timestamp =
+ try {
+ timestampStr.toLong
+ } catch {
+ case _: NumberFormatException =>
+ throw new ParameterException(
+ spec.commandLine(),
+ s"Invalid timestamp for worker '$workerIdStr': '$timestampStr'
is not a valid long")
+ }
+ new WorkerInterruptionNotice()
+ .workerId(toWorkerId(workerIdStr))
+ .interruptionTimestamp(timestamp)
+ }
+
+ val request = new
UpdateInterruptionNoticeRequest().workers(workerInterruptionNotices.asJava)
+ workerApi.updateInterruptionNotice(request, commonOptions.getAuthHeader)
+ }
+
}
diff --git
a/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
index e47d8cec7..45b7b766f 100644
--- a/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
+++ b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala
@@ -291,6 +291,20 @@ class TestCelebornCliCommands extends CelebornFunSuite
with MiniClusterFeature {
captureOutputAndValidateResponse(args, "success: true")
}
+ test("master --update-interruption-notices legal input") {
+ val args = prepareMasterArgs() ++ Array(
+ "--update-interruption-notices",
+ s"${getWorkerId()}=${Long.MaxValue}")
+ captureOutputAndValidateResponse(args, "success: true")
+ }
+
+ test("master --update-interruption-notices illegal input") {
+ val args = prepareMasterArgs() ++ Array(
+ "--update-interruption-notices",
+ s"${getWorkerId()}=illegalInput")
+ captureErrorAndValidateResponse(args, "Invalid timestamp for worker")
+ }
+
private def prepareMasterArgs(): Array[String] = {
Array(
"master",
@@ -321,6 +335,19 @@ class TestCelebornCliCommands extends CelebornFunSuite
with MiniClusterFeature {
assert(stdout.nonEmpty && stdout.contains(stdoutValidationString))
}
+ private def captureErrorAndValidateResponse(
+ args: Array[String],
+ stderrValidationString: String): Unit = {
+ val stderrStream = new ByteArrayOutputStream()
+ val stderrPrintStream = new PrintStream(stderrStream)
+ System.setErr(new PrintStream(stderrStream))
+ Console.withErr(stderrPrintStream) {
+ CelebornCli.main(args)
+ }
+ val stderr = stderrStream.toString
+ assert(stderr.nonEmpty && stderr.contains(stderrValidationString))
+ }
+
private def getWorkerId(): String = {
s"${worker.workerArgs.host}:${worker.rpcEnv.address.port}:${worker.getPushFetchServerPort._1}"
+
s":${worker.getPushFetchServerPort._2}:${worker.replicateServer.getPort}"
diff --git a/common/src/main/proto/TransportMessages.proto
b/common/src/main/proto/TransportMessages.proto
index d53d56842..9ecb05051 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -185,6 +185,7 @@ message PbWorkerInfo {
map<string, PbResourceConsumption> userResourceConsumption = 7;
int32 internalPort = 8;
string networkLocation = 9;
+ int64 nextInterruptionNotice = 10; // Unix timestamp when disruption is
expected to be initiated
}
message PbFileGroup {
diff --git
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 84be612da..d6078ba75 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -44,6 +44,7 @@ class WorkerInfo(
_userResourceConsumption: util.Map[UserIdentifier, ResourceConsumption])
extends Serializable
with Logging {
var networkLocation = NetworkTopology.DEFAULT_RACK
+ var nextInterruptionNotice = Long.MaxValue
var lastHeartbeat: Long = 0
var workerStatus = WorkerStatus.normalWorkerStatus()
val diskInfos = {
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index 7f51e158f..6b2c9bde7 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -278,6 +278,12 @@ object PbSerDeUtils {
if (masterPersistWorkerNetworkLocation) {
workerInfo.networkLocation = pbWorkerInfo.getNetworkLocation
}
+ // If the next interruption notice is not specified in the message, set to
max.
+ if (pbWorkerInfo.getNextInterruptionNotice == 0) {
+ workerInfo.nextInterruptionNotice = Long.MaxValue
+ } else {
+ workerInfo.nextInterruptionNotice =
pbWorkerInfo.getNextInterruptionNotice
+ }
workerInfo
}
@@ -295,6 +301,7 @@ object PbSerDeUtils {
if (masterPersistWorkerNetworkLocation) {
builder.setNetworkLocation(workerInfo.networkLocation)
}
+ builder.setNextInterruptionNotice(workerInfo.nextInterruptionNotice)
if (!eliminateUserResourceConsumption) {
builder.putAllUserResourceConsumption(
diff --git
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
index 9d4059ff9..e8252c4d2 100644
---
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
+++
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
@@ -126,6 +126,7 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
diskInfos,
userResourceConsumption)
workerInfo1.networkLocation = "/1"
+ workerInfo1.nextInterruptionNotice = 10000
val workerInfo2 =
new WorkerInfo(
"localhost",
diff --git a/docs/celeborn_cli.md b/docs/celeborn_cli.md
index c73f4bf58..1619a02f5 100644
--- a/docs/celeborn_cli.md
+++ b/docs/celeborn_cli.md
@@ -85,8 +85,8 @@ Usage: celeborn-cli master [-hV] [--apps=appId]
[--auth-header=authHeader]
[--worker-ids=w1,w2,w3...] (--show-masters-info |
--show-cluster-apps | --show-cluster-shuffles |
--exclude-worker | --remove-excluded-worker |
- --send-worker-event=IMMEDIATELY | DECOMMISSION |
- DECOMMISSION_THEN_IDLE | GRACEFUL | RECOMMISSION |
+ --send-worker-event=IMMEDIATELY | DECOMMISSION |
+ DECOMMISSION_THEN_IDLE | GRACEFUL | RECOMMISSION |
NONE | --show-worker-event-info |
--show-lost-workers | --show-excluded-workers |
--show-manual-excluded-workers |
@@ -98,7 +98,9 @@ Usage: celeborn-cli master [-hV] [--apps=appId]
[--auth-header=authHeader]
--show-container-info | --add-cluster-alias=alias |
--remove-cluster-alias=alias |
--remove-workers-unavailable-info |
- --revise-lost-shuffles | --delete-apps)
+ --revise-lost-shuffles | --delete-apps |
+ --update-interruption-notices=workerId1=timestamp,
+ workerId2=timestamp,workerId3=timestamp)
[[--shuffleIds=<shuffleIds>]]
--add-cluster-alias=alias
Add alias to use in the cli for the given set of
@@ -131,7 +133,7 @@ Usage: celeborn-cli master [-hV] [--apps=appId]
[--auth-header=authHeader]
master.
--revise-lost-shuffles Revise lost shuffles or remove shuffles for an
application.
- --send-worker-event=IMMEDIATELY | DECOMMISSION | DECOMMISSION_THEN_IDLE
|
+ --send-worker-event=IMMEDIATELY | DECOMMISSION | DECOMMISSION_THEN_IDLE |
GRACEFUL | RECOMMISSION | NONE
Send an event to a worker
--show-cluster-apps Show cluster applications
@@ -160,6 +162,9 @@ Usage: celeborn-cli master [-hV] [--apps=appId]
[--auth-header=authHeader]
Show registered workers topology
--shuffleIds=<shuffleIds>
The shuffle ids to manipulate.
+ --update-interruption-notices=workerId1=timestamp,workerId2=timestamp,
+ workerId3=timestamp
+ Update interruption notices of workers.
-V, --version Print version information and exit.
--worker-ids=w1,w2,w3...
List of workerIds to pass to the command. Each
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index a91996986..5f63b81b3 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -1301,6 +1301,26 @@ private[celeborn] class Master(
}
}
+ override def updateInterruptionNotice(workerInterruptionNotices: Map[String,
Long])
+ : HandleResponse = {
+ try {
+ statusSystem.workersMap.values().asScala.foreach { workerInfo =>
+ workerInterruptionNotices.get(workerInfo.toUniqueId) match {
+ case Some(update) => workerInfo.nextInterruptionNotice = update
+ case None => workerInfo.nextInterruptionNotice = Long.MaxValue
+ }
+ }
+ true -> "updateInterruptionNotice successful."
+ } catch {
+ case e: Throwable =>
+ val errorMessage =
+ s"updateInterruptionNotice for
${workerInterruptionNotices.keys.mkString(",")}" +
+ s" failed, message: ${e.getMessage}"
+ logError(errorMessage, e)
+ false -> errorMessage
+ }
+ }
+
override def getWorkerInfo: String = {
val sb = new StringBuilder
sb.append("====================== Workers Info in Master
=========================\n")
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala
index 109c8b0e7..85bf7a77d 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala
@@ -17,7 +17,7 @@
package org.apache.celeborn.service.deploy.master.http.api.v1
-import javax.ws.rs.{BadRequestException, Consumes, GET, Path, POST, Produces}
+import javax.ws.rs.{BadRequestException, Consumes, GET, Path, POST, Produces,
PUT}
import javax.ws.rs.core.MediaType
import scala.collection.JavaConverters._
@@ -152,6 +152,23 @@ class WorkerResource extends ApiRequestContext {
new HandleResponse().success(success).message(finalMsg)
}
+ @PUT
+ @Path("/updateInterruptionNotice")
+ def updateInterruptionNotice(request: UpdateInterruptionNoticeRequest):
HandleResponse = {
+ ensureMasterIsLeader(master) {
+ if (request.getWorkers.isEmpty) {
+ logWarning("Workers interruption notice list is empty.")
+ }
+ val interruptionNotices: Map[String, Long] =
+ request.getWorkers.asScala.map(worker =>
+ (
+ ApiUtils.toWorkerInfo(worker.getWorkerId).toUniqueId,
+ worker.getInterruptionTimestamp.toLong)).toMap
+ val (success, msg) =
httpService.updateInterruptionNotice(interruptionNotices)
+ new HandleResponse().success(success).message(msg)
+ }
+ }
+
@Operation(description = "List all worker topology info of the master.")
@ApiResponse(
responseCode = "200",
diff --git
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/WorkerApi.java
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/WorkerApi.java
index 3dd1ee07c..2ac818fb4 100644
---
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/WorkerApi.java
+++
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/WorkerApi.java
@@ -30,6 +30,7 @@ import org.apache.celeborn.rest.v1.model.HandleResponse;
import org.apache.celeborn.rest.v1.model.RemoveWorkersUnavailableInfoRequest;
import org.apache.celeborn.rest.v1.model.SendWorkerEventRequest;
import org.apache.celeborn.rest.v1.model.TopologyResponse;
+import org.apache.celeborn.rest.v1.model.UpdateInterruptionNoticeRequest;
import org.apache.celeborn.rest.v1.model.WorkerEventsResponse;
import org.apache.celeborn.rest.v1.model.WorkersResponse;
@@ -460,6 +461,75 @@ public class WorkerApi extends BaseApi {
);
}
+ /**
+ *
+ * Update the master with worker disruption info to be used during slot
allocation.
+ * @param updateInterruptionNoticeRequest (optional)
+ * @return HandleResponse
+ * @throws ApiException if fails to make API call
+ */
+ public HandleResponse
updateInterruptionNotice(UpdateInterruptionNoticeRequest
updateInterruptionNoticeRequest) throws ApiException {
+ return this.updateInterruptionNotice(updateInterruptionNoticeRequest,
Collections.emptyMap());
+ }
+
+
+ /**
+ *
+ * Update the master with worker disruption info to be used during slot
allocation.
+ * @param updateInterruptionNoticeRequest (optional)
+ * @param additionalHeaders additionalHeaders for this call
+ * @return HandleResponse
+ * @throws ApiException if fails to make API call
+ */
+ public HandleResponse
updateInterruptionNotice(UpdateInterruptionNoticeRequest
updateInterruptionNoticeRequest, Map<String, String> additionalHeaders) throws
ApiException {
+ Object localVarPostBody = updateInterruptionNoticeRequest;
+
+ // create path and map variables
+ String localVarPath = "/api/v1/workers/updateInterruptionNotice";
+
+ StringJoiner localVarQueryStringJoiner = new StringJoiner("&");
+ String localVarQueryParameterBaseName;
+ List<Pair> localVarQueryParams = new ArrayList<Pair>();
+ List<Pair> localVarCollectionQueryParams = new ArrayList<Pair>();
+ Map<String, String> localVarHeaderParams = new HashMap<String, String>();
+ Map<String, String> localVarCookieParams = new HashMap<String, String>();
+ Map<String, Object> localVarFormParams = new HashMap<String, Object>();
+
+
+ localVarHeaderParams.putAll(additionalHeaders);
+
+
+
+ final String[] localVarAccepts = {
+ "application/json"
+ };
+ final String localVarAccept =
apiClient.selectHeaderAccept(localVarAccepts);
+
+ final String[] localVarContentTypes = {
+ "application/json"
+ };
+ final String localVarContentType =
apiClient.selectHeaderContentType(localVarContentTypes);
+
+ String[] localVarAuthNames = new String[] { "basic" };
+
+ TypeReference<HandleResponse> localVarReturnType = new
TypeReference<HandleResponse>() {};
+ return apiClient.invokeAPI(
+ localVarPath,
+ "PUT",
+ localVarQueryParams,
+ localVarCollectionQueryParams,
+ localVarQueryStringJoiner.toString(),
+ localVarPostBody,
+ localVarHeaderParams,
+ localVarCookieParams,
+ localVarFormParams,
+ localVarAccept,
+ localVarContentType,
+ localVarAuthNames,
+ localVarReturnType
+ );
+ }
+
@Override
public <T> T invokeAPI(String url, String method, Object request,
TypeReference<T> returnType, Map<String, String> additionalHeaders) throws
ApiException {
String localVarPath = url.replace(apiClient.getBaseURL(), "");
diff --git
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/UpdateInterruptionNoticeRequest.java
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/UpdateInterruptionNoticeRequest.java
new file mode 100644
index 000000000..d9be38fab
--- /dev/null
+++
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/UpdateInterruptionNoticeRequest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.celeborn.rest.v1.model;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonValue;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.celeborn.rest.v1.model.WorkerInterruptionNotice;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * UpdateInterruptionNoticeRequest
+ */
+@JsonPropertyOrder({
+ UpdateInterruptionNoticeRequest.JSON_PROPERTY_WORKERS
+})
[email protected](value =
"org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator
version: 7.8.0")
+public class UpdateInterruptionNoticeRequest {
+ public static final String JSON_PROPERTY_WORKERS = "workers";
+ private List<WorkerInterruptionNotice> workers = new ArrayList<>();
+
+ public UpdateInterruptionNoticeRequest() {
+ }
+
+ public UpdateInterruptionNoticeRequest
workers(List<WorkerInterruptionNotice> workers) {
+
+ this.workers = workers;
+ return this;
+ }
+
+ public UpdateInterruptionNoticeRequest
addWorkersItem(WorkerInterruptionNotice workersItem) {
+ if (this.workers == null) {
+ this.workers = new ArrayList<>();
+ }
+ this.workers.add(workersItem);
+ return this;
+ }
+
+ /**
+ * The workers to be removed from the master workers unavailable info.
+ * @return workers
+ */
+ @javax.annotation.Nullable
+ @JsonProperty(JSON_PROPERTY_WORKERS)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+ public List<WorkerInterruptionNotice> getWorkers() {
+ return workers;
+ }
+
+
+ @JsonProperty(JSON_PROPERTY_WORKERS)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+ public void setWorkers(List<WorkerInterruptionNotice> workers) {
+ this.workers = workers;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ UpdateInterruptionNoticeRequest updateInterruptionNoticeRequest =
(UpdateInterruptionNoticeRequest) o;
+ return Objects.equals(this.workers,
updateInterruptionNoticeRequest.workers);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(workers);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("class UpdateInterruptionNoticeRequest {\n");
+ sb.append(" workers: ").append(toIndentedString(workers)).append("\n");
+ sb.append("}");
+ return sb.toString();
+ }
+
+ /**
+ * Convert the given object to string with each line indented by 4 spaces
+ * (except the first line).
+ */
+ private String toIndentedString(Object o) {
+ if (o == null) {
+ return "null";
+ }
+ return o.toString().replace("\n", "\n ");
+ }
+
+}
+
diff --git
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerData.java
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerData.java
index 618b25dbc..4d54a8a18 100644
---
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerData.java
+++
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerData.java
@@ -49,6 +49,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
WorkerData.JSON_PROPERTY_WORKER_REF,
WorkerData.JSON_PROPERTY_WORKER_STATE,
WorkerData.JSON_PROPERTY_WORKER_STATE_START_TIME,
+ WorkerData.JSON_PROPERTY_NEXT_INTERRUPTION_NOTICE,
WorkerData.JSON_PROPERTY_NETWORK_LOCATION
})
@javax.annotation.Generated(value =
"org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator
version: 7.8.0")
@@ -95,6 +96,9 @@ public class WorkerData {
public static final String JSON_PROPERTY_WORKER_STATE_START_TIME =
"workerStateStartTime";
private Long workerStateStartTime;
+ public static final String JSON_PROPERTY_NEXT_INTERRUPTION_NOTICE =
"nextInterruptionNotice";
+ private Long nextInterruptionNotice;
+
public static final String JSON_PROPERTY_NETWORK_LOCATION =
"networkLocation";
private String networkLocation;
@@ -467,6 +471,31 @@ public class WorkerData {
this.workerStateStartTime = workerStateStartTime;
}
+ public WorkerData nextInterruptionNotice(Long nextInterruptionNotice) {
+
+ this.nextInterruptionNotice = nextInterruptionNotice;
+ return this;
+ }
+
+ /**
+ * The next interruption notice of the worker.
+ * @return nextInterruptionNotice
+ */
+ @javax.annotation.Nullable
+ @JsonProperty(JSON_PROPERTY_NEXT_INTERRUPTION_NOTICE)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+ public Long getNextInterruptionNotice() {
+ return nextInterruptionNotice;
+ }
+
+
+ @JsonProperty(JSON_PROPERTY_NEXT_INTERRUPTION_NOTICE)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+ public void setNextInterruptionNotice(Long nextInterruptionNotice) {
+ this.nextInterruptionNotice = nextInterruptionNotice;
+ }
+
public WorkerData networkLocation(String networkLocation) {
this.networkLocation = networkLocation;
@@ -515,12 +544,13 @@ public class WorkerData {
Objects.equals(this.workerRef, workerData.workerRef) &&
Objects.equals(this.workerState, workerData.workerState) &&
Objects.equals(this.workerStateStartTime,
workerData.workerStateStartTime) &&
+ Objects.equals(this.nextInterruptionNotice,
workerData.nextInterruptionNotice) &&
Objects.equals(this.networkLocation, workerData.networkLocation);
}
@Override
public int hashCode() {
- return Objects.hash(host, rpcPort, pushPort, fetchPort, replicatePort,
internalPort, slotUsed, lastHeartbeatTimestamp, heartbeatElapsedSeconds,
diskInfos, resourceConsumptions, workerRef, workerState, workerStateStartTime,
networkLocation);
+ return Objects.hash(host, rpcPort, pushPort, fetchPort, replicatePort,
internalPort, slotUsed, lastHeartbeatTimestamp, heartbeatElapsedSeconds,
diskInfos, resourceConsumptions, workerRef, workerState, workerStateStartTime,
nextInterruptionNotice, networkLocation);
}
@Override
@@ -541,6 +571,7 @@ public class WorkerData {
sb.append(" workerRef:
").append(toIndentedString(workerRef)).append("\n");
sb.append(" workerState:
").append(toIndentedString(workerState)).append("\n");
sb.append(" workerStateStartTime:
").append(toIndentedString(workerStateStartTime)).append("\n");
+ sb.append(" nextInterruptionNotice:
").append(toIndentedString(nextInterruptionNotice)).append("\n");
sb.append(" networkLocation:
").append(toIndentedString(networkLocation)).append("\n");
sb.append("}");
return sb.toString();
diff --git
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerInfoResponse.java
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerInfoResponse.java
index cdbaf9ce7..738344d40 100644
---
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerInfoResponse.java
+++
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerInfoResponse.java
@@ -51,7 +51,9 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
WorkerInfoResponse.JSON_PROPERTY_WORKER_STATE_START_TIME,
WorkerInfoResponse.JSON_PROPERTY_IS_REGISTERED,
WorkerInfoResponse.JSON_PROPERTY_IS_SHUTDOWN,
- WorkerInfoResponse.JSON_PROPERTY_IS_DECOMMISSIONING
+ WorkerInfoResponse.JSON_PROPERTY_IS_DECOMMISSIONING,
+ WorkerInfoResponse.JSON_PROPERTY_NEXT_INTERRUPTION_NOTICE,
+ WorkerInfoResponse.JSON_PROPERTY_NETWORK_LOCATION
})
@javax.annotation.Generated(value =
"org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator
version: 7.8.0")
public class WorkerInfoResponse {
@@ -106,6 +108,12 @@ public class WorkerInfoResponse {
public static final String JSON_PROPERTY_IS_DECOMMISSIONING =
"isDecommissioning";
private Boolean isDecommissioning;
+ public static final String JSON_PROPERTY_NEXT_INTERRUPTION_NOTICE =
"nextInterruptionNotice";
+ private Long nextInterruptionNotice;
+
+ public static final String JSON_PROPERTY_NETWORK_LOCATION =
"networkLocation";
+ private String networkLocation;
+
public WorkerInfoResponse() {
}
@@ -550,6 +558,56 @@ public class WorkerInfoResponse {
this.isDecommissioning = isDecommissioning;
}
+ public WorkerInfoResponse nextInterruptionNotice(Long
nextInterruptionNotice) {
+
+ this.nextInterruptionNotice = nextInterruptionNotice;
+ return this;
+ }
+
+ /**
+ * The next interruption notice of the worker.
+ * @return nextInterruptionNotice
+ */
+ @javax.annotation.Nullable
+ @JsonProperty(JSON_PROPERTY_NEXT_INTERRUPTION_NOTICE)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+ public Long getNextInterruptionNotice() {
+ return nextInterruptionNotice;
+ }
+
+
+ @JsonProperty(JSON_PROPERTY_NEXT_INTERRUPTION_NOTICE)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+ public void setNextInterruptionNotice(Long nextInterruptionNotice) {
+ this.nextInterruptionNotice = nextInterruptionNotice;
+ }
+
+ public WorkerInfoResponse networkLocation(String networkLocation) {
+
+ this.networkLocation = networkLocation;
+ return this;
+ }
+
+ /**
+ * The network location of the worker.
+ * @return networkLocation
+ */
+ @javax.annotation.Nullable
+ @JsonProperty(JSON_PROPERTY_NETWORK_LOCATION)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+
+ public String getNetworkLocation() {
+ return networkLocation;
+ }
+
+
+ @JsonProperty(JSON_PROPERTY_NETWORK_LOCATION)
+ @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
+ public void setNetworkLocation(String networkLocation) {
+ this.networkLocation = networkLocation;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -575,12 +633,14 @@ public class WorkerInfoResponse {
Objects.equals(this.workerStateStartTime,
workerInfoResponse.workerStateStartTime) &&
Objects.equals(this.isRegistered, workerInfoResponse.isRegistered) &&
Objects.equals(this.isShutdown, workerInfoResponse.isShutdown) &&
- Objects.equals(this.isDecommissioning,
workerInfoResponse.isDecommissioning);
+ Objects.equals(this.isDecommissioning,
workerInfoResponse.isDecommissioning) &&
+ Objects.equals(this.nextInterruptionNotice,
workerInfoResponse.nextInterruptionNotice) &&
+ Objects.equals(this.networkLocation,
workerInfoResponse.networkLocation);
}
@Override
public int hashCode() {
- return Objects.hash(host, rpcPort, pushPort, fetchPort, replicatePort,
internalPort, slotUsed, lastHeartbeatTimestamp, heartbeatElapsedSeconds,
diskInfos, resourceConsumptions, workerRef, workerState, workerStateStartTime,
isRegistered, isShutdown, isDecommissioning);
+ return Objects.hash(host, rpcPort, pushPort, fetchPort, replicatePort,
internalPort, slotUsed, lastHeartbeatTimestamp, heartbeatElapsedSeconds,
diskInfos, resourceConsumptions, workerRef, workerState, workerStateStartTime,
isRegistered, isShutdown, isDecommissioning, nextInterruptionNotice,
networkLocation);
}
@Override
@@ -604,6 +664,8 @@ public class WorkerInfoResponse {
sb.append(" isRegistered:
").append(toIndentedString(isRegistered)).append("\n");
sb.append(" isShutdown:
").append(toIndentedString(isShutdown)).append("\n");
sb.append(" isDecommissioning:
").append(toIndentedString(isDecommissioning)).append("\n");
+ sb.append(" nextInterruptionNotice:
").append(toIndentedString(nextInterruptionNotice)).append("\n");
+ sb.append(" networkLocation:
").append(toIndentedString(networkLocation)).append("\n");
sb.append("}");
return sb.toString();
}
diff --git
a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerInterruptionNotice.java
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerInterruptionNotice.java
new file mode 100644
index 000000000..9afa89939
--- /dev/null
+++
b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerInterruptionNotice.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.celeborn.rest.v1.model;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonValue;
+import org.apache.celeborn.rest.v1.model.WorkerId;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * WorkerInterruptionNotice: a notice that a worker is expected to be interrupted.
+ * Pairs the worker's {@link WorkerId} with the timestamp of the expected interruption;
+ * both fields are marked {@code @Nonnull} (required in the OpenAPI schema).
+ */
+@JsonPropertyOrder({
+ WorkerInterruptionNotice.JSON_PROPERTY_WORKER_ID,
+ WorkerInterruptionNotice.JSON_PROPERTY_INTERRUPTION_TIMESTAMP
+})
[email protected](value =
"org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator
version: 7.8.0")
+public class WorkerInterruptionNotice {
+ public static final String JSON_PROPERTY_WORKER_ID = "workerId";
+ private WorkerId workerId;
+
+ public static final String JSON_PROPERTY_INTERRUPTION_TIMESTAMP =
"interruptionTimestamp";
+ private Long interruptionTimestamp;
+
+ public WorkerInterruptionNotice() {
+ }
+
+ // Fluent setter for workerId; returns this instance for chaining.
+ public WorkerInterruptionNotice workerId(WorkerId workerId) {
+
+ this.workerId = workerId;
+ return this;
+ }
+
+ /**
+ * The id of the worker this interruption notice applies to (required).
+ * @return workerId
+ */
+ @javax.annotation.Nonnull
+ @JsonProperty(JSON_PROPERTY_WORKER_ID)
+ @JsonInclude(value = JsonInclude.Include.ALWAYS)
+
+ public WorkerId getWorkerId() {
+ return workerId;
+ }
+
+
+ @JsonProperty(JSON_PROPERTY_WORKER_ID)
+ @JsonInclude(value = JsonInclude.Include.ALWAYS)
+ public void setWorkerId(WorkerId workerId) {
+ this.workerId = workerId;
+ }
+
+ // Fluent setter for interruptionTimestamp; returns this instance for chaining.
+ public WorkerInterruptionNotice interruptionTimestamp(Long
interruptionTimestamp) {
+
+ this.interruptionTimestamp = interruptionTimestamp;
+ return this;
+ }
+
+ /**
+ * The timestamp of the expected interruption (required).
+ * NOTE(review): typed as an int64 {@code Long}, presumably an epoch timestamp; confirm the units.
+ * @return interruptionTimestamp
+ */
+ @javax.annotation.Nonnull
+ @JsonProperty(JSON_PROPERTY_INTERRUPTION_TIMESTAMP)
+ @JsonInclude(value = JsonInclude.Include.ALWAYS)
+
+ public Long getInterruptionTimestamp() {
+ return interruptionTimestamp;
+ }
+
+
+ @JsonProperty(JSON_PROPERTY_INTERRUPTION_TIMESTAMP)
+ @JsonInclude(value = JsonInclude.Include.ALWAYS)
+ public void setInterruptionTimestamp(Long interruptionTimestamp) {
+ this.interruptionTimestamp = interruptionTimestamp;
+ }
+
+ // Value equality over both fields (workerId and interruptionTimestamp).
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ WorkerInterruptionNotice workerInterruptionNotice =
(WorkerInterruptionNotice) o;
+ return Objects.equals(this.workerId, workerInterruptionNotice.workerId) &&
+ Objects.equals(this.interruptionTimestamp,
workerInterruptionNotice.interruptionTimestamp);
+ }
+
+ // Consistent with equals: hashes the same two fields.
+ @Override
+ public int hashCode() {
+ return Objects.hash(workerId, interruptionTimestamp);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("class WorkerInterruptionNotice {\n");
+ sb.append(" workerId:
").append(toIndentedString(workerId)).append("\n");
+ sb.append(" interruptionTimestamp:
").append(toIndentedString(interruptionTimestamp)).append("\n");
+ sb.append("}");
+ return sb.toString();
+ }
+
+ /**
+ * Convert the given object to string with each line indented by 4 spaces
+ * (except the first line).
+ */
+ private String toIndentedString(Object o) {
+ if (o == null) {
+ return "null";
+ }
+ return o.toString().replace("\n", "\n ");
+ }
+
+}
+
diff --git a/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
b/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
index 44a96f6b7..c52ed5d0a 100644
--- a/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
+++ b/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml
@@ -193,6 +193,27 @@ paths:
schema:
$ref: '#/components/schemas/HandleResponse'
+
+ /api/v1/workers/updateInterruptionNotice:
+ put:
+ tags:
+ - Worker
+ operationId: updateInterruptionNotice
+ description: Update the master with worker interruption notices to be used during slot allocation.
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/UpdateInterruptionNoticeRequest'
+ responses:
+ "200":
+ description: The request was successful.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HandleResponse'
+
+
/api/v1/workers/events:
get:
tags:
@@ -857,6 +878,10 @@ components:
type: integer
format: int64
description: The start time of the worker state.
+ nextInterruptionNotice:
+ type: integer
+ format: int64
+ description: The next interruption notice of the worker.
networkLocation:
type: string
description: The network location of the worker.
@@ -1019,6 +1044,20 @@ components:
- fetchPort
- replicatePort
+
+ WorkerInterruptionNotice:
+ type: object
+ properties:
+ workerId:
+ $ref: '#/components/schemas/WorkerId'
+ interruptionTimestamp:
+ type: integer
+ format: int64
+ description: The timestamp of the expected interruption.
+ required:
+ - workerId
+ - interruptionTimestamp
+
ExcludeWorkerRequest:
type: object
properties:
@@ -1042,6 +1081,15 @@ components:
items:
$ref: '#/components/schemas/WorkerId'
+ UpdateInterruptionNoticeRequest:
+ type: object
+ properties:
+ workers:
+ type: array
+ description: The worker interruption notices to be recorded by the master.
+ items:
+ $ref: '#/components/schemas/WorkerInterruptionNotice'
+
SendWorkerEventRequest:
type: object
properties:
diff --git a/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml
b/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml
index 8ef32fb9c..6a9c0e889 100644
--- a/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml
+++ b/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml
@@ -453,6 +453,13 @@ components:
isDecommissioning:
type: boolean
description: The decommission status of the worker.
+ nextInterruptionNotice:
+ type: integer
+ format: int64
+ description: The next interruption notice of the worker.
+ networkLocation:
+ type: string
+ description: The network location of the worker.
required:
- host
- rpcPort
@@ -552,6 +559,10 @@ components:
type: integer
format: int64
description: The start time of the worker state.
+ nextInterruptionNotice:
+ type: integer
+ format: int64
+ description: The next interruption notice of the worker.
networkLocation:
type: string
description: The network location of the worker.
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index b123a0baa..a66f53305 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -192,6 +192,9 @@ abstract class HttpService extends Service with Logging {
def getWorkerEventInfo(): String = throw new UnsupportedOperationException()
+ /**
+ * Records upcoming worker interruption notices reported via the REST API.
+ * The default implementation throws UnsupportedOperationException; presumably the master
+ * service overrides it.
+ *
+ * @param workerInterruptionNotices presumably keyed by worker id with the expected
+ * interruption timestamp as value (TODO confirm against the overriding implementation)
+ * @return the HandleResponse returned to the REST caller
+ */
+ def updateInterruptionNotice(workerInterruptionNotices: Map[String, Long]):
HandleResponse =
+ throw new UnsupportedOperationException()
+
def startHttpServer(): Unit = {
httpServer = HttpServer(
serviceName,
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRequestContext.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRequestContext.scala
index 10a6ca800..52398c3d8 100644
---
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRequestContext.scala
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRequestContext.scala
@@ -28,7 +28,7 @@ import org.eclipse.jetty.server.handler.ContextHandler
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.server.common.HttpService
-private[celeborn] trait ApiRequestContext {
+private[celeborn] trait ApiRequestContext extends Logging {
@Context
protected var servletContext: ServletContext = _
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiUtils.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiUtils.scala
index a14e5ab12..9fac3025d 100644
---
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiUtils.scala
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiUtils.scala
@@ -58,6 +58,7 @@ object ApiUtils {
.workerState(workerInfo.workerStatus.getState.toString)
.workerStateStartTime(workerInfo.workerStatus.getStateStartTime)
.networkLocation(workerInfo.networkLocation)
+ .nextInterruptionNotice(workerInfo.nextInterruptionNotice)
}
private def workerResourceConsumptions(workerInfo: WorkerInfo)
@@ -120,6 +121,8 @@ object ApiUtils {
isShutdown && (
currentStatus.getState == State.InDecommission ||
currentStatus.getState == State.InDecommissionThenIdle))
+ .networkLocation(workerInfo.networkLocation)
+ .nextInterruptionNotice(workerInfo.nextInterruptionNotice)
}
def toWorkerInfo(workerId: WorkerId): WorkerInfo = {