libprocess: Replace GCC intrinsics and volatile with std::atomic.

MESOS-3326.

Review: https://reviews.apache.org/r/37877


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4b938052
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4b938052
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4b938052

Branch: refs/heads/master
Commit: 4b938052b6af124eb1fdaec9b597c620627677ea
Parents: 4a01850
Author: Neil Conway <[email protected]>
Authored: Thu Sep 10 17:50:22 2015 -0700
Committer: Joris Van Remoortere <[email protected]>
Committed: Thu Sep 10 19:39:41 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/latch.hpp   |  4 +-
 .../include/process/metrics/counter.hpp         | 15 +++---
 3rdparty/libprocess/include/process/process.hpp |  2 +-
 3rdparty/libprocess/src/clock.cpp               |  5 +-
 3rdparty/libprocess/src/latch.cpp               | 15 +++---
 3rdparty/libprocess/src/process.cpp             | 52 ++++++++++----------
 3rdparty/libprocess/src/process_reference.hpp   |  8 +--
 7 files changed, 51 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/include/process/latch.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/latch.hpp b/3rdparty/libprocess/include/process/latch.hpp
index a1a2227..8a9d121 100644
--- a/3rdparty/libprocess/include/process/latch.hpp
+++ b/3rdparty/libprocess/include/process/latch.hpp
@@ -15,6 +15,8 @@
 #ifndef __PROCESS_LATCH_HPP__
 #define __PROCESS_LATCH_HPP__
 
+#include <atomic>
+
 #include <process/pid.hpp>
 
 #include <stout/duration.hpp>
@@ -43,7 +45,7 @@ private:
   Latch(const Latch& that);
   Latch& operator=(const Latch& that);
 
-  bool triggered;
+  std::atomic_bool triggered;
   UPID pid;
 };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/include/process/metrics/counter.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/metrics/counter.hpp b/3rdparty/libprocess/include/process/metrics/counter.hpp
index e51a8be..fd8be32 100644
--- a/3rdparty/libprocess/include/process/metrics/counter.hpp
+++ b/3rdparty/libprocess/include/process/metrics/counter.hpp
@@ -35,19 +35,20 @@ public:
     : Metric(name, window),
       data(new Data())
   {
-    push(data->v);
+    push(data->value.load());
   }
 
   virtual ~Counter() {}
 
   virtual Future<double> value() const
   {
-    return static_cast<double>(data->v);
+    return static_cast<double>(data->value.load());
   }
 
   void reset()
   {
-    push(__sync_and_and_fetch(&data->v, 0));
+    data->value.store(0);
+    push(0);
   }
 
   Counter& operator++()
@@ -64,17 +65,17 @@ public:
 
   Counter& operator+=(int64_t v)
   {
-    push(__sync_add_and_fetch(&data->v, v));
+    int64_t prev = data->value.fetch_add(v);
+    push(prev + v);
     return *this;
   }
 
 private:
   struct Data
   {
-    explicit Data() : v(0) {}
+    explicit Data() : value(0) {}
 
-    // TODO(dhamon): Update to std::atomic<int64_t> when C++11 lands.
-    volatile int64_t v;
+    std::atomic<int64_t> value;
   };
 
   std::shared_ptr<Data> data;

http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/include/process/process.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp
index cc8317f..8b086f2 100644
--- a/3rdparty/libprocess/include/process/process.hpp
+++ b/3rdparty/libprocess/include/process/process.hpp
@@ -332,7 +332,7 @@ private:
   std::deque<Event*> events;
 
   // Active references.
-  int refs;
+  std::atomic_long refs;
 
   // Process PID.
   UPID pid;

