Repository: incubator-singa Updated Branches: refs/heads/master 9ca741712 -> 06f85e23e
fix bugs from zmq sockets: move sockets creation to sub threads. Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/06f85e23 Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/06f85e23 Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/06f85e23 Branch: refs/heads/master Commit: 06f85e23eb0a5c969b7758716cf4090b4de302ce Parents: 9ca7417 Author: wang wei <[email protected]> Authored: Tue May 12 13:58:02 2015 +0800 Committer: wang wei <[email protected]> Committed: Tue May 12 13:58:02 2015 +0800 ---------------------------------------------------------------------- examples/cifar10/Makefile | 2 +- include/trainer/server.h | 7 +++---- include/trainer/worker.h | 9 ++++----- src/trainer/server.cc | 10 +++++----- src/trainer/trainer.cc | 15 +++++---------- src/trainer/worker.cc | 27 ++++++++++++--------------- 6 files changed, 30 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/06f85e23/examples/cifar10/Makefile ---------------------------------------------------------------------- diff --git a/examples/cifar10/Makefile b/examples/cifar10/Makefile index 40fece6..16c329f 100644 --- a/examples/cifar10/Makefile +++ b/examples/cifar10/Makefile @@ -5,7 +5,7 @@ libs :=singa glog protobuf download: cifar-10-binary-bin cifar-10-binary-bin: - wget http://www.cs.toronto.edu/~kriz/cifar-10-binary.tar.gz + #wget http://www.cs.toronto.edu/~kriz/cifar-10-binary.tar.gz tar xf cifar-10-binary.tar.gz create: http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/06f85e23/include/trainer/server.h ---------------------------------------------------------------------- diff --git a/include/trainer/server.h b/include/trainer/server.h index d113c7d..e37dab4 100644 --- a/include/trainer/server.h +++ b/include/trainer/server.h @@ -8,13 +8,12 @@ using 
std::shared_ptr; namespace singa { class Server{ public: - Server(int group_id, int server_id); - void Setup(const UpdaterProto& proto, shared_ptr<PMServer::ParamShard> shard, - shared_ptr<Dealer> dealer); + Server(int thread_id, int group_id, int server_id); + void Setup(const UpdaterProto& proto, shared_ptr<PMServer::ParamShard> shard); void Run(); protected: - int group_id_, server_id_; + int thread_id_, group_id_, server_id_; shared_ptr<PMServer> pmserver_; shared_ptr<Dealer> dealer_; }; http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/06f85e23/include/trainer/worker.h ---------------------------------------------------------------------- diff --git a/include/trainer/worker.h b/include/trainer/worker.h index 0e9f356..09ef49d 100644 --- a/include/trainer/worker.h +++ b/include/trainer/worker.h @@ -39,11 +39,10 @@ class Performance{ */ class Worker { public: - Worker(int group_id, int worker_id); + Worker(int thread_id, int group_id, int worker_id); ~Worker(){} void Setup(const ModelProto& model, shared_ptr<NeuralNet> train_net, - shared_ptr<PMWorker::ParamShard> shard, shared_ptr<Dealer> layer_dealer, - shared_ptr<Dealer> param_dealer); + shared_ptr<PMWorker::ParamShard> shard); void set_test_net(shared_ptr<NeuralNet> test_net){ test_net_=test_net; } @@ -160,7 +159,7 @@ class Worker { void ReceiveBlobs(shared_ptr<NeuralNet> net); void SendBlob(); protected: - int group_id_, worker_id_; + int thread_id_, group_id_, worker_id_; int step_; ModelProto modelproto_; shared_ptr<PMWorker> pmworker_; @@ -172,7 +171,7 @@ class Worker { class BPWorker: public Worker{ public: ~BPWorker(){} - BPWorker(int group_id, int worker_id):Worker(group_id, worker_id){} + BPWorker(int thread_id, int group_id, int worker_id):Worker(thread_id, group_id, worker_id){} virtual void TrainOneBatch(int step); virtual void TestOneBatch(shared_ptr<NeuralNet> net, int step, Phase phase); void Forward(shared_ptr<NeuralNet> net, int step, bool training); 
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/06f85e23/src/trainer/server.cc ---------------------------------------------------------------------- diff --git a/src/trainer/server.cc b/src/trainer/server.cc index f5877c5..5431955 100644 --- a/src/trainer/server.cc +++ b/src/trainer/server.cc @@ -9,20 +9,20 @@ namespace singa { -Server::Server(int group_id, int server_id): - group_id_(group_id), server_id_(server_id){} +Server::Server(int thread_id, int group_id, int server_id): + thread_id_(thread_id), group_id_(group_id), server_id_(server_id){} void Server::Setup(const UpdaterProto& proto, - shared_ptr<PMServer::ParamShard> shard, - shared_ptr<Dealer> dealer){ + shared_ptr<PMServer::ParamShard> shard){ //VLOG(3) << "Parsing config file for host "<<hosts[id_] << " server id = " <<id_; pmserver_=shared_ptr<PMServer>(Singleton<Factory<PMServer>>::Instance() ->Create("PMServer")); pmserver_->Setup(group_id_, server_id_, shard, proto); - dealer_=dealer; } void Server::Run(){ + dealer_=std::make_shared<Dealer>(thread_id_*2); + dealer_->Connect(kInprocRouterEndpoint); Msg* ping=new Msg(); ping->set_src(group_id_, server_id_, kServer); ping->set_dst(0,0,kStub); http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/06f85e23/src/trainer/trainer.cc ---------------------------------------------------------------------- diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc index 0a1edc8..89e97f1 100644 --- a/src/trainer/trainer.cc +++ b/src/trainer/trainer.cc @@ -45,6 +45,7 @@ void Trainer::Start(const ModelProto& mproto, const ClusterProto& cproto, int procs_id){ RegisterDefaultClasses(mproto); + int nthreads=1; auto cluster=Cluster::Get(cproto, procs_id); // create servers vector<shared_ptr<Server>> servers; @@ -59,10 +60,8 @@ void Trainer::Start(const ModelProto& mproto, const ClusterProto& cproto, // the ParamShard for servers consists of a dictionary of Param objects auto shard=make_shared<PMServer::ParamShard>(); for(int 
sid=start;sid<end;sid++){ - auto server=make_shared<Server>(gid, sid); - auto dealer=make_shared<Dealer>(nSocket++); - dealer->Connect(kInprocRouterEndpoint); - server->Setup(mproto.updater(), shard, dealer); + auto server=make_shared<Server>(nthreads++, gid, sid); + server->Setup(mproto.updater(), shard); servers.push_back(server); } } @@ -129,15 +128,11 @@ void Trainer::Start(const ModelProto& mproto, const ClusterProto& cproto, for(int wid=wstart;wid<wend;wid++){ shared_ptr<Worker> worker=nullptr; if(mproto.alg()==ModelProto_GradCalcAlg_kBackPropagation) - worker=make_shared<BPWorker>(gid, wid); + worker=make_shared<BPWorker>(nthreads++,gid, wid); else{ // TODO add CDWorker } - auto layer_dealer=make_shared<Dealer>(nSocket++); - auto param_dealer=make_shared<Dealer>(nSocket++); - layer_dealer->Connect(kInprocRouterEndpoint); - param_dealer->Connect(kInprocRouterEndpoint); - worker->Setup(mproto, train_net, shard, layer_dealer, param_dealer); + worker->Setup(mproto, train_net, shard); worker->set_test_net(test_net); worker->set_validation_net(validation_net); workers.push_back(worker); http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/06f85e23/src/trainer/worker.cc ---------------------------------------------------------------------- diff --git a/src/trainer/worker.cc b/src/trainer/worker.cc index a290996..138c954 100644 --- a/src/trainer/worker.cc +++ b/src/trainer/worker.cc @@ -8,40 +8,37 @@ #include "proto/model.pb.h" using std::thread; namespace singa { -Worker::Worker( int group_id, int worker_id): - group_id_(group_id), worker_id_(worker_id){ +Worker::Worker(int thread_id, int group_id, int worker_id): + thread_id_(thread_id),group_id_(group_id), worker_id_(worker_id){ } void Worker::Setup(const ModelProto& model, shared_ptr<NeuralNet> train_net, - shared_ptr<PMWorker::ParamShard> shard, - shared_ptr<Dealer> layer_dealer, - shared_ptr<Dealer> param_dealer){ + shared_ptr<PMWorker::ParamShard> shard){ train_net_=train_net; modelproto_=model; - 
layer_dealer_=layer_dealer; - param_dealer_=param_dealer; - if(layer_dealer_!=nullptr) - layer_poller_.Add(layer_dealer_.get()); - if(param_dealer_!=nullptr) - param_poller_.Add(param_dealer_.get()); pmworker_=shared_ptr<PMWorker>(Singleton<Factory<PMWorker>>::Instance() ->Create("PMWorker")); pmworker_->Setup(group_id_, worker_id_, shard); step_=modelproto_.step(); +} + +void Worker::Run(){ + param_dealer_=std::make_shared<Dealer>(thread_id_*2+1); + param_dealer_->Connect(kInprocRouterEndpoint); + //layer_dealer_=std::make_shared<Dealer>(thread_id_*2); // init params - for(auto layer: train_net->layers()) + for(auto layer: train_net_->layers()) if(group_id_==0&&layer->locationid()==worker_id_) for(auto param: layer->GetParams()){ if(param->owner()<0||param->owner()==param->id()){ param->Init(); Put(param, step_); } - Get(param, step_); + else + Get(param, step_); } -} -void Worker::Run(){ step_=modelproto_.step(); Performance perf(train_net_); while(!StopNow(step_)){
