Repository: incubator-singa Updated Branches: refs/heads/master d190fa89a -> 2a1b988c6
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2e21c62/tool/mesos/singa_scheduler.cc ---------------------------------------------------------------------- diff --git a/tool/mesos/singa_scheduler.cc b/tool/mesos/singa_scheduler.cc deleted file mode 100644 index dc48ae2..0000000 --- a/tool/mesos/singa_scheduler.cc +++ /dev/null @@ -1,426 +0,0 @@ -/************************************************************ -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*************************************************************/ -#include "singa/proto/job.pb.h" -#include "singa/proto/singa.pb.h" -#include "./scheduler.pb.h" -#include "singa/utils/common.h" -#include <stdio.h> -#include <stdlib.h> -#include <string> -#include <iostream> -#include <fstream> -#include <hdfs.h> -#include <mesos/scheduler.hpp> -#include <google/protobuf/text_format.h> -#include <glog/logging.h> -/** - * \file singa_scheduler.cc implements a framework for managing SINGA jobs. - * - * The scheduler takes a job configuration file [file] and performs the following: - * 1. Parse the config file to determine the required resources. - * 2. [Optional] Copy singa.conf to the HDFS: /singa/singa.conf. - * 2.1. Raise error if singa.conf is NOT FOUND on HDFS - * 3. Wait for offers from the Mesos master until enough is acquired. - * 4. Keep an increasing counter of job ID. - * 5. Write [file] to HDFS: /singa/job_ID/job.conf - * 6. Start the task: - * + Set URI in the TaskInfo message to point to the config files on HDFS: - /singa/singa.conf - /singa/Job_ID/job.conf - * + Set the executable command: singa-run.sh -conf ./job.conf - * - * We assume that singa-run.sh is include in $PATH variable at all nodes. - * (Else, we set the executable to its full path) - * ./job.conf is relative path pointing to the current sandbox directory created - * dynamically by Mesos. - * - * - * Scheduling: - * Each SINGA job requires certain resources represented by: (1) number of workers, (2) number of worker groups - * and (3) number of worker per process. The resources offered by Mesos contains (1) number of host, (2) number of CPUs - * at each host and (3) memory available at each hosts. - * - * Our scheduler performs simply task assignment which guarantees that each process runs an entire work group, - * and each takes all the memory offered by the slave. We assume that each slave runs ONE process, that is the following - * condition holds: - * nCPUs_per_host >= nWorkersPerProcess+nServersPerProcess //if seperate - >= max(nWorkerPerProcess, nServersPerProcess) // if attached - */ -using std::string; -using mesos::SchedulerDriver; -using std::vector; -using std::map; - -const char usage[] = " singa_scheduler <job_conf> [-scheduler_conf global_config] [-singa_conf singa_config] \n" - " job_conf: job configuration file\n" - " -scheduler_conf: optional, system-wide configuration file\n" - " -singa_conf: optional, singa global configuration file\n"; - -const char SINGA_CONFIG[] = "/singa/singa.conf"; -const char DEFAULT_SCHEDULER_CONF[] = "scheduler.conf"; -class SingaScheduler: public mesos::Scheduler { - public: - /** - * Constructor, used when [sing_conf] is not given. Raise error if /singa/singa.conf is not found - * on HDFS. - * - * @param namenode address of HDFS namenode - * @param job_conf_file job configuration file - * @param jc job counter - */ - SingaScheduler(string namenode, string job_conf_file, int jc): - job_conf_file_(job_conf_file), nhosts_(0), namenode_(namenode), is_running_(false), job_counter_(jc), task_counter_(0) { - hdfs_handle_ = hdfs_connect(namenode.c_str()); - if (hdfs_handle_) { - if (hdfsExists(hdfs_handle_, SINGA_CONFIG) != 0) - LOG(ERROR) << SINGA_CONFIG << " is not found on HDFS. Please use -singa_conf flag to upload the file"; - } else { - LOG(ERROR) << "Failed to connect to HDFS"; - } - ReadProtoFromTextFile(job_conf_file_.c_str(), &job_conf_); - } - /** - * Constructor. It overwrites /singa/singa.conf on HDFS (created a new one if necessary). - * The file contains zookeeper_host and log_dir values - * It also parses the JobProto from job_config file - * - * @param namenode address of HDFS namenode - * @param singa_conf singa global configuration file - * @param job_conf_file job configuration file - */ - SingaScheduler(string namenode, string job_conf_file, string singa_conf, int jc) - : job_conf_file_(job_conf_file), nhosts_(0), namenode_(namenode), is_running_(false), job_counter_(jc), task_counter_(0) { - hdfs_handle_ = hdfs_connect(namenode); - if (!hdfs_handle_ || !hdfs_overwrite(hdfs_handle_, SINGA_CONFIG, singa_conf)) - LOG(ERROR) << "Failed to connect to HDFS"; - - ReadProtoFromTextFile(job_conf_file_.c_str(), &job_conf_); - } - virtual void registered(SchedulerDriver *driver, - const mesos::FrameworkID& frameworkId, - const mesos::MasterInfo& masterInfo) { - } - - virtual void reregistered(SchedulerDriver *driver, - const mesos::MasterInfo& masterInfo) { - } - - virtual void disconnected(SchedulerDriver *driver) { - } - - /** - * Handle resource offering from Mesos scheduler. It implements the simple/naive - * scheduler: - * + For each offer that contains enough CPUs, adds new tasks to the list - * + Launch all the tasks when reaching the required number of tasks (nworkers_groups + nserver_groups). - */ - virtual void resourceOffers(SchedulerDriver* driver, const std::vector<mesos::Offer>& offers) { - // do nothing if the task is already running - if (is_running_) - return; - - for (int i = 0; i < offers.size(); i++) { - const mesos::Offer offer = offers[i]; - // check for resource and create temporary tasks - int cpus = 0, mem = 0; - int nresources = offer.resources().size(); - - for (int r = 0; r < nresources; r++) { - const mesos::Resource& resource = offer.resources(r); - if (resource.name() == "cpus" - && resource.type() == mesos::Value::SCALAR) - cpus = resource.scalar().value(); - else if (resource.name() == "mem" - && resource.type() == mesos::Value::SCALAR) - mem = resource.scalar().value(); - } - - if (!check_resources(cpus)) - break; - - vector<mesos::TaskInfo> *new_tasks = new vector<mesos::TaskInfo>(); - mesos::TaskInfo task; - task.set_name("SINGA"); - - char string_id[100]; - snprintf(string_id, 100, "SINGA_%d", nhosts_); - task.mutable_task_id()->set_value(string_id); - task.mutable_slave_id()->MergeFrom(offer.slave_id()); - - mesos::Resource *resource = task.add_resources(); - resource->set_name("cpus"); - resource->set_type(mesos::Value::SCALAR); - // take only nworkers_per_group CPUs - resource->mutable_scalar()->set_value(job_conf_.cluster().nworkers_per_group()); - - resource = task.add_resources(); - resource->set_name("mem"); - resource->set_type(mesos::Value::SCALAR); - // take all the memory - resource->mutable_scalar()->set_value(mem); - - // store in temporary map - new_tasks->push_back(task); - tasks_[offer.id().value()] = new_tasks; - hostnames_[offer.id().value()] = offer.hostname(); - nhosts_++; - } - - if (nhosts_>= job_conf_.cluster().nworker_groups()) { - LOG(INFO) << "Acquired enough resources: " - << job_conf_.cluster().nworker_groups()*job_conf_.cluster().nworkers_per_group() - << " CPUs over " << job_conf_.cluster().nworker_groups() << " hosts. Launching tasks ... "; - - // write job_conf_file_ to /singa/job_id/job.conf - char path[512]; - snprintf(path, 512, "/singa/%d/job.conf", job_counter_); - hdfs_overwrite(hdfs_handle_, path, job_conf_file_); - - // launch tasks - for (map<string, vector<mesos::TaskInfo>*>::iterator it = - tasks_.begin(); it != tasks_.end(); ++it) { - prepare_tasks(it->second, hostnames_[it->first], job_counter_, path); - mesos::OfferID newId; - newId.set_value(it->first); - LOG(INFO) << "Launching task with offer ID = " << newId.value(); - driver->launchTasks(newId, *(it->second)); - task_counter_++; - if (task_counter_>= job_conf_.cluster().nworker_groups()) - break; - } - - job_counter_++; - is_running_ = true; - } - } - - virtual void offerRescinded(SchedulerDriver *driver, - const mesos::OfferID& offerId) { - } - - virtual void statusUpdate(SchedulerDriver* driver, - const mesos::TaskStatus& status) { - if (status.state() == mesos::TASK_FINISHED) - task_counter_--; - - if (task_counter_ == 0) { - driver->stop(); - } else if (status.state() == mesos::TASK_FAILED) { - LOG(ERROR) << "TASK FAILED !!!!"; - driver->abort(); - } - } - - virtual void frameworkMessage(SchedulerDriver* driver, - const mesos::ExecutorID& executorId, const mesos::SlaveID& slaveId, - const string& data) { - } - - virtual void slaveLost(SchedulerDriver* driver, - const mesos::SlaveID& slaveId) { - } - - virtual void executorLost(SchedulerDriver* driver, - const mesos::ExecutorID& executorId, const mesos::SlaveID& slaveId, - int status) { - } - - virtual void error(SchedulerDriver* driver, const string& message) { - LOG(ERROR) << "ERROR !!! " << message; - } - - private: - /** - * Helper function that initialize TaskInfo with the correct URI and command - */ - void prepare_tasks(vector<mesos::TaskInfo> *tasks, string hostname, int job_id, string job_conf) { - char path_sys_config[512], path_job_config[512]; - // path to singa.conf - snprintf(path_sys_config, 512, "hdfs://%s%s", namenode_.c_str(), SINGA_CONFIG); - snprintf(path_job_config, 512, "hdfs://%s%s", namenode_.c_str(), job_conf.c_str()); - - char command[512]; - snprintf(command, 512, "singa -conf ./job.conf -singa_conf ./singa.conf -singa_job %d -host %s", job_id, hostname.c_str()); - - for (int i=0; i < tasks->size(); i++) { - mesos::CommandInfo *comm = (tasks->at(i)).mutable_command(); - comm->add_uris()->set_value(path_sys_config); - comm->add_uris()->set_value(path_job_config); - comm->set_value(command); - } - } - - /** - * Helper function to connect to HDFS - */ - hdfsFS hdfs_connect(string namenode) { - string path(namenode); - int idx = path.find_first_of(":"); - string host = path.substr(0, idx); - int port = atoi(path.substr(idx+1).c_str()); - return hdfsConnect(host.c_str(), port); - } - - /** - * Helper function to read HDFS file content into a string. - * It assumes the file exists. - * @return NULL if there's error. - */ - string hdfs_read(hdfsFS hdfs_handle, string filename) { - hdfsFileInfo* stat = hdfsGetPathInfo(hdfs_handle, filename.c_str()); - int file_size = stat->mSize; - string buffer; - buffer.resize(file_size); - - hdfsFile file = hdfsOpenFile(hdfs_handle, filename.c_str(), O_RDONLY, 0, 0, 0); - int status = hdfsRead(hdfs_handle, file, const_cast<char*>(buffer.c_str()), stat->mSize); - hdfsFreeFileInfo(stat, 1); - hdfsCloseFile(hdfs_handle, file); - if (status != -1) - return string(buffer); - else - return NULL; - } - - /** - * Helper function that write content of source_file to filename, overwritting the latter - * if it exists. - * @return 1 if sucessfull, 0 if fail. - */ - int hdfs_overwrite(hdfsFS hdfs_handle, string filename, string source_file) { - hdfsFile file; - if (hdfsExists(hdfs_handle, filename.c_str()) == 0) { - file = hdfsOpenFile(hdfs_handle, filename.c_str(), O_WRONLY, 0, 0, 0); - } else { - // create directory and file - int last_idx = filename.find_last_of("/"); - string dir = filename.substr(0, last_idx); - hdfsCreateDirectory(hdfs_handle, dir.c_str()); - file = hdfsOpenFile(hdfs_handle, filename.c_str(), O_WRONLY, 0, 0, 0); - } - - FILE *fh = fopen(source_file.c_str(), "r"); - if (!fh) { - LOG(ERROR) << "Cannot open " << source_file; - return 0; - } - - if (file) { - fseek(fh, 0, SEEK_END); - int len = ftell(fh); - rewind(fh); - string buf; - buf.resize(len); - fread(const_cast<char*>(buf.c_str()), len, 1, fh); - fclose(fh); - - hdfsWrite(hdfs_handle, file, buf.c_str(), len); - hdfsFlush(hdfs_handle, file); - hdfsCloseFile(hdfs_handle, file); - } else { - LOG(ERROR) << "ERROR openng file on HDFS " << filename; - return 0; - } - - return 1; - } - - /** - * Helper function, check if the offered CPUs satisfies the resource requirements - * @param ncpus: number of cpus offer at this host - * @return true when ncpus >= (nWorkersPerProcess + nServersPerProcess) if workers and servers are separated - * or when cpus >= max(nWorkersPerProcess, nServersPerProcess) if they are not. - */ - bool check_resources(int ncpus) { - int n1 = job_conf_.cluster().nworkers_per_procs(); - int n2 = job_conf_.cluster().nservers_per_procs(); - LOG(INFO) << "n1 = " << n1 << " n2 = " << n2 << " ncpus = " << ncpus; - return job_conf_.cluster().server_worker_separate()? ncpus >= (n1+n2) : ncpus >= (n1 > n2 ? n1 : n2); - } - - int job_counter_, task_counter_; - - // true if the job has been launched - bool is_running_; - singa::JobProto job_conf_; - // total number of hosts required - int nhosts_; - // temporary map of tasks: <offerID, TaskInfo> - map<string, vector<mesos::TaskInfo>*> tasks_; - // temporary map of offerID to slave IP addresses - map<string, string> hostnames_; - // SINGA job config file - string job_conf_file_; - // HDFS namenode - string namenode_; - // handle to HDFS - hdfsFS hdfs_handle_; -}; - -int main(int argc, char** argv) { - FLAGS_logtostderr = 1; - int status = mesos::DRIVER_RUNNING; - SingaScheduler *scheduler; - if (!(argc == 2 || argc == 4 || argc == 6)) { - std::cout << usage << std::endl; - return 1; - } - - int scheduler_conf_idx = 0; - int singa_conf_idx = 0; - for (int i=1; i < argc-1; i++) { - if (strcmp(argv[i], "-scheduler_conf") == 0) - scheduler_conf_idx = i+1; - if (strcmp(argv[i], "-singa_conf") == 0) - singa_conf_idx = i+1; - } - - SchedulerProto msg; - if (scheduler_conf_idx) - singa::ReadProtoFromTextFile((const char*)argv[scheduler_conf_idx], &msg); - else - singa::ReadProtoFromTextFile(DEFAULT_SCHEDULER_CONF, &msg); - - if (!singa_conf_idx) - scheduler = new SingaScheduler(msg.namenode(), string(argv[1]), msg.job_counter()); - else - scheduler = new SingaScheduler(msg.namenode(), string(argv[1]), string(argv[singa_conf_idx]), msg.job_counter()); - - msg.set_job_counter(msg.job_counter()+1); - if (scheduler_conf_idx) - singa::WriteProtoToTextFile(msg, (const char*)argv[scheduler_conf_idx]); - else - singa::WriteProtoToTextFile(msg, DEFAULT_SCHEDULER_CONF); - - LOG(INFO) << "Scheduler initialized"; - mesos::FrameworkInfo framework; - framework.set_user(""); - framework.set_name("SINGA"); - - SchedulerDriver *driver = new mesos::MesosSchedulerDriver(scheduler, framework, msg.master().c_str()); - LOG(INFO) << "Starting SINGA framework..."; - status = driver->run(); - driver->stop(); - LOG(INFO) << "Stoping SINGA framework..."; - - return status == mesos::DRIVER_STOPPED ? 0 : 1; -} - http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2e21c62/tool/node.sh ---------------------------------------------------------------------- diff --git a/tool/node.sh b/tool/node.sh deleted file mode 100755 index 653806f..0000000 --- a/tool/node.sh +++ /dev/null @@ -1,88 +0,0 @@ -#/** -# * Licensed to the Apache Software Foundation (ASF) under one -# * or more contributor license agreements. See the NOTICE file -# * distributed with this work for additional information -# * regarding copyright ownership. The ASF licenses this file -# * to you under the Apache License, Version 2.0 (the -# * "License"); you may not use this file except in compliance -# * with the License. You may obtain a copy of the License at -# * -# * http://www.apache.org/licenses/LICENSE-2.0 -# * -# * Unless required by applicable law or agreed to in writing, software -# * distributed under the License is distributed on an "AS IS" BASIS, -# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# * See the License for the specific language governing permissions and -# * limitations under the License. -# */ -#!/bin/bash -if [[ $# < 2 || ! -f $2 ]] -then - echo "Usage: process/folder management" - echo "[cat, create, delete, kill, ls, ps, reset, scp, ssh] hostfile [args]" - echo " cat hostfile file--- cat the file on every node in hostfile" - echo " create hostfile folder--- create the folder on every node in hostfile" - echo " delete hostfile folder--- delete the folder on every node in hostfile" - echo " kill hostfile job_name--- kill the job on every node in hostfile" - echo " ls hostfile folder--- list the folder on every node in hostfile" - echo " ps hostfile job_name--- ps aux|grep job_name on every node in hostfile" - echo " reset hostfile folder--- delete and create the folder on every node in hostfile" - echo " scp hostfile local_dir [remote_dir]--- copy the local_dir to remote_dir on every node in hostfile, if remote_dir is omitted, remote_dir=local_dir" - echo " ssh hostfile--- test whether the nodes in hostfile are alive" - echo "each line in hostfile is a node name followed by a space and other fields" - exit -fi - -ssh_options="-oStrictHostKeyChecking=no \ --oUserKnownHostsFile=/dev/null \ --oLogLevel=quiet" - -hosts=(`cat $2 |cut -d ' ' -f 1`) - -for i in ${hosts[@]} -do - if [ $1 == "cat" ] - then - cmd="cat $3" - elif [ $1 == "create" -o $1 == "reset" ] - then - cmd="mkdir -p $3" - elif [ $1 == "delete" -o $1 == "reset" ] - then - cmd="rm -rf $3" - elif [ $1 == "kill" ] - then - cmd="ps ax|pgrep $3 |xargs kill" - elif [ $1 == "ls" ] - then - cmd="ls -l $3" - elif [ $1 == "scp" ] - then - local_dir=$3 - remote_dir=$3 - if [ $# -eq 4 ] - then - remote_dir=$4 - fi - r='' - if [[ -d $3 ]] - then - r='-r' - fi - echo "scp $r $local_dir $i:$remote_dir" - scp $r $local_dir $i:$remote_dir - elif [ $1 == "ssh" ] - then - cmd="exit" - elif [ $1 == "ps" ] - then - cmd="ps ax|pgrep $3" - else - echo "Incorrect commands:" $1 - fi - if [ $1 != "scp" ] - then - echo $cmd - ssh $i $cmd - fi -done
