SINGA-19 Slice large Param objects for load-balance rebased to latest master and tested with multiple servers
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/c635cc61 Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/c635cc61 Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/c635cc61 Branch: refs/heads/master Commit: c635cc61e82b7419ad5aefd7dd643d037cd25737 Parents: e0a52a6 Author: wang wei <[email protected]> Authored: Tue Jun 23 20:09:33 2015 +0800 Committer: wang wei <[email protected]> Committed: Tue Jun 23 20:24:33 2015 +0800 ---------------------------------------------------------------------- include/communication/msg.h | 17 ++++++----- src/communication/msg.cc | 8 ++--- src/trainer/server.cc | 31 +++++++++++--------- src/trainer/trainer.cc | 63 +++++++++++++++++++++++++--------------- src/trainer/worker.cc | 8 ++--- 5 files changed, 76 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/c635cc61/include/communication/msg.h ---------------------------------------------------------------------- diff --git a/include/communication/msg.h b/include/communication/msg.h index b83c738..e63c3cf 100644 --- a/include/communication/msg.h +++ b/include/communication/msg.h @@ -39,12 +39,14 @@ class Msg { inline void SwapAddr() { std::swap(src_, dst_); } inline void set_type(int type) { type_ = type; } inline int type() const { return type_; } - inline void set_target(int first, int second) { - target_first_ = first; - target_second_ = second; + inline void set_trgt(int first, int second, int third) { + trgt_first_ = first; + trgt_second_ = second; + trgt_third_ = third; } - inline int target_first() const { return target_first_; } - inline int target_second() const { return target_second_; } + inline int trgt_first() const { return trgt_first_; } + inline int trgt_second() const { return trgt_second_; } + inline int trgt_third() const { return 
trgt_third_; } /** * Copy src and dst address, including first, id, flag */ @@ -84,8 +86,9 @@ class Msg { int src_ = 0; int dst_ = 0; int type_ = 0; - int target_first_ = 0; - int target_second_ = 0; + int trgt_first_ = 0; + int trgt_second_ = 0; + int trgt_third_ = 0; #ifdef USE_ZMQ zmsg_t* msg_ = nullptr; zframe_t *frame_ = nullptr; http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/c635cc61/src/communication/msg.cc ---------------------------------------------------------------------- diff --git a/src/communication/msg.cc b/src/communication/msg.cc index 2a22b05..4f41077 100644 --- a/src/communication/msg.cc +++ b/src/communication/msg.cc @@ -31,15 +31,15 @@ bool Msg::next_frame() { void Msg::ParseFromZmsg(zmsg_t* msg) { char* tmp = zmsg_popstr(msg); - sscanf(tmp, "%d %d %d %d %d", - &src_, &dst_, &type_, &target_first_, &target_second_); + sscanf(tmp, "%d %d %d %d %d %d", + &src_, &dst_, &type_, &trgt_first_, &trgt_second_, &trgt_third_); frame_ = zmsg_next(msg); msg_ = msg; } zmsg_t* Msg::DumpToZmsg() { - zmsg_pushstrf(msg_, "%d %d %d %d %d", - src_, dst_, type_, target_first_, target_second_); + zmsg_pushstrf(msg_, "%d %d %d %d %d %d", + src_, dst_, type_, trgt_first_, trgt_second_, trgt_third_); zmsg_t *tmp = msg_; msg_ = nullptr; return tmp; http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/c635cc61/src/trainer/server.cc ---------------------------------------------------------------------- diff --git a/src/trainer/server.cc b/src/trainer/server.cc index 36b04b6..5662258 100644 --- a/src/trainer/server.cc +++ b/src/trainer/server.cc @@ -50,7 +50,7 @@ void Server::Run(){ CHECK_STREQ("PONG", pong.c_str()); delete msg; }else if(type==kPut){ - int pid=msg->target_first(); + int pid=msg->trgt_second(); shared_ptr<Param> param=nullptr; if(shard_->find(pid)!=shard_->end()){ LOG(ERROR)<<"Param ("<<pid<<") is put more than once"; @@ -61,16 +61,14 @@ void Server::Run(){ param->set_id(pid); (*shard_)[pid]=param; } - param->HandlePutMsg(&msg); + 
HandlePut(param, &msg); }else{ - int pid=msg->target_first(); + int pid=msg->trgt_second(); if(shard_->find(pid)==shard_->end()){ // delay the processing by re-queue the msg. response=msg; + DLOG(ERROR)<<"Requeue msg"; } else{ - CHECK(shard_->find(pid)!=shard_->end()) <<"Param ("<<pid - <<") is not maintained by server (" - <<group_id_ <<", " <<server_id_<<")"; auto param=shard_->at(pid); switch (type){ case kGet: @@ -80,7 +78,6 @@ void Server::Run(){ response = HandleUpdate(param, &msg); break; case kSyncRequest: - VLOG(3)<<"Handle SYNC-REQUEST"; response = HandleSyncRequest(param, &msg); break; } @@ -93,25 +90,33 @@ void Server::Run(){ } void Server::HandlePut(shared_ptr<Param> param, Msg **msg){ + int version=(*msg)->trgt_third(); param->HandlePutMsg(msg); + // must set version after HandlePutMsg which allocates the memory + param->set_version(version); } Msg* Server::HandleGet(shared_ptr<Param> param, Msg **msg){ - return param->HandleGetMsg(msg); + if(param->version()<(*msg)->trgt_third()) + return *msg; + else{ + auto reply= param->HandleGetMsg(msg); + int paramid=reply->trgt_first(), slice=reply->trgt_second(); + reply->set_trgt(paramid, slice, param->version()); + } } Msg* Server::HandleUpdate(shared_ptr<Param> param, Msg **msg) { - //repsonse of the format: <identity><type: kData><paramId><param content> auto* tmp=static_cast<Msg*>((*msg)->CopyAddr()); tmp->SwapAddr(); - int paramid=(*msg)->target_first(); - int sliceid=(*msg)->target_second(); - int step=(*msg)->target_third(); + int paramid=(*msg)->trgt_first(); + int sliceid=(*msg)->trgt_second(); + int step=(*msg)->trgt_third(); bool copy=param->ParseUpdateMsg(msg); updater_->Update(step, param); param->set_version(param->version()+1); auto response=param->GenUpdateResponseMsg(copy); - response->set_target(paramid, sliceid, param->version()); + response->set_trgt(paramid, sliceid, param->version()); response->SetAddr(tmp); delete tmp; return response; 
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/c635cc61/src/trainer/trainer.cc ---------------------------------------------------------------------- diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc index 989a020..11499db 100644 --- a/src/trainer/trainer.cc +++ b/src/trainer/trainer.cc @@ -37,11 +37,11 @@ const std::unordered_map<int, vector<std::pair<int, int>>> SliceParams(int num, if(x->owner()==x->id()) avg+=x->size(); } - avg/=num; + avg=avg/num+avg%num; int diff=avg/10; LOG(INFO)<<"Slicer, param avg="<<avg<<", diff= "<<diff; - int capacity=avg, sliceid=0; + int capacity=avg, sliceid=0, nbox=0; std::unordered_map<int, vector<std::pair<int, int>>> paramid2slices; for(auto& param: params){ if(param->id()!=param->owner()) @@ -50,20 +50,22 @@ const std::unordered_map<int, vector<std::pair<int, int>>> SliceParams(int num, LOG(INFO)<<"param id="<<paramid<<", total size="<<x; while(x>0){ int size=0; - if(capacity>x){ + if(capacity>=x){ capacity-=x; size=x; x=0; - }else if(capacity+diff>x){ - capacity=avg; + }else if(capacity+diff>=x){ size=x; x=0; - }else if(capacity>diff){ + capacity=0; + }else if(capacity>=diff){ x-=capacity; size=capacity; capacity=avg; + nbox++; }else{ capacity=avg; + nbox++; } if(size){ paramid2slices[paramid].push_back(std::make_pair(sliceid++, size)); @@ -71,29 +73,42 @@ const std::unordered_map<int, vector<std::pair<int, int>>> SliceParams(int num, } } } + CHECK_LE(nbox, num); return paramid2slices; } const vector<int> PartitionSlice(int num, const vector<int>& slices){ int avg=0; for(int x: slices) avg+=x; - avg/=num; + avg=avg/num+avg%num; int box=avg, boxid=0, diff=avg/10; vector<int> slice2box; - for(int x: slices){ + for(auto it=slices.begin(); it!=slices.end();){ + int x=*it; if(box>=x){ box-=x; slice2box.push_back(boxid); + it++; }else if(box+diff>=x){ slice2box.push_back(boxid); - box=avg; - boxid++; + it++; + box=0; }else{ box=avg; boxid++; } } - CHECK_LE(boxid, num); +// CHECK_LT(slice2box.back(), num); + 
CHECK_EQ(slice2box.size(), slices.size()); + int previd=slice2box[0]; + std::string disp; + for(size_t i=0;i<slice2box.size();i++) + if(previd!=slice2box[i]){ + disp+=", "+std::to_string(slices[i]); + previd=slice2box[i]; + } else + disp+=" "+std::to_string(slices[i]); + LOG(INFO)<<"partition slice (av ="<<avg<<", num="<<num<<"):"<<disp; return slice2box; } vector<shared_ptr<Server>> Trainer::CreateServers(int nthreads, @@ -298,7 +313,7 @@ void Trainer::Run(int nworkers, int nservers){ } }else if(type==kMetric){ if(msg->src_first()==0){ - int step=msg->target_first(); + int step=msg->trgt_first(); string prefix((char*)msg->frame_data(), msg->frame_size()); msg->next_frame(); Metric cur; @@ -308,7 +323,7 @@ void Trainer::Run(int nworkers, int nservers){ DeleteMsg(&msg); }else if(cluster->nserver_groups()>0){ int group_id; - int paramid=msg->target_first(); + int paramid=msg->trgt_first(); shared_ptr<ParamInfo> entry; switch (type){ // TODO process other requests, e.g. RESTful case kUpdate: @@ -378,7 +393,7 @@ Msg* Trainer::HandleConnect(Msg** msg){ const vector<Msg*> Trainer::HandleGet(shared_ptr<ParamInfo> pi, Msg** msg){ Msg* msgg=*msg; vector<Msg*> replies; - int version=msgg->target_second(); + int version=msgg->trgt_third(); if(msgg->src_flag()==kStub){ if(version<=pi->shares.at(0)->version()){ pi->shares.at(0)->HandleGetMsg(msg); @@ -394,7 +409,7 @@ const vector<Msg*> Trainer::HandleGet(shared_ptr<ParamInfo> pi, Msg** msg){ int server=slice2server_[id+idx]; int procs=Cluster::Get()->ProcsIDOf(group, server, kServer); auto x=param->GenGetMsg(procs!=procs_id_, idx); - x->set_target(param->owner(), id+idx, param->local_version()+1); + x->set_trgt(param->owner(), id+idx, param->local_version()+1); x->set_src(procs_id_, gid, kStub); x->set_dst(group, server, kServer); replies.push_back(x); @@ -406,7 +421,7 @@ const vector<Msg*> Trainer::HandleGet(shared_ptr<ParamInfo> pi, Msg** msg){ const vector<Msg*> Trainer::HandleUpdate(shared_ptr<ParamInfo>pi, Msg** msg){ Msg* 
msgg=*msg ; vector<Msg*> ret; - int step= msgg->target_second(); + int step= msgg->trgt_third(); if(msgg->src_flag()==kStub){ if(pi->num_update<pi->num_local){ ret.push_back(*msg); @@ -448,7 +463,7 @@ const vector<Msg*> Trainer::HandleUpdate(shared_ptr<ParamInfo>pi, Msg** msg){ int server=slice2server_[idx+id]; int procs=Cluster::Get()->ProcsIDOf(group, server, kServer); auto x=param->GenUpdateMsg(procs!=procs_id_, idx); - x->set_target(param->owner(), id+idx, step); + x->set_trgt(param->owner(), id+idx, step); x->set_src(procs_id_, srcgid, kStub); x->set_dst(group, server, kServer); ret.push_back(x); @@ -463,13 +478,14 @@ const vector<Msg*> Trainer::HandlePut(shared_ptr<ParamInfo>pi, Msg** msg){ vector<Msg*> ret; CHECK_NE((*msg)->src_flag(), kStub); int gid=(*msg)->src_first(); + int version=(*msg)->trgt_third(); auto param=pi->shares.at(0); int group=gid/Cluster::Get()->nworker_groups_per_server_group(); for(int idx=0, start=param->slice_start();idx<param->num_slices(); idx++){ int server=slice2server_[start+idx]; int procs=Cluster::Get()->ProcsIDOf(group, server, kServer); auto x=param->GenPutMsg(procs!=procs_id_, idx); - x->set_target(param->owner(), start+idx, param->version()); + x->set_trgt(param->owner(), start+idx, version); x->set_src(procs_id_, gid, kStub); x->set_dst(group, server, kServer); ret.push_back(x); @@ -479,8 +495,8 @@ const vector<Msg*> Trainer::HandlePut(shared_ptr<ParamInfo>pi, Msg** msg){ } void Trainer::HandleGetResponse(shared_ptr<ParamInfo>pi, Msg** msg){ - int version=(*msg)->target_third(); - int sliceid=(*msg)->target_second(); + int version=(*msg)->trgt_third(); + int sliceid=(*msg)->trgt_second(); auto param=pi->shares.at(0); if(param->ParseGetResponseMsg(msg,sliceid-param->slice_start())) param->set_version(version); @@ -489,10 +505,11 @@ void Trainer::HandleGetResponse(shared_ptr<ParamInfo>pi, Msg** msg){ void Trainer::HandleUpdateResponse(shared_ptr<ParamInfo> pi, Msg** msg){ - int version=(*msg)->target_third(); - int 
sliceid=(*msg)->target_second(); + int sliceid=(*msg)->trgt_second(); + int version=(*msg)->trgt_third(); auto param=pi->shares.at(0); - if(param->ParseUpdateResponseMsg(msg,sliceid-param->slice_start())) + if(param->ParseUpdateResponseMsg(msg,sliceid-param->slice_start())){ param->set_version(version); + } } } /* singa */ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/c635cc61/src/trainer/worker.cc ---------------------------------------------------------------------- diff --git a/src/trainer/worker.cc b/src/trainer/worker.cc index 0835bbb..8ecb146 100644 --- a/src/trainer/worker.cc +++ b/src/trainer/worker.cc @@ -103,7 +103,7 @@ int Worker::Put(shared_ptr<Param> param, int step){ msg->set_src(group_id_, worker_id_, kWorkerParam); msg->set_dst(-1, -1, kStub); msg->set_type(kPut); - msg->set_target(param->owner(), step); + msg->set_trgt(param->owner(), 0, step); dealer_->Send(&msg); return 1; } @@ -112,7 +112,7 @@ int Worker::Get(shared_ptr<Param> param, int step){ msg->set_src(group_id_, worker_id_, kWorkerParam); msg->set_dst(-1, -1, kStub); msg->set_type(kGet); - msg->set_target(param->owner(), step); + msg->set_trgt(param->owner(), 0, step); dealer_->Send(&msg); return 1; } @@ -126,7 +126,7 @@ int Worker::Update(shared_ptr<Param> param, int step){ msg->set_src(group_id_, worker_id_, kWorkerParam); msg->set_dst(-1, -1, kStub); msg->set_type(kUpdate); - msg->set_target(param->owner(), step); + msg->set_trgt(param->owner(), 0, step); dealer_->Send(&msg); } return 1; @@ -153,7 +153,7 @@ const void Worker::DisplayPerformance(const Metric & perf, const string& prefix) msg->set_src(group_id_, worker_id_, kWorkerParam); msg->set_dst(-1,-1, kStub); msg->set_type(kMetric); - msg->set_target(step_,0); + msg->set_trgt(step_,0,0); const string disp=perf.ToString(); msg->add_frame(prefix.c_str(), prefix.length()); msg->add_frame(disp.c_str(), disp.length());
