Repository: mesos Updated Branches: refs/heads/master 81efd727e -> 87de003c6
mesos: Replace volatile with std::atomic. MESOS-3326. Review: https://reviews.apache.org/r/37878 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/87de003c Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/87de003c Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/87de003c Branch: refs/heads/master Commit: 87de003c6e8a4dfc2d29ae8b0aab1f83ff0c66a3 Parents: 4b93805 Author: Neil Conway <[email protected]> Authored: Thu Sep 10 17:50:33 2015 -0700 Committer: Joris Van Remoortere <[email protected]> Committed: Thu Sep 10 19:39:41 2015 -0700 ---------------------------------------------------------------------- src/exec/exec.cpp | 32 +++++++++++++++---------------- src/sched/sched.cpp | 50 +++++++++++++++++++++++------------------------- 2 files changed, 40 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/87de003c/src/exec/exec.cpp ---------------------------------------------------------------------- diff --git a/src/exec/exec.cpp b/src/exec/exec.cpp index 31e0c2f..7b51baa 100644 --- a/src/exec/exec.cpp +++ b/src/exec/exec.cpp @@ -20,6 +20,7 @@ #include <sys/types.h> +#include <atomic> #include <iostream> #include <string> #include <sstream> @@ -198,7 +199,7 @@ protected: const SlaveID& slaveId, const SlaveInfo& slaveInfo) { - if (aborted) { + if (aborted.load()) { VLOG(1) << "Ignoring registered message from slave " << slaveId << " because the driver is aborted!"; return; @@ -221,7 +222,7 @@ protected: void reregistered(const SlaveID& slaveId, const SlaveInfo& slaveInfo) { - if (aborted) { + if (aborted.load()) { VLOG(1) << "Ignoring re-registered message from slave " << slaveId << " because the driver is aborted!"; return; @@ -244,7 +245,7 @@ protected: void reconnect(const UPID& from, const SlaveID& slaveId) { - if (aborted) { + if (aborted.load()) { VLOG(1) << "Ignoring reconnect message from slave " << slaveId << " because the driver is aborted!"; return; @@ -280,7 +281,7 @@ protected: void runTask(const TaskInfo& task) { - if (aborted) { + if (aborted.load()) { VLOG(1) << "Ignoring run task message for task " << task.task_id() << " because the driver is aborted!"; return; @@ -305,7 +306,7 @@ protected: void killTask(const TaskID& taskId) { - if (aborted) { + if (aborted.load()) { VLOG(1) << "Ignoring kill task message for task " << taskId <<" because the driver is aborted!"; return; @@ -329,7 +330,7 @@ protected: const TaskID& taskId, const string& uuid) { - if (aborted) { + if (aborted.load()) { VLOG(1) << "Ignoring status update acknowledgement " << UUID::fromBytes(uuid) << " for task " << taskId << " of framework " << frameworkId @@ -353,7 +354,7 @@ protected: const ExecutorID& executorId, const string& data) { - if (aborted) { + if (aborted.load()) { VLOG(1) << "Ignoring framework message because the driver is aborted!"; return; } @@ -372,7 +373,7 @@ protected: void shutdown() { - if (aborted) { + if (aborted.load()) { VLOG(1) << "Ignoring shutdown message because the driver is aborted!"; return; } @@ -394,7 +395,7 @@ protected: VLOG(1) << "Executor::shutdown took " << stopwatch.elapsed(); - aborted = true; // To make sure not to accept any new messages. + aborted.store(true); // To make sure not to accept any new messages. if (local) { terminate(this); @@ -413,7 +414,7 @@ protected: void abort() { LOG(INFO) << "Deactivating the executor libprocess"; - CHECK(aborted); + CHECK(aborted.load()); synchronized (mutex) { CHECK_NOTNULL(latch)->trigger(); @@ -439,7 +440,7 @@ protected: virtual void exited(const UPID& pid) { - if (aborted) { + if (aborted.load()) { VLOG(1) << "Ignoring exited event because the driver is aborted!"; return; } @@ -478,7 +479,7 @@ protected: VLOG(1) << "Executor::shutdown took " << stopwatch.elapsed(); - aborted = true; // To make sure not to accept any new messages. + aborted.store(true); // To make sure not to accept any new messages. // This is a pretty bad state ... no slave is left. Rather // than exit lets kill our process group (which includes @@ -543,7 +544,7 @@ private: bool connected; // Registered with the slave. UUID connection; // UUID to identify the connection instance. bool local; - volatile bool aborted; + std::atomic_bool aborted; std::recursive_mutex* mutex; Latch* latch; const string directory; @@ -750,12 +751,11 @@ Status MesosExecutorDriver::abort() CHECK(process != NULL); - // We set the volatile aborted to true here to prevent any further + // We set the atomic aborted to true here to prevent any further // messages from being processed in the ExecutorProcess. However, // if abort() is called from another thread as the ExecutorProcess, // there may be at most one additional message processed. - // TODO(bmahler): Use an atomic boolean. - process->aborted = true; + process->aborted.store(true); // Dispatching here ensures that we still process the outstanding // requests *from* the executor, since those do proceed when http://git-wip-us.apache.org/repos/asf/mesos/blob/87de003c/src/sched/sched.cpp ---------------------------------------------------------------------- diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp index 012af05..1fc9e73 100644 --- a/src/sched/sched.cpp +++ b/src/sched/sched.cpp @@ -221,7 +221,7 @@ protected: void detected(const Future<Option<MasterInfo> >& _master) { - if (!running) { + if (!running.load()) { VLOG(1) << "Ignoring the master change because the driver is not" << " running!"; return; @@ -292,7 +292,7 @@ protected: void authenticate() { - if (!running) { + if (!running.load()) { VLOG(1) << "Ignoring authenticate because the driver is not running!"; return; } @@ -360,7 +360,7 @@ protected: void _authenticate() { - if (!running) { + if (!running.load()) { VLOG(1) << "Ignoring _authenticate because the driver is not running!"; return; } @@ -415,7 +415,7 @@ protected: void authenticationTimeout(Future<bool> future) { - if (!running) { + if (!running.load()) { VLOG(1) << "Ignoring authentication timeout because " << "the driver is not running!"; return; @@ -617,7 +617,7 @@ protected: const FrameworkID& frameworkId, const MasterInfo& masterInfo) { - if (!running) { + if (!running.load()) { VLOG(1) << "Ignoring framework registered message because " << "the driver is not running!"; return; @@ -659,7 +659,7 @@ protected: const FrameworkID& frameworkId, const MasterInfo& masterInfo) { - if (!running) { + if (!running.load()) { VLOG(1) << "Ignoring framework re-registered message because " << "the driver is not running!"; return; @@ -698,7 +698,7 @@ protected: void doReliableRegistration(Duration maxBackoff) { - if (!running) { + if (!running.load()) { return; } @@ -755,7 +755,7 @@ protected: const vector<Offer>& offers, const vector<string>& pids) { - if (!running) { + if (!running.load()) { VLOG(1) << "Ignoring resource offers message because " << "the driver is not running!"; return; @@ -805,7 +805,7 @@ protected: void rescindOffer(const UPID& from, const OfferID& offerId) { - if (!running) { + if (!running.load()) { VLOG(1) << "Ignoring rescind offer message because " << "the driver is not running!"; return; @@ -845,7 +845,7 @@ protected: const StatusUpdate& update, const UPID& pid) { - if (!running) { + if (!running.load()) { VLOG(1) << "Ignoring task status update message because " << "the driver is not running!"; return; @@ -910,10 +910,10 @@ protected: VLOG(1) << "Scheduler::statusUpdate took " << stopwatch.elapsed(); if (implicitAcknowledgements) { - // Note that we need to look at the volatile 'running' here + // Note that we need to look at the atomic 'running' here // so that we don't acknowledge the update if the driver was // aborted during the processing of the update. - if (!running) { + if (!running.load()) { VLOG(1) << "Not sending status update acknowledgment message because " << "the driver is not running!"; return; @@ -948,7 +948,7 @@ protected: void lostSlave(const UPID& from, const SlaveID& slaveId) { - if (!running) { + if (!running.load()) { VLOG(1) << "Ignoring lost slave message because the driver is not" << " running!"; return; @@ -988,7 +988,7 @@ protected: const ExecutorID& executorId, const string& data) { - if (!running) { + if (!running.load()) { VLOG(1) << "Ignoring framework message because the driver is not running!"; return; @@ -1008,7 +1008,7 @@ protected: void error(const string& message) { - if (!running) { + if (!running.load()) { VLOG(1) << "Ignoring error message because the driver is not running!"; return; } @@ -1061,7 +1061,7 @@ protected: { LOG(INFO) << "Aborting framework '" << framework.id() << "'"; - CHECK(!running); + CHECK(!running.load()); if (!connected) { VLOG(1) << "Not sending a deactivate message as master is disconnected"; @@ -1248,11 +1248,11 @@ protected: return; } - // NOTE: By ignoring the volatile 'running' here, we ensure that - // all acknowledgements requested before the driver was stopped - // or aborted are processed. Any acknowledgement that is requested - // after the driver stops or aborts (running == false) will be - // dropped in the driver before reaching here. + // NOTE: By ignoring the atomic 'running' here, we ensure that all + // acknowledgements requested before the driver was stopped or + // aborted are processed. Any acknowledgement that is requested + // after the driver stops or aborts (running.load() == false) will + // be dropped in the driver before reaching here. // Only statuses with a 'uuid' and a 'slave_id' need to have // acknowledgements sent to the master. Note that the driver @@ -1427,9 +1427,7 @@ private: // there may be one additional callback delivered to the scheduler. // This could happen if the SchedulerProcess is in the middle of // processing an event. - // TODO(vinod): Instead of 'volatile' use std::atomic() to guarantee - // atomicity. - volatile bool running; // Flag to indicate if the driver is running. + std::atomic_bool running; // Flag to indicate if the driver is running. MasterDetector* detector; @@ -1757,7 +1755,7 @@ Status MesosSchedulerDriver::stop(bool failover) // it due to bad parameters (e.g. error in creating the detector // or loading flags). if (process != NULL) { - process->running = false; + process->running.store(false); dispatch(process, &SchedulerProcess::stop, failover); } @@ -1788,7 +1786,7 @@ Status MesosSchedulerDriver::abort() } CHECK_NOTNULL(process); - process->running = false; + process->running.store(false); // Dispatching here ensures that we still process the outstanding // requests *from* the scheduler, since those do proceed when
