rahul003 commented on a change in pull request #9152: tutorial for distributed 
training
URL: https://github.com/apache/incubator-mxnet/pull/9152#discussion_r161617906
 
 

 ##########
 File path: docs/faq/distributed_training.md
 ##########
 @@ -0,0 +1,286 @@
+# Distributed training
+MXNet supports distributed training, enabling us to leverage multiple machines for faster training.
+In this document, we describe how it works, how to launch a distributed training job, and
+the environment variables which provide finer control.
+
+## Types of parallelism
+There are two ways in which we can distribute the workload of training a neural network across multiple devices (which can be either GPUs or CPUs).
+The first way is *data parallelism*, which refers to the case where each 
device stores a complete copy of the model.
+Each device works with a different part of the dataset, and the devices 
collectively update a shared model.
+These devices can be located on a single machine or across multiple machines.
+In this document, we describe how to train a model with devices distributed 
across machines in a data parallel way.
+
+When models are so large that they don't fit into device memory, then a second 
way called *model parallelism* is useful.
+Here, different devices are assigned the task of learning different parts of 
the model.
+Currently, MXNet supports model parallelism on a single machine only. Refer to [Training with multiple GPUs using model parallelism](https://mxnet.incubator.apache.org/versions/master/how_to/model_parallel_lstm.html) for more on this.
+
+## How does distributed training work?
+The architecture of distributed training in MXNet is as follows:
+#### Types of processes
+MXNet has three types of processes which communicate with each other to 
accomplish training of a model.
+- Worker: A worker node actually performs training on a batch of training 
samples.
+Before processing each batch, the workers pull weights from servers.
+The workers also send gradients to the servers after each batch.
+Depending on the workload for training a model, it might not be a good idea to 
run multiple worker processes on the same machine.
+- Server: There can be multiple servers which store the model's parameters, 
and communicate with workers.
+A server may or may not be co-located with the worker processes.
+- Scheduler: There is only one scheduler.
+Its role is to set up the cluster. This includes waiting for each node to report that it has come up, and which port it is listening on.
+The scheduler then lets all processes know about every other node in the cluster, so that they can communicate with each other.
+
+#### KV Store
+MXNet provides a key-value store, which is a critical component used for 
multi-device and distributed training.
+It provides a push and pull API that workers use to communicate the model's parameters. It stores a parameter value for each key.
+Workers `push` gradients after processing a batch, and `pull` updated weights 
before processing a new batch.
+We can also pass in optimizers for the KVStore to use while updating each 
weight. Optimizers like Stochastic Gradient Descent define an update rule,
+essentially a mathematical formula to compute the new weight based on the old 
weight, gradient, and some parameters.
+
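The update rule mentioned above can be illustrated with a short sketch. This is plain Python for illustration only, not MXNet's actual optimizer implementation; `sgd_update` is a hypothetical helper:

```python
# Conceptual sketch of the SGD update rule a server applies per key.
# Illustrative pure Python, not MXNet's internal optimizer code.

def sgd_update(weight, gradient, lr=0.1):
    """Compute the new weight from the old weight, the gradient, and the learning rate."""
    return [w - lr * g for w, g in zip(weight, gradient)]

old_weight = [1.0, 2.0]
gradient = [4.0, 2.0]
new_weight = sgd_update(old_weight, gradient, lr=0.25)
print(new_weight)  # [0.0, 1.5]
```

Each server applies a rule of this shape to the keys it holds whenever gradients arrive.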
+If you are using a Gluon Trainer object or the Module API,
+it uses a kvstore object internally to aggregate gradients from multiple 
devices on the same machine as well as to communicate across different machines.
+
+Although the API remains the same whether or not multiple machines are being used,
+the notion of a kvstore server exists only during distributed training.
+In this case, each `push` and `pull` involves communication with the kvstore servers.
+Note that we need to compile MXNet with the build flag `USE_DIST_KVSTORE=1` to 
use distributed training.
+
+The distributed mode of KVStore is enabled by calling the `mxnet.kvstore.create` function
+with a string argument which contains the word `dist`, as follows:
+
+```python
+kv = mxnet.kvstore.create('dist_sync')
+```
+
+Refer to the [KVStore API](https://mxnet.incubator.apache.org/versions/master/api/python/kvstore/kvstore.html) for more information about KVStore.
+
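The per-batch push/pull pattern described above can be sketched with a minimal in-memory stand-in for the kvstore. This is pure Python for illustration only; `ToyKVStore` is a hypothetical class, and the real kvstore sends these requests to server processes over the network:

```python
# Minimal in-memory stand-in for a kvstore, illustrating the
# push/pull pattern workers follow for each batch. Hypothetical
# class; the real MXNet kvstore talks to server processes.

class ToyKVStore:
    def __init__(self, lr=0.1):
        self.store = {}   # key -> parameter value
        self.lr = lr      # stands in for the optimizer's update rule

    def init(self, key, value):
        self.store[key] = value

    def push(self, key, gradient):
        # The server applies the optimizer's update rule on push.
        self.store[key] = self.store[key] - self.lr * gradient

    def pull(self, key):
        return self.store[key]

kv = ToyKVStore(lr=0.5)
kv.init("weight", 10.0)
for batch in range(3):
    w = kv.pull("weight")    # pull current weights before the batch
    grad = 2.0               # a real worker would compute grad from w here
    kv.push("weight", grad)  # push gradients after the batch
print(kv.pull("weight"))     # 10.0 - 3 * (0.5 * 2.0) = 7.0
```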
+#### Data iterators
+When running distributed training in data parallel mode,
+we want the data iterators on each machine to be working on different parts of 
the dataset.
+
+For data parallel training on a single worker,
+we can use `mxnet.gluon.utils.split_and_load` to split a batch of samples 
provided by the data iterator, and then load each part of the batch on the 
device which will process it.
+In the case of distributed training, one way to ensure that different workers
+process different samples is to divide the dataset into `n` parts at the 
beginning, one for each worker.
+Within the part of the dataset each worker has, we can continue to split as 
before for each device on that worker.
+
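As a rough illustration of what `split_and_load` does, the following toy function splits a batch evenly across devices. It works on plain lists rather than NDArrays, and `split_batch` is a hypothetical name:

```python
# Toy illustration of splitting one batch across devices, analogous
# in spirit to mxnet.gluon.utils.split_and_load (which also copies
# each part to its device). Hypothetical helper, plain lists only.

def split_batch(batch, num_devices):
    """Split a batch into num_devices equal parts (batch size must divide evenly)."""
    part = len(batch) // num_devices
    return [batch[i * part:(i + 1) * part] for i in range(num_devices)]

batch = list(range(8))
parts = split_batch(batch, num_devices=4)
print(parts)  # [[0, 1], [2, 3], [4, 5], [6, 7]]
```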
+Typically, this split of data for each worker happens through the data iterator,
+by passing it the number of parts and the index of the part to iterate over.
+Some iterators in MXNet that support this feature are 
[mxnet.io.MNISTIterator](https://mxnet.incubator.apache.org/versions/master/api/python/io/io.html#mxnet.io.MNISTIter)
 and 
[mxnet.io.ImageRecordIter](https://mxnet.incubator.apache.org/versions/master/api/python/io/io.html#mxnet.io.ImageRecordIter).
+If you are using a different iterator, you can look at how the above iterators 
implement this.
+We can use the kvstore object to get the number of workers (`kv.num_workers`) 
and rank of the current worker (`kv.rank`).
+These can be passed as arguments to the iterator.
+You can look at 
[example/gluon/image_classification.py](https://github.com/apache/incubator-mxnet/blob/master/example/gluon/image_classification.py)
+to see an example usage.
+
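A minimal sketch of the idea: each worker uses `kv.num_workers` and `kv.rank` to pick its own contiguous part of the dataset. Iterators that support `num_parts`/`part_index` do this internally; `worker_shard` here is a hypothetical helper operating on a plain list:

```python
# Sketch of per-worker dataset sharding. Hypothetical helper; in
# MXNet, iterators like ImageRecordIter do this internally given
# num_parts and part_index. rank and num_workers would come from
# the kvstore (kv.rank, kv.num_workers).

def worker_shard(dataset, num_workers, rank):
    """Return the contiguous part of the dataset assigned to this worker."""
    part = len(dataset) // num_workers
    start = rank * part
    # The last worker takes any leftover samples.
    end = len(dataset) if rank == num_workers - 1 else start + part
    return dataset[start:end]

dataset = list(range(10))
print(worker_shard(dataset, num_workers=2, rank=0))  # [0, 1, 2, 3, 4]
print(worker_shard(dataset, num_workers=2, rank=1))  # [5, 6, 7, 8, 9]
```

Within its shard, each worker can then split further across its local devices as described earlier.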
+#### Different modes of distributed training
+Different modes of distributed training can be enabled by using different 
types of kvstore.
+Distributed training itself is enabled when the kvstore creation string contains the word `dist`.
+
+- `dist_sync`: In synchronous distributed training, all workers use the same 
synchronized set of model parameters at the start of every batch.
+This means that after each batch, the server waits to receive gradients from 
each worker before it updates the model parameters.
+This synchronization comes at a cost, because workers pulling parameters have to wait until the server finishes this update.
+In this mode, if a worker crashes, then it halts the progress of all workers.
+
+- `dist_async`: In asynchronous distributed training, the server receives 
gradients from one worker and immediately updates its store, which it uses to 
respond to any future pulls.
+This means that a worker who finishes processing a batch can pull the current 
parameters from server and start the next batch,
+even if other workers haven't finished processing the earlier batch.
+This is faster than `dist_sync` but can take more epochs to converge.
+In `async` mode, it is required to pass an optimizer, because in the absence of one the kvstore would simply replace the stored weights with the received weights, which does not make sense for asynchronous training.
+The update of weights is atomic, meaning no two updates happen to the same weight at the same time. However, the order of updates is not guaranteed.
+
+- `dist_sync_device`: Same as `dist_sync`, except that when multiple GPUs are being used on each node,
+this mode aggregates gradients and updates weights on the GPU, while `dist_sync` does so in CPU memory.
+This is faster than `dist_sync` because it reduces expensive communication between GPU and CPU, but it increases memory usage on the GPU.
+
+- `dist_async_device`: The analogue of `dist_sync_device`, but in asynchronous mode.
+
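The contrast between the synchronous and asynchronous update strategies can be sketched on a single toy parameter. This is a simplified illustration with hypothetical helper names; real servers handle many keys, aggregate over the network, and apply the configured optimizer:

```python
# Toy contrast of synchronous vs asynchronous updates on a single
# parameter. Simplified illustration only; here the sync server
# averages the workers' gradients before one update, while the
# async server applies each gradient as it arrives.

def sync_update(weight, gradients, lr):
    """dist_sync: wait for all workers, then apply the aggregated (here averaged) gradient."""
    avg = sum(gradients) / len(gradients)
    return weight - lr * avg

def async_update(weight, gradient, lr):
    """dist_async: apply each worker's gradient as soon as it arrives."""
    return weight - lr * gradient

w_sync = sync_update(10.0, gradients=[2.0, 4.0], lr=0.5)  # one update per batch
w_async = 10.0
for g in [2.0, 4.0]:                                      # one update per worker push
    w_async = async_update(w_async, g, lr=0.5)

print(w_sync)   # 10.0 - 0.5 * 3.0 = 8.5
print(w_async)  # 10.0 - 0.5*2.0 - 0.5*4.0 = 7.0
```

In the async case, a worker may compute its gradient against weights that other workers have already moved past, which is why convergence can take more epochs.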
+#### Distribution of parameter arrays
+No single server necessarily stores all the parameter arrays.
+Arrays are distributed across the servers. Which server stores a particular array is decided at random.
+The worker processes are unaware of this distribution because kvstore ensures 
that when a particular key is being pulled, this request is sent to the server 
which has the corresponding value.
+If the value of some key is very large, it may be sharded across different 
servers.
+Again, this is handled internally, so that the worker does not have to do 
anything differently.
+The threshold for this sharding can be controlled with the environment 
variable `MXNET_KVSTORE_BIGARRAY_BOUND`.
+See [environment variables](#environment-variables) for more details.
+
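These two ideas can be sketched as follows: assigning each key to a server, and splitting values larger than a threshold across servers. The helper names are hypothetical, the real key-to-server assignment is random rather than hash-based, and the real threshold is `MXNET_KVSTORE_BIGARRAY_BOUND`:

```python
# Toy illustration of distributing parameter arrays across servers.
# Hypothetical, simplified helpers: MXNet assigns keys to servers at
# random and shards values larger than MXNET_KVSTORE_BIGARRAY_BOUND.

def assign_server(key, num_servers):
    """Map a key to one server (deterministic here for clarity; random in MXNet)."""
    return sum(key.encode()) % num_servers

def shard_value(value, bigarray_bound, num_servers):
    """Split a large value into per-server shards if it exceeds the bound."""
    if len(value) <= bigarray_bound:
        return [value]                       # small value: one server holds it
    shard = -(-len(value) // num_servers)    # ceiling division
    return [value[i:i + shard] for i in range(0, len(value), shard)]

shards = shard_value(list(range(10)), bigarray_bound=4, num_servers=3)
print([len(s) for s in shards])  # [4, 4, 2]
```

Either way, the kvstore routes each worker's `push` and `pull` to the right server(s), so worker code is unchanged.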
+#### Gradient compression
+When communication cost is expensive, and the ratio of computation time to 
communication time is low, communication can become a bottleneck.
 
 Review comment:
   good catch
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
