This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5d1f585eba69 [SPARK-50383][CORE] Support `Virtual Threads` in REST
Submission API
5d1f585eba69 is described below
commit 5d1f585eba694a446cf3f9494b80c485b6a176b4
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Nov 21 17:31:59 2024 -0800
[SPARK-50383][CORE] Support `Virtual Threads` in REST Submission API
### What changes were proposed in this pull request?
This PR aims to support [JEP-444: Virtual
Threads](https://openjdk.org/jeps/444) in REST Submission API for Apache Spark
4.0.0.
- https://openjdk.org/jeps/425: Virtual Threads (Preview) in Java 19
- https://openjdk.org/jeps/436: Virtual Threads (Second Preview) in Java 20
- https://openjdk.org/jeps/444: Virtual Threads in Java 21
### Why are the changes needed?
`Virtual Threads` became an official (non-preview) feature in Java 21.
- https://docs.oracle.com/en/java/javase/21/core/virtual-threads.html
> Virtual threads are lightweight threads that reduce the effort of writing, maintaining, and debugging high-throughput concurrent applications.
In addition, `Jetty 11` supports `Virtual Threads`.
- https://jetty.org/docs/jetty/11/programming-guide/arch/threads.html#thread-pool-virtual-threads
We should provide a way to use it via a new option
`spark.master.rest.virtualThread.enabled` (default: `false`), which takes effect only when Spark runs on a Java 21+ runtime.
Please note that this feature targets high-throughput concurrency.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #48923 from dongjoon-hyun/SPARK-50383.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/deploy/rest/RestSubmissionServer.scala | 9 ++++++++-
.../org/apache/spark/internal/config/package.scala | 7 +++++++
.../deploy/rest/StandaloneRestSubmitSuite.scala | 20 +++++++++++++++++++-
3 files changed, 34 insertions(+), 2 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
index d3381ef6fb7f..877349da18dd 100644
---
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
@@ -18,6 +18,7 @@
package org.apache.spark.deploy.rest
import java.util.EnumSet
+import java.util.concurrent.{Executors, ExecutorService}
import scala.io.Source
@@ -33,7 +34,7 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys._
-import org.apache.spark.internal.config.{MASTER_REST_SERVER_FILTERS,
MASTER_REST_SERVER_MAX_THREADS}
+import org.apache.spark.internal.config.{MASTER_REST_SERVER_FILTERS,
MASTER_REST_SERVER_MAX_THREADS, MASTER_REST_SERVER_VIRTUAL_THREADS}
import org.apache.spark.util.Utils
/**
@@ -93,6 +94,12 @@ private[spark] abstract class RestSubmissionServer(
*/
private def doStart(startPort: Int): (Server, Int) = {
val threadPool = new
QueuedThreadPool(masterConf.get(MASTER_REST_SERVER_MAX_THREADS))
+ if (Utils.isJavaVersionAtLeast21 &&
masterConf.get(MASTER_REST_SERVER_VIRTUAL_THREADS)) {
+ val newVirtualThreadPerTaskExecutor =
+ classOf[Executors].getMethod("newVirtualThreadPerTaskExecutor")
+ val service =
newVirtualThreadPerTaskExecutor.invoke(null).asInstanceOf[ExecutorService]
+ threadPool.setVirtualThreadsExecutor(service)
+ }
threadPool.setDaemon(true)
val server = new Server(threadPool)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index c2b49d164ae3..324ef701c426 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2000,6 +2000,13 @@ package object config {
.toSequence
.createWithDefault(Nil)
+ private[spark] val MASTER_REST_SERVER_VIRTUAL_THREADS =
+ ConfigBuilder("spark.master.rest.virtualThread.enabled")
+ .doc("If true, Spark master tries to use Java 21 virtual thread for REST
API.")
+ .version("4.0.0")
+ .booleanConf
+ .createWithDefault(false)
+
private[spark] val MASTER_UI_PORT = ConfigBuilder("spark.master.ui.port")
.version("1.1.0")
.intConf
diff --git
a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index 075a15063c98..a155e4cc3ac9 100644
---
a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++
b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -25,6 +25,7 @@ import java.util.Base64
import scala.collection.mutable
import jakarta.servlet.http.HttpServletResponse
+import org.eclipse.jetty.util.thread.QueuedThreadPool
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._
@@ -34,7 +35,7 @@ import org.apache.spark.deploy.{SparkSubmit,
SparkSubmitArguments}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.DriverState._
import org.apache.spark.deploy.master.RecoveryState
-import org.apache.spark.internal.config.{MASTER_REST_SERVER_FILTERS,
MASTER_REST_SERVER_MAX_THREADS}
+import org.apache.spark.internal.config.{MASTER_REST_SERVER_FILTERS,
MASTER_REST_SERVER_MAX_THREADS, MASTER_REST_SERVER_VIRTUAL_THREADS}
import org.apache.spark.rpc._
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.Utils
@@ -559,6 +560,23 @@ class StandaloneRestSubmitSuite extends SparkFunSuite {
assert(pool.getMaxThreads === 2000)
}
+ test("SPARK-50383: Support spark.master.rest.virtualThread.enabled") {
+ val conf = new SparkConf()
+ val localhost = Utils.localHostName()
+ val securityManager = new SecurityManager(conf)
+ rpcEnv = Some(RpcEnv.create("rest-with-virtualThreads", localhost, 0,
conf, securityManager))
+ val fakeMasterRef = rpcEnv.get.setupEndpoint("fake-master", new
DummyMaster(rpcEnv.get))
+ conf.set(MASTER_REST_SERVER_VIRTUAL_THREADS, true)
+ server = Some(new StandaloneRestServer(localhost, 0, conf, fakeMasterRef,
"spark://fake:7077"))
+ server.get.start()
+ val pool =
server.get._server.get.getThreadPool.asInstanceOf[QueuedThreadPool]
+ if (Utils.isJavaVersionAtLeast21) {
+ assert(pool.getVirtualThreadsExecutor != null)
+ } else {
+ assert(pool.getVirtualThreadsExecutor == null)
+ }
+ }
+
/* --------------------- *
| Helper methods |
* --------------------- */
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]