Added a test `DefaultExecutorTest.KillMultipleTasks`.

Review: https://reviews.apache.org/r/62837


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/28831de3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/28831de3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/28831de3

Branch: refs/heads/master
Commit: 28831de34d098c894042246dd6fef402eb3b960d
Parents: 05c7dd8
Author: Qian Zhang <zhq527...@gmail.com>
Authored: Mon Oct 9 14:25:31 2017 +0800
Committer: Qian Zhang <zhq527...@gmail.com>
Committed: Tue Oct 31 12:47:17 2017 +0800

----------------------------------------------------------------------
 src/tests/default_executor_tests.cpp | 141 ++++++++++++++++++++++++++++++
 1 file changed, 141 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/28831de3/src/tests/default_executor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/default_executor_tests.cpp b/src/tests/default_executor_tests.cpp
index 5078bd4..f485b61 100644
--- a/src/tests/default_executor_tests.cpp
+++ b/src/tests/default_executor_tests.cpp
@@ -441,6 +441,147 @@ TEST_P(DefaultExecutorTest, KillTask)
 }
 
 
+// This is a regression test for MESOS-8051. It verifies that if the
+// default executor is asked to kill all tasks from a task group
+// simultaneously, all the tasks can be successfully killed and the
+// default executor can send TASK_KILLED updates for all of them.
+TEST_P(DefaultExecutorTest, KillMultipleTasks)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.containerizers = GetParam();
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      v1::DEFAULT_EXECUTOR_ID,
+      None(),
+      resources,
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  v1::TaskInfo taskInfo1 =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
+
+  v1::TaskInfo taskInfo2 =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
+
+  const hashset<v1::TaskID> tasks{taskInfo1.task_id(), taskInfo2.task_id()};
+
+  Future<v1::scheduler::Event::Update> startingUpdate1;
+  Future<v1::scheduler::Event::Update> startingUpdate2;
+  Future<v1::scheduler::Event::Update> runningUpdate1;
+  Future<v1::scheduler::Event::Update> runningUpdate2;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&startingUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&startingUpdate2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&runningUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&runningUpdate2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::LAUNCH_GROUP(
+              executorInfo, v1::createTaskGroupInfo({taskInfo1, taskInfo2}))}));
+
+  AWAIT_READY(startingUpdate1);
+  ASSERT_EQ(TASK_STARTING, startingUpdate1->status().state());
+
+  AWAIT_READY(startingUpdate2);
+  ASSERT_EQ(TASK_STARTING, startingUpdate2->status().state());
+
+  AWAIT_READY(runningUpdate1);
+  ASSERT_EQ(TASK_RUNNING, runningUpdate1->status().state());
+
+  AWAIT_READY(runningUpdate2);
+  ASSERT_EQ(TASK_RUNNING, runningUpdate2->status().state());
+
+  // When running a task, TASK_RUNNING updates for the tasks in a
+  // task group can be received in any order.
+  const hashset<v1::TaskID> tasksRunning{
+    runningUpdate1->status().task_id(),
+    runningUpdate2->status().task_id()};
+
+  ASSERT_EQ(tasks, tasksRunning);
+
+  Future<v1::scheduler::Event::Update> killedUpdate1;
+  Future<v1::scheduler::Event::Update> killedUpdate2;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&killedUpdate1))
+    .WillOnce(FutureArg<1>(&killedUpdate2));
+
+  // Now kill all tasks in the task group.
+  mesos.send(v1::createCallKill(frameworkId, taskInfo1.task_id()));
+  mesos.send(v1::createCallKill(frameworkId, taskInfo2.task_id()));
+
+  // All the tasks in the task group should be killed.
+  AWAIT_READY(killedUpdate1);
+  ASSERT_EQ(TASK_KILLED, killedUpdate1->status().state());
+
+  AWAIT_READY(killedUpdate2);
+  ASSERT_EQ(TASK_KILLED, killedUpdate2->status().state());
+
+  // When killing a task, TASK_KILLED updates for the tasks in a task
+  // group can be received in any order.
+  const hashset<v1::TaskID> tasksKilled{
+    killedUpdate1->status().task_id(),
+    killedUpdate2->status().task_id()};
+
+  ASSERT_EQ(tasks, tasksKilled);
+}
+
+
 // This test verifies that if the default executor receives a
 // non-zero exit status code for a task in the task group, it
 // kills all the other tasks (default restart policy).

Reply via email to