This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dc84e529ba9 [SPARK-43122][CONNECT][PYTHON][ML][TESTS] Reenable 
TorchDistributorLocalUnitTestsOnConnect and 
TorchDistributorLocalUnitTestsIIOnConnect
dc84e529ba9 is described below

commit dc84e529ba96ba8afc24216e5fc28d95ce8ce290
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Apr 18 11:22:58 2023 +0800

    [SPARK-43122][CONNECT][PYTHON][ML][TESTS] Reenable 
TorchDistributorLocalUnitTestsOnConnect and 
TorchDistributorLocalUnitTestsIIOnConnect
    
    ### What changes were proposed in this pull request?
    `TorchDistributorLocalUnitTestsOnConnect` and 
`TorchDistributorLocalUnitTestsIIOnConnect` were not stable and occasionally 
got stuck. However, I cannot reproduce the issue locally.
    
    The two UTs were disabled, and this PR is to reenable them. I found that 
all the tests for PyTorch set up the regular sessions or connect sessions 
in `setUp` and close them in `tearDown`; however, such session operations are 
very expensive and should be placed into `setUpClass` and `tearDownClass` 
instead. After this change, the related tests seem much more stable. So I think the 
root cause is still related to the resources, since TorchDistributor works in 
barrier mode, when there is n [...]
    
    ### Why are the changes needed?
    for test coverage
    
    ### Does this PR introduce _any_ user-facing change?
    No, test-only
    
    ### How was this patch tested?
    CI
    
    Closes #40793 from zhengruifeng/torch_reenable.
    
    Lead-authored-by: Ruifeng Zheng <[email protected]>
    Co-authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 .../tests/connect/test_parity_torch_distributor.py | 111 +++++++------
 python/pyspark/ml/torch/tests/test_distributor.py  | 177 +++++++++++----------
 2 files changed, 158 insertions(+), 130 deletions(-)

diff --git a/python/pyspark/ml/tests/connect/test_parity_torch_distributor.py 
b/python/pyspark/ml/tests/connect/test_parity_torch_distributor.py
index 8f5699afdf2..55ea99a6540 100644
--- a/python/pyspark/ml/tests/connect/test_parity_torch_distributor.py
+++ b/python/pyspark/ml/tests/connect/test_parity_torch_distributor.py
@@ -17,7 +17,6 @@
 
 import os
 import shutil
-import tempfile
 import unittest
 
 have_torch = True
@@ -33,6 +32,9 @@ from pyspark.ml.torch.tests.test_distributor import (
     TorchDistributorLocalUnitTestsMixin,
     TorchDistributorDistributedUnitTestsMixin,
     TorchWrapperUnitTestsMixin,
+    set_up_test_dirs,
+    get_local_mode_conf,
+    get_distributed_mode_conf,
 )
 
 
