Removed pthread and used Latch in executor and scheduler drivers.

Review: https://reviews.apache.org/r/36674


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4fc8089b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4fc8089b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4fc8089b

Branch: refs/heads/master
Commit: 4fc8089bbefda0fbc640da6ecf0be37020e9f680
Parents: 1bd50fc
Author: Joris Van Remoortere <[email protected]>
Authored: Fri Jul 24 14:35:12 2015 -0700
Committer: Benjamin Hindman <[email protected]>
Committed: Fri Jul 24 15:29:04 2015 -0700

----------------------------------------------------------------------
 include/mesos/executor.hpp  | 16 ++++++++------
 include/mesos/scheduler.hpp | 16 ++++++++------
 src/exec/exec.cpp           | 41 ++++++++++++++++++------------------
 src/sched/sched.cpp         | 45 ++++++++++++++++++++--------------------
 4 files changed, 63 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4fc8089b/include/mesos/executor.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/executor.hpp b/include/mesos/executor.hpp
index f3cd3cc..72eca97 100644
--- a/include/mesos/executor.hpp
+++ b/include/mesos/executor.hpp
@@ -19,8 +19,7 @@
 #ifndef __MESOS_EXECUTOR_HPP__
 #define __MESOS_EXECUTOR_HPP__
 
-#include <pthread.h>
-
+#include <mutex>
 #include <string>
 
 #include <mesos/mesos.hpp>
@@ -46,6 +45,11 @@
 // THE SAME MODIFICATIONS FOR OTHER LANGUAGE BINDINGS (e.g., Java:
 // src/java/src/org/apache/mesos, Python: src/python/src, etc.).
 
+// Forward declaration.
+namespace process {
+class Latch;
+} // namespace process {
+
 namespace mesos {
 
 // A few forward declarations.
@@ -236,11 +240,11 @@ private:
   // Libprocess process for communicating with slave.
   internal::ExecutorProcess* process;
 
-  // Mutex to enforce all non-callbacks are execute serially.
-  pthread_mutex_t mutex;
+  // Mutex for enforcing serial execution of all non-callbacks.
+  std::recursive_mutex mutex;
 
-  // Condition variable for waiting until driver terminates.
-  pthread_cond_t cond;
+  // Latch for waiting until driver terminates.
+  process::Latch* latch;
 
   // Current status of the driver.
   Status status;

http://git-wip-us.apache.org/repos/asf/mesos/blob/4fc8089b/include/mesos/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index 9dae0a8..cd235a1 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -19,9 +19,8 @@
 #ifndef __MESOS_SCHEDULER_HPP__
 #define __MESOS_SCHEDULER_HPP__
 
-#include <pthread.h>
-
 #include <functional>
+#include <mutex>
 #include <queue>
 #include <string>
 #include <vector>
@@ -38,6 +37,11 @@
 // THE SAME MODIFICATIONS FOR OTHER LANGUAGE BINDINGS (e.g., Java:
 // src/java/src/org/apache/mesos, Python: src/python/src, etc.).
 
+// Forward declaration.
+namespace process {
+class Latch;
+} // namespace process {
+
 namespace mesos {
 
 // A few forward declarations.
@@ -445,11 +449,11 @@ private:
   // URL for the master (e.g., zk://, file://, etc).
   std::string url;
 
-  // Mutex to enforce all non-callbacks are executed serially.
-  pthread_mutex_t mutex;
+  // Mutex for enforcing serial execution of all non-callbacks.
+  std::recursive_mutex mutex;
 
-  // Condition variable for waiting until driver terminates.
-  pthread_cond_t cond;
+  // Latch for waiting until driver terminates.
+  process::Latch* latch;
 
   // Current status of the driver.
   Status status;

http://git-wip-us.apache.org/repos/asf/mesos/blob/4fc8089b/src/exec/exec.cpp
----------------------------------------------------------------------
diff --git a/src/exec/exec.cpp b/src/exec/exec.cpp
index 54ef622..d59a7e1 100644
--- a/src/exec/exec.cpp
+++ b/src/exec/exec.cpp
@@ -31,6 +31,7 @@
 #include <process/delay.hpp>
 #include <process/dispatch.hpp>
 #include <process/id.hpp>
+#include <process/latch.hpp>
 #include <process/process.hpp>
 #include <process/protobuf.hpp>
 
@@ -64,6 +65,7 @@ using namespace process;
 
 using std::string;
 
+using process::Latch;
 using process::wait; // Necessary on some OS's to disambiguate.
 
 
@@ -110,8 +112,8 @@ public:
                   const string& _directory,
                   bool _checkpoint,
                   Duration _recoveryTimeout,
-                  pthread_mutex_t* _mutex,
-                  pthread_cond_t* _cond)
+                  std::recursive_mutex* _mutex,
+                  Latch* _latch)
     : ProcessBase(ID::generate("executor")),
       slave(_slave),
       driver(_driver),
