Addressed flakiness in the agent tests.

Directly invoking unmocked calls in tests can lead to concurrent
access of actor state from multiple execution contexts. It is more
robust to dispatch the unmocked calls instead.

Expectations are also added to avoid "uninteresting mock function
call" test warnings.

Review: https://reviews.apache.org/r/65679/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cc57765c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cc57765c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cc57765c

Branch: refs/heads/master
Commit: cc57765ca35558acf787be7abae586bae3e69b42
Parents: aa2b495
Author: Meng Zhu <[email protected]>
Authored: Thu Apr 5 17:44:06 2018 -0700
Committer: Greg Mann <[email protected]>
Committed: Thu Apr 5 17:52:03 2018 -0700

----------------------------------------------------------------------
 src/tests/slave_tests.cpp | 177 ++++++++++++++++++++++++++++-------------
 1 file changed, 120 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/cc57765c/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index ae8eeba..caf165e 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -4168,6 +4168,9 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
                            &MockSlave::unmocked_removeFramework),
                     FutureSatisfy(&removeFramework)));
 
+  EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _))
+    .Times(AtMost(1));
+
   driver.killTask(task.task_id());
 
   AWAIT_READY(killTask);
@@ -4176,18 +4179,24 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   // since there remain no more tasks.
   AWAIT_READY(removeFramework);
 
-  slave.get()->mock()->unmocked__run(
-      unschedules,
-      frameworkInfo,
-      executorInfo,
-      task_,
-      taskGroup,
-      resourceVersionUuids,
-      launchExecutor);
+  Future<Nothing> unmocked__run = process::dispatch(slave.get()->pid, [=] {
+    slave.get()->mock()->unmocked__run(
+        unschedules,
+        frameworkInfo,
+        executorInfo,
+        task_,
+        taskGroup,
+        resourceVersionUuids,
+        launchExecutor);
+
+    return Nothing();
+  });
 
   AWAIT_READY(status);
   EXPECT_EQ(TASK_KILLED, status->state());
 
+  AWAIT(unmocked__run);
+
   driver.stop();
   driver.join();
 }
@@ -4323,23 +4332,27 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
   AWAIT_READY(removeFramework);
 
   // The `__run` continuations should have no effect.
-  slave.get()->mock()->unmocked__run(
-      unschedules1,
-      frameworkInfo1,
-      executorInfo1,
-      task_1,
-      taskGroup1,
-      resourceVersionUuids1,
-      launchExecutor1);
-
-  slave.get()->mock()->unmocked__run(
-      unschedules2,
-      frameworkInfo2,
-      executorInfo2,
-      task_2,
-      taskGroup2,
-      resourceVersionUuids2,
-      launchExecutor2);
+  process::dispatch(slave.get()->pid, [=] {
+    slave.get()->mock()->unmocked__run(
+        unschedules1,
+        frameworkInfo1,
+        executorInfo1,
+        task_1,
+        taskGroup1,
+        resourceVersionUuids1,
+        launchExecutor1);
+  });
+
+  process::dispatch(slave.get()->pid, [=] {
+    slave.get()->mock()->unmocked__run(
+        unschedules2,
+        frameworkInfo2,
+        executorInfo2,
+        task_2,
+        taskGroup2,
+        resourceVersionUuids2,
+        launchExecutor2);
+  });
 
   Clock::settle();
 
@@ -4671,6 +4684,10 @@ TEST_F(SlaveTest, RemoveExecutorUponFailedLaunch)
                   SaveArg<5>(&resourceVersionUuids),
                   SaveArg<6>(&launchExecutor)));
 
+  Future<Nothing> executorLost;
+  EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _))
+    .WillOnce(FutureSatisfy(&executorLost));
+
   driver.launchTasks(offers.get()[0].id(), {task});
 
   AWAIT_READY(_run);
@@ -4685,21 +4702,27 @@ TEST_F(SlaveTest, RemoveExecutorUponFailedLaunch)
 
   AWAIT_READY(killTask);
 
-  process::dispatch(slave.get()->pid, [&] {
-    slave.get()->mock()->unmocked__run(
-        unschedules,
-        frameworkInfo,
-        executorInfo_,
-        task_,
-        taskGroup_,
-        resourceVersionUuids,
-        launchExecutor);
-  });
+  Future<Nothing> unmocked__run =
+    process::dispatch(slave.get()->pid, [=]() -> Future<Nothing> {
+      return slave.get()->mock()->unmocked__run(
+          frameworkInfo,
+          executorInfo_,
+          task_,
+          taskGroup_,
+          resourceVersionUuids,
+          launchExecutor);
+    });
+
+  promise.associate(unmocked__run);
+
+  AWAIT(unmocked__run);
 
   // Agent needs to send `ExitedExecutorMessage` to the master because
   // the executor never launched.
   AWAIT_READY(exitedExecutorMessage);
 
+  AWAIT_READY(executorLost);
+
   // Helper function to post a request to '/api/v1' master endpoint
   // and return the response.
   auto post = [](
@@ -4838,13 +4861,28 @@ TEST_F(SlaveTest, RemoveExecutorUponFailedTaskGroupLaunch)
   EXPECT_CALL(mockGarbageCollector, unschedule(_))
     .WillRepeatedly(Return(Failure("")));
 
+  Future<v1::scheduler::Event::Update> update1;
+  Future<v1::scheduler::Event::Update> update2;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&update1))
+    .WillOnce(FutureArg<1>(&update2));
+
   Future<ExitedExecutorMessage> exitedExecutorMessage =
     FUTURE_PROTOBUF(ExitedExecutorMessage(), _, _);
 