@@ -40,31 +42,35 @@ from pyspark.ml.torch.tests.test_distributor import (
 class TorchDistributorBaselineUnitTestsOnConnect(
     TorchDistributorBaselineUnitTestsMixin, unittest.TestCase
 ):
-    def setUp(self) -> None:
-        self.spark = SparkSession.builder.remote("local[4]").getOrCreate()
+    @classmethod
+    def setUpClass(cls):
+        cls.spark = SparkSession.builder.remote("local[4]").getOrCreate()
 
-    def tearDown(self) -> None:
-        self.spark.stop()
+    @classmethod
+    def tearDownClass(cls):
+        cls.spark.stop()
 
 
[email protected]("unstable, ignore for now")
[email protected](not have_torch, "torch is required")
 class TorchDistributorLocalUnitTestsOnConnect(
     TorchDistributorLocalUnitTestsMixin, unittest.TestCase
 ):
-    def setUp(self) -> None:
-        class_name = self.__class__.__name__
-        conf = self._get_spark_conf()
-        builder = SparkSession.builder.appName(class_name)
-        for k, v in conf.getAll():
-            if k not in ["spark.master", "spark.remote", "spark.app.name"]:
-                builder = builder.config(k, v)
-        self.spark = builder.remote("local-cluster[2,2,1024]").getOrCreate()
-        self.mnist_dir_path = tempfile.mkdtemp()
-
-    def tearDown(self) -> None:
-        shutil.rmtree(self.mnist_dir_path)
-        os.unlink(self.gpu_discovery_script_file.name)
-        self.spark.stop()
+    @classmethod
+    def setUpClass(cls):
+        (cls.gpu_discovery_script_file_name, cls.mnist_dir_path) = 
set_up_test_dirs()
+        builder = SparkSession.builder.appName(cls.__name__)
+        for k, v in get_local_mode_conf().items():
+            builder = builder.config(k, v)
+        builder = builder.config(
+            "spark.driver.resource.gpu.discoveryScript", 
cls.gpu_discovery_script_file_name
+        )
+        cls.spark = builder.remote("local-cluster[2,2,1024]").getOrCreate()
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.mnist_dir_path)
+        os.unlink(cls.gpu_discovery_script_file_name)
+        cls.spark.stop()
 
     def _get_inputs_for_test_local_training_succeeds(self):
         return [
@@ -75,24 +81,27 @@ class TorchDistributorLocalUnitTestsOnConnect(
         ]
 
 
[email protected]("unstable, ignore for now")
[email protected](not have_torch, "torch is required")
 class TorchDistributorLocalUnitTestsIIOnConnect(
     TorchDistributorLocalUnitTestsMixin, unittest.TestCase
 ):
-    def setUp(self) -> None:
-        class_name = self.__class__.__name__
-        conf = self._get_spark_conf()
-        builder = SparkSession.builder.appName(class_name)
-        for k, v in conf.getAll():
-            if k not in ["spark.master", "spark.remote", "spark.app.name"]:
-                builder = builder.config(k, v)
-        self.spark = builder.remote("local[4]").getOrCreate()
-        self.mnist_dir_path = tempfile.mkdtemp()
-
-    def tearDown(self) -> None:
-        shutil.rmtree(self.mnist_dir_path)
-        os.unlink(self.gpu_discovery_script_file.name)
-        self.spark.stop()
+    @classmethod
+    def setUpClass(cls):
+        (cls.gpu_discovery_script_file_name, cls.mnist_dir_path) = 
set_up_test_dirs()
+        builder = SparkSession.builder.appName(cls.__name__)
+        for k, v in get_local_mode_conf().items():
+            builder = builder.config(k, v)
+
+        builder = builder.config(
+            "spark.driver.resource.gpu.discoveryScript", 
cls.gpu_discovery_script_file_name
+        )
+        cls.spark = builder.remote("local[4]").getOrCreate()
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.mnist_dir_path)
+        os.unlink(cls.gpu_discovery_script_file_name)
+        cls.spark.stop()
 
     def _get_inputs_for_test_local_training_succeeds(self):
         return [
@@ -107,21 +116,23 @@ class TorchDistributorLocalUnitTestsIIOnConnect(
 class TorchDistributorDistributedUnitTestsOnConnect(
     TorchDistributorDistributedUnitTestsMixin, unittest.TestCase
 ):
-    def setUp(self) -> None:
-        class_name = self.__class__.__name__
-        conf = self._get_spark_conf()
-        builder = SparkSession.builder.appName(class_name)
-        for k, v in conf.getAll():
-            if k not in ["spark.master", "spark.remote", "spark.app.name"]:
-                builder = builder.config(k, v)
-
-        self.spark = builder.remote("local-cluster[2,2,1024]").getOrCreate()
-        self.mnist_dir_path = tempfile.mkdtemp()
-
-    def tearDown(self) -> None:
-        shutil.rmtree(self.mnist_dir_path)
-        os.unlink(self.gpu_discovery_script_file.name)
-        self.spark.stop()
+    @classmethod
+    def setUpClass(cls):
+        (cls.gpu_discovery_script_file_name, cls.mnist_dir_path) = 
set_up_test_dirs()
+        builder = SparkSession.builder.appName(cls.__name__)
+        for k, v in get_distributed_mode_conf().items():
+            builder = builder.config(k, v)
+
+        builder = builder.config(
+            "spark.worker.resource.gpu.discoveryScript", 
cls.gpu_discovery_script_file_name
+        )
+        cls.spark = builder.remote("local-cluster[2,2,1024]").getOrCreate()
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.mnist_dir_path)
+        os.unlink(cls.gpu_discovery_script_file_name)
+        cls.spark.stop()
 
 
 @unittest.skipIf(not have_torch, "torch is required")
diff --git a/python/pyspark/ml/torch/tests/test_distributor.py 
b/python/pyspark/ml/torch/tests/test_distributor.py
index e05b4b23817..ebd859031bd 100644
--- a/python/pyspark/ml/torch/tests/test_distributor.py
+++ b/python/pyspark/ml/torch/tests/test_distributor.py
@@ -122,6 +122,45 @@ def create_training_function(mnist_dir_path: str) -> 
Callable:
     return train_fn
 
 
+def set_up_test_dirs():
+    gpu_discovery_script_file = tempfile.NamedTemporaryFile(delete=False)
+    gpu_discovery_script_file_name = gpu_discovery_script_file.name
+    try:
+        gpu_discovery_script_file.write(
+            b'echo {\\"name\\": \\"gpu\\", \\"addresses\\": 
[\\"0\\",\\"1\\",\\"2\\"]}'
+        )
+    finally:
+        gpu_discovery_script_file.close()
+
+    # create temporary directory for Worker resources coordination
+    tempdir = tempfile.NamedTemporaryFile(delete=False)
+    os.unlink(tempdir.name)
+    os.chmod(
+        gpu_discovery_script_file_name,
+        stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP | stat.S_IROTH | 
stat.S_IXOTH,
+    )
+    mnist_dir_path = tempfile.mkdtemp()
+
+    return (gpu_discovery_script_file_name, mnist_dir_path)
+
+
+def get_local_mode_conf():
+    return {
+        "spark.test.home": SPARK_HOME,
+        "spark.driver.resource.gpu.amount": "3",
+    }
+
+
+def get_distributed_mode_conf():
+    return {
+        "spark.test.home": SPARK_HOME,
+        "spark.worker.resource.gpu.amount": "3",
+        "spark.task.cpus": "2",
+        "spark.task.resource.gpu.amount": "1",
+        "spark.executor.resource.gpu.amount": "1",
+    }
+
+
 class TorchDistributorBaselineUnitTestsMixin:
     def setup_env_vars(self, input_map: Dict[str, str]) -> None:
         for key, value in input_map.items():
