Repository: incubator-singa
Updated Branches:
refs/heads/master 316c65bcd -> 5d076e526
SINGA-11 Start SINGA on Apache Mesos
Apache Mesos is a cluster management framework that enables fine-grained
resource sharing among applications running in the same cluster. Mesos
abstracts away the physical configuration of cluster nodes and presents the
available resources to frameworks in the form of "offers". SINGA uses Mesos
for two purposes:
1. To acquire necessary resources for training the model.
2. To launch and monitor progress of the training task.
To this end, we implement a "SINGA Scheduler" which interacts with the Mesos
master. The scheduler assumes that SINGA has been installed on the Mesos slave
nodes. It is invoked when the user wants to start a new SINGA job, and it
performs the following steps:
Step 1. Read the job configuration file to determine necessary resources in
terms of CPUs, memory and storage.
Step 2. Wait for resource offers from the Mesos master.
Step 3. Determine if the offers meet the resource requirements.
Step 4. Prepare the task to launch at each slave:
+ Deliver the job configuration file to the slave node.
+ Specify the command to run on the slave: "singa -conf ./job.conf"
Step 5. Launch the tasks and monitor their progress.
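As a rough illustration (not part of this patch), steps 2, 3 and 5 map onto
callbacks of the Mesos scheduler API roughly as sketched below; the actual
implementation is the SingaScheduler class in tool/mesos/singa_scheduler.cc.

    // Sketch only: a scheduler receives offers and status updates as
    // callbacks. The remaining pure-virtual callbacks of mesos::Scheduler
    // (registered, disconnected, error, ...) are omitted here, so this
    // class is left abstract.
    #include <mesos/scheduler.hpp>
    #include <vector>

    class SketchScheduler : public mesos::Scheduler {
     public:
      // Steps 2-3: offers arrive here; once they satisfy the job's
      // requirements, build TaskInfo objects and launch them.
      void resourceOffers(mesos::SchedulerDriver* driver,
                          const std::vector<mesos::Offer>& offers) override {
        // inspect offer.resources(), build tasks, then
        // driver->launchTasks(offer.id(), tasks);
      }
      // Step 5: monitor progress through task status updates.
      void statusUpdate(mesos::SchedulerDriver* driver,
                        const mesos::TaskStatus& status) override {
        if (status.state() == mesos::TASK_FINISHED)
          driver->stop();
      }
    };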
For step 3, we currently implement a simple scheme: the number of CPUs offered
by each Mesos slave must be at least the total number of SINGA workers and
SINGA servers per process. In other words, each Mesos slave must be able to
run an entire worker group or server group.
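For illustration only, the per-offer check boils down to the following sketch
(cf. check_resources() in tool/mesos/singa_scheduler.cc; the function name and
parameters here are illustrative):

    #include <algorithm>

    // One Mesos slave must be able to host one whole SINGA process: the
    // per-process worker/server counts and the separation flag come from
    // the job's cluster configuration.
    bool enough_cpus(int ncpus, int nworkers_per_procs, int nservers_per_procs,
                     bool server_worker_separate) {
      return server_worker_separate
          ? ncpus >= nworkers_per_procs + nservers_per_procs
          : ncpus >= std::max(nworkers_per_procs, nservers_per_procs);
    }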
For step 4, we currently rely on HDFS to deliver the configuration file to
each slave. Specifically, we write the file to a known directory on HDFS
(different for each job) and ask the slave to use the Mesos fetcher utility to
download the file before executing the task.
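A rough sketch of how the task command is assembled (mirroring
prepare_tasks() in tool/mesos/singa_scheduler.cc; the helper name and the
exact HDFS paths here are illustrative):

    #include <mesos/scheduler.hpp>
    #include <string>

    // The Mesos fetcher downloads each URI into the task's sandbox before
    // the command runs, so the command can refer to ./job.conf relatively.
    void set_command(mesos::TaskInfo* task, const std::string& namenode,
                     int job_id) {
      mesos::CommandInfo* cmd = task->mutable_command();
      cmd->add_uris()->set_value("hdfs://" + namenode + "/singa/singa.conf");
      cmd->add_uris()->set_value("hdfs://" + namenode + "/singa/" +
                                 std::to_string(job_id) + "/job.conf");
      cmd->set_value("singa -conf ./job.conf");
    }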
The source code and README.md files are in the `tool/mesos` directory.
Project: http://git-wip-us.apache.org/repos/asf/incubator-singa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-singa/commit/f0e16293
Tree: http://git-wip-us.apache.org/repos/asf/incubator-singa/tree/f0e16293
Diff: http://git-wip-us.apache.org/repos/asf/incubator-singa/diff/f0e16293
Branch: refs/heads/master
Commit: f0e162937fa9aeb178d1c107bae936bce6de34d6
Parents: 497dafc
Author: Anh Dinh <[email protected]>
Authored: Thu Jun 25 08:30:46 2015 -0700
Committer: Anh Dinh <[email protected]>
Committed: Sun Oct 18 12:06:16 2015 +0800
----------------------------------------------------------------------
tool/mesos/Makefile | 25 +++
tool/mesos/README.md | 86 ++++++++
tool/mesos/scheduler.conf | 3 +
tool/mesos/scheduler.proto | 33 +++
tool/mesos/singa.conf | 7 +
tool/mesos/singa_scheduler.cc | 439 +++++++++++++++++++++++++++++++++++++
6 files changed, 593 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f0e16293/tool/mesos/Makefile
----------------------------------------------------------------------
diff --git a/tool/mesos/Makefile b/tool/mesos/Makefile
new file mode 100644
index 0000000..09b250b
--- /dev/null
+++ b/tool/mesos/Makefile
@@ -0,0 +1,25 @@
+CXX_FLAGS=-I ../include -std=c++11 -I /usr/local/include/hdfs -I /root/incubator-singa/include
+LD_FLAGS=-lmesos -lsinga -lhdfs3
+EXE=scheduler
+OBJS=singa_scheduler.o scheduler.pb.o
+PROTOS=scheduler.proto
+PROTO_HDRS=scheduler.pb.h
+PROTO_SRCS=scheduler.pb.cc
+
+CXX=g++
+
+all: $(PROTO_HDRS) $(EXE)
+
+$(PROTO_SRCS) $(PROTO_HDRS): $(PROTOS)
+ protoc --cpp_out=. $(PROTOS)
+
+$(EXE): $(OBJS)
+ $(CXX) -o $@ $(OBJS) $(LD_FLAGS)
+ rm -rf *.o
+
+%.o: %.cc
+ $(CXX) $(CXX_FLAGS) -c $<
+
+clean:
+ rm -rf *.o $(EXE) $(PROTO_HDRS) $(PROTO_SRCS)
+
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f0e16293/tool/mesos/README.md
----------------------------------------------------------------------
diff --git a/tool/mesos/README.md b/tool/mesos/README.md
new file mode 100644
index 0000000..5330801
--- /dev/null
+++ b/tool/mesos/README.md
@@ -0,0 +1,86 @@
+This guide explains how to start SINGA distributed training on a Mesos cluster. It assumes that both Mesos and HDFS are
+already running, and every node has SINGA installed.
+
+For this guide, we assume the following architecture, in which a cluster is set up from a set of containers (when the
+cluster is already set up, the user can immediately [start the training](#job_start)). Refer to the [Docker guide](https://github.com/ug93tad/incubator-singa/tree/SINGA-89/tool/docker/README.md) for details of how to start individual nodes and set up the network connection between them (make sure [weave](http://weave.works/guides/weave-docker-ubuntu-simple.html) is running at each node, and that the cluster's headnode is running in container `node0`).
+
+<img src="http://www.comp.nus.edu.sg/~dinhtta/files/singa_mesos.png" align="center">
+
+## Start HDFS and Mesos
+Go inside each container, using:
+````
+docker exec -it nodeX /bin/bash
+````
+and configure it as follows:
+
+* On container `node0`
+
+ hadoop namenode -format
+ hadoop-daemon.sh start namenode
+    /opt/mesos-0.22.0/build/bin/mesos-master.sh --work_dir=/opt --log_dir=/opt --quiet > /dev/null &
+ zk-service.sh start
+
+* On container `node1, node2, ...`
+
+ hadoop-daemon.sh start datanode
+    /opt/mesos-0.22.0/build/bin/mesos-slave.sh --master=node0:5050 --log_dir=/opt --quiet > /dev/null &
+
+To check if the setup has been successful, verify that the HDFS namenode has registered `N` datanodes, via:
+
+````
+hadoop dfsadmin -report
+````
+
+#### Mesos logs
+Mesos logs are stored at `/opt/lt-mesos-master.INFO` on `node0` and `/opt/lt-mesos-slave.INFO` on the other nodes.
+
+---
+
+## Starting SINGA training on Mesos
+Assuming that Mesos and HDFS are already running, a SINGA job can be launched from **any** container.
+
+#### Launching job
+
+1. Log in to any container, then
+
+ cd incubator-singa/tool/mesos
+
+<a name="job_start"></a>
+2. Check that configuration files are correct:
+   + `scheduler.conf` contains information about the HDFS namenode and the Mesos master
+   + `singa.conf` contains information about the Zookeeper service
+   + The job configuration file `job.conf` **must contain full paths to the example directories (NO RELATIVE PATHS!).**
+
+3. Start the job:
+ + If starting for the first time:
+
+        ./scheduler <job config file> -scheduler_conf <scheduler config file> -singa_conf <SINGA config file>
+
+ + If not the first time:
+
+ ./scheduler <job config file>
+
+**Notes.** Each running job is given a `frameworkID`. Look for the log message of the form:
+
+ Framework registered with XXX-XXX-XXX-XXX-XXX-XXX
+
+#### Monitoring and Debugging
+
+Each Mesos job is given a `frameworkID`, and a *sandbox* directory is created for each job.
+The directory is under the specified `work_dir` (or `/tmp/mesos` by default). For example, errors
+during SINGA execution can be found at:
+
+        /tmp/mesos/slaves/xxxxx-Sx/frameworks/xxxxx/executors/SINGA_x/runs/latest/stderr
+
+Other artifacts, like files downloaded from HDFS (`job.conf`) and `stdout`, can be found in the same
+directory.
+
+#### Stopping
+
+There are two ways to kill the running job:
+
+1. If the scheduler is running in the foreground, simply kill it (using `Ctrl-C`, for example).
+2. If the scheduler is running in the background, kill it using Mesos's REST API:
+
+        curl -d "frameworkId=XXX-XXX-XXX-XXX-XXX-XXX" -X POST http://<master>/master/shutdown
+
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f0e16293/tool/mesos/scheduler.conf
----------------------------------------------------------------------
diff --git a/tool/mesos/scheduler.conf b/tool/mesos/scheduler.conf
new file mode 100644
index 0000000..1ab5fba
--- /dev/null
+++ b/tool/mesos/scheduler.conf
@@ -0,0 +1,3 @@
+namenode: "node0:9000"
+master: "node0:5050"
+job_counter: 5
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f0e16293/tool/mesos/scheduler.proto
----------------------------------------------------------------------
diff --git a/tool/mesos/scheduler.proto b/tool/mesos/scheduler.proto
new file mode 100644
index 0000000..322f7bb
--- /dev/null
+++ b/tool/mesos/scheduler.proto
@@ -0,0 +1,33 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+/**
+ * Protocol Buffer message containing global information for the
+ * scheduler.
+ */
+message SchedulerProto{
+ // HDFS namenode, for example node0:9000
+ required string namenode=1;
+ // Mesos master node, for example node0:5050
+ required string master=2;
+ // jobID, incremented at each run
+ required int32 job_counter=3;
+}
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f0e16293/tool/mesos/singa.conf
----------------------------------------------------------------------
diff --git a/tool/mesos/singa.conf b/tool/mesos/singa.conf
new file mode 100644
index 0000000..f49e689
--- /dev/null
+++ b/tool/mesos/singa.conf
@@ -0,0 +1,7 @@
+# point to your active zookeeper service
+# this is comma separated host:port pairs, each corresponding to a zk server
+# e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zookeeper_host: "node0:2181"
+
+# set if you want to change log directory
+log_dir: "/tmp/singa-log/"
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/f0e16293/tool/mesos/singa_scheduler.cc
----------------------------------------------------------------------
diff --git a/tool/mesos/singa_scheduler.cc b/tool/mesos/singa_scheduler.cc
new file mode 100644
index 0000000..86a4ded
--- /dev/null
+++ b/tool/mesos/singa_scheduler.cc
@@ -0,0 +1,439 @@
+/************************************************************
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*************************************************************/
+
+#include <stdio.h>
+#include <glog/logging.h>
+#include <stdlib.h>
+#include <mesos/scheduler.hpp>
+#include <string>
+#include <iostream>
+#include <fstream>
+#include <hdfs.h>
+#include "singa/proto/job.pb.h"
+#include "singa/proto/singa.pb.h"
+#include "scheduler.pb.h"
+#include "singa/utils/common.h"
+#include <google/protobuf/text_format.h>
+
+/**
+ * \file singa_scheduler.cc implements a framework for managing SINGA jobs.
+ *
+ * The scheduler takes a job configuration file [file] and performs the following:
+ * 1. Parse the config file to determine the required resources.
+ * 2. [Optional] Copy singa.conf to the HDFS: /singa/singa.conf.
+ * 2.1. Raise error if singa.conf is NOT FOUND on HDFS
+ * 3. Wait for offers from the Mesos master until enough is acquired.
+ * 4. Keep an increasing counter of job ID.
+ * 5. Write [file] to HDFS: /singa/job_ID/job.conf
+ * 6. Start the task:
+ *          + Set URI in the TaskInfo message to point to the config files on HDFS:
+ /singa/singa.conf
+ /singa/Job_ID/job.conf
+ *          + Set the executable command: singa-run.sh -conf ./job.conf
+ *
+ * We assume that singa-run.sh is included in the $PATH variable at all nodes.
+ * (Else, we set the executable to its full path.)
+ *     ./job.conf is a relative path pointing to the current sandbox directory created
+ *     dynamically by Mesos.
+ *
+ *
+ * Scheduling:
+ *     Each SINGA job requires certain resources represented by: (1) the number of workers, (2) the number of worker groups,
+ *     and (3) the number of workers per process. The resources offered by Mesos contain (1) the number of hosts, (2) the number of CPUs
+ *     at each host and (3) the memory available at each host.
+ *
+ *     Our scheduler performs a simple task assignment which guarantees that each process runs an entire worker group,
+ *     and each takes all the memory offered by the slave. We assume that each slave runs ONE process, that is, the following
+ *     condition holds:
+ *             nCPUs_per_host >= nWorkersPerProcess + nServersPerProcess            // if separated
+ *             nCPUs_per_host >= max(nWorkersPerProcess, nServersPerProcess)        // if attached
+ */
+using std::string;
+using mesos::SchedulerDriver;
+using std::vector;
+using std::map;
+
+string usage = " singa_scheduler <job_conf> [-scheduler_conf global_config] [-singa_conf singa_config] \n"
+               "    job_conf: job configuration file\n"
+               "    -scheduler_conf: optional, system-wide configuration file\n"
+               "    -singa_conf: optional, singa global configuration file\n";
+
+string SINGA_CONFIG = "/singa/singa.conf";
+string DEFAULT_SCHEDULER_CONF = "scheduler.conf";
+
+class SingaScheduler: public mesos::Scheduler {
+ public:
+
+ /**
+   * Constructor, used when [singa_conf] is not given. Raises an error if /singa/singa.conf is not found
+ * on HDFS.
+ *
+ * @param namenode address of HDFS namenode
+ * @param job_conf_file job configuration file
+ * @param jc job counter
+ */
+  SingaScheduler(string namenode, string job_conf_file, int jc):
+    job_conf_file_(job_conf_file), nhosts_(0), namenode_(namenode), is_running_(false), job_counter_(jc) {
+    hdfs_handle_ = hdfs_connect(namenode.c_str());
+    if (hdfs_handle_) {
+      if (hdfsExists(hdfs_handle_, SINGA_CONFIG.c_str()) != 0)
+        LOG(ERROR) << SINGA_CONFIG << " is not found on HDFS. Please use -singa_conf flag to upload the file";
+    } else {
+      LOG(ERROR) << "Failed to connect to HDFS";
+    }
+    ReadProtoFromTextFile(job_conf_file_.c_str(), &job_conf_);
+  }
+
+ /**
+   * Constructor. It overwrites /singa/singa.conf on HDFS (creating a new one if necessary).
+ * The file contains zookeeper_host and log_dir values
+ * It also parses the JobProto from job_config file
+ *
+ * @param namenode address of HDFS namenode
+ * @param singa_conf singa global configuration file
+ * @param job_conf_file job configuration file
+ */
+  SingaScheduler(string namenode, string job_conf_file, string singa_conf, int jc)
+    : job_conf_file_(job_conf_file), nhosts_(0), namenode_(namenode), is_running_(false), job_counter_(jc) {
+    hdfs_handle_ = hdfs_connect(namenode);
+    if (!hdfs_handle_ || !hdfs_overwrite(hdfs_handle_, SINGA_CONFIG, singa_conf))
+      LOG(ERROR) << "Failed to connect to HDFS";
+    ReadProtoFromTextFile(job_conf_file_.c_str(), &job_conf_);
+  }
+ virtual void registered(SchedulerDriver *driver,
+ const mesos::FrameworkID& frameworkId,
+ const mesos::MasterInfo& masterInfo) {
+ }
+
+ virtual void reregistered(SchedulerDriver *driver,
+ const mesos::MasterInfo& masterInfo) {
+ }
+
+ virtual void disconnected(SchedulerDriver *driver) {
+ }
+
+ /**
+   * Handles resource offers from the Mesos master. It implements the simple/naive
+   * scheduler:
+   *   + For each offer that contains enough CPUs, add new tasks to the list.
+   *   + Launch all the tasks when the required number of tasks (nworker_groups + nserver_groups) is reached.
+ */
+  virtual void resourceOffers(SchedulerDriver* driver, const std::vector<mesos::Offer>& offers) {
+    // do nothing if the task is already running
+    if (is_running_)
+      return;
+
+    for (int i = 0; i < offers.size(); i++) {
+      const mesos::Offer offer = offers[i];
+
+      // check for resource and create temporary tasks
+      int cpus = 0, mem = 0;
+      int nresources = offer.resources().size();
+
+      for (int r = 0; r < nresources; r++) {
+        const mesos::Resource& resource = offer.resources(r);
+        if (resource.name() == "cpus"
+            && resource.type() == mesos::Value::SCALAR)
+          cpus = resource.scalar().value();
+        else if (resource.name() == "mem"
+            && resource.type() == mesos::Value::SCALAR)
+          mem = resource.scalar().value();
+      }
+
+      if (!check_resources(cpus))
+        break;
+
+      vector<mesos::TaskInfo> *new_tasks = new vector<mesos::TaskInfo>();
+      mesos::TaskInfo task;
+      task.set_name("SINGA");
+
+      char string_id[100];
+      sprintf(string_id, "SINGA_%d", nhosts_);
+      task.mutable_task_id()->set_value(string_id);
+      task.mutable_slave_id()->MergeFrom(offer.slave_id());
+
+      mesos::Resource *resource = task.add_resources();
+      resource->set_name("cpus");
+      resource->set_type(mesos::Value::SCALAR);
+      // take only nworkers_per_group CPUs
+      resource->mutable_scalar()->set_value(job_conf_.cluster().nworkers_per_group());
+
+      resource = task.add_resources();
+      resource->set_name("mem");
+      resource->set_type(mesos::Value::SCALAR);
+      // take all the memory
+      resource->mutable_scalar()->set_value(mem);
+
+      // store in temporary map
+      new_tasks->push_back(task);
+      tasks_[offer.id().value()] = new_tasks;
+
+      nhosts_++;
+    }
+
+    if (nhosts_ >= job_conf_.cluster().nworker_groups()) {
+      LOG(INFO) << "Acquired enough resources: "
+        << job_conf_.cluster().nworker_groups()*job_conf_.cluster().nworkers_per_group()
+        << " CPUs over " << job_conf_.cluster().nworker_groups() << " hosts. Launching tasks ... ";
+
+      // write job_conf_file_ to /singa/job_id/job.conf
+      char path[512];
+      sprintf(path, "/singa/%d/job.conf", job_counter_);
+      hdfs_overwrite(hdfs_handle_, path, job_conf_file_);
+
+      int job_count = 0;
+      // launch tasks
+      for (map<string, vector<mesos::TaskInfo>*>::iterator it = tasks_.begin(); it != tasks_.end(); ++it) {
+        prepare_tasks(it->second, job_counter_, path);
+        mesos::OfferID newId;
+        newId.set_value(it->first);
+        LOG(INFO) << "Launching task with offer ID = " << newId.value();
+        driver->launchTasks(newId, *(it->second));
+        job_count++;
+        if (job_count >= job_conf_.cluster().nworker_groups())
+          break;
+      }
+
+      job_counter_++;
+      is_running_ = true;
+    }
+  }
+
+ virtual void offerRescinded(SchedulerDriver *driver,
+ const mesos::OfferID& offerId) {
+ }
+
+ virtual void statusUpdate(SchedulerDriver* driver,
+ const mesos::TaskStatus& status) {
+ if (status.state() == mesos::TASK_FINISHED)
+ driver->stop();
+ else if (status.state() == mesos::TASK_FAILED){
+ LOG(ERROR) << "TASK FAILED !!!!";
+ driver->abort();
+ }
+ }
+
+ virtual void frameworkMessage(SchedulerDriver* driver,
+      const mesos::ExecutorID& executorId, const mesos::SlaveID& slaveId,
+ const string& data) {
+ }
+
+ virtual void slaveLost(SchedulerDriver* driver,
+ const mesos::SlaveID& slaveId) {
+ }
+
+ virtual void executorLost(SchedulerDriver* driver,
+      const mesos::ExecutorID& executorId, const mesos::SlaveID& slaveId,
+ int status) {
+ }
+
+  virtual void error(SchedulerDriver* driver, const string& message) {
+ LOG(ERROR) << "ERROR !!! " << message;
+ }
+
+ private:
+
+ /**
+   * Helper function that initializes TaskInfo with the correct URIs and command
+   */
+  void prepare_tasks(vector<mesos::TaskInfo> *tasks, int job_id, string job_conf) {
+ char path_sys_config[512], path_job_config[512];
+ //path to singa.conf
+    sprintf(path_sys_config, "hdfs://%s%s", namenode_.c_str(), SINGA_CONFIG.c_str());
+    sprintf(path_job_config, "hdfs://%s%s", namenode_.c_str(), job_conf.c_str());
+
+    char command[512];
+    sprintf(command, "singa -conf ./job.conf -singa_conf ./singa.conf -singa_job %d", job_id);
+
+    for (int i = 0; i < tasks->size(); i++) {
+      mesos::CommandInfo *comm = (tasks->at(i)).mutable_command();
+ comm->add_uris()->set_value(path_sys_config);
+ comm->add_uris()->set_value(path_job_config);
+ comm->set_value(command);
+ }
+ }
+
+ /**
+ * Helper function to connect to HDFS
+ */
+ hdfsFS hdfs_connect(string namenode){
+ string path(namenode);
+ int idx = path.find_first_of(":");
+ string host=path.substr(0,idx);
+ int port = atoi(path.substr(idx+1).c_str());
+ return hdfsConnect(host.c_str(), port);
+ }
+
+ /**
+ * Helper function to read HDFS file content into a string.
+ * It assumes the file exists.
+ * @return NULL if there's error.
+ */
+ string hdfs_read(hdfsFS hdfs_handle, string filename){
+    hdfsFileInfo* stat = hdfsGetPathInfo(hdfs_handle, filename.c_str());
+    char buffer[stat->mSize];
+    hdfsFile file = hdfsOpenFile(hdfs_handle, filename.c_str(), O_RDONLY, 0, 0, 0);
+    int status = hdfsRead(hdfs_handle, file, buffer, stat->mSize);
+ hdfsFreeFileInfo(stat,1);
+ hdfsCloseFile(hdfs_handle, file);
+ if (status!=-1)
+ return string(buffer);
+ else
+ return NULL;
+ }
+
+ /**
+   * Helper function that writes the content of source_file to filename, overwriting the latter
+   * if it exists.
+   * @return 1 if successful, 0 if it fails.
+ */
+  int hdfs_overwrite(hdfsFS hdfs_handle, string filename, string source_file) {
+    hdfsFile file;
+    if (hdfsExists(hdfs_handle, filename.c_str()) == 0) {
+      file = hdfsOpenFile(hdfs_handle, filename.c_str(), O_WRONLY, 0, 0, 0);
+    } else {
+      // create directory and file
+      int last_idx = filename.find_last_of("/");
+      string dir = filename.substr(0, last_idx);
+      hdfsCreateDirectory(hdfs_handle, dir.c_str());
+      file = hdfsOpenFile(hdfs_handle, filename.c_str(), O_WRONLY, 0, 0, 0);
+    }
+ }
+
+ FILE *fh = fopen(source_file.c_str(), "r");
+ if (!fh){
+ LOG(ERROR) << "Cannot open " << source_file;
+ return 0;
+ }
+
+ if (file){
+ fseek(fh,0,SEEK_END);
+ int len = ftell(fh);
+ rewind(fh);
+ char buf[len];
+ fread(buf,len,1,fh);
+ fclose(fh);
+
+ hdfsWrite(hdfs_handle, file, buf, len);
+ hdfsFlush(hdfs_handle, file);
+ hdfsCloseFile(hdfs_handle, file);
+ }
+ else{
+      LOG(ERROR) << "ERROR opening file on HDFS " << filename;
+ return 0;
+ }
+
+ return 1;
+ }
+
+ /**
+   * Helper function, checks if the offered CPUs satisfy the resource requirements.
+   * @param ncpus: number of CPUs offered at this host
+   * @return true when ncpus >= (nWorkersPerProcess + nServersPerProcess) if workers and servers are separated,
+   *         or when ncpus >= max(nWorkersPerProcess, nServersPerProcess) if they are not.
+ */
+ bool check_resources(int ncpus){
+ int n1 = job_conf_.cluster().nworkers_per_procs();
+ int n2 = job_conf_.cluster().nservers_per_procs();
+    LOG(INFO) << "n1 = " << n1 << " n2 = " << n2 << " ncpus = " << ncpus;
+    return job_conf_.cluster().server_worker_separate() ? ncpus >= (n1+n2)
+                                                        : ncpus >= (n1 > n2 ? n1 : n2);
+ }
+
+ int job_counter_;
+
+ // true if the job has been launched
+ bool is_running_;
+ singa::JobProto job_conf_;
+ // total number of hosts required
+ int nhosts_;
+ // temporary map of tasks: <offerID, TaskInfo>
+ map<string, vector<mesos::TaskInfo>*> tasks_;
+ // SINGA job config file
+ string job_conf_file_;
+ // HDFS namenode
+ string namenode_;
+ // handle to HDFS
+ hdfsFS hdfs_handle_;
+};
+
+int main(int argc, char** argv) {
+ FLAGS_logtostderr = 1;
+
+ int status = mesos::DRIVER_RUNNING;
+ SingaScheduler *scheduler;
+
+ if (!(argc==2 || argc==4 || argc==6)){
+ std::cout << usage << std::endl;
+ return 1;
+ }
+
+ int scheduler_conf_idx=0;
+ int singa_conf_idx=0;
+ for (int i=1; i<argc-1; i++){
+ if (strcmp(argv[i],"-scheduler_conf")==0)
+ scheduler_conf_idx=i+1;
+ if (strcmp(argv[i],"-singa_conf")==0)
+ singa_conf_idx=i+1;
+ }
+
+ SchedulerProto msg;
+
+  if (scheduler_conf_idx)
+    singa::ReadProtoFromTextFile((const char*)argv[scheduler_conf_idx], &msg);
+  else
+    singa::ReadProtoFromTextFile(DEFAULT_SCHEDULER_CONF.c_str(), &msg);
+
+  if (!singa_conf_idx)
+    scheduler = new SingaScheduler(msg.namenode(), string(argv[1]), msg.job_counter());
+  else
+    scheduler = new SingaScheduler(msg.namenode(), string(argv[1]), string(argv[singa_conf_idx]), msg.job_counter());
+
+  msg.set_job_counter(msg.job_counter()+1);
+  if (scheduler_conf_idx)
+    singa::WriteProtoToTextFile(msg, (const char*)argv[scheduler_conf_idx]);
+  else
+    singa::WriteProtoToTextFile(msg, DEFAULT_SCHEDULER_CONF.c_str());
+
+ LOG(INFO) << "Scheduler initialized";
+ mesos::FrameworkInfo framework;
+ framework.set_user("");
+ framework.set_name("SINGA");
+
+  SchedulerDriver *driver = new mesos::MesosSchedulerDriver(scheduler, framework,
+    msg.master().c_str());
+ LOG(INFO) << "Starting SINGA framework...";
+ status = driver->run();
+ driver->stop();
+  LOG(INFO) << "Stopping SINGA framework...";
+
+
+ return status == mesos::DRIVER_STOPPED ? 0 : 1;
+}
+