This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new 7301287  Refactored some of `MasterAPITest`s using MockMasterAPISubscriber.
7301287 is described below

commit 7301287210e19b309f6ddd472bd51897f5cb7c8d
Author: Andrei Sekretenko <[email protected]>
AuthorDate: Sun Jun 9 23:11:52 2019 -0400

    Refactored some of `MasterAPITest`s using MockMasterAPISubscriber.
    
    This patch replaces reading out all Master API events (both interesting
    and uninteresting ones) in the code of the tests with setting
    expectations on `MOCK_METHOD`s of the dedicated class.
    
    Review: https://reviews.apache.org/r/70756/
---
 src/tests/api_tests.cpp | 310 +++++++++++++++---------------------------------
 1 file changed, 98 insertions(+), 212 deletions(-)

diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index f191a1c..af1d215 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -66,6 +66,7 @@
 #include "tests/resources_utils.hpp"
 
 #include "tests/containerizer/mock_containerizer.hpp"
+#include "tests/master/mock_master_api_subscriber.hpp"
 
 namespace http = process::http;
 
@@ -1923,50 +1924,26 @@ TEST_P(MasterAPITest, SubscribeAgentEvents)
   Try<Owned<cluster::Master>> master = this->StartMaster();
   ASSERT_SOME(master);
 
-  v1::master::Call v1Call;
-  v1Call.set_type(v1::master::Call::SUBSCRIBE);
-
-  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
-
-  headers["Accept"] = stringify(contentType);
-
-  Future<http::Response> response = http::streaming::post(
-      master.get()->pid,
-      "api/v1",
-      headers,
-      serialize(contentType, v1Call),
-      stringify(contentType));
-
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
-  AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
-  ASSERT_EQ(http::Response::PIPE, response->type);
-  ASSERT_SOME(response->reader);
+  v1::MockMasterAPISubscriber subscriber;
 
-  http::Pipe::Reader reader = response->reader.get();
-
-  auto deserializer =
-    lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
+  Future<v1::master::Event::Subscribed> subscribed;
+  EXPECT_CALL(subscriber, subscribed(_))
+    .WillOnce(FutureArg<0>(&subscribed));
 
-  Reader<v1::master::Event> decoder(
-      Decoder<v1::master::Event>(deserializer), reader);
-
-  Future<Result<v1::master::Event>> event = decoder.read();
-  AWAIT_READY(event);
+  AWAIT_READY(subscriber.subscribe(master.get()->pid, contentType));
+  AWAIT_READY(subscribed);
 
-  EXPECT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
-  const v1::master::Response::GetState& getState =
-      event->get().subscribed().get_state();
+  const v1::master::Response::GetState& getState = subscribed->get_state();
 
   EXPECT_TRUE(getState.get_frameworks().frameworks().empty());
   EXPECT_TRUE(getState.get_agents().agents().empty());
   EXPECT_TRUE(getState.get_tasks().tasks().empty());
   EXPECT_TRUE(getState.get_executors().executors().empty());
 
-  event = decoder.read();
-
-  AWAIT_READY(event);
-
-  EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+  // Expect AGENT_ADDED message after starting an agent
+  Future<v1::master::Event::AgentAdded> agentAdded;
+  EXPECT_CALL(subscriber, agentAdded(_))
+    .WillOnce(FutureArg<0>(&agentAdded));
 
   // Start one agent.
   Future<SlaveRegisteredMessage> agentRegisteredMessage =
@@ -1981,16 +1958,12 @@ TEST_P(MasterAPITest, SubscribeAgentEvents)
 
   AWAIT_READY(agentRegisteredMessage);
 
