Repository: mesos
Updated Branches:
  refs/heads/master 81efd727e -> 87de003c6


mesos: Replace volatile with std::atomic.

MESOS-3326.

Review: https://reviews.apache.org/r/37878


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/87de003c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/87de003c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/87de003c

Branch: refs/heads/master
Commit: 87de003c6e8a4dfc2d29ae8b0aab1f83ff0c66a3
Parents: 4b93805
Author: Neil Conway <[email protected]>
Authored: Thu Sep 10 17:50:33 2015 -0700
Committer: Joris Van Remoortere <[email protected]>
Committed: Thu Sep 10 19:39:41 2015 -0700

----------------------------------------------------------------------
 src/exec/exec.cpp   | 32 +++++++++++++++----------------
 src/sched/sched.cpp | 50 +++++++++++++++++++++++-------------------------
 2 files changed, 40 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/87de003c/src/exec/exec.cpp
----------------------------------------------------------------------
diff --git a/src/exec/exec.cpp b/src/exec/exec.cpp
index 31e0c2f..7b51baa 100644
--- a/src/exec/exec.cpp
+++ b/src/exec/exec.cpp
@@ -20,6 +20,7 @@
 
 #include <sys/types.h>
 
+#include <atomic>
 #include <iostream>
 #include <string>
 #include <sstream>