http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/src/clock.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/clock.cpp b/3rdparty/libprocess/src/clock.cpp
index 09c60e5..5806098 100644
--- a/3rdparty/libprocess/src/clock.cpp
+++ b/3rdparty/libprocess/src/clock.cpp
@@ -247,14 +247,15 @@ Timer Clock::timer(
     const Duration& duration,
     const lambda::function<void(void)>& thunk)
 {
-  static uint64_t id = 1; // Start at 1 since Timer() instances use id 0.
+  // Start at 1 since Timer() instances use id 0.
+  static std::atomic<uint64_t> id(1);
 
   // Assumes Clock::now() does Clock::now(__process__).
   Timeout timeout = Timeout::in(duration);
 
   UPID pid = __process__ != NULL ? __process__->self() : UPID();
 
-  Timer timer(__sync_fetch_and_add(&id, 1), timeout, pid, thunk);
+  Timer timer(id.fetch_add(1), timeout, pid, thunk);
 
   VLOG(3) << "Created a timer for " << pid << " in " << stringify(duration)
           << " in the future (" << timeout.time() << ")";

http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/src/latch.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/latch.cpp b/3rdparty/libprocess/src/latch.cpp
index f7d94d9..f433a05 100644
--- a/3rdparty/libprocess/src/latch.cpp
+++ b/3rdparty/libprocess/src/latch.cpp
@@ -25,10 +25,8 @@ namespace process {
 // within libprocess such that it doesn't cost a memory allocation, a
 // spawn, a message send, a wait, and two user-space context-switchs.
 
-Latch::Latch()
+Latch::Latch() : triggered(false)
 {
-  triggered = false;
-
   // Deadlock is possible if one thread is trying to delete a latch
   // but the libprocess thread(s) is trying to acquire a resource the
   // deleting thread is holding. Hence, we only save the PID for
@@ -40,7 +38,8 @@ Latch::Latch()
 
 Latch::~Latch()
 {
-  if (__sync_bool_compare_and_swap(&triggered, false, true)) {
+  bool expected = false;
+  if (triggered.compare_exchange_strong(expected, true)) {
     terminate(pid);
   }
 }
@@ -48,8 +47,8 @@ Latch::~Latch()
 
 bool Latch::trigger()
 {
-  // TODO(benh): Use std::atomic when C++11 rolls out.
-  if (__sync_bool_compare_and_swap(&triggered, false, true)) {
+  bool expected = false;
+  if (triggered.compare_exchange_strong(expected, true)) {
     terminate(pid);
     return true;
   }
@@ -59,7 +58,7 @@ bool Latch::trigger()
 
 bool Latch::await(const Duration& duration)
 {
-  if (!triggered) {
+  if (!triggered.load()) {
     process::wait(pid, duration); // Explict to disambiguate.
     // It's possible that we failed to wait because:
     //   (1) Our process has already terminated.
@@ -71,7 +70,7 @@ bool Latch::await(const Duration& duration)
     // 'triggered' (which will also capture cases where we actually
     // timed out but have since triggered, which seems like an
     // acceptable semantics given such a "tie").
-    return triggered;
+    return triggered.load();
   }
 
   return true;

http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 0e5394a..4afa305 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -427,7 +427,7 @@ private:
   std::recursive_mutex runq_mutex;
 
   // Number of running processes, to support Clock::settle operation.
-  int running;
+  std::atomic_long running;
 
   // List of rules applied to all incoming HTTP requests.
   vector<Owned<FirewallRule>> firewallRules;
@@ -746,23 +746,26 @@ void install(vector<Owned<FirewallRule>>&& rules)
 void initialize(const string& delegate)
 {
   // TODO(benh): Return an error if attempting to initialize again
-  // with a different delegate then originally specified.
+  // with a different delegate than originally specified.
 
   // static pthread_once_t init = PTHREAD_ONCE_INIT;
   // pthread_once(&init, ...);
 
-  static volatile bool initialized = false;
-  static volatile bool initializing = true;
+  static std::atomic_bool initialized(false);
+  static std::atomic_bool initializing(true);
 
   // Try and do the initialization or wait for it to complete.
-  if (initialized && !initializing) {
+  // TODO(neilc): Try to simplify and/or document this logic.
+  if (initialized.load() && !initializing.load()) {
     return;
-  } else if (initialized && initializing) {
-    while (initializing);
+  } else if (initialized.load() && initializing.load()) {
+    while (initializing.load());
     return;
   } else {
-    if (!__sync_bool_compare_and_swap(&initialized, false, true)) {
-      while (initializing);
+    // `compare_exchange_strong` needs an lvalue.
+    bool expected = false;
+    if (!initialized.compare_exchange_strong(expected, true)) {
+      while (initializing.load());
       return;
     }
   }
@@ -945,9 +948,9 @@ void initialize(const string& delegate)
     PLOG(FATAL) << "Failed to initialize: " << listen.error();
   }
 
-  // Need to set initialzing here so that we can actually invoke
-  // 'spawn' below for the garbage collector.
-  initializing = false;
+  // Need to set `initializing` here so that we can actually invoke `spawn()`
+  // below for the garbage collector.
+  initializing.store(false);
 
   __s__->accept()
     .onAny(lambda::bind(&internal::on_accept, lambda::_1));
@@ -998,7 +1001,7 @@ void finalize()
 {
   delete process_manager;
 
-  // TODO(benh): Finialize/shutdown Clock so that it doesn't attempt
+  // TODO(benh): Finalize/shutdown Clock so that it doesn't attempt
   // to dereference 'process_manager' in the 'timedout' callback.
 }
 
@@ -2111,8 +2114,7 @@ void SocketManager::swap_implementing_socket(const Socket& from, Socket* to)
 ProcessManager::ProcessManager(const string& _delegate)
   : delegate(_delegate)
 {
-  running = 0;
-  __sync_synchronize(); // Ensure write to 'running' visible in other threads.
+  running.store(0);
 }
 
 
@@ -2485,8 +2487,8 @@ void ProcessManager::resume(ProcessBase* process)
 
   __process__ = NULL;
 
-  CHECK_GE(running, 1);
-  __sync_fetch_and_sub(&running, 1);
+  CHECK_GE(running.load(), 1);
+  running.fetch_sub(1);
 }
 
 
@@ -2525,11 +2527,10 @@ void ProcessManager::cleanup(ProcessBase* process)
   // Remove process.
   synchronized (processes_mutex) {
     // Wait for all process references to get cleaned up.
-    while (process->refs > 0) {
+    while (process->refs.load() > 0) {
 #if defined(__i386__) || defined(__x86_64__)
       asm ("pause");
 #endif
-      __sync_synchronize();
     }
 
     synchronized (process->mutex) {
@@ -2545,7 +2546,7 @@ void ProcessManager::cleanup(ProcessBase* process)
         gates.erase(it);
       }
 
-      CHECK(process->refs == 0);
+      CHECK(process->refs.load() == 0);
       process->state = ProcessBase::TERMINATED;
     }
 
@@ -2672,7 +2673,7 @@ bool ProcessManager::wait(const UPID& pid)
             // 'runq' and 'running' equal to 0 between when we exit
             // this critical section and increment 'running').
             runq.erase(it);
-            __sync_fetch_and_add(&running, 1);
+            running.fetch_add(1);
           } else {
             // Another thread has resumed the process ...
             process = NULL;
@@ -2783,7 +2784,7 @@ ProcessBase* ProcessManager::dequeue()
       // Increment the running count of processes in order to support
       // the Clock::settle() operation (this must be done atomically
       // with removing the process from the runq).
-      __sync_fetch_and_add(&running, 1);
+      running.fetch_add(1);
     }
   }
 
@@ -2803,7 +2804,7 @@ void ProcessManager::settle()
     // expect the http::get will have properly enqueued a process on
     // the run queue but http::get is just sending bytes on a
     // socket. Without sleeping at the beginning of this function we
-    // can get unlucky and appear settled when in actuallity the
+    // can get unlucky and appear settled when in actuality the
     // kernel just hasn't copied the bytes to a socket or we haven't
     // yet read the bytes and enqueued an event on a process (and the
     // process on the run queue).
@@ -2817,10 +2818,7 @@ void ProcessManager::settle()
         continue;
       }
 
-      // Read barrier for 'running'.
-      __sync_synchronize();
-
-      if (running > 0) {
+      if (running.load() > 0) {
         done = false;
         continue;
       }

http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/src/process_reference.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process_reference.hpp b/3rdparty/libprocess/src/process_reference.hpp
index f8df4a6..e6110bb 100644
--- a/3rdparty/libprocess/src/process_reference.hpp
+++ b/3rdparty/libprocess/src/process_reference.hpp
@@ -66,7 +66,7 @@ private:
     : process(_process)
   {
     if (process != NULL) {
-      __sync_fetch_and_add(&(process->refs), 1);
+      process->refs.fetch_add(1);
     }
   }
 
@@ -78,15 +78,15 @@ private:
       // There should be at least one reference to the process, so
       // we don't need to worry about checking if it's exiting or
       // not, since we know we can always create another reference.
-      CHECK(process->refs > 0);
-      __sync_fetch_and_add(&(process->refs), 1);
+      CHECK(process->refs.load() > 0);
+      process->refs.fetch_add(1);
     }
   }
 
   void cleanup()
   {
     if (process != NULL) {
-      __sync_fetch_and_sub(&(process->refs), 1);
+      process->refs.fetch_sub(1);
     }
   }
 

Reply via email to