@@ -124,7 +126,7 @@ public:
       local(_local),
       aborted(false),
       mutex(_mutex),
-      cond(_cond),
+      latch(_latch),
       directory(_directory),
       checkpoint(_checkpoint),
       recoveryTimeout(_recoveryTimeout)
@@ -405,7 +407,7 @@ protected:
     terminate(self());
 
     synchronized (mutex) {
-      pthread_cond_signal(cond);
+      CHECK_NOTNULL(latch)->trigger();
     }
   }
 
@@ -415,7 +417,7 @@ protected:
     CHECK(aborted);
 
     synchronized (mutex) {
-      pthread_cond_signal(cond);
+      CHECK_NOTNULL(latch)->trigger();
     }
   }
 
@@ -543,8 +545,8 @@ private:
   UUID connection; // UUID to identify the connection instance.
   bool local;
   volatile bool aborted;
-  pthread_mutex_t* mutex;
-  pthread_cond_t* cond;
+  std::recursive_mutex* mutex;
+  Latch* latch;
   const string directory;
   bool checkpoint;
   Duration recoveryTimeout;
@@ -583,13 +585,8 @@ MesosExecutorDriver::MesosExecutorDriver(Executor* 
_executor)
     return;
   }
 
-  // Create mutex and condition variable.
-  pthread_mutexattr_t attr;
-  pthread_mutexattr_init(&attr);
-  pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
-  pthread_mutex_init(&mutex, &attr);
-  pthread_mutexattr_destroy(&attr);
-  pthread_cond_init(&cond, 0);
+  // Initialize Latch.
+  latch = new Latch();
 
   // Initialize libprocess.
   process::initialize();
@@ -612,8 +609,7 @@ MesosExecutorDriver::~MesosExecutorDriver()
   wait(process);
   delete process;
 
-  pthread_mutex_destroy(&mutex);
-  pthread_cond_destroy(&cond);
+  delete latch;
 }
 
 
@@ -717,7 +713,7 @@ Status MesosExecutorDriver::start()
         checkpoint,
         recoveryTimeout,
         &mutex,
-        &cond);
+        latch);
 
     spawn(process);
 
@@ -774,15 +770,20 @@ Status MesosExecutorDriver::abort()
 
 Status MesosExecutorDriver::join()
 {
+  // Exit early if the driver is not running.
   synchronized (mutex) {
     if (status != DRIVER_RUNNING) {
       return status;
     }
+  }
 
-    while (status == DRIVER_RUNNING) {
-      pthread_cond_wait(&cond, &mutex);
-    }
+  // If the driver was running, the latch will be triggered regardless
+  // of the current `status`. Wait for this to happen to signify
+  // termination.
+  CHECK_NOTNULL(latch)->await();
 
+  // Now return the current `status` of the driver.
+  synchronized (mutex) {
     CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED);
 
     return status;

http://git-wip-us.apache.org/repos/asf/mesos/blob/4fc8089b/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 1bcc376..db0653d 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -46,6 +46,7 @@
 #include <process/dispatch.hpp>
 #include <process/future.hpp>
 #include <process/id.hpp>
+#include <process/latch.hpp>
 #include <process/owned.hpp>
 #include <process/pid.hpp>
 #include <process/process.hpp>
@@ -96,6 +97,7 @@ using namespace mesos::scheduler;
 using process::Clock;
 using process::DispatchEvent;
 using process::Future;
+using process::Latch;
 using process::MessageEvent;
 using process::Process;
 using process::UPID;
@@ -128,8 +130,8 @@ public:
                    const string& schedulerId,
                    MasterDetector* _detector,
                    const internal::scheduler::Flags& _flags,
-                   pthread_mutex_t* _mutex,
-                   pthread_cond_t* _cond)
+                   std::recursive_mutex* _mutex,
+                   Latch* _latch)
       // We use a UUID here to ensure that the master can reliably
       // distinguish between scheduler runs. Otherwise the master may
       // receive a delayed ExitedEvent enqueued behind a
