Repository: mesos Updated Branches: refs/heads/master 38d0e5892 -> 99dc04868
Fixed scheduler driver to not acknowledge status update when stop() is called. Review: https://reviews.apache.org/r/27896 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/99dc0486 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/99dc0486 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/99dc0486 Branch: refs/heads/master Commit: 99dc04868da8e30ad7d4e733ba5e3573602ce9e9 Parents: 38d0e58 Author: Vinod Kone <[email protected]> Authored: Tue Nov 11 17:03:37 2014 -0800 Committer: Vinod Kone <[email protected]> Committed: Wed Nov 12 16:29:39 2014 -0800 ---------------------------------------------------------------------- src/sched/sched.cpp | 95 +++++++++++++++++++++++--------------- src/tests/scheduler_tests.cpp | 69 +++++++++++++++++++++++++++ 2 files changed, 127 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/99dc0486/src/sched/sched.cpp ---------------------------------------------------------------------- diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp index a9c111e..0b08512 100644 --- a/src/sched/sched.cpp +++ b/src/sched/sched.cpp @@ -136,7 +136,7 @@ public: cond(_cond), failover(_framework.has_id() && !framework.id().value().empty()), connected(false), - aborted(false), + running(true), detector(_detector), flags(_flags), credential(_credential), @@ -204,8 +204,9 @@ protected: void detected(const Future<Option<MasterInfo> >& _master) { - if (aborted) { - VLOG(1) << "Ignoring the master change because the driver is aborted!"; + if (!running) { + VLOG(1) << "Ignoring the master change because the driver is not" + << " running!"; return; } @@ -274,8 +275,8 @@ protected: void authenticate() { - if (aborted) { - VLOG(1) << "Ignoring authenticate because the driver is aborted!"; + if (!running) { + VLOG(1) << "Ignoring authenticate because the driver is not running!"; return; } @@ -342,8 +343,8 @@ protected: void _authenticate() { - if (aborted) { - VLOG(1) << "Ignoring _authenticate because the driver is aborted!"; + if (!running) { + VLOG(1) << "Ignoring _authenticate because the driver is not running!"; return; } @@ -395,9 +396,9 @@ protected: void authenticationTimeout(Future<bool> future) { - if (aborted) { + if (!running) { VLOG(1) << "Ignoring authentication timeout because " - << "the driver is aborted!"; + << "the driver is not running!"; return; } @@ -415,9 +416,9 @@ protected: const FrameworkID& frameworkId, const MasterInfo& masterInfo) { - if (aborted) { + if (!running) { VLOG(1) << "Ignoring framework registered message because " - << "the driver is aborted!"; + << "the driver is not running!"; return; } @@ -457,9 +458,9 @@ protected: const FrameworkID& frameworkId, const MasterInfo& masterInfo) { - if (aborted) { + if (!running) { VLOG(1) << "Ignoring framework re-registered message because " - << "the driver is aborted!"; + << "the driver is not running!"; return; } @@ -496,6 +497,10 @@ protected: void doReliableRegistration(Duration maxBackoff) { + if (!running) { + return; + } + if (connected || master.isNone()) { return; } @@ -549,9 +554,9 @@ protected: const vector<Offer>& offers, const vector<string>& pids) { - if (aborted) { + if (!running) { VLOG(1) << "Ignoring resource offers message because " - << "the driver is aborted!"; + << "the driver is not running!"; return; } @@ -599,9 +604,9 @@ protected: void rescindOffer(const UPID& from, const OfferID& offerId) { - if (aborted) { + if (!running) { VLOG(1) << "Ignoring rescind offer message because " - << "the driver is aborted!"; + << "the driver is not running!"; return; } @@ -641,9 +646,9 @@ protected: { const TaskStatus& status = update.status(); - if (aborted) { + if (!running) { VLOG(1) << "Ignoring task status update message because " - << "the driver is aborted!"; + << "the driver is not running!"; return; } @@ -690,9 +695,9 @@ protected: // Note that we need to look at the volatile 'aborted' here to // so that we don't acknowledge the update if the driver was // aborted during the processing of the update. - if (aborted) { + if (!running) { VLOG(1) << "Not sending status update acknowledgment message because " - << "the driver is aborted!"; + << "the driver is not running!"; return; } @@ -716,8 +721,9 @@ protected: void lostSlave(const UPID& from, const SlaveID& slaveId) { - if (aborted) { - VLOG(1) << "Ignoring lost slave message because the driver is aborted!"; + if (!running) { + VLOG(1) << "Ignoring lost slave message because the driver is not" + << " running!"; return; } @@ -755,8 +761,9 @@ protected: const ExecutorID& executorId, const string& data) { - if (aborted) { - VLOG(1) << "Ignoring framework message because the driver is aborted!"; + if (!running) { + VLOG(1) + << "Ignoring framework message because the driver is not running!"; return; } @@ -774,8 +781,8 @@ protected: void error(const string& message) { - if (aborted) { - VLOG(1) << "Ignoring error message because the driver is aborted!"; + if (!running) { + VLOG(1) << "Ignoring error message because the driver is not running!"; return; } @@ -822,7 +829,7 @@ protected: { LOG(INFO) << "Aborting framework '" << framework.id() << "'"; - CHECK(aborted); + CHECK(!running); if (!connected) { VLOG(1) << "Not sending a deactivate message as master is disconnected"; @@ -1070,7 +1077,18 @@ private: Option<UPID> master; bool connected; // Flag to indicate if framework is registered. - volatile bool aborted; // Flag to indicate if the driver is aborted. + + // TODO(vinod): Instead of 'bool' use 'Status'. + // We set 'running' to false in SchedulerDriver::stop() and + // SchedulerDriver::abort() to prevent any further messages from + // being processed in the SchedulerProcess. However, if abort() + // or stop() is called from a thread other than SchedulerProcess, + // there may be one additional callback delivered to the scheduler. + // This could happen if the SchedulerProcess is in the middle of + // processing an event. + // TODO(vinod): Instead of 'volatile' use std::atomic() to guarantee + // atomicity. + volatile bool running; // Flag to indicate if the driver is running. MasterDetector* detector; @@ -1343,7 +1361,11 @@ Status MesosSchedulerDriver::stop(bool failover) { Lock lock(&mutex); + LOG(INFO) << "Asked to stop the driver"; + if (status != DRIVER_RUNNING && status != DRIVER_ABORTED) { + VLOG(1) << "Ignoring stop because the status of the driver is " + << Status_Name(status); return status; } @@ -1351,6 +1373,7 @@ Status MesosSchedulerDriver::stop(bool failover) // it due to bad parameters (e.g. error in creating the detector // or loading flags). if (process != NULL) { + process->running = false; dispatch(process, &SchedulerProcess::stop, failover); } @@ -1372,18 +1395,16 @@ Status MesosSchedulerDriver::abort() { Lock lock(&mutex); + LOG(INFO) << "Asked to abort the driver"; + if (status != DRIVER_RUNNING) { + VLOG(1) << "Ignoring abort because the status of the driver is " + << Status_Name(status); return status; } - CHECK(process != NULL); - - // We set the volatile aborted to true here to prevent any further - // messages from being processed in the SchedulerProcess. However, - // if abort() is called from another thread as the SchedulerProcess, - // there may be at most one additional message processed. - // TODO(bmahler): Use an atomic boolean. - process->aborted = true; + CHECK_NOTNULL(process); + process->running = false; // Dispatching here ensures that we still process the outstanding // requests *from* the scheduler, since those do proceed when http://git-wip-us.apache.org/repos/asf/mesos/blob/99dc0486/src/tests/scheduler_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp index 6502161..e998217 100644 --- a/src/tests/scheduler_tests.cpp +++ b/src/tests/scheduler_tests.cpp @@ -24,6 +24,7 @@ #include <mesos/executor.hpp> #include <mesos/scheduler.hpp> +#include <process/clock.hpp> #include <process/future.hpp> #include <process/gmock.hpp> #include <process/gtest.hpp> @@ -38,6 +39,7 @@ #include <stout/lambda.hpp> #include <stout/memory.hpp> #include <stout/try.hpp> +#include <stout/uuid.hpp> #include "master/master.hpp" @@ -56,6 +58,7 @@ using mesos::internal::slave::Slave; using mesos::scheduler::Call; using mesos::scheduler::Event; +using process::Clock; using process::Future; using process::Owned; using process::PID; @@ -249,3 +252,69 @@ TEST_F(MesosSchedulerDriverTest, MetricsEndpoint) Shutdown(); } + + +// This action calls driver stop() followed by abort(). +ACTION(StopAndAbort) +{ + arg0->stop(); + arg0->abort(); +} + + +// This test verifies that when the scheduler calls stop() before +// abort(), no pending acknowledgements are sent. +TEST_F(MesosSchedulerDriverTest, DropAckIfStopCalledBeforeAbort) +{ + Try<PID<Master> > master = StartMaster(); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + TestContainerizer containerizer(&exec); + Try<PID<Slave> > slave = StartSlave(&containerizer); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 16, "*")) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + // When an update is received, stop the driver and then abort it. + Future<Nothing> statusUpdate; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(DoAll(StopAndAbort(), + FutureSatisfy(&statusUpdate))); + + // Ensure no status update acknowledgements are sent from the driver + // to the master. + EXPECT_NO_FUTURE_PROTOBUFS( + StatusUpdateAcknowledgementMessage(), _ , master.get()); + + EXPECT_CALL(exec, registered(_, _, _, _)); + + EXPECT_CALL(exec, launchTask(_, _)) + .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); + + EXPECT_CALL(exec, shutdown(_)) + .Times(AtMost(1)); + + driver.start(); + + AWAIT_READY(statusUpdate); + + // Settle the clock to ensure driver finishes processing the status + // update and sends acknowledgement if necessary. In this test it + // shouldn't send an acknowledgement. + Clock::pause(); + Clock::settle(); + + driver.stop(); + driver.join(); + + Shutdown(); +}
