This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 17f84358fbf [SPARK-45948][K8S] Make single-pod Spark jobs respect `spark.app.id`
17f84358fbf is described below

commit 17f84358fbfb39ab048862e92ac0562fe7443ca1
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Thu Nov 16 00:31:39 2023 -0800

    [SPARK-45948][K8S] Make single-pod Spark jobs respect `spark.app.id`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to make single-pod Spark jobs respect `spark.app.id` in a K8s environment.
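    
    For illustration, a user could pin the application ID of a single-pod job roughly like this (a hypothetical configuration sketch; it assumes single-pod mode from SPARK-42190 is enabled via `spark.kubernetes.driver.master`, the key behind `KUBERNETES_DRIVER_MASTER_URL`, and the app name is made up):
    
        import org.apache.spark.SparkConf
    
        val conf = new SparkConf()
          .setAppName("single-pod-example")                  // hypothetical name
          .set("spark.kubernetes.driver.master", "local[2]") // single-pod mode
          .set("spark.app.id", "user-app-id")                // honored after this patch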
    
    ### Why are the changes needed?
    
    Since Apache Spark 3.4.0, SPARK-42190 has allowed users to run single-pod Spark jobs in a K8s environment by utilizing `LocalSchedulerBackend` in the driver pod. However, `LocalSchedulerBackend` doesn't respect `spark.app.id` while `KubernetesClusterSchedulerBackend` does. This PR improves the K8s UX by reducing the behavior difference between single-pod and multi-pod Spark jobs.
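    
    For context, `LocalSchedulerBackend` derives its application ID roughly as sketched below (a simplified paraphrase of the upstream logic, not a verbatim excerpt), which is why populating `spark.test.appId` is enough to carry a user-supplied ID through:
    
        // Simplified: fall back to a generated "local-<timestamp>" ID
        // unless `spark.test.appId` is present in the conf.
        private val appId = conf.get("spark.test.appId", "local-" + System.currentTimeMillis)
        override def applicationId(): String = appId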
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, but the new behavior is more consistent with existing multi-pod K8s jobs.
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test case.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #43833 from dongjoon-hyun/SPARK-45948.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../cluster/k8s/KubernetesClusterManager.scala      |  5 ++++-
 .../cluster/k8s/KubernetesClusterManagerSuite.scala | 21 +++++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index ec5cce239ef..3235d922204 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -63,7 +63,10 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       }
       logInfo(s"Running Spark with 
${sc.conf.get(KUBERNETES_DRIVER_MASTER_URL)}")
       val schedulerImpl = scheduler.asInstanceOf[TaskSchedulerImpl]
-      val backend = new LocalSchedulerBackend(sc.conf, schedulerImpl, threadCount)
+      // KubernetesClusterSchedulerBackend respects `spark.app.id` while LocalSchedulerBackend
+      // does not. Propagate `spark.app.id` via `spark.test.appId` to match the behavior.
+      val conf = sc.conf.getOption("spark.app.id").map(sc.conf.set("spark.test.appId", _))
+      val backend = new LocalSchedulerBackend(conf.getOrElse(sc.conf), schedulerImpl, threadCount)
       schedulerImpl.initialize(backend)
       return backend
     }
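
One subtlety in the hunk above: `SparkConf.set` mutates the conf in place and returns the same instance, so `conf` is an `Option[SparkConf]` that, when defined, is just `sc.conf`; `conf.getOrElse(sc.conf)` therefore always hands `sc.conf` to the backend, now carrying `spark.test.appId` whenever `spark.app.id` was set. A minimal standalone illustration (hypothetical, not part of the patch):

    import org.apache.spark.SparkConf

    val base = new SparkConf().set("spark.app.id", "user-app-id")
    val maybe = base.getOption("spark.app.id").map(base.set("spark.test.appId", _))
    assert(maybe.getOrElse(base) eq base)                   // same mutated object either way
    assert(base.get("spark.test.appId") == "user-app-id")   // ID propagated
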
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala
index 8f999a4cfe8..07410b6a7b7 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala
@@ -20,10 +20,13 @@ import io.fabric8.kubernetes.client.KubernetesClient
 import org.mockito.{Mock, MockitoAnnotations}
 import org.mockito.Mockito.when
 import org.scalatest.BeforeAndAfter
+import org.scalatestplus.mockito.MockitoSugar.mock
 
 import org.apache.spark._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.internal.config._
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.local.LocalSchedulerBackend
 
 class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter {
 
@@ -59,4 +62,22 @@ class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter {
       manager.makeExecutorPodsAllocator(sc, kubernetesClient, null)
     }
   }
+
+  test("SPARK-45948: Single-pod Spark jobs respect spark.app.id") {
+    val conf = new SparkConf()
+    conf.set(KUBERNETES_DRIVER_MASTER_URL, "local[2]")
+    when(sc.conf).thenReturn(conf)
+    val scheduler = mock[TaskSchedulerImpl]
+    when(scheduler.sc).thenReturn(sc)
+    val manager = new KubernetesClusterManager()
+
+    val backend1 = manager.createSchedulerBackend(sc, "", scheduler)
+    assert(backend1.isInstanceOf[LocalSchedulerBackend])
+    assert(backend1.applicationId().startsWith("local-"))
+
+    conf.set("spark.app.id", "user-app-id")
+    val backend2 = manager.createSchedulerBackend(sc, "", scheduler)
+    assert(backend2.isInstanceOf[LocalSchedulerBackend])
+    assert(backend2.applicationId() === "user-app-id")
+  }
 }

