Repository: mesos Updated Branches: refs/heads/master 84365a140 -> 9ee4da2b5
Fixed a task status update race in default executor tests. Previously, in the tests `DefaultExecutorTest.KillMultipleTasks` and `DefaultExecutorTest.KillTaskGroupOnTaskFailure`, when launching a task group which has multiple tasks, we expected the scheduler to receive all the TASK_STARTING status updates before receiving any TASK_RUNNING status updates. However, this is not guaranteed, e.g., it is possible for the scheduler to receive TASK_RUNNING for the first task before receiving TASK_STARTING for the second task. So in this patch, we use `Sequence` to guarantee the order of TASK_STARTING and TASK_RUNNING for each task, but do not care about the order between tasks. The following 3 tests had their own solutions to handle this issue; in this patch, I updated them to use the above solution: `DefaultExecutorTest.KillTask` `DefaultExecutorTest.CommitSuicideOnKillTask` `DefaultExecutorTest.ROOT_ContainerStatusForTask` Review: https://reviews.apache.org/r/63577/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e6c6ab85 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e6c6ab85 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e6c6ab85 Branch: refs/heads/master Commit: e6c6ab856d4f4829cbe71b60e93bb2c1dc6b44bc Parents: 84365a1 Author: Qian Zhang <zhq527...@gmail.com> Authored: Tue Nov 21 18:39:17 2017 +0100 Committer: Alexander Rukletsov <al...@apache.org> Committed: Tue Nov 21 18:39:17 2017 +0100 ---------------------------------------------------------------------- src/tests/default_executor_tests.cpp | 652 +++++++++++++++++------------- src/tests/mesos.hpp | 14 + 2 files changed, 394 insertions(+), 272 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/e6c6ab85/src/tests/default_executor_tests.cpp ---------------------------------------------------------------------- diff --git 
a/src/tests/default_executor_tests.cpp b/src/tests/default_executor_tests.cpp index 6515021..04d9e1b 100644 --- a/src/tests/default_executor_tests.cpp +++ b/src/tests/default_executor_tests.cpp @@ -71,6 +71,7 @@ using std::string; using std::vector; using testing::_; +using testing::AllOf; using testing::DoAll; using testing::Return; using testing::WithParamInterface; @@ -296,28 +297,80 @@ TEST_P(DefaultExecutorTest, KillTask) v1::TaskInfo taskInfo2 = v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); - const hashset<v1::TaskID> tasks1{taskInfo1.task_id(), taskInfo2.task_id()}; - Future<v1::scheduler::Event::Update> startingUpdate1; - Future<v1::scheduler::Event::Update> startingOrRunningUpdate1; - Future<v1::scheduler::Event::Update> startingOrRunningUpdate2; Future<v1::scheduler::Event::Update> runningUpdate1; - EXPECT_CALL(*scheduler, update(_, _)) + Future<v1::scheduler::Event::Update> killedUpdate1; + + testing::Sequence task1; + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo1), + TaskStatusUpdateStateEq(v1::TASK_STARTING)))) + .InSequence(task1) .WillOnce( DoAll( FutureArg<1>(&startingUpdate1), - v1::scheduler::SendAcknowledge(frameworkId, agentId))) + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo1), + TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) + .InSequence(task1) .WillOnce( DoAll( - FutureArg<1>(&startingOrRunningUpdate1), - v1::scheduler::SendAcknowledge(frameworkId, agentId))) + FutureArg<1>(&runningUpdate1), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo1), + TaskStatusUpdateStateEq(v1::TASK_KILLED)))) + .InSequence(task1) .WillOnce( DoAll( - FutureArg<1>(&startingOrRunningUpdate2), - v1::scheduler::SendAcknowledge(frameworkId, agentId))) + FutureArg<1>(&killedUpdate1), + v1::scheduler::SendAcknowledge(frameworkId, 
agentId))); + + Future<v1::scheduler::Event::Update> startingUpdate2; + Future<v1::scheduler::Event::Update> runningUpdate2; + Future<v1::scheduler::Event::Update> killedUpdate2; + + testing::Sequence task2; + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo2), + TaskStatusUpdateStateEq(v1::TASK_STARTING)))) + .InSequence(task2) .WillOnce( DoAll( - FutureArg<1>(&runningUpdate1), + FutureArg<1>(&startingUpdate2), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo2), + TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) + .InSequence(task2) + .WillOnce( + DoAll( + FutureArg<1>(&runningUpdate2), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo2), + TaskStatusUpdateStateEq(v1::TASK_KILLED)))) + .InSequence(task2) + .WillOnce( + DoAll( + FutureArg<1>(&killedUpdate2), v1::scheduler::SendAcknowledge(frameworkId, agentId))); Future<v1::scheduler::Event::Offers> offers2; @@ -329,6 +382,7 @@ TEST_P(DefaultExecutorTest, KillTask) v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP( executorInfo, v1::createTaskGroupInfo({taskInfo1, taskInfo2})); + Call call = v1::createCallAccept(frameworkId, offer1, {launchGroup}); // Set a 0s filter to immediately get another offer to launch @@ -339,20 +393,10 @@ TEST_P(DefaultExecutorTest, KillTask) } AWAIT_READY(startingUpdate1); - ASSERT_EQ(TASK_STARTING, startingUpdate1->status().state()); - AWAIT_READY(runningUpdate1); - ASSERT_EQ(TASK_RUNNING, runningUpdate1->status().state()); - - // When running a task, TASK_RUNNING updates for the tasks in a - // task group can be received in any order. 
- const hashset<v1::TaskID> tasksRunning{ - startingUpdate1->status().task_id(), - startingOrRunningUpdate1->status().task_id(), - startingOrRunningUpdate2->status().task_id(), - runningUpdate1->status().task_id()}; - ASSERT_EQ(tasks1, tasksRunning); + AWAIT_READY(startingUpdate2); + AWAIT_READY(runningUpdate2); AWAIT_READY(offers2); const v1::Offer& offer2 = offers2->offers(0); @@ -362,15 +406,41 @@ TEST_P(DefaultExecutorTest, KillTask) Future<v1::scheduler::Event::Update> startingUpdate3; Future<v1::scheduler::Event::Update> runningUpdate3; - EXPECT_CALL(*scheduler, update(_, _)) + Future<v1::scheduler::Event::Update> killedUpdate3; + + testing::Sequence task3; + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo3), + TaskStatusUpdateStateEq(v1::TASK_STARTING)))) + .InSequence(task3) .WillOnce( DoAll( FutureArg<1>(&startingUpdate3), - v1::scheduler::SendAcknowledge(frameworkId, offer2.agent_id()))) + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo3), + TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) + .InSequence(task3) .WillOnce( DoAll( FutureArg<1>(&runningUpdate3), - v1::scheduler::SendAcknowledge(frameworkId, offer2.agent_id()))); + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo3), + TaskStatusUpdateStateEq(v1::TASK_KILLED)))) + .InSequence(task3) + .WillOnce( + DoAll( + FutureArg<1>(&killedUpdate3), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); // Launch the second task group. 
mesos.send( @@ -381,56 +451,32 @@ TEST_P(DefaultExecutorTest, KillTask) executorInfo, v1::createTaskGroupInfo({taskInfo3}))})); AWAIT_READY(startingUpdate3); - ASSERT_EQ(TASK_STARTING, startingUpdate3->status().state()); - ASSERT_EQ(taskInfo3.task_id(), startingUpdate3->status().task_id()); - AWAIT_READY(runningUpdate3); - ASSERT_EQ(TASK_RUNNING, runningUpdate3->status().state()); - ASSERT_EQ(taskInfo3.task_id(), runningUpdate3->status().task_id()); - - Future<v1::scheduler::Event::Update> killedUpdate1; - Future<v1::scheduler::Event::Update> killedUpdate2; - EXPECT_CALL(*scheduler, update(_, _)) - .WillOnce(FutureArg<1>(&killedUpdate1)) - .WillOnce(FutureArg<1>(&killedUpdate2)); Future<v1::scheduler::Event::Failure> executorFailure; EXPECT_CALL(*scheduler, failure(_, _)) .WillOnce(FutureArg<1>(&executorFailure)); + ASSERT_TRUE(killedUpdate1.isPending()); + ASSERT_TRUE(killedUpdate2.isPending()); + ASSERT_TRUE(killedUpdate3.isPending()); + // Now kill a task in the first task group. mesos.send(v1::createCallKill(frameworkId, taskInfo1.task_id())); - // All the tasks in the first task group should be killed. - + // Only the tasks in the first group were killed. AWAIT_READY(killedUpdate1); - ASSERT_EQ(TASK_KILLED, killedUpdate1->status().state()); - AWAIT_READY(killedUpdate2); - ASSERT_EQ(TASK_KILLED, killedUpdate2->status().state()); - - // When killing a task, TASK_KILLED updates for the tasks in a task - // group can be received in any order. - const hashset<v1::TaskID> tasksKilled{ - killedUpdate1->status().task_id(), - killedUpdate2->status().task_id()}; - - ASSERT_EQ(tasks1, tasksKilled); + ASSERT_TRUE(killedUpdate3.isPending()); // The executor should still be alive after the first task // group has been killed. ASSERT_TRUE(executorFailure.isPending()); - Future<v1::scheduler::Event::Update> killedUpdate3; - EXPECT_CALL(*scheduler, update(_, _)) - .WillOnce(FutureArg<1>(&killedUpdate3)); - // Now kill the only task present in the second task group. 
mesos.send(v1::createCallKill(frameworkId, taskInfo3.task_id())); AWAIT_READY(killedUpdate3); - ASSERT_EQ(TASK_KILLED, killedUpdate3->status().state()); - ASSERT_EQ(taskInfo3.task_id(), killedUpdate3->status().task_id()); // The executor should commit suicide after all the tasks have been // killed. @@ -505,30 +551,82 @@ TEST_P(DefaultExecutorTest, KillMultipleTasks) v1::TaskInfo taskInfo2 = v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); - const hashset<v1::TaskID> tasks{taskInfo1.task_id(), taskInfo2.task_id()}; - Future<v1::scheduler::Event::Update> startingUpdate1; - Future<v1::scheduler::Event::Update> startingUpdate2; Future<v1::scheduler::Event::Update> runningUpdate1; - Future<v1::scheduler::Event::Update> runningUpdate2; - EXPECT_CALL(*scheduler, update(_, _)) + Future<v1::scheduler::Event::Update> killedUpdate1; + + testing::Sequence task1; + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo1), + TaskStatusUpdateStateEq(v1::TASK_STARTING)))) + .InSequence(task1) .WillOnce( DoAll( FutureArg<1>(&startingUpdate1), - v1::scheduler::SendAcknowledge(frameworkId, agentId))) + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo1), + TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) + .InSequence(task1) .WillOnce( DoAll( - FutureArg<1>(&startingUpdate2), - v1::scheduler::SendAcknowledge(frameworkId, agentId))) + FutureArg<1>(&runningUpdate1), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo1), + TaskStatusUpdateStateEq(v1::TASK_KILLED)))) + .InSequence(task1) .WillOnce( DoAll( - FutureArg<1>(&runningUpdate1), - v1::scheduler::SendAcknowledge(frameworkId, agentId))) + FutureArg<1>(&killedUpdate1), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + Future<v1::scheduler::Event::Update> startingUpdate2; + 
Future<v1::scheduler::Event::Update> runningUpdate2; + Future<v1::scheduler::Event::Update> killedUpdate2; + + testing::Sequence task2; + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo2), + TaskStatusUpdateStateEq(v1::TASK_STARTING)))) + .InSequence(task2) + .WillOnce( + DoAll( + FutureArg<1>(&startingUpdate2), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo2), + TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) + .InSequence(task2) .WillOnce( DoAll( FutureArg<1>(&runningUpdate2), v1::scheduler::SendAcknowledge(frameworkId, agentId))); + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo2), + TaskStatusUpdateStateEq(v1::TASK_KILLED)))) + .InSequence(task2) + .WillOnce( + DoAll( + FutureArg<1>(&killedUpdate2), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + mesos.send( v1::createCallAccept( frameworkId, @@ -537,30 +635,10 @@ TEST_P(DefaultExecutorTest, KillMultipleTasks) executorInfo, v1::createTaskGroupInfo({taskInfo1, taskInfo2}))})); AWAIT_READY(startingUpdate1); - ASSERT_EQ(TASK_STARTING, startingUpdate1->status().state()); - - AWAIT_READY(startingUpdate2); - ASSERT_EQ(TASK_STARTING, startingUpdate2->status().state()); - AWAIT_READY(runningUpdate1); - ASSERT_EQ(TASK_RUNNING, runningUpdate1->status().state()); + AWAIT_READY(startingUpdate2); AWAIT_READY(runningUpdate2); - ASSERT_EQ(TASK_RUNNING, runningUpdate2->status().state()); - - // When running a task, TASK_RUNNING updates for the tasks in a - // task group can be received in any order. 
- const hashset<v1::TaskID> tasksRunning{ - runningUpdate1->status().task_id(), - runningUpdate2->status().task_id()}; - - ASSERT_EQ(tasks, tasksRunning); - - Future<v1::scheduler::Event::Update> killedUpdate1; - Future<v1::scheduler::Event::Update> killedUpdate2; - EXPECT_CALL(*scheduler, update(_, _)) - .WillOnce(FutureArg<1>(&killedUpdate1)) - .WillOnce(FutureArg<1>(&killedUpdate2)); // Now kill all tasks in the task group. mesos.send(v1::createCallKill(frameworkId, taskInfo1.task_id())); @@ -568,18 +646,7 @@ TEST_P(DefaultExecutorTest, KillMultipleTasks) // All the tasks in the task group should be killed. AWAIT_READY(killedUpdate1); - ASSERT_EQ(TASK_KILLED, killedUpdate1->status().state()); - AWAIT_READY(killedUpdate2); - ASSERT_EQ(TASK_KILLED, killedUpdate2->status().state()); - - // When killing a task, TASK_KILLED updates for the tasks in a task - // group can be received in any order. - const hashset<v1::TaskID> tasksKilled{ - killedUpdate1->status().task_id(), - killedUpdate2->status().task_id()}; - - ASSERT_EQ(tasks, tasksKilled); } @@ -644,17 +711,81 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure) v1::TaskInfo taskInfo2 = v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); - const hashset<v1::TaskID> tasks{taskInfo1.task_id(), taskInfo2.task_id()}; - Future<v1::scheduler::Event::Update> startingUpdate1; - Future<v1::scheduler::Event::Update> startingUpdate2; Future<v1::scheduler::Event::Update> runningUpdate1; + Future<v1::scheduler::Event::Update> failedUpdate1; + + testing::Sequence task1; + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo1), + TaskStatusUpdateStateEq(v1::TASK_STARTING)))) + .InSequence(task1) + .WillOnce( + DoAll( + FutureArg<1>(&startingUpdate1), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo1), + TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) + .InSequence(task1) + .WillOnce( + DoAll( + 
FutureArg<1>(&runningUpdate1), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo1), + TaskStatusUpdateStateEq(v1::TASK_FAILED)))) + .InSequence(task1) + .WillOnce( + DoAll( + FutureArg<1>(&failedUpdate1), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + Future<v1::scheduler::Event::Update> startingUpdate2; Future<v1::scheduler::Event::Update> runningUpdate2; - EXPECT_CALL(*scheduler, update(_, _)) - .WillOnce(FutureArg<1>(&startingUpdate1)) - .WillOnce(FutureArg<1>(&startingUpdate2)) - .WillOnce(FutureArg<1>(&runningUpdate1)) - .WillOnce(FutureArg<1>(&runningUpdate2)); + Future<v1::scheduler::Event::Update> killedUpdate2; + + testing::Sequence task2; + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo2), + TaskStatusUpdateStateEq(v1::TASK_STARTING)))) + .InSequence(task2) + .WillOnce( + DoAll( + FutureArg<1>(&startingUpdate2), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo2), + TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) + .InSequence(task2) + .WillOnce( + DoAll( + FutureArg<1>(&runningUpdate2), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo2), + TaskStatusUpdateStateEq(v1::TASK_KILLED)))) + .InSequence(task2) + .WillOnce( + DoAll( + FutureArg<1>(&killedUpdate2), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); mesos.send( v1::createCallAccept( @@ -664,60 +795,12 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure) executorInfo, v1::createTaskGroupInfo({taskInfo1, taskInfo2}))})); AWAIT_READY(startingUpdate1); - ASSERT_EQ(TASK_STARTING, startingUpdate1->status().state()); - - AWAIT_READY(startingUpdate2); - ASSERT_EQ(TASK_STARTING, startingUpdate2->status().state()); - - mesos.send( - v1::createCallAcknowledge(frameworkId, 
agentId, startingUpdate1.get())); - mesos.send( - v1::createCallAcknowledge(frameworkId, agentId, startingUpdate2.get())); - AWAIT_READY(runningUpdate1); - ASSERT_EQ(TASK_RUNNING, runningUpdate1->status().state()); + AWAIT_READY(failedUpdate1); + AWAIT_READY(startingUpdate2); AWAIT_READY(runningUpdate2); - ASSERT_EQ(TASK_RUNNING, runningUpdate2->status().state()); - - // When running a task, TASK_RUNNING updates for the tasks in a task - // group can be received in any order. - const hashset<v1::TaskID> tasksRunning{ - runningUpdate1->status().task_id(), - runningUpdate2->status().task_id()}; - - ASSERT_EQ(tasks, tasksRunning); - - Future<v1::scheduler::Event::Update> update1; - Future<v1::scheduler::Event::Update> update2; - EXPECT_CALL(*scheduler, update(_, _)) - .WillOnce(FutureArg<1>(&update1)) - .WillOnce(FutureArg<1>(&update2)); - - // Acknowledge the TASK_RUNNING updates to receive the next updates. - mesos.send( - v1::createCallAcknowledge(frameworkId, agentId, runningUpdate1.get())); - mesos.send( - v1::createCallAcknowledge(frameworkId, agentId, runningUpdate2.get())); - - // Updates for the tasks in a task group can be received in any order. 
- set<pair<v1::TaskID, v1::TaskState>> taskStates; - - taskStates.insert({taskInfo1.task_id(), v1::TASK_FAILED}); - taskStates.insert({taskInfo2.task_id(), v1::TASK_KILLED}); - - AWAIT_READY(update1); - AWAIT_READY(update2); - - set<std::pair<v1::TaskID, v1::TaskState>> expectedTaskStates; - - expectedTaskStates.insert( - {update1->status().task_id(), update1->status().state()}); - - expectedTaskStates.insert( - {update2->status().task_id(), update2->status().state()}); - - ASSERT_EQ(expectedTaskStates, taskStates); + AWAIT_READY(killedUpdate2); } @@ -858,30 +941,62 @@ TEST_P(DefaultExecutorTest, ROOT_ContainerStatusForTask) const v1::Offer& offer = offers->offers(0); const v1::AgentID& agentId = offer.agent_id(); - v1::TaskInfo task1 = v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); + v1::TaskInfo taskInfo1 = + v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); - v1::TaskInfo task2 = v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); + v1::TaskInfo taskInfo2 = + v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); - Future<Event::Update> updateStarting1; - Future<Event::Update> updateStartingOrRunning1; - Future<Event::Update> updateStartingOrRunning2; - Future<Event::Update> updateRunning2; - EXPECT_CALL(*scheduler, update(_, _)) + Future<v1::scheduler::Event::Update> startingUpdate1; + Future<v1::scheduler::Event::Update> runningUpdate1; + + testing::Sequence task1; + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo1), + TaskStatusUpdateStateEq(v1::TASK_STARTING)))) + .InSequence(task1) .WillOnce( DoAll( - FutureArg<1>(&updateStarting1), - v1::scheduler::SendAcknowledge(frameworkId, agentId))) + FutureArg<1>(&startingUpdate1), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo1), + TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) + .InSequence(task1) .WillOnce( DoAll( - FutureArg<1>(&updateStartingOrRunning1), - 
v1::scheduler::SendAcknowledge(frameworkId, agentId))) + FutureArg<1>(&runningUpdate1), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + Future<v1::scheduler::Event::Update> startingUpdate2; + Future<v1::scheduler::Event::Update> runningUpdate2; + + testing::Sequence task2; + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo2), + TaskStatusUpdateStateEq(v1::TASK_STARTING)))) + .InSequence(task2) .WillOnce( DoAll( - FutureArg<1>(&updateStartingOrRunning2), - v1::scheduler::SendAcknowledge(frameworkId, agentId))) + FutureArg<1>(&startingUpdate2), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo2), + TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) + .InSequence(task2) .WillOnce( DoAll( - FutureArg<1>(&updateRunning2), + FutureArg<1>(&runningUpdate2), v1::scheduler::SendAcknowledge(frameworkId, agentId))); mesos.send( @@ -889,32 +1004,19 @@ TEST_P(DefaultExecutorTest, ROOT_ContainerStatusForTask) frameworkId, offer, {v1::LAUNCH_GROUP( - executorInfo, v1::createTaskGroupInfo({task1, task2}))})); - - - AWAIT_READY(updateStarting1); - AWAIT_READY(updateStartingOrRunning1); - AWAIT_READY(updateStartingOrRunning2); - AWAIT_READY(updateRunning2); - - ASSERT_EQ(TASK_STARTING, updateStarting1->status().state()); - ASSERT_EQ(TASK_RUNNING, updateRunning2->status().state()); + executorInfo, v1::createTaskGroupInfo({taskInfo1, taskInfo2}))})); - // Select the two TASK_RUNNING updates from the first four updates - Event::Update update1 = updateStartingOrRunning1.get(); - if (update1.status().state() == v1::TASK_STARTING) { - update1 = updateStartingOrRunning2.get(); - } - Event::Update update2 = updateRunning2.get(); + AWAIT_READY(startingUpdate1); + AWAIT_READY(runningUpdate1); - ASSERT_EQ(TASK_RUNNING, update1.status().state()); - ASSERT_EQ(TASK_RUNNING, update2.status().state()); + AWAIT_READY(startingUpdate2); + AWAIT_READY(runningUpdate2); - 
ASSERT_TRUE(update1.status().has_container_status()); - ASSERT_TRUE(update2.status().has_container_status()); + ASSERT_TRUE(runningUpdate1->status().has_container_status()); + ASSERT_TRUE(runningUpdate2->status().has_container_status()); - v1::ContainerStatus status1 = update1.status().container_status(); - v1::ContainerStatus status2 = update2.status().container_status(); + v1::ContainerStatus status1 = runningUpdate1->status().container_status(); + v1::ContainerStatus status2 = runningUpdate2->status().container_status(); ASSERT_TRUE(status1.has_container_id()); ASSERT_TRUE(status2.has_container_id()); @@ -1086,27 +1188,86 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask) // The first task finishes successfully while the second // task is explicitly killed later. - v1::TaskInfo task1 = v1::createTask(agentId, resources, "exit 0"); + v1::TaskInfo taskInfo1 = v1::createTask(agentId, resources, "exit 0"); - v1::TaskInfo task2 = v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); + v1::TaskInfo taskInfo2 = + v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); - // We expect two TASK_STARTING, two TASK_RUNNING, and one TASK_FINISHED - // updates. - vector<Future<v1::scheduler::Event::Update>> updates(5); + Future<v1::scheduler::Event::Update> startingUpdate1; + Future<v1::scheduler::Event::Update> runningUpdate1; + Future<v1::scheduler::Event::Update> finishedUpdate1; + + testing::Sequence task1; + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo1), + TaskStatusUpdateStateEq(v1::TASK_STARTING)))) + .InSequence(task1) + .WillOnce( + DoAll( + FutureArg<1>(&startingUpdate1), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); - { - // This variable doesn't have to be used explicitly. We need it so that the - // futures are satisfied in the order in which the updates are received. 
- testing::InSequence inSequence; + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo1), + TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) + .InSequence(task1) + .WillOnce( + DoAll( + FutureArg<1>(&runningUpdate1), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); - foreach (Future<v1::scheduler::Event::Update>& update, updates) { - EXPECT_CALL(*scheduler, update(_, _)) - .WillOnce( - DoAll( - FutureArg<1>(&update), - v1::scheduler::SendAcknowledge(frameworkId, agentId))); - } - } + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo1), + TaskStatusUpdateStateEq(v1::TASK_FINISHED)))) + .InSequence(task1) + .WillOnce( + DoAll( + FutureArg<1>(&finishedUpdate1), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + Future<v1::scheduler::Event::Update> startingUpdate2; + Future<v1::scheduler::Event::Update> runningUpdate2; + Future<v1::scheduler::Event::Update> killedUpdate2; + + testing::Sequence task2; + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo2), + TaskStatusUpdateStateEq(v1::TASK_STARTING)))) + .InSequence(task2) + .WillOnce( + DoAll( + FutureArg<1>(&startingUpdate2), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo2), + TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) + .InSequence(task2) + .WillOnce( + DoAll( + FutureArg<1>(&runningUpdate2), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo2), + TaskStatusUpdateStateEq(v1::TASK_KILLED)))) + .InSequence(task2) + .WillOnce( + DoAll( + FutureArg<1>(&killedUpdate2), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); Future<v1::scheduler::Event::Failure> executorFailure; EXPECT_CALL(*scheduler, failure(_, _)) @@ -1117,77 +1278,24 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask) frameworkId, offer, 
{v1::LAUNCH_GROUP( - executorInfo, v1::createTaskGroupInfo({task1, task2}))})); - - enum class Stage - { - INITIAL, - STARTING, - RUNNING, - FINISHED - }; - - hashmap<v1::TaskID, Stage> taskStages; - taskStages[task1.task_id()] = Stage::INITIAL; - taskStages[task2.task_id()] = Stage::INITIAL; - - foreach (Future<v1::scheduler::Event::Update>& update, updates) { - AWAIT_READY(update); - - const v1::TaskStatus& taskStatus = update->status(); - - Option<Stage> taskStage = taskStages.get(taskStatus.task_id()); - ASSERT_SOME(taskStage); - - switch (taskStage.get()) { - case Stage::INITIAL: { - ASSERT_EQ(TASK_STARTING, taskStatus.state()); - - taskStages[taskStatus.task_id()] = Stage::STARTING; - - break; - } - case Stage::STARTING: { - ASSERT_EQ(TASK_RUNNING, taskStatus.state()); - - taskStages[taskStatus.task_id()] = Stage::RUNNING; - - break; - } - case Stage::RUNNING: { - ASSERT_EQ(TASK_FINISHED, taskStatus.state()); + executorInfo, v1::createTaskGroupInfo({taskInfo1, taskInfo2}))})); - taskStages[taskStatus.task_id()] = Stage::FINISHED; + AWAIT_READY(startingUpdate1); + AWAIT_READY(runningUpdate1); + AWAIT_READY(finishedUpdate1); - break; - } - case Stage::FINISHED: { - FAIL() << "Unexpected task update: " << update->DebugString(); - break; - } - } - } + AWAIT_READY(startingUpdate2); + AWAIT_READY(runningUpdate2); - // `task1` should have finished, `task2` should still be running. - ASSERT_EQ(Stage::FINISHED, taskStages[task1.task_id()]); - ASSERT_EQ(Stage::RUNNING, taskStages[task2.task_id()]); + ASSERT_TRUE(killedUpdate2.isPending()); // The executor should still be alive after task1 has finished successfully. ASSERT_TRUE(executorFailure.isPending()); - Future<v1::scheduler::Event::Update> killedUpdate; - EXPECT_CALL(*scheduler, update(_, _)) - .WillOnce( - DoAll( - FutureArg<1>(&killedUpdate), - v1::scheduler::SendAcknowledge(frameworkId, agentId))); - // Now kill the second task in the task group. 
- mesos.send(v1::createCallKill(frameworkId, task2.task_id())); + mesos.send(v1::createCallKill(frameworkId, taskInfo2.task_id())); - AWAIT_READY(killedUpdate); - ASSERT_EQ(TASK_KILLED, killedUpdate->status().state()); - ASSERT_EQ(task2.task_id(), killedUpdate->status().task_id()); + AWAIT_READY(killedUpdate2); // The executor should commit suicide after the task is killed. AWAIT_READY(executorFailure); http://git-wip-us.apache.org/repos/asf/mesos/blob/e6c6ab85/src/tests/mesos.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index 345b883..d8ca9a3 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -3064,6 +3064,20 @@ void ExpectNoFutureUnionHttpProtobufs( MATCHER_P(TaskStatusEq, task, "") { return arg.task_id() == task.task_id(); } +// This matcher is used to match the task id of `Event.update.status` message. +MATCHER_P(TaskStatusUpdateTaskIdEq, taskInfo, "") +{ + return arg.status().task_id() == taskInfo.task_id(); +} + + +// This matcher is used to match the state of `Event.update.status` message. +MATCHER_P(TaskStatusUpdateStateEq, taskState, "") +{ + return arg.status().state() == taskState; +} + + struct ParamExecutorType { public: