SINGA-34 Support external zookeeper service. Add a singa tool in src/utils/tool.cc to clean zookeeper content. It will be compiled as an executable 'singatool'. Now singa-stop.sh will not close the entire zookeeper service.
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/209ba1f2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/209ba1f2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/209ba1f2 Branch: refs/heads/master Commit: 209ba1f2f172d8784f1c6219a32507b69ada4541 Parents: 3819e59 Author: wang sheng <[email protected]> Authored: Thu Jul 16 21:38:09 2015 +0800 Committer: wang sheng <[email protected]> Committed: Thu Jul 16 21:56:01 2015 +0800 ---------------------------------------------------------------------- .gitignore | 1 + Makefile.example | 4 +- bin/singa-run.sh | 6 +- bin/singa-stop.sh | 7 +- include/trainer/trainer.h | 1 - include/utils/cluster.h | 1 - include/utils/cluster_rt.h | 82 +++++++--- src/proto/cluster.proto | 7 + src/proto/global.proto | 8 - src/utils/cluster_rt.cc | 339 ++++++++++++++++++++++------------------ src/utils/tool.cc | 25 +++ 11 files changed, 290 insertions(+), 191 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 527972b..be85b4f 100644 --- a/.gitignore +++ b/.gitignore @@ -28,6 +28,7 @@ tmp/ *lmdb *.binaryproto singa +singatool .libs *.la *.deps http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/Makefile.example ---------------------------------------------------------------------- diff --git a/Makefile.example b/Makefile.example index f2c58fe..2195b91 100644 --- a/Makefile.example +++ b/Makefile.example @@ -31,7 +31,7 @@ PROTO_HDRS :=$(patsubst src%, include%, $(PROTOS:.proto=.pb.h)) PROTO_OBJS :=$(addprefix $(BUILD_DIR)/, $(PROTO_SRCS:.cc=.o)) # each singa src file will generate a .o file -SINGA_SRCS := $(shell find src/ \( -path "src/test" -o -path "src/main.cc" \) \ +SINGA_SRCS := 
$(shell find src/ \( -path "src/test" -o -path "src/main.cc" -o -path "src/utils/tool.cc" \) \ -prune -o \( -name "*.cc" -type f \) -print ) SINGA_OBJS := $(sort $(addprefix $(BUILD_DIR)/, $(SINGA_SRCS:.cc=.o)) \ $(PROTO_OBJS) ) @@ -53,6 +53,8 @@ OBJS := $(sort $(SINGA_OBJS) $(TEST_OBJS) ) singa: $(PROTO_OBJS) $(SINGA_OBJS) $(CXX) $(SINGA_OBJS) src/main.cc -o singa $(CXXFLAGS) $(LDFLAGS) @echo + $(CXX) $(SINGA_OBJS) src/utils/tool.cc -o singatool $(CXXFLAGS) $(LDFLAGS) + @echo loader: proto $(LOADER_OBJS) $(CXX) $(LOADER_OBJS) -o $(BUILD_DIR)/loader $(CXXFLAGS) $(LDFLAGS) http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/bin/singa-run.sh ---------------------------------------------------------------------- diff --git a/bin/singa-run.sh b/bin/singa-run.sh index 45be0a1..c548a54 100755 --- a/bin/singa-run.sh +++ b/bin/singa-run.sh @@ -61,15 +61,15 @@ BASE=`cd "$BIN/..">/dev/null; pwd` cd $BASE -# clenup singa data -$BIN/singa-stop.sh conf/hostfile - # start zookeeper $BIN/zk-service.sh start 2>/dev/null # wait for zk service to be up sleep 3 +# clenup singa data +$BIN/singa-stop.sh conf/hostfile + # check mode if [ $# = 2 ] ; then # start single singa process http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/bin/singa-stop.sh ---------------------------------------------------------------------- diff --git a/bin/singa-stop.sh b/bin/singa-stop.sh index 1b36675..64cde64 100755 --- a/bin/singa-stop.sh +++ b/bin/singa-stop.sh @@ -40,6 +40,7 @@ ZKDATA_DIR="/tmp/zookeeper" PROC_NAME="*singa" HOST_FILE=$1 +cd $BASE # kill singa processes if [ $# = 0 ] ; then @@ -62,10 +63,10 @@ elif [ $# = 1 ] ; then done fi -# close zookeeper -. $BIN/zk-service.sh stop 2>/dev/null +# wait for killall command +sleep 2 echo cleanning metadata in zookeeper ... 
# remove zk data -rm -r $ZKDATA_DIR +./singatool http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/include/trainer/trainer.h ---------------------------------------------------------------------- diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h index 0ee01d4..bc81e72 100644 --- a/include/trainer/trainer.h +++ b/include/trainer/trainer.h @@ -2,7 +2,6 @@ #define INCLUDE_TRAINER_TRAINER_H_ #include <unordered_map> #include "proto/cluster.pb.h" -#include "proto/global.pb.h" #include "proto/model.pb.h" #include "utils/updater.h" #include "utils/param.h" http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/include/utils/cluster.h ---------------------------------------------------------------------- diff --git a/include/utils/cluster.h b/include/utils/cluster.h index 68ae937..4b87da0 100644 --- a/include/utils/cluster.h +++ b/include/utils/cluster.h @@ -9,7 +9,6 @@ #include "utils/common.h" #include "proto/cluster.pb.h" #include "utils/cluster_rt.h" -#include "proto/global.pb.h" using std::shared_ptr; using std::string; http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/include/utils/cluster_rt.h ---------------------------------------------------------------------- diff --git a/include/utils/cluster_rt.h b/include/utils/cluster_rt.h index 1b877ec..55ca243 100644 --- a/include/utils/cluster_rt.h +++ b/include/utils/cluster_rt.h @@ -49,6 +49,42 @@ class ClusterRuntime { virtual bool LeaveSGroup(int gid, int wid, int s_group) = 0; }; +const std::string kZKPathSinga = "/singa"; +const std::string kZKPathStatus = "/singa/status"; +const std::string kZKPathRegist = "/singa/regist"; +const std::string kZKPathRegistProc = "/singa/regist/proc"; +const std::string kZKPathRegistLock = "/singa/regist/lock"; +const int kZKBufSize = 50; + +struct RTCallback { + rt_callback fn; + void* ctx; +}; + +class ZKService { + public: + static void ChildChanges(zhandle_t* zh, int type, int state, + const char *path, void* 
watcherCtx); + ~ZKService(); + bool Init(const std::string& host, int timeout); + bool CreateNode(const char* path, const char* val, int flag, char* output); + bool DeleteNode(const char* path); + bool Exist(const char* path); + bool GetNode(const char* path, char* output); + bool GetChild(const char* path, std::vector<std::string>* vt); + bool WGetChild(const char* path, std::vector<std::string>* vt, + RTCallback *cb); + + private: + const int kNumRetry = 10; + const int kSleepSec = 1; + + static void WatcherGlobal(zhandle_t* zh, int type, int state, + const char *path, void* watcherCtx); + + zhandle_t* zkhandle_ = nullptr; +}; + class ZKClusterRT : public ClusterRuntime { public: explicit ZKClusterRT(const std::string& host); @@ -63,35 +99,33 @@ class ZKClusterRT : public ClusterRuntime { bool LeaveSGroup(int gid, int wid, int s_group) override; private: - struct RTCallback { - rt_callback fn; - void* ctx; - }; + inline std::string groupPath(int gid) { + return kZKPathStatus + "/sg" + std::to_string(gid); + } + inline std::string workerPath(int gid, int wid) { + return "/g" + std::to_string(gid) + "_w" + std::to_string(wid); + } + + int timeout_ = 30000; + std::string host_ = ""; + ZKService zk_; + std::vector<RTCallback*> cb_vec_; +}; - const int kMaxBufLen = 50; - const int kNumRetry = 10; - const int kSleepSec = 1; - const std::string kZPathSinga = "/singa"; - const std::string kZPathStatus = "/singa/status"; - const std::string kZPathRegist = "/singa/regist"; - const std::string kZPathRegistProc = "/singa/regist/proc"; - const std::string kZPathRegistLock = "/singa/regist/lock"; +class JobManager { + public: + explicit JobManager(const std::string& host); + JobManager(const std::string& host, int timeout); - static void WatcherGlobal(zhandle_t* zh, int type, int state, - const char *path, void* watcherCtx); - static void ChildChanges(zhandle_t* zh, int type, int state, - const char *path, void* watcherCtx); - bool CreateZKNode(const char* path, const char* 
val, int flag, char* output); - bool DeleteZKNode(const char* path); - bool GetZKNode(const char* path, char* output); - bool GetZKChild(const char* path, std::vector<std::string>* vt); - inline std::string groupPath(int gid); - std::string workerPath(int gid, int wid); + bool Init(); + bool Clean(); + + private: + bool CleanPath(const std::string& path); int timeout_ = 30000; std::string host_ = ""; - zhandle_t* zkhandle_ = nullptr; - std::vector<RTCallback*> cb_vec_; + ZKService zk_; }; } // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/src/proto/cluster.proto ---------------------------------------------------------------------- diff --git a/src/proto/cluster.proto b/src/proto/cluster.proto index 54ce300..7afb866 100644 --- a/src/proto/cluster.proto +++ b/src/proto/cluster.proto @@ -1,5 +1,12 @@ package singa; +message GlobalProto { + // ip/hostname:port[,ip/hostname:port] + required string zookeeper_host = 1; + // if not set, use the default dir of glog + optional string log_dir = 2 [default = "/tmp/singa-log/"]; +} + message ClusterProto { optional int32 nworker_groups = 1; optional int32 nserver_groups = 2; http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/src/proto/global.proto ---------------------------------------------------------------------- diff --git a/src/proto/global.proto b/src/proto/global.proto deleted file mode 100644 index 84eb7be..0000000 --- a/src/proto/global.proto +++ /dev/null @@ -1,8 +0,0 @@ -package singa; - -message GlobalProto { - // ip/hostname:port[,ip/hostname:port] - required string zookeeper_host = 1; - // if not set, use the default dir of glog - optional string log_dir = 2 [default = "/tmp/singa-log/"]; -} http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/src/utils/cluster_rt.cc ---------------------------------------------------------------------- diff --git a/src/utils/cluster_rt.cc b/src/utils/cluster_rt.cc index 6143567..408adde 100644 --- 
a/src/utils/cluster_rt.cc +++ b/src/utils/cluster_rt.cc @@ -3,147 +3,12 @@ #include <glog/logging.h> #include <algorithm> -using std::to_string; using std::string; +using std::to_string; namespace singa { -ZKClusterRT::ZKClusterRT(const string& host) : ZKClusterRT(host, 30000) {} - -ZKClusterRT::ZKClusterRT(const string& host, int timeout) { - host_ = host; - timeout_ = timeout; - zkhandle_ = nullptr; -} - -ZKClusterRT::~ZKClusterRT() { - // close zookeeper handler - zookeeper_close(zkhandle_); - // release callback vector - for (RTCallback* p : cb_vec_) { - delete p; - } -} - -char zk_cxt[] = "ZKClusterRT"; - -bool ZKClusterRT::Init() { - zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR); - zkhandle_ = zookeeper_init(host_.c_str(), WatcherGlobal, timeout_, 0, - static_cast<void *>(zk_cxt), 0); - if (zkhandle_ == NULL) { - LOG(ERROR) << "Error when connecting to zookeeper servers..."; - LOG(ERROR) << "Please ensure zookeeper service is up in host(s):"; - LOG(ERROR) << host_.c_str(); - return false; - } - // create kZPathSinga - if (!CreateZKNode(kZPathSinga.c_str(), nullptr, 0, nullptr)) - return false; - // create kZPathStatus - if (!CreateZKNode(kZPathStatus.c_str(), nullptr, 0, nullptr)) - return false; - // create kZPathRegist - if (!CreateZKNode(kZPathRegist.c_str(), nullptr, 0, nullptr)) - return false; - // create kZPathRegistProc - if (!CreateZKNode(kZPathRegistProc.c_str(), nullptr, 0, nullptr)) - return false; - // create kZPathRegistLock - if (!CreateZKNode(kZPathRegistLock.c_str(), nullptr, 0, nullptr)) - return false; - - return true; -} - -int ZKClusterRT::RegistProc(const string& host_addr) { - char buf[kMaxBufLen]; - string lock = kZPathRegistLock+"/lock-"; - if (!CreateZKNode(lock.c_str(), nullptr, ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) { - return -1; - } - // get all children in lock folder - std::vector<string> vt; - if (!GetZKChild(kZPathRegistLock.c_str(), &vt)) { - return -1; - } - // find own position among all locks - int id = -1; - 
std::sort(vt.begin(), vt.end()); - for (int i = 0; i < static_cast<int>(vt.size()); ++i) { - if (kZPathRegistLock+"/"+vt[i] == buf) { - id = i; - break; - } - } - if (id == -1) { - LOG(ERROR) << "cannot find own node " << buf; - return -1; - } - // create a new node in proc path - string path = kZPathRegistProc+"/proc-"+to_string(id); - if (!CreateZKNode(path.c_str(), host_addr.c_str(), ZOO_EPHEMERAL, nullptr)) { - return -1; - } - return id; -} - -string ZKClusterRT::GetProcHost(int proc_id) { - // char buf[kMaxBufLen]; - char val[kMaxBufLen]; - // construct file name - string path = kZPathRegistProc+"/proc-"+to_string(proc_id); - if (!GetZKNode(path.c_str(), val)) return ""; - return string(val); -} - -bool ZKClusterRT::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) { - CHECK_NOTNULL(fn); - string path = groupPath(gid); - // create zk node - if (!CreateZKNode(path.c_str(), nullptr, 0, nullptr)) return false; - struct String_vector child; - // store the callback function and context for later usage - RTCallback *cb = new RTCallback; - cb->fn = fn; - cb->ctx = ctx; - cb_vec_.push_back(cb); - // start to watch on the zk node, does not care about the first return value - int ret = zoo_wget_children(zkhandle_, path.c_str(), ChildChanges, - cb, &child); - if (ret != ZOK) { - LOG(ERROR) << "failed to get child of " << path; - return false; - } - return true; -} - -bool ZKClusterRT::JoinSGroup(int gid, int wid, int s_group) { - string path = groupPath(s_group) + workerPath(gid, wid); - // try to create an ephemeral node under server group path - if (!CreateZKNode(path.c_str(), nullptr, ZOO_EPHEMERAL, nullptr)) { - return false; - } - return true; -} - -bool ZKClusterRT::LeaveSGroup(int gid, int wid, int s_group) { - string path = groupPath(s_group) + workerPath(gid, wid); - if (!DeleteZKNode(path.c_str())) return false; - return true; -} - -void ZKClusterRT::WatcherGlobal(zhandle_t * zh, int type, int state, - const char *path, void *watcherCtx) { - if (type == 
ZOO_SESSION_EVENT) { - if (state == ZOO_CONNECTED_STATE) - LOG(INFO) << "GLOBAL_WATCHER connected to zookeeper successfully!"; - else if (state == ZOO_EXPIRED_SESSION_STATE) - LOG(INFO) << "GLOBAL_WATCHER zookeeper session expired!"; - } -} - -void ZKClusterRT::ChildChanges(zhandle_t *zh, int type, int state, +void ZKService::ChildChanges(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) { // check if already callback RTCallback *cb = static_cast<RTCallback*>(watcherCtx); @@ -168,14 +33,35 @@ void ZKClusterRT::ChildChanges(zhandle_t *zh, int type, int state, } } -bool ZKClusterRT::CreateZKNode(const char* path, const char* val, int flag, +ZKService::~ZKService() { + // close zookeeper handler + zookeeper_close(zkhandle_); +} + +char zk_cxt[] = "ZKClusterRT"; + +bool ZKService::Init(const string& host, int timeout) { + zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR); + zkhandle_ = zookeeper_init(host.c_str(), WatcherGlobal, timeout, 0, + static_cast<void *>(zk_cxt), 0); + if (zkhandle_ == NULL) { + LOG(ERROR) << "Error when connecting to zookeeper servers..."; + LOG(ERROR) << "Please ensure zookeeper service is up in host(s):"; + LOG(ERROR) << host.c_str(); + return false; + } + + return true; +} + +bool ZKService::CreateNode(const char* path, const char* val, int flag, char* output) { - char buf[kMaxBufLen]; + char buf[kZKBufSize]; int ret = 0; // send the zk request for (int i = 0; i < kNumRetry; ++i) { ret = zoo_create(zkhandle_, path, val, val == nullptr ? 
-1 : strlen(val), - &ZOO_OPEN_ACL_UNSAFE, flag, buf, kMaxBufLen); + &ZOO_OPEN_ACL_UNSAFE, flag, buf, kZKBufSize); if (ret == ZNONODE) { LOG(WARNING) << "zookeeper parent node of " << path << " not exist, retry later"; @@ -202,7 +88,7 @@ bool ZKClusterRT::CreateZKNode(const char* path, const char* val, int flag, return false; } -bool ZKClusterRT::DeleteZKNode(const char* path) { +bool ZKService::DeleteNode(const char* path) { int ret = zoo_delete(zkhandle_, path, -1); if (ret == ZOK) { LOG(INFO) << "deleted zookeeper node " << path; @@ -215,9 +101,18 @@ bool ZKClusterRT::DeleteZKNode(const char* path) { return false; } -bool ZKClusterRT::GetZKNode(const char* path, char* output) { +bool ZKService::Exist(const char* path) { struct Stat stat; - int val_len = kMaxBufLen; + int ret = zoo_exists(zkhandle_, path, 0, &stat); + if (ret == ZOK) return true; + else if (ret == ZNONODE) return false; + LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_exists)"; + return false; +} + +bool ZKService::GetNode(const char* path, char* output) { + struct Stat stat; + int val_len = kZKBufSize; int ret = zoo_get(zkhandle_, path, 0, output, &val_len, &stat); if (ret == ZOK) { output[val_len] = '\0'; @@ -230,24 +125,168 @@ bool ZKClusterRT::GetZKNode(const char* path, char* output) { return false; } -bool ZKClusterRT::GetZKChild(const char* path, std::vector<string>* vt) { - // get all children in lock folder +bool ZKService::GetChild(const char* path, std::vector<string>* vt) { struct String_vector child; int ret = zoo_get_children(zkhandle_, path, 0, &child); if (ret == ZOK) { for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]); return true; } - LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_create)"; + LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_get_children)"; return false; } -string ZKClusterRT::groupPath(int gid) { - return kZPathStatus+"/sg"+to_string(gid); +bool ZKService::WGetChild(const char* path, std::vector<std::string>* vt, 
+ RTCallback *cb) { + struct String_vector child; + int ret = zoo_wget_children(zkhandle_, path, ChildChanges, cb, &child); + if (ret == ZOK) { + for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]); + return true; + } + LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_get_children)"; + return false; +} + + +void ZKService::WatcherGlobal(zhandle_t * zh, int type, int state, + const char *path, void *watcherCtx) { + if (type == ZOO_SESSION_EVENT) { + if (state == ZOO_CONNECTED_STATE) + LOG(INFO) << "GLOBAL_WATCHER connected to zookeeper successfully!"; + else if (state == ZOO_EXPIRED_SESSION_STATE) + LOG(INFO) << "GLOBAL_WATCHER zookeeper session expired!"; + } +} + +ZKClusterRT::ZKClusterRT(const string& host) : ZKClusterRT(host, 30000) {} + +ZKClusterRT::ZKClusterRT(const string& host, int timeout) { + host_ = host; + timeout_ = timeout; } -string ZKClusterRT::workerPath(int gid, int wid) { - return "/g"+to_string(gid)+"_w"+to_string(wid); +ZKClusterRT::~ZKClusterRT() { + // release callback vector + for (RTCallback* p : cb_vec_) { + delete p; + } +} + +bool ZKClusterRT::Init() { + if (!zk_.Init(host_, timeout_)) return false; + // create kZKPathSinga + if (!zk_.CreateNode(kZKPathSinga.c_str(), nullptr, 0, nullptr)) + return false; + // create kZKPathStatus + if (!zk_.CreateNode(kZKPathStatus.c_str(), nullptr, 0, nullptr)) + return false; + // create kZKPathRegist + if (!zk_.CreateNode(kZKPathRegist.c_str(), nullptr, 0, nullptr)) + return false; + // create kZKPathRegistProc + if (!zk_.CreateNode(kZKPathRegistProc.c_str(), nullptr, 0, nullptr)) + return false; + // create kZKPathRegistLock + if (!zk_.CreateNode(kZKPathRegistLock.c_str(), nullptr, 0, nullptr)) + return false; + return true; +} + +int ZKClusterRT::RegistProc(const string& host_addr) { + char buf[kZKBufSize]; + string lock = kZKPathRegistLock+"/lock-"; + if (!zk_.CreateNode(lock.c_str(), nullptr, + ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) { + return -1; + } + // get all children in 
lock folder + std::vector<string> vt; + if (!zk_.GetChild(kZKPathRegistLock.c_str(), &vt)) { + return -1; + } + // find own position among all locks + int id = -1; + std::sort(vt.begin(), vt.end()); + for (int i = 0; i < static_cast<int>(vt.size()); ++i) { + if (kZKPathRegistLock+"/"+vt[i] == buf) { + id = i; + break; + } + } + if (id == -1) { + LOG(ERROR) << "cannot find own node " << buf; + return -1; + } + // create a new node in proc path + string path = kZKPathRegistProc+"/proc-"+to_string(id); + if (!zk_.CreateNode(path.c_str(), host_addr.c_str(), ZOO_EPHEMERAL, + nullptr)) { + return -1; + } + return id; +} + +bool ZKClusterRT::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) { + CHECK_NOTNULL(fn); + string path = groupPath(gid); + // create zk node + if (!zk_.CreateNode(path.c_str(), nullptr, 0, nullptr)) return false; + std::vector<string> child; + // store the callback function and context for later usage + RTCallback *cb = new RTCallback; + cb->fn = fn; + cb->ctx = ctx; + cb_vec_.push_back(cb); + // start to watch on the zk node, does not care about the first return value + return zk_.WGetChild(path.c_str(), &child, cb); +} + +string ZKClusterRT::GetProcHost(int proc_id) { + // char buf[kZKBufSize]; + char val[kZKBufSize]; + // construct file name + string path = kZKPathRegistProc+"/proc-"+to_string(proc_id); + if (!zk_.GetNode(path.c_str(), val)) return ""; + return string(val); +} + +bool ZKClusterRT::JoinSGroup(int gid, int wid, int s_group) { + string path = groupPath(s_group) + workerPath(gid, wid); + // try to create an ephemeral node under server group path + return zk_.CreateNode(path.c_str(), nullptr, ZOO_EPHEMERAL, nullptr); +} + +bool ZKClusterRT::LeaveSGroup(int gid, int wid, int s_group) { + string path = groupPath(s_group) + workerPath(gid, wid); + return zk_.DeleteNode(path.c_str()); +} + +JobManager::JobManager(const string& host) : JobManager(host, 30000) {} + +JobManager::JobManager(const string& host, int timeout) { + host_ = 
host; + timeout_ = timeout; +} + +bool JobManager::Init() { + return zk_.Init(host_, timeout_); +} + +bool JobManager::Clean() { + if (zk_.Exist(kZKPathSinga.c_str())) { + return CleanPath(kZKPathSinga.c_str()); + } + return true; +} + +bool JobManager::CleanPath(const std::string& path) { + std::vector<string> child; + if (!zk_.GetChild(path.c_str(), &child)) return false; + for (string c : child) { + if (!CleanPath(path + "/" + c)) return false; + } + return zk_.DeleteNode(path.c_str()); } } // namespace singa http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/209ba1f2/src/utils/tool.cc ---------------------------------------------------------------------- diff --git a/src/utils/tool.cc b/src/utils/tool.cc new file mode 100644 index 0000000..37ce729 --- /dev/null +++ b/src/utils/tool.cc @@ -0,0 +1,25 @@ +#include <gflags/gflags.h> +#include <glog/logging.h> +#include "proto/cluster.pb.h" +#include "utils/cluster_rt.h" +#include "utils/common.h" + +DEFINE_string(global, "conf/singa.conf", "Global config file"); + +int main(int argc, char **argv) { + google::InitGoogleLogging(argv[0]); + gflags::ParseCommandLineFlags(&argc, &argv, true); + + singa::GlobalProto global; + singa::ReadProtoFromTextFile(FLAGS_global.c_str(), &global); + singa::SetupLog(global.log_dir(), "SingaTool"); + + LOG(INFO) << "The global config is \n" << global.DebugString(); + + singa::JobManager mng(global.zookeeper_host()); + int ret = 0; + if (!mng.Init()) ret = 1; + if (!mng.Clean()) ret = 1; + if (ret) LOG(ERROR) << "errors in SingaTool!"; + return ret; +}
