This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f2b41b034a8f [SPARK-50208][CORE] Support 
`spark.master.useDriverIdAsAppName.enabled`
f2b41b034a8f is described below

commit f2b41b034a8fecb09d7788837bf082a2b5f14026
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Fri Nov 1 23:49:05 2024 -0700

    [SPARK-50208][CORE] Support `spark.master.useDriverIdAsAppName.enabled`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support `spark.master.useDriverIdAsAppName.enabled` as an 
experimental feature in Spark Standalone cluster.
    
    ### Why are the changes needed?
    
    This allows a user to access `submissionId` (=`driverId`) in a Spark 
application.
    
    ```
    $ cat appName.py
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.getOrCreate()
    print(spark.sparkContext.appName)
    spark.stop()
    ```
    
    ```
    $ SPARK_MASTER_OPTS='-Dspark.master.useDriverIdAsAppName.enabled=true 
-Dspark.master.rest.enabled=true' sbin/start-master.sh
    $ sbin/start-worker.sh spark://$(hostname):7077
    $ ./examples/src/main/scripts/submit-pi.sh localhost $PWD/appName.py
    {
      "action" : "CreateSubmissionResponse",
      "message" : "Driver successfully submitted as driver-20241101162106-0000",
      "serverSparkVersion" : "4.0.0-SNAPSHOT",
      "submissionId" : "driver-20241101162106-0000",
      "success" : true
    }
    ```
    
    <img width="469" alt="Screenshot 2024-11-01 at 16 23 25" 
src="https://github.com/user-attachments/assets/77a30994-8245-498f-9b30-d4707090d730">
    
    ### Does this PR introduce _any_ user-facing change?
    
    No behavior change, because this is a new feature guarded by a configuration flag that is disabled by default.
    
    ### How was this patch tested?
    
    Pass the CIs with newly added test cases.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #48740 from dongjoon-hyun/SPARK-50208.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../org/apache/spark/deploy/master/Master.scala    | 28 ++++++++++++++++++++--
 .../org/apache/spark/internal/config/package.scala |  8 +++++++
 .../apache/spark/deploy/master/MasterSuite.scala   | 22 +++++++++++++++++
 .../spark/deploy/master/MasterSuiteBase.scala      |  2 ++
 4 files changed, 58 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index b9a4b5cf15a5..4657c8d795a6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, 
HashSet}
 import scala.util.Random
 
 import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, 
ExecutorState}
+import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, 
ExecutorState, SparkSubmit}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.DriverState.DriverState
 import org.apache.spark.deploy.master.MasterMessages._
