Update libprocess Process to use synchronized. Review: https://reviews.apache.org/r/35090
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9cb1283b Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9cb1283b Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9cb1283b Branch: refs/heads/master Commit: 9cb1283bcd942574bea07d0cf9b6748ae3869cc6 Parents: 9e7f64a Author: Joris Van Remoortere <[email protected]> Authored: Sat Jun 13 05:58:13 2015 -0700 Committer: Benjamin Hindman <[email protected]> Committed: Sun Jun 14 02:43:00 2015 -0700 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/process.hpp | 20 ++++----------- 3rdparty/libprocess/src/process.cpp | 26 ++++---------------- 2 files changed, 10 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/9cb1283b/3rdparty/libprocess/include/process/process.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp index e70dd38..6a0b21d 100644 --- a/3rdparty/libprocess/include/process/process.hpp +++ b/3rdparty/libprocess/include/process/process.hpp @@ -2,7 +2,6 @@ #define __PROCESS_PROCESS_HPP__ #include <stdint.h> -#include <pthread.h> #include <map> #include <queue> @@ -20,6 +19,7 @@ #include <stout/duration.hpp> #include <stout/lambda.hpp> #include <stout/option.hpp> +#include <stout/synchronized.hpp> #include <stout/thread.hpp> namespace process { @@ -180,24 +180,14 @@ protected: assets[name] = asset; } - void lock() - { - pthread_mutex_lock(&m); - } - - void unlock() - { - pthread_mutex_unlock(&m); - } - template<typename T> size_t eventCount() { size_t count = 0U; - lock(); - count = std::count_if(events.begin(), events.end(), isEventType<T>); - unlock(); + synchronized (mutex) { + count = std::count_if(events.begin(), events.end(), isEventType<T>); + } return count; } @@ -226,7 +216,7 @@ private: // Mutex protecting internals. // TODO(benh): Consider replacing with a spinlock, on multi-core systems. - pthread_mutex_t m; + std::recursive_mutex mutex; // Enqueue the specified message, request, or function call. void enqueue(Event* event, bool inject = false); http://git-wip-us.apache.org/repos/asf/mesos/blob/9cb1283b/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index aadd7bb..c2baa6c 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -2147,8 +2147,7 @@ void ProcessManager::resume(ProcessBase* process) while (!terminate && !blocked) { Event* event = NULL; - process->lock(); - { + synchronized (process->mutex) { if (process->events.size() > 0) { event = process->events.front(); process->events.pop_front(); @@ -2158,7 +2157,6 @@ void ProcessManager::resume(ProcessBase* process) blocked = true; } } - process->unlock(); if (!blocked) { CHECK(event != NULL); @@ -2251,13 +2249,11 @@ void ProcessManager::cleanup(ProcessBase* process) // another process that gets spawned with the same PID. deque<Event*> events; - process->lock(); - { + synchronized (process->mutex) { process->state = ProcessBase::TERMINATING; events = process->events; process->events.clear(); } - process->unlock(); // Delete pending events. while (!events.empty()) { @@ -2279,8 +2275,7 @@ void ProcessManager::cleanup(ProcessBase* process) __sync_synchronize(); } - process->lock(); - { + synchronized (process->mutex) { CHECK(process->events.empty()); processes.erase(process->pid.id); @@ -2296,7 +2291,6 @@ void ProcessManager::cleanup(ProcessBase* process) CHECK(process->refs == 0); process->state = ProcessBase::TERMINATED; } - process->unlock(); // Note that we don't remove the process from the clock during // cleanup, but rather the clock is reset for a process when it is @@ -2619,13 +2613,11 @@ Future<Response> ProcessManager::__processes__(const Request&) JSON::Array* events; } visitor(&events); - process->lock(); - { + synchronized (process->mutex) { foreach (Event* event, process->events) { event->visit(&visitor); } } - process->unlock(); object.values["events"] = events; array.values.push_back(object); @@ -2642,12 +2634,6 @@ ProcessBase::ProcessBase(const string& id) state = ProcessBase::BOTTOM; - pthread_mutexattr_t attr; - pthread_mutexattr_init(&attr); - pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); - pthread_mutex_init(&m, &attr); - pthread_mutexattr_destroy(&attr); - refs = 0; pid.id = id != "" ? id : ID::generate(); @@ -2669,8 +2655,7 @@ void ProcessBase::enqueue(Event* event, bool inject) { CHECK(event != NULL); - lock(); - { + synchronized (mutex) { if (state != TERMINATING && state != TERMINATED) { if (!inject) { events.push_back(event); @@ -2690,7 +2675,6 @@ void ProcessBase::enqueue(Event* event, bool inject) delete event; } } - unlock(); }