@@ -146,7 +148,7 @@ public:
       scheduler(_scheduler),
       framework(_framework),
       mutex(_mutex),
-      cond(_cond),
+      latch(_latch),
       failover(_framework.has_id() && !framework.id().value().empty()),
       connected(false),
       running(true),
@@ -1047,7 +1049,7 @@ protected:
     }
 
     synchronized (mutex) {
-      pthread_cond_signal(cond);
+      CHECK_NOTNULL(latch)->trigger();
     }
   }
 
@@ -1073,7 +1075,7 @@ protected:
     }
 
     synchronized (mutex) {
-      pthread_cond_signal(cond);
+      CHECK_NOTNULL(latch)->trigger();
     }
   }
 
@@ -1410,8 +1412,8 @@ private:
   MesosSchedulerDriver* driver;
   Scheduler* scheduler;
   FrameworkInfo framework;
-  pthread_mutex_t* mutex;
-  pthread_cond_t* cond;
+  std::recursive_mutex* mutex;
+  Latch* latch;
 
   bool failover;
 
@@ -1500,15 +1502,8 @@ void MesosSchedulerDriver::initialize() {
     VLOG(1) << "Disabling initialization of GLOG logging";
   }
 
-  // Initialize mutex and condition variable. TODO(benh): Consider
-  // using a libprocess Latch rather than a pthread mutex and
-  // condition variable for signaling.
-  pthread_mutexattr_t attr;
-  pthread_mutexattr_init(&attr);
-  pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
-  pthread_mutex_init(&mutex, &attr);
-  pthread_mutexattr_destroy(&attr);
-  pthread_cond_init(&cond, 0);
+  // Initialize Latch.
+  latch = new Latch();
 
   // TODO(benh): Check the user the framework wants to run tasks as,
   // see if the current user can switch to that user, or via an
@@ -1657,8 +1652,7 @@ MesosSchedulerDriver::~MesosSchedulerDriver()
     delete process;
   }
 
-  pthread_mutex_destroy(&mutex);
-  pthread_cond_destroy(&cond);
+  delete latch;
 
   if (detector != NULL) {
     delete detector;
@@ -1727,7 +1721,7 @@ Status MesosSchedulerDriver::start()
           detector,
           flags,
           &mutex,
-          &cond);
+          latch);
     } else {
       const Credential& cred = *credential;
       process = new SchedulerProcess(
@@ -1740,7 +1734,7 @@ Status MesosSchedulerDriver::start()
           detector,
           flags,
           &mutex,
-          &cond);
+          latch);
     }
 
     spawn(process);
@@ -1810,15 +1804,20 @@ Status MesosSchedulerDriver::abort()
 
 Status MesosSchedulerDriver::join()
 {
+  // Exit early if the driver is not running.
   synchronized (mutex) {
     if (status != DRIVER_RUNNING) {
       return status;
     }
+  }
 
-    while (status == DRIVER_RUNNING) {
-      pthread_cond_wait(&cond, &mutex);
-    }
+  // If the driver was running, the latch will be triggered regardless
+  // of the current `status`. Wait for this to happen to signify
+  // termination.
+  CHECK_NOTNULL(latch)->await();
 
+  // Now return the current `status` of the driver.
+  synchronized (mutex) {
     CHECK(status == DRIVER_ABORTED || status == DRIVER_STOPPED);
 
     return status;

Reply via email to