This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 516cb7f6533 [SPARK-42190][K8S][FOLLOWUP] Fix to use the user-given
number of threads
516cb7f6533 is described below
commit 516cb7f6533d028dff1e7e54806c7333e8420019
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Fri Jan 27 02:00:27 2023 -0800
[SPARK-42190][K8S][FOLLOWUP] Fix to use the user-given number of threads
### What changes were proposed in this pull request?
This is a follow-up PR to fix to use the user-given number of threads.
### Why are the changes needed?
Previously, it always uses `1` core due to this bug.
After this fix, the users can see.
```
23/01/27 08:42:45 INFO KubernetesClusterManager: Running Spark with
local[10]
```

### Does this PR introduce _any_ user-facing change?
No. This is not released yet.
### How was this patch tested?
Pass the CI with newly added test case.
Closes #39769 from dongjoon-hyun/SPARK-42190-2.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/scheduler/cluster/k8s/KubernetesClusterManager.scala | 9 +++++----
.../spark/deploy/k8s/integrationtest/BasicTestsSuite.scala | 4 ++--
2 files changed, 7 insertions(+), 6 deletions(-)
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index c01fbf4f9c7..fb0783239c6 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -40,7 +40,7 @@ private[spark] class KubernetesClusterManager extends
ExternalClusterManager wit
conf.get(KUBERNETES_DRIVER_MASTER_URL).startsWith("local")
override def createTaskScheduler(sc: SparkContext, masterURL: String):
TaskScheduler = {
- val maxTaskFailures = masterURL match {
+ val maxTaskFailures = sc.conf.get(KUBERNETES_DRIVER_MASTER_URL) match {
case "local" | LOCAL_N_REGEX(_) => 1
case LOCAL_N_FAILURES_REGEX(_, maxFailures) => maxFailures.toInt
case _ => sc.conf.get(TASK_MAX_FAILURES)
@@ -54,13 +54,14 @@ private[spark] class KubernetesClusterManager extends
ExternalClusterManager wit
scheduler: TaskScheduler): SchedulerBackend = {
if (isLocal(sc.conf)) {
def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
- val threadCount = masterURL match {
+ val threadCount = sc.conf.get(KUBERNETES_DRIVER_MASTER_URL) match {
case LOCAL_N_REGEX(threads) =>
- if (threads == "*") localCpuCount else 1
+ if (threads == "*") localCpuCount else threads.toInt
case LOCAL_N_FAILURES_REGEX(threads, _) =>
- if (threads == "*") localCpuCount else 1
+ if (threads == "*") localCpuCount else threads.toInt
case _ => 1
}
+ logInfo(s"Running Spark with
${sc.conf.get(KUBERNETES_DRIVER_MASTER_URL)}")
val schedulerImpl = scheduler.asInstanceOf[TaskSchedulerImpl]
val backend = new LocalSchedulerBackend(sc.conf, schedulerImpl,
threadCount)
schedulerImpl.initialize(backend)
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
index 6ae000c80c5..1f48d796067 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
@@ -34,11 +34,11 @@ private[spark] trait BasicTestsSuite { k8sSuite:
KubernetesSuite =>
import KubernetesSuite.{TIMEOUT, INTERVAL}
test("SPARK-42190: Run SparkPi with local[*]", k8sTestTag) {
- sparkAppConf.set("spark.kubernetes.driver.master", "local[*]")
+ sparkAppConf.set("spark.kubernetes.driver.master", "local[10]")
runSparkApplicationAndVerifyCompletion(
containerLocalSparkDistroExamplesJar,
SPARK_PI_MAIN_CLASS,
- Seq("Pi is roughly 3"),
+ Seq("local[10]", "Pi is roughly 3"),
Seq(),
Array.empty[String],
doBasicDriverPodCheck,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]