@@ -198,7 +199,7 @@ protected:
                   const SlaveID& slaveId,
                   const SlaveInfo& slaveInfo)
   {
-    if (aborted) {
+    if (aborted.load()) {
       VLOG(1) << "Ignoring registered message from slave " << slaveId
               << " because the driver is aborted!";
       return;
@@ -221,7 +222,7 @@ protected:
 
   void reregistered(const SlaveID& slaveId, const SlaveInfo& slaveInfo)
   {
-    if (aborted) {
+    if (aborted.load()) {
       VLOG(1) << "Ignoring re-registered message from slave " << slaveId
               << " because the driver is aborted!";
       return;
@@ -244,7 +245,7 @@ protected:
 
   void reconnect(const UPID& from, const SlaveID& slaveId)
   {
-    if (aborted) {
+    if (aborted.load()) {
       VLOG(1) << "Ignoring reconnect message from slave " << slaveId
               << " because the driver is aborted!";
       return;
@@ -280,7 +281,7 @@ protected:
 
   void runTask(const TaskInfo& task)
   {
-    if (aborted) {
+    if (aborted.load()) {
       VLOG(1) << "Ignoring run task message for task " << task.task_id()
               << " because the driver is aborted!";
       return;
@@ -305,7 +306,7 @@ protected:
 
   void killTask(const TaskID& taskId)
   {
-    if (aborted) {
+    if (aborted.load()) {
       VLOG(1) << "Ignoring kill task message for task " << taskId
               <<" because the driver is aborted!";
       return;
@@ -329,7 +330,7 @@ protected:
       const TaskID& taskId,
       const string& uuid)
   {
-    if (aborted) {
+    if (aborted.load()) {
       VLOG(1) << "Ignoring status update acknowledgement "
               << UUID::fromBytes(uuid) << " for task " << taskId
               << " of framework " << frameworkId
@@ -353,7 +354,7 @@ protected:
                         const ExecutorID& executorId,
                         const string& data)
   {
-    if (aborted) {
+    if (aborted.load()) {
       VLOG(1) << "Ignoring framework message because the driver is aborted!";
       return;
     }
@@ -372,7 +373,7 @@ protected:
 
   void shutdown()
   {
-    if (aborted) {
+    if (aborted.load()) {
       VLOG(1) << "Ignoring shutdown message because the driver is aborted!";
       return;
     }
@@ -394,7 +395,7 @@ protected:
 
     VLOG(1) << "Executor::shutdown took " << stopwatch.elapsed();
 
-    aborted = true; // To make sure not to accept any new messages.
+    aborted.store(true); // To make sure not to accept any new messages.
 
     if (local) {
       terminate(this);
@@ -413,7 +414,7 @@ protected:
   void abort()
   {
     LOG(INFO) << "Deactivating the executor libprocess";
-    CHECK(aborted);
+    CHECK(aborted.load());
 
     synchronized (mutex) {
       CHECK_NOTNULL(latch)->trigger();
@@ -439,7 +440,7 @@ protected:
 
   virtual void exited(const UPID& pid)
   {
-    if (aborted) {
+    if (aborted.load()) {
       VLOG(1) << "Ignoring exited event because the driver is aborted!";
       return;
     }
@@ -478,7 +479,7 @@ protected:
 
     VLOG(1) << "Executor::shutdown took " << stopwatch.elapsed();
 
-    aborted = true; // To make sure not to accept any new messages.
+    aborted.store(true); // To make sure not to accept any new messages.
 
     // This is a pretty bad state ... no slave is left. Rather
     // than exit lets kill our process group (which includes
@@ -543,7 +544,7 @@ private:
   bool connected; // Registered with the slave.
   UUID connection; // UUID to identify the connection instance.
   bool local;
-  volatile bool aborted;
+  std::atomic_bool aborted;
   std::recursive_mutex* mutex;
   Latch* latch;
   const string directory;
@@ -750,12 +751,11 @@ Status MesosExecutorDriver::abort()
 
     CHECK(process != NULL);
 
-    // We set the volatile aborted to true here to prevent any further
+    // We set the atomic aborted to true here to prevent any further
     // messages from being processed in the ExecutorProcess. However,
     // if abort() is called from another thread as the ExecutorProcess,
     // there may be at most one additional message processed.
-    // TODO(bmahler): Use an atomic boolean.
-    process->aborted = true;
+    process->aborted.store(true);
 
     // Dispatching here ensures that we still process the outstanding
     // requests *from* the executor, since those do proceed when

http://git-wip-us.apache.org/repos/asf/mesos/blob/87de003c/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 012af05..1fc9e73 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -221,7 +221,7 @@ protected:
 
   void detected(const Future<Option<MasterInfo> >& _master)
   {
-    if (!running) {
+    if (!running.load()) {
       VLOG(1) << "Ignoring the master change because the driver is not"
               << " running!";
       return;
@@ -292,7 +292,7 @@ protected:
 
   void authenticate()
   {
-    if (!running) {
+    if (!running.load()) {
       VLOG(1) << "Ignoring authenticate because the driver is not running!";
       return;
     }
@@ -360,7 +360,7 @@ protected:
 
   void _authenticate()
   {
-    if (!running) {
+    if (!running.load()) {
       VLOG(1) << "Ignoring _authenticate because the driver is not running!";
       return;
     }
@@ -415,7 +415,7 @@ protected:
 
   void authenticationTimeout(Future<bool> future)
   {
-    if (!running) {
+    if (!running.load()) {
       VLOG(1) << "Ignoring authentication timeout because "
               << "the driver is not running!";
       return;
@@ -617,7 +617,7 @@ protected:
       const FrameworkID& frameworkId,
       const MasterInfo& masterInfo)
   {
-    if (!running) {
+    if (!running.load()) {
       VLOG(1) << "Ignoring framework registered message because "
               << "the driver is not running!";
       return;
@@ -659,7 +659,7 @@ protected:
       const FrameworkID& frameworkId,
       const MasterInfo& masterInfo)
   {
-    if (!running) {
+    if (!running.load()) {
       VLOG(1) << "Ignoring framework re-registered message because "
               << "the driver is not running!";
       return;
@@ -698,7 +698,7 @@ protected:
 
   void doReliableRegistration(Duration maxBackoff)
   {
-    if (!running) {
+    if (!running.load()) {
       return;
     }
 
@@ -755,7 +755,7 @@ protected:
       const vector<Offer>& offers,
       const vector<string>& pids)
   {
-    if (!running) {
+    if (!running.load()) {
       VLOG(1) << "Ignoring resource offers message because "
               << "the driver is not running!";
       return;
@@ -805,7 +805,7 @@ protected:
 
   void rescindOffer(const UPID& from, const OfferID& offerId)
   {
-    if (!running) {
+    if (!running.load()) {
       VLOG(1) << "Ignoring rescind offer message because "
               << "the driver is not running!";
       return;
@@ -845,7 +845,7 @@ protected:
       const StatusUpdate& update,
       const UPID& pid)
   {
-    if (!running) {
+    if (!running.load()) {
       VLOG(1) << "Ignoring task status update message because "
               << "the driver is not running!";
       return;
@@ -910,10 +910,10 @@ protected:
     VLOG(1) << "Scheduler::statusUpdate took " << stopwatch.elapsed();
 
     if (implicitAcknowledgements) {
-      // Note that we need to look at the volatile 'running' here
+      // Note that we need to look at the atomic 'running' here
       // so that we don't acknowledge the update if the driver was
       // aborted during the processing of the update.
-      if (!running) {
+      if (!running.load()) {
         VLOG(1) << "Not sending status update acknowledgment message because "
                 << "the driver is not running!";
         return;
@@ -948,7 +948,7 @@ protected:
 
   void lostSlave(const UPID& from, const SlaveID& slaveId)
   {
-    if (!running) {
+    if (!running.load()) {
       VLOG(1) << "Ignoring lost slave message because the driver is not"
               << " running!";
       return;
@@ -988,7 +988,7 @@ protected:
       const ExecutorID& executorId,
       const string& data)
   {
-    if (!running) {
+    if (!running.load()) {
       VLOG(1)
         << "Ignoring framework message because the driver is not running!";
       return;
@@ -1008,7 +1008,7 @@ protected:
 
   void error(const string& message)
   {
-    if (!running) {
+    if (!running.load()) {
       VLOG(1) << "Ignoring error message because the driver is not running!";
       return;
     }
@@ -1061,7 +1061,7 @@ protected:
   {
     LOG(INFO) << "Aborting framework '" << framework.id() << "'";
 
-    CHECK(!running);
+    CHECK(!running.load());
 
     if (!connected) {
       VLOG(1) << "Not sending a deactivate message as master is disconnected";
@@ -1248,11 +1248,11 @@ protected:
       return;
     }
 
-    // NOTE: By ignoring the volatile 'running' here, we ensure that
-    // all acknowledgements requested before the driver was stopped
-    // or aborted are processed. Any acknowledgement that is requested
-    // after the driver stops or aborts (running == false) will be
-    // dropped in the driver before reaching here.
+    // NOTE: By ignoring the atomic 'running' here, we ensure that all
+    // acknowledgements requested before the driver was stopped or
+    // aborted are processed. Any acknowledgement that is requested
+    // after the driver stops or aborts (running.load() == false) will
+    // be dropped in the driver before reaching here.
 
     // Only statuses with a 'uuid' and a 'slave_id' need to have
     // acknowledgements sent to the master. Note that the driver
@@ -1427,9 +1427,7 @@ private:
   // there may be one additional callback delivered to the scheduler.
   // This could happen if the SchedulerProcess is in the middle of
   // processing an event.
-  // TODO(vinod): Instead of 'volatile' use std::atomic() to guarantee
-  // atomicity.
-  volatile bool running; // Flag to indicate if the driver is running.
+  std::atomic_bool running; // Flag to indicate if the driver is running.
 
   MasterDetector* detector;
 
@@ -1757,7 +1755,7 @@ Status MesosSchedulerDriver::stop(bool failover)
     // it due to bad parameters (e.g. error in creating the detector
     // or loading flags).
     if (process != NULL) {
-      process->running =  false;
+      process->running.store(false);
       dispatch(process, &SchedulerProcess::stop, failover);
     }
 
@@ -1788,7 +1786,7 @@ Status MesosSchedulerDriver::abort()
     }
 
     CHECK_NOTNULL(process);
-    process->running = false;
+    process->running.store(false);
 
     // Dispatching here ensures that we still process the outstanding
     // requests *from* the scheduler, since those do proceed when

Reply via email to