This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b3a5a91eec1 [SPARK-41592][PYTHON][ML] Pytorch file Distributed Training
b3a5a91eec1 is described below

commit b3a5a91eec169b32e94903a1c0d214932e1d2995
Author: Rithwik Ediga Lakhamsani <[email protected]>
AuthorDate: Thu Jan 19 09:24:44 2023 +0900

    [SPARK-41592][PYTHON][ML] Pytorch file Distributed Training
    
    ### What changes were proposed in this pull request?
    
    This builds on https://github.com/apache/spark/pull/39188 to add 
support for multi-node training using PyTorch files. Users would follow the 
second workflow in the [design 
document](https://docs.google.com/document/d/1QPO1Ly8WteL6aIPvVcR7Xne9qVtJiB3fdrRn7NwBcpA/edit#heading=h.8yvw9xq428fh)
 to run training on the executors. I added some new utility functions and 
built on top of the existing ones. This is still largely a work in progress, 
so tests will be added very soon.
    
    ### Why are the changes needed?
    
    Look at the [main 
ticket](https://issues.apache.org/jira/browse/SPARK-41589) for more details.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tested with a pseudo-integration test. Integration tests will be added in a 
future PR.
    
    Closes #39267 from rithwik-db/pytorch-file-distributed-training.
    
    Authored-by: Rithwik Ediga Lakhamsani <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/ml/torch/distributor.py             | 151 ++++++++++++++++-----
 python/pyspark/ml/torch/tests/test_distributor.py  |  54 +++++++-
 .../pyspark/ml/torch/torch_run_process_wrapper.py  |   2 -
 3 files changed, 171 insertions(+), 36 deletions(-)

diff --git a/python/pyspark/ml/torch/distributor.py 
b/python/pyspark/ml/torch/distributor.py
index 3a59692cd12..51a1203cbd5 100644
--- a/python/pyspark/ml/torch/distributor.py
+++ b/python/pyspark/ml/torch/distributor.py
@@ -16,12 +16,10 @@
 #
 
 import collections
-import ctypes
 import math
 import os
 import random
 import re
-import signal
 import sys
 import subprocess
 import time
@@ -30,6 +28,7 @@ import warnings
 
 from pyspark.sql import SparkSession
 from pyspark.context import SparkContext
+from pyspark.taskcontext import BarrierTaskContext
 
 
 # TODO(SPARK-41589): will move the functions and tests to an external file
@@ -73,13 +72,13 @@ def get_conf_boolean(sc: SparkContext, key: str, 
default_value: str) -> bool:
     )
 
 
-def get_gpus_owned(sc: SparkContext) -> List[str]:
+def get_gpus_owned(context: Union[SparkContext, BarrierTaskContext]) -> 
List[str]:
     """Gets the number of GPUs that Spark scheduled to the calling task.
 
     Parameters
     ----------
-    sc : :class:`SparkContext`
-        The :class:`SparkContext` that has GPUs available.
+    context : :class:`SparkContext` or :class:`BarrierTaskContext`
+        The :class:`SparkContext` or :class:`BarrierTaskContext` that has GPUs 
available.
 
     Returns
     -------
@@ -93,7 +92,10 @@ def get_gpus_owned(sc: SparkContext) -> List[str]:
     """
     CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
     pattern = re.compile("^[1-9][0-9]*|0$")
-    addresses = sc.resources["gpu"].addresses
+    if isinstance(context, SparkContext):
+        addresses = context.resources["gpu"].addresses
+    else:
+        addresses = context.resources()["gpu"].addresses
     if any(not pattern.match(address) for address in addresses):
         raise ValueError(
             f"Found GPU addresses {addresses} which "
@@ -325,13 +327,20 @@ class TorchDistributor(Distributor):
             torchrun_args = ["--standalone", "--nnodes=1"]
             processes_per_node = num_processes
         else:
-            pass
-            # TODO(SPARK-41592): Handle distributed training
+            master_addr, master_port = os.environ["MASTER_ADDR"], 
os.environ["MASTER_PORT"]
+            node_rank = os.environ["RANK"]
+            torchrun_args = [
+                f"--nnodes={num_processes}",
+                f"--node_rank={node_rank}",
+                f"--rdzv_endpoint={master_addr}:{master_port}",
+                "--rdzv_id=0",
+            ]  # TODO: setup random ID that is gleaned from env variables
+            processes_per_node = 1
 
         args_string = list(map(str, args))  # converting all args to strings
 
         return (
-            [sys.executable, "-m", 
"pyspark.ml.torch.distributor.torch_run_process_wrapper"]
+            [sys.executable, "-m", 
"pyspark.ml.torch.torch_run_process_wrapper"]
             + torchrun_args
             + [f"--nproc_per_node={processes_per_node}"]
             + [path_to_train_file, *args_string]
@@ -343,28 +352,12 @@ class TorchDistributor(Distributor):
     ) -> None:
         _TAIL_LINES_TO_KEEP = 100
 
-        def sigterm_on_parent_death() -> None:
-            """
-            Uses prctl to automatically send SIGTERM to the command process 
when its parent is dead.
-            This handles the case when the parent is a PySpark worker process.
-            If a user cancels the PySpark job, the worker process gets killed, 
regardless of
-            PySpark daemon and worker reuse settings.
-            """
-            if _prctl:
-                try:
-                    libc = ctypes.CDLL("libc.so.6")
-                    # Set the parent process death signal of the command 
process to SIGTERM.
-                    libc.prctl(1, signal.SIGTERM)
-                except OSError:
-                    pass
-
         task = subprocess.Popen(
             cmd,
             stdout=subprocess.PIPE,
             stderr=subprocess.STDOUT,
             stdin=subprocess.PIPE,
             env=os.environ,
-            preexec_fn=sigterm_on_parent_death,
         )
         task.stdin.close()  # type: ignore
         tail: collections.deque = collections.deque(maxlen=_TAIL_LINES_TO_KEEP)
@@ -407,13 +400,6 @@ class TorchDistributor(Distributor):
         try:
             if self.use_gpu:
                 gpus_owned = get_gpus_owned(self.sc)
-
-                if self.num_processes > len(gpus_owned):
-                    raise ValueError(
-                        f"""{self.num_processes} processes were requested
-                        for local training with GPU training but only
-                        {len(gpus_owned)} GPUs were available."""
-                    )
                 random.seed(hash(train_object))
                 selected_gpus = [str(e) for e in random.sample(gpus_owned, 
self.num_processes)]
                 os.environ[CUDA_VISIBLE_DEVICES] = ",".join(selected_gpus)
@@ -428,6 +414,105 @@ class TorchDistributor(Distributor):
 
         return output
 
+    def _get_spark_task_function(
+        self,
+        framework_wrapper_fn: Optional[Callable],
+        train_object: Union[Callable, str],
+        *args: Any,
+    ) -> Callable:
+        """Creates a spark task function that is used inside `mapPartitions`.
+
+        Parameters
+        ----------
+        framework_wrapper_fn : Optional[Callable]
+            The function that determines whether we are running training
+            on a PyTorch file or a PyTorch function.
+        train_object : Union[Callable, str]
+            The actual train function/file.
+
+        Returns
+        -------
+        Callable
+            The wrapped function ready for use with `mapPartitions`
+        """
+        num_processes = self.num_processes
+        use_gpu = self.use_gpu
+        input_params = self.input_params
+
+        # Spark task program
+        def wrapped_train_fn(_):  # type: ignore[no-untyped-def]
+            import os
+            from pyspark import BarrierTaskContext
+
+            CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+
+            # The idea of setting the random port to 0 doesn't seem to work?
+            def get_free_port(address: str) -> int:
+                import socket
+                import random
+
+                MAX_NUM_ATTEMPTS = 100
+
+                for _ in range(MAX_NUM_ATTEMPTS):
+                    time.sleep(0.1)
+                    port = random.randint(32768, 61000)
+                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                    if not (sock.connect_ex((address, port)) == 0):
+                        return port
+
+                raise RuntimeError("Failed to find free port for distributed 
training.")
+
+            def set_torch_config(context: "BarrierTaskContext") -> None:
+                addrs = [e.address.split(":")[0] for e in 
context.getTaskInfos()]
+
+                os.environ["MASTER_ADDR"] = str(addrs[0])
+                os.environ["MASTER_PORT"] = str(get_free_port(addrs[0]))
+                os.environ["WORLD_SIZE"] = str(num_processes)
+                os.environ["NODE_RANK"] = str(context.partitionId())
+                os.environ["RANK"] = str(context.partitionId())
+
+            def set_gpus(context: "BarrierTaskContext") -> None:
+                if CUDA_VISIBLE_DEVICES in os.environ:
+                    return
+
+                gpus_owned = get_gpus_owned(context)
+                os.environ[CUDA_VISIBLE_DEVICES] = ",".join(gpus_owned)
+
+            context = BarrierTaskContext.get()
+
+            if use_gpu:
+                set_gpus(context)
+            else:
+                os.environ[CUDA_VISIBLE_DEVICES] = ""
+            set_torch_config(context)
+
+            output = framework_wrapper_fn(input_params, train_object, *args)
+
+            if context.partitionId() == 0:
+                yield output
+
+        return wrapped_train_fn
+
+    def _run_distributed_training(
+        self,
+        framework_wrapper_fn: Optional[Callable],
+        train_object: Union[Callable, str],
+        *args: Any,
+    ) -> Optional[Any]:
+        if not framework_wrapper_fn:
+            raise RuntimeError("Unknown combination of parameters")
+        spark_task_function = self._get_spark_task_function(
+            framework_wrapper_fn, train_object, *args
+        )
+        self._check_encryption()
+        result = (
+            self.sc.parallelize(range(self.num_tasks), self.num_tasks)
+            .barrier()
+            .mapPartitions(spark_task_function)
+            .collect()[0]
+        )
+        return result
+
     @staticmethod
     def _run_training_on_pytorch_file(
         input_params: Dict[str, Any], train_path: str, *args: Any
@@ -458,4 +543,6 @@ class TorchDistributor(Distributor):
             framework_wrapper_fn = 
TorchDistributor._run_training_on_pytorch_file
         if self.local_mode:
             output = self._run_local_training(framework_wrapper_fn, 
train_object, *args)
+        else:
+            output = self._run_distributed_training(framework_wrapper_fn, 
train_object, *args)
         return output
diff --git a/python/pyspark/ml/torch/tests/test_distributor.py 
b/python/pyspark/ml/torch/tests/test_distributor.py
index 9f57024cc4e..607cc7cd1ad 100644
--- a/python/pyspark/ml/torch/tests/test_distributor.py
+++ b/python/pyspark/ml/torch/tests/test_distributor.py
@@ -73,7 +73,14 @@ class TorchDistributorBaselineUnitTests(unittest.TestCase):
         ]
         for num_processes, local_mode, use_gpu in inputs:
             with self.subTest():
-                TorchDistributor(num_processes, local_mode, use_gpu)
+                expected_params = {
+                    "num_processes": num_processes,
+                    "local_mode": local_mode,
+                    "use_gpu": use_gpu,
+                    "num_tasks": num_processes,
+                }
+                dist = TorchDistributor(num_processes, local_mode, use_gpu)
+                self.assertEqual(expected_params, dist.input_params)
 
     def test_validate_incorrect_inputs(self) -> None:
         inputs = [
@@ -153,7 +160,7 @@ class TorchDistributorBaselineUnitTests(unittest.TestCase):
         expected_local_mode_output = [
             sys.executable,
             "-m",
-            "pyspark.ml.torch.distributor.torch_run_process_wrapper",
+            "pyspark.ml.torch.torch_run_process_wrapper",
             "--standalone",
             "--nnodes=1",
             "--nproc_per_node=4",
@@ -168,6 +175,32 @@ class TorchDistributorBaselineUnitTests(unittest.TestCase):
             expected_local_mode_output,
         )
 
+        distributed_mode_input_params = {"num_processes": 4, "local_mode": 
False}
+        input_env_vars = {"MASTER_ADDR": "localhost", "MASTER_PORT": "9350", 
"RANK": "3"}
+
+        args_number = [1, 3]  # testing conversion to strings
+        self.setup_env_vars(input_env_vars)
+        expected_distributed_mode_output = [
+            sys.executable,
+            "-m",
+            "pyspark.ml.torch.torch_run_process_wrapper",
+            "--nnodes=4",
+            "--node_rank=3",
+            "--rdzv_endpoint=localhost:9350",
+            "--rdzv_id=0",
+            "--nproc_per_node=1",
+            "train.py",
+            "1",
+            "3",
+        ]
+        self.assertEqual(
+            TorchDistributor._create_torchrun_command(
+                distributed_mode_input_params, train_path, *args_number
+            ),
+            expected_distributed_mode_output,
+        )
+        self.delete_env_vars(input_env_vars)
+
 
 class TorchDistributorLocalUnitTests(unittest.TestCase):
     def setUp(self) -> None:
@@ -286,6 +319,23 @@ class 
TorchDistributorDistributedUnitTests(unittest.TestCase):
         os.unlink(self.tempFile.name)
         self.spark.stop()
 
+    def test_dist_training_succeeds(self) -> None:
+        CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+        inputs = [
+            ("0,1,2", 2, True, "0"),
+        ]
+
+        for i, (_, num_processes, use_gpu, expected) in enumerate(inputs):
+            with self.subTest(f"subtest: {i + 1}"):
+                dist = TorchDistributor(num_processes, False, use_gpu)
+                dist._run_training_on_pytorch_file = lambda *args: 
os.environ.get(  # type: ignore
+                    CUDA_VISIBLE_DEVICES, "NONE"
+                )
+                self.assertEqual(
+                    expected,
+                    
dist._run_distributed_training(dist._run_training_on_pytorch_file, "..."),
+                )
+
     def test_get_num_tasks_distributed(self) -> None:
         inputs = [(1, 8, 8), (2, 8, 4), (3, 8, 3)]
 
diff --git a/python/pyspark/ml/torch/torch_run_process_wrapper.py 
b/python/pyspark/ml/torch/torch_run_process_wrapper.py
index 67ec492329d..6b5b6a1d0be 100644
--- a/python/pyspark/ml/torch/torch_run_process_wrapper.py
+++ b/python/pyspark/ml/torch/torch_run_process_wrapper.py
@@ -52,8 +52,6 @@ if __name__ == "__main__":
     cmd = [sys.executable, "-m", "torch.distributed.run", *args]
     task = subprocess.Popen(
         cmd,
-        stdout=subprocess.PIPE,
-        stderr=subprocess.STDOUT,
         stdin=subprocess.PIPE,
         env=os.environ,
     )


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to