This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ce899404f59d [SPARK-50195][CORE] Fix `StandaloneRestServer` to 
propagate `spark.app.name` to `SparkSubmit` properly
ce899404f59d is described below

commit ce899404f59d63b83e24ad24d65628676f85c773
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Oct 31 20:29:49 2024 -0700

    [SPARK-50195][CORE] Fix `StandaloneRestServer` to propagate 
`spark.app.name` to `SparkSubmit` properly
    
    ### What changes were proposed in this pull request?
    
    This PR aims to fix `StandaloneRestServer` to propagate `spark.app.name` to 
`SparkSubmit` properly.
    
    ### Why are the changes needed?
    
    This fixes a long-standing bug where PySpark jobs didn't get proper 
`spark.app.name` propagation, unlike Scala/Java Spark jobs. Since PySpark jobs 
are invoked indirectly via `SparkSubmit`, we need to hand over `spark.app.name` 
via the `-c` configuration.
    
    ### Does this PR introduce _any_ user-facing change?
    
    This is a bug fix. The new behavior is the expected behavior.
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test case.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #48729 from dongjoon-hyun/SPARK-50195.
    
    Lead-authored-by: Dongjoon Hyun <[email protected]>
    Co-authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../apache/spark/deploy/rest/StandaloneRestServer.scala   | 11 +++++++++--
 .../spark/deploy/rest/StandaloneRestSubmitSuite.scala     | 15 +++++++++++++++
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala 
b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index c92e79381ca9..5e29199a352f 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -22,7 +22,7 @@ import java.io.File
 import jakarta.servlet.http.HttpServletResponse
 
 import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
-import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
+import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription, 
SparkSubmit}
 import org.apache.spark.deploy.ClientArguments._
 import org.apache.spark.internal.config
 import org.apache.spark.launcher.{JavaModuleOptions, SparkLauncher}
@@ -238,9 +238,16 @@ private[rest] class StandaloneSubmitRequestServlet(
     val sparkJavaOpts = Utils.sparkJavaOpts(conf)
     val javaModuleOptions = 
JavaModuleOptions.defaultModuleOptionArray().toImmutableArraySeq
     val javaOpts = javaModuleOptions ++ sparkJavaOpts ++ defaultJavaOpts ++ 
extraJavaOpts
+    val sparkSubmitOpts = if (mainClass.equals(classOf[SparkSubmit].getName)) {
+      sparkProperties.get("spark.app.name")
+        .map { v => Seq("-c", s"spark.app.name=$v") }
+        .getOrElse(Seq.empty[String])
+    } else {
+      Seq.empty[String]
+    }
     val command = new Command(
       "org.apache.spark.deploy.worker.DriverWrapper",
-      Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs, // args to 
the DriverWrapper
+      Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ sparkSubmitOpts ++ 
appArgs,
       environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
     val actualDriverMemory = 
driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
     val actualDriverCores = driverCores.map(_.toInt).getOrElse(DEFAULT_CORES)
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index 9303d97de330..4a05aab01cb5 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -530,6 +530,21 @@ class StandaloneRestSubmitSuite extends SparkFunSuite {
     assert(conn.getResponseCode === HttpServletResponse.SC_FORBIDDEN)
   }
 
+  test("SPARK-50195: Fix StandaloneRestServer to propagate app name to 
SparkSubmit properly") {
+    Seq((classOf[SparkSubmit].getName, Seq("-c", "spark.app.name=app1")),
+        ("", Seq.empty)).foreach { case (mainClass, expectedArguments) =>
+      val request = new CreateSubmissionRequest
+      request.appResource = ""
+      request.mainClass = mainClass
+      request.appArgs = Array.empty[String]
+      request.sparkProperties = Map("spark.app.name" -> "app1")
+      request.environmentVariables = Map.empty[String, String]
+      val servlet = new StandaloneSubmitRequestServlet(null, null, null)
+      val desc = servlet.buildDriverDescription(request, 
"spark://master:7077", 6066)
+      assert(desc.command.arguments.slice(3, 5) === expectedArguments)
+    }
+  }
+
   /* --------------------- *
    |     Helper methods    |
    * --------------------- */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to