SINGA-11 Start SINGA on Apache Mesos

Refined the Makefile to use a relative path to the SINGA header files.

Removed the singa.conf file from this directory. The file can be found in SINGA's
top-level directory.

Refined singa_scheduler.cc to follow the project's coding style.
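
The style changes in singa_scheduler.cc are mostly mechanical. A minimal sketch of
the pattern, distilled from the diff below (not a verbatim excerpt; the small main()
wrapper and the sample value for nhosts are only for illustration):

    #include <cstdio>

    // String constants become plain char arrays instead of std::string globals.
    const char SINGA_CONFIG[] = "/singa/singa.conf";

    int main() {
      int nhosts = 3;  // stand-in for the scheduler's nhosts_ member
      char string_id[100];
      // Unbounded sprintf() calls are replaced with bounded snprintf() calls.
      std::snprintf(string_id, sizeof(string_id), "SINGA_%d", nhosts);
      std::printf("config: %s, task id: %s\n", SINGA_CONFIG, string_id);
      return 0;
    }

The rest of the diff is re-indentation and regrouping of the includes (project
headers first).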


Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/96c440ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/96c440ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/96c440ff

Branch: refs/heads/master
Commit: 96c440ffc0106a8be6b93e86a864854abe7b7d58
Parents: f0e1629
Author: Anh Dinh <[email protected]>
Authored: Tue Oct 20 11:08:05 2015 +0800
Committer: Anh Dinh <[email protected]>
Committed: Tue Oct 20 11:41:40 2015 +0800

----------------------------------------------------------------------
 tool/mesos/Makefile           |   2 +-
 tool/mesos/singa.conf         |   7 -
 tool/mesos/singa_scheduler.cc | 717 ++++++++++++++++++-------------------
 3 files changed, 351 insertions(+), 375 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/96c440ff/tool/mesos/Makefile
----------------------------------------------------------------------
diff --git a/tool/mesos/Makefile b/tool/mesos/Makefile
index 09b250b..d64fb5b 100644
--- a/tool/mesos/Makefile
+++ b/tool/mesos/Makefile
@@ -1,4 +1,4 @@
-CXX_FLAGS=-I ../include -std=c++11 -I /usr/local/include/hdfs -I /root/incubator-singa/include
+CXX_FLAGS=-I ../include -std=c++11 -I /usr/local/include/hdfs -I ../../include
 LD_FLAGS=-lmesos -lsinga -lhdfs3
 EXE=scheduler
 OBJS=singa_scheduler.o scheduler.pb.o

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/96c440ff/tool/mesos/singa.conf
----------------------------------------------------------------------
diff --git a/tool/mesos/singa.conf b/tool/mesos/singa.conf
deleted file mode 100644
index f49e689..0000000
--- a/tool/mesos/singa.conf
+++ /dev/null
@@ -1,7 +0,0 @@
-# point to your active zookeeper service
-# this is comma separated host:port pairs, each corresponding to a zk server
-# e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zookeeper_host: "node0:2181"
-
-# set if you want to change log directory
-log_dir: "/tmp/singa-log/"

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/96c440ff/tool/mesos/singa_scheduler.cc
----------------------------------------------------------------------
diff --git a/tool/mesos/singa_scheduler.cc b/tool/mesos/singa_scheduler.cc
index 86a4ded..2474872 100644
--- a/tool/mesos/singa_scheduler.cc
+++ b/tool/mesos/singa_scheduler.cc
@@ -18,21 +18,19 @@
 * under the License.
 *
 *************************************************************/
-
+#include "singa/proto/job.pb.h"
+#include "singa/proto/singa.pb.h"
+#include "./scheduler.pb.h"
+#include "singa/utils/common.h"
 #include <stdio.h>
-#include <glog/logging.h>
 #include <stdlib.h>
-#include <mesos/scheduler.hpp>
 #include <string>
 #include <iostream>
 #include <fstream>
 #include <hdfs.h>
-#include "singa/proto/job.pb.h"
-#include "singa/proto/singa.pb.h"
-#include "scheduler.pb.h"
-#include "singa/utils/common.h"
+#include <mesos/scheduler.hpp>
 #include <google/protobuf/text_format.h>
-
+#include <glog/logging.h>
 /**
  * \file singa_scheduler.cc implements a framework for managing SINGA jobs.
  *  
@@ -71,369 +69,354 @@ using mesos::SchedulerDriver;
 using std::vector;
 using std::map;
 
-string usage = "       singa_scheduler <job_conf> [-scheduler_conf global_config] [-singa_conf singa_config] \n"
-                                                        "        job_conf: job configuration file\n"
-                                                        "    -scheduler_conf: optional, system-wide configuration file\n"
-                                                        "    -singa_conf: optional, singa global configuration file\n"; 
-
-string SINGA_CONFIG = "/singa/singa.conf"; 
-string DEFAULT_SCHEDULER_CONF = "scheduler.conf";
+const char usage[] = " singa_scheduler <job_conf> [-scheduler_conf global_config] [-singa_conf singa_config] \n"
+                      " job_conf: job configuration file\n"
+                      " -scheduler_conf: optional, system-wide configuration file\n"
+                      " -singa_conf: optional, singa global configuration file\n";
 
+const char SINGA_CONFIG[] = "/singa/singa.conf";
+const char DEFAULT_SCHEDULER_CONF[] = "scheduler.conf";
 class SingaScheduler: public mesos::Scheduler {
-       public:
-
-               /**
-                * Constructor, used when [sing_conf] is not given. Raise error 
if /singa/singa.conf is not found
-                * on HDFS. 
-                *
-                * @param namenode      address of HDFS namenode
-                * @param job_conf_file job configuration file
-                * @param jc            job counter
-                */
-               SingaScheduler(string namenode, string job_conf_file, int jc): 
-                       job_conf_file_(job_conf_file), nhosts_(0), 
namenode_(namenode), is_running_(false), job_counter_(jc){
-
-                               hdfs_handle_ = hdfs_connect(namenode.c_str()); 
-                               if (hdfs_handle_){
-                                       if 
(hdfsExists(hdfs_handle_,SINGA_CONFIG.c_str())!=0)
-                                               LOG(ERROR) << SINGA_CONFIG << " 
is not found on HDFS. Please use -singa_conf flag to upload the file"; 
-                               }
-                               else
-                                       LOG(ERROR) << "Failed to connect to 
HDFS"; 
-
-                               ReadProtoFromTextFile(job_conf_file_.c_str(), 
&job_conf_);      
-                               
-                       }
-
-               /**
-                * Constructor. It overwrites /singa/singa.conf on HDFS 
(created a new one if necessary).  
-                * The file contains zookeeper_host and log_dir values
-                * It also parses the JobProto from job_config file
-                *
-                * @param namenode      address of HDFS namenode
-                * @param singa_conf    singa global configuration file
-                * @param job_conf_file job configuration file
-                */
-               SingaScheduler(string namenode, string job_conf_file, string 
singa_conf, int jc)
-                       : job_conf_file_(job_conf_file), nhosts_(0), 
namenode_(namenode), is_running_(false), job_counter_(jc){
-
-                               hdfs_handle_ = hdfs_connect(namenode); 
-                               if (!hdfs_handle_ || 
!hdfs_overwrite(hdfs_handle_, SINGA_CONFIG, singa_conf))
-                                       LOG(ERROR) << "Failed to connect to 
HDFS"; 
-                       
-                               ReadProtoFromTextFile(job_conf_file_.c_str(), 
&job_conf_); 
-                       }
-               virtual void registered(SchedulerDriver *driver,
-                               const mesos::FrameworkID& frameworkId,
-                               const mesos::MasterInfo& masterInfo) {
-               }
-
-               virtual void reregistered(SchedulerDriver *driver,
-                               const mesos::MasterInfo& masterInfo) {
-               }
-
-               virtual void disconnected(SchedulerDriver *driver) {
-               }
-
-               /**
-                * Handle resource offering from Mesos scheduler. It implements 
the simple/naive
-                * scheduler:
-                * + For each offer that contains enough CPUs, adds new tasks 
to the list
-                * + Launch all the tasks when reaching the required number of 
tasks (nworkers_groups + nserver_groups). 
-                */
-               virtual void resourceOffers(SchedulerDriver* driver,    const 
-                               std::vector<mesos::Offer>& offers) {
-
-                       // do nothing if the task is already running
-                       if (is_running_)
-                               return; 
-
-                       for (int i = 0; i < offers.size(); i++) {
-                               const mesos::Offer offer = offers[i];
-
-                               // check for resource and create temporary 
tasks 
-                               int cpus = 0, mem = 0;
-                               int nresources = offer.resources().size();
-
-                               for (int r = 0; r < nresources; r++) {
-                                       const mesos::Resource& resource = 
offer.resources(r);
-                                       if (resource.name() == "cpus"
-                                                       && resource.type() == 
mesos::Value::SCALAR)
-                                               cpus = 
resource.scalar().value();
-                                       else if (resource.name() == "mem"
-                                                       && resource.type() == 
mesos::Value::SCALAR)
-                                               mem = resource.scalar().value();
-                               }
-
-                               if (!check_resources(cpus)) 
-                                       break; 
-
-                               vector<mesos::TaskInfo> *new_tasks = new 
vector<mesos::TaskInfo>();
-                               mesos::TaskInfo task;
-                               task.set_name("SINGA");
-
-                               char string_id[100];
-                               sprintf(string_id, "SINGA_%d", nhosts_); 
-                               task.mutable_task_id()->set_value(string_id);
-                               
task.mutable_slave_id()->MergeFrom(offer.slave_id());
-
-                               mesos::Resource *resource = 
task.add_resources();
-                               resource->set_name("cpus");
-                               resource->set_type(mesos::Value::SCALAR);
-                               // take only nworkers_per_group CPUs
-                               
resource->mutable_scalar()->set_value(job_conf_.cluster().nworkers_per_group());
-
-                               resource = task.add_resources();
-                               resource->set_name("mem");
-                               resource->set_type(mesos::Value::SCALAR);
-                               // take all the memory
-                               resource->mutable_scalar()->set_value(mem);
-
-                               // store in temporary map
-                               new_tasks->push_back(task);
-                               tasks_[offer.id().value()] = new_tasks;
-
-                               nhosts_++;
-                       }
-
-                       if (nhosts_>= job_conf_.cluster().nworker_groups()){
-                               LOG(INFO) << "Acquired enough resources: " 
-                                       << 
job_conf_.cluster().nworker_groups()*job_conf_.cluster().nworkers_per_group()
-                                       << " CPUs over " << 
job_conf_.cluster().nworker_groups() << " hosts. Launching tasks ... "; 
-
-                               //write job_conf_file_ to /singa/job_id/job.conf
-                               char path[512];
-                               sprintf(path, 
"/singa/%d/job.conf",job_counter_); 
-                               hdfs_overwrite(hdfs_handle_, path, 
job_conf_file_); 
-
-                               int job_count=0; 
-                               // launch tasks
-                               for (map<string, 
vector<mesos::TaskInfo>*>::iterator it =
-                                               tasks_.begin(); it != 
tasks_.end(); ++it) {
-                                       prepare_tasks(it->second, job_counter_, 
path);  
-                                       mesos::OfferID newId;
-                                       newId.set_value(it->first);
-                                       LOG(INFO) << "Launching task with offer 
ID = " << newId.value(); 
-                                       driver->launchTasks(newId, 
*(it->second));
-                                       job_count++;
-                                       if (job_count>= 
job_conf_.cluster().nworker_groups())
-                                               break;
-                               }
-
-                               job_counter_++; 
-                               is_running_ = true; 
-                       }
-               }
-
-               virtual void offerRescinded(SchedulerDriver *driver,
-                               const mesos::OfferID& offerId) {
-               }
-
-               virtual void statusUpdate(SchedulerDriver* driver,
-                               const mesos::TaskStatus& status) {
-                       if (status.state() == mesos::TASK_FINISHED)
-                               driver->stop();
-                       else if (status.state() == mesos::TASK_FAILED){
-                               LOG(ERROR) << "TASK FAILED !!!!"; 
-                               driver->abort();
-                       }
-               }
-
-               virtual void frameworkMessage(SchedulerDriver* driver,
-                               const mesos::ExecutorID& executorId, const 
mesos::SlaveID& slaveId,
-                               const string& data) {
-               }
-
-               virtual void slaveLost(SchedulerDriver* driver,
-                               const mesos::SlaveID& slaveId) {
-               }
-
-               virtual void executorLost(SchedulerDriver* driver,
-                               const mesos::ExecutorID& executorId, const 
mesos::SlaveID& slaveId,
-                               int status) {
-               }
-
-               virtual void error(SchedulerDriver* driver, const string& 
message) {
-                       LOG(ERROR) << "ERROR !!! " << message;
-               }
-
-       private:
-
-               /**
-                * Helper function that initialize TaskInfo with the correct 
URI and command
-                */
-               void prepare_tasks(vector<mesos::TaskInfo> *tasks, int job_id, 
string job_conf){
-                       char path_sys_config[512], path_job_config[512];
-                       //path to singa.conf
-                       sprintf(path_sys_config, "hdfs://%s%s", 
namenode_.c_str(), SINGA_CONFIG.c_str()); 
-                       sprintf(path_job_config, "hdfs://%s%s", 
namenode_.c_str(), job_conf.c_str()); 
-
-                       char command[512];
-                       sprintf(command, "singa -conf ./job.conf -singa_conf 
./singa.conf -singa_job %d", job_id); 
-
-                       for (int i=0; i<tasks->size(); i++){
-                               mesos::CommandInfo *comm = 
(tasks->at(i)).mutable_command();
-                               comm->add_uris()->set_value(path_sys_config); 
-                               comm->add_uris()->set_value(path_job_config); 
-                               comm->set_value(command); 
-                       }
-               }
-
-               /**
-                * Helper function to connect to HDFS
-                */
-               hdfsFS hdfs_connect(string namenode){
-                       string path(namenode);
-                       int idx = path.find_first_of(":"); 
-                       string host=path.substr(0,idx);
-                       int port = atoi(path.substr(idx+1).c_str()); 
-                       return hdfsConnect(host.c_str(), port); 
-               }
-
-               /**
-                * Helper function to read HDFS file content into a string. 
-                * It assumes the file exists. 
-                * @return NULL if there's error. 
-                */
-               string hdfs_read(hdfsFS hdfs_handle, string filename){
-                       hdfsFileInfo* stat = hdfsGetPathInfo(hdfs_handle, 
filename.c_str()); 
-                       char buffer[stat->mSize]; 
-                       hdfsFile file = hdfsOpenFile(hdfs_handle, 
filename.c_str(), O_RDONLY, 0, 0, 0); 
-                       int status = hdfsRead(hdfs_handle, file, buffer, 
stat->mSize); 
-                       hdfsFreeFileInfo(stat,1); 
-                       hdfsCloseFile(hdfs_handle, file); 
-                       if (status!=-1)
-                               return string(buffer); 
-                       else
-                               return NULL; 
-               }
-
-               /**
-                * Helper function that write content of source_file to 
filename, overwritting the latter 
-                * if it exists. 
-                * @return 1 if sucessfull, 0 if fail. 
-                */
-               int hdfs_overwrite(hdfsFS hdfs_handle, string filename, string 
source_file){
-                       hdfsFile file; 
-                       if (hdfsExists(hdfs_handle, filename.c_str())==0){
-                               file = hdfsOpenFile(hdfs_handle, 
filename.c_str(), O_WRONLY, 0, 0, 0); 
-                       }
-                       else{
-                               //create directory and file
-                               int last_idx = filename.find_last_of("/"); 
-                               string dir = filename.substr(0,last_idx); 
-                               hdfsCreateDirectory(hdfs_handle, dir.c_str()); 
-                               file = hdfsOpenFile(hdfs_handle, 
filename.c_str(), O_WRONLY, 0, 0, 0); 
-                       }
-
-                       FILE *fh = fopen(source_file.c_str(), "r"); 
-                       if (!fh){
-                               LOG(ERROR) << "Cannot open " << source_file; 
-                               return 0; 
-                       }
-
-                       if (file){
-                               fseek(fh,0,SEEK_END); 
-                               int len = ftell(fh); 
-                               rewind(fh); 
-                               char buf[len];
-                               fread(buf,len,1,fh);
-                               fclose(fh);
-
-                               hdfsWrite(hdfs_handle, file, buf, len); 
-                               hdfsFlush(hdfs_handle, file); 
-                               hdfsCloseFile(hdfs_handle, file); 
-                       }
-                       else{
-                               LOG(ERROR) << "ERROR openng file on HDFS " << 
filename; 
-                               return 0; 
-                       }
-
-                       return 1; 
-               }
-
-               /**
-                * Helper function, check if the offered CPUs satisfies the 
resource requirements
-                * @param ncpus:        number of cpus offer at this host
-                * @return true         when ncpus >= (nWorkersPerProcess + 
nServersPerProcess) if workers and servers are separated
-                *                                                              
or when cpus >= max(nWorkersPerProcess, nServersPerProcess) if they are not. 
-                */
-               bool check_resources(int ncpus){
-                       int n1 = job_conf_.cluster().nworkers_per_procs();
-                       int n2 = job_conf_.cluster().nservers_per_procs();
-                       LOG(INFO) << "n1 = " << n1 << " n2 = " << n2 << " ncpus 
= " << ncpus; 
-                       return job_conf_.cluster().server_worker_separate()? 
ncpus >= (n1+n2) 
-                                                                               
                                        : ncpus >= (n1>n2 ? n1 : n2); 
-               }
-
-               int job_counter_; 
-
-               // true if the job has been launched
-               bool is_running_; 
-               singa::JobProto job_conf_;      
-               // total number of hosts required
-               int nhosts_;
-               // temporary map of tasks: <offerID, TaskInfo> 
-               map<string, vector<mesos::TaskInfo>*> tasks_;
-               // SINGA job config file
-               string job_conf_file_; 
-               // HDFS namenode
-               string namenode_; 
-               // handle to HDFS
-               hdfsFS hdfs_handle_;
+ public:
+    /**
+     * Constructor, used when [singa_conf] is not given. Raises an error if /singa/singa.conf is not found
+     * on HDFS. 
+     *
+     * @param namenode address of HDFS namenode
+     * @param job_conf_file    job configuration file
+     * @param jc               job counter
+     */
+    SingaScheduler(string namenode, string job_conf_file, int jc):
+      job_conf_file_(job_conf_file), nhosts_(0), namenode_(namenode), 
is_running_(false), job_counter_(jc) {
+        hdfs_handle_ = hdfs_connect(namenode.c_str());
+        if (hdfs_handle_) {
+          if (hdfsExists(hdfs_handle_, SINGA_CONFIG) != 0)
+            LOG(ERROR) << SINGA_CONFIG << " is not found on HDFS. Please use -singa_conf flag to upload the file";
+        } else {
+          LOG(ERROR) << "Failed to connect to HDFS";
+        }
+        ReadProtoFromTextFile(job_conf_file_.c_str(), &job_conf_);
+    }
+    /**
+     * Constructor. It overwrites /singa/singa.conf on HDFS (creating a new one if necessary).
+     * The file contains zookeeper_host and log_dir values
+     * It also parses the JobProto from job_config file
+     *
+     * @param namenode address of HDFS namenode
+     * @param singa_conf       singa global configuration file
+     * @param job_conf_file    job configuration file
+     */
+    SingaScheduler(string namenode, string job_conf_file, string singa_conf, 
int jc)
+      : job_conf_file_(job_conf_file), nhosts_(0), namenode_(namenode), 
is_running_(false), job_counter_(jc) {
+        hdfs_handle_ = hdfs_connect(namenode);
+        if (!hdfs_handle_ || !hdfs_overwrite(hdfs_handle_, SINGA_CONFIG, 
singa_conf))
+          LOG(ERROR) << "Failed to connect to HDFS";
+
+        ReadProtoFromTextFile(job_conf_file_.c_str(), &job_conf_);
+      }
+    virtual void registered(SchedulerDriver *driver,
+        const mesos::FrameworkID& frameworkId,
+        const mesos::MasterInfo& masterInfo) {
+    }
+
+    virtual void reregistered(SchedulerDriver *driver,
+        const mesos::MasterInfo& masterInfo) {
+    }
+
+    virtual void disconnected(SchedulerDriver *driver) {
+    }
+
+    /**
+     * Handles resource offers from the Mesos scheduler. It implements a simple/naive
+     * scheduling policy:
+     * + For each offer that contains enough CPUs, add new tasks to the list
+     * + Launch all the tasks when reaching the required number of tasks (nworker_groups + nserver_groups). 
+     */
+    virtual void resourceOffers(SchedulerDriver* driver, const 
std::vector<mesos::Offer>& offers) {
+      // do nothing if the task is already running
+      if (is_running_)
+        return;
+
+      for (int i = 0; i < offers.size(); i++) {
+        const mesos::Offer offer = offers[i];
+        // check for resource and create temporary tasks
+        int cpus = 0, mem = 0;
+        int nresources = offer.resources().size();
+
+        for (int r = 0; r < nresources; r++) {
+          const mesos::Resource& resource = offer.resources(r);
+          if (resource.name() == "cpus"
+              && resource.type() == mesos::Value::SCALAR)
+            cpus = resource.scalar().value();
+          else if (resource.name() == "mem"
+              && resource.type() == mesos::Value::SCALAR)
+            mem = resource.scalar().value();
+        }
+
+        if (!check_resources(cpus))
+          break;
+
+        vector<mesos::TaskInfo> *new_tasks = new vector<mesos::TaskInfo>();
+        mesos::TaskInfo task;
+        task.set_name("SINGA");
+
+        char string_id[100];
+        snprintf(string_id, 100, "SINGA_%d", nhosts_);
+        task.mutable_task_id()->set_value(string_id);
+        task.mutable_slave_id()->MergeFrom(offer.slave_id());
+
+        mesos::Resource *resource = task.add_resources();
+        resource->set_name("cpus");
+        resource->set_type(mesos::Value::SCALAR);
+        // take only nworkers_per_group CPUs
+        
resource->mutable_scalar()->set_value(job_conf_.cluster().nworkers_per_group());
+
+        resource = task.add_resources();
+        resource->set_name("mem");
+        resource->set_type(mesos::Value::SCALAR);
+        // take all the memory
+        resource->mutable_scalar()->set_value(mem);
+
+        // store in temporary map
+        new_tasks->push_back(task);
+        tasks_[offer.id().value()] = new_tasks;
+
+        nhosts_++;
+      }
+
+      if (nhosts_>= job_conf_.cluster().nworker_groups()) {
+        LOG(INFO) << "Acquired enough resources: "
+          << 
job_conf_.cluster().nworker_groups()*job_conf_.cluster().nworkers_per_group()
+          << " CPUs over " << job_conf_.cluster().nworker_groups() << " hosts. 
Launching tasks ... ";
+
+        // write job_conf_file_ to /singa/job_id/job.conf
+        char path[512];
+        snprintf(path, 512, "/singa/%d/job.conf", job_counter_);
+        hdfs_overwrite(hdfs_handle_, path, job_conf_file_);
+
+        int job_count = 0;
+        // launch tasks
+        for (map<string, vector<mesos::TaskInfo>*>::iterator it =
+            tasks_.begin(); it != tasks_.end(); ++it) {
+          prepare_tasks(it->second, job_counter_, path);
+          mesos::OfferID newId;
+          newId.set_value(it->first);
+          LOG(INFO) << "Launching task with offer ID = " << newId.value();
+          driver->launchTasks(newId, *(it->second));
+          job_count++;
+          if (job_count>= job_conf_.cluster().nworker_groups())
+            break;
+        }
+
+        job_counter_++;
+        is_running_ = true;
+      }
+    }
+
+    virtual void offerRescinded(SchedulerDriver *driver,
+        const mesos::OfferID& offerId) {
+    }
+
+    virtual void statusUpdate(SchedulerDriver* driver,
+        const mesos::TaskStatus& status) {
+      if (status.state() == mesos::TASK_FINISHED)
+        driver->stop();
+      else if (status.state() == mesos::TASK_FAILED) {
+        LOG(ERROR) << "TASK FAILED !!!!";
+        driver->abort();
+      }
+    }
+
+    virtual void frameworkMessage(SchedulerDriver* driver,
+        const mesos::ExecutorID& executorId, const mesos::SlaveID& slaveId,
+        const string& data) {
+    }
+
+    virtual void slaveLost(SchedulerDriver* driver,
+        const mesos::SlaveID& slaveId) {
+    }
+
+    virtual void executorLost(SchedulerDriver* driver,
+        const mesos::ExecutorID& executorId, const mesos::SlaveID& slaveId,
+        int status) {
+    }
+
+    virtual void error(SchedulerDriver* driver, const string& message) {
+      LOG(ERROR) << "ERROR !!! " << message;
+    }
+
+ private:
+    /**
+     * Helper function that initializes TaskInfo with the correct URI and command
+     */
+    void prepare_tasks(vector<mesos::TaskInfo> *tasks, int job_id, string 
job_conf) {
+      char path_sys_config[512], path_job_config[512];
+      // path to singa.conf
+      snprintf(path_sys_config, 512, "hdfs://%s%s", namenode_.c_str(), 
SINGA_CONFIG);
+      snprintf(path_job_config, 512, "hdfs://%s%s", namenode_.c_str(), 
job_conf.c_str());
+
+      char command[512];
+      snprintf(command, 512, "singa -conf ./job.conf -singa_conf ./singa.conf 
-singa_job %d", job_id);
+
+      for (int i=0; i < tasks->size(); i++) {
+        mesos::CommandInfo *comm = (tasks->at(i)).mutable_command();
+        comm->add_uris()->set_value(path_sys_config);
+        comm->add_uris()->set_value(path_job_config);
+        comm->set_value(command);
+      }
+    }
+
+    /**
+     * Helper function to connect to HDFS
+     */
+    hdfsFS hdfs_connect(string namenode) {
+      string path(namenode);
+      int idx = path.find_first_of(":");
+      string host = path.substr(0, idx);
+      int port = atoi(path.substr(idx+1).c_str());
+      return hdfsConnect(host.c_str(), port);
+    }
+
+    /**
+     * Helper function to read HDFS file content into a string. 
+     * It assumes the file exists. 
+     * @return NULL if there's error. 
+     */
+    string hdfs_read(hdfsFS hdfs_handle, string filename) {
+      hdfsFileInfo* stat = hdfsGetPathInfo(hdfs_handle, filename.c_str());
+      int file_size = stat->mSize;
+      string buffer;
+      buffer.resize(file_size);
+
+      hdfsFile file = hdfsOpenFile(hdfs_handle, filename.c_str(), O_RDONLY, 0, 
0, 0);
+      int status = hdfsRead(hdfs_handle, file, 
const_cast<char*>(buffer.c_str()), stat->mSize);
+      hdfsFreeFileInfo(stat, 1);
+      hdfsCloseFile(hdfs_handle, file);
+      if (status != -1)
+        return string(buffer);
+      else
+        return NULL;
+    }
+
+    /**
+     * Helper function that writes the content of source_file to filename, overwriting the latter 
+     * if it exists. 
+     * @return 1 if successful, 0 on failure. 
+     */
+    int hdfs_overwrite(hdfsFS hdfs_handle, string filename, string 
source_file) {
+      hdfsFile file;
+      if (hdfsExists(hdfs_handle, filename.c_str()) == 0) {
+        file = hdfsOpenFile(hdfs_handle, filename.c_str(), O_WRONLY, 0, 0, 0);
+      } else {
+        // create directory and file
+        int last_idx = filename.find_last_of("/");
+        string dir = filename.substr(0, last_idx);
+        hdfsCreateDirectory(hdfs_handle, dir.c_str());
+        file = hdfsOpenFile(hdfs_handle, filename.c_str(), O_WRONLY, 0, 0, 0);
+      }
+
+      FILE *fh = fopen(source_file.c_str(), "r");
+      if (!fh) {
+        LOG(ERROR) << "Cannot open " << source_file;
+        return 0;
+      }
+
+      if (file) {
+        fseek(fh, 0, SEEK_END);
+        int len = ftell(fh);
+        rewind(fh);
+        string buf;
+        buf.resize(len);
+        fread(const_cast<char*>(buf.c_str()), len, 1, fh);
+        fclose(fh);
+
+        hdfsWrite(hdfs_handle, file, buf.c_str(), len);
+        hdfsFlush(hdfs_handle, file);
+        hdfsCloseFile(hdfs_handle, file);
+      } else {
+        LOG(ERROR) << "ERROR openng file on HDFS " << filename;
+        return 0;
+      }
+
+      return 1;
+    }
+
+    /**
+     * Helper function, checks if the offered CPUs satisfy the resource requirements
+     * @param ncpus:   number of CPUs offered at this host
+     * @return true when ncpus >= (nWorkersPerProcess + nServersPerProcess) if workers and servers are separated,
+     *              or when ncpus >= max(nWorkersPerProcess, nServersPerProcess) if they are not. 
+     */
+    bool check_resources(int ncpus) {
+      int n1 = job_conf_.cluster().nworkers_per_procs();
+      int n2 = job_conf_.cluster().nservers_per_procs();
+      LOG(INFO) << "n1 = " << n1 << " n2 = " << n2 << " ncpus = " << ncpus;
+      return job_conf_.cluster().server_worker_separate()? ncpus >= (n1+n2) : 
ncpus >= (n1 > n2 ? n1 : n2);
+    }
+
+    int job_counter_;
+
+    // true if the job has been launched
+    bool is_running_;
+    singa::JobProto job_conf_;
+    // total number of hosts required
+    int nhosts_;
+    // temporary map of tasks: <offerID, TaskInfo>
+    map<string, vector<mesos::TaskInfo>*> tasks_;
+    // SINGA job config file
+    string job_conf_file_;
+    // HDFS namenode
+    string namenode_;
+    // handle to HDFS
+    hdfsFS hdfs_handle_;
 };
 
 int main(int argc, char** argv) {
-       FLAGS_logtostderr = 1;
-
-       int status = mesos::DRIVER_RUNNING;
-       SingaScheduler *scheduler; 
-
-       if (!(argc==2 || argc==4 || argc==6)){
-               std::cout << usage << std::endl; 
-               return 1; 
-       }
-
-       int scheduler_conf_idx=0;
-       int singa_conf_idx=0; 
-       for (int i=1; i<argc-1; i++){
-               if (strcmp(argv[i],"-scheduler_conf")==0)
-                       scheduler_conf_idx=i+1;
-               if (strcmp(argv[i],"-singa_conf")==0)
-                       singa_conf_idx=i+1; 
-       }
-
-       SchedulerProto msg; 
-       
-       if (scheduler_conf_idx)
-               singa::ReadProtoFromTextFile((const 
char*)argv[scheduler_conf_idx], &msg);      
-       else
-               singa::ReadProtoFromTextFile(DEFAULT_SCHEDULER_CONF.c_str(), 
&msg);     
-
-       if (!singa_conf_idx)
-               scheduler = new SingaScheduler(msg.namenode(), string(argv[1]), 
msg.job_counter());
-       else
-               scheduler = new SingaScheduler(msg.namenode(),string(argv[1]), 
string(argv[singa_conf_idx]), msg.job_counter());        
-
-       msg.set_job_counter(msg.job_counter()+1); 
-       if (scheduler_conf_idx)
-               singa::WriteProtoToTextFile(msg, (const 
char*)argv[scheduler_conf_idx]);        
-       else
-               singa::WriteProtoToTextFile(msg, 
DEFAULT_SCHEDULER_CONF.c_str());       
-
-       LOG(INFO) << "Scheduler initialized"; 
-       mesos::FrameworkInfo framework;
-       framework.set_user("");
-       framework.set_name("SINGA");
-
-       SchedulerDriver *driver = new mesos::MesosSchedulerDriver(scheduler, 
framework,
-                       msg.master().c_str());
-       LOG(INFO) << "Starting SINGA framework...";
-       status = driver->run();
-       driver->stop();
-       LOG(INFO) << "Stoping SINGA framework..."; 
-       
-               
-       return status == mesos::DRIVER_STOPPED ? 0 : 1;
+  FLAGS_logtostderr = 1;
+  int status = mesos::DRIVER_RUNNING;
+  SingaScheduler *scheduler;
+  if (!(argc == 2 || argc == 4 || argc == 6)) {
+    std::cout << usage << std::endl;
+    return 1;
+  }
+
+  int scheduler_conf_idx = 0;
+  int singa_conf_idx = 0;
+  for (int i=1; i < argc-1; i++) {
+    if (strcmp(argv[i], "-scheduler_conf") == 0)
+      scheduler_conf_idx = i+1;
+    if (strcmp(argv[i], "-singa_conf") == 0)
+      singa_conf_idx = i+1;
+  }
+
+  SchedulerProto msg;
+  if (scheduler_conf_idx)
+    singa::ReadProtoFromTextFile((const char*)argv[scheduler_conf_idx], &msg);
+  else
+    singa::ReadProtoFromTextFile(DEFAULT_SCHEDULER_CONF, &msg);
+
+  if (!singa_conf_idx)
+    scheduler = new SingaScheduler(msg.namenode(), string(argv[1]), 
msg.job_counter());
+  else
+    scheduler = new SingaScheduler(msg.namenode(), string(argv[1]), 
string(argv[singa_conf_idx]), msg.job_counter());
+
+  msg.set_job_counter(msg.job_counter()+1);
+  if (scheduler_conf_idx)
+    singa::WriteProtoToTextFile(msg, (const char*)argv[scheduler_conf_idx]);
+  else
+    singa::WriteProtoToTextFile(msg, DEFAULT_SCHEDULER_CONF);
+
+  LOG(INFO) << "Scheduler initialized";
+  mesos::FrameworkInfo framework;
+  framework.set_user("");
+  framework.set_name("SINGA");
+
+  SchedulerDriver *driver = new mesos::MesosSchedulerDriver(scheduler, 
framework, msg.master().c_str());
+  LOG(INFO) << "Starting SINGA framework...";
+  status = driver->run();
+  driver->stop();
+  LOG(INFO) << "Stoping SINGA framework...";
+
+  return status == mesos::DRIVER_STOPPED ? 0 : 1;
 }
 