-  event = decoder.read();
-  AWAIT_READY(event);
+  AWAIT_READY(agentAdded);
 
   SlaveID agentID = agentRegisteredMessage->slave_id();
 
   {
-    ASSERT_EQ(v1::master::Event::AGENT_ADDED, event->get().type());
-
-    const v1::master::Response::GetAgents::Agent& agent =
-      event->get().agent_added().agent();
+    const v1::master::Response::GetAgents::Agent& agent = agentAdded->agent();
 
     ASSERT_EQ("host", agent.agent_info().hostname());
     ASSERT_EQ(evolve(agentID), agent.agent_info().id());
@@ -1999,17 +1972,18 @@ TEST_P(MasterAPITest, SubscribeAgentEvents)
     ASSERT_EQ(4, agent.total_resources_size());
   }
 
+  // Expect AGENT_REMOVED message after agent removal.
+  Future<v1::master::Event::AgentRemoved> agentRemoved;
+  EXPECT_CALL(subscriber, agentRemoved(_))
+    .WillOnce(FutureArg<0>(&agentRemoved));
+
   // Forcefully trigger a shutdown on the slave so that master will remove it.
   slave.get()->shutdown();
   slave->reset();
 
-  event = decoder.read();
-  AWAIT_READY(event);
+  AWAIT_READY(agentRemoved);
 
-  {
-    ASSERT_EQ(v1::master::Event::AGENT_REMOVED, event->get().type());
-    ASSERT_EQ(evolve(agentID), event->get().agent_removed().agent_id());
-  }
+  ASSERT_EQ(evolve(agentID), agentRemoved->agent_id());
 }
 
 
@@ -2419,53 +2393,36 @@ TEST_P(MasterAPITest, Subscribe)
 
   // Create event stream after seeing first offer but before first task is
   // launched. We should see one framework, one agent and zero task/executor.
-  v1::master::Call v1Call;
-  v1Call.set_type(v1::master::Call::SUBSCRIBE);
-
-  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
-
-  headers["Accept"] = stringify(contentType);
-
-  Future<http::Response> response = http::streaming::post(
-      master.get()->pid,
-      "api/v1",
-      headers,
-      serialize(contentType, v1Call),
-      stringify(contentType));
-
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
-  AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
-  ASSERT_EQ(http::Response::PIPE, response->type);
-  ASSERT_SOME(response->reader);
-
-  http::Pipe::Reader reader = response->reader.get();
-
-  auto deserializer =
-    lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
+  v1::MockMasterAPISubscriber subscriber;
 
-  Reader<v1::master::Event> decoder(
-      Decoder<v1::master::Event>(deserializer), reader);
+  Future<v1::master::Event::Subscribed> subscribedToMasterEvents;
+  EXPECT_CALL(subscriber, subscribed(_))
+    .WillOnce(FutureArg<0>(&subscribedToMasterEvents));
 
-  Future<Result<v1::master::Event>> event = decoder.read();
-  AWAIT_READY(event);
+  AWAIT_READY(subscriber.subscribe(master.get()->pid, contentType));
+  AWAIT_READY(subscribedToMasterEvents);
 
-  EXPECT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
   const v1::master::Response::GetState& getState =
-      event->get().subscribed().get_state();
+    subscribedToMasterEvents->get_state();
 
   EXPECT_EQ(1, getState.get_frameworks().frameworks_size());
   EXPECT_EQ(1, getState.get_agents().agents_size());
   EXPECT_TRUE(getState.get_tasks().tasks().empty());
   EXPECT_TRUE(getState.get_executors().executors().empty());
 
-  event = decoder.read();
-
-  AWAIT_READY(event);
+  // Expect TASK_ADDED and TASK_UPDATED events, in sequence.
+  Sequence masterEventsSequence;
 
-  EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+  Future<v1::master::Event::TaskAdded> taskAdded;
+  EXPECT_CALL(subscriber, taskAdded(_))
+    .InSequence(masterEventsSequence)
+    .WillOnce(FutureArg<0>(&taskAdded));
 
