This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d09ef22f8ad [SPARK-50381][CORE] Support `spark.master.rest.maxThreads`
2d09ef22f8ad is described below

commit 2d09ef22f8adfb562d7da4e192fc5d6201edf34f
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Nov 21 13:07:36 2024 -0800

    [SPARK-50381][CORE] Support `spark.master.rest.maxThreads`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support `spark.master.rest.maxThreads`.
    
    ### Why are the changes needed?
    
    To provide users a way to control the maximum number of threads of the 
REST API. Previously, Apache Spark used a default constructor whose value is 
always fixed to `200`.
    
    
https://github.com/apache/spark/blob/2e1c3dc8004b4f003cde8dfae6857f5bef4bb170/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala#L94
    
    
https://github.com/jetty/jetty.project/blob/5dfc59a691b748796f922208956bd1f2794bcd16/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java#L118-L121
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, the default value of the new configuration is identical to the 
previously-used Jetty default value.
    
    ### How was this patch tested?
    
    Pass the CIs with a newly added test case.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #48921 from dongjoon-hyun/SPARK-50381.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../apache/spark/deploy/rest/RestSubmissionServer.scala  |  7 ++++---
 .../scala/org/apache/spark/internal/config/package.scala |  6 ++++++
 .../spark/deploy/rest/StandaloneRestSubmitSuite.scala    | 16 +++++++++++++++-
 3 files changed, 25 insertions(+), 4 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala 
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
index 7efab73726ef..d3381ef6fb7f 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
@@ -33,7 +33,7 @@ import org.json4s.jackson.JsonMethods._
 import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys._
-import org.apache.spark.internal.config.MASTER_REST_SERVER_FILTERS
+import org.apache.spark.internal.config.{MASTER_REST_SERVER_FILTERS, 
MASTER_REST_SERVER_MAX_THREADS}
 import org.apache.spark.util.Utils
 
 /**
@@ -63,7 +63,8 @@ private[spark] abstract class RestSubmissionServer(
   protected val clearRequestServlet: ClearRequestServlet
   protected val readyzRequestServlet: ReadyzRequestServlet
 
-  private var _server: Option[Server] = None
+  // Visible for testing
+  private[rest] var _server: Option[Server] = None
 
   // A mapping from URL prefixes to servlets that serve them. Exposed for 
testing.
   protected val baseContext = 
s"/${RestSubmissionServer.PROTOCOL_VERSION}/submissions"
@@ -91,7 +92,7 @@ private[spark] abstract class RestSubmissionServer(
    * Return a 2-tuple of the started server and the bound port.
    */
   private def doStart(startPort: Int): (Server, Int) = {
-    val threadPool = new QueuedThreadPool
+    val threadPool = new 
QueuedThreadPool(masterConf.get(MASTER_REST_SERVER_MAX_THREADS))
     threadPool.setDaemon(true)
     val server = new Server(threadPool)
 
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index c58c371da20c..c2b49d164ae3 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1987,6 +1987,12 @@ package object config {
     .intConf
     .createWithDefault(6066)
 
+  private[spark] val MASTER_REST_SERVER_MAX_THREADS = 
ConfigBuilder("spark.master.rest.maxThreads")
+    .doc("Maximum number of threads to use in the Spark Master REST API 
Server.")
+    .version("4.0.0")
+    .intConf
+    .createWithDefault(200)
+
   private[spark] val MASTER_REST_SERVER_FILTERS = 
ConfigBuilder("spark.master.rest.filters")
     .doc("Comma separated list of filter class names to apply to the Spark 
Master REST API.")
     .version("4.0.0")
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index 4a05aab01cb5..075a15063c98 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -25,6 +25,7 @@ import java.util.Base64
 import scala.collection.mutable
 
 import jakarta.servlet.http.HttpServletResponse
+import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool
 import org.json4s.JsonAST._
 import org.json4s.jackson.JsonMethods._
 
@@ -33,7 +34,7 @@ import org.apache.spark.deploy.{SparkSubmit, 
SparkSubmitArguments}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.DriverState._
 import org.apache.spark.deploy.master.RecoveryState
-import org.apache.spark.internal.config.MASTER_REST_SERVER_FILTERS
+import org.apache.spark.internal.config.{MASTER_REST_SERVER_FILTERS, 
MASTER_REST_SERVER_MAX_THREADS}
 import org.apache.spark.rpc._
 import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.Utils
@@ -545,6 +546,19 @@ class StandaloneRestSubmitSuite extends SparkFunSuite {
     }
   }
 
+  test("SPARK-50381: Support spark.master.rest.maxThreads") {
+    val conf = new SparkConf()
+    val localhost = Utils.localHostName()
+    val securityManager = new SecurityManager(conf)
+    rpcEnv = Some(RpcEnv.create("rest-with-maxThreads", localhost, 0, conf, 
securityManager))
+    val fakeMasterRef = rpcEnv.get.setupEndpoint("fake-master", new 
DummyMaster(rpcEnv.get))
+    conf.set(MASTER_REST_SERVER_MAX_THREADS, 2000)
+    server = Some(new StandaloneRestServer(localhost, 0, conf, fakeMasterRef, 
"spark://fake:7077"))
+    server.get.start()
+    val pool = 
server.get._server.get.getThreadPool.asInstanceOf[SizedThreadPool]
+    assert(pool.getMaxThreads === 2000)
+  }
+
   /* --------------------- *
    |     Helper methods    |
    * --------------------- */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to