This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b3a5a91eec1 [SPARK-41592][PYTHON][ML] Pytorch file Distributed Training
b3a5a91eec1 is described below
commit b3a5a91eec169b32e94903a1c0d214932e1d2995
Author: Rithwik Ediga Lakhamsani <[email protected]>
AuthorDate: Thu Jan 19 09:24:44 2023 +0900
[SPARK-41592][PYTHON][ML] PyTorch file Distributed Training
### What changes were proposed in this pull request?
This is an addition to https://github.com/apache/spark/pull/39188 to add
support for multi-node training using PyTorch files. The users would follow the
second workflow in the [design
document](https://docs.google.com/document/d/1QPO1Ly8WteL6aIPvVcR7Xne9qVtJiB3fdrRn7NwBcpA/edit#heading=h.8yvw9xq428fh)
to run training on the executors. I added some new utility functions as well
as building on top of existing functions. This is largely a work in progress,
so testing will be added very soon.
### Why are the changes needed?
Look at the [main
ticket](https://issues.apache.org/jira/browse/SPARK-41589) for more details.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested with a pseudo-integration test. Integration tests will be added in a
future PR.
Closes #39267 from rithwik-db/pytorch-file-distributed-training.
Authored-by: Rithwik Ediga Lakhamsani <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/ml/torch/distributor.py | 151 ++++++++++++++++-----
python/pyspark/ml/torch/tests/test_distributor.py | 54 +++++++-
.../pyspark/ml/torch/torch_run_process_wrapper.py | 2 -
3 files changed, 171 insertions(+), 36 deletions(-)
diff --git a/python/pyspark/ml/torch/distributor.py
b/python/pyspark/ml/torch/distributor.py
index 3a59692cd12..51a1203cbd5 100644
--- a/python/pyspark/ml/torch/distributor.py
+++ b/python/pyspark/ml/torch/distributor.py
@@ -16,12 +16,10 @@
#
import collections
-import ctypes
import math
import os
import random
import re
-import signal
import sys
import subprocess
import time
@@ -30,6 +28,7 @@ import warnings
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
+from pyspark.taskcontext import BarrierTaskContext
# TODO(SPARK-41589): will move the functions and tests to an external file
@@ -73,13 +72,13 @@ def get_conf_boolean(sc: SparkContext, key: str,
default_value: str) -> bool:
)
-def get_gpus_owned(sc: SparkContext) -> List[str]:
+def get_gpus_owned(context: Union[SparkContext, BarrierTaskContext]) ->
List[str]:
"""Gets the number of GPUs that Spark scheduled to the calling task.
Parameters
----------
- sc : :class:`SparkContext`
- The :class:`SparkContext` that has GPUs available.
+ context : :class:`SparkContext` or :class:`BarrierTaskContext`
+ The :class:`SparkContext` or :class:`BarrierTaskContext` that has GPUs
available.
Returns
-------
@@ -93,7 +92,10 @@ def get_gpus_owned(sc: SparkContext) -> List[str]:
"""
CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
pattern = re.compile("^[1-9][0-9]*|0$")
- addresses = sc.resources["gpu"].addresses
+ if isinstance(context, SparkContext):
+ addresses = context.resources["gpu"].addresses
+ else:
+ addresses = context.resources()["gpu"].addresses
if any(not pattern.match(address) for address in addresses):
raise ValueError(
f"Found GPU addresses {addresses} which "
@@ -325,13 +327,20 @@ class TorchDistributor(Distributor):
torchrun_args = ["--standalone", "--nnodes=1"]
processes_per_node = num_processes
else:
- pass
- # TODO(SPARK-41592): Handle distributed training
+ master_addr, master_port = os.environ["MASTER_ADDR"],
os.environ["MASTER_PORT"]
+ node_rank = os.environ["RANK"]
+ torchrun_args = [
+ f"--nnodes={num_processes}",
+ f"--node_rank={node_rank}",
+ f"--rdzv_endpoint={master_addr}:{master_port}",
+ "--rdzv_id=0",
+ ] # TODO: setup random ID that is gleaned from env variables
+ processes_per_node = 1
args_string = list(map(str, args)) # converting all args to strings
return (
- [sys.executable, "-m",
"pyspark.ml.torch.distributor.torch_run_process_wrapper"]
+ [sys.executable, "-m",
"pyspark.ml.torch.torch_run_process_wrapper"]
+ torchrun_args
+ [f"--nproc_per_node={processes_per_node}"]
+ [path_to_train_file, *args_string]
@@ -343,28 +352,12 @@ class TorchDistributor(Distributor):
) -> None:
_TAIL_LINES_TO_KEEP = 100
- def sigterm_on_parent_death() -> None:
- """
- Uses prctl to automatically send SIGTERM to the command process
when its parent is dead.
- This handles the case when the parent is a PySpark worker process.
- If a user cancels the PySpark job, the worker process gets killed,
regardless of
- PySpark daemon and worker reuse settings.
- """
- if _prctl:
- try:
- libc = ctypes.CDLL("libc.so.6")
- # Set the parent process death signal of the command
process to SIGTERM.
- libc.prctl(1, signal.SIGTERM)
- except OSError:
- pass
-
task = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdin=subprocess.PIPE,
env=os.environ,
- preexec_fn=sigterm_on_parent_death,
)
task.stdin.close() # type: ignore
tail: collections.deque = collections.deque(maxlen=_TAIL_LINES_TO_KEEP)
@@ -407,13 +400,6 @@ class TorchDistributor(Distributor):
try:
if self.use_gpu:
gpus_owned = get_gpus_owned(self.sc)
-
- if self.num_processes > len(gpus_owned):
- raise ValueError(
- f"""{self.num_processes} processes were requested
- for local training with GPU training but only
- {len(gpus_owned)} GPUs were available."""
- )
random.seed(hash(train_object))
selected_gpus = [str(e) for e in random.sample(gpus_owned,
self.num_processes)]
os.environ[CUDA_VISIBLE_DEVICES] = ",".join(selected_gpus)
@@ -428,6 +414,105 @@ class TorchDistributor(Distributor):
return output
+ def _get_spark_task_function(
+ self,
+ framework_wrapper_fn: Optional[Callable],
+ train_object: Union[Callable, str],
+ *args: Any,
+ ) -> Callable:
+ """Creates a spark task function that is used inside `mapPartitions`.
+
+ Parameters
+ ----------
+ framework_wrapper_fn : Optional[Callable]
+ The function that determines whether we are running training
+ on a PyTorch file or a PyTorch function.
+ train_object : Union[Callable, str]
+ The actual train function/file.
+
+ Returns
+ -------
+ Callable
+ The wrapped function ready for use with `mapPartitions`
+ """
+ num_processes = self.num_processes
+ use_gpu = self.use_gpu
+ input_params = self.input_params
+
+ # Spark task program
+ def wrapped_train_fn(_): # type: ignore[no-untyped-def]
+ import os
+ from pyspark import BarrierTaskContext
+
+ CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+
+ # The idea of setting the random port to 0 doesn't seem to work?
+ def get_free_port(address: str) -> int:
+ import socket
+ import random
+
+ MAX_NUM_ATTEMPTS = 100
+
+ for _ in range(MAX_NUM_ATTEMPTS):
+ time.sleep(0.1)
+ port = random.randint(32768, 61000)
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ if not (sock.connect_ex((address, port)) == 0):
+ return port
+
+ raise RuntimeError("Failed to find free port for distributed
training.")
+
+ def set_torch_config(context: "BarrierTaskContext") -> None:
+ addrs = [e.address.split(":")[0] for e in
context.getTaskInfos()]
+
+ os.environ["MASTER_ADDR"] = str(addrs[0])
+ os.environ["MASTER_PORT"] = str(get_free_port(addrs[0]))
+ os.environ["WORLD_SIZE"] = str(num_processes)
+ os.environ["NODE_RANK"] = str(context.partitionId())
+ os.environ["RANK"] = str(context.partitionId())
+
+ def set_gpus(context: "BarrierTaskContext") -> None:
+ if CUDA_VISIBLE_DEVICES in os.environ:
+ return
+
+ gpus_owned = get_gpus_owned(context)
+ os.environ[CUDA_VISIBLE_DEVICES] = ",".join(gpus_owned)
+
+ context = BarrierTaskContext.get()
+
+ if use_gpu:
+ set_gpus(context)
+ else:
+ os.environ[CUDA_VISIBLE_DEVICES] = ""
+ set_torch_config(context)
+
+ output = framework_wrapper_fn(input_params, train_object, *args)
+
+ if context.partitionId() == 0:
+ yield output
+
+ return wrapped_train_fn
+
+ def _run_distributed_training(
+ self,
+ framework_wrapper_fn: Optional[Callable],
+ train_object: Union[Callable, str],
+ *args: Any,
+ ) -> Optional[Any]:
+ if not framework_wrapper_fn:
+ raise RuntimeError("Unknown combination of parameters")
+ spark_task_function = self._get_spark_task_function(
+ framework_wrapper_fn, train_object, *args
+ )
+ self._check_encryption()
+ result = (
+ self.sc.parallelize(range(self.num_tasks), self.num_tasks)
+ .barrier()
+ .mapPartitions(spark_task_function)
+ .collect()[0]
+ )
+ return result
+
@staticmethod
def _run_training_on_pytorch_file(
input_params: Dict[str, Any], train_path: str, *args: Any
@@ -458,4 +543,6 @@ class TorchDistributor(Distributor):
framework_wrapper_fn =
TorchDistributor._run_training_on_pytorch_file
if self.local_mode:
output = self._run_local_training(framework_wrapper_fn,
train_object, *args)
+ else:
+ output = self._run_distributed_training(framework_wrapper_fn,
train_object, *args)
return output
diff --git a/python/pyspark/ml/torch/tests/test_distributor.py
b/python/pyspark/ml/torch/tests/test_distributor.py
index 9f57024cc4e..607cc7cd1ad 100644
--- a/python/pyspark/ml/torch/tests/test_distributor.py
+++ b/python/pyspark/ml/torch/tests/test_distributor.py
@@ -73,7 +73,14 @@ class TorchDistributorBaselineUnitTests(unittest.TestCase):
]
for num_processes, local_mode, use_gpu in inputs:
with self.subTest():
- TorchDistributor(num_processes, local_mode, use_gpu)
+ expected_params = {
+ "num_processes": num_processes,
+ "local_mode": local_mode,
+ "use_gpu": use_gpu,
+ "num_tasks": num_processes,
+ }
+ dist = TorchDistributor(num_processes, local_mode, use_gpu)
+ self.assertEqual(expected_params, dist.input_params)
def test_validate_incorrect_inputs(self) -> None:
inputs = [
@@ -153,7 +160,7 @@ class TorchDistributorBaselineUnitTests(unittest.TestCase):
expected_local_mode_output = [
sys.executable,
"-m",
- "pyspark.ml.torch.distributor.torch_run_process_wrapper",
+ "pyspark.ml.torch.torch_run_process_wrapper",
"--standalone",
"--nnodes=1",
"--nproc_per_node=4",
@@ -168,6 +175,32 @@ class TorchDistributorBaselineUnitTests(unittest.TestCase):
expected_local_mode_output,
)
+ distributed_mode_input_params = {"num_processes": 4, "local_mode":
False}
+ input_env_vars = {"MASTER_ADDR": "localhost", "MASTER_PORT": "9350",
"RANK": "3"}
+
+ args_number = [1, 3] # testing conversion to strings
+ self.setup_env_vars(input_env_vars)
+ expected_distributed_mode_output = [
+ sys.executable,
+ "-m",
+ "pyspark.ml.torch.torch_run_process_wrapper",
+ "--nnodes=4",
+ "--node_rank=3",
+ "--rdzv_endpoint=localhost:9350",
+ "--rdzv_id=0",
+ "--nproc_per_node=1",
+ "train.py",
+ "1",
+ "3",
+ ]
+ self.assertEqual(
+ TorchDistributor._create_torchrun_command(
+ distributed_mode_input_params, train_path, *args_number
+ ),
+ expected_distributed_mode_output,
+ )
+ self.delete_env_vars(input_env_vars)
+
class TorchDistributorLocalUnitTests(unittest.TestCase):
def setUp(self) -> None:
@@ -286,6 +319,23 @@ class
TorchDistributorDistributedUnitTests(unittest.TestCase):
os.unlink(self.tempFile.name)
self.spark.stop()
+ def test_dist_training_succeeds(self) -> None:
+ CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+ inputs = [
+ ("0,1,2", 2, True, "0"),
+ ]
+
+ for i, (_, num_processes, use_gpu, expected) in enumerate(inputs):
+ with self.subTest(f"subtest: {i + 1}"):
+ dist = TorchDistributor(num_processes, False, use_gpu)
+ dist._run_training_on_pytorch_file = lambda *args:
os.environ.get( # type: ignore
+ CUDA_VISIBLE_DEVICES, "NONE"
+ )
+ self.assertEqual(
+ expected,
+
dist._run_distributed_training(dist._run_training_on_pytorch_file, "..."),
+ )
+
def test_get_num_tasks_distributed(self) -> None:
inputs = [(1, 8, 8), (2, 8, 4), (3, 8, 3)]
diff --git a/python/pyspark/ml/torch/torch_run_process_wrapper.py
b/python/pyspark/ml/torch/torch_run_process_wrapper.py
index 67ec492329d..6b5b6a1d0be 100644
--- a/python/pyspark/ml/torch/torch_run_process_wrapper.py
+++ b/python/pyspark/ml/torch/torch_run_process_wrapper.py
@@ -52,8 +52,6 @@ if __name__ == "__main__":
cmd = [sys.executable, "-m", "torch.distributed.run", *args]
task = subprocess.Popen(
cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
stdin=subprocess.PIPE,
env=os.environ,
)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]