Repository: mesos Updated Branches: refs/heads/master 30d12fa1e -> 4356e4f73
Fixed MESOS-947: Slave should properly handle a killTask() that arrives between runTask() and _runTask(). Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4356e4f7 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4356e4f7 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4356e4f7 Branch: refs/heads/master Commit: 4356e4f73c9c683561240e13217c2e7b19ccb036 Parents: 30d12fa Author: Bernd Mathiske <[email protected]> Authored: Fri Oct 17 10:41:47 2014 -0700 Committer: Vinod Kone <[email protected]> Committed: Fri Oct 17 10:43:00 2014 -0700 ---------------------------------------------------------------------- src/slave/slave.cpp | 57 +++++++++++++++----- src/slave/slave.hpp | 12 +++-- src/tests/mesos.cpp | 62 ++++++++++++++++++++++ src/tests/mesos.hpp | 63 ++++++++++++++++++++++ src/tests/slave_tests.cpp | 116 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 294 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/4356e4f7/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 0e342ed..7b5474a 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -1095,7 +1095,7 @@ void Slave::runTask( LOG(WARNING) << "Ignoring task " << task.task_id() << " because the slave is " << state; // TODO(vinod): Consider sending a TASK_LOST here. - // Currently it is tricky because 'statsuUpdate()' + // Currently it is tricky because 'statusUpdate()' // ignores updates for unknown frameworks. return; } @@ -1189,16 +1189,28 @@ void Slave::_runTask( LOG(INFO) << "Launching task " << task.task_id() << " for framework " << frameworkId; + Framework* framework = getFramework(frameworkId); + if (framework == NULL) { + LOG(WARNING) << "Ignoring run task " << task.task_id() + << " because the framework " << frameworkId + << " does not exist"; + return; + } + const ExecutorInfo& executorInfo = getExecutorInfo(frameworkId, task); const ExecutorID& executorId = executorInfo.executor_id(); - // Remove the pending task from framework. - Framework* framework = getFramework(frameworkId); - CHECK_NOTNULL(framework); - - framework->pending[executorId].erase(task.task_id()); - if (framework->pending[executorId].empty()) { - framework->pending.erase(executorId); + if (framework->pending.contains(executorId) && + framework->pending[executorId].contains(task.task_id())) { + framework->pending[executorId].erase(task.task_id()); + if (framework->pending[executorId].empty()) { + framework->pending.erase(executorId); + } + } else { + LOG(WARNING) << "Ignoring run task " << task.task_id() + << " of framework " << frameworkId + << " because the task has been killed in the meantime"; + return; } // We don't send a status update here because a terminating @@ -1395,14 +1407,35 @@ void Slave::killTask( return; } + foreachkey (const ExecutorID& executorId, framework->pending) { + if (framework->pending[executorId].contains(taskId)) { + LOG(WARNING) << "Killing task " << taskId + << " of framework " << frameworkId + << " before it was launched"; + + const StatusUpdate& update = protobuf::createStatusUpdate( + frameworkId, info.id(), taskId, TASK_KILLED, + "Task killed before it was launched"); + statusUpdate(update, UPID()); + + framework->pending[executorId].erase(taskId); + if (framework->pending[executorId].empty()) { + framework->pending.erase(executorId); + if (framework->pending.empty() && framework->executors.empty()) { + removeFramework(framework); + } + } + return; + } + } + Executor* executor = framework->getExecutor(taskId); if (executor == NULL) { - LOG(WARNING) << "Cannot kill task " << taskId + LOG(WARNING) << "Cannot kill task " << taskId << " of framework " << frameworkId << " because no corresponding executor is running"; - - // We send a TASK_LOST update because this task might have never - // been launched on this slave! + // We send a TASK_LOST update because this task has never + // been launched on this slave. const StatusUpdate& update = protobuf::createStatusUpdate( frameworkId, info.id(), taskId, TASK_LOST, "Cannot find executor"); http://git-wip-us.apache.org/repos/asf/mesos/blob/4356e4f7/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index 342b09f..ccc0e03 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -106,14 +106,16 @@ public: void doReliableRegistration(const Duration& duration); - void runTask( + // Made 'virtual' for Slave mocking. + virtual void runTask( const process::UPID& from, const FrameworkInfo& frameworkInfo, const FrameworkID& frameworkId, const std::string& pid, const TaskInfo& task); - void _runTask( + // Made 'virtual' for Slave mocking. + virtual void _runTask( const process::Future<bool>& future, const FrameworkInfo& frameworkInfo, const FrameworkID& frameworkId, @@ -122,7 +124,8 @@ public: process::Future<bool> unschedule(const std::string& path); - void killTask( + // Made 'virtual' for Slave mocking. + virtual void killTask( const process::UPID& from, const FrameworkID& frameworkId, const TaskID& taskId); @@ -320,7 +323,8 @@ public: void removeExecutor(Framework* framework, Executor* executor); // Removes and garbage collects the framework. - void removeFramework(Framework* framework); + // Made 'virtual' for Slave mocking. + virtual void removeFramework(Framework* framework); // Schedules a 'path' for gc based on its modification time. Future<Nothing> garbageCollect(const std::string& path); http://git-wip-us.apache.org/repos/asf/mesos/blob/4356e4f7/src/tests/mesos.cpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp index 3dcb2ac..147e23f 100644 --- a/src/tests/mesos.cpp +++ b/src/tests/mesos.cpp @@ -48,6 +48,8 @@ #include "tests/mesos.hpp" using std::string; +using testing::_; +using testing::Invoke; using namespace process; @@ -338,6 +340,66 @@ void MesosTest::ShutdownSlaves() } +MockSlave::MockSlave(const slave::Flags& flags, + MasterDetector* detector, + slave::Containerizer* containerizer) + : slave::Slave( + flags, + detector, + containerizer, + &files, + &gc, + &statusUpdateManager) +{ + // Set up default behaviors, calling the original methods. + EXPECT_CALL(*this, runTask(_, _, _, _, _)). + WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTask)); + EXPECT_CALL(*this, _runTask(_, _, _, _, _)). + WillRepeatedly(Invoke(this, &MockSlave::unmocked__runTask)); + EXPECT_CALL(*this, killTask(_, _, _)). + WillRepeatedly(Invoke(this, &MockSlave::unmocked_killTask)); + EXPECT_CALL(*this, removeFramework(_)). + WillRepeatedly(Invoke(this, &MockSlave::unmocked_removeFramework)); +} + + +void MockSlave::unmocked_runTask( + const process::UPID& from, + const FrameworkInfo& frameworkInfo, + const FrameworkID& frameworkId, + const std::string& pid, + const TaskInfo& task) +{ + slave::Slave::runTask(from, frameworkInfo, frameworkId, pid, task); +} + + +void MockSlave::unmocked__runTask( + const process::Future<bool>& future, + const FrameworkInfo& frameworkInfo, + const FrameworkID& frameworkId, + const std::string& pid, + const TaskInfo& task) +{ + slave::Slave::_runTask(future, frameworkInfo, frameworkId, pid, task); +} + + +void MockSlave::unmocked_killTask( + const process::UPID& from, + const FrameworkID& frameworkId, + const TaskID& taskId) +{ + slave::Slave::killTask(from, frameworkId, taskId); +} + + +void MockSlave::unmocked_removeFramework(slave::Framework* framework) +{ + slave::Slave::removeFramework(framework); +} + + slave::Flags ContainerizerTest<slave::MesosContainerizer>::CreateSlaveFlags() { slave::Flags flags = MesosTest::CreateSlaveFlags(); http://git-wip-us.apache.org/repos/asf/mesos/blob/4356e4f7/src/tests/mesos.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index 957e223..e40575c 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -538,6 +538,69 @@ public: }; +// Definition of a mock Slave to be used in tests with gmock, covering +// potential races between runTask and killTask. +class MockSlave : public slave::Slave +{ +public: + MockSlave( + const slave::Flags& flags, + MasterDetector* detector, + slave::Containerizer* containerizer); + + virtual ~MockSlave() {} + + MOCK_METHOD5(runTask, void( + const process::UPID& from, + const FrameworkInfo& frameworkInfo, + const FrameworkID& frameworkId, + const std::string& pid, + const TaskInfo& task)); + + void unmocked_runTask( + const process::UPID& from, + const FrameworkInfo& frameworkInfo, + const FrameworkID& frameworkId, + const std::string& pid, + const TaskInfo& task); + + MOCK_METHOD5(_runTask, void( + const process::Future<bool>& future, + const FrameworkInfo& frameworkInfo, + const FrameworkID& frameworkId, + const std::string& pid, + const TaskInfo& task)); + + void unmocked__runTask( + const process::Future<bool>& future, + const FrameworkInfo& frameworkInfo, + const FrameworkID& frameworkId, + const std::string& pid, + const TaskInfo& task); + + MOCK_METHOD3(killTask, void( + const process::UPID& from, + const FrameworkID& frameworkId, + const TaskID& taskId)); + + void unmocked_killTask( + const process::UPID& from, + const FrameworkID& frameworkId, + const TaskID& taskId); + + MOCK_METHOD1(removeFramework, void( + slave::Framework* framework)); + + void unmocked_removeFramework( + slave::Framework* framework); + +private: + Files files; + MockGarbageCollector gc; + slave::StatusUpdateManager statusUpdateManager; +}; + + // Definition of a MockAuthozier that can be used in tests with gmock. class MockAuthorizer : public Authorizer { http://git-wip-us.apache.org/repos/asf/mesos/blob/4356e4f7/src/tests/slave_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index f585bdd..a1bd1d4 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -81,7 +81,10 @@ using testing::_; using testing::AtMost; using testing::DoAll; using testing::Eq; +using testing::Invoke; +using testing::InvokeWithoutArgs; using testing::Return; +using testing::SaveArg; // Those of the overall Mesos master/slave/scheduler/driver tests // that seem vaguely more slave than master-related are in this file. @@ -1035,3 +1038,116 @@ TEST_F(SlaveTest, PingTimeoutSomePings) AWAIT_READY(detected); AWAIT_READY(slaveReregisteredMessage); } + +// This test ensures that a killTask() can happen between runTask() +// and _runTask() and then gets "handled properly". This means that +// the task never gets started, but also does not get lost. The end +// result is status TASK_KILLED. Essentially, killing the task is +// realized while preparing to start it. See MESOS-947. +TEST_F(SlaveTest, KillTaskBetweenRunTaskParts) +{ + Try<PID<Master> > master = StartMaster(); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + + TestContainerizer containerizer(&exec); + + StandaloneMasterDetector detector(master.get()); + + MockSlave slave(CreateSlaveFlags(), &detector, &containerizer); + process::spawn(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)) + .Times(1); + + Future<vector<Offer> > offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver.start(); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + TaskInfo task; + task.set_name(""); + task.mutable_task_id()->set_value("1"); + task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); + task.mutable_resources()->MergeFrom(offers.get()[0].resources()); + task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO); + + vector<TaskInfo> tasks; + tasks.push_back(task); + + EXPECT_CALL(exec, registered(_, _, _, _)) + .Times(0); + + EXPECT_CALL(exec, launchTask(_, _)) + .Times(0); + + EXPECT_CALL(exec, shutdown(_)) + .Times(0); + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillRepeatedly(FutureArg<1>(&status)); + + EXPECT_CALL(slave, runTask(_, _, _, _, _)) + .WillOnce(Invoke(&slave, &MockSlave::unmocked_runTask)); + + // Saved arguments from Slave::_runTask(). + Future<bool> future; + FrameworkInfo frameworkInfo; + FrameworkID frameworkId; + + // Skip what Slave::_runTask() normally does, save its arguments for + // later, tie reaching the critical moment when to kill the task to + // a future. + Future<Nothing> _runTask; + EXPECT_CALL(slave, _runTask(_, _, _, _, _)) + .WillOnce(DoAll(FutureSatisfy(&_runTask), + SaveArg<0>(&future), + SaveArg<1>(&frameworkInfo), + SaveArg<2>(&frameworkId))); + + driver.launchTasks(offers.get()[0].id(), tasks); + + AWAIT_READY(_runTask); + + Future<Nothing> killTask; + EXPECT_CALL(slave, killTask(_, _, _)) + .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_killTask), + FutureSatisfy(&killTask))); + driver.killTask(task.task_id()); + + // Since this is the only task ever for this framework, the + // framework should get removed in Slave::_runTask(). + // Thus we can observe that this happens before Shutdown(). + Future<Nothing> removeFramework; + EXPECT_CALL(slave, removeFramework(_)) + .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_removeFramework), + FutureSatisfy(&removeFramework))); + + AWAIT_READY(killTask); + slave.unmocked__runTask( + future, frameworkInfo, frameworkId, master.get(), task); + + AWAIT_READY(removeFramework); + + AWAIT_READY(status); + EXPECT_EQ(TASK_KILLED, status.get().state()); + + driver.stop(); + driver.join(); + + process::terminate(slave); + process::wait(slave); + + Shutdown(); // Must shutdown before 'containerizer' gets deallocated. +}
