This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f2b41b034a8f [SPARK-50208][CORE] Support
`spark.master.useDriverIdAsAppName.enabled`
f2b41b034a8f is described below
commit f2b41b034a8fecb09d7788837bf082a2b5f14026
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Fri Nov 1 23:49:05 2024 -0700
[SPARK-50208][CORE] Support `spark.master.useDriverIdAsAppName.enabled`
### What changes were proposed in this pull request?
This PR aims to support `spark.master.useDriverIdAsAppName.enabled` as an
experimental feature in Spark Standalone cluster.
### Why are the changes needed?
This allows a user to access `submissionId` (= `driverId`) in a Spark
application.
```
$ cat appName.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
print(spark.sparkContext.appName)
spark.stop()
```
```
$ SPARK_MASTER_OPTS='-Dspark.master.useDriverIdAsAppName.enabled=true
-Dspark.master.rest.enabled=true' sbin/start-master.sh
$ sbin/start-worker.sh spark://$(hostname):7077
$ ./examples/src/main/scripts/submit-pi.sh localhost $PWD/appName.py
{
"action" : "CreateSubmissionResponse",
"message" : "Driver successfully submitted as driver-20241101162106-0000",
"serverSparkVersion" : "4.0.0-SNAPSHOT",
"submissionId" : "driver-20241101162106-0000",
"success" : true
}
```
<img width="469" alt="Screenshot 2024-11-01 at 16 23 25"
src="https://github.com/user-attachments/assets/77a30994-8245-498f-9b30-d4707090d730">
### Does this PR introduce _any_ user-facing change?
No behavior change because this is a new feature behind configuration.
### How was this patch tested?
Pass the CIs with newly added test cases.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #48740 from dongjoon-hyun/SPARK-50208.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/deploy/master/Master.scala | 28 ++++++++++++++++++++--
.../org/apache/spark/internal/config/package.scala | 8 +++++++
.../apache/spark/deploy/master/MasterSuite.scala | 22 +++++++++++++++++
.../spark/deploy/master/MasterSuiteBase.scala | 2 ++
4 files changed, 58 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index b9a4b5cf15a5..4657c8d795a6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap,
HashSet}
import scala.util.Random
import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.deploy.{ApplicationDescription, DriverDescription,
ExecutorState}
+import org.apache.spark.deploy.{ApplicationDescription, DriverDescription,
ExecutorState, SparkSubmit}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.master.MasterMessages._
@@ -128,6 +128,7 @@ private[deploy] class Master(
val reverseProxy = conf.get(UI_REVERSE_PROXY)
val historyServerUrl = conf.get(MASTER_UI_HISTORY_SERVER_URL)
val useAppNameAsAppId = conf.get(MASTER_USE_APP_NAME_AS_APP_ID)
+ val useDriverIdAsAppId = conf.get(MASTER_USE_DRIVER_ID_AS_APP_NAME)
// Alternative application submission gateway that is stable across Spark
versions
private val restServerEnabled = conf.get(MASTER_REST_SERVER_ENABLED)
@@ -1330,10 +1331,33 @@ private[deploy] class Master(
appId
}
+ /**
+ * Update and add `spark.app.name` configurations to DriverDescription.
+ */
+ private def maybeUpdateAppName(desc: DriverDescription, appName: String):
DriverDescription = {
+ if (!useDriverIdAsAppId) return desc
+
+ val config = s"spark.app.name=$appName"
+ val javaOpts = desc.command.javaOpts
+ .filter(opt => !opt.startsWith("-Dspark.app.name=")) :+ s"-D$config"
+ val args = desc.command.arguments
+ val arguments = if (args(2).equals(classOf[SparkSubmit].getName)) {
+ if (args.length > 4 && args(4).startsWith("spark.app.name=")) {
+ args.updated(4, config)
+ } else {
+ args.patch(3, Seq("-c", config), 0)
+ }
+ } else {
+ args
+ }
+ desc.copy(command = desc.command.copy(arguments = arguments, javaOpts =
javaOpts))
+ }
+
private def createDriver(desc: DriverDescription): DriverInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
- new DriverInfo(now, newDriverId(date), desc, date)
+ val id = newDriverId(date)
+ new DriverInfo(now, id, maybeUpdateAppName(desc, id), date)
}
private def launchDriver(worker: WorkerInfo, driver: DriverInfo): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 6b164aecb2af..c58c371da20c 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2015,6 +2015,14 @@ package object config {
.booleanConf
.createWithDefault(false)
+ private[spark] val MASTER_USE_DRIVER_ID_AS_APP_NAME =
+ ConfigBuilder("spark.master.useDriverIdAsAppName.enabled")
+ .internal()
+ .doc("(Experimental) If true, Spark master tries to set driver ID as
appName.")
+ .version("4.0.0")
+ .booleanConf
+ .createWithDefault(false)
+
private[spark] val IO_COMPRESSION_SNAPPY_BLOCKSIZE =
ConfigBuilder("spark.io.compression.snappy.blockSize")
.doc("Block size in bytes used in Snappy compression, in the case when "
+
diff --git
a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index e75c4ca88069..e64bc724cfba 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -183,6 +183,28 @@ class MasterSuite extends MasterSuiteBase {
assert(master.invokePrivate(_newDriverId(submitDate)) ===
"my-driver-00001")
}
+ test("SPARK-50208: Use driverID as appName in javaOpts") {
+ val master = makeMaster(new
SparkConf().set(MASTER_USE_DRIVER_ID_AS_APP_NAME, true))
+ val command = Command(
+ "org.apache.spark.deploy.worker.DriverWrapper",
+ Seq("{{WORKER_URL}}", "{{USER_JAR}}", "mainClass"),
+ Map.empty, Seq.empty, Seq.empty, Seq.empty)
+ val desc = DriverDescription("", 1, 1, false, command)
+ val result = master.invokePrivate(_maybeUpdateAppName(desc, "driver-id"))
+ assert(result.command.javaOpts.contains("-Dspark.app.name=driver-id"))
+ }
+
+ test("SPARK-50208: Use driverID as appName in arguments") {
+ val master = makeMaster(new
SparkConf().set(MASTER_USE_DRIVER_ID_AS_APP_NAME, true))
+ val command = Command(
+ "org.apache.spark.deploy.worker.DriverWrapper",
+ Seq("{{WORKER_URL}}", "{{USER_JAR}}",
"org.apache.spark.deploy.SparkSubmit", "pi.py"),
+ Map.empty, Seq.empty, Seq.empty, Seq.empty)
+ val desc = DriverDescription("", 1, 1, false, command)
+ val result = master.invokePrivate(_maybeUpdateAppName(desc, "driver-id"))
+ assert(result.command.arguments.contains("spark.app.name=driver-id"))
+ }
+
test("SPARK-45753: Prevent invalid driver id patterns") {
val m = intercept[IllegalArgumentException] {
makeMaster(new SparkConf().set(DRIVER_ID_PATTERN, "my driver"))
diff --git
a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuiteBase.scala
b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuiteBase.scala
index 629112a27463..2e159b828884 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuiteBase.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuiteBase.scala
@@ -443,6 +443,8 @@ trait MasterSuiteBase extends SparkFunSuite
private val _state = PrivateMethod[RecoveryState.Value](Symbol("state"))
protected val _newDriverId = PrivateMethod[String](Symbol("newDriverId"))
protected val _newApplicationId =
PrivateMethod[String](Symbol("newApplicationId"))
+ protected val _maybeUpdateAppName =
+ PrivateMethod[DriverDescription](Symbol("maybeUpdateAppName"))
protected val _createApplication =
PrivateMethod[ApplicationInfo](Symbol("createApplication"))
protected val _persistenceEngine =
PrivateMethod[PersistenceEngine](Symbol("persistenceEngine"))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]