-  event = decoder.read();
-  EXPECT_TRUE(event.isPending());
+  // There should be only one taskUpdated event
+  Future<v1::master::Event::TaskUpdated> taskUpdated;
+  EXPECT_CALL(subscriber, taskUpdated(_))
+    .InSequence(masterEventsSequence)
+    .WillOnce(FutureArg<0>(&taskUpdated));
 
   const v1::Offer& offer = offers->offers(0);
 
@@ -2518,33 +2475,23 @@ TEST_P(MasterAPITest, Subscribe)
     mesos.send(call);
   }
 
-  AWAIT_READY(event);
+  AWAIT_READY(taskAdded);
   AWAIT_READY(update);
 
-  ASSERT_EQ(v1::master::Event::TASK_ADDED, event->get().type());
-  ASSERT_EQ(evolve(task.task_id()),
-            event->get().task_added().task().task_id());
-  ASSERT_TRUE(event->get().task_added().task().has_health_check());
+  ASSERT_EQ(evolve(task.task_id()), taskAdded->task().task_id());
+  ASSERT_TRUE(taskAdded->task().has_health_check());
   ASSERT_EQ(
       v1::HealthCheck::COMMAND,
-      event->get().task_added().task().health_check().type());
+      taskAdded->task().health_check().type());
   ASSERT_EQ(
       checkCommandValue,
-      event->get().task_added().task().health_check().command().value());
+      taskAdded->task().health_check().command().value());
 
-  event = decoder.read();
+  AWAIT_READY(taskUpdated);
 
-  AWAIT_READY(event);
-
-  ASSERT_EQ(v1::master::Event::TASK_UPDATED, event->get().type());
-  ASSERT_EQ(v1::TASK_RUNNING,
-            event->get().task_updated().state());
-  ASSERT_EQ(v1::TASK_RUNNING,
-            event->get().task_updated().status().state());
-  ASSERT_EQ(evolve(task.task_id()),
-            event->get().task_updated().status().task_id());
-
-  event = decoder.read();
+  ASSERT_EQ(v1::TASK_RUNNING, taskUpdated->state());
+  ASSERT_EQ(v1::TASK_RUNNING, taskUpdated->status().state());
+  ASSERT_EQ(evolve(task.task_id()), taskUpdated->status().task_id());
 
   Future<Nothing> update2;
   EXPECT_CALL(*scheduler, update(_, _))
@@ -2559,10 +2506,6 @@ TEST_P(MasterAPITest, Subscribe)
 
   AWAIT_READY(update2);
 
-  EXPECT_TRUE(event.isPending());
-
-  EXPECT_TRUE(reader.close());
-
   EXPECT_CALL(*executor, shutdown(_))
     .Times(AtMost(1));
 
@@ -2571,6 +2514,23 @@ TEST_P(MasterAPITest, Subscribe)
 }
 
 
+// Helper functor for SubscribersReceiveHealthUpdates test.
+// NOTE: We cannot use a lambda due to testing::ResultOf requiring result_type.
+struct IsFailedHealthCheckTaskUpdate
+{
+  TaskID taskId;
+  using result_type = bool;
+  bool operator() (const v1::master::Event::TaskUpdated& event) const {
+    const v1::TaskStatus& status = event.status();
+    return status.task_id() == evolve(taskId) &&
+           status.state() == v1::TASK_RUNNING &&
+           status.reason() ==
+             v1::TaskStatus::REASON_TASK_HEALTH_CHECK_STATUS_UPDATED &&
+           !status.healthy();
+  };
+};
+
+
 // This test verifies that subscribers eventually receive events with the
 // task health information.
 TEST_P(MasterAPITest, SubscribersReceiveHealthUpdates)
@@ -2616,34 +2576,10 @@ TEST_P(MasterAPITest, SubscribersReceiveHealthUpdates)
   v1::FrameworkID frameworkId(subscribed->framework_id());
 
   // Subscribe to the master's event stream via the v1 API.
