This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new dc84e529ba9 [SPARK-43122][CONNECT][PYTHON][ML][TESTS] Reenable
TorchDistributorLocalUnitTestsOnConnect and
TorchDistributorLocalUnitTestsIIOnConnect
dc84e529ba9 is described below
commit dc84e529ba96ba8afc24216e5fc28d95ce8ce290
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Apr 18 11:22:58 2023 +0800
[SPARK-43122][CONNECT][PYTHON][ML][TESTS] Reenable
TorchDistributorLocalUnitTestsOnConnect and
TorchDistributorLocalUnitTestsIIOnConnect
### What changes were proposed in this pull request?
`TorchDistributorLocalUnitTestsOnConnect` and
`TorchDistributorLocalUnitTestsIIOnConnect` were not stable and occasionally
got stuck. However, I cannot reproduce the issue locally.
The two UTs were disabled, and this PR is to reenable them. I found that
all the tests for PyTorch set up the regular sessions or connect sessions
in `setUp` and close them in `tearDown`, however such session operations are
very expensive and should be placed into `setUpClass` and `tearDownClass`
instead. After this change, the related tests seem much more stable. So I think the
root cause is still related to the resources, since TorchDistributor works on
barrier mode, when there is n [...]
### Why are the changes needed?
for test coverage
### Does this PR introduce _any_ user-facing change?
No, test-only
### How was this patch tested?
CI
Closes #40793 from zhengruifeng/torch_reenable.
Lead-authored-by: Ruifeng Zheng <[email protected]>
Co-authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
.../tests/connect/test_parity_torch_distributor.py | 111 +++++++------
python/pyspark/ml/torch/tests/test_distributor.py | 177 +++++++++++----------
2 files changed, 158 insertions(+), 130 deletions(-)
diff --git a/python/pyspark/ml/tests/connect/test_parity_torch_distributor.py
b/python/pyspark/ml/tests/connect/test_parity_torch_distributor.py
index 8f5699afdf2..55ea99a6540 100644
--- a/python/pyspark/ml/tests/connect/test_parity_torch_distributor.py
+++ b/python/pyspark/ml/tests/connect/test_parity_torch_distributor.py
@@ -17,7 +17,6 @@
import os
import shutil
-import tempfile
import unittest
have_torch = True
@@ -33,6 +32,9 @@ from pyspark.ml.torch.tests.test_distributor import (
TorchDistributorLocalUnitTestsMixin,
TorchDistributorDistributedUnitTestsMixin,
TorchWrapperUnitTestsMixin,
+ set_up_test_dirs,
+ get_local_mode_conf,
+ get_distributed_mode_conf,
)
@@ -40,31 +42,35 @@ from pyspark.ml.torch.tests.test_distributor import (
class TorchDistributorBaselineUnitTestsOnConnect(
TorchDistributorBaselineUnitTestsMixin, unittest.TestCase
):
- def setUp(self) -> None:
- self.spark = SparkSession.builder.remote("local[4]").getOrCreate()
+ @classmethod
+ def setUpClass(cls):
+ cls.spark = SparkSession.builder.remote("local[4]").getOrCreate()
- def tearDown(self) -> None:
- self.spark.stop()
+ @classmethod
+ def tearDownClass(cls):
+ cls.spark.stop()
[email protected]("unstable, ignore for now")
[email protected](not have_torch, "torch is required")
class TorchDistributorLocalUnitTestsOnConnect(
TorchDistributorLocalUnitTestsMixin, unittest.TestCase
):
- def setUp(self) -> None:
- class_name = self.__class__.__name__
- conf = self._get_spark_conf()
- builder = SparkSession.builder.appName(class_name)
- for k, v in conf.getAll():
- if k not in ["spark.master", "spark.remote", "spark.app.name"]:
- builder = builder.config(k, v)
- self.spark = builder.remote("local-cluster[2,2,1024]").getOrCreate()
- self.mnist_dir_path = tempfile.mkdtemp()
-
- def tearDown(self) -> None:
- shutil.rmtree(self.mnist_dir_path)
- os.unlink(self.gpu_discovery_script_file.name)
- self.spark.stop()
+ @classmethod
+ def setUpClass(cls):
+ (cls.gpu_discovery_script_file_name, cls.mnist_dir_path) =
set_up_test_dirs()
+ builder = SparkSession.builder.appName(cls.__name__)
+ for k, v in get_local_mode_conf().items():
+ builder = builder.config(k, v)
+ builder = builder.config(
+ "spark.driver.resource.gpu.discoveryScript",
cls.gpu_discovery_script_file_name
+ )
+ cls.spark = builder.remote("local-cluster[2,2,1024]").getOrCreate()
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.mnist_dir_path)
+ os.unlink(cls.gpu_discovery_script_file_name)
+ cls.spark.stop()
def _get_inputs_for_test_local_training_succeeds(self):
return [
@@ -75,24 +81,27 @@ class TorchDistributorLocalUnitTestsOnConnect(
]
[email protected]("unstable, ignore for now")
[email protected](not have_torch, "torch is required")
class TorchDistributorLocalUnitTestsIIOnConnect(
TorchDistributorLocalUnitTestsMixin, unittest.TestCase
):
- def setUp(self) -> None:
- class_name = self.__class__.__name__
- conf = self._get_spark_conf()
- builder = SparkSession.builder.appName(class_name)
- for k, v in conf.getAll():
- if k not in ["spark.master", "spark.remote", "spark.app.name"]:
- builder = builder.config(k, v)
- self.spark = builder.remote("local[4]").getOrCreate()
- self.mnist_dir_path = tempfile.mkdtemp()
-
- def tearDown(self) -> None:
- shutil.rmtree(self.mnist_dir_path)
- os.unlink(self.gpu_discovery_script_file.name)
- self.spark.stop()
+ @classmethod
+ def setUpClass(cls):
+ (cls.gpu_discovery_script_file_name, cls.mnist_dir_path) =
set_up_test_dirs()
+ builder = SparkSession.builder.appName(cls.__name__)
+ for k, v in get_local_mode_conf().items():
+ builder = builder.config(k, v)
+
+ builder = builder.config(
+ "spark.driver.resource.gpu.discoveryScript",
cls.gpu_discovery_script_file_name
+ )
+ cls.spark = builder.remote("local[4]").getOrCreate()
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.mnist_dir_path)
+ os.unlink(cls.gpu_discovery_script_file_name)
+ cls.spark.stop()
def _get_inputs_for_test_local_training_succeeds(self):
return [
@@ -107,21 +116,23 @@ class TorchDistributorLocalUnitTestsIIOnConnect(
class TorchDistributorDistributedUnitTestsOnConnect(
TorchDistributorDistributedUnitTestsMixin, unittest.TestCase
):
- def setUp(self) -> None:
- class_name = self.__class__.__name__
- conf = self._get_spark_conf()
- builder = SparkSession.builder.appName(class_name)
- for k, v in conf.getAll():
- if k not in ["spark.master", "spark.remote", "spark.app.name"]:
- builder = builder.config(k, v)
-
- self.spark = builder.remote("local-cluster[2,2,1024]").getOrCreate()
- self.mnist_dir_path = tempfile.mkdtemp()
-
- def tearDown(self) -> None:
- shutil.rmtree(self.mnist_dir_path)
- os.unlink(self.gpu_discovery_script_file.name)
- self.spark.stop()
+ @classmethod
+ def setUpClass(cls):
+ (cls.gpu_discovery_script_file_name, cls.mnist_dir_path) =
set_up_test_dirs()
+ builder = SparkSession.builder.appName(cls.__name__)
+ for k, v in get_distributed_mode_conf().items():
+ builder = builder.config(k, v)
+
+ builder = builder.config(
+ "spark.worker.resource.gpu.discoveryScript",
cls.gpu_discovery_script_file_name
+ )
+ cls.spark = builder.remote("local-cluster[2,2,1024]").getOrCreate()
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.mnist_dir_path)
+ os.unlink(cls.gpu_discovery_script_file_name)
+ cls.spark.stop()
@unittest.skipIf(not have_torch, "torch is required")
diff --git a/python/pyspark/ml/torch/tests/test_distributor.py
b/python/pyspark/ml/torch/tests/test_distributor.py
index e05b4b23817..ebd859031bd 100644
--- a/python/pyspark/ml/torch/tests/test_distributor.py
+++ b/python/pyspark/ml/torch/tests/test_distributor.py
@@ -122,6 +122,45 @@ def create_training_function(mnist_dir_path: str) ->
Callable:
return train_fn
+def set_up_test_dirs():
+ gpu_discovery_script_file = tempfile.NamedTemporaryFile(delete=False)
+ gpu_discovery_script_file_name = gpu_discovery_script_file.name
+ try:
+ gpu_discovery_script_file.write(
+ b'echo {\\"name\\": \\"gpu\\", \\"addresses\\":
[\\"0\\",\\"1\\",\\"2\\"]}'
+ )
+ finally:
+ gpu_discovery_script_file.close()
+
+ # create temporary directory for Worker resources coordination
+ tempdir = tempfile.NamedTemporaryFile(delete=False)
+ os.unlink(tempdir.name)
+ os.chmod(
+ gpu_discovery_script_file_name,
+ stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP | stat.S_IROTH |
stat.S_IXOTH,
+ )
+ mnist_dir_path = tempfile.mkdtemp()
+
+ return (gpu_discovery_script_file_name, mnist_dir_path)
+
+
+def get_local_mode_conf():
+ return {
+ "spark.test.home": SPARK_HOME,
+ "spark.driver.resource.gpu.amount": "3",
+ }
+
+
+def get_distributed_mode_conf():
+ return {
+ "spark.test.home": SPARK_HOME,
+ "spark.worker.resource.gpu.amount": "3",
+ "spark.task.cpus": "2",
+ "spark.task.resource.gpu.amount": "1",
+ "spark.executor.resource.gpu.amount": "1",
+ }
+
+
class TorchDistributorBaselineUnitTestsMixin:
def setup_env_vars(self, input_map: Dict[str, str]) -> None:
for key, value in input_map.items():
@@ -271,37 +310,18 @@ class TorchDistributorBaselineUnitTestsMixin:
@unittest.skipIf(not have_torch, "torch is required")
class
TorchDistributorBaselineUnitTests(TorchDistributorBaselineUnitTestsMixin,
unittest.TestCase):
- def setUp(self) -> None:
+ @classmethod
+ def setUpClass(cls):
conf = SparkConf()
sc = SparkContext("local[4]", conf=conf)
- self.spark = SparkSession(sc)
+ cls.spark = SparkSession(sc)
- def tearDown(self) -> None:
- self.spark.stop()
+ @classmethod
+ def tearDownClass(cls):
+ cls.spark.stop()
class TorchDistributorLocalUnitTestsMixin:
- def _get_spark_conf(self) -> SparkConf:
- self.gpu_discovery_script_file =
tempfile.NamedTemporaryFile(delete=False)
- self.gpu_discovery_script_file.write(
- b'echo {\\"name\\": \\"gpu\\", \\"addresses\\":
[\\"0\\",\\"1\\",\\"2\\"]}'
- )
- self.gpu_discovery_script_file.close()
- # create temporary directory for Worker resources coordination
- self.tempdir = tempfile.NamedTemporaryFile(delete=False)
- os.unlink(self.tempdir.name)
- os.chmod(
- self.gpu_discovery_script_file.name,
- stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP | stat.S_IROTH |
stat.S_IXOTH,
- )
-
- conf = SparkConf().set("spark.test.home", SPARK_HOME)
- conf = conf.set("spark.driver.resource.gpu.amount", "3")
- conf = conf.set(
- "spark.driver.resource.gpu.discoveryScript",
self.gpu_discovery_script_file.name
- )
- return conf
-
def setup_env_vars(self, input_map: Dict[str, str]) -> None:
for key, value in input_map.items():
os.environ[key] = value
@@ -382,59 +402,49 @@ class TorchDistributorLocalUnitTestsMixin:
@unittest.skipIf(not have_torch, "torch is required")
class TorchDistributorLocalUnitTests(TorchDistributorLocalUnitTestsMixin,
unittest.TestCase):
- def setUp(self) -> None:
- class_name = self.__class__.__name__
- conf = self._get_spark_conf()
- sc = SparkContext("local-cluster[2,2,1024]", class_name, conf=conf)
- self.spark = SparkSession(sc)
- self.mnist_dir_path = tempfile.mkdtemp()
+ @classmethod
+ def setUpClass(cls):
+ (cls.gpu_discovery_script_file_name, cls.mnist_dir_path) =
set_up_test_dirs()
+ conf = SparkConf()
+ for k, v in get_local_mode_conf().items():
+ conf = conf.set(k, v)
+ conf = conf.set(
+ "spark.driver.resource.gpu.discoveryScript",
cls.gpu_discovery_script_file_name
+ )
+
+ sc = SparkContext("local-cluster[2,2,1024]", cls.__name__, conf=conf)
+ cls.spark = SparkSession(sc)
- def tearDown(self) -> None:
- shutil.rmtree(self.mnist_dir_path)
- os.unlink(self.gpu_discovery_script_file.name)
- self.spark.stop()
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.mnist_dir_path)
+ os.unlink(cls.gpu_discovery_script_file_name)
+ cls.spark.stop()
@unittest.skipIf(not have_torch, "torch is required")
class TorchDistributorLocalUnitTestsII(TorchDistributorLocalUnitTestsMixin,
unittest.TestCase):
- def setUp(self) -> None:
- class_name = self.__class__.__name__
- conf = self._get_spark_conf()
- sc = SparkContext("local[4]", class_name, conf=conf)
- self.spark = SparkSession(sc)
- self.mnist_dir_path = tempfile.mkdtemp()
+ @classmethod
+ def setUpClass(cls):
+ (cls.gpu_discovery_script_file_name, cls.mnist_dir_path) =
set_up_test_dirs()
+ conf = SparkConf()
+ for k, v in get_local_mode_conf().items():
+ conf = conf.set(k, v)
+ conf = conf.set(
+ "spark.driver.resource.gpu.discoveryScript",
cls.gpu_discovery_script_file_name
+ )
- def tearDown(self) -> None:
- shutil.rmtree(self.mnist_dir_path)
- os.unlink(self.gpu_discovery_script_file.name)
- self.spark.stop()
+ sc = SparkContext("local[4]", cls.__name__, conf=conf)
+ cls.spark = SparkSession(sc)
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.mnist_dir_path)
+ os.unlink(cls.gpu_discovery_script_file_name)
+ cls.spark.stop()
-class TorchDistributorDistributedUnitTestsMixin:
- def _get_spark_conf(self) -> SparkConf:
- self.gpu_discovery_script_file =
tempfile.NamedTemporaryFile(delete=False)
- self.gpu_discovery_script_file.write(
- b'echo {\\"name\\": \\"gpu\\", \\"addresses\\":
[\\"0\\",\\"1\\",\\"2\\"]}'
- )
- self.gpu_discovery_script_file.close()
- # create temporary directory for Worker resources coordination
- tempdir = tempfile.NamedTemporaryFile(delete=False)
- os.unlink(tempdir.name)
- os.chmod(
- self.gpu_discovery_script_file.name,
- stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP | stat.S_IROTH |
stat.S_IXOTH,
- )
-
- conf = SparkConf().set("spark.test.home", SPARK_HOME)
- conf = conf.set(
- "spark.worker.resource.gpu.discoveryScript",
self.gpu_discovery_script_file.name
- )
- conf = conf.set("spark.worker.resource.gpu.amount", "3")
- conf = conf.set("spark.task.cpus", "2")
- conf = conf.set("spark.task.resource.gpu.amount", "1")
- conf = conf.set("spark.executor.resource.gpu.amount", "1")
- return conf
+class TorchDistributorDistributedUnitTestsMixin:
def test_dist_training_succeeds(self) -> None:
CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
inputs = [
@@ -482,17 +492,24 @@ class TorchDistributorDistributedUnitTestsMixin:
class TorchDistributorDistributedUnitTests(
TorchDistributorDistributedUnitTestsMixin, unittest.TestCase
):
- def setUp(self) -> None:
- class_name = self.__class__.__name__
- conf = self._get_spark_conf()
- sc = SparkContext("local-cluster[2,2,1024]", class_name, conf=conf)
- self.spark = SparkSession(sc)
- self.mnist_dir_path = tempfile.mkdtemp()
-
- def tearDown(self) -> None:
- shutil.rmtree(self.mnist_dir_path)
- os.unlink(self.gpu_discovery_script_file.name)
- self.spark.stop()
+ @classmethod
+ def setUpClass(cls):
+ (cls.gpu_discovery_script_file_name, cls.mnist_dir_path) =
set_up_test_dirs()
+ conf = SparkConf()
+ for k, v in get_distributed_mode_conf().items():
+ conf = conf.set(k, v)
+ conf = conf.set(
+ "spark.worker.resource.gpu.discoveryScript",
cls.gpu_discovery_script_file_name
+ )
+
+ sc = SparkContext("local-cluster[2,2,1024]", cls.__name__, conf=conf)
+ cls.spark = SparkSession(sc)
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.mnist_dir_path)
+ os.unlink(cls.gpu_discovery_script_file_name)
+ cls.spark.stop()
class TorchWrapperUnitTestsMixin:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]