This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 17f84358fbf [SPARK-45948][K8S] Make single-pod spark jobs respect `spark.app.id` 17f84358fbf is described below commit 17f84358fbfb39ab048862e92ac0562fe7443ca1 Author: Dongjoon Hyun <dh...@apple.com> AuthorDate: Thu Nov 16 00:31:39 2023 -0800 [SPARK-45948][K8S] Make single-pod spark jobs respect `spark.app.id` ### What changes were proposed in this pull request? This PR aims to make single-pod Spark jobs respect `spark.app.id` in the K8s environment. ### Why are the changes needed? Since Apache Spark 3.4.0, SPARK-42190 allows users to run single-pod Spark jobs in the K8s environment by utilizing `LocalSchedulerBackend` in the driver pod. However, `LocalSchedulerBackend` doesn't respect `spark.app.id` while `KubernetesClusterSchedulerBackend` does. This PR aims to improve K8s UX by reducing the behavior difference between single-pod Spark jobs and multi-pod Spark jobs in the K8s environment. ### Does this PR introduce _any_ user-facing change? Yes, but the new behavior is more consistent with that of the existing general K8s jobs. ### How was this patch tested? Pass the CIs with the newly added test case. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43833 from dongjoon-hyun/SPARK-45948. 
Authored-by: Dongjoon Hyun <dh...@apple.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../cluster/k8s/KubernetesClusterManager.scala | 5 ++++- .../cluster/k8s/KubernetesClusterManagerSuite.scala | 21 +++++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index ec5cce239ef..3235d922204 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -63,7 +63,10 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit } logInfo(s"Running Spark with ${sc.conf.get(KUBERNETES_DRIVER_MASTER_URL)}") val schedulerImpl = scheduler.asInstanceOf[TaskSchedulerImpl] - val backend = new LocalSchedulerBackend(sc.conf, schedulerImpl, threadCount) + // KubernetesClusterSchedulerBackend respects `spark.app.id` while LocalSchedulerBackend + // does not. Propagate `spark.app.id` via `spark.test.appId` to match the behavior. 
+ val conf = sc.conf.getOption("spark.app.id").map(sc.conf.set("spark.test.appId", _)) + val backend = new LocalSchedulerBackend(conf.getOrElse(sc.conf), schedulerImpl, threadCount) schedulerImpl.initialize(backend) return backend } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala index 8f999a4cfe8..07410b6a7b7 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala @@ -20,10 +20,13 @@ import io.fabric8.kubernetes.client.KubernetesClient import org.mockito.{Mock, MockitoAnnotations} import org.mockito.Mockito.when import org.scalatest.BeforeAndAfter +import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.spark._ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.internal.config._ +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.scheduler.local.LocalSchedulerBackend class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter { @@ -59,4 +62,22 @@ class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter { manager.makeExecutorPodsAllocator(sc, kubernetesClient, null) } } + + test("SPARK-45948: Single-pod Spark jobs respect spark.app.id") { + val conf = new SparkConf() + conf.set(KUBERNETES_DRIVER_MASTER_URL, "local[2]") + when(sc.conf).thenReturn(conf) + val scheduler = mock[TaskSchedulerImpl] + when(scheduler.sc).thenReturn(sc) + val manager = new KubernetesClusterManager() + + val backend1 = manager.createSchedulerBackend(sc, "", scheduler) + assert(backend1.isInstanceOf[LocalSchedulerBackend]) + assert(backend1.applicationId().startsWith("local-")) + + 
conf.set("spark.app.id", "user-app-id") + val backend2 = manager.createSchedulerBackend(sc, "", scheduler) + assert(backend2.isInstanceOf[LocalSchedulerBackend]) + assert(backend2.applicationId() === "user-app-id") + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org