Repository: mesos
Updated Branches:
  refs/heads/master 38d0e5892 -> 99dc04868


Fixed scheduler driver to not acknowledge status updates when stop() is called.

Review: https://reviews.apache.org/r/27896


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/99dc0486
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/99dc0486
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/99dc0486

Branch: refs/heads/master
Commit: 99dc04868da8e30ad7d4e733ba5e3573602ce9e9
Parents: 38d0e58
Author: Vinod Kone <[email protected]>
Authored: Tue Nov 11 17:03:37 2014 -0800
Committer: Vinod Kone <[email protected]>
Committed: Wed Nov 12 16:29:39 2014 -0800

----------------------------------------------------------------------
 src/sched/sched.cpp           | 95 +++++++++++++++++++++++---------------
 src/tests/scheduler_tests.cpp | 69 +++++++++++++++++++++++++++
 2 files changed, 127 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/99dc0486/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index a9c111e..0b08512 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -136,7 +136,7 @@ public:
       cond(_cond),
       failover(_framework.has_id() && !framework.id().value().empty()),
       connected(false),
-      aborted(false),
+      running(true),
       detector(_detector),
       flags(_flags),
       credential(_credential),
@@ -204,8 +204,9 @@ protected:
 
   void detected(const Future<Option<MasterInfo> >& _master)
   {
-    if (aborted) {
-      VLOG(1) << "Ignoring the master change because the driver is aborted!";
+    if (!running) {
+      VLOG(1) << "Ignoring the master change because the driver is not"
+              << " running!";
       return;
     }
 
@@ -274,8 +275,8 @@ protected:
 
   void authenticate()
   {
-    if (aborted) {
-      VLOG(1) << "Ignoring authenticate because the driver is aborted!";
+    if (!running) {
+      VLOG(1) << "Ignoring authenticate because the driver is not running!";
       return;
     }
 
@@ -342,8 +343,8 @@ protected:
 
   void _authenticate()
   {
-    if (aborted) {
-      VLOG(1) << "Ignoring _authenticate because the driver is aborted!";
+    if (!running) {
+      VLOG(1) << "Ignoring _authenticate because the driver is not running!";
       return;
     }
 
@@ -395,9 +396,9 @@ protected:
 
   void authenticationTimeout(Future<bool> future)
   {
-    if (aborted) {
+    if (!running) {
       VLOG(1) << "Ignoring authentication timeout because "
-              << "the driver is aborted!";
+              << "the driver is not running!";
       return;
     }
 
@@ -415,9 +416,9 @@ protected:
       const FrameworkID& frameworkId,
       const MasterInfo& masterInfo)
   {
-    if (aborted) {
+    if (!running) {
       VLOG(1) << "Ignoring framework registered message because "
-              << "the driver is aborted!";
+              << "the driver is not running!";
       return;
     }
 
@@ -457,9 +458,9 @@ protected:
       const FrameworkID& frameworkId,
       const MasterInfo& masterInfo)
   {
-    if (aborted) {
+    if (!running) {
       VLOG(1) << "Ignoring framework re-registered message because "
-              << "the driver is aborted!";
+              << "the driver is not running!";
       return;
     }
 
@@ -496,6 +497,10 @@ protected:
 
   void doReliableRegistration(Duration maxBackoff)
   {
+    if (!running) {
+      return;
+    }
+
     if (connected || master.isNone()) {
       return;
     }
@@ -549,9 +554,9 @@ protected:
       const vector<Offer>& offers,
       const vector<string>& pids)
   {
-    if (aborted) {
+    if (!running) {
       VLOG(1) << "Ignoring resource offers message because "
-              << "the driver is aborted!";
+              << "the driver is not running!";
       return;
     }
 
@@ -599,9 +604,9 @@ protected:
 
   void rescindOffer(const UPID& from, const OfferID& offerId)
   {
-    if (aborted) {
+    if (!running) {
       VLOG(1) << "Ignoring rescind offer message because "
-              << "the driver is aborted!";
+              << "the driver is not running!";
       return;
     }
 
@@ -641,9 +646,9 @@ protected:
   {
     const TaskStatus& status = update.status();
 
-    if (aborted) {
+    if (!running) {
       VLOG(1) << "Ignoring task status update message because "
-              << "the driver is aborted!";
+              << "the driver is not running!";
       return;
     }
 
@@ -690,9 +695,9 @@ protected:
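-    // Note that we need to look at the volatile 'aborted' here to
-    // so that we don't acknowledge the update if the driver was
-    // aborted during the processing of the update.
+    // Note that we need to look at the volatile 'running' here so
+    // that we don't acknowledge the update if the driver was stopped
+    // or aborted during the processing of the update.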
-    if (aborted) {
+    if (!running) {
       VLOG(1) << "Not sending status update acknowledgment message because "
-              << "the driver is aborted!";
+              << "the driver is not running!";
       return;
     }
 
@@ -716,8 +721,9 @@ protected:
 
   void lostSlave(const UPID& from, const SlaveID& slaveId)
   {
-    if (aborted) {
-      VLOG(1) << "Ignoring lost slave message because the driver is aborted!";
+    if (!running) {
+      VLOG(1) << "Ignoring lost slave message because the driver is not"
+              << " running!";
       return;
     }
 
@@ -755,8 +761,9 @@ protected:
                         const ExecutorID& executorId,
                         const string& data)
   {
-    if (aborted) {
-      VLOG(1) << "Ignoring framework message because the driver is aborted!";
+    if (!running) {
+      VLOG(1)
+        << "Ignoring framework message because the driver is not running!";
       return;
     }
 
@@ -774,8 +781,8 @@ protected:
 
   void error(const string& message)
   {
-    if (aborted) {
-      VLOG(1) << "Ignoring error message because the driver is aborted!";
+    if (!running) {
+      VLOG(1) << "Ignoring error message because the driver is not running!";
       return;
     }
 
@@ -822,7 +829,7 @@ protected:
   {
     LOG(INFO) << "Aborting framework '" << framework.id() << "'";
 
-    CHECK(aborted);
+    CHECK(!running);
 
     if (!connected) {
       VLOG(1) << "Not sending a deactivate message as master is disconnected";
@@ -1070,7 +1077,18 @@ private:
   Option<UPID> master;
 
   bool connected; // Flag to indicate if framework is registered.
-  volatile bool aborted; // Flag to indicate if the driver is aborted.
+
+  // TODO(vinod): Instead of 'bool' use 'Status'.
+  // We set 'running' to false in SchedulerDriver::stop() and
+  // SchedulerDriver::abort() to prevent any further messages from
+  // being processed in the SchedulerProcess. However, if abort()
+  // or stop() is called from a thread other than SchedulerProcess,
+  // there may be one additional callback delivered to the scheduler.
+  // This could happen if the SchedulerProcess is in the middle of
+  // processing an event.
+  // TODO(vinod): Instead of 'volatile' use std::atomic<bool>, which
+  // (unlike 'volatile') guarantees atomicity and cross-thread visibility.
+  volatile bool running; // Flag to indicate if the driver is running.
 
   MasterDetector* detector;
 
@@ -1343,7 +1361,11 @@ Status MesosSchedulerDriver::stop(bool failover)
 {
   Lock lock(&mutex);
 
+  LOG(INFO) << "Asked to stop the driver";
+
   if (status != DRIVER_RUNNING && status != DRIVER_ABORTED) {
+    VLOG(1) << "Ignoring stop because the status of the driver is "
+            << Status_Name(status);
     return status;
   }
 
@@ -1351,6 +1373,7 @@ Status MesosSchedulerDriver::stop(bool failover)
   // it due to bad parameters (e.g. error in creating the detector
   // or loading flags).
   if (process != NULL) {
+    process->running =  false;
     dispatch(process, &SchedulerProcess::stop, failover);
   }
 
@@ -1372,18 +1395,16 @@ Status MesosSchedulerDriver::abort()
 {
   Lock lock(&mutex);
 
+  LOG(INFO) << "Asked to abort the driver";
+
   if (status != DRIVER_RUNNING) {
+    VLOG(1) << "Ignoring abort because the status of the driver is "
+            << Status_Name(status);
     return status;
   }
 
-  CHECK(process != NULL);
-
-  // We set the volatile aborted to true here to prevent any further
-  // messages from being processed in the SchedulerProcess. However,
-  // if abort() is called from another thread as the SchedulerProcess,
-  // there may be at most one additional message processed.
-  // TODO(bmahler): Use an atomic boolean.
-  process->aborted = true;
+  CHECK_NOTNULL(process);
+  process->running = false;
 
   // Dispatching here ensures that we still process the outstanding
   // requests *from* the scheduler, since those do proceed when

http://git-wip-us.apache.org/repos/asf/mesos/blob/99dc0486/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 6502161..e998217 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -24,6 +24,7 @@
 #include <mesos/executor.hpp>
 #include <mesos/scheduler.hpp>
 
+#include <process/clock.hpp>
 #include <process/future.hpp>
 #include <process/gmock.hpp>
 #include <process/gtest.hpp>
@@ -38,6 +39,7 @@
 #include <stout/lambda.hpp>
 #include <stout/memory.hpp>
 #include <stout/try.hpp>
+#include <stout/uuid.hpp>
 
 #include "master/master.hpp"
 
@@ -56,6 +58,7 @@ using mesos::internal::slave::Slave;
 using mesos::scheduler::Call;
 using mesos::scheduler::Event;
 
+using process::Clock;
 using process::Future;
 using process::Owned;
 using process::PID;
@@ -249,3 +252,69 @@ TEST_F(MesosSchedulerDriverTest, MetricsEndpoint)
 
   Shutdown();
 }
+
+
+// Calls stop() and then abort() on the driver (the mocked call's arg0).
+ACTION(StopAndAbort)
+{
+  arg0->stop();
+  arg0->abort(); // Presumably ignored: stop() already changed the status.
+}
+
+
+// Verifies that when the scheduler calls stop() (and then abort()) from
+// its statusUpdate() callback, the driver sends no pending acknowledgement.
+TEST_F(MesosSchedulerDriverTest, DropAckIfStopCalledBeforeAbort)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+  Try<PID<Slave> > slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 16, "*"))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  // When an update is received, stop the driver and then abort it.
+  Future<Nothing> statusUpdate;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(DoAll(StopAndAbort(),
+                    FutureSatisfy(&statusUpdate)));
+
+  // Ensure no status update acknowledgements are sent from the driver
+  // to the master.
+  EXPECT_NO_FUTURE_PROTOBUFS(
+      StatusUpdateAcknowledgementMessage(), _, master.get());
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.start();
+
+  AWAIT_READY(statusUpdate);
+
+  // Settle the clock so the driver finishes processing the update; it
+  // must not acknowledge it, because stop() was called before abort().
+  Clock::pause();
+  Clock::settle();
+  Clock::resume(); // Don't leave the clock paused during teardown.
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}

Reply via email to