@@ -128,6 +128,7 @@ private[deploy] class Master(
   val reverseProxy = conf.get(UI_REVERSE_PROXY)
   val historyServerUrl = conf.get(MASTER_UI_HISTORY_SERVER_URL)
   val useAppNameAsAppId = conf.get(MASTER_USE_APP_NAME_AS_APP_ID)
+  val useDriverIdAsAppId = conf.get(MASTER_USE_DRIVER_ID_AS_APP_NAME)
 
   // Alternative application submission gateway that is stable across Spark 
versions
   private val restServerEnabled = conf.get(MASTER_REST_SERVER_ENABLED)
@@ -1330,10 +1331,33 @@ private[deploy] class Master(
     appId
   }
 
+  /**
+   * Update and add `spark.app.name` configurations to DriverDescription.
+   */
+  private def maybeUpdateAppName(desc: DriverDescription, appName: String): 
DriverDescription = {
+    if (!useDriverIdAsAppId) return desc
+
+    val config = s"spark.app.name=$appName"
+    val javaOpts = desc.command.javaOpts
+      .filter(opt => !opt.startsWith("-Dspark.app.name=")) :+ s"-D$config"
+    val args = desc.command.arguments
+    val arguments = if (args(2).equals(classOf[SparkSubmit].getName)) {
+      if (args.length > 4 && args(4).startsWith("spark.app.name=")) {
+        args.updated(4, config)
+      } else {
+        args.patch(3, Seq("-c", config), 0)
+      }
+    } else {
+      args
+    }
+    desc.copy(command = desc.command.copy(arguments = arguments, javaOpts = 
javaOpts))
+  }
+
   private def createDriver(desc: DriverDescription): DriverInfo = {
     val now = System.currentTimeMillis()
     val date = new Date(now)
-    new DriverInfo(now, newDriverId(date), desc, date)
+    val id = newDriverId(date)
+    new DriverInfo(now, id, maybeUpdateAppName(desc, id), date)
   }
 
   private def launchDriver(worker: WorkerInfo, driver: DriverInfo): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 6b164aecb2af..c58c371da20c 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2015,6 +2015,14 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val MASTER_USE_DRIVER_ID_AS_APP_NAME =
+    ConfigBuilder("spark.master.useDriverIdAsAppName.enabled")
+      .internal()
+      .doc("(Experimental) If true, Spark master tries to set driver ID as 
appName.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val IO_COMPRESSION_SNAPPY_BLOCKSIZE =
     ConfigBuilder("spark.io.compression.snappy.blockSize")
       .doc("Block size in bytes used in Snappy compression, in the case when " 
+
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index e75c4ca88069..e64bc724cfba 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -183,6 +183,28 @@ class MasterSuite extends MasterSuiteBase {
     assert(master.invokePrivate(_newDriverId(submitDate)) === 
"my-driver-00001")
   }
 
+  test("SPARK-50208: Use driverID as appName in javaOpts") {
+    val master = makeMaster(new 
SparkConf().set(MASTER_USE_DRIVER_ID_AS_APP_NAME, true))
+    val command = Command(
+      "org.apache.spark.deploy.worker.DriverWrapper",
+      Seq("{{WORKER_URL}}", "{{USER_JAR}}", "mainClass"),
+      Map.empty, Seq.empty, Seq.empty, Seq.empty)
+    val desc = DriverDescription("", 1, 1, false, command)
+    val result = master.invokePrivate(_maybeUpdateAppName(desc, "driver-id"))
+    assert(result.command.javaOpts.contains("-Dspark.app.name=driver-id"))
+  }
+
+  test("SPARK-50208: Use driverID as appName in arguments") {
+    val master = makeMaster(new 
SparkConf().set(MASTER_USE_DRIVER_ID_AS_APP_NAME, true))
+    val command = Command(
+      "org.apache.spark.deploy.worker.DriverWrapper",
+      Seq("{{WORKER_URL}}", "{{USER_JAR}}", 
"org.apache.spark.deploy.SparkSubmit", "pi.py"),
+      Map.empty, Seq.empty, Seq.empty, Seq.empty)
+    val desc = DriverDescription("", 1, 1, false, command)
+    val result = master.invokePrivate(_maybeUpdateAppName(desc, "driver-id"))
+    assert(result.command.arguments.contains("spark.app.name=driver-id"))
+  }
+
   test("SPARK-45753: Prevent invalid driver id patterns") {
     val m = intercept[IllegalArgumentException] {
       makeMaster(new SparkConf().set(DRIVER_ID_PATTERN, "my driver"))
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuiteBase.scala 
b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuiteBase.scala
index 629112a27463..2e159b828884 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuiteBase.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuiteBase.scala
@@ -443,6 +443,8 @@ trait MasterSuiteBase extends SparkFunSuite
   private val _state = PrivateMethod[RecoveryState.Value](Symbol("state"))
   protected val _newDriverId = PrivateMethod[String](Symbol("newDriverId"))
   protected val _newApplicationId = 
PrivateMethod[String](Symbol("newApplicationId"))
+  protected val _maybeUpdateAppName =
+    PrivateMethod[DriverDescription](Symbol("maybeUpdateAppName"))
   protected val _createApplication = 
PrivateMethod[ApplicationInfo](Symbol("createApplication"))
   protected val _persistenceEngine = 
PrivateMethod[PersistenceEngine](Symbol("persistenceEngine"))
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to