-  v1::master::Call v1Call;
-  v1Call.set_type(v1::master::Call::SUBSCRIBE);
-
-  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
-
-  headers["Accept"] = stringify(contentType);
-
-  Future<http::Response> response = http::streaming::post(
-      master.get()->pid,
-      "api/v1",
-      headers,
-      serialize(contentType, v1Call),
-      stringify(contentType));
+  v1::MockMasterAPISubscriber subscriber;
+  AWAIT_READY(subscriber.subscribe(master.get()->pid, contentType));
 
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
-  AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
-  ASSERT_EQ(http::Response::PIPE, response->type);
-  ASSERT_SOME(response->reader);
 
-  http::Pipe::Reader reader = response->reader.get();
-
-  auto deserializer =
-    lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
-
-  Reader<v1::master::Event> decoder(
-      Decoder<v1::master::Event>(deserializer), reader);
-
-  // Prepare and launch a task with a failing health check using the scheduler.
   AWAIT_READY(offers);
   ASSERT_FALSE(offers->offers().empty());
 
@@ -2652,6 +2588,14 @@ TEST_P(MasterAPITest, SubscribersReceiveHealthUpdates)
 
   TaskInfo task = createTask(devolve(offer), SLEEP_COMMAND(10000));
 
+  // Expect taskUpdated event caused by a failed health check.
+  Future<Nothing> taskUpdateWithFailedHealthCheckObserved;
+  EXPECT_CALL(
+    subscriber,
+    taskUpdated(
+      testing::ResultOf(IsFailedHealthCheckTaskUpdate{task.task_id()}, true)))
+    .WillOnce(FutureSatisfy(&taskUpdateWithFailedHealthCheckObserved));
+
   // This describes a single health check that will fail.
   HealthCheck healthCheck;
   healthCheck.set_type(HealthCheck::HTTP);
@@ -2668,39 +2612,12 @@ TEST_P(MasterAPITest, SubscribersReceiveHealthUpdates)
   mesos.send(
       v1::createCallAccept(frameworkId, offer, {v1::LAUNCH({evolve(task)})}));
 
-  // Expect to get a task health update after a _small_ number of other events,
-  // e.g., `SUBSCRIBED`, initial `HEARTBEAT`, `TASK_STARTING` and `TASK_RUNNING`
-  // task status updates, etc; 10 seems to be a reasonable number.
-  //
-  // TODO(alexr): Instead of guessing the number of events we should filter
-  // out all uninteresting events. This can be done for example by introducing
-  // a way to inject a "matcher" for the event, something like:
-  //   AWAIT_READY(event, [](const auto& event ) -> bool {
-  //     return (event->get().type() == ...); });
-  int maxEventsToSkip = 10;
-  bool taskHealthUpdateObserved = false;
-  while (maxEventsToSkip--) {
-    Future<Result<v1::master::Event>> event = decoder.read();
-    AWAIT_READY(event);
-
-    if (event->get().type() == v1::master::Event::TASK_UPDATED &&
-        event->get().task_updated().status().task_id() ==
-          evolve(task.task_id()) &&
-        event->get().task_updated().status().state() == v1::TASK_RUNNING &&
-        event->get().task_updated().status().reason() ==
-          v1::TaskStatus::REASON_TASK_HEALTH_CHECK_STATUS_UPDATED &&
-        event->get().task_updated().status().healthy() == false) {
-      taskHealthUpdateObserved = true;
-      break;
-    }
-  }
-
-  ASSERT_TRUE(taskHealthUpdateObserved)
-      << "Health update for task '" << task.task_id()
-      << "' has not been received";
+  AWAIT_READY(taskUpdateWithFailedHealthCheckObserved);
 
   // Kill task before test tear-down to avoid race between scheduler
   // tear-down and scheduler getting/acknowledging status update.
+  //
+  // TODO(asekretenko): Check if we still need this.
   Future<Nothing> killed;
   EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateStateEq(v1::TASK_KILLED)))
     .WillOnce(FutureSatisfy(&killed));
