This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 17de30009 [CELEBORN-847] Support use RESTful API to trigger worker
exit and exitImmediately
17de30009 is described below
commit 17de30009b6601f1fbb2ca159e9eeac4c537fafd
Author: Angerszhuuuu <[email protected]>
AuthorDate: Tue Aug 15 20:04:26 2023 +0800
[CELEBORN-847] Support use RESTful API to trigger worker exit and
exitImmediately
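### What changes were proposed in this pull request?
Replace the worker's `/decommission` HTTP endpoint with `/exit?type=${TYPE}`, which triggers the
worker to exit as a decommission (`DECOMMISSION`), a graceful shutdown (`GRACEFUL`) or an
immediate exit (`IMMEDIATELY`). Query parameters are parsed by the new `HttpUtils.parseUrl`.
### Why are the changes needed?
Previously `/decommission` could only decommission the worker; this also lets operators trigger a
graceful or an immediate worker exit through the RESTful API.
### Does this PR introduce _any_ user-facing change?
Yes. The worker's `/decommission` endpoint is replaced by `/exit?type=${TYPE}` (see
docs/monitoring.md).
### How was this patch tested?
Added the unit test `HttpUtilsSuite`.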
Closes #1768 from AngersZhuuuu/CELEBORN-847.
Lead-authored-by: Angerszhuuuu <[email protected]>
Co-authored-by: Keyong Zhou <[email protected]>
Co-authored-by: Keyong Zhou <[email protected]>
Co-authored-by: Keyong Zhou <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
docs/monitoring.md | 2 +-
.../celeborn/server/common/HttpService.scala | 2 +-
.../server/common/http/HttpRequestHandler.scala | 7 ++--
.../celeborn/server/common/http/HttpUtils.scala | 37 +++++++++++++++++++
.../server/common/http/HttpUtilsSuite.scala | 43 ++++++++++++++++++++++
.../celeborn/service/deploy/worker/Worker.scala | 17 +++++++--
6 files changed, 99 insertions(+), 9 deletions(-)
diff --git a/docs/monitoring.md b/docs/monitoring.md
index cad97b4fa..0cdbe66e3 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -314,4 +314,4 @@ API path listed as below:
| /unavailablePeers | List the unavailable peers of the worker, this
always means the worker connect to the peer failed. |
| /isShutdown | Show if the worker is during the process of
shutdown. |
| /isRegistered | Show if the worker is registered to the master
success. |
-| /decommission | Trigger this worker to decommission from the
cluster |
+| /exit?type=${TYPE} | Trigger this worker to exit. Valid (case-insensitive) values of `type` are
'DECOMMISSION', 'GRACEFUL' and 'IMMEDIATELY' |
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index 6de0f03d0..6b5ec4196 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -67,7 +67,7 @@ abstract class HttpService extends Service with Logging {
def isRegistered: String
- def decommission: String = throw new UnsupportedOperationException()
+ /**
+  * Triggers this service to exit. The worker's `/exit` HTTP route calls this with the
+  * upper-cased value of the `type` query parameter (an empty string when it is absent), and the
+  * returned string becomes the result of `HttpRequestHandler.handleRequest`.
+  *
+  * This default throws [[UnsupportedOperationException]]; services that support exiting over
+  * HTTP must override it.
+  */
+ def exit(exitType: String): String = throw new
UnsupportedOperationException()
def startHttpServer(): Unit = {
val handlers =
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
index cc727d22e..779b8e476 100644
---
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
@@ -61,7 +61,8 @@ class HttpRequestHandler(
}
def handleRequest(uri: String): String = {
- uri match {
+ val (path, parameters) = HttpUtils.parseUrl(uri)
+ path match {
case "/conf" =>
service.getConf
case "/workerInfo" =>
@@ -90,8 +91,8 @@ class HttpRequestHandler(
service.isShutdown
case "/isRegistered" if service.serviceName == Service.WORKER =>
service.isRegistered
- case "/decommission" if service.serviceName == Service.WORKER =>
- service.decommission
+ case "/exit" if service.serviceName == Service.WORKER =>
+ service.exit(parameters.getOrElse("TYPE", ""))
case _ => INVALID
}
}
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
new file mode 100644
index 000000000..e212aa201
--- /dev/null
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http
+
+import java.net.URL
+import java.util.Locale
+
+object HttpUtils {
+
+  /**
+   * Splits a request URI such as `/exit?type=decommission` into its path and query parameters.
+   *
+   * Parameter names and values are both upper-cased with [[Locale.ROOT]], so callers look them
+   * up by upper-case key (e.g. `parameters.getOrElse("TYPE", "")`) and compare against
+   * upper-case values.
+   *
+   * Malformed query strings are tolerated rather than throwing: empty segments (as in `/exit?`
+   * or `a=1&&b=2`) are ignored, a name without `=` (or with nothing after it) maps to an empty
+   * value, and only the first `=` separates a name from its value. When a name is repeated,
+   * the last occurrence wins. Values are not percent-decoded.
+   *
+   * @param uri the request URI, starting with `/`, optionally followed by `?query`
+   * @return the URI path and a map from upper-cased parameter names to upper-cased values
+   */
+  def parseUrl(uri: String): (String, Map[String, String]) = {
+    // java.net.URL requires an absolute URL; the scheme, host and port are placeholders used
+    // only for parsing and are never contacted.
+    val url = new URL(s"https://127.0.0.1:9000$uri")
+    // getQuery returns null when there is no '?', and "" for a bare trailing '?'.
+    val parameters = Option(url.getQuery).fold(Map.empty[String, String]) { query =>
+      query.split("&").iterator.filter(_.nonEmpty).map(parseParameter).toMap
+    }
+    (url.getPath, parameters)
+  }
+
+  /** Parses one `name=value` query segment into an upper-cased (name, value) pair. */
+  private def parseParameter(segment: String): (String, String) = {
+    val separator = segment.indexOf('=')
+    val (name, value) =
+      if (separator < 0) (segment, "")
+      else (segment.substring(0, separator), segment.substring(separator + 1))
+    name.toUpperCase(Locale.ROOT) -> value.toUpperCase(Locale.ROOT)
+  }
+}
diff --git
a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
new file mode 100644
index 000000000..01d6ae973
--- /dev/null
+++
b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.http
+
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.celeborn.common.internal.Logging
+
+class HttpUtilsSuite extends AnyFunSuite with Logging {
+
+  /**
+   * Asserts that [[HttpUtils.parseUrl]] splits `uri` into `expectPath` and `expectParameters`.
+   */
+  def checkParseUri(
+      uri: String,
+      expectPath: String,
+      expectParameters: Map[String, String]): Unit = {
+    val (actualPath, actualParameters) = HttpUtils.parseUrl(uri)
+    assert(actualPath == expectPath)
+    assert(actualParameters == expectParameters)
+  }
+
+  test("CELEBORN-847: Support parse HTTP Restful API parameters") {
+    // Without a query string there are no parameters.
+    checkParseUri("/exit", "/exit", Map.empty)
+    // Parameter names and values are upper-cased.
+    checkParseUri("/exit?type=decommission", "/exit", Map("TYPE" -> "DECOMMISSION"))
+    // Several parameters are separated by '&'.
+    val expectedMulti = Map("TYPE" -> "DECOMMISSION", "FOO" -> "A")
+    checkParseUri("/exit?type=decommission&foo=a", "/exit", expectedMulti)
+  }
+}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 32d9290b7..45d88b36d 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -561,8 +561,17 @@ private[celeborn] class Worker(
sb.toString()
}
- override def decommission: String = {
- exitKind = CelebornExitKind.WORKER_DECOMMISSION
+ override def exit(exitType: String): String = {
+ exitType match {
+ case "DECOMMISSION" =>
+ exitKind = CelebornExitKind.WORKER_DECOMMISSION
+ case "GRACEFUL" =>
+ exitKind = CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN
+ case "IMMEDIATELY" =>
+ exitKind = CelebornExitKind.EXIT_IMMEDIATELY
+ case _ => // Use origin code
+ }
+ // Use the original EXIT_CODE
new Thread() {
override def run(): Unit = {
Thread.sleep(10000)
@@ -570,8 +579,8 @@ private[celeborn] class Worker(
}
}.start()
val sb = new StringBuilder
- sb.append("======================== Decommission Worker
=========================\n")
- sb.append("Decommission worker triggered: \n")
+ sb.append("============================ Exit Worker
=============================\n")
+ sb.append(s"Exit worker by $exitType triggered: \n")
sb.append(workerInfo.toString()).append("\n")
sb.toString()
}