SINGA-11 Start SINGA on Apache Mesos. Fixed a bug in singa_scheduler.cc so that the job is stopped only after all tasks have finished; the old version stopped the job as soon as any one task finished.
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/2f5db2a1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/2f5db2a1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/2f5db2a1 Branch: refs/heads/master Commit: 2f5db2a1c3a3d8b6d08bbc1714e07fc6e3f133ac Parents: 96c440f Author: Anh Dinh <[email protected]> Authored: Wed Oct 21 11:35:59 2015 +0800 Committer: Anh Dinh <[email protected]> Committed: Wed Oct 21 11:36:06 2015 +0800 ---------------------------------------------------------------------- tool/mesos/singa_scheduler.cc | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/2f5db2a1/tool/mesos/singa_scheduler.cc ---------------------------------------------------------------------- diff --git a/tool/mesos/singa_scheduler.cc b/tool/mesos/singa_scheduler.cc index 2474872..408b609 100644 --- a/tool/mesos/singa_scheduler.cc +++ b/tool/mesos/singa_scheduler.cc @@ -87,7 +87,7 @@ class SingaScheduler: public mesos::Scheduler { * @param jc job counter */ SingaScheduler(string namenode, string job_conf_file, int jc): - job_conf_file_(job_conf_file), nhosts_(0), namenode_(namenode), is_running_(false), job_counter_(jc) { + job_conf_file_(job_conf_file), nhosts_(0), namenode_(namenode), is_running_(false), job_counter_(jc), task_counter_(0) { hdfs_handle_ = hdfs_connect(namenode.c_str()); if (hdfs_handle_) { if (hdfsExists(hdfs_handle_, SINGA_CONFIG) != 0) @@ -107,7 +107,7 @@ class SingaScheduler: public mesos::Scheduler { * @param job_conf_file job configuration file */ SingaScheduler(string namenode, string job_conf_file, string singa_conf, int jc) - : job_conf_file_(job_conf_file), nhosts_(0), namenode_(namenode), is_running_(false), job_counter_(jc) { + : job_conf_file_(job_conf_file), nhosts_(0), 
namenode_(namenode), is_running_(false), job_counter_(jc), task_counter_(0) { hdfs_handle_ = hdfs_connect(namenode); if (!hdfs_handle_ || !hdfs_overwrite(hdfs_handle_, SINGA_CONFIG, singa_conf)) LOG(ERROR) << "Failed to connect to HDFS"; @@ -194,7 +194,6 @@ class SingaScheduler: public mesos::Scheduler { snprintf(path, 512, "/singa/%d/job.conf", job_counter_); hdfs_overwrite(hdfs_handle_, path, job_conf_file_); - int job_count = 0; // launch tasks for (map<string, vector<mesos::TaskInfo>*>::iterator it = tasks_.begin(); it != tasks_.end(); ++it) { @@ -203,8 +202,8 @@ class SingaScheduler: public mesos::Scheduler { newId.set_value(it->first); LOG(INFO) << "Launching task with offer ID = " << newId.value(); driver->launchTasks(newId, *(it->second)); - job_count++; - if (job_count>= job_conf_.cluster().nworker_groups()) + task_counter_++; + if (task_counter_>= job_conf_.cluster().nworker_groups()) break; } @@ -220,7 +219,11 @@ class SingaScheduler: public mesos::Scheduler { virtual void statusUpdate(SchedulerDriver* driver, const mesos::TaskStatus& status) { if (status.state() == mesos::TASK_FINISHED) + task_counter_--; + + if (task_counter_ == 0) driver->stop(); + else if (status.state() == mesos::TASK_FAILED) { LOG(ERROR) << "TASK FAILED !!!!"; driver->abort(); @@ -354,7 +357,7 @@ class SingaScheduler: public mesos::Scheduler { return job_conf_.cluster().server_worker_separate()? ncpus >= (n1+n2) : ncpus >= (n1 > n2 ? n1 : n2); } - int job_counter_; + int job_counter_, task_counter_; // true if the job has been launched bool is_running_; @@ -373,6 +376,7 @@ class SingaScheduler: public mesos::Scheduler { int main(int argc, char** argv) { FLAGS_logtostderr = 1; + google::InitGoogleLogging(argv[0]); int status = mesos::DRIVER_RUNNING; SingaScheduler *scheduler; if (!(argc == 2 || argc == 4 || argc == 6)) {