@@ -2809,53 +2726,36 @@ TEST_P(MasterAPITest, MasterFailover)
 
   detector.appoint(master.get()->pid);
 
-  // Create event stream after the master failover but before the agent
-  // re-registration. We should see no framework, agent, task and
-  // executor at all.
-  v1::master::Call v1Call;
-  v1Call.set_type(v1::master::Call::SUBSCRIBE);
-
-  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  v1::MockMasterAPISubscriber subscriber;
 
-  headers["Accept"] = stringify(contentType);
-
-  Future<http::Response> response = http::streaming::post(
-      master.get()->pid,
-      "api/v1",
-      headers,
-      serialize(contentType, v1Call),
-      stringify(contentType));
+  Future<v1::master::Event::Subscribed> subscribedToMasterAPIEvents;
+  EXPECT_CALL(subscriber, subscribed(_))
+    .WillOnce(FutureArg<0>(&subscribedToMasterAPIEvents));
 
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
-  AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
-  ASSERT_EQ(http::Response::PIPE, response->type);
-  ASSERT_SOME(response->reader);
-
-  http::Pipe::Reader reader = response->reader.get();
-
-  auto deserializer =
-    lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
+  // The agent re-registration should result in an `AGENT_ADDED` event
+  // and a `TASK_ADDED` event.
+  Future<Nothing> taskAdded;
+  EXPECT_CALL(subscriber, taskAdded(_))
+    .WillOnce(FutureSatisfy(&taskAdded));
 
-  Reader<v1::master::Event> decoder(
-      Decoder<v1::master::Event>(deserializer), reader);
+  Future<Nothing> agentAdded;
+  EXPECT_CALL(subscriber, agentAdded(_))
+    .WillOnce(FutureSatisfy(&agentAdded));
 
-  Future<Result<v1::master::Event>> event = decoder.read();
-  AWAIT_READY(event);
+  // Create event stream after the master failover but before the agent
+  // re-registration. We should see no framework, agent, task and
+  // executor at all.
+  AWAIT_READY(subscriber.subscribe(master.get()->pid, contentType));
+  AWAIT_READY(subscribedToMasterAPIEvents);
 
-  EXPECT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
   const v1::master::Response::GetState& getState =
-      event->get().subscribed().get_state();
+    subscribedToMasterAPIEvents->get_state();
 
   EXPECT_EQ(0, getState.get_frameworks().frameworks_size());
   EXPECT_EQ(0, getState.get_agents().agents_size());
   EXPECT_EQ(0, getState.get_tasks().tasks_size());
   EXPECT_EQ(0, getState.get_executors().executors_size());
 
-  event = decoder.read();
-  AWAIT_READY(event);
-
-  EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
-
   Future<SlaveReregisteredMessage> slaveReregisteredMessage =
     FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
 
@@ -2863,22 +2763,8 @@ TEST_P(MasterAPITest, MasterFailover)
   Clock::advance(slaveFlags.registration_backoff_factor);
 
   AWAIT_READY(slaveReregisteredMessage);
-
-  // The agent re-registration should result in an `AGENT_ADDED` event
-  // and a `TASK_ADDED` event.
-  set<v1::master::Event::Type> expectedEvents =
-    {v1::master::Event::AGENT_ADDED, v1::master::Event::TASK_ADDED};
-  set<v1::master::Event::Type> observedEvents;
-
-  event = decoder.read();
-  AWAIT_READY(event);
-  observedEvents.insert(event->get().type());
-
-  event = decoder.read();
-  AWAIT_READY(event);
-  observedEvents.insert(event->get().type());
-
-  EXPECT_EQ(expectedEvents, observedEvents);
+  AWAIT_READY(taskAdded);
+  AWAIT_READY(agentAdded);
 }
 
 

Reply via email to