Added waits in MemoryPressureTests to ensure deterministic behavior.

Sometimes _MemoryPressureMesosTest.CGROUPS_ROOT_SlaveRecovery_ fails
because the tracker of the cgroups pressure counters has not finished
by the time the test's expectations are checked.

This patch ensures that key test milestones (e.g., the slave has
registered, `TASK_KILLED` has reached the scheduler) are reached
before the test continues.
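
The underlying idea, reduced to standard C++ for illustration (the
patch itself uses the libprocess test helpers FUTURE_PROTOBUF() and
AWAIT_READY(), visible in the diff below; everything in this sketch
is a hypothetical stand-in):

    #include <future>
    #include <iostream>
    #include <thread>

    int main() {
      // The "milestone": fulfilled once the (stand-in) slave has
      // registered, analogous to observing SlaveRegisteredMessage.
      std::promise<void> registered;
      std::future<void> milestone = registered.get_future();

      // Stand-in for the slave registering asynchronously after
      // StartSlave() returns.
      std::thread slave([&registered] {
        registered.set_value();  // Milestone reached.
      });

      // Block on the milestone before setting any expectations,
      // instead of pausing/settling/resuming the clock and hoping
      // the event has already landed.
      milestone.wait();
      std::cout << "slave registered; safe to continue the test\n";

      slave.join();
      return 0;
    }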

Review: https://reviews.apache.org/r/43850/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/61ff6848
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/61ff6848
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/61ff6848

Branch: refs/heads/master
Commit: 61ff6848ad9871f98ec00f517f16b47aff3e58d7
Parents: d0d4d5a
Author: Alexander Rojas <alexan...@mesosphere.io>
Authored: Thu Feb 25 13:40:01 2016 +0100
Committer: Bernd Mathiske <be...@mesosphere.io>
Committed: Thu Feb 25 13:40:01 2016 +0100

----------------------------------------------------------------------
 .../containerizer/memory_pressure_tests.cpp     | 63 ++++++++++++++------
 1 file changed, 44 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/61ff6848/src/tests/containerizer/memory_pressure_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/memory_pressure_tests.cpp b/src/tests/containerizer/memory_pressure_tests.cpp
index 4a03af2..79f1349 100644
--- a/src/tests/containerizer/memory_pressure_tests.cpp
+++ b/src/tests/containerizer/memory_pressure_tests.cpp
@@ -50,6 +50,7 @@ using std::vector;
 using testing::_;
 using testing::Eq;
 using testing::Return;
+using testing::Unused;
 
 namespace mesos {
 namespace internal {
@@ -86,9 +87,14 @@ TEST_F(MemoryPressureMesosTest, CGROUPS_ROOT_Statistics)
 
   ASSERT_SOME(containerizer);
 
+  Future<SlaveRegisteredMessage> registered =
+      FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get(), _);
+
   Try<PID<Slave>> slave = StartSlave(containerizer.get(), flags);
   ASSERT_SOME(slave);
 
+  AWAIT_READY(registered);
+
   MockScheduler sched;
 
   MesosSchedulerDriver driver(
@@ -115,16 +121,18 @@ TEST_F(MemoryPressureMesosTest, CGROUPS_ROOT_Statistics)
       Resources::parse("cpus:1;mem:256;disk:1024").get(),
       "while true; do dd count=512 bs=1M if=/dev/zero of=./temp; done");
 
-  Future<TaskStatus> status;
+  Future<TaskStatus> running;
+  Future<TaskStatus> killed;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&status))
+    .WillOnce(FutureArg<1>(&running))
+    .WillOnce(FutureArg<1>(&killed))
     .WillRepeatedly(Return());       // Ignore subsequent updates.
 
   driver.launchTasks(offer.id(), {task});
 
-  AWAIT_READY(status);
-  EXPECT_EQ(task.task_id(), status.get().task_id());
-  EXPECT_EQ(TASK_RUNNING, status.get().state());
+  AWAIT_READY(running);
+  EXPECT_EQ(task.task_id(), running.get().task_id());
+  EXPECT_EQ(TASK_RUNNING, running.get().state());
 
   Future<hashset<ContainerID>> containers = containerizer.get()->containers();
   AWAIT_READY(containers);
@@ -154,10 +162,9 @@ TEST_F(MemoryPressureMesosTest, CGROUPS_ROOT_Statistics)
   // Stop the memory-hammering task.
   driver.killTask(task.task_id());
 
-  // Process any queued up events through before proceeding.
-  process::Clock::pause();
-  process::Clock::settle();
-  process::Clock::resume();
+  AWAIT_READY(killed);
+  EXPECT_EQ(task.task_id(), killed->task_id());
+  EXPECT_EQ(TASK_KILLED, killed->state());
 
   // Now check the correctness of the memory pressure counters.
   Future<ResourceStatistics> usage = containerizer.get()->usage(containerId);
@@ -195,9 +202,14 @@ TEST_F(MemoryPressureMesosTest, CGROUPS_ROOT_SlaveRecovery)
 
   ASSERT_SOME(containerizer1);
 
+  Future<SlaveRegisteredMessage> registered =
+      FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get(), _);
+
   Try<PID<Slave>> slave = StartSlave(containerizer1.get(), flags);
   ASSERT_SOME(slave);
 
+  AWAIT_READY(registered);
+
   MockScheduler sched;
 
   // Enable checkpointing for the framework.
@@ -228,16 +240,25 @@ TEST_F(MemoryPressureMesosTest, CGROUPS_ROOT_SlaveRecovery)
       Resources::parse("cpus:1;mem:256;disk:1024").get(),
       "while true; do dd count=512 bs=1M if=/dev/zero of=./temp; done");
 
-  Future<TaskStatus> status;
+  Future<TaskStatus> running;
+  Promise<TaskStatus> killed;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&status))
-    .WillRepeatedly(Return());       // Ignore subsequent updates.
+    .WillOnce(FutureArg<1>(&running))
+    .WillRepeatedly(DoAll(
+        Invoke([&killed](Unused, const TaskStatus& status) {
+          // More than one TASK_RUNNING status can arrive
+          // before the TASK_KILLED does.
+          if (status.state() == TASK_KILLED) {
+            killed.set(status);
+          }
+        }),
+        Return()));
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
-  AWAIT_READY(status);
-  EXPECT_EQ(task.task_id(), status.get().task_id());
-  EXPECT_EQ(TASK_RUNNING, status.get().state());
+  AWAIT_READY(running);
+  EXPECT_EQ(task.task_id(), running.get().task_id());
+  EXPECT_EQ(TASK_RUNNING, running.get().state());
 
   // We restart the slave to let it recover.
   Stop(slave.get());
@@ -253,9 +274,14 @@ TEST_F(MemoryPressureMesosTest, CGROUPS_ROOT_SlaveRecovery)
     MesosContainerizer::create(flags, true, &fetcher);
   ASSERT_SOME(containerizer2);
 
+  Future<SlaveReregisteredMessage> reregistered =
+      FUTURE_PROTOBUF(SlaveReregisteredMessage(), master.get(), _);
+
   slave = StartSlave(containerizer2.get(), flags);
   ASSERT_SOME(slave);
 
+  AWAIT_READY(reregistered);
+
   // Wait until the containerizer is updated.
   AWAIT_READY(update);
 
@@ -287,10 +313,9 @@ TEST_F(MemoryPressureMesosTest, CGROUPS_ROOT_SlaveRecovery)
   // Stop the memory-hammering task.
   driver.killTask(task.task_id());
 
-  // Process any queued up events through before proceeding.
-  process::Clock::pause();
-  process::Clock::settle();
-  process::Clock::resume();
+  AWAIT_READY(killed.future());
+  EXPECT_EQ(task.task_id(), killed.future()->task_id());
+  EXPECT_EQ(TASK_KILLED, killed.future()->state());
 
   // Now check the correctness of the memory pressure counters.
   Future<ResourceStatistics> usage = containerizer2.get()->usage(containerId);
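
As a footnote on the SlaveRecovery hunk above: because any number of
TASK_RUNNING updates can precede TASK_KILLED, the patch fulfills a
Promise from inside WillRepeatedly() rather than counting WillOnce()
calls. A condensed, self-contained sketch of that pattern (built on
googletest/googlemock with hypothetical stand-in types; the real test
uses Mesos' MockScheduler and a libprocess Promise):

    #include <gmock/gmock.h>
    #include <gtest/gtest.h>
    #include <future>

    // Hypothetical stand-ins for the Mesos types.
    enum TaskState { TASK_RUNNING, TASK_KILLED };
    struct TaskStatus { TaskState state; };

    struct Scheduler {
      virtual ~Scheduler() {}
      virtual void statusUpdate(const TaskStatus& status) = 0;
    };

    struct MockScheduler : Scheduler {
      MOCK_METHOD(void, statusUpdate, (const TaskStatus&), (override));
    };

    // Link against gtest_main (or provide main()) to run.
    TEST(Sketch, CapturesTerminalUpdate) {
      using ::testing::_;
      using ::testing::Invoke;

      MockScheduler sched;
      std::promise<TaskStatus> killed;

      // Fulfill the promise only for the terminal update, no matter
      // how many TASK_RUNNING updates arrive first.
      EXPECT_CALL(sched, statusUpdate(_))
          .WillRepeatedly(Invoke([&killed](const TaskStatus& status) {
            if (status.state == TASK_KILLED) {
              killed.set_value(status);
            }
          }));

      sched.statusUpdate({TASK_RUNNING});
      sched.statusUpdate({TASK_RUNNING});
      sched.statusUpdate({TASK_KILLED});

      EXPECT_EQ(TASK_KILLED, killed.get_future().get().state);
    }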
