rahul003 commented on a change in pull request #9152: tutorial for distributed training URL: https://github.com/apache/incubator-mxnet/pull/9152#discussion_r161617906
########## File path: docs/faq/distributed_training.md ########## @@ -0,0 +1,286 @@ +# Distributed training +MXNet supports distributed training enabling us to leverage multiple machines for faster training. +In this document, we describe how it works, how to launch a distributed training job and +some environment variables which provide more control. + +## Type of parallelism +There are two ways in which we can distribute the workload of training a neural network across multiple devices (can be either GPU or CPU). +The first way is *data parallelism*, which refers to the case where each device stores a complete copy of the model. +Each device works with a different part of the dataset, and the devices collectively update a shared model. +These devices can be located on a single machine or across multiple machines. +In this document, we describe how to train a model with devices distributed across machines in a data parallel way. + +When models are so large that they don't fit into device memory, then a second way called *model parallelism* is useful. +Here, different devices are assigned the task of learning different parts of the model. +Currently, MXNet supports Model parallelism in a single machine only. Refer [Training with multiple GPUs using model parallelism](https://mxnet.incubator.apache.org/versions/master/how_to/model_parallel_lstm.html) for more on this. + +## How does distributed training work? +The architecture of distributed training in MXNet is as follows: +#### Types of processes +MXNet has three types of processes which communicate with each other to accomplish training of a model. +- Worker: A worker node actually performs training on a batch of training samples. +Before processing each batch, the workers pull weights from servers. +The workers also send gradients to the servers after each batch. +Depending on the workload for training a model, it might not be a good idea to run multiple worker processes on the same machine. +- Server: There can be multiple servers which store the model's parameters, and communicate with workers. +A server may or may not be co-located with the worker processes. +- Scheduler: There is only one scheduler. +The role of the scheduler is to set up the cluster. +This includes waiting for messages that each node has come up and which port the node is listening on. +The scheduler then lets all processes know about every other node in the cluster, so that they can communicate with each other. + +#### KV Store +MXNet provides a key-value store, which is a critical component used for multi-device and distributed training. +It provides a push and pull API for workers to communicate the parameters of the models. It stores a parameter value for each key. +Workers `push` gradients after processing a batch, and `pull` updated weights before processing a new batch. +We can also pass in optimizers for the KVStore to use while updating each weight. Optimizers like Stochastic Gradient Descent define an update rule, +essentially a mathematical formula to compute the new weight based on the old weight, gradient, and some parameters. + +If you are using a Gluon Trainer object or the Module API, +it uses a kvstore object internally to aggregate gradients from multiple devices on the same machine as well as to communicate across different machines. + +Although the API remains the same whether or not multiple machines are being used, +the notion of kvstore server exists only during distributed training. +In this case, each `push` and `pull` involves communication with the kvstore servers. +Note that we need to compile MXNet with the build flag `USE_DIST_KVSTORE=1` to use distributed training. + +The distributed mode of KVStore is enabled by calling `mxnet.kvstore.create` function +with a string argument which contains the word `dist` as follows: +> kv = mxnet.kvstore.create('dist_sync') + +Refer [KVStore API](https://mxnet.incubator.apache.org/versions/master/api/python/kvstore/kvstore.html) for more information about KVStore. + +#### Data iterators +When running distributed training in data parallel mode, +we want the data iterators on each machine to be working on different parts of the dataset. + +For data parallel training on a single worker, +we can use `mxnet.gluon.utils.split_and_load` to split a batch of samples provided by the data iterator, and then load each part of the batch on the device which will process it. +In the case of distributed training, one way to ensure that different workers +process different samples is to divide the dataset into `n` parts at the beginning, one for each worker. +Within the part of the dataset each worker has, we can continue to split as before for each device on that worker. + +Typically, this split of data for each worker happens through the data iterator, +on passing the number of parts and the index of parts to iterate over. +Some iterators in MXNet that support this feature are [mxnet.io.MNISTIterator](https://mxnet.incubator.apache.org/versions/master/api/python/io/io.html#mxnet.io.MNISTIter) and [mxnet.io.ImageRecordIter](https://mxnet.incubator.apache.org/versions/master/api/python/io/io.html#mxnet.io.ImageRecordIter). +If you are using a different iterator, you can look at how the above iterators implement this. +We can use the kvstore object to get the number of workers (`kv.num_workers`) and rank of the current worker (`kv.rank`). +These can be passed as arguments to the iterator. +You can look at [example/gluon/image_classification.py](https://github.com/apache/incubator-mxnet/blob/master/example/gluon/image_classification.py) +to see an example usage. + +#### Different modes of distributed training +Different modes of distributed training can be enabled by using different types of kvstore. +Distributed training itself is enabled when kvstore creation string contains the word `dist`. + +- `dist_sync`: In synchronous distributed training, all workers use the same synchronized set of model parameters at the start of every batch. +This means that after each batch, the server waits to receive gradients from each worker before it updates the model parameters. +This synchronization comes at a cost because the worker pulling parameters would have to wait till the server finishes this process. +In this mode, if a worker crashes, then it halts the progress of all workers. + +- `dist_async`: In asynchronous distributed training, the server receives gradients from one worker and immediately updates its store, which it uses to respond to any future pulls. +This means that a worker who finishes processing a batch can pull the current parameters from server and start the next batch, +even if other workers haven't finished processing the earlier batch. +This is faster than `dist_sync` but can take more epochs to converge. +In `async` mode, it is required to pass an optimizer because in the absence of an optimizer kvstore would replace the stored weights with received weights and this doesn't make sense for training in asynchronous mode. +The update of weights is atomic, meaning no two updates happen on the same weight at the same time. However, the order of updates is not guaranteed. + +- `dist_sync_device`: Same as `dist_sync` except that when there are multiple GPUs being used on each node, +this mode aggregates gradients and updates weights on GPU while dist_sync does so on CPU memory. +This is faster than `dist_sync` because it reduces expensive communication between GPU and CPU, but it increases memory usage on GPU. + +- `dist_async_device` : The analogue of `dist_sync_device` but in asynchronous mode. + +#### Distribution of parameter arrays +Each server doesn't necessarily store all the parameter arrays. +Arrays are distributed across different servers. The decision of which server stores a particular array is made at random. +The worker processes are unaware of this distribution because kvstore ensures that when a particular key is being pulled, this request is sent to the server which has the corresponding value. +If the value of some key is very large, it may be sharded across different servers. +Again, this is handled internally, so that the worker does not have to do anything differently. +The threshold for this sharding can be controlled with the environment variable `MXNET_KVSTORE_BIGARRAY_BOUND`. +See [environment variables](#environment-variables) for more details. + +#### Gradient compression +When communication cost is expensive, and the ratio of computation time to communication time is low, communication can become a bottleneck. Review comment: good catch ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
