This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new accfb39e4ddf [SPARK-46888][CORE] Fix `Master` to reject `/workers/kill/` requests if decommission is disabled accfb39e4ddf is described below commit accfb39e4ddf7f7b54396bd0e35256a04461c693 Author: Dongjoon Hyun <dh...@apple.com> AuthorDate: Sat Jan 27 20:24:15 2024 -0800 [SPARK-46888][CORE] Fix `Master` to reject `/workers/kill/` requests if decommission is disabled This PR aims to fix `Master` to reject `/workers/kill/` requests if `spark.decommission.enabled` is `false` in order to fix the dangling worker issue. Currently, `spark.decommission.enabled` is `false` by default. So, when a user asks to decommission a worker, only the Master marks it `DECOMMISSIONED` while the worker itself stays alive. ``` $ curl -XPOST http://localhost:8080/workers/kill/\?host\=127.0.0.1 ``` **Master UI** ![Screenshot 2024-01-27 at 6 19 18 PM](https://github.com/apache/spark/assets/9700541/443bfc32-b924-438a-8bf6-c64b9afbc4be) **Worker Log** ``` 24/01/27 18:18:06 WARN Worker: Receive decommission request, but decommission feature is disabled. ``` This change makes `Master` consistent with the existing `Worker` behavior, which ignores the request. https://github.com/apache/spark/blob/1787a5261e87e0214a3f803f6534c5e52a0138e6/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala#L859-L868 No, this is a bug fix. Pass the CI with the newly added test case. No. Closes #44915 from dongjoon-hyun/SPARK-46888. 
Authored-by: Dongjoon Hyun <dh...@apple.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> (cherry picked from commit 20b593811dc02c96c71978851e051d32bf8c3496) Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../apache/spark/deploy/master/ui/MasterWebUI.scala | 4 +++- .../apache/spark/deploy/master/MasterSuite.scala | 21 +++++++++++++++++++++ .../spark/deploy/master/ui/MasterWebUISuite.scala | 3 ++- 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index af94bd6d9e0f..53e5c5ac2a8f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -23,6 +23,7 @@ import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} import org.apache.spark.deploy.DeployMessages.{DecommissionWorkersOnHosts, MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.master.Master import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.DECOMMISSION_ENABLED import org.apache.spark.internal.config.UI.MASTER_UI_DECOMMISSION_ALLOW_MODE import org.apache.spark.internal.config.UI.UI_KILL_ENABLED import org.apache.spark.ui.{SparkUI, WebUI} @@ -40,6 +41,7 @@ class MasterWebUI( val masterEndpointRef = master.self val killEnabled = master.conf.get(UI_KILL_ENABLED) + val decommissionDisabled = !master.conf.get(DECOMMISSION_ENABLED) val decommissionAllowMode = master.conf.get(MASTER_UI_DECOMMISSION_ALLOW_MODE) initialize() @@ -58,7 +60,7 @@ class MasterWebUI( override def doPost(req: HttpServletRequest, resp: HttpServletResponse): Unit = { val hostnames: Seq[String] = Option(req.getParameterValues("host")) .getOrElse(Array[String]()).toSeq - if (!isDecommissioningRequestAllowed(req)) { + if (decommissionDisabled || !isDecommissioningRequestAllowed(req)) { 
resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED) } else { val removedWorkers = masterEndpointRef.askSync[Integer]( diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 1cec863b1e7f..37874de98766 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.deploy.master +import java.net.{HttpURLConnection, URL} import java.util.Date import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicInteger @@ -325,6 +326,26 @@ class MasterSuite extends SparkFunSuite } } + test("SPARK-46888: master should reject worker kill request if decommision is disabled") { + implicit val formats = org.json4s.DefaultFormats + val conf = new SparkConf() + .set(DECOMMISSION_ENABLED, false) + .set(MASTER_UI_DECOMMISSION_ALLOW_MODE, "ALLOW") + val localCluster = LocalSparkCluster(1, 1, 512, conf) + localCluster.start() + val masterUrl = s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}" + try { + eventually(timeout(30.seconds), interval(100.milliseconds)) { + val url = new URL(s"$masterUrl/workers/kill/?host=${Utils.localHostNameForURI()}") + val conn = url.openConnection().asInstanceOf[HttpURLConnection] + conn.setRequestMethod("POST") + assert(conn.getResponseCode === 405) + } + } finally { + localCluster.stop() + } + } + test("master/worker web ui available") { implicit val formats = org.json4s.DefaultFormats val conf = new SparkConf() diff --git a/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala index 024511189acc..40265a12af93 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala +++ 
b/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala @@ -30,12 +30,13 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy.DeployMessages.{DecommissionWorkersOnHosts, KillDriverResponse, RequestKillDriver} import org.apache.spark.deploy.DeployTestUtils._ import org.apache.spark.deploy.master._ +import org.apache.spark.internal.config.DECOMMISSION_ENABLED import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv} import org.apache.spark.util.Utils class MasterWebUISuite extends SparkFunSuite { - val conf = new SparkConf() + val conf = new SparkConf().set(DECOMMISSION_ENABLED, true) val securityMgr = new SecurityManager(conf) val rpcEnv = mock(classOf[RpcEnv]) val master = mock(classOf[Master]) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org