This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 20b593811dc0 [SPARK-46888][CORE] Fix `Master` to reject 
`/workers/kill/` requests if decommission is disabled
20b593811dc0 is described below

commit 20b593811dc02c96c71978851e051d32bf8c3496
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sat Jan 27 20:24:15 2024 -0800

    [SPARK-46888][CORE] Fix `Master` to reject `/workers/kill/` requests if 
decommission is disabled
    
    ### What changes were proposed in this pull request?
    
    This PR aims to fix `Master` to reject `/workers/kill/` requests if 
`spark.decommission.enabled` is `false` in order to fix the dangling worker 
issue.
    
    ### Why are the changes needed?
    
    Currently, `spark.decommission.enabled` is `false` by default. So, when a 
user asks to decommission a worker, only the Master marks it `DECOMMISSIONED` 
while the worker itself stays alive.
    ```
    $ curl -XPOST http://localhost:8080/workers/kill/\?host\=127.0.0.1
    ```
    
    **Master UI**
    ![Screenshot 2024-01-27 at 6 19 18 
PM](https://github.com/apache/spark/assets/9700541/443bfc32-b924-438a-8bf6-c64b9afbc4be)
    
    **Worker Log**
    ```
    24/01/27 18:18:06 WARN Worker: Receive decommission request, but 
decommission feature is disabled.
    ```
    
    This change makes `Master` consistent with the existing `Worker` behavior, 
which ignores the request when decommission is disabled.
    
    
https://github.com/apache/spark/blob/1787a5261e87e0214a3f803f6534c5e52a0138e6/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala#L859-L868
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is a bug fix.
    
    ### How was this patch tested?
    
    Pass the CI with the newly added test case.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44915 from dongjoon-hyun/SPARK-46888.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../apache/spark/deploy/master/ui/MasterWebUI.scala |  4 +++-
 .../apache/spark/deploy/master/MasterSuite.scala    | 21 +++++++++++++++++++++
 .../spark/deploy/master/ui/MasterWebUISuite.scala   |  3 ++-
 3 files changed, 26 insertions(+), 2 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index d71ef8b9e36e..3025c0bf468b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -23,6 +23,7 @@ import javax.servlet.http.{HttpServlet, HttpServletRequest, 
HttpServletResponse}
 import org.apache.spark.deploy.DeployMessages.{DecommissionWorkersOnHosts, 
MasterStateResponse, RequestMasterState}
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.DECOMMISSION_ENABLED
 import org.apache.spark.internal.config.UI.MASTER_UI_DECOMMISSION_ALLOW_MODE
 import org.apache.spark.internal.config.UI.UI_KILL_ENABLED
 import org.apache.spark.ui.{SparkUI, WebUI}
@@ -41,6 +42,7 @@ class MasterWebUI(
 
   val masterEndpointRef = master.self
   val killEnabled = master.conf.get(UI_KILL_ENABLED)
+  val decommissionDisabled = !master.conf.get(DECOMMISSION_ENABLED)
   val decommissionAllowMode = 
master.conf.get(MASTER_UI_DECOMMISSION_ALLOW_MODE)
 
   initialize()
@@ -60,7 +62,7 @@ class MasterWebUI(
       override def doPost(req: HttpServletRequest, resp: HttpServletResponse): 
Unit = {
         val hostnames: Seq[String] = Option(req.getParameterValues("host"))
           .getOrElse(Array[String]()).toImmutableArraySeq
-        if (!isDecommissioningRequestAllowed(req)) {
+        if (decommissionDisabled || !isDecommissioningRequestAllowed(req)) {
           resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
         } else {
           val removedWorkers = masterEndpointRef.askSync[Integer](
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 6966a7f660b2..0db58ae0c834 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy.master
 
+import java.net.{HttpURLConnection, URL}
 import java.util.Date
 import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
@@ -444,6 +445,26 @@ class MasterSuite extends SparkFunSuite
     }
   }
 
+  test("SPARK-46888: master should reject worker kill request if decommision 
is disabled") {
+    implicit val formats = org.json4s.DefaultFormats
+    val conf = new SparkConf()
+      .set(DECOMMISSION_ENABLED, false)
+      .set(MASTER_UI_DECOMMISSION_ALLOW_MODE, "ALLOW")
+    val localCluster = LocalSparkCluster(1, 1, 512, conf)
+    localCluster.start()
+    val masterUrl = 
s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
+    try {
+      eventually(timeout(30.seconds), interval(100.milliseconds)) {
+        val url = new 
URL(s"$masterUrl/workers/kill/?host=${Utils.localHostNameForURI()}")
+        val conn = url.openConnection().asInstanceOf[HttpURLConnection]
+        conn.setRequestMethod("POST")
+        assert(conn.getResponseCode === 405)
+      }
+    } finally {
+      localCluster.stop()
+    }
+  }
+
   test("master/worker web ui available") {
     implicit val formats = org.json4s.DefaultFormats
     val conf = new SparkConf()
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala
index 024511189acc..40265a12af93 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala
@@ -30,12 +30,13 @@ import org.apache.spark.{SecurityManager, SparkConf, 
SparkFunSuite}
 import org.apache.spark.deploy.DeployMessages.{DecommissionWorkersOnHosts, 
KillDriverResponse, RequestKillDriver}
 import org.apache.spark.deploy.DeployTestUtils._
 import org.apache.spark.deploy.master._
+import org.apache.spark.internal.config.DECOMMISSION_ENABLED
 import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv}
 import org.apache.spark.util.Utils
 
 class MasterWebUISuite extends SparkFunSuite {
 
-  val conf = new SparkConf()
+  val conf = new SparkConf().set(DECOMMISSION_ENABLED, true)
   val securityMgr = new SecurityManager(conf)
   val rpcEnv = mock(classOf[RpcEnv])
   val master = mock(classOf[Master])


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to