This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ce899404f59d [SPARK-50195][CORE] Fix `StandaloneRestServer` to
propagate `spark.app.name` to `SparkSubmit` properly
ce899404f59d is described below
commit ce899404f59d63b83e24ad24d65628676f85c773
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Oct 31 20:29:49 2024 -0700
[SPARK-50195][CORE] Fix `StandaloneRestServer` to propagate
`spark.app.name` to `SparkSubmit` properly
### What changes were proposed in this pull request?
This PR aims to fix `StandaloneRestServer` to propagate `spark.app.name` to
`SparkSubmit` properly.
### Why are the changes needed?
This is a long-standing bug in which PySpark jobs didn't get proper
`spark.app.name` propagation, unlike Scala/Java Spark jobs. Since PySpark jobs
are invoked indirectly via `SparkSubmit`, we need to hand over `spark.app.name`
via `-c` configuration.
### Does this PR introduce _any_ user-facing change?
This is a bug fix. The new behavior is the expected behavior.
### How was this patch tested?
Pass the CIs with the newly added test case.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #48729 from dongjoon-hyun/SPARK-50195.
Lead-authored-by: Dongjoon Hyun <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/deploy/rest/StandaloneRestServer.scala | 11 +++++++++--
.../spark/deploy/rest/StandaloneRestSubmitSuite.scala | 15 +++++++++++++++
2 files changed, 24 insertions(+), 2 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index c92e79381ca9..5e29199a352f 100644
---
a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -22,7 +22,7 @@ import java.io.File
import jakarta.servlet.http.HttpServletResponse
import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
-import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
+import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription,
SparkSubmit}
import org.apache.spark.deploy.ClientArguments._
import org.apache.spark.internal.config
import org.apache.spark.launcher.{JavaModuleOptions, SparkLauncher}
@@ -238,9 +238,16 @@ private[rest] class StandaloneSubmitRequestServlet(
val sparkJavaOpts = Utils.sparkJavaOpts(conf)
val javaModuleOptions =
JavaModuleOptions.defaultModuleOptionArray().toImmutableArraySeq
val javaOpts = javaModuleOptions ++ sparkJavaOpts ++ defaultJavaOpts ++
extraJavaOpts
+ val sparkSubmitOpts = if (mainClass.equals(classOf[SparkSubmit].getName)) {
+ sparkProperties.get("spark.app.name")
+ .map { v => Seq("-c", s"spark.app.name=$v") }
+ .getOrElse(Seq.empty[String])
+ } else {
+ Seq.empty[String]
+ }
val command = new Command(
"org.apache.spark.deploy.worker.DriverWrapper",
- Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs, // args to
the DriverWrapper
+ Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ sparkSubmitOpts ++
appArgs,
environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
val actualDriverMemory =
driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
val actualDriverCores = driverCores.map(_.toInt).getOrElse(DEFAULT_CORES)
diff --git
a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index 9303d97de330..4a05aab01cb5 100644
---
a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++
b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -530,6 +530,21 @@ class StandaloneRestSubmitSuite extends SparkFunSuite {
assert(conn.getResponseCode === HttpServletResponse.SC_FORBIDDEN)
}
+ test("SPARK-50195: Fix StandaloneRestServer to propagate app name to
SparkSubmit properly") {
+ Seq((classOf[SparkSubmit].getName, Seq("-c", "spark.app.name=app1")),
+ ("", Seq.empty)).foreach { case (mainClass, expectedArguments) =>
+ val request = new CreateSubmissionRequest
+ request.appResource = ""
+ request.mainClass = mainClass
+ request.appArgs = Array.empty[String]
+ request.sparkProperties = Map("spark.app.name" -> "app1")
+ request.environmentVariables = Map.empty[String, String]
+ val servlet = new StandaloneSubmitRequestServlet(null, null, null)
+ val desc = servlet.buildDriverDescription(request,
"spark://master:7077", 6066)
+ assert(desc.command.arguments.slice(3, 5) === expectedArguments)
+ }
+ }
+
/* --------------------- *
| Helper methods |
* --------------------- */
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]