@@ -271,37 +310,18 @@ class TorchDistributorBaselineUnitTestsMixin:
 
 @unittest.skipIf(not have_torch, "torch is required")
 class 
TorchDistributorBaselineUnitTests(TorchDistributorBaselineUnitTestsMixin, 
unittest.TestCase):
-    def setUp(self) -> None:
+    @classmethod
+    def setUpClass(cls):
         conf = SparkConf()
         sc = SparkContext("local[4]", conf=conf)
-        self.spark = SparkSession(sc)
+        cls.spark = SparkSession(sc)
 
-    def tearDown(self) -> None:
-        self.spark.stop()
+    @classmethod
+    def tearDownClass(cls):
+        cls.spark.stop()
 
 
 class TorchDistributorLocalUnitTestsMixin:
-    def _get_spark_conf(self) -> SparkConf:
-        self.gpu_discovery_script_file = 
tempfile.NamedTemporaryFile(delete=False)
-        self.gpu_discovery_script_file.write(
-            b'echo {\\"name\\": \\"gpu\\", \\"addresses\\": 
[\\"0\\",\\"1\\",\\"2\\"]}'
-        )
-        self.gpu_discovery_script_file.close()
-        # create temporary directory for Worker resources coordination
-        self.tempdir = tempfile.NamedTemporaryFile(delete=False)
-        os.unlink(self.tempdir.name)
-        os.chmod(
-            self.gpu_discovery_script_file.name,
-            stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP | stat.S_IROTH | 
stat.S_IXOTH,
-        )
-
-        conf = SparkConf().set("spark.test.home", SPARK_HOME)
-        conf = conf.set("spark.driver.resource.gpu.amount", "3")
-        conf = conf.set(
-            "spark.driver.resource.gpu.discoveryScript", 
self.gpu_discovery_script_file.name
-        )
-        return conf
-
     def setup_env_vars(self, input_map: Dict[str, str]) -> None:
         for key, value in input_map.items():
             os.environ[key] = value
@@ -382,59 +402,49 @@ class TorchDistributorLocalUnitTestsMixin:
 
 @unittest.skipIf(not have_torch, "torch is required")
 class TorchDistributorLocalUnitTests(TorchDistributorLocalUnitTestsMixin, 
unittest.TestCase):
-    def setUp(self) -> None:
-        class_name = self.__class__.__name__
-        conf = self._get_spark_conf()
-        sc = SparkContext("local-cluster[2,2,1024]", class_name, conf=conf)
-        self.spark = SparkSession(sc)
-        self.mnist_dir_path = tempfile.mkdtemp()
+    @classmethod
+    def setUpClass(cls):
+        (cls.gpu_discovery_script_file_name, cls.mnist_dir_path) = 
set_up_test_dirs()
+        conf = SparkConf()
+        for k, v in get_local_mode_conf().items():
+            conf = conf.set(k, v)
+        conf = conf.set(
+            "spark.driver.resource.gpu.discoveryScript", 
cls.gpu_discovery_script_file_name
+        )
+
+        sc = SparkContext("local-cluster[2,2,1024]", cls.__name__, conf=conf)
+        cls.spark = SparkSession(sc)
 
