This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 8d810c4ac2d [SPARK-42190][K8S] Support `local` mode in `spark.kubernetes.driver.master`
8d810c4ac2d is described below

commit 8d810c4ac2d831d8d178c91403daeb327bc872b7
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Wed Jan 25 20:22:19 2023 -0800

    [SPARK-42190][K8S] Support `local` mode in `spark.kubernetes.driver.master`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support a driver-only K8s Spark job.
    
    ### Why are the changes needed?
    
    Some workloads, such as SQL DDL operations, can be executed by the driver alone, as in the sketch below.
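
    For example, a job like the following never needs executor pods (the
    object and table names are illustrative, not from this PR); with
    `spark.kubernetes.driver.master=local[*]` the driver pod can run it alone:

        import org.apache.spark.sql.SparkSession

        object DdlOnlyJob {
          def main(args: Array[String]): Unit = {
            val spark = SparkSession.builder().appName("ddl-only").getOrCreate()
            // DDL like this is executed on the driver and schedules no tasks,
            // so no executors are required.
            spark.sql("CREATE TABLE IF NOT EXISTS t (id INT) USING parquet")
            spark.stop()
          }
        }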
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This is a new feature.
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test case.
    
    Closes #39748 from dongjoon-hyun/SPARK-42190.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 426b1151fc0b910d6ae9885d74d2717eb4aa7ffd)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 docs/running-on-kubernetes.md                      |  3 ++-
 .../scala/org/apache/spark/deploy/k8s/Config.scala |  3 ++-
 .../cluster/k8s/KubernetesClusterManager.scala     | 29 ++++++++++++++++++++--
 .../k8s/integrationtest/BasicTestsSuite.scala      | 19 +++++++++++++-
 .../k8s/integrationtest/KubernetesSuite.scala      | 10 +++++---
 5 files changed, 55 insertions(+), 9 deletions(-)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 9d33c339734..9174697ca97 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -590,7 +590,8 @@ See the [configuration page](configuration.html) for information on Spark config
   <td><code>spark.kubernetes.driver.master</code></td>
   <td><code>https://kubernetes.default.svc</code></td>
   <td>
-    The internal Kubernetes master (API server) address to be used for driver to request executors.
+    The internal Kubernetes master (API server) address to be used for driver to request executors or
+    'local[*]' for driver-pod-only mode.
   </td>
   <td>3.0.0</td>
 </tr>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index ed766e7050d..e76351f6c02 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -49,7 +49,8 @@ private[spark] object Config extends Logging {
   val KUBERNETES_DRIVER_MASTER_URL =
     ConfigBuilder("spark.kubernetes.driver.master")
       .doc("The internal Kubernetes master (API server) address " +
-        "to be used for driver to request executors.")
+        "to be used for driver to request executors or " +
+        "'local[*]' for driver-only mode.")
       .version("3.0.0")
       .stringConf
       .createWithDefault(KUBERNETES_MASTER_INTERNAL_URL)
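
For reference, a minimal sketch of how this entry is consumed from a SparkConf
(plain string API; the exact calls here are illustrative, not from this patch):

    import org.apache.spark.SparkConf

    val conf = new SparkConf(loadDefaults = false)
    // Unset, the entry falls back to the in-cluster endpoint:
    conf.get("spark.kubernetes.driver.master", "https://kubernetes.default.svc")
    // Driver-only mode: the driver runs tasks on its own threads and
    // never requests executor pods.
    conf.set("spark.kubernetes.driver.master", "local[*]")
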
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index 55d28dc5a73..c01fbf4f9c7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -21,26 +21,51 @@ import java.io.File
 import io.fabric8.kubernetes.client.Config
 import io.fabric8.kubernetes.client.KubernetesClient
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{SparkConf, SparkContext, SparkMasterRegex}
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkKubernetesClientFactory}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants.DEFAULT_EXECUTOR_CONTAINER_NAME
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.TASK_MAX_FAILURES
 import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
+import org.apache.spark.scheduler.local.LocalSchedulerBackend
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 
 private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
+  import SparkMasterRegex._
 
   override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")
 
+  private def isLocal(conf: SparkConf): Boolean =
+    conf.get(KUBERNETES_DRIVER_MASTER_URL).startsWith("local")
+
   override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
-    new TaskSchedulerImpl(sc)
+    val maxTaskFailures = masterURL match {
+      case "local" | LOCAL_N_REGEX(_) => 1
+      case LOCAL_N_FAILURES_REGEX(_, maxFailures) => maxFailures.toInt
+      case _ => sc.conf.get(TASK_MAX_FAILURES)
+    }
+    new TaskSchedulerImpl(sc, maxTaskFailures, isLocal(sc.conf))
   }
 
   override def createSchedulerBackend(
       sc: SparkContext,
       masterURL: String,
       scheduler: TaskScheduler): SchedulerBackend = {
+    if (isLocal(sc.conf)) {
+      def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
+      val threadCount = masterURL match {
+        case LOCAL_N_REGEX(threads) =>
+          if (threads == "*") localCpuCount else 1
+        case LOCAL_N_FAILURES_REGEX(threads, _) =>
+          if (threads == "*") localCpuCount else 1
+        case _ => 1
+      }
+      val schedulerImpl = scheduler.asInstanceOf[TaskSchedulerImpl]
+      val backend = new LocalSchedulerBackend(sc.conf, schedulerImpl, threadCount)
+      schedulerImpl.initialize(backend)
+      return backend
+    }
     val wasSparkSubmittedInClusterMode = sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK)
     val (authConfPrefix,
       apiServerUri,
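
The maxTaskFailures dispatch above mirrors Spark's handling of standalone
`local` master URLs. A self-contained sketch of the same logic (the two
regexes restate what `SparkMasterRegex` provides and are an assumption here,
not copied from upstream):

    // local[N] / local[*], and local[N, maxFailures]
    val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r

    def maxTaskFailures(masterURL: String, confMax: Int): Int = masterURL match {
      case "local" | LOCAL_N_REGEX(_) => 1                    // no retries in local mode
      case LOCAL_N_FAILURES_REGEX(_, maxFailures) => maxFailures.toInt
      case _ => confMax                                       // spark.task.maxFailures
    }

    assert(maxTaskFailures("local[*]", 4) == 1)
    assert(maxTaskFailures("local[2, 3]", 4) == 3)
    assert(maxTaskFailures("k8s://https://kubernetes.default.svc", 4) == 4)
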
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
index a79442ac635..6ae000c80c5 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
@@ -19,10 +19,12 @@ package org.apache.spark.deploy.k8s.integrationtest
 import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.api.model.Pod
-import org.scalatest.concurrent.Eventually
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
 import org.scalatest.matchers.should.Matchers._
+import org.scalatest.time.{Seconds, Span}
 
 import org.apache.spark.{SparkFunSuite, TestUtils}
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.SPARK_PI_MAIN_CLASS
 import org.apache.spark.launcher.SparkLauncher
 
 private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite =>
@@ -31,6 +33,21 @@ private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite =>
   import KubernetesSuite.{k8sTestTag, localTestTag}
   import KubernetesSuite.{TIMEOUT, INTERVAL}
 
+  test("SPARK-42190: Run SparkPi with local[*]", k8sTestTag) {
+    sparkAppConf.set("spark.kubernetes.driver.master", "local[*]")
+    runSparkApplicationAndVerifyCompletion(
+      containerLocalSparkDistroExamplesJar,
+      SPARK_PI_MAIN_CLASS,
+      Seq("Pi is roughly 3"),
+      Seq(),
+      Array.empty[String],
+      doBasicDriverPodCheck,
+      _ => (),
+      isJVM = true,
+      executorPatience =
+        Some((Some(PatienceConfiguration.Interval(Span(0, Seconds))), None)))
+  }
+
   test("Run SparkPi with no resources", k8sTestTag) {
     runSparkPiAndVerifyCompletion()
   }
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 358a724fda3..78839ee6103 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -465,10 +465,12 @@ class KubernetesSuite extends SparkFunSuite
       .get(0)
     driverPodChecker(driverPod)
 
-    // If we're testing decommissioning we decommission executors, but we should have an executor
-    // at some point.
-    Eventually.eventually(TIMEOUT, patienceInterval) {
-      execPods.values.nonEmpty should be (true)
+    if (patienceInterval.value.toSeconds.toInt > 0) {
+      // If we're testing decommissioning we decommission executors, but we should have an executor
+      // at some point.
+      Eventually.eventually(TIMEOUT, patienceInterval) {
+        execPods.values.nonEmpty should be (true)
+      }
     }
     execPods.values.foreach(executorPodChecker(_))
 

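
A design note on the test changes: the new test passes an executor patience of
`Span(0, Seconds)`, and `KubernetesSuite` now treats a zero-length interval as
a sentinel meaning "driver-only, no executor pods expected", skipping the
executor existence check. A sketch of that sentinel convention (the helper
name is illustrative, not from this patch):

    import org.scalatest.concurrent.PatienceConfiguration
    import org.scalatest.time.{Seconds, Span}

    // Interpret a zero-length polling interval as "expect no executors".
    def expectsExecutors(interval: PatienceConfiguration.Interval): Boolean =
      interval.value.totalNanos > 0

    assert(!expectsExecutors(PatienceConfiguration.Interval(Span(0, Seconds))))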

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
