Added a test for killing the executor during task launch. This test verifies that when the agent shuts down a running executor, tasks still being launched on the agent that use the same executor are dropped.
Review: https://reviews.apache.org/r/66347/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/06d8d759 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/06d8d759 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/06d8d759 Branch: refs/heads/master Commit: 06d8d759a4d7d85e291644711cecd835825d54dd Parents: 731136a Author: Meng Zhu <[email protected]> Authored: Thu Apr 5 17:44:53 2018 -0700 Committer: Greg Mann <[email protected]> Committed: Thu Apr 5 17:59:05 2018 -0700 ---------------------------------------------------------------------- src/tests/slave_tests.cpp | 170 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 170 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/06d8d759/src/tests/slave_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index 2f90d1d..646a2b9 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -5641,6 +5641,176 @@ TEST_F(SlaveTest, LaunchTaskGroupsUsingSameExecutorKillLaterTaskGroup) } +// This test verifies that when agent shuts down a running executor, launching +// tasks on the agent that use the same executor will be dropped properly. +TEST_F(SlaveTest, ShutdownExecutorWhileTaskLaunching) +{ + Clock::pause(); + + master::Flags masterFlags = CreateMasterFlags(); + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + slave::Flags slaveFlags = CreateSlaveFlags(); + + // Start a mock slave. 
+ Try<Owned<cluster::Slave>> slave = + StartSlave(detector.get(), slaveFlags, true); + + ASSERT_SOME(slave); + ASSERT_NE(nullptr, slave.get()->mock()); + slave.get()->start(); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)); + + EXPECT_CALL(*scheduler, failure(_, _)) + .Times(AtMost(1)); + + v1::scheduler::TestMesos mesos( + master.get()->pid, ContentType::PROTOBUF, scheduler); + + // Advance the clock to trigger both agent registration and a batch + // allocation. + Clock::advance(slaveFlags.registration_backoff_factor); + Clock::advance(masterFlags.allocation_interval); + + AWAIT_READY(subscribed); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->offers().empty()); + + const v1::Offer& offer = offers->offers(0); + const v1::AgentID& agentId = offer.agent_id(); + + v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::FrameworkID frameworkId(subscribed->framework_id()); + + v1::ExecutorInfo executorInfo = v1::createExecutorInfo( + "default1", None(), resources, v1::ExecutorInfo::DEFAULT, frameworkId); + + // Create two separate task groups that use the same executor. 
+ v1::TaskInfo taskInfo1 = v1::createTask(agentId, resources, "sleep 1000"); + v1::TaskGroupInfo taskGroup1 = v1::createTaskGroupInfo({taskInfo1}); + + v1::TaskInfo taskInfo2 = v1::createTask(agentId, resources, "sleep 1000"); + v1::TaskGroupInfo taskGroup2 = v1::createTaskGroupInfo({taskInfo2}); + + v1::Offer::Operation launchGroup1 = + v1::LAUNCH_GROUP(executorInfo, taskGroup1); + v1::Offer::Operation launchGroup2 = + v1::LAUNCH_GROUP(executorInfo, taskGroup2); + + Future<v1::scheduler::Event::Update> task1Starting, task1Running; + Future<v1::scheduler::Event::Update> task2Lost; + EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateTaskIdEq(taskInfo1))) + .WillOnce(DoAll( + FutureArg<1>(&task1Starting), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillOnce(DoAll( + FutureArg<1>(&task1Running), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateTaskIdEq(taskInfo2))) + .WillOnce(DoAll( + FutureArg<1>(&task2Lost), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + // Saved arguments from `Slave::_run()` for `taskGroup2`. + FrameworkInfo _frameworkInfo; + ExecutorInfo _executorInfo; + Option<TaskGroupInfo> _taskGroup; + Option<TaskInfo> _task; + vector<ResourceVersionUUID> _resourceVersionUuids; + Option<bool> _launchExecutor; + + // Pause the launch of `taskGroup2` at `_run` by returning a pending future. + Promise<Nothing> promiseTask2; + Future<Nothing> runTask2; + EXPECT_CALL( + *slave.get()->mock(), + _run(_, _, _, + OptionTaskGroupHasTaskID(devolve(taskInfo2.task_id())), + _, _)) + .WillOnce( + DoAll(FutureSatisfy(&runTask2), + SaveArg<0>(&_frameworkInfo), + SaveArg<1>(&_executorInfo), + SaveArg<2>(&_task), + SaveArg<3>(&_taskGroup), + SaveArg<4>(&_resourceVersionUuids), + SaveArg<5>(&_launchExecutor), + Return(promiseTask2.future()))); + + // Launch the two task groups. 
+ mesos.send( + v1::createCallAccept(frameworkId, offer, {launchGroup1, launchGroup2})); + + AWAIT_READY(runTask2); + + // `taskGroup1` launches successfully. + AWAIT_READY(task1Starting); + EXPECT_EQ(v1::TASK_STARTING, task1Starting->status().state()); + + AWAIT_READY(task1Running); + EXPECT_EQ(v1::TASK_RUNNING, task1Running->status().state()); + + // Shutdown the executor while `taskGroup2` is still launching. + Future<Nothing> shutdownExecutor; + EXPECT_CALL(*slave.get()->mock(), shutdownExecutor(_, _, _)) + .WillOnce(DoAll( + Invoke(slave.get()->mock(), &MockSlave::unmocked_shutdownExecutor), + FutureSatisfy(&shutdownExecutor))); + + { + Call call; + call.mutable_framework_id()->CopyFrom(frameworkId); + call.set_type(Call::SHUTDOWN); + + Call::Shutdown* shutdown = call.mutable_shutdown(); + shutdown->mutable_executor_id()->CopyFrom(executorInfo.executor_id()); + shutdown->mutable_agent_id()->CopyFrom(agentId); + + mesos.send(call); + } + + AWAIT_READY(shutdownExecutor); + + // Resume launching `taskGroup2`. + Future<Nothing> unmocked__run = process::dispatch(slave.get()->pid, [=] { + return slave.get()->mock()->unmocked__run( + _frameworkInfo, + _executorInfo, + _task, + _taskGroup, + _resourceVersionUuids, + _launchExecutor); + }); + + promiseTask2.associate(unmocked__run); + + // `taskGroup2` is dropped because the executor is terminated. + AWAIT_READY(task2Lost); + EXPECT_EQ(v1::TASK_LOST, task2Lost->status().state()); +} + + // This test ensures that agent sends ExitedExecutorMessage when the task // fails to launch due to task authorization failure and that master's executor // bookkeeping entry is removed.
