piiswrong closed pull request #9152: [MXNET-37] tutorial for distributed 
URL: https://github.com/apache/incubator-mxnet/pull/9152

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/faq/distributed_training.md b/docs/faq/distributed_training.md
new file mode 100644
index 00000000000..70078ba6095
--- /dev/null
+++ b/docs/faq/distributed_training.md
@@ -0,0 +1,288 @@
+# Distributed Training in MXNet
+MXNet supports distributed training, which lets us leverage multiple machines 
for faster training.
+In this document, we describe how it works, how to launch a distributed 
training job and
+some environment variables which provide more control.
+## Types of Parallelism
+There are two ways in which we can distribute the workload of training a 
neural network across multiple devices (can be either GPU or CPU).
+The first way is *data parallelism*, which refers to the case where each 
device stores a complete copy of the model.
+Each device works with a different part of the dataset, and the devices 
collectively update a shared model.
+These devices can be located on a single machine or across multiple machines.
+In this document, we describe how to train a model with devices distributed 
across machines in a data parallel way.
+When models are so large that they don't fit into device memory, then a second 
way called *model parallelism* is useful.
+Here, different devices are assigned the task of learning different parts of 
the model.
+Currently, MXNet supports model parallelism on a single machine only. Refer to 
[Training with multiple GPUs using model 
 for more on this.
+## How Does Distributed Training Work?
+The following concepts are key to understanding distributed training in MXNet:
+### Types of Processes
+MXNet has three types of processes which communicate with each other to 
accomplish training of a model.
+- Worker: A worker node actually performs training on a batch of training 
samples.
+Before processing each batch, the workers pull weights from servers.
+The workers also send gradients to the servers after each batch.
+Depending on the workload for training a model, it might not be a good idea to 
run multiple worker processes on the same machine.
+- Server: There can be multiple servers which store the model's parameters, 
and communicate with workers.
+A server may or may not be co-located with the worker processes.
+- Scheduler: There is only one scheduler. The role of the scheduler is to set 
up the cluster. This includes waiting for messages that each node has come up 
and which port the node is listening on.
+The scheduler then lets all processes know about every other node in the 
cluster, so that they can communicate with each other.
+### KV Store
+MXNet provides a key-value store, which is a critical component used for 
multi-device training. The communication of parameters across devices on a 
single machine, as well as across multiple machines, is relayed through one or 
more servers with a key-value store for the parameters. Each entry in this 
store is a key-value pair: each parameter array in the network is assigned a 
key, and the value holds the weights of that parameter array. Workers `push` 
gradients after processing a batch, and `pull` updated 
weights before processing a new batch. 
+We can also pass in optimizers for the KVStore to use while updating each 
weight. Optimizers like Stochastic Gradient Descent define an update rule,
+essentially a mathematical formula to compute the new weight based on the old 
weight, gradient, and some parameters.
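
The push/pull cycle and the optimizer's update rule can be sketched in plain Python. This is a toy, in-process stand-in and not the `mxnet.kvstore` API: the class `ToyKVStore`, its method signatures, and the key name `fc1_weight` are hypothetical, and plain SGD (`w_new = w_old - lr * grad`) is assumed as the update rule.

```python
# Toy in-process key-value store; a sketch, NOT the mxnet.kvstore API.
class ToyKVStore:
    def __init__(self, learning_rate=0.1):
        self.learning_rate = learning_rate
        self.weights = {}  # key -> list of floats (one parameter array)

    def init(self, key, value):
        # Register a parameter array under a key.
        self.weights[key] = list(value)

    def push(self, key, gradient):
        # Apply the assumed SGD update rule: w_new = w_old - lr * grad.
        old = self.weights[key]
        self.weights[key] = [w - self.learning_rate * g
                             for w, g in zip(old, gradient)]

    def pull(self, key):
        # Return a copy of the current weights for this key.
        return list(self.weights[key])


kv = ToyKVStore(learning_rate=0.5)
kv.init("fc1_weight", [1.0, 2.0])
kv.push("fc1_weight", [2.0, -2.0])   # worker sends gradients after a batch
updated = kv.pull("fc1_weight")      # worker fetches weights before the next batch
```

Here the store, not the worker, owns the update rule, which is the role the optimizer plays when it is passed to the KVStore.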
+If you are using a Gluon Trainer object or the Module API,
+it uses a kvstore object internally to aggregate gradients from multiple 
devices on the same machine as well as across different machines.
+Although the API remains the same whether or not multiple machines are being 
used,
+the notion of kvstore server exists only during distributed training.
+In this case, each `push` and `pull` involves communication with the kvstore 
servers. When there are multiple devices on a single machine, gradients from 
these devices are first aggregated on the machine and then sent to the servers. 
+Note that we need to compile MXNet with the build flag `USE_DIST_KVSTORE=1` to 
use distributed training.
+The distributed mode of KVStore is enabled by calling `mxnet.kvstore.create` 
+with a string argument which contains the word `dist` as follows:
+> kv = mxnet.kvstore.create('dist_sync')
+Refer to [KVStore 
 for more information about KVStore.
+### Distribution of Keys
+Each server doesn't necessarily store all the keys or parameter arrays.
+Parameters are distributed across different servers. The decision of which 
server stores a particular key is made at random.
+This distribution of keys across different servers is handled transparently by 
the KVStore.
+It ensures that when a key is pulled, that request is sent to the server which 
has the corresponding value.
+If the value of some key is very large, it may be sharded across different 
servers. This means that different servers hold different parts of the value.
+Again, this is handled transparently so that the worker does not have to do 
anything different.
+The threshold for this sharding can be controlled with an environment 
variable.
+See [environment variables](#environment-variables) for more details.
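
As an illustration of the placement rule described above, the following sketch assigns a small array to one randomly picked server and splits a large array across all servers. It is not MXNet's implementation: the function `place_array`, the constant name `BIGARRAY_BOUND`, and the choice of nearly equal contiguous shards are assumptions; the value 1000000 is the default bound listed under Environment Variables.

```python
import random

# Assumed threshold; mirrors the default value 1000000 listed in this document.
BIGARRAY_BOUND = 1_000_000


def place_array(size, num_servers, rng, bound=BIGARRAY_BOUND):
    """Return (server_index, start, end) slices for one parameter array."""
    if size < bound:
        # Small array: stored whole on a single randomly picked server.
        return [(rng.randrange(num_servers), 0, size)]
    # Large array: split into nearly equal contiguous shards, one per server.
    base, extra = divmod(size, num_servers)
    shards, start = [], 0
    for server in range(num_servers):
        length = base + (1 if server < extra else 0)
        shards.append((server, start, start + length))
        start += length
    return shards


rng = random.Random(0)
small = place_array(10, 4, rng)
large = place_array(2_500_002, 4, rng)
```

A pull for a sharded key would then fan out to every server in `large` and concatenate the slices, which is the part the KVStore hides from the worker.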
+### Split training data
+When running distributed training in data parallel mode, we want each machine 
to be working on different parts of the dataset.
+For data parallel training on a single worker,
+we can use `mxnet.gluon.utils.split_and_load` to split a batch of samples 
provided by the data iterator, and then load each part of the batch on the 
device which will process it.
+In the case of distributed training though, we would need to divide the 
dataset into `n` parts at the beginning, so that each worker gets a different 
part. Each worker can then use `split_and_load` to again divide that part of 
the dataset across different devices on a single machine.
+Typically, this split of data for each worker happens through the data 
iterator,
+on passing the number of parts and the index of the part to iterate over.
+Some iterators in MXNet that support this feature are 
+If you are using a different iterator, you can look at how the above iterators 
implement this.
+We can use the kvstore object to get the number of workers (`kv.num_workers`) 
and rank of the current worker (`kv.rank`).
+These can be passed as arguments to the iterator.
+You can look at 
+to see an example usage.
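
The two levels of splitting can be sketched without MXNet. The helpers `worker_part` and `split_batch` below are hypothetical; in a real job the number of workers and the rank would come from `kv.num_workers` and `kv.rank`, and the per-device split would be done by `split_and_load`. Contiguous, nearly equal parts are an assumption of this sketch.

```python
def worker_part(num_samples, num_workers, rank):
    """Indices of the samples assigned to worker `rank` (contiguous parts)."""
    base, extra = divmod(num_samples, num_workers)
    start = rank * base + min(rank, extra)
    stop = start + base + (1 if rank < extra else 0)
    return list(range(start, stop))


def split_batch(batch, num_devices):
    """Split one batch into nearly equal slices, one per device."""
    base, extra = divmod(len(batch), num_devices)
    slices, start = [], 0
    for device in range(num_devices):
        stop = start + base + (1 if device < extra else 0)
        slices.append(batch[start:stop])
        start = stop
    return slices


# Level 1: divide 10 samples among 3 workers.
parts = [worker_part(10, 3, rank) for rank in range(3)]
# Level 2: worker 0 divides its part between 2 local devices.
device_slices = split_batch(parts[0], 2)
```

Every sample lands on exactly one worker, so no two workers train on the same data within an epoch.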
+### Different Modes of Distributed Training
+Distributed training itself is enabled when kvstore creation string contains 
the word `dist`.
+Different modes of distributed training can be enabled by using different 
types of kvstore.
+- `dist_sync`: In synchronous distributed training, all workers use the same 
synchronized set of model parameters at the start of every batch.
+This means that after each batch, the server waits to receive gradients from 
each worker before it updates the model parameters.
+This synchronization comes at a cost because the worker pulling parameters 
would have to wait till the server finishes this process.
+In this mode, if a worker crashes, then it halts the progress of all workers.
+- `dist_async`: In asynchronous distributed training, the server receives 
gradients from one worker and immediately updates its store, which it uses to 
respond to any future pulls.
+This means that a worker who finishes processing a batch can pull the current 
parameters from server and start the next batch,
+even if other workers haven't finished processing the earlier batch.
+This is faster than `dist_sync` but can take more epochs to converge.
+In `async` mode, an optimizer must be passed, because without one the kvstore 
would simply replace the stored weights with the received values, which does 
not make sense for training in asynchronous mode.
+The update of weights is atomic, meaning no two updates happen on the same 
weight at the same time. However, the order of updates is not guaranteed.
+- `dist_sync_device`: Same as `dist_sync` except that when there are multiple 
GPUs being used on each node,
+this mode aggregates gradients and updates weights on GPU while `dist_sync` does 
so on CPU memory.
+This is faster than `dist_sync` because it reduces expensive communication 
between GPU and CPU, but it increases memory usage on GPU.
+- `dist_async_device`: The analogue of `dist_sync_device` but in asynchronous 
mode.
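
The difference between synchronous and asynchronous updates can be seen in a small simulation. This is a toy model under stated assumptions, not MXNet code: there is a single scalar weight, each worker's loss is `0.5 * (weight - target) ** 2`, gradients are summed in the synchronous case, and in the asynchronous case each worker pulls the current weight just before computing its gradient.

```python
def gradient(weight, target):
    # Gradient of the toy loss 0.5 * (weight - target) ** 2.
    return weight - target


def sync_update(weight, targets, lr):
    # Synchronous: every worker computes its gradient from the same weight;
    # the server waits for all of them and updates once.
    grads = [gradient(weight, t) for t in targets]
    return weight - lr * sum(grads)


def async_update(weight, targets, lr):
    # Asynchronous: the server applies each gradient as it arrives, so later
    # workers see weights already changed by earlier ones.
    for t in targets:  # list order stands in for arrival order
        weight = weight - lr * gradient(weight, t)
    return weight


sync_result = sync_update(4.0, [0.0, 2.0], lr=0.5)
async_a = async_update(4.0, [0.0, 2.0], lr=0.5)
async_b = async_update(4.0, [2.0, 0.0], lr=0.5)
```

The two asynchronous runs differ only in arrival order and end at different weights, which is why the asynchronous modes give no guarantee about the order of updates.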
+### Gradient Compression
+When communication is expensive, and the ratio of computation time to 
communication time is low, communication can become a bottleneck.
+In such cases, gradient compression can be used to reduce the cost of 
communication, thereby speeding up training.
+Refer to [Gradient 
 for more details.
+Note: For small models, when the cost of computation is much lower than the 
cost of communication,
+distributed training might actually be slower than training on a single 
machine because of the overhead of communication and synchronization.
+## How to Start Distributed Training?
+MXNet provides a script tools/launch.py to make it easy to launch a 
distributed training job. This supports various types of cluster resource 
managers like `ssh`, `mpirun`, `yarn` and `sge`.
+If you already have one of these clusters set up, you can skip the next section 
on setting up a cluster.
+If you want to use a type of cluster not mentioned above, skip ahead to 
the Manually Launching Jobs section.
+### Setting up the Cluster
+An easy way to set up a cluster of EC2 instances for distributed deep learning 
is by using the [AWS CloudFormation 
+If you cannot use the above, this section will help you manually set up a 
cluster of instances
+to enable you to use `ssh` for launching a distributed training job.
+Let us denote one machine as the `master` of the cluster through which we will 
launch and monitor the distributed training on all machines.
+If the machines in your cluster are a part of a cloud computing platform like 
AWS EC2, then your instances should be using key-based authentication already.
+Ensure that you create all instances using the same key, say `mxnet-key` and 
in the same security group.
+Next, we need to ensure that master has access to all other machines in the 
cluster through `ssh` by
+adding this key to [ssh-agent](https://en.wikipedia.org/wiki/Ssh-agent) and 
forwarding it to master when we log in. This will make `mxnet-key` the default 
key on master.
+ssh-add .ssh/mxnet-key
+If your machines use passwords for authentication, see 
[here](https://help.ubuntu.com/community/SSH/OpenSSH/Keys) for instructions on 
setting up password-less authentication between machines.
+It is easier if all these machines have a shared file system so that they can 
access the training script. One way is to use Amazon Elastic File System to 
create your network file system.
+The options in the following command are the recommended options when mounting 
an AWS Elastic File System.
+sudo mkdir efs && sudo mount -t nfs4 -o 
+Tip: You might find it helpful to store large datasets on S3 for easy access 
from all machines in the cluster. Refer to [Using data from S3 for 
 for more information.
+### Using Launch.py
+MXNet provides a script 
 to make it easy to launch distributed training on a cluster with `ssh`, `mpi`, 
`sge` or `yarn`.
+You can fetch this script by cloning the mxnet repository.
+git clone --recursive https://github.com/apache/incubator-mxnet
+#### Example
+Let us consider training a VGG11 model on the CIFAR10 dataset using 
+cd example/gluon/
+On a single machine, we can run this script as follows:
+python image_classification.py --dataset cifar10 --model vgg11 --num-epochs 1
+For distributed training of this example, we would do the following:
+If the mxnet directory which contains the script `image_classification.py` is 
accessible to all machines in the cluster (for example if they are on a network 
file system), we can run:
+../../tools/launch.py -n 3 -H hosts --launcher ssh python 
image_classification.py --dataset cifar10 --model vgg11 --num-epochs 1 
--kvstore dist_sync
+If the directory with the script is not accessible from the other machines in 
the cluster, then we can synchronize the current directory to all machines.
+../../tools/launch.py -n 3 -H hosts --launcher ssh --sync-dst-dir 
/tmp/mxnet_job/ python image_classification.py --dataset cifar10 --model vgg11 
--num-epochs 1 --kvstore dist_sync
+> Tip: If you don't have a cluster ready and still want to try this out, pass 
the option `--launcher local` instead of `ssh`
+#### Options
+Here, launch.py is used to submit the distributed training job. It takes the 
following options:
+- `-n` denotes the number of worker nodes to be launched.
+- `-s` denotes the number of server nodes to be launched.
+If it is not specified, it is taken to be equal to the number of worker nodes.
+The script tries to cycle through the hosts file to launch the servers and 
workers.
+For example, if you have 5 hosts in the hosts file and you passed `n` as 3 
(and nothing for `s`),
+the script will launch a total of 3 server processes,
+one each for the first three hosts, and a total of 3 worker processes, 
one each for the fourth, fifth and first host.
+If the hosts file has exactly `n` hosts, it will launch a 
server process and a worker process on each of the `n` hosts.
+- `--launcher` denotes the mode of communication. The options are:
+    - `ssh` if machines can communicate through ssh without passwords. This is 
the default launcher mode.
+    - `mpi` if Open MPI is available
+    - `sge` for Sun Grid Engine
+    - `yarn` for Apache Yarn
+    - `local` for launching all processes on the same local machine. This can 
be used for debugging purposes.
+- `-H` requires the path of the hosts file
+  This file contains IPs of the machines in the cluster. These machines should 
be able to communicate with each other without using passwords.
+  This file is only applicable and required when the launcher mode is `ssh` or 
`mpi`.
+  An example of the contents of the hosts file would be:
+  ```
+  ```
+- `--sync-dst-dir` takes the path of a directory on all hosts to which the 
current working directory will be synchronized. This only supports `ssh` 
launcher mode.
+This is necessary when the working directory is not accessible to all machines 
in the cluster. Setting this option synchronizes the current directory using 
rsync before the job is launched.  
+If you have not installed MXNet system-wide
+then you have to copy the folder `python/mxnet` and the file `lib/libmxnet.so` 
into the current directory before running `launch.py`.
+For example if you are in `example/gluon`, you can do this with `cp -r 
../../python/mxnet ../../lib/libmxnet.so .`. This would work if your `lib` 
folder contains `libmxnet.so`, as would be the case when you use make. If you 
use CMake, this file would be in your `build` directory.
+- `python image_classification.py --dataset cifar10 --model vgg11 --num-epochs 
1 --kvstore dist_sync`
+is the command for the training job on each machine. Note the use of 
`dist_sync` for the kvstore used in the script.
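
The way servers and workers are cycled through the hosts file, as described for the `-n` and `-s` options, can be sketched as follows. This mirrors the placement described in the text and is not the code of `launch.py`; the function `assign_hosts` and the host names are hypothetical.

```python
from itertools import cycle, islice


def assign_hosts(hosts, num_workers, num_servers=None):
    """Place servers first, then workers, cycling through the hosts list."""
    if num_servers is None:
        # When -s is not given, use as many servers as workers.
        num_servers = num_workers
    slots = cycle(hosts)  # one shared iterator, so workers continue after servers
    servers = list(islice(slots, num_servers))
    workers = list(islice(slots, num_workers))
    return servers, workers


five_hosts = ["host1", "host2", "host3", "host4", "host5"]
servers, workers = assign_hosts(five_hosts, num_workers=3)
same_servers, same_workers = assign_hosts(five_hosts[:3], num_workers=3)
```

With five hosts and `n` set to 3, the workers wrap around to the first host; with exactly three hosts, every host runs one server and one worker.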
+#### Terminating Jobs
+If the training job crashes due to an error or if we try to terminate the 
launch script while training is running,
+jobs on all machines might not have terminated. In such a case, we would need 
to terminate them manually.
+If we are using `ssh` launcher, this can be done by running the following 
command where `hosts` is the path of the hostfile.
+while read -u 10 host; do ssh -o "StrictHostKeyChecking no" $host "pkill -f 
python" ; done 10<hosts
+### Manually Launching Jobs
+If for some reason, you do not want to use the script above to start 
distributed training, then this section will be helpful.
+MXNet uses environment variables to assign roles to different processes and to 
let different processes find the scheduler.
+The environment variables are required to be set correctly as follows for the 
training to start:
+- `DMLC_ROLE`: Specifies the role of the process. This can be `server`, 
`worker` or `scheduler`. Note that there should only be one `scheduler`.
+When `DMLC_ROLE` is set to `server` or `scheduler`, these processes start when 
mxnet is imported.
+- `DMLC_PS_ROOT_URI`: Specifies the IP of the scheduler
+- `DMLC_PS_ROOT_PORT`: Specifies the port that the scheduler listens to
+- `DMLC_NUM_SERVER`: Specifies how many server nodes are in the cluster
+- `DMLC_NUM_WORKER`: Specifies how many worker nodes are in the cluster
+Below is an example to start all jobs locally on Linux or Mac. Note that 
starting all jobs on the same machine is not a good idea.
+This is only to make the usage clear.
+export COMMAND='python example/gluon/mnist.py --dataset cifar10 --model vgg11 
--num-epochs 1 --kv-store dist_async'
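
A minimal sketch of how the variables listed above fit together: it builds one environment per process for a local cluster. The helper `role_environments` is hypothetical, and the address `127.0.0.1` and port `9092` are arbitrary assumptions for a single-machine trial.

```python
def role_environments(num_workers, num_servers, root_uri, root_port):
    """Build one environment dict per process: scheduler, servers, workers."""
    common = {
        "DMLC_PS_ROOT_URI": root_uri,         # IP of the scheduler
        "DMLC_PS_ROOT_PORT": str(root_port),  # port the scheduler listens on
        "DMLC_NUM_SERVER": str(num_servers),
        "DMLC_NUM_WORKER": str(num_workers),
    }
    # Exactly one scheduler, followed by the servers and then the workers.
    roles = ["scheduler"] + ["server"] * num_servers + ["worker"] * num_workers
    return [dict(common, DMLC_ROLE=role) for role in roles]


envs = role_environments(num_workers=2, num_servers=2,
                         root_uri="127.0.0.1", root_port=9092)
roles = [env["DMLC_ROLE"] for env in envs]
```

Each dict could be merged with `os.environ` and passed as the `env` argument of `subprocess.Popen` together with the training command, so that every process runs the same command and differs only in `DMLC_ROLE`.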
+For an in-depth discussion of how the scheduler sets up the cluster, you can 
+## Environment Variables
+### For tuning performance
+  Value type: Integer
+  Default value: 4
+  The number of CPU threads used for summing up big arrays on a single machine
+  This will also be used for `dist_sync` kvstore to sum up arrays from 
different contexts on a single machine.
+  This does not affect summing up of arrays from different machines on servers.
+  Summing up of arrays for `dist_sync_device` kvstore is also unaffected as 
that happens on GPUs.
+  Value type: Integer
+  Default value: 1000000
+  The minimum size of a *big array*.
+  When the array size is bigger than this threshold, 
`MXNET_KVSTORE_REDUCTION_NTHREADS` threads are used for reduction.
+  This parameter is also used as a load balancer in kvstore.
+  It controls when to partition a single weight to all the servers.
+  If the size of a single weight matrix is less than this bound, then it is 
sent to a single randomly picked server; otherwise, it is partitioned to all 
the servers.
+- `MXNET_ENABLE_GPU_P2P` GPU Peer-to-Peer communication
+  Value type: 0(false) or 1(true)
+  Default value: 1
+  If true, MXNet tries to use GPU peer-to-peer communication, if available on 
your device. This is used only when kvstore has the type `device` in it.
+### Communication
+- `DMLC_INTERFACE` Using a particular network interface
+  Value type: Name of interface
+  Example: `eth0`
+  MXNet often chooses the first available network interface.
+  But for machines with multiple interfaces, we can specify which network 
interface to use for data communication using this environment variable.
+- `PS_VERBOSE` Logging communication
+  Value type: 1 or 2
+  Default value: (empty)
+    - `PS_VERBOSE=1` logs connection information like the IPs and ports of all 
nodes
+    - `PS_VERBOSE=2` logs all data communication information
+When the network is unreliable, messages being sent from one node to another 
might get lost.
+The training process can hang when a critical message is not successfully 
delivered.
+In such cases, an additional ACK can be sent for each message to track its 
delivery.
+This can be done by setting `PS_RESEND` and `PS_RESEND_TIMEOUT`
+- `PS_RESEND` Retransmission for unreliable network
+Value type: 0(false) or 1(true)
+Default value: 0
+Whether or not to enable retransmission of messages
+- `PS_RESEND_TIMEOUT` Timeout for ACK to be received
+Value type: Integer (in milliseconds)
+Default value: 1000
+If ACK is not received in `PS_RESEND_TIMEOUT` milliseconds, then the message 
will be resent.
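
The retransmission idea can be illustrated with a small loop. This is a conceptual sketch only and not how ps-lite implements it: the function `send_with_resend`, the `max_attempts` limit, and the `transmit` callback (standing in for "send the message and wait up to the timeout for an ACK") are all hypothetical.

```python
def send_with_resend(transmit, max_attempts):
    """Call transmit() until it reports an ACK or the attempts run out.

    transmit() returns True if an ACK arrived within the timeout.
    Returns the number of attempts used, or None if no ACK was ever received.
    """
    for attempt in range(1, max_attempts + 1):
        if transmit():
            return attempt
    return None


# Simulated unreliable link: the first two sends are lost, the third is ACKed.
outcomes = iter([False, False, True])
attempts_used = send_with_resend(lambda: next(outcomes), max_attempts=5)

# Simulated dead link: no ACK ever arrives.
never_acked = send_with_resend(lambda: False, max_attempts=3)
```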
diff --git a/docs/faq/index.md b/docs/faq/index.md
index 098d37f5fc0..3f42023fad5 100644
--- a/docs/faq/index.md
+++ b/docs/faq/index.md
@@ -15,9 +15,11 @@ and full working examples, visit the [tutorials 
 ## Scale
-* [How can I train with multiple CPU/GPUs with data 
+* [How can I train with multiple CPU/GPUs on a single machine with data 
-* [How can I train with multiple GPUs with model 
+* [How can I train using multiple machines with data 
+* [How can I train using multiple GPUs with model 
 ## Speed
diff --git a/docs/faq/multi_devices.md b/docs/faq/multi_devices.md
index b9cb3ea2916..3bb00105a62 100644
--- a/docs/faq/multi_devices.md
+++ b/docs/faq/multi_devices.md
@@ -82,132 +82,6 @@ Note that this option may result in higher GPU memory usage.
 When using a large number of GPUs, e.g. >=4, we suggest using `device` for 
better performance.
-## Distributed Training with Multiple Machines
-`KVStore` also supports a number of options for running on multiple machines.
-- `dist_sync` behaves similarly to `local` but exhibits one major difference.
-  With `dist_sync`, `batch-size` now means the batch size used on each machine.
-  So if there are *n* machines and we use batch size *b*,
-  then `dist_sync` behaves like `local` with batch size *n\*b*.
-- `dist_device_sync` is similar to `dist_sync`. The difference between them is 
-  `dist_device_sync` aggregates gradients and updates weight on GPUs
-  while `dist_sync` does so on CPU memory.
-- `dist_async`  performs asynchronous updates.
-  The weight is updated whenever gradients are received from any machine.
-  The update is atomic, i.e., no two updates happen on the same weight at the 
same time.
-  However, the order is not guaranteed.
-### How to Launch a Job
-> To use distributed training, we need to compile with `USE_DIST_KVSTORE=1`
-> (see [MXNet installation guide](http://mxnet.io/install/index.html) for more 
-Launching a distributed job is a bit different from running on a single
-machine. MXNet provides
-[tools/launch.py](https://github.com/dmlc/mxnet/blob/master/tools/launch.py) to
-start a job by using `ssh`, `mpi`, `sge`, or `yarn`.
-An easy way to set up a cluster of EC2 instances for distributed deep learning
-is using an [AWS CloudFormation 
-If you do not have a cluster, you can check the repository before you continue.
-Assume we are at the directory `mxnet/example/image-classification`
-and want to train LeNet to classify MNIST images, as demonstrated here:
-On a single machine, we can run:
-python train_mnist.py --network lenet
-Now, say we are given two ssh-able machines and _MXNet_ is installed on both 
-We want to train LeNet on these two machines.
-First, we save the IPs (or hostname) of these two machines in file `hosts`, 
-$ cat hosts
-Next, if the mxnet folder is accessible from both machines, e.g. on a
-then we can run:
-python ../../tools/launch.py -n 2 --launcher ssh -H hosts python 
train_mnist.py --network lenet --kv-store dist_sync
-Note that here we
-- use `launch.py` to submit the job.
-- provide launcher, `ssh` if all machines are ssh-able, `mpi` if `mpirun` is
-  available, `sge` for Sun Grid Engine, and `yarn` for Apache Yarn.
-- `-n` number of worker nodes to run on
-- `-H` the host file which is required by `ssh` and `mpi`
-- `--kv-store` use either `dist_sync` or `dist_async`
-- `-s` number of server nodes to run on
-- If the `-s` argument is not passed, it will keep the number of servers same 
as number of workers
-- The launch.py script tries to cycle through the hosts file to launch the 
servers and workers. For example,
-  let's say you have `5` hosts in the hosts file and you passed n as `3`(and 
nothing for s).
-  The script will launch a total of `3` server processes, one each for the 
first three hosts and
-  launch a total of `3` worker processes, one each for the fourth, fifth and 
first host.
-- If the hosts file has exactly `n` number of worker nodes which is passed as 
an argument with `-n`, it will launch
-  a server process and a worker process on each of the `n` hosts.
-### Synchronize Directory
-Now consider if the mxnet folder is not accessible.
-We can first copy the `MXNet` library to this folder by
-cp -r ../../python/mxnet .
-cp -r ../../lib/libmxnet.so mxnet
-then ask `launch.py` to synchronize the current directory to all machines'
- `/tmp/mxnet` directory with `--sync-dst-dir`
-python ../../tools/launch.py -n 2 -H hosts --sync-dst-dir /tmp/mxnet \
-   python train_mnist.py --network lenet --kv-store dist_sync
-### Gradient compression
-If your model has fully connected components or recurrent neural networks, you 
may achieve increased training speed using gradient compression with 
potentially slight loss of accuracy. Please see [Gradient 
 for more details on when and how to use it. For the above example, gradient 
compression can be enabled by running the following:
-python ../../tools/launch.py -n 2 --launcher ssh -H hosts python 
train_mnist.py --network lenet \
-    --kv-store dist_sync --gc-type 2bit
-In this example, `gc-type` has been set to `2bit`, to enable two bit gradient 
-### Use a Particular Network Interface
-_MXNet_ often chooses the first available network interface.
-But for machines that have multiple interfaces,
-we can specify which network interface to use for data
-communication by the environment variable `DMLC_INTERFACE`.
-For example, to use the interface `eth0`, we can
-export DMLC_INTERFACE=eth0; python ../../tools/launch.py ...
-### Debug Connection
-Set`PS_VERBOSE=1` to see the debug logging, e.g
-export PS_VERBOSE=1; python ../../tools/launch.py ...
-### More
-- See more launch options by `python ../../tools/launch.py -h`
-- See more options of [ps-lite](https://ps-lite.readthedocs.io/en/latest)
+## Distributed training with multiple devices across machines
+Refer to [Distributed 
+for information on how distributed training works and how to use it.

