Update Mesos scheduler to use synchronized. Review: https://reviews.apache.org/r/35096
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b2d80474 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b2d80474 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b2d80474 Branch: refs/heads/master Commit: b2d8047428228cbbea65f4af889d11e8918e2e96 Parents: 5b0eeb0 Author: Joris Van Remoortere <[email protected]> Authored: Sat Jun 13 07:06:22 2015 -0700 Committer: Benjamin Hindman <[email protected]> Committed: Sun Jun 14 02:43:01 2015 -0700 ---------------------------------------------------------------------- include/mesos/scheduler.hpp | 5 +- src/sched/sched.cpp | 382 +++++++++++++++++++-------------------- 2 files changed, 193 insertions(+), 194 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/b2d80474/include/mesos/scheduler.hpp ---------------------------------------------------------------------- diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp index 2ee6b5c..0b54ffe 100644 --- a/include/mesos/scheduler.hpp +++ b/include/mesos/scheduler.hpp @@ -19,11 +19,10 @@ #ifndef __MESOS_SCHEDULER_HPP__ #define __MESOS_SCHEDULER_HPP__ -#include <functional> -#include <queue> - #include <pthread.h> +#include <functional> +#include <queue> #include <string> #include <vector> http://git-wip-us.apache.org/repos/asf/mesos/blob/b2d80474/src/sched/sched.cpp ---------------------------------------------------------------------- diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp index 9423607..bc76c71 100644 --- a/src/sched/sched.cpp +++ b/src/sched/sched.cpp @@ -71,8 +71,6 @@ #include "authentication/cram_md5/authenticatee.hpp" -#include "common/lock.hpp" - #include "local/flags.hpp" #include "local/local.hpp" @@ -841,8 +839,9 @@ protected: send(master.get(), message); } - Lock lock(mutex); - pthread_cond_signal(cond); + synchronized (mutex) { + pthread_cond_signal(cond); + } } // NOTE: This function informs the master to stop attempting to send @@ -866,8 +865,9 @@ protected: send(master.get(), message); } - Lock lock(mutex); - pthread_cond_signal(cond); + synchronized (mutex) { + pthread_cond_signal(cond); + } } void killTask(const TaskID& taskId) @@ -1507,156 +1507,156 @@ MesosSchedulerDriver::~MesosSchedulerDriver() Status MesosSchedulerDriver::start() { - Lock lock(&mutex); - - if (status != DRIVER_NOT_STARTED) { - return status; - } - - if (detector == NULL) { - Try<MasterDetector*> detector_ = MasterDetector::create(url); - - if (detector_.isError()) { - status = DRIVER_ABORTED; - string message = "Failed to create a master detector for '" + - master + "': " + detector_.error(); - scheduler->error(this, message); + synchronized (mutex) { + if (status != DRIVER_NOT_STARTED) { return status; } - // Save the detector so we can delete it later. - detector = detector_.get(); - } + if (detector == NULL) { + Try<MasterDetector*> detector_ = MasterDetector::create(url); - // Load scheduler flags. - internal::scheduler::Flags flags; - Try<Nothing> load = flags.load("MESOS_"); + if (detector_.isError()) { + status = DRIVER_ABORTED; + string message = "Failed to create a master detector for '" + + master + "': " + detector_.error(); + scheduler->error(this, message); + return status; + } - if (load.isError()) { - status = DRIVER_ABORTED; - scheduler->error(this, load.error()); - return status; - } + // Save the detector so we can delete it later. + detector = detector_.get(); + } - // Initialize modules. Note that since other subsystems may depend - // upon modules, we should initialize modules before anything else. - if (flags.modules.isSome()) { - Try<Nothing> result = modules::ModuleManager::load(flags.modules.get()); - if (result.isError()) { + // Load scheduler flags. + internal::scheduler::Flags flags; + Try<Nothing> load = flags.load("MESOS_"); + + if (load.isError()) { status = DRIVER_ABORTED; - scheduler->error(this, "Error loading modules: " + result.error()); + scheduler->error(this, load.error()); return status; } - } - CHECK(process == NULL); + // Initialize modules. Note that since other subsystems may depend + // upon modules, we should initialize modules before anything else. + if (flags.modules.isSome()) { + Try<Nothing> result = modules::ModuleManager::load(flags.modules.get()); + if (result.isError()) { + status = DRIVER_ABORTED; + scheduler->error(this, "Error loading modules: " + result.error()); + return status; + } + } - if (credential == NULL) { - process = new SchedulerProcess( - this, - scheduler, - framework, - None(), - implicitAcknowlegements, - schedulerId, - detector, - flags, - &mutex, - &cond); - } else { - const Credential& cred = *credential; - process = new SchedulerProcess( - this, - scheduler, - framework, - cred, - implicitAcknowlegements, - schedulerId, - detector, - flags, - &mutex, - &cond); + CHECK(process == NULL); + + if (credential == NULL) { + process = new SchedulerProcess( + this, + scheduler, + framework, + None(), + implicitAcknowlegements, + schedulerId, + detector, + flags, + &mutex, + &cond); + } else { + const Credential& cred = *credential; + process = new SchedulerProcess( + this, + scheduler, + framework, + cred, + implicitAcknowlegements, + schedulerId, + detector, + flags, + &mutex, + &cond); + } + + spawn(process); + + return status = DRIVER_RUNNING; } - - spawn(process); - - return status = DRIVER_RUNNING; } Status MesosSchedulerDriver::stop(bool failover) { - Lock lock(&mutex); - - LOG(INFO) << "Asked to stop the driver"; + synchronized (mutex) { + LOG(INFO) << "Asked to stop the driver"; - if (status != DRIVER_RUNNING && status != DRIVER_ABORTED) { - VLOG(1) << "Ignoring stop because the status of the driver is " - << Status_Name(status); - return status; - } + if (status != DRIVER_RUNNING && status != DRIVER_ABORTED) { + VLOG(1) << "Ignoring stop because the status of the driver is " + << Status_Name(status); + return status; + } - // 'process' might be NULL if the driver has failed to instantiate - // it due to bad parameters (e.g. error in creating the detector - // or loading flags). - if (process != NULL) { - process->running = false; - dispatch(process, &SchedulerProcess::stop, failover); - } + // 'process' might be NULL if the driver has failed to instantiate + // it due to bad parameters (e.g. error in creating the detector + // or loading flags). + if (process != NULL) { + process->running = false; + dispatch(process, &SchedulerProcess::stop, failover); + } - // TODO(benh): It might make more sense to clean up our local - // cluster here than in the destructor. However, what would be even - // better is to allow multiple local clusters to exist (i.e. not use - // global vars in local.cpp) so that ours can just be an instance - // variable in MesosSchedulerDriver. + // TODO(benh): It might make more sense to clean up our local + // cluster here than in the destructor. However, what would be + // even better is to allow multiple local clusters to exist (i.e. + // not use global vars in local.cpp) so that ours can just be an + // instance variable in MesosSchedulerDriver. - bool aborted = status == DRIVER_ABORTED; + bool aborted = status == DRIVER_ABORTED; - status = DRIVER_STOPPED; + status = DRIVER_STOPPED; - return aborted ? DRIVER_ABORTED : status; + return aborted ? DRIVER_ABORTED : status; + } } Status MesosSchedulerDriver::abort() { - Lock lock(&mutex); - - LOG(INFO) << "Asked to abort the driver"; + synchronized (mutex) { + LOG(INFO) << "Asked to abort the driver"; - if (status != DRIVER_RUNNING) { - VLOG(1) << "Ignoring abort because the status of the driver is " - << Status_Name(status); - return status; - } + if (status != DRIVER_RUNNING) { + VLOG(1) << "Ignoring abort because the status of the driver is " + << Status_Name(status); + return status; + } - CHECK_NOTNULL(process); - process->running = false; + CHECK_NOTNULL(process); + process->running = false; - // Dispatching here ensures that we still process the outstanding - // requests *from* the scheduler, since those do proceed when - // aborted is true. - dispatch(process, &SchedulerProcess::abort); + // Dispatching here ensures that we still process the outstanding + // requests *from* the scheduler, since those do proceed when + // aborted is true. + dispatch(process, &SchedulerProcess::abort); - return status = DRIVER_ABORTED; + return status = DRIVER_ABORTED; + } } Status MesosSchedulerDriver::join() { - Lock lock(&mutex); - - if (status != DRIVER_RUNNING) { - return status; - } + synchronized (mutex) { + if (status != DRIVER_RUNNING) { + return status; + } - while (status == DRIVER_RUNNING) { - pthread_cond_wait(&cond, &mutex); - } + while (status == DRIVER_RUNNING) { + pthread_cond_wait(&cond, &mutex); + } - CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED); + CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED); - return status; + return status; + } } @@ -1669,17 +1669,17 @@ Status MesosSchedulerDriver::run() Status MesosSchedulerDriver::killTask(const TaskID& taskId) { - Lock lock(&mutex); - - if (status != DRIVER_RUNNING) { - return status; - } + synchronized (mutex) { + if (status != DRIVER_RUNNING) { + return status; + } - CHECK(process != NULL); + CHECK(process != NULL); - dispatch(process, &SchedulerProcess::killTask, taskId); + dispatch(process, &SchedulerProcess::killTask, taskId); - return status; + return status; + } } @@ -1700,17 +1700,17 @@ Status MesosSchedulerDriver::launchTasks( const vector<TaskInfo>& tasks, const Filters& filters) { - Lock lock(&mutex); - - if (status != DRIVER_RUNNING) { - return status; - } + synchronized (mutex) { + if (status != DRIVER_RUNNING) { + return status; + } - CHECK(process != NULL); + CHECK(process != NULL); - dispatch(process, &SchedulerProcess::launchTasks, offerIds, tasks, filters); + dispatch(process, &SchedulerProcess::launchTasks, offerIds, tasks, filters); - return status; + return status; + } } @@ -1719,22 +1719,22 @@ Status MesosSchedulerDriver::acceptOffers( const vector<Offer::Operation>& operations, const Filters& filters) { - Lock lock(&mutex); - - if (status != DRIVER_RUNNING) { - return status; - } + synchronized (mutex) { + if (status != DRIVER_RUNNING) { + return status; + } - CHECK(process != NULL); + CHECK(process != NULL); - dispatch( - process, - &SchedulerProcess::acceptOffers, - offerIds, - operations, - filters); + dispatch( + process, + &SchedulerProcess::acceptOffers, + offerIds, + operations, + filters); - return status; + return status; + } } @@ -1751,40 +1751,40 @@ Status MesosSchedulerDriver::declineOffer( Status MesosSchedulerDriver::reviveOffers() { - Lock lock(&mutex); - - if (status != DRIVER_RUNNING) { - return status; - } + synchronized (mutex) { + if (status != DRIVER_RUNNING) { + return status; + } - CHECK(process != NULL); + CHECK(process != NULL); - dispatch(process, &SchedulerProcess::reviveOffers); + dispatch(process, &SchedulerProcess::reviveOffers); - return status; + return status; + } } Status MesosSchedulerDriver::acknowledgeStatusUpdate( const TaskStatus& taskStatus) { - Lock lock(&mutex); - - if (status != DRIVER_RUNNING) { - return status; - } + synchronized (mutex) { + if (status != DRIVER_RUNNING) { + return status; + } - // TODO(bmahler): Should this use abort() instead? - if (implicitAcknowlegements) { - ABORT("Cannot call acknowledgeStatusUpdate:" - " Implicit acknowledgements are enabled"); - } + // TODO(bmahler): Should this use abort() instead? + if (implicitAcknowlegements) { + ABORT("Cannot call acknowledgeStatusUpdate:" + " Implicit acknowledgements are enabled"); + } - CHECK(process != NULL); + CHECK(process != NULL); - dispatch(process, &SchedulerProcess::acknowledgeStatusUpdate, taskStatus); + dispatch(process, &SchedulerProcess::acknowledgeStatusUpdate, taskStatus); - return status; + return status; + } } @@ -1793,50 +1793,50 @@ Status MesosSchedulerDriver::sendFrameworkMessage( const SlaveID& slaveId, const string& data) { - Lock lock(&mutex); - - if (status != DRIVER_RUNNING) { - return status; - } + synchronized (mutex) { + if (status != DRIVER_RUNNING) { + return status; + } - CHECK(process != NULL); + CHECK(process != NULL); - dispatch(process, &SchedulerProcess::sendFrameworkMessage, - executorId, slaveId, data); + dispatch(process, &SchedulerProcess::sendFrameworkMessage, + executorId, slaveId, data); - return status; + return status; + } } Status MesosSchedulerDriver::reconcileTasks( const vector<TaskStatus>& statuses) { - Lock lock(&mutex); - - if (status != DRIVER_RUNNING) { - return status; - } + synchronized (mutex) { + if (status != DRIVER_RUNNING) { + return status; + } - CHECK(process != NULL); + CHECK(process != NULL); - dispatch(process, &SchedulerProcess::reconcileTasks, statuses); + dispatch(process, &SchedulerProcess::reconcileTasks, statuses); - return status; + return status; + } } Status MesosSchedulerDriver::requestResources( const vector<Request>& requests) { - Lock lock(&mutex); - - if (status != DRIVER_RUNNING) { - return status; - } + synchronized (mutex) { + if (status != DRIVER_RUNNING) { + return status; + } - CHECK(process != NULL); + CHECK(process != NULL); - dispatch(process, &SchedulerProcess::requestResources, requests); + dispatch(process, &SchedulerProcess::requestResources, requests); - return status; + return status; + } }