-    def tearDown(self) -> None:
-        shutil.rmtree(self.mnist_dir_path)
-        os.unlink(self.gpu_discovery_script_file.name)
-        self.spark.stop()
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.mnist_dir_path)
+        os.unlink(cls.gpu_discovery_script_file_name)
+        cls.spark.stop()
 
 
 @unittest.skipIf(not have_torch, "torch is required")
 class TorchDistributorLocalUnitTestsII(TorchDistributorLocalUnitTestsMixin, 
unittest.TestCase):
-    def setUp(self) -> None:
-        class_name = self.__class__.__name__
-        conf = self._get_spark_conf()
-        sc = SparkContext("local[4]", class_name, conf=conf)
-        self.spark = SparkSession(sc)
-        self.mnist_dir_path = tempfile.mkdtemp()
+    @classmethod
+    def setUpClass(cls):
+        (cls.gpu_discovery_script_file_name, cls.mnist_dir_path) = 
set_up_test_dirs()
+        conf = SparkConf()
+        for k, v in get_local_mode_conf().items():
+            conf = conf.set(k, v)
+        conf = conf.set(
+            "spark.driver.resource.gpu.discoveryScript", 
cls.gpu_discovery_script_file_name
+        )
 
-    def tearDown(self) -> None:
-        shutil.rmtree(self.mnist_dir_path)
-        os.unlink(self.gpu_discovery_script_file.name)
-        self.spark.stop()
+        sc = SparkContext("local[4]", cls.__name__, conf=conf)
+        cls.spark = SparkSession(sc)
 
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.mnist_dir_path)
+        os.unlink(cls.gpu_discovery_script_file_name)
+        cls.spark.stop()
 
-class TorchDistributorDistributedUnitTestsMixin:
-    def _get_spark_conf(self) -> SparkConf:
-        self.gpu_discovery_script_file = 
tempfile.NamedTemporaryFile(delete=False)
-        self.gpu_discovery_script_file.write(
-            b'echo {\\"name\\": \\"gpu\\", \\"addresses\\": 
[\\"0\\",\\"1\\",\\"2\\"]}'
-        )
-        self.gpu_discovery_script_file.close()
-        # create temporary directory for Worker resources coordination
-        tempdir = tempfile.NamedTemporaryFile(delete=False)
-        os.unlink(tempdir.name)
-        os.chmod(
-            self.gpu_discovery_script_file.name,
-            stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP | stat.S_IROTH | 
stat.S_IXOTH,
-        )
-
-        conf = SparkConf().set("spark.test.home", SPARK_HOME)
-        conf = conf.set(
-            "spark.worker.resource.gpu.discoveryScript", 
self.gpu_discovery_script_file.name
-        )
-        conf = conf.set("spark.worker.resource.gpu.amount", "3")
-        conf = conf.set("spark.task.cpus", "2")
-        conf = conf.set("spark.task.resource.gpu.amount", "1")
-        conf = conf.set("spark.executor.resource.gpu.amount", "1")
-        return conf
 
+class TorchDistributorDistributedUnitTestsMixin:
     def test_dist_training_succeeds(self) -> None:
         CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
         inputs = [
@@ -482,17 +492,24 @@ class TorchDistributorDistributedUnitTestsMixin:
 class TorchDistributorDistributedUnitTests(
     TorchDistributorDistributedUnitTestsMixin, unittest.TestCase
 ):
-    def setUp(self) -> None:
-        class_name = self.__class__.__name__
-        conf = self._get_spark_conf()
-        sc = SparkContext("local-cluster[2,2,1024]", class_name, conf=conf)
-        self.spark = SparkSession(sc)
-        self.mnist_dir_path = tempfile.mkdtemp()
-
-    def tearDown(self) -> None:
-        shutil.rmtree(self.mnist_dir_path)
-        os.unlink(self.gpu_discovery_script_file.name)
-        self.spark.stop()
+    @classmethod
+    def setUpClass(cls):
+        (cls.gpu_discovery_script_file_name, cls.mnist_dir_path) = 
set_up_test_dirs()
+        conf = SparkConf()
+        for k, v in get_distributed_mode_conf().items():
+            conf = conf.set(k, v)
+        conf = conf.set(
+            "spark.worker.resource.gpu.discoveryScript", 
cls.gpu_discovery_script_file_name
+        )
+
+        sc = SparkContext("local-cluster[2,2,1024]", cls.__name__, conf=conf)
+        cls.spark = SparkSession(sc)
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.mnist_dir_path)
+        os.unlink(cls.gpu_discovery_script_file_name)
+        cls.spark.stop()
 
 
 class TorchWrapperUnitTestsMixin:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to