This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8871f6dfb82 [SPARK-41949][CORE][PYTHON] Make stage scheduling support local-cluster mode
8871f6dfb82 is described below

commit 8871f6dfb82acd7f44dfa19fc24c877abe3a2fe3
Author: Weichen Xu <[email protected]>
AuthorDate: Tue Jan 10 21:05:44 2023 +0800

    [SPARK-41949][CORE][PYTHON] Make stage scheduling support local-cluster mode
    
    Signed-off-by: Weichen Xu <[email protected]>
    
    ### What changes were proposed in this pull request?
    
    Make stage scheduling support local-cluster mode.
    
    ### Why are the changes needed?
    
    This is useful for testing, especially for the test code of third-party Python libraries that depend on PySpark: many such tests are written with pytest, and pytest is hard to integrate with a standalone Spark cluster.
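    
    For illustration only (none of this code is part of the patch; the fixture and test names are hypothetical), a minimal pytest-style sketch of the kind of test this change enables:
    
    ```python
    # Hypothetical sketch: stage-level scheduling against a local-cluster master,
    # which this patch makes ResourceProfileManager accept. Assumes a local Spark
    # build, since local-cluster mode requires SPARK_HOME to be set.
    import pytest
    
    from pyspark import SparkConf, SparkContext
    from pyspark.resource.profile import ResourceProfileBuilder
    from pyspark.resource.requests import TaskResourceRequests
    
    
    @pytest.fixture
    def sc():
        # One worker with 4 cores and 1024 MB of memory.
        conf = SparkConf().setMaster("local-cluster[1,4,1024]")
        context = SparkContext(conf=conf)
        yield context
        context.stop()
    
    
    def test_two_cpus_per_task(sc):
        # Require 2 CPUs per task, so at most 2 of the 4 cores run tasks at once.
        rp = ResourceProfileBuilder().require(TaskResourceRequests().cpus(2)).build
        assert sc.parallelize(range(4), 4).withResources(rp).sum() == 6
    ```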
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit tests
    
    Closes #39424 from WeichenXu123/stage-sched-local-cluster.
    
    Authored-by: Weichen Xu <[email protected]>
    Signed-off-by: Weichen Xu <[email protected]>
---
 .../spark/resource/ResourceProfileManager.scala    |  12 +-
 dev/sparktestsupport/modules.py                    |   1 +
 python/pyspark/tests/test_stage_sched.py           | 153 +++++++++++++++++++++
 3 files changed, 161 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
index 3f48aaded5c..9f98d4d9c9c 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
@@ -54,7 +54,9 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
   private val master = sparkConf.getOption("spark.master")
   private val isYarn = master.isDefined && master.get.equals("yarn")
   private val isK8s = master.isDefined && master.get.startsWith("k8s://")
-  private val isStandalone = master.isDefined && master.get.startsWith("spark://")
+  private val isStandaloneOrLocalCluster = master.isDefined && (
+      master.get.startsWith("spark://") || master.get.startsWith("local-cluster")
+    )
   private val notRunningUnitTests = !isTesting
   private val testExceptionThrown = sparkConf.get(RESOURCE_PROFILE_MANAGER_TESTING)
 
@@ -65,16 +67,16 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
    */
   private[spark] def isSupported(rp: ResourceProfile): Boolean = {
     if (rp.isInstanceOf[TaskResourceProfile] && !dynamicEnabled) {
-      if ((notRunningUnitTests || testExceptionThrown) && !isStandalone) {
+      if ((notRunningUnitTests || testExceptionThrown) && !isStandaloneOrLocalCluster) {
        throw new SparkException("TaskResourceProfiles are only supported for Standalone " +
           "cluster for now when dynamic allocation is disabled.")
       }
     } else {
      val isNotDefaultProfile = rp.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
       val notYarnOrK8sOrStandaloneAndNotDefaultProfile =
-        isNotDefaultProfile && !(isYarn || isK8s || isStandalone)
+        isNotDefaultProfile && !(isYarn || isK8s || isStandaloneOrLocalCluster)
       val YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile =
-        isNotDefaultProfile && (isYarn || isK8s || isStandalone) && !dynamicEnabled
+        isNotDefaultProfile && (isYarn || isK8s || isStandaloneOrLocalCluster) && !dynamicEnabled
 
      // We want the exception to be thrown only when we are specifically testing for the
      // exception or in a real application. Otherwise in all other testing scenarios we want
@@ -86,7 +88,7 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
           "and Standalone with dynamic allocation enabled.")
       }
 
-      if (isStandalone && dynamicEnabled && rp.getExecutorCores.isEmpty &&
+      if (isStandaloneOrLocalCluster && dynamicEnabled && rp.getExecutorCores.isEmpty &&
         sparkConf.getOption(config.EXECUTOR_CORES.key).isEmpty) {
         logWarning("Neither executor cores is set for resource profile, nor 
spark.executor.cores " +
           "is explicitly set, you may get more executors allocated than 
expected. " +
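
The net effect of the change above: masters of the form `spark://...` and `local-cluster[...]` are now treated uniformly by `isSupported`, so a TaskResourceProfile no longer throws on a local-cluster master when dynamic allocation is disabled. A Python mirror of the new `startsWith` checks, for illustration only (this function is not part of the patch):

```python
# Illustrative mirror of the isStandaloneOrLocalCluster condition above.
def is_standalone_or_local_cluster(master: str) -> bool:
    return master.startswith("spark://") or master.startswith("local-cluster")

assert is_standalone_or_local_cluster("spark://host:7077")
assert is_standalone_or_local_cluster("local-cluster[1,4,1024]")
assert not is_standalone_or_local_cluster("yarn")
```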
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 655c539cfae..5df495096b7 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -430,6 +430,7 @@ pyspark_core = Module(
         "pyspark.tests.test_taskcontext",
         "pyspark.tests.test_util",
         "pyspark.tests.test_worker",
+        "pyspark.tests.test_stage_sched",
     ],
 )
 
diff --git a/python/pyspark/tests/test_stage_sched.py b/python/pyspark/tests/test_stage_sched.py
new file mode 100644
index 00000000000..56cc0a0b2cd
--- /dev/null
+++ b/python/pyspark/tests/test_stage_sched.py
@@ -0,0 +1,153 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import tempfile
+import unittest
+import time
+import shutil
+import json
+
+from pyspark import SparkConf, SparkContext
+from pyspark.resource.profile import ResourceProfileBuilder
+from pyspark.resource.requests import TaskResourceRequests
+from pyspark.taskcontext import TaskContext
+
+
+class StageSchedulingTest(unittest.TestCase):
+    def setUp(self):
+        self.temp_dir = tempfile.mkdtemp()
+
+    def tearDown(self):
+        shutil.rmtree(self.temp_dir, ignore_errors=True)
+        if getattr(self, "sc", None) is not None:
+            self.sc.stop()
+            self.sc = None
+
+    def _test_stage_scheduling(
+        self,
+        cpus_per_worker,
+        gpus_per_worker,
+        num_tasks,
+        resource_profile,
+        expected_max_concurrent_tasks,
+    ):
+        conf = SparkConf()
+        conf.setMaster(f"local-cluster[1,{cpus_per_worker},1024]").set(
+            "spark.task.maxFailures", "1"
+        )
+
+        if gpus_per_worker:
+            worker_res_config_file = os.path.join(self.temp_dir, "worker_res.json")
+            worker_res = [
+                {
+                    "id": {
+                        "componentName": "spark.worker",
+                        "resourceName": "gpu",
+                    },
+                    "addresses": [str(i) for i in range(gpus_per_worker)],
+                }
+            ]
+            with open(worker_res_config_file, "w") as fp:
+                json.dump(worker_res, fp)
+
+            conf.set("spark.worker.resource.gpu.amount", str(gpus_per_worker))
+            conf.set("spark.worker.resourcesFile", worker_res_config_file)
+            conf.set("spark.executor.resource.gpu.amount", 
str(gpus_per_worker))
+
+        self.sc = SparkContext(conf=conf)
+        pids_output_dir = os.path.join(self.temp_dir, "pids")
+        os.mkdir(pids_output_dir)
+
+        def mapper(_):
+            task_id = TaskContext.get().partitionId()
+            pid_file_path = os.path.join(pids_output_dir, str(task_id))
+            with open(pid_file_path, mode="w"):
+                pass
+            time.sleep(0.1)
+            num_concurrent_tasks = len(os.listdir(pids_output_dir))
+            time.sleep(1)
+            os.remove(pid_file_path)
+            return num_concurrent_tasks
+
+        results = (
+            self.sc.parallelize(range(num_tasks), num_tasks)
+            .withResources(resource_profile)
+            .map(mapper)
+            .collect()
+        )
+        self.assertEqual(max(results), expected_max_concurrent_tasks)
+
+    def test_stage_scheduling_3_cpu_per_task(self):
+        rp = ResourceProfileBuilder().require(TaskResourceRequests().cpus(3)).build
+        self._test_stage_scheduling(
+            cpus_per_worker=4,
+            gpus_per_worker=0,
+            num_tasks=2,
+            resource_profile=rp,
+            expected_max_concurrent_tasks=1,
+        )
+
+    def test_stage_scheduling_2_cpu_per_task(self):
+        rp = ResourceProfileBuilder().require(TaskResourceRequests().cpus(2)).build
+        self._test_stage_scheduling(
+            cpus_per_worker=4,
+            gpus_per_worker=0,
+            num_tasks=4,
+            resource_profile=rp,
+            expected_max_concurrent_tasks=2,
+        )
+
+    def test_stage_scheduling_2_cpus_2_gpus_per_task(self):
+        rp = (
+            ResourceProfileBuilder()
+            .require(TaskResourceRequests().cpus(2).resource("gpu", 2))
+            .build
+        )
+        self._test_stage_scheduling(
+            cpus_per_worker=4,
+            gpus_per_worker=4,
+            num_tasks=4,
+            resource_profile=rp,
+            expected_max_concurrent_tasks=2,
+        )
+
+    def test_stage_scheduling_2_cpus_3_gpus_per_task(self):
+        rp = (
+            ResourceProfileBuilder()
+            .require(TaskResourceRequests().cpus(2).resource("gpu", 3))
+            .build
+        )
+        self._test_stage_scheduling(
+            cpus_per_worker=4,
+            gpus_per_worker=4,
+            num_tasks=2,
+            resource_profile=rp,
+            expected_max_concurrent_tasks=1,
+        )
+
+
+if __name__ == "__main__":
+    from pyspark.tests.test_stage_sched import *  # noqa: F401
+
+    try:
+        import xmlrunner
+
+        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)

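To run the new test module locally, something like `python/run-tests --testnames pyspark.tests.test_stage_sched` should work (assuming the standard Spark development setup); the module is also registered in `dev/sparktestsupport/modules.py` above so the CI test runner picks it up.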

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
