SINGA-11 Start SINGA on Apache Mesos

Fixed a bug in singa_scheduler.cc so that the job stops only after all tasks
have finished. The old version stopped as soon as any one task finished.


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/2f5db2a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/2f5db2a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/2f5db2a1

Branch: refs/heads/master
Commit: 2f5db2a1c3a3d8b6d08bbc1714e07fc6e3f133ac
Parents: 96c440f
Author: Anh Dinh <[email protected]>
Authored: Wed Oct 21 11:35:59 2015 +0800
Committer: Anh Dinh <[email protected]>
Committed: Wed Oct 21 11:36:06 2015 +0800

----------------------------------------------------------------------
 tool/mesos/singa_scheduler.cc | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/2f5db2a1/tool/mesos/singa_scheduler.cc
----------------------------------------------------------------------
diff --git a/tool/mesos/singa_scheduler.cc b/tool/mesos/singa_scheduler.cc
index 2474872..408b609 100644
--- a/tool/mesos/singa_scheduler.cc
+++ b/tool/mesos/singa_scheduler.cc
@@ -87,7 +87,7 @@ class SingaScheduler: public mesos::Scheduler {
      * @param jc               job counter
      */
     SingaScheduler(string namenode, string job_conf_file, int jc):
-      job_conf_file_(job_conf_file), nhosts_(0), namenode_(namenode), 
is_running_(false), job_counter_(jc) {
+      job_conf_file_(job_conf_file), nhosts_(0), namenode_(namenode), 
is_running_(false), job_counter_(jc), task_counter_(0) {
         hdfs_handle_ = hdfs_connect(namenode.c_str());
         if (hdfs_handle_) {
           if (hdfsExists(hdfs_handle_, SINGA_CONFIG) != 0)
@@ -107,7 +107,7 @@ class SingaScheduler: public mesos::Scheduler {
      * @param job_conf_file    job configuration file
      */
     SingaScheduler(string namenode, string job_conf_file, string singa_conf, 
int jc)
-      : job_conf_file_(job_conf_file), nhosts_(0), namenode_(namenode), 
is_running_(false), job_counter_(jc) {
+      : job_conf_file_(job_conf_file), nhosts_(0), namenode_(namenode), 
is_running_(false), job_counter_(jc), task_counter_(0) {
         hdfs_handle_ = hdfs_connect(namenode);
         if (!hdfs_handle_ || !hdfs_overwrite(hdfs_handle_, SINGA_CONFIG, 
singa_conf))
           LOG(ERROR) << "Failed to connect to HDFS";
@@ -194,7 +194,6 @@ class SingaScheduler: public mesos::Scheduler {
         snprintf(path, 512, "/singa/%d/job.conf", job_counter_);
         hdfs_overwrite(hdfs_handle_, path, job_conf_file_);
 
-        int job_count = 0;
         // launch tasks
         for (map<string, vector<mesos::TaskInfo>*>::iterator it =
             tasks_.begin(); it != tasks_.end(); ++it) {
@@ -203,8 +202,8 @@ class SingaScheduler: public mesos::Scheduler {
           newId.set_value(it->first);
           LOG(INFO) << "Launching task with offer ID = " << newId.value();
           driver->launchTasks(newId, *(it->second));
-          job_count++;
-          if (job_count>= job_conf_.cluster().nworker_groups())
+          task_counter_++;
+          if (task_counter_>= job_conf_.cluster().nworker_groups())
             break;
         }
 
@@ -220,7 +219,11 @@ class SingaScheduler: public mesos::Scheduler {
     virtual void statusUpdate(SchedulerDriver* driver,
         const mesos::TaskStatus& status) {
       if (status.state() == mesos::TASK_FINISHED)
+        task_counter_--; 
+
+      if (task_counter_ == 0)
         driver->stop();
+
       else if (status.state() == mesos::TASK_FAILED) {
         LOG(ERROR) << "TASK FAILED !!!!";
         driver->abort();
@@ -354,7 +357,7 @@ class SingaScheduler: public mesos::Scheduler {
       return job_conf_.cluster().server_worker_separate()? ncpus >= (n1+n2) : 
ncpus >= (n1 > n2 ? n1 : n2);
     }
 
-    int job_counter_;
+    int job_counter_, task_counter_;
 
     // true if the job has been launched
     bool is_running_;
@@ -373,6 +376,7 @@ class SingaScheduler: public mesos::Scheduler {
 
 int main(int argc, char** argv) {
   FLAGS_logtostderr = 1;
+  google::InitGoogleLogging(argv[0]); 
   int status = mesos::DRIVER_RUNNING;
   SingaScheduler *scheduler;
   if (!(argc == 2 || argc == 4 || argc == 6)) {

Reply via email to