SINGA-16 Runtime process ID management. Add two new features to the ClusterRuntime class: -- int RegistProc(const string& host_addr) -- string GetProcHost(int proc_id)
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/da6e4dce Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/da6e4dce Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/da6e4dce Branch: refs/heads/master Commit: da6e4dce8ad6be5bd851425e539e350ea9db5173 Parents: 017b042 Author: wang sheng <[email protected]> Authored: Wed Jun 17 14:02:59 2015 +0800 Committer: wang sheng <[email protected]> Committed: Wed Jun 17 14:08:11 2015 +0800 ---------------------------------------------------------------------- include/utils/cluster_rt.h | 37 ++++-- src/test/test_cluster.cc | 23 +++- src/utils/cluster_rt.cc | 266 ++++++++++++++++++++++++++-------------- 3 files changed, 224 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/da6e4dce/include/utils/cluster_rt.h ---------------------------------------------------------------------- diff --git a/include/utils/cluster_rt.h b/include/utils/cluster_rt.h index 7b5f361..7ed7b68 100644 --- a/include/utils/cluster_rt.h +++ b/include/utils/cluster_rt.h @@ -15,7 +15,7 @@ namespace singa { * ClusterRuntime is a runtime service that manages dynamic configuration and status * of the whole cluster. 
It mainly provides following services: * 1) Provide running status of each server/worker - * 1) Translate process id to (hostname:port) + * 2) Translate process id to (hostname:port) */ typedef void (*rt_callback)(void *contest); @@ -31,6 +31,20 @@ class ClusterRuntime{ virtual bool Init(){ return false;} /** + * register the process, and get a unique process id + * + * \return the process id, -1 if failed + */ + virtual int RegistProc(const string& host_addr){ return -1;}; + + /** + * translate the process id to host address + * + * \return the host and port, "" if no such proc id + */ + virtual string GetProcHost(int proc_id){ return "";}; + + /** * Server: watch all workers in a server group, will be notified when all workers have left */ virtual bool sWatchSGroup(int gid, int sid, rt_callback fn, void *ctx){ return false;} @@ -53,15 +67,21 @@ class ZKClusterRT : public ClusterRuntime{ ZKClusterRT(string host, int timeout = 30000); ~ZKClusterRT(); bool Init(); + int RegistProc(const string& host_addr); + string GetProcHost(int proc_id); bool sWatchSGroup(int gid, int sid, rt_callback fn, void *ctx); bool wJoinSGroup(int gid, int wid, int s_group); bool wLeaveSGroup(int gid, int wid, int s_group); - static void watcherGlobal(zhandle_t * zh, int type, int state, const char *path, void *watcherCtx); private: - static void childChanges(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx); - string getSGroupPath(int gid); - string getWorkerPath(int gid, int wid); + static void WatcherGlobal(zhandle_t * zh, int type, int state, const char *path, void *watcherCtx); + static void ChildChanges(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx); + bool CreateZKNode(const char* path, const char* val, int flag, char* output); + bool DeleteZKNode(const char* path); + bool GetZKNode(const char* path, char* output); + bool GetZKChild(const char* path, vector<string>& vt); + string groupPath(int gid); + string workerPath(int gid, int 
wid); struct RTCallback{ rt_callback fn; @@ -76,8 +96,11 @@ class ZKClusterRT : public ClusterRuntime{ const int MAX_BUF_LEN = 50; const int RETRY_NUM = 10; const int SLEEP_SEC = 1; - const string ZK_P_SINGA = "/singa"; - const string ZK_P_STATUS = "/status"; + const string ZPATH_SINGA = "/singa"; + const string ZPATH_STATUS = "/singa/status"; + const string ZPATH_REGIST = "/singa/regist"; + const string ZPATH_REGIST_PROC = "/singa/regist/proc"; + const string ZPATH_REGIST_LOCK = "/singa/regist/lock"; }; } // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/da6e4dce/src/test/test_cluster.cc ---------------------------------------------------------------------- diff --git a/src/test/test_cluster.cc b/src/test/test_cluster.cc index bb17149..b16d765 100644 --- a/src/test/test_cluster.cc +++ b/src/test/test_cluster.cc @@ -5,7 +5,7 @@ using namespace singa; -string folder="src/test/data/"; +//string folder="src/test/data/"; string host="localhost:2181"; @@ -13,12 +13,15 @@ void zk_cb(void *contest){ LOG(INFO) << "zk callback: " << (char *)contest; } -TEST(CluserRuntimeTest, ZooKeeper){ +TEST(CluserRuntimeTest, GroupManagement){ ClusterRuntime* rt = new ZKClusterRT(host); ASSERT_EQ(rt->Init(), true); + ASSERT_EQ(rt->sWatchSGroup(1, 1, zk_cb, "test call back"), true); + ASSERT_EQ(rt->wJoinSGroup(1, 1, 1), true); ASSERT_EQ(rt->wJoinSGroup(1, 2, 1), true); + ASSERT_EQ(rt->wLeaveSGroup(1, 2, 1), true); ASSERT_EQ(rt->wLeaveSGroup(1, 1, 1), true); @@ -26,6 +29,22 @@ TEST(CluserRuntimeTest, ZooKeeper){ delete rt; } +TEST(CluserRuntimeTest, ProcessManagement){ + ClusterRuntime* rt = new ZKClusterRT(host); + ASSERT_EQ(rt->Init(), true); + + ASSERT_EQ(rt->RegistProc("1.2.3.4:5"), 0); + ASSERT_EQ(rt->RegistProc("1.2.3.4:6"), 1); + ASSERT_EQ(rt->RegistProc("1.2.3.4:7"), 2); + + ASSERT_NE(rt->GetProcHost(0), ""); + ASSERT_NE(rt->GetProcHost(1), ""); + ASSERT_NE(rt->GetProcHost(2), ""); + + sleep(3); + delete rt; +} + /** ClusterProto GenClusterProto(){ 
ClusterProto proto; http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/da6e4dce/src/utils/cluster_rt.cc ---------------------------------------------------------------------- diff --git a/src/utils/cluster_rt.cc b/src/utils/cluster_rt.cc index af6fbbc..514ea2e 100644 --- a/src/utils/cluster_rt.cc +++ b/src/utils/cluster_rt.cc @@ -1,4 +1,5 @@ #include "utils/cluster_rt.h" +#include <algorithm> using std::to_string; @@ -25,61 +26,89 @@ bool ZKClusterRT::Init(){ zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); - zkhandle_ = zookeeper_init(host_.c_str(), watcherGlobal, timeout_, 0, "ZKClusterRT", 0); + zkhandle_ = zookeeper_init(host_.c_str(), WatcherGlobal, timeout_, 0, "ZKClusterRT", 0); - if (zkhandle_ == nullptr){ + if (zkhandle_ == NULL){ LOG(ERROR) << "Error when connecting to zookeeper servers..."; - LOG(ERROR) <<"Please ensure zookeeper service is up in host(s):"; + LOG(ERROR) << "Please ensure zookeeper service is up in host(s):"; LOG(ERROR) << host_.c_str(); return false; } + //create ZPATH_SINGA + if (!CreateZKNode(ZPATH_SINGA.c_str(), nullptr, 0, nullptr)) return false; + //create ZPATH_STATUS + if (!CreateZKNode(ZPATH_STATUS.c_str(), nullptr, 0, nullptr)) return false; + //create ZPATH_REGIST + if (!CreateZKNode(ZPATH_REGIST.c_str(), nullptr, 0, nullptr)) return false; + //create ZPATH_REGIST_PROC + if (!CreateZKNode(ZPATH_REGIST_PROC.c_str(), nullptr, 0, nullptr)) return false; + //create ZPATH_REGIST_LOCK + if (!CreateZKNode(ZPATH_REGIST_LOCK.c_str(), nullptr, 0, nullptr)) return false; + + return true; +} + +int ZKClusterRT::RegistProc(const string& host_addr){ + char buf[MAX_BUF_LEN]; - //create ZK_P_SINGA - string path = ZK_P_SINGA; - int ret = zoo_create(zkhandle_, path.c_str(), NULL, -1, &ZOO_OPEN_ACL_UNSAFE, 0, buf, MAX_BUF_LEN); - if (ret != ZOK && ret != ZNODEEXISTS){ - LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_create)"; - return false; + string lock = ZPATH_REGIST_LOCK+"/lock-"; + + if (!CreateZKNode(lock.c_str(), nullptr, 
ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)){ + return -1; } - //create ZK_P_STATUS - path += ZK_P_STATUS; - ret = zoo_create(zkhandle_, path.c_str(), NULL, -1, &ZOO_OPEN_ACL_UNSAFE, 0, buf, MAX_BUF_LEN); - if (ret != ZOK && ret != ZNODEEXISTS){ - LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_create)"; - return false; + + //get all children in lock folder + vector<string> vt; + if (!GetZKChild(ZPATH_REGIST_LOCK.c_str(), vt)){ + return -1; } - return true; + //find own position among all locks + int id = -1; + std::sort(vt.begin(), vt.end()); + for (int i = 0; i < (int)vt.size(); ++i){ + if (ZPATH_REGIST_LOCK+"/"+vt[i] == buf){ + id = i; + break; + } + } + + if (id == -1){ + LOG(ERROR) << "cannot find own node " << buf; + return -1; + } + + //create a new node in proc path + string path = ZPATH_REGIST_PROC+"/proc-"+to_string(id); + if (!CreateZKNode(path.c_str(), host_addr.c_str(), ZOO_EPHEMERAL, nullptr)){ + return -1; + } + + return id; +} + +string ZKClusterRT::GetProcHost(int proc_id){ + + //char buf[MAX_BUF_LEN]; + char val[MAX_BUF_LEN]; + + //construct file name + string path = ZPATH_REGIST_PROC+"/proc-"+to_string(proc_id); + + if (!GetZKNode(path.c_str(), val)) return ""; + + return string(val); } bool ZKClusterRT::sWatchSGroup(int gid, int sid, rt_callback fn, void *ctx){ CHECK_NOTNULL(fn); - string path = getSGroupPath(gid); - struct Stat stat; + string path = groupPath(gid); - //check existance of zk node - int ret = zoo_exists(zkhandle_, path.c_str(), 0, &stat); - //if have, pass - if (ret == ZOK) ; - //need to create zk node first - else if (ret == ZNONODE){ - char buf[MAX_BUF_LEN]; - ret = zoo_create(zkhandle_, path.c_str(), NULL, -1, &ZOO_OPEN_ACL_UNSAFE, 0, buf, MAX_BUF_LEN); - if (ret == ZOK){ - LOG(INFO) << "zookeeper node " << buf << " created"; - } - else{ - LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_create)"; - return false; - } - } - else{ - LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_exists)"; - return false; 
- } + //create zk node + if (!CreateZKNode(path.c_str(), nullptr, 0, nullptr)) return false; struct String_vector child; //store the callback function and context for later usage @@ -88,72 +117,47 @@ bool ZKClusterRT::sWatchSGroup(int gid, int sid, rt_callback fn, void *ctx){ cb->ctx = ctx; cb_vec_.push_back(cb); //start to watch on the zk node, does not care about the first return value - zoo_wget_children(zkhandle_, path.c_str(), childChanges, cb, &child); + int ret = zoo_wget_children(zkhandle_, path.c_str(), ChildChanges, cb, &child); + + if (ret != ZOK){ + LOG(ERROR) << "failed to get child of " << path; + return false; + } return true; } bool ZKClusterRT::wJoinSGroup(int gid, int wid, int s_group){ - string path = getSGroupPath(s_group) + getWorkerPath(gid, wid); - char buf[MAX_BUF_LEN]; - - //try to create a file under the server group path - for (int i = 0; i < RETRY_NUM; ++i){ - //send the zk request - int ret = zoo_create(zkhandle_, path.c_str(), NULL, -1, &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, buf, MAX_BUF_LEN); + string path = groupPath(s_group) + workerPath(gid, wid); - if (ret == ZOK){ - LOG(INFO) << "zookeeper node " << buf << " created"; - return true; - } - else if (ret == ZNODEEXISTS){ - LOG(WARNING) << "zookeeper node " << path << " already exist"; - return true; - } - //the parent node is not on, need to wait - else if (ret == ZNONODE){ - LOG(WARNING) << "zookeeper parent node " << getSGroupPath(s_group) << " not exist, retry later"; - sleep(SLEEP_SEC); - continue; - } - - LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_create)"; + //try to create an ephemeral node under server group path + if (!CreateZKNode(path.c_str(), nullptr, ZOO_EPHEMERAL, nullptr)){ return false; } - LOG(ERROR) << "zookeeper parent node " << getSGroupPath(s_group) << " still not exist after " << RETRY_NUM << " tries"; - return false; + return true; } bool ZKClusterRT::wLeaveSGroup(int gid, int wid, int s_group){ - string path = getSGroupPath(s_group) + 
getWorkerPath(gid, wid); - - int ret = zoo_delete(zkhandle_, path.c_str(), -1); - if (ret == ZOK){ - LOG(INFO) << "zookeeper node " << path << " deleted"; - return true; - } - else if (ret == ZNONODE){ - LOG(WARNING) << "try to delete an non-existing zookeeper node " << path; - return true; - } + string path = groupPath(s_group) + workerPath(gid, wid); - LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_delete)"; - return false; + if (!DeleteZKNode(path.c_str())) return false; + + return true; } -void ZKClusterRT::watcherGlobal(zhandle_t * zh, int type, int state, const char *path, void *watcherCtx){ +void ZKClusterRT::WatcherGlobal(zhandle_t * zh, int type, int state, const char *path, void *watcherCtx){ if (type == ZOO_SESSION_EVENT){ if (state == ZOO_CONNECTED_STATE) - LOG(INFO) << "Connected to zookeeper service successfully!"; + LOG(INFO) << "GLOBAL_WATCHER connected to zookeeper service successfully!"; else if (state == ZOO_EXPIRED_SESSION_STATE) - LOG(INFO) << "zookeeper session expired!"; + LOG(INFO) << "GLOBAL_WATCHER zookeeper session expired!"; } } -void ZKClusterRT::childChanges(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx){ +void ZKClusterRT::ChildChanges(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx){ //check if already callback RTCallback *cb = (RTCallback *)watcherCtx; @@ -162,16 +166,10 @@ void ZKClusterRT::childChanges(zhandle_t *zh, int type, int state, const char *p if (type == ZOO_CHILD_EVENT){ struct String_vector child; //check the child list and put another watcher - int ret = zoo_wget_children(zh, path, childChanges, watcherCtx, &child); - LOG(INFO) << "ret = " << ret; + int ret = zoo_wget_children(zh, path, ChildChanges, watcherCtx, &child); if (ret == ZOK){ - LOG(INFO) << "child.count = " << child.count; if (child.count == 0){ - //LOG(ERROR) << "do call back"; - //LOG(ERROR) << "type = " << type; - //LOG(ERROR) << "state = " << state; - //LOG(ERROR) << "path = " << path; - + 
LOG(INFO) << "child.count = 0 in path: " << path; //all workers leave, we do callback now (*cb->fn)(cb->ctx); cb->fn = nullptr; @@ -186,12 +184,94 @@ void ZKClusterRT::childChanges(zhandle_t *zh, int type, int state, const char *p } } -string ZKClusterRT::getSGroupPath(int gid){ - //return "/singa/status/sg"+to_string(gid); - return ZK_P_SINGA+ZK_P_STATUS+"/sg"+to_string(gid); +bool ZKClusterRT::CreateZKNode(const char* path, const char* val, int flag, char* output){ + + char buf[MAX_BUF_LEN]; + int ret; + + //send the zk request + for (int i = 0; i < RETRY_NUM; ++i){ + ret = zoo_create(zkhandle_, path, val, val == nullptr ? -1 : strlen(val), &ZOO_OPEN_ACL_UNSAFE, flag, buf, MAX_BUF_LEN); + if (ret != ZNONODE) break; + LOG(WARNING) << "zookeeper parent node of " << path << " not exist, retry later"; + sleep(SLEEP_SEC); + } + + //copy the node name ot output + if (output != nullptr && (ret == ZOK || ret == ZNODEEXISTS)){ + strcpy(output, buf); + } + + if (ret == ZOK){ + LOG(INFO) << "created zookeeper node " << buf << " (" << (val == nullptr ? 
"NULL" : val) << ")"; + return true; + } + else if (ret == ZNODEEXISTS){ + LOG(WARNING) << "zookeeper node " << path << " already exists"; + return true; + } + + LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_create)"; + return false; +} + +bool ZKClusterRT::DeleteZKNode(const char* path){ + + int ret = zoo_delete(zkhandle_, path, -1); + + if (ret == ZOK){ + LOG(INFO) << "deleted zookeeper node " << path; + return true; + } + else if (ret == ZNONODE){ + LOG(WARNING) << "try to delete an non-existing zookeeper node " << path; + return true; + } + + LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_delete)"; + return false; +} + +bool ZKClusterRT::GetZKNode(const char* path, char* output){ + + struct Stat stat; + int val_len = MAX_BUF_LEN; + + int ret = zoo_get(zkhandle_, path, 0, output, &val_len, &stat); + + if (ret == ZOK){ + output[val_len] = 0; + return true; + } + else if (ret == ZNONODE){ + LOG(ERROR) << "zk node " << path << " does not exist"; + return false; + } + + LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_get)"; + return false; +} + +bool ZKClusterRT::GetZKChild(const char* path, vector<string>& vt){ + + //get all children in lock folder + struct String_vector child; + int ret = zoo_get_children(zkhandle_, path, 0, &child); + + if (ret == ZOK){ + for (int i = 0; i < child.count; ++i) vt.push_back(child.data[i]); + return true; + } + + LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_create)"; + return false; +} + +string ZKClusterRT::groupPath(int gid){ + return ZPATH_STATUS+"/sg"+to_string(gid); } -string ZKClusterRT::getWorkerPath(int gid, int wid){ +string ZKClusterRT::workerPath(int gid, int wid){ return "/g"+to_string(gid)+"_w"+to_string(wid); }
