This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 20b593811dc0 [SPARK-46888][CORE] Fix `Master` to reject
`/workers/kill/` requests if decommission is disabled
20b593811dc0 is described below
commit 20b593811dc02c96c71978851e051d32bf8c3496
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sat Jan 27 20:24:15 2024 -0800
[SPARK-46888][CORE] Fix `Master` to reject `/workers/kill/` requests if
decommission is disabled
### What changes were proposed in this pull request?
This PR fixes `Master` to reject `/workers/kill/` requests when
`spark.decommission.enabled` is `false`, in order to fix the dangling-worker
issue.
### Why are the changes needed?
Currently, `spark.decommission.enabled` is `false` by default. So, when a
user asks to decommission a worker, only the Master marks it as
`DECOMMISSIONED`, while the worker itself stays alive.
```
$ curl -XPOST http://localhost:8080/workers/kill/\?host\=127.0.0.1
```
**Master UI**

**Worker Log**
```
24/01/27 18:18:06 WARN Worker: Receive decommission request, but decommission feature is disabled.
```
This makes the `Master` consistent with the existing `Worker` behavior, which
ignores the request when decommissioning is disabled:
https://github.com/apache/spark/blob/1787a5261e87e0214a3f803f6534c5e52a0138e6/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala#L859-L868
### Does this PR introduce _any_ user-facing change?
No, this is a bug fix.
### How was this patch tested?
Passed CI with the newly added test case.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44915 from dongjoon-hyun/SPARK-46888.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/deploy/master/ui/MasterWebUI.scala | 4 +++-
.../apache/spark/deploy/master/MasterSuite.scala | 21 +++++++++++++++++++++
.../spark/deploy/master/ui/MasterWebUISuite.scala | 3 ++-
3 files changed, 26 insertions(+), 2 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index d71ef8b9e36e..3025c0bf468b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -23,6 +23,7 @@ import javax.servlet.http.{HttpServlet, HttpServletRequest,
HttpServletResponse}
import org.apache.spark.deploy.DeployMessages.{DecommissionWorkersOnHosts,
MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.Master
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.DECOMMISSION_ENABLED
import org.apache.spark.internal.config.UI.MASTER_UI_DECOMMISSION_ALLOW_MODE
import org.apache.spark.internal.config.UI.UI_KILL_ENABLED
import org.apache.spark.ui.{SparkUI, WebUI}
@@ -41,6 +42,7 @@ class MasterWebUI(
val masterEndpointRef = master.self
val killEnabled = master.conf.get(UI_KILL_ENABLED)
+ val decommissionDisabled = !master.conf.get(DECOMMISSION_ENABLED)
val decommissionAllowMode =
master.conf.get(MASTER_UI_DECOMMISSION_ALLOW_MODE)
initialize()
@@ -60,7 +62,7 @@ class MasterWebUI(
override def doPost(req: HttpServletRequest, resp: HttpServletResponse):
Unit = {
val hostnames: Seq[String] = Option(req.getParameterValues("host"))
.getOrElse(Array[String]()).toImmutableArraySeq
- if (!isDecommissioningRequestAllowed(req)) {
+ if (decommissionDisabled || !isDecommissioningRequestAllowed(req)) {
resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
} else {
val removedWorkers = masterEndpointRef.askSync[Integer](
diff --git
a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 6966a7f660b2..0db58ae0c834 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.deploy.master
+import java.net.{HttpURLConnection, URL}
import java.util.Date
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
@@ -444,6 +445,26 @@ class MasterSuite extends SparkFunSuite
}
}
+ test("SPARK-46888: master should reject worker kill request if decommision
is disabled") {
+ implicit val formats = org.json4s.DefaultFormats
+ val conf = new SparkConf()
+ .set(DECOMMISSION_ENABLED, false)
+ .set(MASTER_UI_DECOMMISSION_ALLOW_MODE, "ALLOW")
+ val localCluster = LocalSparkCluster(1, 1, 512, conf)
+ localCluster.start()
+ val masterUrl =
s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
+ try {
+ eventually(timeout(30.seconds), interval(100.milliseconds)) {
+ val url = new
URL(s"$masterUrl/workers/kill/?host=${Utils.localHostNameForURI()}")
+ val conn = url.openConnection().asInstanceOf[HttpURLConnection]
+ conn.setRequestMethod("POST")
+ assert(conn.getResponseCode === 405)
+ }
+ } finally {
+ localCluster.stop()
+ }
+ }
+
test("master/worker web ui available") {
implicit val formats = org.json4s.DefaultFormats
val conf = new SparkConf()
diff --git
a/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala
b/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala
index 024511189acc..40265a12af93 100644
---
a/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala
+++
b/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala
@@ -30,12 +30,13 @@ import org.apache.spark.{SecurityManager, SparkConf,
SparkFunSuite}
import org.apache.spark.deploy.DeployMessages.{DecommissionWorkersOnHosts,
KillDriverResponse, RequestKillDriver}
import org.apache.spark.deploy.DeployTestUtils._
import org.apache.spark.deploy.master._
+import org.apache.spark.internal.config.DECOMMISSION_ENABLED
import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv}
import org.apache.spark.util.Utils
class MasterWebUISuite extends SparkFunSuite {
- val conf = new SparkConf()
+ val conf = new SparkConf().set(DECOMMISSION_ENABLED, true)
val securityMgr = new SecurityManager(conf)
val rpcEnv = mock(classOf[RpcEnv])
val master = mock(classOf[Master])
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]