Added tests for failed task groups on a single executor.

These tests verify agent behavior when launching several task groups
that use the same executor. While the task groups are still being
launched on the agent (before any executor has been created), if the
first received task group fails to launch, subsequent task groups
destined for the same executor are dropped and reported as TASK_LOST.
If a task group other than the first one received fails to launch, the
first received task group should still launch successfully.

Review: https://reviews.apache.org/r/66323/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/731136aa
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/731136aa
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/731136aa

Branch: refs/heads/master
Commit: 731136aa55c0825698c7643ec0555142351828af
Parents: 7b8937e
Author: Meng Zhu <[email protected]>
Authored: Thu Apr 5 17:44:47 2018 -0700
Committer: Greg Mann <[email protected]>
Committed: Thu Apr 5 17:58:48 2018 -0700

----------------------------------------------------------------------
 src/tests/slave_tests.cpp | 426 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 426 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/731136aa/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 7877f9d..2f90d1d 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -5215,6 +5215,432 @@ TEST_F(SlaveTest, LaunchTasksReorderTaskAuthorization)
 }
 
 
+// This test verifies the agent behavior when launching three task groups that
+// use the same executor. While all three task groups are still being launched
+// on the agent (before any executor has been created), the first received task
+// group is killed so that it fails to launch. The subsequent task groups
+// destined for the same executor are then expected to fail as well: the
+// scheduler should receive TASK_LOST for task2 and task3.
+TEST_F(SlaveTest, LaunchTaskGroupsUsingSameExecutorKillFirstTaskGroup)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  // Start a mock slave so that `_run()` and `killTask()` can be intercepted
+  // below.
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), slaveFlags, true);
+
+  ASSERT_SOME(slave);
+  ASSERT_NE(nullptr, slave.get()->mock());
+  slave.get()->start();
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  EXPECT_CALL(*scheduler, failure(_, _))
+    .Times(AtMost(1));
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  // Advance the clock to trigger both agent registration and a batch
+  // allocation.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(subscribed);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      "default1", None(), resources, v1::ExecutorInfo::DEFAULT, frameworkId);
+
+  // Create three separate task groups that use the same executor.
+
+  v1::TaskInfo taskInfo1 = v1::createTask(agentId, resources, "sleep 1000");
+  v1::TaskGroupInfo taskGroup1 = v1::createTaskGroupInfo({taskInfo1});
+
+  v1::TaskInfo taskInfo2 = v1::createTask(agentId, resources, "sleep 1000");
+  v1::TaskGroupInfo taskGroup2 = v1::createTaskGroupInfo({taskInfo2});
+
+  v1::TaskInfo taskInfo3 = v1::createTask(agentId, resources, "sleep 1000");
+  v1::TaskGroupInfo taskGroup3 = v1::createTaskGroupInfo({taskInfo3});
+
+  v1::Offer::Operation launchGroup1 =
+    v1::LAUNCH_GROUP(executorInfo, taskGroup1);
+  v1::Offer::Operation launchGroup2 =
+    v1::LAUNCH_GROUP(executorInfo, taskGroup2);
+  v1::Offer::Operation launchGroup3 =
+    v1::LAUNCH_GROUP(executorInfo, taskGroup3);
+
+  // Expect exactly one status update per task and acknowledge each one.
+  Future<v1::scheduler::Event::Update> task1Killed;
+  Future<v1::scheduler::Event::Update> task2Lost;
+  Future<v1::scheduler::Event::Update> task3Lost;
+  EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateTaskIdEq(taskInfo1)))
+    .WillOnce(DoAll(
+        FutureArg<1>(&task1Killed),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+  EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateTaskIdEq(taskInfo2)))
+    .WillOnce(DoAll(
+        FutureArg<1>(&task2Lost),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+  EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateTaskIdEq(taskInfo3)))
+    .WillOnce(DoAll(
+        FutureArg<1>(&task3Lost),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  // Saved arguments from `Slave::_run()`. They are replayed into the real
+  // (unmocked) `_run()` when each paused continuation is resumed below.
+  FrameworkInfo _frameworkInfo1, _frameworkInfo2, _frameworkInfo3;
+  ExecutorInfo _executorInfo1, _executorInfo2, _executorInfo3;
+  Option<TaskGroupInfo> _taskGroup1, _taskGroup2,  _taskGroup3;
+  Option<TaskInfo> _task1, _task2, _task3;
+  vector<ResourceVersionUUID>
+    _resourceVersionUuids1, _resourceVersionUuids2, _resourceVersionUuids3;
+  Option<bool> _launchExecutor1, _launchExecutor2, _launchExecutor3;
+
+  // Pause all task groups at `_run` by returning a pending future from the
+  // mocked `_run`. Each mock is matched by the task ID contained in its task
+  // group, and saves its arguments so the real `_run` can be invoked later.
+  Promise<Nothing> promise1, promise2, promise3;
+  Future<Nothing> _run1, _run2, _run3;
+  EXPECT_CALL(
+      *slave.get()->mock(),
+      _run(_, _, _,
+          OptionTaskGroupHasTaskID(devolve(taskInfo1.task_id())),
+          _, _))
+    .WillOnce(DoAll(
+        FutureSatisfy(&_run1),
+        SaveArg<0>(&_frameworkInfo1),
+        SaveArg<1>(&_executorInfo1),
+        SaveArg<2>(&_task1),
+        SaveArg<3>(&_taskGroup1),
+        SaveArg<4>(&_resourceVersionUuids1),
+        SaveArg<5>(&_launchExecutor1),
+        Return(promise1.future())));
+  EXPECT_CALL(
+      *slave.get()->mock(),
+      _run(_, _, _,
+          OptionTaskGroupHasTaskID(devolve(taskInfo2.task_id())),
+          _, _))
+    .WillOnce(DoAll(
+        FutureSatisfy(&_run2),
+        SaveArg<0>(&_frameworkInfo2),
+        SaveArg<1>(&_executorInfo2),
+        SaveArg<2>(&_task2),
+        SaveArg<3>(&_taskGroup2),
+        SaveArg<4>(&_resourceVersionUuids2),
+        SaveArg<5>(&_launchExecutor2),
+        Return(promise2.future())));
+  EXPECT_CALL(
+      *slave.get()->mock(),
+      _run(_, _, _,
+          OptionTaskGroupHasTaskID(devolve(taskInfo3.task_id())),
+          _, _))
+    .WillOnce(DoAll(
+        FutureSatisfy(&_run3),
+        SaveArg<0>(&_frameworkInfo3),
+        SaveArg<1>(&_executorInfo3),
+        SaveArg<2>(&_task3),
+        SaveArg<3>(&_taskGroup3),
+        SaveArg<4>(&_resourceVersionUuids3),
+        SaveArg<5>(&_launchExecutor3),
+        Return(promise3.future())));
+
+  // Launch all three task groups with a single ACCEPT call on one offer.
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId, offer, {launchGroup1, launchGroup2, launchGroup3}));
+
+  // Wait until all three task groups are paused in the mocked `_run`.
+  AWAIT_READY(_run1);
+  AWAIT_READY(_run2);
+  AWAIT_READY(_run3);
+
+  // Forward the kill to the real `Slave::killTask()` and record when it has
+  // been invoked.
+  Future<Nothing> killTask1;
+  EXPECT_CALL(*slave.get()->mock(), killTask(_, _))
+    .WillOnce(DoAll(Invoke(slave.get()->mock(),
+                           &MockSlave::unmocked_killTask),
+                    FutureSatisfy(&killTask1)));
+
+  // Kill task1 while its task group is still paused in `_run`.
+  mesos.send(v1::createCallKill(frameworkId, taskInfo1.task_id(), agentId));
+
+  AWAIT_READY(killTask1);
+
+  // Resume the continuation for `taskGroup1`: run the real `_run` on the
+  // agent's actor with the saved arguments and forward its result to the
+  // future that the mocked `_run` returned.
+  Future<Nothing> unmocked__run1 = process::dispatch(slave.get()->pid, [=] {
+    return slave.get()->mock()->unmocked__run(
+        _frameworkInfo1,
+        _executorInfo1,
+        _task1,
+        _taskGroup1,
+        _resourceVersionUuids1,
+        _launchExecutor1);
+  });
+
+  promise1.associate(unmocked__run1);
+
+  // `AWAIT` only waits for completion; the outcome of `_run` itself is not
+  // asserted, only the resulting status update.
+  AWAIT(unmocked__run1);
+  AWAIT_READY(task1Killed);
+
+  EXPECT_EQ(v1::TASK_KILLED, task1Killed->status().state());
+
+  // Resume the continuation for `taskGroup2`.
+  Future<Nothing> unmocked__run2 = process::dispatch(slave.get()->pid, [=] {
+    return slave.get()->mock()->unmocked__run(
+        _frameworkInfo2,
+        _executorInfo2,
+        _task2,
+        _taskGroup2,
+        _resourceVersionUuids2,
+        _launchExecutor2);
+  });
+
+  promise2.associate(unmocked__run2);
+
+  // Resume the continuation for `taskGroup3`.
+  Future<Nothing> unmocked__run3 = process::dispatch(slave.get()->pid, [=] {
+    return slave.get()->mock()->unmocked__run(
+        _frameworkInfo3,
+        _executorInfo3,
+        _task3,
+        _taskGroup3,
+        _resourceVersionUuids3,
+        _launchExecutor3);
+  });
+
+  promise3.associate(unmocked__run3);
+
+  // The remaining task groups share the executor of the failed first task
+  // group and are expected to be reported as TASK_LOST.
+  AWAIT(unmocked__run2);
+  AWAIT_READY(task2Lost);
+
+  EXPECT_EQ(v1::TASK_LOST, task2Lost->status().state());
+
+  AWAIT(unmocked__run3);
+  AWAIT_READY(task3Lost);
+
+  EXPECT_EQ(v1::TASK_LOST, task3Lost->status().state());
+}
+
+
+// This test verifies the agent behavior when launching two task groups that
+// use the same executor. While both task groups are still being launched on
+// the agent (before any executor has been created), the second received task
+// group is killed so that it fails to launch. The first task group must be
+// unaffected and continue launching successfully, reaching TASK_RUNNING.
+TEST_F(SlaveTest, LaunchTaskGroupsUsingSameExecutorKillLaterTaskGroup)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  // Start a mock slave so that `_run()` and `killTask()` can be intercepted
+  // below.
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), slaveFlags, true);
+
+  ASSERT_SOME(slave);
+  ASSERT_NE(nullptr, slave.get()->mock());
+  slave.get()->start();
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  // Only the first offer is used. Any later offer (e.g. after the killed
+  // task group's resources are recovered) must not over-saturate this
+  // expectation and fail the test.
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  EXPECT_CALL(*scheduler, failure(_, _))
+    .Times(AtMost(1));
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  // Advance the clock to trigger both agent registration and a batch
+  // allocation.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(subscribed);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      "default1", None(), resources, v1::ExecutorInfo::DEFAULT, frameworkId);
+
+  // Create two separate task groups that use the same executor.
+
+  v1::TaskInfo taskInfo1 = v1::createTask(agentId, resources, "sleep 1000");
+  v1::TaskGroupInfo taskGroup1 = v1::createTaskGroupInfo({taskInfo1});
+
+  v1::TaskInfo taskInfo2 = v1::createTask(agentId, resources, "sleep 1000");
+  v1::TaskGroupInfo taskGroup2 = v1::createTaskGroupInfo({taskInfo2});
+
+  v1::Offer::Operation launchGroup1 =
+    v1::LAUNCH_GROUP(executorInfo, taskGroup1);
+  v1::Offer::Operation launchGroup2 =
+    v1::LAUNCH_GROUP(executorInfo, taskGroup2);
+
+  // task1 is expected to send two status updates (checked below to be
+  // TASK_STARTING then TASK_RUNNING); task2 is expected to send one
+  // (TASK_KILLED). Every update is acknowledged.
+  Future<v1::scheduler::Event::Update> task1Starting, task1Running;
+  Future<v1::scheduler::Event::Update> task2Killed;
+  EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateTaskIdEq(taskInfo1)))
+    .WillOnce(DoAll(
+        FutureArg<1>(&task1Starting),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillOnce(DoAll(
+        FutureArg<1>(&task1Running),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+  EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateTaskIdEq(taskInfo2)))
+    .WillOnce(DoAll(
+        FutureArg<1>(&task2Killed),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  // Saved arguments from `Slave::_run()`. They are replayed into the real
+  // (unmocked) `_run()` when each paused continuation is resumed below.
+  FrameworkInfo _frameworkInfo1, _frameworkInfo2;
+  ExecutorInfo _executorInfo1, _executorInfo2;
+  Option<TaskGroupInfo> _taskGroup1, _taskGroup2;
+  Option<TaskInfo> _task1, _task2;
+  vector<ResourceVersionUUID> _resourceVersionUuids1, _resourceVersionUuids2;
+  Option<bool> _launchExecutor1, _launchExecutor2;
+
+  // Pause both task groups at `_run` by returning a pending future from the
+  // mocked `_run`. Each mock is matched by the task ID contained in its task
+  // group, and saves its arguments so the real `_run` can be invoked later.
+  Promise<Nothing> promise1, promise2;
+  Future<Nothing> _run1, _run2;
+  EXPECT_CALL(
+      *slave.get()->mock(),
+      _run(_, _, _,
+          OptionTaskGroupHasTaskID(devolve(taskInfo1.task_id())),
+          _, _))
+    .WillOnce(DoAll(
+        FutureSatisfy(&_run1),
+        SaveArg<0>(&_frameworkInfo1),
+        SaveArg<1>(&_executorInfo1),
+        SaveArg<2>(&_task1),
+        SaveArg<3>(&_taskGroup1),
+        SaveArg<4>(&_resourceVersionUuids1),
+        SaveArg<5>(&_launchExecutor1),
+        Return(promise1.future())));
+  EXPECT_CALL(
+      *slave.get()->mock(),
+      _run(_, _, _,
+          OptionTaskGroupHasTaskID(devolve(taskInfo2.task_id())),
+          _, _))
+    .WillOnce(DoAll(
+        FutureSatisfy(&_run2),
+        SaveArg<0>(&_frameworkInfo2),
+        SaveArg<1>(&_executorInfo2),
+        SaveArg<2>(&_task2),
+        SaveArg<3>(&_taskGroup2),
+        SaveArg<4>(&_resourceVersionUuids2),
+        SaveArg<5>(&_launchExecutor2),
+        Return(promise2.future())));
+
+  // Launch the two task groups with a single ACCEPT call on one offer.
+  mesos.send(
+      v1::createCallAccept(frameworkId, offer, {launchGroup1, launchGroup2}));
+
+  // Wait until both task groups are paused in the mocked `_run`.
+  AWAIT_READY(_run1);
+  AWAIT_READY(_run2);
+
+  // Forward the kill to the real `Slave::killTask()` and record when it has
+  // been invoked.
+  Future<Nothing> killTask2;
+  EXPECT_CALL(*slave.get()->mock(), killTask(_, _))
+    .WillOnce(DoAll(Invoke(slave.get()->mock(),
+                           &MockSlave::unmocked_killTask),
+                    FutureSatisfy(&killTask2)));
+
+  // Kill task2 while its task group is still paused in `_run`.
+  mesos.send(v1::createCallKill(frameworkId, taskInfo2.task_id(), agentId));
+
+  AWAIT_READY(killTask2);
+
+  // Resume the continuation for `taskGroup2`: run the real `_run` on the
+  // agent's actor with the saved arguments and forward its result to the
+  // future that the mocked `_run` returned.
+  Future<Nothing> unmocked__run2 = process::dispatch(slave.get()->pid, [=] {
+    return slave.get()->mock()->unmocked__run(
+        _frameworkInfo2,
+        _executorInfo2,
+        _task2,
+        _taskGroup2,
+        _resourceVersionUuids2,
+        _launchExecutor2);
+  });
+
+  promise2.associate(unmocked__run2);
+
+  // `AWAIT` only waits for completion; the outcome of `_run` itself is not
+  // asserted, only the resulting status update.
+  AWAIT(unmocked__run2);
+  AWAIT_READY(task2Killed);
+
+  EXPECT_EQ(v1::TASK_KILLED, task2Killed->status().state());
+
+  // Resume the continuation for `taskGroup1`, which must still launch even
+  // though `taskGroup2` (same executor) failed to launch.
+  Future<Nothing> unmocked__run1 = process::dispatch(slave.get()->pid, [=] {
+    return slave.get()->mock()->unmocked__run(
+        _frameworkInfo1,
+        _executorInfo1,
+        _task1,
+        _taskGroup1,
+        _resourceVersionUuids1,
+        _launchExecutor1);
+  });
+
+  promise1.associate(unmocked__run1);
+
+  AWAIT(unmocked__run1);
+
+  AWAIT_READY(task1Starting);
+  EXPECT_EQ(v1::TASK_STARTING, task1Starting->status().state());
+
+  AWAIT_READY(task1Running);
+  EXPECT_EQ(v1::TASK_RUNNING, task1Running->status().state());
+}
+
+
 // This test ensures that agent sends ExitedExecutorMessage when the task
 // fails to launch due to task authorization failure and that master's executor
 // bookkeeping entry is removed.

Reply via email to