This is an automated email from the ASF dual-hosted git repository.
weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8871f6dfb82 [SPARK-41949][CORE][PYTHON] Make stage scheduling support local-cluster mode
8871f6dfb82 is described below
commit 8871f6dfb82acd7f44dfa19fc24c877abe3a2fe3
Author: Weichen Xu <[email protected]>
AuthorDate: Tue Jan 10 21:05:44 2023 +0800
[SPARK-41949][CORE][PYTHON] Make stage scheduling support local-cluster mode
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
### What changes were proposed in this pull request?
Make stage scheduling support local-cluster mode.
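For context, stage-level scheduling is requested by attaching a `ResourceProfile` to an RDD. A minimal sketch of how it can now be exercised against a `local-cluster` master (the master string and resource amounts below are illustrative, mirroring the new test rather than prescribed by this patch):

```python
from pyspark import SparkConf, SparkContext
from pyspark.resource.profile import ResourceProfileBuilder
from pyspark.resource.requests import TaskResourceRequests

# Illustrative: one worker with 4 cores and 1024 MB memory.
conf = SparkConf().setMaster("local-cluster[1,4,1024]")
sc = SparkContext(conf=conf)

# Require 2 CPUs per task; with this change the task resource profile is
# accepted for local-cluster masters as well as standalone ones.
rp = ResourceProfileBuilder().require(TaskResourceRequests().cpus(2)).build
sc.parallelize(range(4), 4).withResources(rp).map(lambda x: x).collect()
sc.stop()
```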
### Why are the changes needed?
This is useful in testing, especially for the test code of third-party Python
libraries that depend on PySpark: many such tests are written with pytest, and
pytest is hard to integrate with a standalone Spark cluster.
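As an illustration of that use case, a library's test suite could spin up a `local-cluster` SparkContext directly from pytest; the fixture below is only a hypothetical sketch (the fixture name and scope are not part of this patch):

```python
import pytest
from pyspark import SparkConf, SparkContext


@pytest.fixture(scope="module")
def local_cluster_sc():
    # Hypothetical fixture: 1 worker, 2 cores, 1024 MB; no external
    # standalone cluster has to be provisioned for the test run.
    conf = SparkConf().setMaster("local-cluster[1,2,1024]")
    sc = SparkContext(conf=conf)
    yield sc
    sc.stop()
```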
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests
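The new suite can also be run on its own with the PySpark test runner from a built Spark checkout, for example (command shown for convenience, not part of the patch):

```
python/run-tests --testnames 'pyspark.tests.test_stage_sched'
```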
Closes #39424 from WeichenXu123/stage-sched-local-cluster.
Authored-by: Weichen Xu <[email protected]>
Signed-off-by: Weichen Xu <[email protected]>
---
.../spark/resource/ResourceProfileManager.scala | 12 +-
dev/sparktestsupport/modules.py | 1 +
python/pyspark/tests/test_stage_sched.py | 153 +++++++++++++++++++++
3 files changed, 161 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
index 3f48aaded5c..9f98d4d9c9c 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
@@ -54,7 +54,9 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
   private val master = sparkConf.getOption("spark.master")
   private val isYarn = master.isDefined && master.get.equals("yarn")
   private val isK8s = master.isDefined && master.get.startsWith("k8s://")
-  private val isStandalone = master.isDefined && master.get.startsWith("spark://")
+  private val isStandaloneOrLocalCluster = master.isDefined && (
+    master.get.startsWith("spark://") || master.get.startsWith("local-cluster")
+  )
   private val notRunningUnitTests = !isTesting
   private val testExceptionThrown = sparkConf.get(RESOURCE_PROFILE_MANAGER_TESTING)
@@ -65,16 +67,16 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
    */
   private[spark] def isSupported(rp: ResourceProfile): Boolean = {
     if (rp.isInstanceOf[TaskResourceProfile] && !dynamicEnabled) {
-      if ((notRunningUnitTests || testExceptionThrown) && !isStandalone) {
+      if ((notRunningUnitTests || testExceptionThrown) && !isStandaloneOrLocalCluster) {
         throw new SparkException("TaskResourceProfiles are only supported for Standalone " +
           "cluster for now when dynamic allocation is disabled.")
       }
     } else {
       val isNotDefaultProfile = rp.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
       val notYarnOrK8sOrStandaloneAndNotDefaultProfile =
-        isNotDefaultProfile && !(isYarn || isK8s || isStandalone)
+        isNotDefaultProfile && !(isYarn || isK8s || isStandaloneOrLocalCluster)
       val YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile =
-        isNotDefaultProfile && (isYarn || isK8s || isStandalone) && !dynamicEnabled
+        isNotDefaultProfile && (isYarn || isK8s || isStandaloneOrLocalCluster) && !dynamicEnabled
       // We want the exception to be thrown only when we are specifically testing for the
       // exception or in a real application. Otherwise in all other testing scenarios we want
@@ -86,7 +88,7 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
           "and Standalone with dynamic allocation enabled.")
       }
-      if (isStandalone && dynamicEnabled && rp.getExecutorCores.isEmpty &&
+      if (isStandaloneOrLocalCluster && dynamicEnabled && rp.getExecutorCores.isEmpty &&
         sparkConf.getOption(config.EXECUTOR_CORES.key).isEmpty) {
         logWarning("Neither executor cores is set for resource profile, nor spark.executor.cores " +
           "is explicitly set, you may get more executors allocated than expected. " +
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 655c539cfae..5df495096b7 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -430,6 +430,7 @@ pyspark_core = Module(
"pyspark.tests.test_taskcontext",
"pyspark.tests.test_util",
"pyspark.tests.test_worker",
+ "pyspark.tests.test_stage_sched",
],
)
diff --git a/python/pyspark/tests/test_stage_sched.py b/python/pyspark/tests/test_stage_sched.py
new file mode 100644
index 00000000000..56cc0a0b2cd
--- /dev/null
+++ b/python/pyspark/tests/test_stage_sched.py
@@ -0,0 +1,153 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import tempfile
+import unittest
+import time
+import shutil
+import json
+
+from pyspark import SparkConf, SparkContext
+from pyspark.resource.profile import ResourceProfileBuilder
+from pyspark.resource.requests import TaskResourceRequests
+from pyspark.taskcontext import TaskContext
+
+
+class StageSchedulingTest(unittest.TestCase):
+    def setUp(self):
+        self.temp_dir = tempfile.mkdtemp()
+
+    def tearDown(self):
+        shutil.rmtree(self.temp_dir, ignore_errors=True)
+        if getattr(self, "sc", None) is not None:
+            self.sc.stop()
+            self.sc = None
+
+    def _test_stage_scheduling(
+        self,
+        cpus_per_worker,
+        gpus_per_worker,
+        num_tasks,
+        resource_profile,
+        expected_max_concurrent_tasks,
+    ):
+        conf = SparkConf()
+        conf.setMaster(f"local-cluster[1,{cpus_per_worker},1024]").set(
+            "spark.task.maxFailures", "1"
+        )
+
+        if gpus_per_worker:
+            worker_res_config_file = os.path.join(self.temp_dir, "worker_res.json")
+            worker_res = [
+                {
+                    "id": {
+                        "componentName": "spark.worker",
+                        "resourceName": "gpu",
+                    },
+                    "addresses": [str(i) for i in range(gpus_per_worker)],
+                }
+            ]
+            with open(worker_res_config_file, "w") as fp:
+                json.dump(worker_res, fp)
+
+            conf.set("spark.worker.resource.gpu.amount", str(gpus_per_worker))
+            conf.set("spark.worker.resourcesFile", worker_res_config_file)
+            conf.set("spark.executor.resource.gpu.amount", str(gpus_per_worker))
+
+        self.sc = SparkContext(conf=conf)
+        pids_output_dir = os.path.join(self.temp_dir, "pids")
+        os.mkdir(pids_output_dir)
+
+        def mapper(_):
+            task_id = TaskContext.get().partitionId()
+            pid_file_path = os.path.join(pids_output_dir, str(task_id))
+            with open(pid_file_path, mode="w"):
+                pass
+            time.sleep(0.1)
+            num_concurrent_tasks = len(os.listdir(pids_output_dir))
+            time.sleep(1)
+            os.remove(pid_file_path)
+            return num_concurrent_tasks
+
+        results = (
+            self.sc.parallelize(range(num_tasks), num_tasks)
+            .withResources(resource_profile)
+            .map(mapper)
+            .collect()
+        )
+        self.assertEqual(max(results), expected_max_concurrent_tasks)
+
+    def test_stage_scheduling_3_cpu_per_task(self):
+        rp = ResourceProfileBuilder().require(TaskResourceRequests().cpus(3)).build
+        self._test_stage_scheduling(
+            cpus_per_worker=4,
+            gpus_per_worker=0,
+            num_tasks=2,
+            resource_profile=rp,
+            expected_max_concurrent_tasks=1,
+        )
+
+    def test_stage_scheduling_2_cpu_per_task(self):
+        rp = ResourceProfileBuilder().require(TaskResourceRequests().cpus(2)).build
+        self._test_stage_scheduling(
+            cpus_per_worker=4,
+            gpus_per_worker=0,
+            num_tasks=4,
+            resource_profile=rp,
+            expected_max_concurrent_tasks=2,
+        )
+
+    def test_stage_scheduling_2_cpus_2_gpus_per_task(self):
+        rp = (
+            ResourceProfileBuilder()
+            .require(TaskResourceRequests().cpus(2).resource("gpu", 2))
+            .build
+        )
+        self._test_stage_scheduling(
+            cpus_per_worker=4,
+            gpus_per_worker=4,
+            num_tasks=4,
+            resource_profile=rp,
+            expected_max_concurrent_tasks=2,
+        )
+
+    def test_stage_scheduling_2_cpus_3_gpus_per_task(self):
+        rp = (
+            ResourceProfileBuilder()
+            .require(TaskResourceRequests().cpus(2).resource("gpu", 3))
+            .build
+        )
+        self._test_stage_scheduling(
+            cpus_per_worker=4,
+            gpus_per_worker=4,
+            num_tasks=2,
+            resource_profile=rp,
+            expected_max_concurrent_tasks=1,
+        )
+
+
+if __name__ == "__main__":
+    from pyspark.tests.test_stage_sched import *  # noqa: F401
+
+    try:
+        import xmlrunner
+
+        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]