Addressed flakiness in the agent tests. Directly invoking unmocked calls from the test body can lead to concurrent access to actor state from multiple execution contexts, so it is more robust to dispatch the unmocked calls to the agent process instead.
Expectations are also added to avoid "uninteresting mock function call" test warnings. Review: https://reviews.apache.org/r/65679/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cc57765c Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cc57765c Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cc57765c Branch: refs/heads/master Commit: cc57765ca35558acf787be7abae586bae3e69b42 Parents: aa2b495 Author: Meng Zhu <[email protected]> Authored: Thu Apr 5 17:44:06 2018 -0700 Committer: Greg Mann <[email protected]> Committed: Thu Apr 5 17:52:03 2018 -0700 ---------------------------------------------------------------------- src/tests/slave_tests.cpp | 177 ++++++++++++++++++++++++++++------------- 1 file changed, 120 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/cc57765c/src/tests/slave_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index ae8eeba..caf165e 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -4168,6 +4168,9 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts) &MockSlave::unmocked_removeFramework), FutureSatisfy(&removeFramework))); + EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _)) + .Times(AtMost(1)); + driver.killTask(task.task_id()); AWAIT_READY(killTask); @@ -4176,18 +4179,24 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts) // since there remain no more tasks. 
AWAIT_READY(removeFramework); - slave.get()->mock()->unmocked__run( - unschedules, - frameworkInfo, - executorInfo, - task_, - taskGroup, - resourceVersionUuids, - launchExecutor); + Future<Nothing> unmocked__run = process::dispatch(slave.get()->pid, [=] { + slave.get()->mock()->unmocked__run( + unschedules, + frameworkInfo, + executorInfo, + task_, + taskGroup, + resourceVersionUuids, + launchExecutor); + + return Nothing(); + }); AWAIT_READY(status); EXPECT_EQ(TASK_KILLED, status->state()); + AWAIT(unmocked__run); + driver.stop(); driver.join(); } @@ -4323,23 +4332,27 @@ TEST_F(SlaveTest, KillMultiplePendingTasks) AWAIT_READY(removeFramework); // The `__run` continuations should have no effect. - slave.get()->mock()->unmocked__run( - unschedules1, - frameworkInfo1, - executorInfo1, - task_1, - taskGroup1, - resourceVersionUuids1, - launchExecutor1); - - slave.get()->mock()->unmocked__run( - unschedules2, - frameworkInfo2, - executorInfo2, - task_2, - taskGroup2, - resourceVersionUuids2, - launchExecutor2); + process::dispatch(slave.get()->pid, [=] { + slave.get()->mock()->unmocked__run( + unschedules1, + frameworkInfo1, + executorInfo1, + task_1, + taskGroup1, + resourceVersionUuids1, + launchExecutor1); + }); + + process::dispatch(slave.get()->pid, [=] { + slave.get()->mock()->unmocked__run( + unschedules2, + frameworkInfo2, + executorInfo2, + task_2, + taskGroup2, + resourceVersionUuids2, + launchExecutor2); + }); Clock::settle(); @@ -4671,6 +4684,10 @@ TEST_F(SlaveTest, RemoveExecutorUponFailedLaunch) SaveArg<5>(&resourceVersionUuids), SaveArg<6>(&launchExecutor))); + Future<Nothing> executorLost; + EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _)) + .WillOnce(FutureSatisfy(&executorLost)); + driver.launchTasks(offers.get()[0].id(), {task}); AWAIT_READY(_run); @@ -4685,21 +4702,27 @@ TEST_F(SlaveTest, RemoveExecutorUponFailedLaunch) AWAIT_READY(killTask); - process::dispatch(slave.get()->pid, [&] { - slave.get()->mock()->unmocked__run( - 
unschedules, - frameworkInfo, - executorInfo_, - task_, - taskGroup_, - resourceVersionUuids, - launchExecutor); - }); + Future<Nothing> unmocked__run = + process::dispatch(slave.get()->pid, [=]() -> Future<Nothing> { + return slave.get()->mock()->unmocked__run( + frameworkInfo, + executorInfo_, + task_, + taskGroup_, + resourceVersionUuids, + launchExecutor); + }); + + promise.associate(unmocked__run); + + AWAIT(unmocked__run); // Agent needs to send `ExitedExecutorMessage` to the master because // the executor never launched. AWAIT_READY(exitedExecutorMessage); + AWAIT_READY(executorLost); + // Helper function to post a request to '/api/v1' master endpoint // and return the response. auto post = []( @@ -4838,13 +4861,28 @@ TEST_F(SlaveTest, RemoveExecutorUponFailedTaskGroupLaunch) EXPECT_CALL(mockGarbageCollector, unschedule(_)) .WillRepeatedly(Return(Failure(""))); + Future<v1::scheduler::Event::Update> update1; + Future<v1::scheduler::Event::Update> update2; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&update1)) + .WillOnce(FutureArg<1>(&update2)); + Future<ExitedExecutorMessage> exitedExecutorMessage = FUTURE_PROTOBUF(ExitedExecutorMessage(), _, _); + EXPECT_CALL(*scheduler, failure(_, _)) + .Times(AtMost(1)); + mesos.send(v1::createCallAccept(frameworkId, offer, {launchGroup})); AWAIT_READY(exitedExecutorMessage); + AWAIT_READY(update1); + AWAIT_READY(update2); + + ASSERT_EQ(v1::TASK_LOST, update1->status().state()); + ASSERT_EQ(v1::TASK_LOST, update2->status().state()); + // Helper function to post a request to '/api/v1' master endpoint // and return the response. 
auto post = []( @@ -4948,6 +4986,9 @@ TEST_F(SlaveTest, RemoveExecutorUponFailedTaskAuthorization) EXPECT_CALL(sched, statusUpdate(&driver, _)) .WillOnce(FutureArg<1>(&statusError)); + EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _)) + .Times(AtMost(1)); + Future<ExitedExecutorMessage> exitedExecutorMessage = FUTURE_PROTOBUF(ExitedExecutorMessage(), _, _); @@ -5105,14 +5146,16 @@ TEST_F(SlaveTest, KillAllInitialTasksTerminatesExecutor) TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH, killTaskStatus->reason()); - slave.get()->mock()->unmocked____run( - future, - frameworkId, - executorId, - containerId, - tasks, - taskGroups); + // We continue dispatching `___run()` to make sure the `CHECK()` in the + // continuation passes. + Future<Nothing> unmocked___run = process::dispatch(slave.get()->pid, [=] { + slave.get()->mock()->unmocked____run( + future, frameworkId, executorId, containerId, tasks, taskGroups); + return Nothing(); + }); + + AWAIT_READY(unmocked___run); AWAIT_READY(executorShutdown); driver.stop(); @@ -5274,6 +5317,8 @@ TEST_F(SlaveTest, KillAllInitialTasksTerminatesHTTPExecutor) EXPECT_CALL(*executor, connected(_)) .WillOnce(FutureArg<0>(&executorLib)); + EXPECT_CALL(*executor, subscribed(_, _)); + // Saved arguments from `Slave::___run()`. Future<Nothing> _future; FrameworkID _frameworkId; @@ -5339,6 +5384,9 @@ TEST_F(SlaveTest, KillAllInitialTasksTerminatesHTTPExecutor) .WillOnce(FutureArg<1>(&update1)) .WillOnce(FutureArg<1>(&update2)); + EXPECT_CALL(*scheduler, failure(_, _)) + .Times(AtMost(1)); + // We kill only one of the tasks and expect the entire task group to be // killed. 
mesos.send( @@ -5350,17 +5398,23 @@ TEST_F(SlaveTest, KillAllInitialTasksTerminatesHTTPExecutor) ASSERT_EQ(v1::TASK_KILLED, update1->status().state()); ASSERT_EQ(v1::TASK_KILLED, update2->status().state()); - slave.get()->mock()->unmocked____run( - _future, - _frameworkId, - _executorId, - _containerId, - _tasks, - _taskGroups); + // We continue dispatching `___run()` to make sure the `CHECK()` in the + // continuation passes. + Future<Nothing> unmocked____run = process::dispatch(slave.get()->pid, [=] { + slave.get()->mock()->unmocked____run( + _future, _frameworkId, _executorId, _containerId, _tasks, _taskGroups); + + return Nothing(); + }); // The executor is killed because all of its initial tasks are killed // and cannot be delivered. AWAIT_READY(shutdown); + + // It is necessary to wait for the `unmocked____run` call to finish before we + // get out of scope. Otherwise, the objects we captured by reference and + // passed to `unmocked___run` may be destroyed in the middle of the function. + AWAIT_READY(unmocked____run); } @@ -8066,6 +8120,9 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts) EXPECT_CALL(*scheduler, heartbeat(_)) .WillRepeatedly(Return()); // Ignore heartbeats. + EXPECT_CALL(*scheduler, failure(_, _)) + .Times(AtMost(1)); + { Call call; call.set_type(Call::SUBSCRIBE); @@ -8193,18 +8250,24 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts) AWAIT_READY(removeFramework); - slave.get()->mock()->unmocked__run( - unschedules, - frameworkInfo, - executorInfo_, - task_, - taskGroup_, - resourceVersionUuids, - launchExecutor); + Future<Nothing> unmocked__run = process::dispatch(slave.get()->pid, [=] { + slave.get()->mock()->unmocked__run( + unschedules, + frameworkInfo, + executorInfo_, + task_, + taskGroup_, + resourceVersionUuids, + launchExecutor); + + return Nothing(); + }); AWAIT_READY(update1); AWAIT_READY(update2); + AWAIT(unmocked__run); + const hashset<v1::TaskID> killedTasks{ update1->status().task_id(), update2->status().task_id()};
