Removed pthread and used Latch in executor and scheduler drivers. Review: https://reviews.apache.org/r/36674
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4fc8089b Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4fc8089b Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4fc8089b Branch: refs/heads/master Commit: 4fc8089bbefda0fbc640da6ecf0be37020e9f680 Parents: 1bd50fc Author: Joris Van Remoortere <[email protected]> Authored: Fri Jul 24 14:35:12 2015 -0700 Committer: Benjamin Hindman <[email protected]> Committed: Fri Jul 24 15:29:04 2015 -0700 ---------------------------------------------------------------------- include/mesos/executor.hpp | 16 ++++++++------ include/mesos/scheduler.hpp | 16 ++++++++------ src/exec/exec.cpp | 41 ++++++++++++++++++------------------ src/sched/sched.cpp | 45 ++++++++++++++++++++-------------------- 4 files changed, 63 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/4fc8089b/include/mesos/executor.hpp ---------------------------------------------------------------------- diff --git a/include/mesos/executor.hpp b/include/mesos/executor.hpp index f3cd3cc..72eca97 100644 --- a/include/mesos/executor.hpp +++ b/include/mesos/executor.hpp @@ -19,8 +19,7 @@ #ifndef __MESOS_EXECUTOR_HPP__ #define __MESOS_EXECUTOR_HPP__ -#include <pthread.h> - +#include <mutex> #include <string> #include <mesos/mesos.hpp> @@ -46,6 +45,11 @@ // THE SAME MODIFICATIONS FOR OTHER LANGUAGE BINDINGS (e.g., Java: // src/java/src/org/apache/mesos, Python: src/python/src, etc.). +// Forward declaration. +namespace process { +class Latch; +} // namespace process { + namespace mesos { // A few forward declarations. @@ -236,11 +240,11 @@ private: // Libprocess process for communicating with slave. internal::ExecutorProcess* process; - // Mutex to enforce all non-callbacks are execute serially. - pthread_mutex_t mutex; + // Mutex for enforcing serial execution of all non-callbacks. + std::recursive_mutex mutex; - // Condition variable for waiting until driver terminates. - pthread_cond_t cond; + // Latch for waiting until driver terminates. + process::Latch* latch; // Current status of the driver. Status status; http://git-wip-us.apache.org/repos/asf/mesos/blob/4fc8089b/include/mesos/scheduler.hpp ---------------------------------------------------------------------- diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp index 9dae0a8..cd235a1 100644 --- a/include/mesos/scheduler.hpp +++ b/include/mesos/scheduler.hpp @@ -19,9 +19,8 @@ #ifndef __MESOS_SCHEDULER_HPP__ #define __MESOS_SCHEDULER_HPP__ -#include <pthread.h> - #include <functional> +#include <mutex> #include <queue> #include <string> #include <vector> @@ -38,6 +37,11 @@ // THE SAME MODIFICATIONS FOR OTHER LANGUAGE BINDINGS (e.g., Java: // src/java/src/org/apache/mesos, Python: src/python/src, etc.). +// Forward declaration. +namespace process { +class Latch; +} // namespace process { + namespace mesos { // A few forward declarations. @@ -445,11 +449,11 @@ private: // URL for the master (e.g., zk://, file://, etc). std::string url; - // Mutex to enforce all non-callbacks are executed serially. - pthread_mutex_t mutex; + // Mutex for enforcing serial execution of all non-callbacks. + std::recursive_mutex mutex; - // Condition variable for waiting until driver terminates. - pthread_cond_t cond; + // Latch for waiting until driver terminates. + process::Latch* latch; // Current status of the driver. Status status; http://git-wip-us.apache.org/repos/asf/mesos/blob/4fc8089b/src/exec/exec.cpp ---------------------------------------------------------------------- diff --git a/src/exec/exec.cpp b/src/exec/exec.cpp index 54ef622..d59a7e1 100644 --- a/src/exec/exec.cpp +++ b/src/exec/exec.cpp @@ -31,6 +31,7 @@ #include <process/delay.hpp> #include <process/dispatch.hpp> #include <process/id.hpp> +#include <process/latch.hpp> #include <process/process.hpp> #include <process/protobuf.hpp> @@ -64,6 +65,7 @@ using namespace process; using std::string; +using process::Latch; using process::wait; // Necessary on some OS's to disambiguate. @@ -110,8 +112,8 @@ public: const string& _directory, bool _checkpoint, Duration _recoveryTimeout, - pthread_mutex_t* _mutex, - pthread_cond_t* _cond) + std::recursive_mutex* _mutex, + Latch* _latch) : ProcessBase(ID::generate("executor")), slave(_slave), driver(_driver), @@ -124,7 +126,7 @@ public: local(_local), aborted(false), mutex(_mutex), - cond(_cond), + latch(_latch), directory(_directory), checkpoint(_checkpoint), recoveryTimeout(_recoveryTimeout) @@ -405,7 +407,7 @@ protected: terminate(self()); synchronized (mutex) { - pthread_cond_signal(cond); + CHECK_NOTNULL(latch)->trigger(); } } @@ -415,7 +417,7 @@ protected: CHECK(aborted); synchronized (mutex) { - pthread_cond_signal(cond); + CHECK_NOTNULL(latch)->trigger(); } } @@ -543,8 +545,8 @@ private: UUID connection; // UUID to identify the connection instance. bool local; volatile bool aborted; - pthread_mutex_t* mutex; - pthread_cond_t* cond; + std::recursive_mutex* mutex; + Latch* latch; const string directory; bool checkpoint; Duration recoveryTimeout; @@ -583,13 +585,8 @@ MesosExecutorDriver::MesosExecutorDriver(Executor* _executor) return; } - // Create mutex and condition variable. - pthread_mutexattr_t attr; - pthread_mutexattr_init(&attr); - pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); - pthread_mutex_init(&mutex, &attr); - pthread_mutexattr_destroy(&attr); - pthread_cond_init(&cond, 0); + // Initialize Latch. + latch = new Latch(); // Initialize libprocess. process::initialize(); @@ -612,8 +609,7 @@ MesosExecutorDriver::~MesosExecutorDriver() wait(process); delete process; - pthread_mutex_destroy(&mutex); - pthread_cond_destroy(&cond); + delete latch; } @@ -717,7 +713,7 @@ Status MesosExecutorDriver::start() checkpoint, recoveryTimeout, &mutex, - &cond); + latch); spawn(process); @@ -774,15 +770,20 @@ Status MesosExecutorDriver::abort() Status MesosExecutorDriver::join() { + // Exit early if the driver is not running. synchronized (mutex) { if (status != DRIVER_RUNNING) { return status; } + } - while (status == DRIVER_RUNNING) { - pthread_cond_wait(&cond, &mutex); - } + // If the driver was running, the latch will be triggered regardless + // of the current `status`. Wait for this to happen to signify + // termination. + CHECK_NOTNULL(latch)->await(); + // Now return the current `status` of the driver. + synchronized (mutex) { CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED); return status; http://git-wip-us.apache.org/repos/asf/mesos/blob/4fc8089b/src/sched/sched.cpp ---------------------------------------------------------------------- diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp index 1bcc376..db0653d 100644 --- a/src/sched/sched.cpp +++ b/src/sched/sched.cpp @@ -46,6 +46,7 @@ #include <process/dispatch.hpp> #include <process/future.hpp> #include <process/id.hpp> +#include <process/latch.hpp> #include <process/owned.hpp> #include <process/pid.hpp> #include <process/process.hpp> @@ -96,6 +97,7 @@ using namespace mesos::scheduler; using process::Clock; using process::DispatchEvent; using process::Future; +using process::Latch; using process::MessageEvent; using process::Process; using process::UPID; @@ -128,8 +130,8 @@ public: const string& schedulerId, MasterDetector* _detector, const internal::scheduler::Flags& _flags, - pthread_mutex_t* _mutex, - pthread_cond_t* _cond) + std::recursive_mutex* _mutex, + Latch* _latch) // We use a UUID here to ensure that the master can reliably // distinguish between scheduler runs. Otherwise the master may // receive a delayed ExitedEvent enqueued behind a @@ -146,7 +148,7 @@ public: scheduler(_scheduler), framework(_framework), mutex(_mutex), - cond(_cond), + latch(_latch), failover(_framework.has_id() && !framework.id().value().empty()), connected(false), running(true), @@ -1047,7 +1049,7 @@ protected: } synchronized (mutex) { - pthread_cond_signal(cond); + CHECK_NOTNULL(latch)->trigger(); } } @@ -1073,7 +1075,7 @@ protected: } synchronized (mutex) { - pthread_cond_signal(cond); + CHECK_NOTNULL(latch)->trigger(); } } @@ -1410,8 +1412,8 @@ private: MesosSchedulerDriver* driver; Scheduler* scheduler; FrameworkInfo framework; - pthread_mutex_t* mutex; - pthread_cond_t* cond; + std::recursive_mutex* mutex; + Latch* latch; bool failover; @@ -1500,15 +1502,8 @@ void MesosSchedulerDriver::initialize() { VLOG(1) << "Disabling initialization of GLOG logging"; } - // Initialize mutex and condition variable. TODO(benh): Consider - // using a libprocess Latch rather than a pthread mutex and - // condition variable for signaling. - pthread_mutexattr_t attr; - pthread_mutexattr_init(&attr); - pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); - pthread_mutex_init(&mutex, &attr); - pthread_mutexattr_destroy(&attr); - pthread_cond_init(&cond, 0); + // Initialize Latch. + latch = new Latch(); // TODO(benh): Check the user the framework wants to run tasks as, // see if the current user can switch to that user, or via an @@ -1657,8 +1652,7 @@ MesosSchedulerDriver::~MesosSchedulerDriver() delete process; } - pthread_mutex_destroy(&mutex); - pthread_cond_destroy(&cond); + delete latch; if (detector != NULL) { delete detector; @@ -1727,7 +1721,7 @@ Status MesosSchedulerDriver::start() detector, flags, &mutex, - &cond); + latch); } else { const Credential& cred = *credential; process = new SchedulerProcess( @@ -1740,7 +1734,7 @@ Status MesosSchedulerDriver::start() detector, flags, &mutex, - &cond); + latch); } spawn(process); @@ -1810,15 +1804,20 @@ Status MesosSchedulerDriver::abort() Status MesosSchedulerDriver::join() { + // Exit early if the driver is not running. synchronized (mutex) { if (status != DRIVER_RUNNING) { return status; } + } - while (status == DRIVER_RUNNING) { - pthread_cond_wait(&cond, &mutex); - } + // If the driver was running, the latch will be triggered regardless + // of the current `status`. Wait for this to happen to signify + // termination. + CHECK_NOTNULL(latch)->await(); + // Now return the current `status` of the driver. + synchronized (mutex) { CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED); return status;
