This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 8d810c4ac2d [SPARK-42190][K8S] Support `local` mode in `spark.kubernetes.driver.master`
8d810c4ac2d is described below
commit 8d810c4ac2d831d8d178c91403daeb327bc872b7
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Wed Jan 25 20:22:19 2023 -0800
[SPARK-42190][K8S] Support `local` mode in `spark.kubernetes.driver.master`
### What changes were proposed in this pull request?
This PR aims to support a driver-only K8s Spark job.
### Why are the changes needed?
Some workloads, such as SQL DDL operations, can be executed by the driver alone.
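For illustration, a minimal driver-only job might look like the following sketch; the app name and DDL statement are hypothetical, not part of this patch:

```scala
import org.apache.spark.sql.SparkSession

// Sketch of a driver-only K8s job: with spark.kubernetes.driver.master set
// to local[*], the driver pod runs tasks on its own threads and requests no
// executor pods. The app name and table are illustrative.
val spark = SparkSession.builder()
  .appName("ddl-only-job")
  .config("spark.kubernetes.driver.master", "local[*]")
  .getOrCreate()
spark.sql("CREATE TABLE IF NOT EXISTS t (id INT) USING parquet")
spark.stop()
```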
### Does this PR introduce _any_ user-facing change?
No. This is a new feature.
### How was this patch tested?
Pass the CIs with the newly added test case.
Closes #39748 from dongjoon-hyun/SPARK-42190.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 426b1151fc0b910d6ae9885d74d2717eb4aa7ffd)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
docs/running-on-kubernetes.md | 3 ++-
.../scala/org/apache/spark/deploy/k8s/Config.scala | 3 ++-
.../cluster/k8s/KubernetesClusterManager.scala | 29 ++++++++++++++++++++--
.../k8s/integrationtest/BasicTestsSuite.scala | 19 +++++++++++++-
.../k8s/integrationtest/KubernetesSuite.scala | 10 +++++---
5 files changed, 55 insertions(+), 9 deletions(-)
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 9d33c339734..9174697ca97 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -590,7 +590,8 @@ See the [configuration page](configuration.html) for information on Spark config
<td><code>spark.kubernetes.driver.master</code></td>
<td><code>https://kubernetes.default.svc</code></td>
<td>
- The internal Kubernetes master (API server) address to be used for driver to request executors.
+ The internal Kubernetes master (API server) address to be used for driver to request executors or
+ 'local[*]' for driver-pod-only mode.
</td>
<td>3.0.0</td>
</tr>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index ed766e7050d..e76351f6c02 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -49,7 +49,8 @@ private[spark] object Config extends Logging {
val KUBERNETES_DRIVER_MASTER_URL =
ConfigBuilder("spark.kubernetes.driver.master")
.doc("The internal Kubernetes master (API server) address " +
- "to be used for driver to request executors.")
+ "to be used for driver to request executors or " +
+ "'local[*]' for driver-only mode.")
.version("3.0.0")
.stringConf
.createWithDefault(KUBERNETES_MASTER_INTERNAL_URL)
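As a hedged usage note (not part of the commit): the submission master stays `k8s://...`, while the new `'local[*]'` value applies only to the driver inside the pod, e.g.:

```scala
import org.apache.spark.SparkConf

// Sketch only: the config key and its 'local[*]' value come from the diff
// above; the k8s:// master URL below is an illustrative placeholder.
val conf = new SparkConf()
  .setMaster("k8s://https://kubernetes.default.svc")
  .set("spark.kubernetes.driver.master", "local[*]") // driver-pod-only mode
```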
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index 55d28dc5a73..c01fbf4f9c7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -21,26 +21,51 @@ import java.io.File
import io.fabric8.kubernetes.client.Config
import io.fabric8.kubernetes.client.KubernetesClient
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{SparkConf, SparkContext, SparkMasterRegex}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkKubernetesClientFactory}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants.DEFAULT_EXECUTOR_CONTAINER_NAME
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.TASK_MAX_FAILURES
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
+import org.apache.spark.scheduler.local.LocalSchedulerBackend
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
+ import SparkMasterRegex._
override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")
+ private def isLocal(conf: SparkConf): Boolean =
+ conf.get(KUBERNETES_DRIVER_MASTER_URL).startsWith("local")
+
override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
- new TaskSchedulerImpl(sc)
+ val maxTaskFailures = masterURL match {
+ case "local" | LOCAL_N_REGEX(_) => 1
+ case LOCAL_N_FAILURES_REGEX(_, maxFailures) => maxFailures.toInt
+ case _ => sc.conf.get(TASK_MAX_FAILURES)
+ }
+ new TaskSchedulerImpl(sc, maxTaskFailures, isLocal(sc.conf))
}
override def createSchedulerBackend(
sc: SparkContext,
masterURL: String,
scheduler: TaskScheduler): SchedulerBackend = {
+ if (isLocal(sc.conf)) {
+ def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
+ val threadCount = masterURL match {
+ case LOCAL_N_REGEX(threads) =>
+ if (threads == "*") localCpuCount else 1
+ case LOCAL_N_FAILURES_REGEX(threads, _) =>
+ if (threads == "*") localCpuCount else 1
+ case _ => 1
+ }
+ val schedulerImpl = scheduler.asInstanceOf[TaskSchedulerImpl]
+ val backend = new LocalSchedulerBackend(sc.conf, schedulerImpl, threadCount)
+ schedulerImpl.initialize(backend)
+ return backend
+ }
val wasSparkSubmittedInClusterMode = sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK)
val (authConfPrefix,
apiServerUri,
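For readers unfamiliar with `SparkMasterRegex`, here is a self-contained sketch of the master-string parsing that the new `createTaskScheduler` relies on; the patterns mirror Spark's `LOCAL_N_REGEX` and `LOCAL_N_FAILURES_REGEX` but are redefined locally so the snippet runs standalone:

```scala
// Standalone illustration, not Spark's code: regexes approximating
// SparkMasterRegex.LOCAL_N_REGEX and LOCAL_N_FAILURES_REGEX.
object LocalMasterParsing {
  val LocalN = """local\[([0-9]+|\*)\]""".r
  val LocalNFailures = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r

  // Max task failures implied by a local master string, mirroring the
  // dispatch in createTaskScheduler above; `default` stands in for
  // sc.conf.get(TASK_MAX_FAILURES).
  def maxTaskFailures(masterURL: String, default: Int): Int = masterURL match {
    case "local" | LocalN(_) => 1 // plain local modes do not retry tasks
    case LocalNFailures(_, maxFailures) => maxFailures.toInt
    case _ => default
  }
}

// Example: maxTaskFailures("local[4, 3]", 4) returns 3;
//          maxTaskFailures("k8s://https://host", 4) returns 4.
```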
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
index a79442ac635..6ae000c80c5 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
@@ -19,10 +19,12 @@ package org.apache.spark.deploy.k8s.integrationtest
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.Pod
-import org.scalatest.concurrent.Eventually
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.matchers.should.Matchers._
+import org.scalatest.time.{Seconds, Span}
import org.apache.spark.{SparkFunSuite, TestUtils}
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.SPARK_PI_MAIN_CLASS
import org.apache.spark.launcher.SparkLauncher
private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite =>
@@ -31,6 +33,21 @@ private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite =>
import KubernetesSuite.{k8sTestTag, localTestTag}
import KubernetesSuite.{TIMEOUT, INTERVAL}
+ test("SPARK-42190: Run SparkPi with local[*]", k8sTestTag) {
+ sparkAppConf.set("spark.kubernetes.driver.master", "local[*]")
+ runSparkApplicationAndVerifyCompletion(
+ containerLocalSparkDistroExamplesJar,
+ SPARK_PI_MAIN_CLASS,
+ Seq("Pi is roughly 3"),
+ Seq(),
+ Array.empty[String],
+ doBasicDriverPodCheck,
+ _ => (),
+ isJVM = true,
+ executorPatience =
+ Some((Some(PatienceConfiguration.Interval(Span(0, Seconds))), None)))
+ }
+
test("Run SparkPi with no resources", k8sTestTag) {
runSparkPiAndVerifyCompletion()
}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 358a724fda3..78839ee6103 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -465,10 +465,12 @@ class KubernetesSuite extends SparkFunSuite
.get(0)
driverPodChecker(driverPod)
- // If we're testing decommissioning we decommission executors, but we should have an executor
- // at some point.
- Eventually.eventually(TIMEOUT, patienceInterval) {
- execPods.values.nonEmpty should be (true)
+ if (patienceInterval.value.toSeconds.toInt > 0) {
+ // If we're testing decommissioning we decommission executors, but we should have an executor
+ // at some point.
+ Eventually.eventually(TIMEOUT, patienceInterval) {
+ execPods.values.nonEmpty should be (true)
+ }
}
execPods.values.foreach(executorPodChecker(_))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]