Added a test `DefaultExecutorTest.KillMultipleTasks`. Review: https://reviews.apache.org/r/62837
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/28831de3 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/28831de3 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/28831de3 Branch: refs/heads/master Commit: 28831de34d098c894042246dd6fef402eb3b960d Parents: 05c7dd8 Author: Qian Zhang <zhq527...@gmail.com> Authored: Mon Oct 9 14:25:31 2017 +0800 Committer: Qian Zhang <zhq527...@gmail.com> Committed: Tue Oct 31 12:47:17 2017 +0800 ---------------------------------------------------------------------- src/tests/default_executor_tests.cpp | 141 ++++++++++++++++++++++++++++++ 1 file changed, 141 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/28831de3/src/tests/default_executor_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/default_executor_tests.cpp b/src/tests/default_executor_tests.cpp index 5078bd4..f485b61 100644 --- a/src/tests/default_executor_tests.cpp +++ b/src/tests/default_executor_tests.cpp @@ -441,6 +441,147 @@ TEST_P(DefaultExecutorTest, KillTask) } +// This is a regression test for MESOS-8051. It verifies that if the +// default executor is asked to kill all tasks from a task group +// simultaneously, all the tasks can be successfully killed and the +// default executor can send TASK_KILLED updates for all of them. +TEST_P(DefaultExecutorTest, KillMultipleTasks) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + slave::Flags flags = CreateSlaveFlags(); + flags.containerizers = GetParam(); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); + ASSERT_SOME(slave); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. + + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); + + AWAIT_READY(subscribed); + v1::FrameworkID frameworkId(subscribed->framework_id()); + + v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::ExecutorInfo executorInfo = v1::createExecutorInfo( + v1::DEFAULT_EXECUTOR_ID, + None(), + resources, + v1::ExecutorInfo::DEFAULT, + frameworkId); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->offers().empty()); + + const v1::Offer& offer = offers->offers(0); + const v1::AgentID& agentId = offer.agent_id(); + + v1::TaskInfo taskInfo1 = + v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); + + v1::TaskInfo taskInfo2 = + v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); + + const hashset<v1::TaskID> tasks{taskInfo1.task_id(), taskInfo2.task_id()}; + + Future<v1::scheduler::Event::Update> startingUpdate1; + Future<v1::scheduler::Event::Update> startingUpdate2; + Future<v1::scheduler::Event::Update> runningUpdate1; + Future<v1::scheduler::Event::Update> runningUpdate2; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce( + DoAll( + FutureArg<1>(&startingUpdate1), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillOnce( + DoAll( + FutureArg<1>(&startingUpdate2), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillOnce( + DoAll( + FutureArg<1>(&runningUpdate1), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillOnce( + DoAll( + FutureArg<1>(&runningUpdate2), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + mesos.send( + v1::createCallAccept( + frameworkId, + offer, + {v1::LAUNCH_GROUP( + executorInfo, v1::createTaskGroupInfo({taskInfo1, taskInfo2}))})); + + AWAIT_READY(startingUpdate1); + ASSERT_EQ(TASK_STARTING, startingUpdate1->status().state()); + + AWAIT_READY(startingUpdate2); + ASSERT_EQ(TASK_STARTING, startingUpdate2->status().state()); + + AWAIT_READY(runningUpdate1); + ASSERT_EQ(TASK_RUNNING, runningUpdate1->status().state()); + + AWAIT_READY(runningUpdate2); + ASSERT_EQ(TASK_RUNNING, runningUpdate2->status().state()); + + // When running a task, TASK_RUNNING updates for the tasks in a + // task group can be received in any order. + const hashset<v1::TaskID> tasksRunning{ + runningUpdate1->status().task_id(), + runningUpdate2->status().task_id()}; + + ASSERT_EQ(tasks, tasksRunning); + + Future<v1::scheduler::Event::Update> killedUpdate1; + Future<v1::scheduler::Event::Update> killedUpdate2; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&killedUpdate1)) + .WillOnce(FutureArg<1>(&killedUpdate2)); + + // Now kill all tasks in the task group. + mesos.send(v1::createCallKill(frameworkId, taskInfo1.task_id())); + mesos.send(v1::createCallKill(frameworkId, taskInfo2.task_id())); + + // All the tasks in the task group should be killed. + AWAIT_READY(killedUpdate1); + ASSERT_EQ(TASK_KILLED, killedUpdate1->status().state()); + + AWAIT_READY(killedUpdate2); + ASSERT_EQ(TASK_KILLED, killedUpdate2->status().state()); + + // When killing a task, TASK_KILLED updates for the tasks in a task + // group can be received in any order. + const hashset<v1::TaskID> tasksKilled{ + killedUpdate1->status().task_id(), + killedUpdate2->status().task_id()}; + + ASSERT_EQ(tasks, tasksKilled); +} + + // This test verifies that if the default executor receives a // non-zero exit status code for a task in the task group, it // kills all the other tasks (default restart policy).