This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 17f84358fbf [SPARK-45948][K8S] Make single-pod spark jobs respect `spark.app.id`
17f84358fbf is described below
commit 17f84358fbfb39ab048862e92ac0562fe7443ca1
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Nov 16 00:31:39 2023 -0800
[SPARK-45948][K8S] Make single-pod spark jobs respect `spark.app.id`
### What changes were proposed in this pull request?
This PR aims to make single-pod Spark jobs respect `spark.app.id` in K8s
environment.
### Why are the changes needed?
Since Apache Spark 3.4.0, SPARK-42190 allows users to run single-pod Spark
jobs in K8s environment by utilizing `LocalSchedulerBackend` in the driver pod.
However, `LocalSchedulerBackend` doesn't respect `spark.app.id` while
`KubernetesClusterSchedulerBackend` does. This PR aims to improve K8s UX by
reducing the behavior difference between single-pod Spark jobs and multi-pod
Spark jobs in K8s environment.
### Does this PR introduce _any_ user-facing change?
Yes, but it's more consistent with the existing general K8s jobs.
### How was this patch tested?
Pass the CIs with the newly added test case.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #43833 from dongjoon-hyun/SPARK-45948.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../cluster/k8s/KubernetesClusterManager.scala | 5 ++++-
.../cluster/k8s/KubernetesClusterManagerSuite.scala | 21 +++++++++++++++++++++
2 files changed, 25 insertions(+), 1 deletion(-)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index ec5cce239ef..3235d922204 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -63,7 +63,10 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
}
     logInfo(s"Running Spark with ${sc.conf.get(KUBERNETES_DRIVER_MASTER_URL)}")
val schedulerImpl = scheduler.asInstanceOf[TaskSchedulerImpl]
-    val backend = new LocalSchedulerBackend(sc.conf, schedulerImpl, threadCount)
+    // KubernetesClusterSchedulerBackend respects `spark.app.id` while LocalSchedulerBackend
+    // does not. Propagate `spark.app.id` via `spark.test.appId` to match the behavior.
+    val conf = sc.conf.getOption("spark.app.id").map(sc.conf.set("spark.test.appId", _))
+    val backend = new LocalSchedulerBackend(conf.getOrElse(sc.conf), schedulerImpl, threadCount)
schedulerImpl.initialize(backend)
return backend
}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala
index 8f999a4cfe8..07410b6a7b7 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala
@@ -20,10 +20,13 @@ import io.fabric8.kubernetes.client.KubernetesClient
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Mockito.when
import org.scalatest.BeforeAndAfter
+import org.scalatestplus.mockito.MockitoSugar.mock
import org.apache.spark._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.internal.config._
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.local.LocalSchedulerBackend
class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter {
@@ -59,4 +62,22 @@ class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter {
manager.makeExecutorPodsAllocator(sc, kubernetesClient, null)
}
}
+
+ test("SPARK-45948: Single-pod Spark jobs respect spark.app.id") {
+ val conf = new SparkConf()
+ conf.set(KUBERNETES_DRIVER_MASTER_URL, "local[2]")
+ when(sc.conf).thenReturn(conf)
+ val scheduler = mock[TaskSchedulerImpl]
+ when(scheduler.sc).thenReturn(sc)
+ val manager = new KubernetesClusterManager()
+
+ val backend1 = manager.createSchedulerBackend(sc, "", scheduler)
+ assert(backend1.isInstanceOf[LocalSchedulerBackend])
+ assert(backend1.applicationId().startsWith("local-"))
+
+ conf.set("spark.app.id", "user-app-id")
+ val backend2 = manager.createSchedulerBackend(sc, "", scheduler)
+ assert(backend2.isInstanceOf[LocalSchedulerBackend])
+ assert(backend2.applicationId() === "user-app-id")
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]