SINGA-38 Support concurrent jobs
With support for concurrent jobs, SINGA can be used in the following ways:
1. Quick Run
This is for those who just want to test SINGA or run a single job.
* start a singa job
$./bin/singa-run.sh -conf=CONF_DIR
* stop singa
$./bin/singa-stop.sh
* remove all singa-related data
$./bin/singa-cleanup.sh
2. Multi-Job Management
This is for those who want to manage multiple jobs in a cluster.
In this case you should first:
- edit host file in conf/hostfile
- edit zookeeper host in conf/singa.conf
- make ssh login password-free (e.g., key-based authentication)
* start a singa job
$./bin/singa-run.sh -conf=CONF_DIR
* list/view/kill singa jobs
$./bin/singa-console.sh list
$./bin/singa-console.sh view JOB_ID
$./bin/singa-console.sh kill JOB_ID
* stop SINGA entirely (kills all running jobs)
$./bin/singa-stop.sh
* remove all singa-related data
$./bin/singa-cleanup.sh
3. Zookeeper Management
If you do not have ZooKeeper installed, please install it via thirdparty/install.sh.
If you want to manage ZooKeeper externally, please ensure it is running and set:
$export ZK_HOME=your_zookeeper_path
$export SINGA_MANAGES_ZK=false
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/1fa5d5f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/1fa5d5f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/1fa5d5f1
Branch: refs/heads/master
Commit: 1fa5d5f110b1c9b2feb59644c0e2fc58b8b32301
Parents: f746b99
Author: wang sheng <[email protected]>
Authored: Tue Jul 21 20:48:36 2015 +0800
Committer: wang sheng <[email protected]>
Committed: Tue Jul 21 22:33:31 2015 +0800
----------------------------------------------------------------------
.gitignore | 2 +
bin/singa-console.sh | 81 ++++++++++++++++++
bin/singa-run.sh | 36 ++++----
bin/singa-stop.sh | 6 +-
include/trainer/trainer.h | 4 +-
include/utils/cluster.h | 8 +-
include/utils/cluster_rt.h | 53 +++++++++---
src/main.cc | 9 +-
src/trainer/trainer.cc | 17 ++--
src/utils/cluster.cc | 28 ++-----
src/utils/cluster_rt.cc | 179 +++++++++++++++++++++++++++++++---------
src/utils/tool.cc | 71 ++++++++++++++--
12 files changed, 378 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1fa5d5f1/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index be85b4f..9abefe5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,6 +17,8 @@
*.pb.h
*.pb.cc
*.hosts
+*.id
+*.tmp
*.out
tool/pb2/*
src/test/data/*
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1fa5d5f1/bin/singa-console.sh
----------------------------------------------------------------------
diff --git a/bin/singa-console.sh b/bin/singa-console.sh
new file mode 100755
index 0000000..8844bca
--- /dev/null
+++ b/bin/singa-console.sh
@@ -0,0 +1,81 @@
+#!/usr/bin/env bash
+#
+#/**
+# * Copyright 2015 The Apache Software Foundation
+# *
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements. See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership. The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+#
+# console to list/view/kill singa jobs
+#
+
+usage="Usage:\n
+ singa-console.sh list : list running singa jobs\n
+ singa-console.sh view JOB_ID : view procs of a singa job\n
+ singa-console.sh kill JOB_ID : kill a singa job"
+
+if [ $# == 0 ]; then
+ echo -e $usage
+ exit 1
+fi
+
+# get environment variables
+. `dirname "${BASH_SOURCE-$0}"`/singa-env.sh
+cd $SINGA_HOME
+
+case $1 in
+ list)
+ if [ $# != 1 ]; then
+ echo -e $usage
+ exit 1
+ fi
+ ./singatool list || exit 1
+ ;;
+
+ view)
+ if [ $# != 2 ]; then
+ echo -e $usage
+ exit 1
+ fi
+ ./singatool view $2 || exit 1
+ ;;
+
+ kill)
+ if [ $# != 2 ]; then
+ echo -e $usage
+ exit 1
+ fi
+ host_file="job-$2.tmp"
+ ./singatool view $2 1>$host_file || exit 1
+ ssh_options="-oStrictHostKeyChecking=no \
+ -oUserKnownHostsFile=/dev/null \
+ -oLogLevel=quiet"
+ hosts=`cat $host_file | cut -d ' ' -f 1`
+ for i in ${hosts[@]}; do
+ echo kill singa @ $i ...
+ proc=(`echo $i | tr '|' ' '`)
+ singa_kill="kill -9 "${proc[1]}
+ ssh $ssh_options ${proc[0]} $singa_kill
+ done
+ rm $host_file
+ ./singatool clean $2 || exit 1
+ ;;
+
+ *)
+ echo -e $usage
+ exit 1
+esac
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1fa5d5f1/bin/singa-run.sh
----------------------------------------------------------------------
diff --git a/bin/singa-run.sh b/bin/singa-run.sh
index 2c282c3..a478221 100755
--- a/bin/singa-run.sh
+++ b/bin/singa-run.sh
@@ -23,12 +23,8 @@
# run a Singa job
#
-usage="Usage: singa-run.sh -conf=CONF_DIR
- (CONF_DIR should contain cluster.conf && model.conf)"
-# usage="Usage: \n
-# (single process): singa-run.sh -cluster=YOUR_CONF_FILE -model=YOUR_CONF_FILE \n
-# (multi-process): singa-run.sh -conf=YOUR_CONF_DIR
-# (the directory should contain cluster.conf && model.conf)"
+usage="Usage: singa-run.sh -conf=CONF_DIR
+ (CONF_DIR should contain cluster.conf && model.conf)"
# check arguments
if [ $# != 1 ] || [[ $1 != "-conf="* ]]; then
@@ -40,32 +36,40 @@ fi
. `dirname "${BASH_SOURCE-$0}"`/singa-env.sh
# get workspace path
workspace=`cd "${1:6}">/dev/null; pwd`
+cluster_conf=$workspace/cluster.conf
+model_conf=$workspace/model.conf
+if [ ! -f $cluster_conf ] || [ ! -f $model_conf ]; then
+ echo cluster.conf or model.conf not exists in $workspace
+ exit 1
+fi
+cd $SINGA_HOME
# start zookeeper
if [ $SINGA_MANAGES_ZK = true ]; then
$SINGA_BIN/zk-service.sh start || exit 1
fi
-# cleanup old processes and data
-$SINGA_BIN/singa-stop.sh || exit 1
-
# generate host file
host_file=$workspace/job.hosts
-python $SINGA_HOME/tool/gen_hosts.py -conf=$workspace/cluster.conf \
+python $SINGA_HOME/tool/gen_hosts.py -conf=$cluster_conf \
-hosts=$SINGA_CONF/hostfile \
-output=$host_file \
|| exit 1
+# generate unique job id
+./singatool create 1>$workspace/job.id || exit 1
+job_id=`cat $workspace/job.id`
+echo generate job id at $workspace/job.id [job_id = $job_id]
+
# ssh and start singa processes
ssh_options="-oStrictHostKeyChecking=no \
-oUserKnownHostsFile=/dev/null \
-oLogLevel=quiet"
-hosts=`cat $host_file |cut -d ' ' -f 1`
-# cd to SINGA_HOME as it need conf/singa.conf
-cd $SINGA_HOME
-singa_run="./singa -cluster=$workspace/cluster.conf -model=$workspace/model.conf"
-singa_sshrun="cd $SINGA_HOME; ./singa -cluster=$workspace/cluster.conf \
- -model=$workspace/model.conf"
+hosts=`cat $host_file | cut -d ' ' -f 1`
+singa_run="./singa -cluster=$cluster_conf -model=$model_conf \
+ -job=$job_id"
+singa_sshrun="cd $SINGA_HOME; $singa_run"
+
for i in ${hosts[@]} ; do
if [ $i = localhost ] ; then
echo executing : $singa_run
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1fa5d5f1/bin/singa-stop.sh
----------------------------------------------------------------------
diff --git a/bin/singa-stop.sh b/bin/singa-stop.sh
index e3f24dc..dd3e7bf 100755
--- a/bin/singa-stop.sh
+++ b/bin/singa-stop.sh
@@ -34,13 +34,14 @@
# get environment variables
. `dirname "${BASH_SOURCE-$0}"`/singa-env.sh
+cd $SINGA_HOME
# kill singa processes
host_file=$SINGA_CONF/hostfile
ssh_options="-oStrictHostKeyChecking=no \
-oUserKnownHostsFile=/dev/null \
-oLogLevel=quiet"
-hosts=`cat $host_file |cut -d ' ' -f 1`
+hosts=`cat $host_file | cut -d ' ' -f 1`
singa_kill="killall -s SIGKILL -r singa"
for i in ${hosts[@]}; do
echo kill singa @ $i ...
@@ -56,5 +57,4 @@ sleep 2
# remove zk data
# singatool need global conf under SINGA_HOME
echo cleanning metadata in zookeeper ...
-cd $SINGA_HOME
-./singatool || exit 1
+./singatool cleanup || exit 1
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1fa5d5f1/include/trainer/trainer.h
----------------------------------------------------------------------
diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h
index 6c2b7c6..9f47ccd 100644
--- a/include/trainer/trainer.h
+++ b/include/trainer/trainer.h
@@ -34,8 +34,8 @@ class Trainer{
* @param globalconf global singa configuration
* @param cconf cluster configuration
*/
- void Start(bool resume, int job, ModelProto& mconf,
- const GlobalProto& gconf, const ClusterProto& cconf);
+ void Start(ModelProto& mconf, const GlobalProto& gconf,
+ const ClusterProto& cconf, int job, bool resume);
protected:
/**
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1fa5d5f1/include/utils/cluster.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster.h b/include/utils/cluster.h
index 570377f..b48c1c5 100644
--- a/include/utils/cluster.h
+++ b/include/utils/cluster.h
@@ -24,8 +24,8 @@ namespace singa {
class Cluster {
public:
static shared_ptr<Cluster> Get();
- static shared_ptr<Cluster> Get(const GlobalProto& global,
- const ClusterProto& cluster, int procs_id=0);
+ static shared_ptr<Cluster> Get(const GlobalProto& global,
+ const ClusterProto& cluster, int job_id);
const int nserver_groups()const{ return cluster_.nserver_groups(); }
const int nworker_groups()const { return cluster_.nworker_groups(); }
@@ -125,10 +125,10 @@ class Cluster {
const string hostip() const {
return hostip_;
}
- void Register(const string& endpoint);
+ void Register(const string& endpoint, int pid);
private:
- Cluster(const GlobalProto& global, const ClusterProto &cluster, int procs_id) ;
+ Cluster(const GlobalProto& global, const ClusterProto &cluster, int job_id);
void SetupFolders(const ClusterProto &cluster);
int Hash(int gid, int id, int flag);
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1fa5d5f1/include/utils/cluster_rt.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster_rt.h b/include/utils/cluster_rt.h
index 55ca243..ab95adf 100644
--- a/include/utils/cluster_rt.h
+++ b/include/utils/cluster_rt.h
@@ -27,11 +27,11 @@ class ClusterRuntime {
*
* \return the process id, -1 if failed
*/
- virtual int RegistProc(const std::string& host_addr) = 0;
+ virtual int RegistProc(const std::string& host_addr, int pid) = 0;
/**
* translate the process id to host address
*
- * \return the host and port, "" if no such proc id
+ * \return the host and port, "" if no such proc id
*/
virtual std::string GetProcHost(int proc_id) = 0;
/**
@@ -49,18 +49,35 @@ class ClusterRuntime {
virtual bool LeaveSGroup(int gid, int wid, int s_group) = 0;
};
+const int kZKBufSize = 100;
+// following paths are global
const std::string kZKPathSinga = "/singa";
-const std::string kZKPathStatus = "/singa/status";
-const std::string kZKPathRegist = "/singa/regist";
-const std::string kZKPathRegistProc = "/singa/regist/proc";
-const std::string kZKPathRegistLock = "/singa/regist/lock";
-const int kZKBufSize = 50;
+const std::string kZKPathSys = "/singa/sys";
+const std::string kZKPathJLock = "/singa/sys/job-lock";
+const std::string kZKPathApp = "/singa/app";
+const std::string kZKPathJob = "/singa/app/job-";
+// following paths are local under /singa/app/job-X
+const std::string kZKPathJobGroup = "/group";
+const std::string kZKPathJobProc = "/proc";
+const std::string kZKPathJobPLock = "/proc-lock";
+
+inline std::string GetZKJobWorkspace(int job_id) {
+ char buf[kZKBufSize];
+ sprintf(buf, "%010d", job_id);
+ return kZKPathJob + buf;
+}
struct RTCallback {
rt_callback fn;
void* ctx;
};
+struct JobInfo {
+ int id;
+ int procs;
+ std::string name;
+};
+
class ZKService {
public:
static void ChildChanges(zhandle_t* zh, int type, int state,
@@ -87,12 +104,12 @@ class ZKService {
class ZKClusterRT : public ClusterRuntime {
public:
- explicit ZKClusterRT(const std::string& host);
- ZKClusterRT(const std::string& host, int timeout);
+ ZKClusterRT(const std::string& host, int job_id);
+ ZKClusterRT(const std::string& host, int job_id, int timeout);
~ZKClusterRT() override;
bool Init() override;
- int RegistProc(const std::string& host_addr) override;
+ int RegistProc(const std::string& host_addr, int pid) override;
std::string GetProcHost(int proc_id) override;
bool WatchSGroup(int gid, int sid, rt_callback fn, void* ctx) override;
bool JoinSGroup(int gid, int wid, int s_group) override;
@@ -100,7 +117,7 @@ class ZKClusterRT : public ClusterRuntime {
private:
inline std::string groupPath(int gid) {
- return kZKPathStatus + "/sg" + std::to_string(gid);
+ return group_path_ + "/sg" + std::to_string(gid);
}
inline std::string workerPath(int gid, int wid) {
return "/g" + std::to_string(gid) + "_w" + std::to_string(wid);
@@ -109,6 +126,10 @@ class ZKClusterRT : public ClusterRuntime {
int timeout_ = 30000;
std::string host_ = "";
ZKService zk_;
+ std::string workspace_ = "";
+ std::string group_path_ = "";
+ std::string proc_path_ = "";
+ std::string proc_lock_path_ = "";
std::vector<RTCallback*> cb_vec_;
};
@@ -118,10 +139,16 @@ class JobManager {
JobManager(const std::string& host, int timeout);
bool Init();
- bool Clean();
+ int GenerateJobID();
+ bool ListJobs(std::vector<JobInfo>* jobs);
+ bool ListJobProcs(int job, std::vector<std::string>* procs);
+ bool Clean(int job);
+ bool Cleanup();
private:
- bool CleanPath(const std::string& path);
+ const int kJobsNotRemoved = 10;
+
+ bool CleanPath(const std::string& path, bool remove);
int timeout_ = 30000;
std::string host_ = "";
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1fa5d5f1/src/main.cc
----------------------------------------------------------------------
diff --git a/src/main.cc b/src/main.cc
index a72a2db..cda1294 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -18,10 +18,9 @@
* easily, e.g., AddLayer(layer_type, source_layers, meta_data).
*/
-// Job ID is not used now, TODO passing job id from singa-run script and
-// re-organize ClusterProto, GlobalProto and ModelProto.
-DEFINE_int32(job, -1, "Job ID"); // not used now
-DEFINE_bool(resume, false, "resume from checkpoint");
+// TODO: re-organize ClusterProto, GlobalProto and ModelProto.
+DEFINE_int32(job, -1, "Unique job ID");
+DEFINE_bool(resume, false, "Resume from checkpoint");
DEFINE_string(cluster, "examples/mnist/cluster.conf", "Cluster config file");
DEFINE_string(model, "examples/mnist/conv.conf", "Model config file");
DEFINE_string(global, "conf/singa.conf", "Global config file");
@@ -54,6 +53,6 @@ int main(int argc, char **argv) {
RegisterClasses(model);
singa::Trainer trainer;
- trainer.Start(FLAGS_resume, FLAGS_job, model, global, cluster);
+ trainer.Start(model, global, cluster, FLAGS_job, FLAGS_resume);
return 0;
}
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1fa5d5f1/src/trainer/trainer.cc
----------------------------------------------------------------------
diff --git a/src/trainer/trainer.cc b/src/trainer/trainer.cc
index 5d43e19..78ec49f 100644
--- a/src/trainer/trainer.cc
+++ b/src/trainer/trainer.cc
@@ -4,6 +4,7 @@
#include <chrono>
#include <glog/logging.h>
#include "utils/tinydir.h"
+#include <unistd.h>
#include "utils/cluster.h"
#include "utils/common.h"
#include "proto/common.pb.h"
@@ -229,8 +230,8 @@ void Trainer::Resume(ModelProto& mconf) {
tinydir_close(&dir);
}
-void Trainer::Start(bool resume, int job, ModelProto& mconf,
- const GlobalProto& gconf, const ClusterProto& cconf) {
+void Trainer::Start(ModelProto& mconf, const GlobalProto& gconf,
+ const ClusterProto& cconf, int job, bool resume){
// register job to zookeeper at the beginning
auto cluster=Cluster::Get(gconf, cconf, job);
@@ -240,14 +241,10 @@ void Trainer::Start(bool resume, int job, ModelProto& mconf,
router_ = new Router();
router_->Bind(kInprocRouterEndpoint);
- if (cluster->nprocs() > 1) {
- const string hostip = cluster->hostip();
- int port = router_->Bind("tcp://" + hostip + ":*");
- // register endpoint to zookeeper
- cluster->Register(hostip + ":" + std::to_string(port));
- } else {
- cluster->set_procs_id(0);
- }
+ const string hostip = cluster->hostip();
+ int port = router_->Bind("tcp://" + hostip + ":*");
+ // register endpoint to zookeeper
+ cluster->Register(hostip + ":" + std::to_string(port), getpid());
int nthreads = 1;
const vector<Worker*> workers = CreateWorkers(nthreads, mconf);
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1fa5d5f1/src/utils/cluster.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster.cc b/src/utils/cluster.cc
index 3019f14..791332d 100644
--- a/src/utils/cluster.cc
+++ b/src/utils/cluster.cc
@@ -11,8 +11,7 @@ namespace singa {
std::shared_ptr<Cluster> Cluster::instance_;
Cluster::Cluster(const GlobalProto & global, const ClusterProto &cluster,
- int procs_id) {
- procs_id_=procs_id;
+ int job_id) {
cluster_ = cluster;
global_ = global;
SetupFolders(cluster);
@@ -20,16 +19,6 @@ Cluster::Cluster(const GlobalProto & global, const ClusterProto &cluster,
nprocs_=nworker_procs()+nserver_procs();
else
nprocs_=std::max(nworker_procs(), nserver_procs());
- CHECK_LT(procs_id, nprocs_);
- if(nprocs_>1&&procs_id>-1){
- std::ifstream ifs(cluster.hostfile(), std::ifstream::in);
- std::string line;
- while(std::getline(ifs, line)&&
- endpoints_.size()< static_cast<size_t>(nprocs_)){
- endpoints_.push_back(line);
- }
- CHECK_EQ(endpoints_.size(), nprocs_);
- }
// locate the process id of every worker/server
int ngrps=cluster_.nworker_groups(), grp_size=cluster_.nworkers_per_group();
@@ -49,18 +38,19 @@ Cluster::Cluster(const GlobalProto & global, const ClusterProto &cluster,
}
}
- auto rt=new ZKClusterRT(global_.zookeeper_host());
+ auto rt = new ZKClusterRT(global_.zookeeper_host(), job_id);
rt->Init();
cluster_rt_=shared_ptr<ClusterRuntime>(static_cast<ClusterRuntime*>(rt));
hostip_=GetHostIP();
}
-void Cluster::Register(const string& endpoint) {
- procs_id_=cluster_rt_->RegistProc(endpoint);
+void Cluster::Register(const string& endpoint, int pid) {
+ procs_id_=cluster_rt_->RegistProc(endpoint, pid);
CHECK_GE(procs_id_,0);
CHECK_LT(procs_id_,nprocs());
- LOG(ERROR) << "proc #" << procs_id_ << " -> " << endpoint;
+ LOG(ERROR) << "proc #" << procs_id_ << " -> " << endpoint
+ << " (pid = " << pid << ")";
}
const string Cluster::endpoint(int procsid) const {
@@ -79,9 +69,9 @@ void Cluster::SetupFolders(const ClusterProto &cluster){
mkdir(checkpoint_folder().c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
}
-shared_ptr<Cluster> Cluster::Get(const GlobalProto& global, const ClusterProto& cluster,
- int procs_id){
- instance_.reset(new Cluster(global, cluster, procs_id));
+shared_ptr<Cluster> Cluster::Get(const GlobalProto& global,
+ const ClusterProto& cluster, int job_id){
+ instance_.reset(new Cluster(global, cluster, job_id));
return instance_;
}
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1fa5d5f1/src/utils/cluster_rt.cc
----------------------------------------------------------------------
diff --git a/src/utils/cluster_rt.cc b/src/utils/cluster_rt.cc
index 7e5a86b..c722a73 100644
--- a/src/utils/cluster_rt.cc
+++ b/src/utils/cluster_rt.cc
@@ -5,6 +5,7 @@
using std::string;
using std::to_string;
+using std::vector;
namespace singa {
@@ -25,11 +26,11 @@ void ZKService::ChildChanges(zhandle_t *zh, int type, int state,
cb->fn = nullptr;
}
} else {
- LOG(ERROR) << "Unhandled ZK error code: " << ret
- << " (zoo_wget_children)";
+ LOG(FATAL) << "Unhandled ZK error code: " << ret
+ << " (zoo_wget_children " << path << ")";
}
} else {
- LOG(ERROR) << "Unhandled callback type code: "<< type;
+ LOG(FATAL) << "Unhandled callback type code: "<< type;
}
}
@@ -84,7 +85,8 @@ bool ZKService::CreateNode(const char* path, const char* val, int flag,
LOG(WARNING) << "zookeeper node " << path << " already exists";
return true;
}
- LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_create)";
+ LOG(FATAL) << "Unhandled ZK error code: " << ret
+ << " (zoo_create " << path << ")";
return false;
}
@@ -97,7 +99,8 @@ bool ZKService::DeleteNode(const char* path) {
LOG(WARNING) << "try to delete an non-existing zookeeper node " << path;
return true;
}
- LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_delete)";
+ LOG(FATAL) << "Unhandled ZK error code: " << ret
+ << " (zoo_delete " << path << ")";
return false;
}
@@ -106,7 +109,7 @@ bool ZKService::Exist(const char* path) {
int ret = zoo_exists(zkhandle_, path, 0, &stat);
if (ret == ZOK) return true;
else if (ret == ZNONODE) return false;
- //LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_exists)";
+ LOG(WARNING) << "Unhandled ZK error code: " << ret << " (zoo_exists)";
return false;
}
@@ -121,30 +124,35 @@ bool ZKService::GetNode(const char* path, char* output) {
LOG(ERROR) << "zk node " << path << " does not exist";
return false;
}
- LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_get)";
+ LOG(FATAL) << "Unhandled ZK error code: " << ret
+ << " (zoo_get " << path << ")";
return false;
}
-bool ZKService::GetChild(const char* path, std::vector<string>* vt) {
+bool ZKService::GetChild(const char* path, vector<string>* vt) {
struct String_vector child;
int ret = zoo_get_children(zkhandle_, path, 0, &child);
if (ret == ZOK) {
+ vt->clear();
for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]);
return true;
}
- LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_get_children)";
+ LOG(FATAL) << "Unhandled ZK error code: " << ret
+ << " (zoo_get_children " << path << ")";
return false;
}
-bool ZKService::WGetChild(const char* path, std::vector<std::string>* vt,
+bool ZKService::WGetChild(const char* path, vector<string>* vt,
RTCallback *cb) {
struct String_vector child;
int ret = zoo_wget_children(zkhandle_, path, ChildChanges, cb, &child);
if (ret == ZOK) {
+ vt->clear();
for (int i = 0; i < child.count; ++i) vt->push_back(child.data[i]);
return true;
}
- LOG(ERROR) << "Unhandled ZK error code: " << ret << " (zoo_get_children)";
+ LOG(FATAL) << "Unhandled ZK error code: " << ret
+ << " (zoo_get_children " << path << ")";
return false;
}
@@ -159,11 +167,15 @@ void ZKService::WatcherGlobal(zhandle_t * zh, int type, int state,
}
}
-ZKClusterRT::ZKClusterRT(const string& host) : ZKClusterRT(host, 30000) {}
+ZKClusterRT::ZKClusterRT(const string& host, int job_id) : ZKClusterRT(host, job_id, 30000) {}
-ZKClusterRT::ZKClusterRT(const string& host, int timeout) {
+ZKClusterRT::ZKClusterRT(const string& host, int job_id, int timeout) {
host_ = host;
timeout_ = timeout;
+ workspace_ = GetZKJobWorkspace(job_id);
+ group_path_ = workspace_ + kZKPathJobGroup;
+ proc_path_ = workspace_ + kZKPathJobProc;
+ proc_lock_path_ = workspace_ + kZKPathJobPLock;
}
ZKClusterRT::~ZKClusterRT() {
@@ -175,41 +187,38 @@ ZKClusterRT::~ZKClusterRT() {
bool ZKClusterRT::Init() {
if (!zk_.Init(host_, timeout_)) return false;
- // create kZKPathSinga
if (!zk_.CreateNode(kZKPathSinga.c_str(), nullptr, 0, nullptr))
return false;
- // create kZKPathStatus
- if (!zk_.CreateNode(kZKPathStatus.c_str(), nullptr, 0, nullptr))
+ if (!zk_.CreateNode(kZKPathApp.c_str(), nullptr, 0, nullptr))
return false;
- // create kZKPathRegist
- if (!zk_.CreateNode(kZKPathRegist.c_str(), nullptr, 0, nullptr))
+ if (!zk_.CreateNode(workspace_.c_str(), nullptr, 0, nullptr))
return false;
- // create kZKPathRegistProc
- if (!zk_.CreateNode(kZKPathRegistProc.c_str(), nullptr, 0, nullptr))
+ if (!zk_.CreateNode(group_path_.c_str(), nullptr, 0, nullptr))
return false;
- // create kZKPathRegistLock
- if (!zk_.CreateNode(kZKPathRegistLock.c_str(), nullptr, 0, nullptr))
+ if (!zk_.CreateNode(proc_path_.c_str(), nullptr, 0, nullptr))
+ return false;
+ if (!zk_.CreateNode(proc_lock_path_.c_str(), nullptr, 0, nullptr))
return false;
return true;
}
-int ZKClusterRT::RegistProc(const string& host_addr) {
+int ZKClusterRT::RegistProc(const string& host_addr, int pid) {
char buf[kZKBufSize];
- string lock = kZKPathRegistLock+"/lock-";
+ string lock = proc_lock_path_ + "/lock-";
if (!zk_.CreateNode(lock.c_str(), nullptr,
ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) {
return -1;
}
// get all children in lock folder
- std::vector<string> vt;
- if (!zk_.GetChild(kZKPathRegistLock.c_str(), &vt)) {
+ vector<string> vt;
+ if (!zk_.GetChild(proc_lock_path_.c_str(), &vt)) {
return -1;
}
// find own position among all locks
int id = -1;
std::sort(vt.begin(), vt.end());
for (int i = 0; i < static_cast<int>(vt.size()); ++i) {
- if (kZKPathRegistLock+"/"+vt[i] == buf) {
+ if (proc_lock_path_+"/"+vt[i] == buf) {
id = i;
break;
}
@@ -219,8 +228,9 @@ int ZKClusterRT::RegistProc(const string& host_addr) {
return -1;
}
// create a new node in proc path
- string path = kZKPathRegistProc+"/proc-"+to_string(id);
- if (!zk_.CreateNode(path.c_str(), host_addr.c_str(), ZOO_EPHEMERAL,
+ string path = proc_path_ + "/proc-" + to_string(id);
+ string content = host_addr + "|" + to_string(pid);
+ if (!zk_.CreateNode(path.c_str(), content.c_str(), ZOO_EPHEMERAL,
nullptr)) {
return -1;
}
@@ -232,7 +242,7 @@ bool ZKClusterRT::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) {
string path = groupPath(gid);
// create zk node
if (!zk_.CreateNode(path.c_str(), nullptr, 0, nullptr)) return false;
- std::vector<string> child;
+ vector<string> child;
// store the callback function and context for later usage
RTCallback *cb = new RTCallback;
cb->fn = fn;
@@ -242,12 +252,16 @@ bool ZKClusterRT::WatchSGroup(int gid, int sid, rt_callback fn, void *ctx) {
return zk_.WGetChild(path.c_str(), &child, cb);
}
-string ZKClusterRT::GetProcHost(int proc_id) {
+std::string ZKClusterRT::GetProcHost(int proc_id) {
// char buf[kZKBufSize];
char val[kZKBufSize];
// construct file name
- string path = kZKPathRegistProc+"/proc-"+to_string(proc_id);
+ string path = proc_path_+"/proc-"+to_string(proc_id);
if (!zk_.GetNode(path.c_str(), val)) return "";
+ int len = strlen(val)-1;
+ while (len && val[len] != '|') --len;
+ CHECK(len);
+ val[len] = '\0';
return string(val);
}
@@ -270,23 +284,110 @@ JobManager::JobManager(const string& host, int timeout) {
}
bool JobManager::Init() {
- return zk_.Init(host_, timeout_);
+ if (!zk_.Init(host_, timeout_)) return false;
+ if (!zk_.CreateNode(kZKPathSinga.c_str(), nullptr, 0, nullptr))
+ return false;
+ if (!zk_.CreateNode(kZKPathSys.c_str(), nullptr, 0, nullptr))
+ return false;
+ if (!zk_.CreateNode(kZKPathJLock.c_str(), nullptr, 0, nullptr))
+ return false;
+ if (!zk_.CreateNode(kZKPathApp.c_str(), nullptr, 0, nullptr))
+ return false;
+ return true;
+}
+
+int JobManager::GenerateJobID() {
+ char buf[kZKBufSize];
+ string lock = kZKPathJLock + "/lock-";
+ if (!zk_.CreateNode(lock.c_str(), nullptr,
+ ZOO_EPHEMERAL | ZOO_SEQUENCE, buf)) {
+ return -1;
+ }
+ return atoi(buf+strlen(buf)-10);
}
-bool JobManager::Clean() {
+bool JobManager::ListJobProcs(int job, vector<string>* procs) {
+ procs->clear();
+ string job_path = GetZKJobWorkspace(job);
+ // check job path
+ if (!zk_.Exist(job_path.c_str())) {
+ LOG(ERROR) << "job " << job << " not exists";
+ return true;
+ }
+ string proc_path = job_path + kZKPathJobProc;
+ vector<string> vt;
+ // check job proc path
+ if (!zk_.GetChild(proc_path.c_str(), &vt)) {
+ return false;
+ }
+ char buf[singa::kZKBufSize];
+ for (string pname : vt){
+ pname = proc_path + "/" + pname;
+ if (!zk_.GetNode(pname.c_str(), buf)) continue;
+ std::string proc = "";
+ for (int i = 0; buf[i] != '\0'; ++i) {
+ if (buf[i] == ':') {
+ buf[i] = '\0';
+ proc += buf;
+ }
+ else if (buf[i] == '|') {
+ proc += buf + i;
+ }
+ }
+ procs->push_back(proc);
+ }
+ if (!procs->size()) LOG(ERROR) << "job " << job << " not exists";
+ return true;
+}
+
+bool JobManager::ListJobs(vector<JobInfo>* jobs) {
+ // get all children in app path
+ jobs->clear();
+ vector<string> vt;
+ if (!zk_.GetChild(kZKPathApp.c_str(), &vt)) {
+ return false;
+ }
+ std::sort(vt.begin(), vt.end());
+ int size = static_cast<int>(vt.size());
+ vector<string> procs;
+ for (int i = 0; i < size; ++i) {
+ string path = kZKPathApp + "/" + vt[i] + kZKPathJobProc;
+ if (!zk_.GetChild(path.c_str(), &procs)) continue;
+ JobInfo job;
+ string jid = vt[i].substr(vt[i].length()-10);
+ job.id = atoi(jid.c_str());
+ job.procs = procs.size();
+ jobs->push_back(job);
+ //may need to delete it
+ if (!job.procs && (i + kJobsNotRemoved < size))
+ CleanPath(kZKPathApp + "/" + vt[i], true);
+ }
+ return true;
+}
+
+bool JobManager::Clean(int job) {
+ string path = GetZKJobWorkspace(job) + kZKPathJobProc;
+ if (zk_.Exist(path.c_str())) {
+ return CleanPath(path.c_str(), false);
+ }
+ return true;
+}
+
+bool JobManager::Cleanup() {
if (zk_.Exist(kZKPathSinga.c_str())) {
- return CleanPath(kZKPathSinga.c_str());
+ return CleanPath(kZKPathSinga.c_str(), true);
}
return true;
}
-bool JobManager::CleanPath(const std::string& path) {
- std::vector<string> child;
+bool JobManager::CleanPath(const string& path, bool remove) {
+ vector<string> child;
if (!zk_.GetChild(path.c_str(), &child)) return false;
for (string c : child) {
- if (!CleanPath(path + "/" + c)) return false;
+ if (!CleanPath(path + "/" + c, true)) return false;
}
- return zk_.DeleteNode(path.c_str());
+ if (remove) return zk_.DeleteNode(path.c_str());
+ return true;
}
} // namespace singa
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/1fa5d5f1/src/utils/tool.cc
----------------------------------------------------------------------
diff --git a/src/utils/tool.cc b/src/utils/tool.cc
index 7309108..fc9f618 100644
--- a/src/utils/tool.cc
+++ b/src/utils/tool.cc
@@ -1,5 +1,7 @@
#include <gflags/gflags.h>
#include <glog/logging.h>
+#include <iostream>
+#include <fstream>
#include "proto/cluster.pb.h"
#include "utils/cluster_rt.h"
#include "utils/common.h"
@@ -20,9 +22,68 @@ int main(int argc, char **argv) {
LOG(INFO) << "The global config is \n" << global.DebugString();
singa::JobManager mng(global.zookeeper_host());
- int ret = 0;
- if (!mng.Init()) ret = 1;
- if (!mng.Clean()) ret = 1;
- if (ret) LOG(ERROR) << "errors in SingaTool!";
- return ret;
+ std::string usage = "singatool usage:\n"
+ "./singatool create : generate a unique job id\n"
+ "./singatool list : list running singa jobs\n"
+ "./singatool view JOB_ID : view procs of a singa job\n"
+ "./singatool clean JOB_ID : clean a job path in zookeeper\n"
+ "./singatool cleanup : clean all singa data in zookeeper\n"
+ "./singatool listall : list all singa jobs\n";
+ if (argc <= 1) {
+ LOG(ERROR) << usage;
+ return 1;
+ }
+ if (!mng.Init()) return 1;
+ if (!strcmp(argv[1], "create")) {
+ int id = mng.GenerateJobID();
+ printf("%d\n", id);
+ }
+ else if (!strcmp(argv[1], "list")) {
+ std::vector<singa::JobInfo> jobs;
+ if (!mng.ListJobs(&jobs)) return 1;
+ printf("JOB ID |NUM PROCS \n");
+ printf("----------|-----------\n");
+ for (singa::JobInfo job : jobs) {
+ if (!job.procs) continue;
+ printf("job-%-6d|%-10d\n", job.id, job.procs);
+ }
+ }
+ else if (!strcmp(argv[1], "listall")) {
+ std::vector<singa::JobInfo> jobs;
+ if (!mng.ListJobs(&jobs)) return 1;
+ printf("JOB ID |NUM PROCS \n");
+ printf("----------|-----------\n");
+ for (singa::JobInfo job : jobs) {
+ printf("job-%-6d|%-10d\n", job.id, job.procs);
+ }
+ }
+ else if (!strcmp(argv[1], "view")) {
+ if (argc <= 2) {
+ LOG(ERROR) << usage;
+ return 1;
+ }
+ int id = atoi(argv[2]);
+ std::vector<std::string> procs;
+ if (!mng.ListJobProcs(id, &procs)) return 1;
+ for (std::string s : procs) {
+ printf("%s\n", s.c_str());
+ }
+ }
+ else if (!strcmp(argv[1], "clean")) {
+ if (argc <= 2) {
+ LOG(ERROR) << usage;
+ return 1;
+ }
+ int id = atoi(argv[2]);
+ if (!mng.Clean(id)) return 1;
+ }
+ else if (!strcmp(argv[1], "cleanup")) {
+ if (!mng.Cleanup()) return 1;
+ }
+ else{
+ LOG(ERROR) << usage;
+ return 1;
+ }
+
+ return 0;
}