Merge code for data partitioning within one group on a single node into the upstream branch
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/f29d93ff Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/f29d93ff Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/f29d93ff Branch: refs/heads/master Commit: f29d93ff7967b615d546fc525c3f514ce350f689 Parents: 0d47ec5 06f85e2 Author: wang wei <[email protected]> Authored: Sun May 17 14:19:05 2015 +0800 Committer: wang wei <[email protected]> Committed: Sun May 17 14:19:05 2015 +0800 ---------------------------------------------------------------------- README.md | 12 ++++++++++++ examples/cifar10/cluster.conf | 4 ++-- include/trainer/worker.h | 2 +- src/trainer/trainer.cc | 2 +- src/trainer/worker.cc | 1 + 5 files changed, 17 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f29d93ff/examples/cifar10/cluster.conf ---------------------------------------------------------------------- diff --cc examples/cifar10/cluster.conf index 88c3d4b,6b8a8e6..97c64fd --- a/examples/cifar10/cluster.conf +++ b/examples/cifar10/cluster.conf @@@ -1,6 -1,5 +1,6 @@@ nworker_groups: 1 nserver_groups: 1 nservers_per_group: 1 - nworkers_per_group: 2 - nworkers_per_procs: 2 + nworkers_per_group: 1 ++nworkers_per_procs: 1 workspace: "examples/cifar10/" http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f29d93ff/include/trainer/worker.h ---------------------------------------------------------------------- diff --cc include/trainer/worker.h index afa56ae,09ef49d..ec07210 --- a/include/trainer/worker.h +++ b/include/trainer/worker.h @@@ -137,9 -159,10 +137,9 @@@ class Worker void ReceiveBlobs(shared_ptr<NeuralNet> net); void SendBlob(); protected: - int thread_id_,group_id_, worker_id_; + int thread_id_, group_id_, worker_id_; int step_; ModelProto modelproto_; - shared_ptr<PMWorker> pmworker_; 
shared_ptr<NeuralNet> train_net_, test_net_, validation_net_; shared_ptr<Dealer> layer_dealer_, param_dealer_; Poller layer_poller_, param_poller_; http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f29d93ff/src/trainer/trainer.cc ---------------------------------------------------------------------- diff --cc src/trainer/trainer.cc index 35b8f6c,89e97f1..bc6867d --- a/src/trainer/trainer.cc +++ b/src/trainer/trainer.cc @@@ -53,9 -58,9 +53,9 @@@ void Trainer::Start(const ModelProto& m int start=pid*cluster->nservers_per_procs()%cluster->nservers_per_group(); int end=start+cluster->nservers_per_group(); // the ParamShard for servers consists of a dictionary of Param objects - auto shard=make_shared<PMServer::ParamShard>(); + auto shard=make_shared<Server::ParamShard>(); for(int sid=start;sid<end;sid++){ - auto server=make_shared<Server>(nthreads++,gid, sid); + auto server=make_shared<Server>(nthreads++, gid, sid); server->Setup(mproto.updater(), shard); servers.push_back(server); } http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f29d93ff/src/trainer/worker.cc ---------------------------------------------------------------------- diff --cc src/trainer/worker.cc index 7565d49,138c954..dfbe989 --- a/src/trainer/worker.cc +++ b/src/trainer/worker.cc @@@ -22,67 -20,27 +22,68 @@@ void Worker::Setup(const ModelProto& mo } void Worker::Run(){ - param_dealer_=std::make_shared<Dealer>(thread_id_*2+1); + param_dealer_=make_shared<Dealer>(2*thread_id_); param_dealer_->Connect(kInprocRouterEndpoint); + param_poller_.Add(param_dealer_.get()); + layer_dealer_=make_shared<Dealer>(2*thread_id_+1); + layer_dealer_->Connect(kInprocRouterEndpoint); + + { // TODO remove waiting pong msg + Msg* ping=new Msg(); + ping->set_src(group_id_, worker_id_, kWorkerParam); + ping->set_dst(-1,-1,kStub); + ping->set_type(kConnect); + ping->add_frame("PING", 4); + param_dealer_->Send(&ping); + ping=param_dealer_->Receive(); + string pong((char*)ping->frame_data(), 
ping->frame_size()); + CHECK_STREQ("PONG", pong.c_str()); + delete ping; + } + + { + Msg* ping=new Msg(); + ping->set_src(group_id_, worker_id_, kWorkerLayer); + ping->set_dst(-1,-1,kStub); + ping->set_type(kConnect); + ping->add_frame("PING", 4); + layer_dealer_->Send(&ping); + ping=layer_dealer_->Receive(); + string pong((char*)ping->frame_data(), ping->frame_size()); + CHECK_STREQ("PONG", pong.c_str()); + delete ping; + } + step_=modelproto_.step(); + //layer_dealer_=std::make_shared<Dealer>(thread_id_*2); // init params - for(auto layer: train_net_->layers()) - if(group_id_==0&&layer->locationid()==worker_id_) + for(auto layer: train_net_->layers()){ + //LOG(ERROR)<<layer->partitionid()<<" : "<<layer->name(); + if(layer->partitionid()==worker_id_) for(auto param: layer->GetParams()){ - if(param->owner()<0||param->owner()==param->id()){ - param->Init(); - Put(param, step_); + if(group_id_==0){ + if(param->owner()==param->id()){ + param->Init(0); + Put(param, step_); + }else{ + Get(param, 0); + } + }else{ + Get(param, modelproto_.warmup_steps()); } - else - Get(param, step_); } - - step_=modelproto_.step(); - Performance perf(train_net_); + } + Metric perf; + if(group_id_==0&&step_<modelproto_.warmup_steps()){ + for(step_=0;step_<modelproto_.warmup_steps();step_++) + RunOneBatch(step_, &perf); + for(auto layer: train_net_->layers()){ + //LOG(ERROR)<<layer->partitionid()<<" : "<<layer->name(); + if(layer->partitionid()==worker_id_) + for(auto param: layer->GetParams()) + if(param->owner()==param->id()) + Put(param, step_); + } + } while(!StopNow(step_)){ RunOneBatch(step_, &perf); step_++;
