SINGA-8 Implement distributed Hogwild. Replaced the hard-coded endpoints with RegistProc() and GetProcHost(), which are implemented with the help of ZooKeeper. TODO: slice large Param objects in a separate branch.
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/51d4c2ae Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/51d4c2ae Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/51d4c2ae Branch: refs/heads/master Commit: 51d4c2aec4f9ddedeff588a916b25a0041dd8a88 Parents: f437011 Author: wang wei <[email protected]> Authored: Fri Jun 19 14:35:50 2015 +0800 Committer: wang wei <[email protected]> Committed: Thu Jun 25 11:49:32 2015 +0800 ---------------------------------------------------------------------- include/trainer/trainer.h | 2 +- include/utils/cluster.h | 19 ++++++++++--------- include/utils/cluster_rt.h | 6 +++--- src/communication/socket.cc | 14 +++++++------- src/main.cc | 2 +- src/trainer/trainer.cc | 8 +++++--- src/utils/cluster.cc | 29 ++++++++++++++++++++++++++--- 7 files changed, 53 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/include/trainer/trainer.h ---------------------------------------------------------------------- diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h index fbbfd0b..fb716bc 100644 --- a/include/trainer/trainer.h +++ b/include/trainer/trainer.h @@ -89,7 +89,7 @@ class Trainer{ * @param clusterproto */ void Start(const ModelProto& modelproto, const ClusterProto& clusterproto, - int procs_id); + const int procs_id); // TODO add Resume() function to continue training from a previously stopped // point. 
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/include/utils/cluster.h ---------------------------------------------------------------------- diff --git a/include/utils/cluster.h b/include/utils/cluster.h index 0eeb808..c11874e 100644 --- a/include/utils/cluster.h +++ b/include/utils/cluster.h @@ -23,7 +23,7 @@ namespace singa { class Cluster { public: static shared_ptr<Cluster> Get(); - static shared_ptr<Cluster> Get(const ClusterProto& cluster, int procs_id); + static shared_ptr<Cluster> Get(const ClusterProto& cluster, int procs_id=0); const int nserver_groups()const{ return cluster_.nserver_groups(); } const int nworker_groups()const { return cluster_.nworker_groups(); } @@ -71,17 +71,12 @@ class Cluster { return nprocs_; } - const string endpoint() const { - return endpoint(procs_id()); - } + /** * @return endpoint of the router of a procs with the specified id */ - const string endpoint(int procs_id) const { - CHECK_LT(procs_id, nprocs_); - CHECK_GE(procs_id, 0); - return endpoints_.at(procs_id); - } + const string endpoint(const int procs_id) const; + const string workspace() {return cluster_.workspace();} const string vis_folder(){ return cluster_.workspace()+"/visualization"; @@ -127,6 +122,11 @@ class Cluster { } int ProcsIDOf(int group_id, int id, int flag); + const string hostname() const { + return hostname_; + } + void Register(const string& endpoint); + private: Cluster(const ClusterProto &cluster, int procs_id) ; void SetupFolders(const ClusterProto &cluster); @@ -135,6 +135,7 @@ class Cluster { private: int procs_id_; int nprocs_; + string hostname_; std::vector<std::string> endpoints_; // cluster config proto ClusterProto cluster_; http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/include/utils/cluster_rt.h ---------------------------------------------------------------------- diff --git a/include/utils/cluster_rt.h b/include/utils/cluster_rt.h index 08bea90..1b877ec 100644 --- a/include/utils/cluster_rt.h 
+++ b/include/utils/cluster_rt.h @@ -10,7 +10,7 @@ namespace singa { typedef void (*rt_callback)(void *contest); /** - * ClusterRuntime is a runtime service that manages dynamic configuration + * ClusterRuntime is a runtime service that manages dynamic configuration * and status of the whole cluster. It mainly provides following services: * 1) Provide running status of each server/worker * 2) Translate process id to (hostname:port) @@ -35,8 +35,8 @@ class ClusterRuntime { */ virtual std::string GetProcHost(int proc_id) = 0; /** - * Server: watch all workers in a server group, - * will be notified when all workers have left + * Server: watch all workers in a server group, + * will be notified when all workers have left */ virtual bool WatchSGroup(int gid, int sid, rt_callback fn, void* ctx) = 0; /** http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/src/communication/socket.cc ---------------------------------------------------------------------- diff --git a/src/communication/socket.cc b/src/communication/socket.cc index 38c0d79..2039162 100644 --- a/src/communication/socket.cc +++ b/src/communication/socket.cc @@ -36,7 +36,6 @@ Dealer::Dealer(int id) : id_(id) { CHECK_NOTNULL(poller_); } -<<<<<<< HEAD Dealer::~Dealer() { zsock_destroy(&dealer_); } @@ -92,13 +91,14 @@ Router::~Router() { } } -int Router::Bind(const std::string& endpoint) { - CHECK_GT(endpoint.length(), 0); - if (endpoint.length()) { - CHECK_EQ(zsock_bind(router_, "%s", endpoint.c_str()), 0); - return 1; +int Router::Bind(string endpoint){ + int port=-1; + if(endpoint.length()){ + port=zsock_bind(router_, "%s", endpoint.c_str()); } - return 0; + CHECK_NE(port,-1)<<endpoint; + LOG(INFO)<<"bind successfully to "<<endpoint+":"+std::to_string(port); + return port; } int Router::Send(Msg **msg) { http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/src/main.cc ---------------------------------------------------------------------- diff --git a/src/main.cc b/src/main.cc 
index 77898be..851d528 100644 --- a/src/main.cc +++ b/src/main.cc @@ -19,7 +19,7 @@ * easily, e.g., AddLayer(layer_type, source_layers, meta_data). */ -DEFINE_int32(procsID, 0, "Global process ID"); +DEFINE_int32(procsID, -1, "Global process ID"); DEFINE_string(cluster, "examples/mnist/cluster.conf", "Cluster config file"); DEFINE_string(model, "examples/mnist/conv.conf", "Model config file"); http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/src/trainer/trainer.cc ---------------------------------------------------------------------- diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc index bdc1416..cd80296 100644 --- a/src/trainer/trainer.cc +++ b/src/trainer/trainer.cc @@ -25,17 +25,14 @@ void Trainer::RegisterDefaultClasses(const singa::ModelProto& proto){ } void HandleWorkerFinish(void * ctx){ - /* HandleContext* hctx=static_cast<HandleContext*> (ctx); Msg* msg=new Msg(); msg->set_src(-1,-1, kRuntime); msg->set_dst(hctx->group_id, hctx->id, kServer); msg->set_type(kStop); hctx->dealer->Send(&msg); - */ } -<<<<<<< HEAD const std::unordered_map<int, vector<std::pair<int, int>>> SliceParams(int num, const vector<shared_ptr<Param>>& params){ std::unordered_map<int, vector<std::pair<int, int>>> paramid2slices; @@ -417,6 +414,11 @@ void Trainer::Run(const vector<shared_ptr<Worker>>& workers, if (interprocs_dealers.find(dst_procs_id)==interprocs_dealers.end()){ auto dealer=make_shared<Dealer>(); interprocs_dealers[dst_procs_id]=dealer; + while(cluster->endpoint(dst_procs_id)==""){ + std::this_thread::sleep_for( + std::chrono::milliseconds(kCollectSleepTime)); + LOG(ERROR)<<"waiting for procs "<< dst_procs_id<<" to register"; + } dealer->Connect("tcp://"+cluster->endpoint(dst_procs_id)); } if(bandwidth(amount, start) <=cluster->bandwidth()){ http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/51d4c2ae/src/utils/cluster.cc ---------------------------------------------------------------------- diff --git a/src/utils/cluster.cc 
b/src/utils/cluster.cc index 035e0c7..d022a64 100644 --- a/src/utils/cluster.cc +++ b/src/utils/cluster.cc @@ -1,5 +1,6 @@ #include <glog/logging.h> #include <fcntl.h> +#include <unistd.h> #include <fstream> #include "utils/cluster.h" #include "proto/cluster.pb.h" @@ -13,12 +14,17 @@ Cluster::Cluster(const ClusterProto &cluster, int procs_id) { procs_id_=procs_id; cluster_ = cluster; SetupFolders(cluster); + int nprocs; if(server_worker_separate()) nprocs_=nworker_procs()+nserver_procs(); else - nprocs_=std::max(nworker_procs(), nserver_procs()); - CHECK_LT(procs_id, nprocs_); - if(nprocs_>1){ + nprocs=std::max(nworker_procs(), nserver_procs()); + CHECK_LT(procs_id, nprocs); + if (cluster_.has_nprocs()) + CHECK_EQ(cluster.nprocs(), nprocs); + else + cluster_.set_nprocs(nprocs); + if(nprocs>1&&procs_id>-1){ std::ifstream ifs(cluster.hostfile(), std::ifstream::in); std::string line; while(std::getline(ifs, line) @@ -49,8 +55,25 @@ Cluster::Cluster(const ClusterProto &cluster, int procs_id) { auto rt=new ZKClusterRT(cluster_.zookeeper_host()); rt->Init(); cluster_rt_=shared_ptr<ClusterRuntime>(static_cast<ClusterRuntime*>(rt)); + + char buf[128]; + gethostname(buf, 128); + hostname_=string(buf); } +void Cluster::Register(const string& endpoint){ + procs_id_=cluster_rt_->RegistProc(endpoint); + CHECK_GE(procs_id_,0); + CHECK_LT(procs_id_,nprocs()); +} +const string Cluster::endpoint(int procsid) const{ + CHECK_LT(procsid, nprocs()); + CHECK_GE(procsid, 0); + if(endpoints_.size()) + return endpoints_.at(procsid); + else + return cluster_rt_->GetProcHost(procsid); +} void Cluster::SetupFolders(const ClusterProto &cluster){ // create visulization folder mkdir(vis_folder().c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
