Repository: incubator-singa Updated Branches: refs/heads/master 0c6e5c692 -> 4e15c3444
SINGA-73 Refine the selection of available hosts. JobManager reads the next available host index from ZooKeeper and assigns new jobs starting at that host, instead of always starting at the first host. cluster_rt.h / .cc: - add UpdateNode function in ZKService - add GenerateHostList function in JobManager - move ExtractClusterConf from tool.cc to cluster_rt.cc Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/4e15c344 Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/4e15c344 Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/4e15c344 Branch: refs/heads/master Commit: 4e15c3444bfb1f7fc3816eda2416007604137ab3 Parents: 0c6e5c6 Author: wang sheng <[email protected]> Authored: Fri Sep 25 15:14:58 2015 +0800 Committer: wang sheng <[email protected]> Committed: Fri Sep 25 15:14:58 2015 +0800 ---------------------------------------------------------------------- include/utils/cluster_rt.h | 4 ++ src/utils/cluster_rt.cc | 114 +++++++++++++++++++++++++++++++++++++--- src/utils/tool.cc | 78 +++------------------------ 3 files changed, 119 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4e15c344/include/utils/cluster_rt.h ---------------------------------------------------------------------- diff --git a/include/utils/cluster_rt.h b/include/utils/cluster_rt.h index c78bf75..bdfa8fd 100644 --- a/include/utils/cluster_rt.h +++ b/include/utils/cluster_rt.h @@ -35,6 +35,7 @@ const int kZKBufSize = 100; const std::string kZKPathSinga = "/singa"; const std::string kZKPathSys = "/singa/sys"; const std::string kZKPathJLock = "/singa/sys/job-lock"; +const std::string kZKPathHostIdx = "/singa/sys/host-idx"; const std::string kZKPathApp = "/singa/app"; const std::string kZKPathJob = "/singa/app/job-"; // following paths are local under /singa/app/job-X @@ -72,6 +73,7 @@ class ZKService { bool 
CreateNode(const char* path, const char* val, int flag, char* output); bool DeleteNode(const char* path); bool Exist(const char* path); + bool UpdateNode(const char* path, const char* val); bool GetNode(const char* path, char* output); bool GetChild(const char* path, std::vector<std::string>* vt); bool WGetChild(const char* path, std::vector<std::string>* vt, @@ -154,6 +156,7 @@ class JobManager { bool Init(); bool GenerateJobID(int* id); + bool GenerateHostList(const char* job_file, std::vector<std::string>* list); bool ListJobs(std::vector<JobInfo>* jobs); bool ListJobProcs(int job, std::vector<std::string>* procs); bool Remove(int job); @@ -164,6 +167,7 @@ class JobManager { const int kJobsNotRemoved = 10; bool CleanPath(const std::string& path, bool remove); + std::string ExtractClusterConf(const char* job_file); int timeout_ = 30000; std::string host_ = ""; http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4e15c344/src/utils/cluster_rt.cc ---------------------------------------------------------------------- diff --git a/src/utils/cluster_rt.cc b/src/utils/cluster_rt.cc index 265e80b..e51ac97 100644 --- a/src/utils/cluster_rt.cc +++ b/src/utils/cluster_rt.cc @@ -22,7 +22,12 @@ #include "utils/cluster_rt.h" #include <glog/logging.h> +#include <google/protobuf/text_format.h> +#include <stdlib.h> #include <algorithm> +#include <fstream> +#include <iostream> +#include "proto/job.pb.h" using std::string; using std::to_string; @@ -94,9 +99,11 @@ bool ZKService::CreateNode(const char* path, const char* val, int flag, } sleep(kSleepSec); } - // copy the node name ot output + // copy the node name to output if (output != nullptr && (ret == ZOK || ret == ZNODEEXISTS)) { - strcpy(output, buf); + snprintf(output, kZKBufSize, "%s", buf); + // use snprintf instead of strcpy so the copy is bounded by kZKBufSize + // strcpy(output, buf); } if (ret == ZOK) { LOG(INFO) << "created zookeeper node " << buf @@ -141,6 +148,20 @@ bool ZKService::Exist(const char* path) { return false; } +bool 
ZKService::UpdateNode(const char* path, const char* val) { + // set version = -1, do not check content version + int ret = zoo_set(zkhandle_, path, val, strlen(val), -1); + if (ret == ZOK) { + return true; + } else if (ret == ZNONODE) { + LOG(ERROR) << "zk node " << path << " does not exist"; + return false; + } + LOG(FATAL) << "Unhandled ZK error code: " << ret + << " (zoo_get " << path << ")"; + return false; +} + bool ZKService::GetNode(const char* path, char* output) { struct Stat stat; int val_len = kZKBufSize; @@ -153,7 +174,7 @@ bool ZKService::GetNode(const char* path, char* output) { return false; } LOG(FATAL) << "Unhandled ZK error code: " << ret - << " (zoo_get " << path << ")"; + << " (zoo_get " << path << ")"; return false; } @@ -282,12 +303,11 @@ bool ClusterRuntime::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) { } std::string ClusterRuntime::GetProcHost(int proc_id) { - // char buf[kZKBufSize]; char val[kZKBufSize]; // construct file name - string path = proc_path_+"/proc-"+to_string(proc_id); + string path = proc_path_ + "/proc-" + to_string(proc_id); if (!zk_.GetNode(path.c_str(), val)) return ""; - int len = strlen(val)-1; + int len = strlen(val) - 1; while (len && val[len] != '|') --len; CHECK(len); val[len] = '\0'; @@ -320,6 +340,8 @@ bool JobManager::Init() { return false; if (!zk_.CreateNode(kZKPathJLock.c_str(), nullptr, 0, nullptr)) return false; + if (!zk_.CreateNode(kZKPathHostIdx.c_str(), "0", 0, nullptr)) + return false; if (!zk_.CreateNode(kZKPathApp.c_str(), nullptr, 0, nullptr)) return false; return true; @@ -332,7 +354,52 @@ bool JobManager::GenerateJobID(int* id) { ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) { return false; } - *id = atoi(buf+strlen(buf)-10); + *id = atoi(buf + strlen(buf) - 10); + return true; +} + +bool JobManager::GenerateHostList(const char* job_file, vector<string>* list) { + // compute required #process from job conf + ClusterProto cluster; + 
google::protobuf::TextFormat::ParseFromString(ExtractClusterConf(job_file), + &cluster); + int nworker_procs = cluster.nworker_groups() * cluster.nworkers_per_group() + / cluster.nworkers_per_procs(); + int nserver_procs = cluster.nserver_groups() * cluster.nservers_per_group() + / cluster.nservers_per_procs(); + int nprocs = 0; + if (cluster.server_worker_separate()) + nprocs = nworker_procs + nserver_procs; + else + nprocs = std::max(nworker_procs, nserver_procs); + // get available hosts from conf/hostfile (one per line; blank lines and '#' lines are skipped) + std::ifstream hostfile("conf/hostfile"); + if (!hostfile.is_open()) { + LOG(FATAL) << "Cannot open file: " << "conf/hostfile"; + } + vector<string> hosts; + string host; + while (!hostfile.eof()) { + getline(hostfile, host); + if (!host.length() || host[0] == '#') continue; + hosts.push_back(host); + } + if (!hosts.size()) { + LOG(FATAL) << "Empty host file"; + } + // read next host index from kZKPathHostIdx; NOTE(review): this GetNode + UpdateNode(version -1) pair is not atomic, so concurrent callers may pick the same hosts + char val[kZKBufSize]; + if (!zk_.GetNode(kZKPathHostIdx.c_str(), val)) return false; + int next = atoi(val); + // generate host list: nprocs hosts taken round-robin starting at index next + list->clear(); + for (int i = 0; i < nprocs; ++i) { + list->push_back(hosts[(next + i) % hosts.size()]); + } + // write back the index where the next job should start + next = (next + nprocs) % hosts.size(); + snprintf(val, kZKBufSize, "%d", next); + if (!zk_.UpdateNode(kZKPathHostIdx.c_str(), val)) return false; return true; } @@ -426,4 +493,37 @@ bool JobManager::CleanPath(const string& path, bool remove) { return true; } +// extract cluster configuration part from the job config file +// TODO(wangsh) improve this function to make it robust (line-based scan that stops at the first '}' after "cluster"; nested braces are not handled) +string JobManager::ExtractClusterConf(const char* job_file) { + std::ifstream fin(job_file); + CHECK(fin.is_open()) << "cannot open job conf file " << job_file; + string line; + string cluster; + bool in_cluster = false; + while (!fin.eof()) { + std::getline(fin, line); + if (in_cluster == false) { + size_t pos = line.find("cluster"); + if (pos == std::string::npos) continue; + in_cluster = true; + line = line.substr(pos);
cluster = ""; + } + if (in_cluster == true) { + cluster += line + "\n"; + if (line.find("}") != std::string::npos) + in_cluster = false; + } + } + LOG(INFO) << "cluster configure: " << cluster; + size_t s_pos = cluster.find("{"); + size_t e_pos = cluster.find("}"); + if (s_pos == std::string::npos || e_pos == std::string::npos) { + LOG(FATAL) << "cannot extract valid cluster configuration in file: " + << job_file; + } + return cluster.substr(s_pos + 1, e_pos - s_pos-1); +} + } // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/4e15c344/src/utils/tool.cc ---------------------------------------------------------------------- diff --git a/src/utils/tool.cc b/src/utils/tool.cc index 295dcf0..435129c 100644 --- a/src/utils/tool.cc +++ b/src/utils/tool.cc @@ -20,11 +20,9 @@ *************************************************************/ #include <glog/logging.h> -#include <google/protobuf/text_format.h> #include <algorithm> -#include <fstream> -#include <iostream> -#include "proto/job.pb.h" +#include <string> +#include <vector> #include "proto/singa.pb.h" #include "utils/cluster_rt.h" #include "utils/common.h" @@ -52,75 +50,15 @@ int create() { return SUCCESS; } -// extract cluster configuration part from the job config file -// TODO(wangsh) improve this function to make it robust -const std::string extract_cluster(const char* jobfile) { - std::ifstream fin; - fin.open(jobfile, std::ifstream::in); - CHECK(fin.is_open()) << "cannot open job conf file " << jobfile; - std::string line; - std::string cluster; - bool in_cluster = false; - while (std::getline(fin, line)) { - if (in_cluster == false) { - size_t pos = line.find("cluster"); - if (pos == std::string::npos) continue; - in_cluster = true; - line = line.substr(pos); - cluster = ""; - } - if (in_cluster == true) { - cluster += line + "\n"; - if (line.find("}") != std::string::npos) - in_cluster = false; - } - } - LOG(INFO) << "cluster configure: " << cluster; - size_t s_pos = 
cluster.find("{"); - size_t e_pos = cluster.find("}"); - if (s_pos == std::string::npos || e_pos == std::string::npos) { - LOG(FATAL) << "cannot extract valid cluster configuration in file: " - << jobfile; - } - return cluster.substr(s_pos+1, e_pos-s_pos-1); -} - // generate a host list int genhost(char* job_conf) { - // compute required #process from job conf - singa::ClusterProto cluster; - google::protobuf::TextFormat::ParseFromString(extract_cluster(job_conf), - &cluster); - int nworker_procs = cluster.nworker_groups() * cluster.nworkers_per_group() - / cluster.nworkers_per_procs(); - int nserver_procs = cluster.nserver_groups() * cluster.nservers_per_group() - / cluster.nservers_per_procs(); - int nprocs = 0; - if (cluster.server_worker_separate()) - nprocs = nworker_procs + nserver_procs; - else - nprocs = std::max(nworker_procs, nserver_procs); - - // get available host list from global conf - std::fstream hostfile("conf/hostfile"); - if (!hostfile.is_open()) { - LOG(ERROR) << "Cannot open file: " << "conf/hostfile"; - return RUN_ERR; - } - std::vector<std::string> hosts; - std::string host; - while (!hostfile.eof()) { - getline(hostfile, host); - if (!host.length() || host[0] == '#') continue; - hosts.push_back(host); - } - if (!hosts.size()) { - LOG(ERROR) << "Empty host file"; - return RUN_ERR; - } + singa::JobManager mngr(global.zookeeper_host()); + if (!mngr.Init()) return RUN_ERR; + std::vector<std::string> list; + if (!mngr.GenerateHostList(job_conf, &list)) return RUN_ERR; // output selected hosts - for (int i = 0; i < nprocs; ++i) - printf("%s\n", hosts[i % hosts.size()].c_str()); + for (std::string host : list) + printf("%s\n", host.c_str()); return SUCCESS; }
