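Try to fix zsock_connect failures by binding the router before anything connects to it: the router is now created and bound (to the in-process endpoint and, for multi-process runs, to the cluster endpoint) in Trainer::Start instead of Trainer::Run. TODO: test the changed code.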
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/cd9fc797 Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/cd9fc797 Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/cd9fc797 Branch: refs/heads/master Commit: cd9fc7971369e5447b563abefeaefc31d988ac9b Parents: a617e6c Author: wang wei <[email protected]> Authored: Tue May 26 16:18:01 2015 +0800 Committer: wang wei <[email protected]> Committed: Tue May 26 16:18:01 2015 +0800 ---------------------------------------------------------------------- include/trainer/trainer.h | 2 ++ src/trainer/trainer.cc | 14 +++++++------- 2 files changed, 9 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/cd9fc797/include/trainer/trainer.h ---------------------------------------------------------------------- diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h index 57fee8c..37d7106 100644 --- a/include/trainer/trainer.h +++ b/include/trainer/trainer.h @@ -9,6 +9,7 @@ #include "neuralnet/neuralnet.h" #include "trainer/worker.h" #include "trainer/server.h" +#include "communication/socket.h" namespace singa { /** @@ -131,6 +132,7 @@ class Trainer{ protected: int procs_id_; + shared_ptr<Router> router_; }; } /* singa */ #endif // INCLUDE_TRAINER_TRAINER_H_ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/cd9fc797/src/trainer/trainer.cc ---------------------------------------------------------------------- diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc index 02e60a4..3f343af 100644 --- a/src/trainer/trainer.cc +++ b/src/trainer/trainer.cc @@ -56,6 +56,11 @@ void Trainer::Start(const ModelProto& mproto, const ClusterProto& cproto, RegisterDefaultClasses(mproto); auto cluster=Cluster::Get(cproto, procs_id); + router_=make_shared<Router>(); + 
router_->Bind(kInprocRouterEndpoint); + if(cluster->nprocs()>1) + router_->Bind(cluster->endpoint()); + // create servers vector<shared_ptr<Server>> servers; vector<HandleContext> ctx; @@ -184,18 +189,13 @@ void Trainer::Run(int nworkers, int nservers, const std::map<int, shared_ptr<Trainer::ParamShard>>& shards){ auto cluster=Cluster::Get(); procs_id_=cluster->procs_id(); - auto router=make_shared<Router>(); - router->Bind(kInprocRouterEndpoint); - if(cluster->nprocs()>1) - router->Bind(cluster->endpoint()); - map<int, shared_ptr<Dealer>> interprocs_dealers; Metric perf; int perf_step=-1; string perf_prefix; bool stop=false; while(!stop){ - Msg* msg=router->Receive(); + Msg* msg=router_->Receive(); if(msg==nullptr){ LOG(ERROR)<<"Connection broken!"; exit(0); @@ -277,7 +277,7 @@ void Trainer::Run(int nworkers, int nservers, interprocs_dealers[procs_id]->Send(&msg); */ }else{ - router->Send(&msg); + router_->Send(&msg); } } }
