eric-haibin-lin commented on a change in pull request #15124: [MXNET-1294]
Priority-based parameter propagation for improved data parallel training
throughput
URL: https://github.com/apache/incubator-mxnet/pull/15124#discussion_r291451322
##########
File path: src/kvstore/kvstore_dist.h
##########
@@ -412,17 +439,38 @@ class KVStoreDist : public KVStoreLocal {
   void PushDefault(int key, const NDArray &send_buf, const PSKV& pskv, int priority) {
     auto push_to_servers =
-        [this, key, pskv, send_buf](RunContext rctx, Engine::CallbackOnComplete cb) {
+        [this, key, pskv, send_buf, priority](RunContext rctx, Engine::CallbackOnComplete cb) {
           const int dtype = send_buf.dtype();
           // convert to ps keys
           const size_t size = send_buf.shape().Size() * mshadow::mshadow_sizeof(dtype);
           char* data = static_cast<char *>(send_buf.data().dptr_);
           // do push. false means no delete
           ps::SArray<char> vals(data, size, false);
           int cmd = GetCommandType(RequestType::kDefaultPushPull, dtype);
-          CHECK_NOTNULL(ps_worker_)->ZPush(
-              pskv.keys, vals, pskv.lens,
-              cmd, [cb]() { cb(); });
+
+          auto *counter = new std::atomic<int>(pskv.keys.size());
+          int len = 0;
+          for (size_t i = 0; i < pskv.keys.size(); i++) {
+            auto ks = new ps::SArray<ps::Key>(std::move(pskv.keys.segment(i, i+1)));
+            auto vs = new ps::SArray<char>(std::move(vals.segment(len, len+pskv.lens[i])));
+            auto ls = new ps::SArray<int>(std::move(pskv.lens.segment(i, i+1)));
+            CHECK_NOTNULL(ps_worker_)->ZPush(*ks, *vs, *ls,
+                          static_cast<int>(RequestType::kDefaultPushPull),
Review comment:
Hmmm. We cannot assume that users call `kv.pull` after every `kv.push`. It is
legal for a user to call `kv.push` multiple times before calling `kv.pull`.
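
To make the concern concrete, here is a minimal sketch of the call pattern in question. `ToyKVStore` and `DemoTwoPushesThenPull` are hypothetical names invented for this illustration, not the MXNet or ps-lite API, and summing successive pushes is an assumption of the toy rather than a statement of what `KVStoreDist` does. The only point is that each push has to be complete on its own, because no pull is guaranteed to follow it.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <vector>

// Hypothetical toy store used only for illustration; NOT the MXNet KVStoreDist API.
class ToyKVStore {
 public:
  // Each push is fully applied when it arrives. Nothing is deferred to a
  // later pull, so any number of pushes may happen back to back.
  // Assumption of this toy: successive pushes for a key are summed.
  void Push(int key, const std::vector<float>& grad) {
    std::vector<float>& stored = values_[key];
    if (stored.empty()) stored.assign(grad.size(), 0.0f);
    assert(stored.size() == grad.size());
    for (std::size_t i = 0; i < grad.size(); ++i) stored[i] += grad[i];
    ++push_count_[key];
  }

  // Pull returns whatever has been accumulated so far for the key,
  // or an empty vector if the key was never pushed.
  std::vector<float> Pull(int key) const {
    auto it = values_.find(key);
    return it == values_.end() ? std::vector<float>{} : it->second;
  }

  // Number of pushes seen for the key (0 if none).
  int PushCount(int key) const {
    auto it = push_count_.find(key);
    return it == push_count_.end() ? 0 : it->second;
  }

 private:
  std::map<int, std::vector<float>> values_;
  std::map<int, int> push_count_;
};

// Two pushes to the same key with no pull in between, then a single pull.
float DemoTwoPushesThenPull() {
  ToyKVStore kv;
  kv.Push(3, {1.0f, 2.0f});
  kv.Push(3, {0.5f, 1.5f});  // second push before any pull: must be legal
  assert(kv.PushCount(3) == 2);
  std::vector<float> out = kv.Pull(3);
  return out[0] + out[1];
}
```

A design that pairs every push with exactly one following pull, for example by treating the two as a single combined request, would have no well-defined behavior for the second `Push` call above.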