XiaotaoChen opened a new issue #18248:
URL: https://github.com/apache/incubator-mxnet/issues/18248


   I mean using kvstore for communication between devices within a node, and horovod for communication between nodes.
   
   It is well known that horovod launches one process per device and supports hierarchical communication, so the devices in the same node belong to independent processes and SyncBN cannot normally be used with horovod. Therefore we use kvstore `local`/`device` to sync gradients across the devices within a node, and then sync gradients across nodes by launching one process per node with mpirun. The efficiency of our network with `kvstore_dist` versus `hvd + kvstore(local/device)` is shown below:
   
   Comparing `kv_dist_sync_device_tree` with `hvd_device_tree` at `4 nodes`, the scaling efficiency of `hvd + kv` is 93%, while `kv_dist_sync_device_tree` reaches only 86% (see the short calculation after the table).
   
   | node                           | kv_dist_sync (samples/s) | kv_dist_sync_device_tree (samples/s) | hvd_device_tree (samples/s) |
   | ------------------------------ | ------------------------ | ------------------------------------ | --------------------------- |
   | 1 x 8 GPUs (local device_tree) | 46.75                    | 46.75                                | 46.75                       |
   | 4 x 8 GPUs                     | 35.80                    | 40.41                                | 43.48                       |
   | efficiency (%)                 | 76.5                     | 86.4                                 | 93                          |
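   
   The efficiency row is simply the per-node throughput at 4 x 8 GPUs divided by the single-node baseline, e.g.:
   
   ```python
   # scaling efficiency = per-node throughput at 4 x 8 GPUs / single-node throughput
   print(35.80 / 46.75)  # ~0.766 -> 76.5% for kv_dist_sync
   print(40.41 / 46.75)  # ~0.864 -> 86.4% for kv_dist_sync_device_tree
   print(43.48 / 46.75)  # ~0.930 -> 93%   for hvd_device_tree
   ```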
   
   ### how to do it
   
   `kvstore` supports setting an optimizer on the kvstore. So if we set the `DistributedOptimizer` created by `horovod` as the kvstore optimizer and set the kvstore type to `device`/`local`, the kvstore will reduce gradients across the devices within a node and then call its `updater` with the reduced gradients; since the `updater` is a `DistributedOptimizer`, `horovod` will then reduce the gradients across nodes. In theory this mode is feasible (a minimal sketch of the setup follows this paragraph). To verify that `hvd + kv(local)` is equivalent to `kv_dist` for distributed training, we compared the results of the two modes; the test code is at the end.
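   
   A minimal sketch of this setup (not the full training script; `local_gpus` is a placeholder for the per-node GPU count):
   
   ```python
   import mxnet as mx
   import horovod.mxnet as hvd
   
   hvd.init()
   local_gpus = 8                                    # placeholder: GPUs per node
   kv = mx.kvstore.create("device")                  # intra-node reduce across local GPUs
   opt = mx.optimizer.create("sgd", learning_rate=0.1,
                             rescale_grad=1.0 / local_gpus)
   # horovod wraps the optimizer; when the kvstore calls the updater with the
   # intra-node-reduced gradient, DistributedOptimizer allreduces it across nodes
   kv.set_optimizer(hvd.DistributedOptimizer(opt))
   ```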
   
   ### what we encountered
   
   According to the `test code` results, the two modes produce equal results. But in our real training scenario, the model trained with `hvd + kv(local)` is worse than the one trained with `kv_dist`, and I don't know why. Could you give some advice or guesses?
   
   ### potential bug
   
   In the `test code`, if we use `hvd + kv(local)` and do not add `mx.nd.waitall()`, the process crashes at the end. It seems like one of the processes misses a tensor at the beginning. The error info is as below (the workaround we use is sketched after the error message):
   
   ```shell
   mxnet.base.MXNetError: Horovod has been shut down. This was caused by an 
exception on one of the ranks or an attempt to allreduce, allgather or 
broadcast a tensor after one of the ranks finished execution. If the shutdown 
was caused by an exception, you should see the exception in the log before the 
first shutdown message.
   ```
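   
   The workaround is simply to drain the MXNet engine before any rank lets horovod shut down; a minimal sketch reusing the names from the test code below:
   
   ```python
   import mxnet as mx
   
   try:
       test_hvd_kv(rank, num_workers, kv, dtype)
   finally:
       # block until every pending push/pull (and the horovod allreduce it
       # triggers through the updater) has finished before this process exits
       mx.nd.waitall()
   ```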
   
   
   
   ### test code
   
   The full scripts are here: [example for hvd+kv](https://github.com/XiaotaoChen/example_for_hvd_kv)
   
   ```python
   import mxnet as mx
   import numpy as np
   import horovod.mxnet as hvd
   import os
   from mxnet import optimizer as opt
   import pickle as pkl
   import time
   import argparse
   
   def test_hvd_kv(rank, num_workers, kv, dtype="float32"):
       assert dtype in ("float32", "float16")
       np.random.seed(5+rank)
       gpu_num = 8
       tensor_count = 10
       repeat_count = 10
       # shape = (1,1,2,3)
       shape = (128,1024,3,3)
       params_names = ["w_{}".format(i) for i in range(tensor_count)]
       params_shapes = [shape[:3] + (3+i,) for i in range(tensor_count)]
       params_array = [[mx.nd.zeros(shape=params_shapes[t_id], ctx=mx.gpu(i))
                        for i in range(gpu_num)] for t_id in range(tensor_count)]
       for idx in range(tensor_count):
           kv.init(params_names[idx], params_array[idx][0])
       for cnt in range(repeat_count):
           print("{}/{} update {}...".format(rank, num_workers, cnt))
           for idx in range(tensor_count):
               # grad_list = [mx.nd.ones(shape=shape, ctx=mx.gpu(i), dtype=dtype)
               #              * (rank * gpu_num + i) for i in range(gpu_num)]
               grad_list = [mx.nd.array(np.random.uniform(size=params_shapes[idx]) * 10,
                                        ctx=mx.gpu(i), dtype=dtype) * (rank * gpu_num + i)
                            for i in range(gpu_num)]
               arg_list = params_array[idx]
               name = params_names[idx]
   
               kv.push(name, grad_list)
               kv.pull(name, arg_list)
               mx.nd.waitall()
       
       if rank == 0:
           params_dict = {}
           for idx in range(tensor_count):
               params_dict[params_names[idx]] = params_array[idx][0].asnumpy()
           pkl_name = "{}_{}.pkl".format('hvd' if use_horovod else "kv_dist", dtype)
           with open(pkl_name, 'wb') as f:
               pkl.dump(params_dict, f)
       
       time.sleep(2)
   
   def test_allreduce(use_horovod, dtype):
       if use_horovod is False:
           kvstore_type = "dist_sync_device" if os.environ.get("DMLC_ROLE") == 
"worker" else kvstore_type
           kv = mx.kvstore.create(kvstore_type)
           rank = kv.rank
           num_workers = kv.num_workers
       else:
           kvstore_type = "device"
           kv = mx.kvstore.create(kvstore_type)
           hvd.init()
           rank = hvd.rank()
           num_workers = hvd.size()
       print('use horovod: {}, rank {}/{}, kv type: {}, usetree: {}'.format(
              use_horovod, rank, num_workers, kvstore_type, 
              os.environ.get("MXNET_KVSTORE_USETREE")))
   
       rescale_grad = 1.0 / (8 * num_workers)
       if use_horovod:
           rescale_grad = rescale_grad * num_workers
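       # the extra num_workers factor above is to compensate for horovod's
       # allreduce averaging gradients over workers (the dist kvstore sums them
       # instead), leaving only the 1/(8 local GPUs) scaling in hvd mode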
   
       optimizer_params = dict(
           momentum=0, # pOpt.optimizer.momentum,
           wd=0, # pOpt.optimizer.wd,
           learning_rate=0.1,
           rescale_grad=rescale_grad,
       )
       optimizer = mx.optimizer.create("sgd", **optimizer_params)
       if use_horovod:
           # Horovod: wrap optimizer with DistributedOptimizer
           optimizer = hvd.DistributedOptimizer(optimizer)
   
       print("opt rescale:{}".format(optimizer.rescale_grad))
       kv.set_optimizer(optimizer)
   
       test_hvd_kv(rank, num_workers, kv, dtype)
   
   def check_result(dtype):
       hvd_file = "hvd_{}.pkl".format(dtype)
       kv_file = "kv_dist_{}.pkl".format(dtype)
       with open(hvd_file, 'rb') as f:
           hvd_params = pkl.load(f)
       with open(kv_file, 'rb') as f:
           kv_params = pkl.load(f)
       
       for k,v in hvd_params.items():
           assert k in kv_params.keys()
           print("check {} : {}, dtype:{}".format(k, v.shape, dtype))
           # don't use decimal=6: that causes mismatches, probably due to
           # precision differences between the kvstore and hvd/optimizer code
           # paths rather than the allreduce operation itself
           np.testing.assert_almost_equal(kv_params[k], hvd_params[k], decimal=4)
   
   
   if __name__ == "__main__":
       parser = argparse.ArgumentParser(description='example for hvd kv')
       parser.add_argument('--use_horovod', help='use horovod or not', type=str)
       parser.add_argument("--dtype", help="data type float32 or float16", 
type=str)
       args = parser.parse_args()
       use_horovod = eval(args.use_horovod)
       dtype = args.dtype # "float32", "float16"
       print("[example for hvd kv] use horovod:{}, 
dtype:{}".format(use_horovod, dtype))
       test_allreduce(use_horovod, dtype)
       # check_result(dtype)
       
   ```
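   
   For reference, the comparison step (commented out at the bottom of the script) is run once both modes have written their pickle files; a sketch, where `example_for_hvd_kv` is a placeholder module name for the script above:
   
   ```python
   # run the script once with --use_horovod True (launched one process per node
   # via mpirun) and once with --use_horovod False (launched via the dmlc
   # tracker), so that hvd_float32.pkl and kv_dist_float32.pkl both exist:
   from example_for_hvd_kv import check_result  # placeholder module name
   
   check_result("float32")
   ```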
   
   

