libprocess: Replace GCC intrinsics and volatile with std::atomic. MESOS-3326.
Review: https://reviews.apache.org/r/37877 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4b938052 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4b938052 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4b938052 Branch: refs/heads/master Commit: 4b938052b6af124eb1fdaec9b597c620627677ea Parents: 4a01850 Author: Neil Conway <[email protected]> Authored: Thu Sep 10 17:50:22 2015 -0700 Committer: Joris Van Remoortere <[email protected]> Committed: Thu Sep 10 19:39:41 2015 -0700 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/latch.hpp | 4 +- .../include/process/metrics/counter.hpp | 15 +++--- 3rdparty/libprocess/include/process/process.hpp | 2 +- 3rdparty/libprocess/src/clock.cpp | 5 +- 3rdparty/libprocess/src/latch.cpp | 15 +++--- 3rdparty/libprocess/src/process.cpp | 52 ++++++++++---------- 3rdparty/libprocess/src/process_reference.hpp | 8 +-- 7 files changed, 51 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/include/process/latch.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/latch.hpp b/3rdparty/libprocess/include/process/latch.hpp index a1a2227..8a9d121 100644 --- a/3rdparty/libprocess/include/process/latch.hpp +++ b/3rdparty/libprocess/include/process/latch.hpp @@ -15,6 +15,8 @@ #ifndef __PROCESS_LATCH_HPP__ #define __PROCESS_LATCH_HPP__ +#include <atomic> + #include <process/pid.hpp> #include <stout/duration.hpp> @@ -43,7 +45,7 @@ private: Latch(const Latch& that); Latch& operator=(const Latch& that); - bool triggered; + std::atomic_bool triggered; UPID pid; }; http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/include/process/metrics/counter.hpp 
---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/metrics/counter.hpp b/3rdparty/libprocess/include/process/metrics/counter.hpp index e51a8be..fd8be32 100644 --- a/3rdparty/libprocess/include/process/metrics/counter.hpp +++ b/3rdparty/libprocess/include/process/metrics/counter.hpp @@ -35,19 +35,20 @@ public: : Metric(name, window), data(new Data()) { - push(data->v); + push(data->value.load()); } virtual ~Counter() {} virtual Future<double> value() const { - return static_cast<double>(data->v); + return static_cast<double>(data->value.load()); } void reset() { - push(__sync_and_and_fetch(&data->v, 0)); + data->value.store(0); + push(0); } Counter& operator++() @@ -64,17 +65,17 @@ public: Counter& operator+=(int64_t v) { - push(__sync_add_and_fetch(&data->v, v)); + int64_t prev = data->value.fetch_add(v); + push(prev + v); return *this; } private: struct Data { - explicit Data() : v(0) {} + explicit Data() : value(0) {} - // TODO(dhamon): Update to std::atomic<int64_t> when C++11 lands. - volatile int64_t v; + std::atomic<int64_t> value; }; std::shared_ptr<Data> data; http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/include/process/process.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp index cc8317f..8b086f2 100644 --- a/3rdparty/libprocess/include/process/process.hpp +++ b/3rdparty/libprocess/include/process/process.hpp @@ -332,7 +332,7 @@ private: std::deque<Event*> events; // Active references. - int refs; + std::atomic_long refs; // Process PID. 
UPID pid; http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/src/clock.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/clock.cpp b/3rdparty/libprocess/src/clock.cpp index 09c60e5..5806098 100644 --- a/3rdparty/libprocess/src/clock.cpp +++ b/3rdparty/libprocess/src/clock.cpp @@ -247,14 +247,15 @@ Timer Clock::timer( const Duration& duration, const lambda::function<void(void)>& thunk) { - static uint64_t id = 1; // Start at 1 since Timer() instances use id 0. + // Start at 1 since Timer() instances use id 0. + static std::atomic<uint64_t> id(1); // Assumes Clock::now() does Clock::now(__process__). Timeout timeout = Timeout::in(duration); UPID pid = __process__ != NULL ? __process__->self() : UPID(); - Timer timer(__sync_fetch_and_add(&id, 1), timeout, pid, thunk); + Timer timer(id.fetch_add(1), timeout, pid, thunk); VLOG(3) << "Created a timer for " << pid << " in " << stringify(duration) << " in the future (" << timeout.time() << ")"; http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/src/latch.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/latch.cpp b/3rdparty/libprocess/src/latch.cpp index f7d94d9..f433a05 100644 --- a/3rdparty/libprocess/src/latch.cpp +++ b/3rdparty/libprocess/src/latch.cpp @@ -25,10 +25,8 @@ namespace process { // within libprocess such that it doesn't cost a memory allocation, a // spawn, a message send, a wait, and two user-space context-switchs. -Latch::Latch() +Latch::Latch() : triggered(false) { - triggered = false; - // Deadlock is possible if one thread is trying to delete a latch // but the libprocess thread(s) is trying to acquire a resource the // deleting thread is holding. 
Hence, we only save the PID for @@ -40,7 +38,8 @@ Latch::Latch() Latch::~Latch() { - if (__sync_bool_compare_and_swap(&triggered, false, true)) { + bool expected = false; + if (triggered.compare_exchange_strong(expected, true)) { terminate(pid); } } @@ -48,8 +47,8 @@ Latch::~Latch() bool Latch::trigger() { - // TODO(benh): Use std::atomic when C++11 rolls out. - if (__sync_bool_compare_and_swap(&triggered, false, true)) { + bool expected = false; + if (triggered.compare_exchange_strong(expected, true)) { terminate(pid); return true; } @@ -59,7 +58,7 @@ bool Latch::trigger() bool Latch::await(const Duration& duration) { - if (!triggered) { + if (!triggered.load()) { process::wait(pid, duration); // Explict to disambiguate. // It's possible that we failed to wait because: // (1) Our process has already terminated. @@ -71,7 +70,7 @@ bool Latch::await(const Duration& duration) // 'triggered' (which will also capture cases where we actually // timed out but have since triggered, which seems like an // acceptable semantics given such a "tie"). - return triggered; + return triggered.load(); } return true; http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index 0e5394a..4afa305 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -427,7 +427,7 @@ private: std::recursive_mutex runq_mutex; // Number of running processes, to support Clock::settle operation. - int running; + std::atomic_long running; // List of rules applied to all incoming HTTP requests. vector<Owned<FirewallRule>> firewallRules; @@ -746,23 +746,26 @@ void install(vector<Owned<FirewallRule>>&& rules) void initialize(const string& delegate) { // TODO(benh): Return an error if attempting to initialize again - // with a different delegate then originally specified. 
+ // with a different delegate than originally specified. // static pthread_once_t init = PTHREAD_ONCE_INIT; // pthread_once(&init, ...); - static volatile bool initialized = false; - static volatile bool initializing = true; + static std::atomic_bool initialized(false); + static std::atomic_bool initializing(true); // Try and do the initialization or wait for it to complete. - if (initialized && !initializing) { + // TODO(neilc): Try to simplify and/or document this logic. + if (initialized.load() && !initializing.load()) { return; - } else if (initialized && initializing) { - while (initializing); + } else if (initialized.load() && initializing.load()) { + while (initializing.load()); return; } else { - if (!__sync_bool_compare_and_swap(&initialized, false, true)) { - while (initializing); + // `compare_exchange_strong` needs an lvalue. + bool expected = false; + if (!initialized.compare_exchange_strong(expected, true)) { + while (initializing.load()); return; } } @@ -945,9 +948,9 @@ void initialize(const string& delegate) PLOG(FATAL) << "Failed to initialize: " << listen.error(); } - // Need to set initialzing here so that we can actually invoke - // 'spawn' below for the garbage collector. - initializing = false; + // Need to set `initializing` here so that we can actually invoke `spawn()` + // below for the garbage collector. + initializing.store(false); __s__->accept() .onAny(lambda::bind(&internal::on_accept, lambda::_1)); @@ -998,7 +1001,7 @@ void finalize() { delete process_manager; - // TODO(benh): Finialize/shutdown Clock so that it doesn't attempt + // TODO(benh): Finalize/shutdown Clock so that it doesn't attempt // to dereference 'process_manager' in the 'timedout' callback. } @@ -2111,8 +2114,7 @@ void SocketManager::swap_implementing_socket(const Socket& from, Socket* to) ProcessManager::ProcessManager(const string& _delegate) : delegate(_delegate) { - running = 0; - __sync_synchronize(); // Ensure write to 'running' visible in other threads. 
+ running.store(0); } @@ -2485,8 +2487,8 @@ void ProcessManager::resume(ProcessBase* process) __process__ = NULL; - CHECK_GE(running, 1); - __sync_fetch_and_sub(&running, 1); + CHECK_GE(running.load(), 1); + running.fetch_sub(1); } @@ -2525,11 +2527,10 @@ void ProcessManager::cleanup(ProcessBase* process) // Remove process. synchronized (processes_mutex) { // Wait for all process references to get cleaned up. - while (process->refs > 0) { + while (process->refs.load() > 0) { #if defined(__i386__) || defined(__x86_64__) asm ("pause"); #endif - __sync_synchronize(); } synchronized (process->mutex) { @@ -2545,7 +2546,7 @@ void ProcessManager::cleanup(ProcessBase* process) gates.erase(it); } - CHECK(process->refs == 0); + CHECK(process->refs.load() == 0); process->state = ProcessBase::TERMINATED; } @@ -2672,7 +2673,7 @@ bool ProcessManager::wait(const UPID& pid) // 'runq' and 'running' equal to 0 between when we exit // this critical section and increment 'running'). runq.erase(it); - __sync_fetch_and_add(&running, 1); + running.fetch_add(1); } else { // Another thread has resumed the process ... process = NULL; @@ -2783,7 +2784,7 @@ ProcessBase* ProcessManager::dequeue() // Increment the running count of processes in order to support // the Clock::settle() operation (this must be done atomically // with removing the process from the runq). - __sync_fetch_and_add(&running, 1); + running.fetch_add(1); } } @@ -2803,7 +2804,7 @@ void ProcessManager::settle() // expect the http::get will have properly enqueued a process on // the run queue but http::get is just sending bytes on a // socket. Without sleeping at the beginning of this function we - // can get unlucky and appear settled when in actuallity the + // can get unlucky and appear settled when in actuality the // kernel just hasn't copied the bytes to a socket or we haven't // yet read the bytes and enqueued an event on a process (and the // process on the run queue). 
@@ -2817,10 +2818,7 @@ void ProcessManager::settle() continue; } - // Read barrier for 'running'. - __sync_synchronize(); - - if (running > 0) { + if (running.load() > 0) { done = false; continue; } http://git-wip-us.apache.org/repos/asf/mesos/blob/4b938052/3rdparty/libprocess/src/process_reference.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process_reference.hpp b/3rdparty/libprocess/src/process_reference.hpp index f8df4a6..e6110bb 100644 --- a/3rdparty/libprocess/src/process_reference.hpp +++ b/3rdparty/libprocess/src/process_reference.hpp @@ -66,7 +66,7 @@ private: : process(_process) { if (process != NULL) { - __sync_fetch_and_add(&(process->refs), 1); + process->refs.fetch_add(1); } } @@ -78,15 +78,15 @@ private: // There should be at least one reference to the process, so // we don't need to worry about checking if it's exiting or // not, since we know we can always create another reference. - CHECK(process->refs > 0); - __sync_fetch_and_add(&(process->refs), 1); + CHECK(process->refs.load() > 0); + process->refs.fetch_add(1); } } void cleanup() { if (process != NULL) { - __sync_fetch_and_sub(&(process->refs), 1); + process->refs.fetch_sub(1); } }