+  EXPECT_CALL(*scheduler, failure(_, _))
+    .Times(AtMost(1));
+
   mesos.send(v1::createCallAccept(frameworkId, offer, {launchGroup}));
 
   AWAIT_READY(exitedExecutorMessage);
 
+  AWAIT_READY(update1);
+  AWAIT_READY(update2);
+
+  ASSERT_EQ(v1::TASK_LOST, update1->status().state());
+  ASSERT_EQ(v1::TASK_LOST, update2->status().state());
+
   // Helper function to post a request to '/api/v1' master endpoint
   // and return the response.
   auto post = [](
@@ -4948,6 +4986,9 @@ TEST_F(SlaveTest, RemoveExecutorUponFailedTaskAuthorization)
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillOnce(FutureArg<1>(&statusError));
 
+  EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _))
+    .Times(AtMost(1));
+
   Future<ExitedExecutorMessage> exitedExecutorMessage =
     FUTURE_PROTOBUF(ExitedExecutorMessage(), _, _);
 
@@ -5105,14 +5146,16 @@ TEST_F(SlaveTest, KillAllInitialTasksTerminatesExecutor)
       TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
       killTaskStatus->reason());
 
-  slave.get()->mock()->unmocked____run(
-      future,
-      frameworkId,
-      executorId,
-      containerId,
-      tasks,
-      taskGroups);
+  // We continue dispatching `___run()` to make sure the `CHECK()` in the
+  // continuation passes.
+  Future<Nothing> unmocked___run = process::dispatch(slave.get()->pid, [=] {
+    slave.get()->mock()->unmocked____run(
+        future, frameworkId, executorId, containerId, tasks, taskGroups);
 
+    return Nothing();
+  });
+
+  AWAIT_READY(unmocked___run);
   AWAIT_READY(executorShutdown);
 
   driver.stop();
@@ -5274,6 +5317,8 @@ TEST_F(SlaveTest, KillAllInitialTasksTerminatesHTTPExecutor)
   EXPECT_CALL(*executor, connected(_))
     .WillOnce(FutureArg<0>(&executorLib));
 
+  EXPECT_CALL(*executor, subscribed(_, _));
+
   // Saved arguments from `Slave::___run()`.
   Future<Nothing> _future;
   FrameworkID _frameworkId;
@@ -5339,6 +5384,9 @@ TEST_F(SlaveTest, KillAllInitialTasksTerminatesHTTPExecutor)
     .WillOnce(FutureArg<1>(&update1))
     .WillOnce(FutureArg<1>(&update2));
 
+  EXPECT_CALL(*scheduler, failure(_, _))
+    .Times(AtMost(1));
+
   // We kill only one of the tasks and expect the entire task group to be
   // killed.
   mesos.send(
@@ -5350,17 +5398,23 @@ TEST_F(SlaveTest, KillAllInitialTasksTerminatesHTTPExecutor)
   ASSERT_EQ(v1::TASK_KILLED, update1->status().state());
   ASSERT_EQ(v1::TASK_KILLED, update2->status().state());
 
-  slave.get()->mock()->unmocked____run(
-      _future,
-      _frameworkId,
-      _executorId,
-      _containerId,
-      _tasks,
-      _taskGroups);
+  // We continue dispatching `___run()` to make sure the `CHECK()` in the
+  // continuation passes.
+  Future<Nothing> unmocked____run = process::dispatch(slave.get()->pid, [=] {
+    slave.get()->mock()->unmocked____run(
+        _future, _frameworkId, _executorId, _containerId, _tasks, _taskGroups);
+
+    return Nothing();
+  });
 
   // The executor is killed because all of its initial tasks are killed
   // and cannot be delivered.
   AWAIT_READY(shutdown);
+
+  // It is necessary to wait for the `unmocked____run` call to finish before we
+  // get out of scope. Otherwise, the objects we captured by reference and
+  // passed to `unmocked____run` may be destroyed in the middle of the function.
+  AWAIT_READY(unmocked____run);
 }
 
 
@@ -8066,6 +8120,9 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
   EXPECT_CALL(*scheduler, heartbeat(_))
     .WillRepeatedly(Return()); // Ignore heartbeats.
 
+  EXPECT_CALL(*scheduler, failure(_, _))
+    .Times(AtMost(1));
+
   {
     Call call;
     call.set_type(Call::SUBSCRIBE);
@@ -8193,18 +8250,24 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
 
   AWAIT_READY(removeFramework);
 
-  slave.get()->mock()->unmocked__run(
-      unschedules,
-      frameworkInfo,
-      executorInfo_,
-      task_,
-      taskGroup_,
-      resourceVersionUuids,
-      launchExecutor);
+  Future<Nothing> unmocked__run = process::dispatch(slave.get()->pid, [=] {
+    slave.get()->mock()->unmocked__run(
+        unschedules,
+        frameworkInfo,
+        executorInfo_,
+        task_,
+        taskGroup_,
+        resourceVersionUuids,
+        launchExecutor);
+
+    return Nothing();
+  });
 
   AWAIT_READY(update1);
   AWAIT_READY(update2);
 
+  AWAIT(unmocked__run);
+
   const hashset<v1::TaskID> killedTasks{
     update1->status().task_id(), update2->status().task_id()};
 

Reply via email to