Update libprocess Process to use synchronized.

Review: https://reviews.apache.org/r/35090


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9cb1283b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9cb1283b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9cb1283b

Branch: refs/heads/master
Commit: 9cb1283bcd942574bea07d0cf9b6748ae3869cc6
Parents: 9e7f64a
Author: Joris Van Remoortere <[email protected]>
Authored: Sat Jun 13 05:58:13 2015 -0700
Committer: Benjamin Hindman <[email protected]>
Committed: Sun Jun 14 02:43:00 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/process.hpp | 20 ++++-----------
 3rdparty/libprocess/src/process.cpp             | 26 ++++----------------
 2 files changed, 10 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9cb1283b/3rdparty/libprocess/include/process/process.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/process.hpp 
b/3rdparty/libprocess/include/process/process.hpp
index e70dd38..6a0b21d 100644
--- a/3rdparty/libprocess/include/process/process.hpp
+++ b/3rdparty/libprocess/include/process/process.hpp
@@ -2,7 +2,6 @@
 #define __PROCESS_PROCESS_HPP__
 
 #include <stdint.h>
-#include <pthread.h>
 
 #include <map>
 #include <queue>
@@ -20,6 +19,7 @@
 #include <stout/duration.hpp>
 #include <stout/lambda.hpp>
 #include <stout/option.hpp>
+#include <stout/synchronized.hpp>
 #include <stout/thread.hpp>
 
 namespace process {
@@ -180,24 +180,14 @@ protected:
     assets[name] = asset;
   }
 
-  void lock()
-  {
-    pthread_mutex_lock(&m);
-  }
-
-  void unlock()
-  {
-    pthread_mutex_unlock(&m);
-  }
-
   template<typename T>
   size_t eventCount()
   {
     size_t count = 0U;
 
-    lock();
-    count = std::count_if(events.begin(), events.end(), isEventType<T>);
-    unlock();
+    synchronized (mutex) {
+      count = std::count_if(events.begin(), events.end(), isEventType<T>);
+    }
 
     return count;
   }
@@ -226,7 +216,7 @@ private:
 
   // Mutex protecting internals.
   // TODO(benh): Consider replacing with a spinlock, on multi-core systems.
-  pthread_mutex_t m;
+  std::recursive_mutex mutex;
 
   // Enqueue the specified message, request, or function call.
   void enqueue(Event* event, bool inject = false);

http://git-wip-us.apache.org/repos/asf/mesos/blob/9cb1283b/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp 
b/3rdparty/libprocess/src/process.cpp
index aadd7bb..c2baa6c 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -2147,8 +2147,7 @@ void ProcessManager::resume(ProcessBase* process)
   while (!terminate && !blocked) {
     Event* event = NULL;
 
-    process->lock();
-    {
+    synchronized (process->mutex) {
       if (process->events.size() > 0) {
         event = process->events.front();
         process->events.pop_front();
@@ -2158,7 +2157,6 @@ void ProcessManager::resume(ProcessBase* process)
         blocked = true;
       }
     }
-    process->unlock();
 
     if (!blocked) {
       CHECK(event != NULL);
@@ -2251,13 +2249,11 @@ void ProcessManager::cleanup(ProcessBase* process)
   // another process that gets spawned with the same PID.
   deque<Event*> events;
 
-  process->lock();
-  {
+  synchronized (process->mutex) {
     process->state = ProcessBase::TERMINATING;
     events = process->events;
     process->events.clear();
   }
-  process->unlock();
 
   // Delete pending events.
   while (!events.empty()) {
@@ -2279,8 +2275,7 @@ void ProcessManager::cleanup(ProcessBase* process)
       __sync_synchronize();
     }
 
-    process->lock();
-    {
+    synchronized (process->mutex) {
       CHECK(process->events.empty());
 
       processes.erase(process->pid.id);
@@ -2296,7 +2291,6 @@ void ProcessManager::cleanup(ProcessBase* process)
       CHECK(process->refs == 0);
       process->state = ProcessBase::TERMINATED;
     }
-    process->unlock();
 
     // Note that we don't remove the process from the clock during
     // cleanup, but rather the clock is reset for a process when it is
@@ -2619,13 +2613,11 @@ Future<Response> ProcessManager::__processes__(const 
Request&)
         JSON::Array* events;
       } visitor(&events);
 
-      process->lock();
-      {
+      synchronized (process->mutex) {
         foreach (Event* event, process->events) {
           event->visit(&visitor);
         }
       }
-      process->unlock();
 
       object.values["events"] = events;
       array.values.push_back(object);
@@ -2642,12 +2634,6 @@ ProcessBase::ProcessBase(const string& id)
 
   state = ProcessBase::BOTTOM;
 
-  pthread_mutexattr_t attr;
-  pthread_mutexattr_init(&attr);
-  pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
-  pthread_mutex_init(&m, &attr);
-  pthread_mutexattr_destroy(&attr);
-
   refs = 0;
 
   pid.id = id != "" ? id : ID::generate();
@@ -2669,8 +2655,7 @@ void ProcessBase::enqueue(Event* event, bool inject)
 {
   CHECK(event != NULL);
 
-  lock();
-  {
+  synchronized (mutex) {
     if (state != TERMINATING && state != TERMINATED) {
       if (!inject) {
         events.push_back(event);
@@ -2690,7 +2675,6 @@ void ProcessBase::enqueue(Event* event, bool inject)
       delete event;
     }
   }
-  unlock();
 }
 
 

Reply via email to