This is an automated email from the ASF dual-hosted git repository.
bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push:
new 7301287 Refactored some of `MasterAPITest`s using
MockMasterAPISubscriber.
7301287 is described below
commit 7301287210e19b309f6ddd472bd51897f5cb7c8d
Author: Andrei Sekretenko <[email protected]>
AuthorDate: Sun Jun 9 23:11:52 2019 -0400
Refactored some of `MasterAPITest`s using MockMasterAPISubscriber.
This patch replaces reading out all Master API events (both interesting
and uninteresting ones) in the code of the tests with setting
expectations on `MOCK_METHOD`s of the dedicated class.
Review: https://reviews.apache.org/r/70756/
---
src/tests/api_tests.cpp | 310 +++++++++++++++---------------------------------
1 file changed, 98 insertions(+), 212 deletions(-)
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index f191a1c..af1d215 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -66,6 +66,7 @@
#include "tests/resources_utils.hpp"
#include "tests/containerizer/mock_containerizer.hpp"
+#include "tests/master/mock_master_api_subscriber.hpp"
namespace http = process::http;
@@ -1923,50 +1924,26 @@ TEST_P(MasterAPITest, SubscribeAgentEvents)
Try<Owned<cluster::Master>> master = this->StartMaster();
ASSERT_SOME(master);
- v1::master::Call v1Call;
- v1Call.set_type(v1::master::Call::SUBSCRIBE);
-
- http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
-
- headers["Accept"] = stringify(contentType);
-
- Future<http::Response> response = http::streaming::post(
- master.get()->pid,
- "api/v1",
- headers,
- serialize(contentType, v1Call),
- stringify(contentType));
-
- AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
- AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
- ASSERT_EQ(http::Response::PIPE, response->type);
- ASSERT_SOME(response->reader);
+ v1::MockMasterAPISubscriber subscriber;
- http::Pipe::Reader reader = response->reader.get();
-
- auto deserializer =
- lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
+ Future<v1::master::Event::Subscribed> subscribed;
+ EXPECT_CALL(subscriber, subscribed(_))
+ .WillOnce(FutureArg<0>(&subscribed));
- Reader<v1::master::Event> decoder(
- Decoder<v1::master::Event>(deserializer), reader);
-
- Future<Result<v1::master::Event>> event = decoder.read();
- AWAIT_READY(event);
+ AWAIT_READY(subscriber.subscribe(master.get()->pid, contentType));
+ AWAIT_READY(subscribed);
- EXPECT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
- const v1::master::Response::GetState& getState =
- event->get().subscribed().get_state();
+ const v1::master::Response::GetState& getState = subscribed->get_state();
EXPECT_TRUE(getState.get_frameworks().frameworks().empty());
EXPECT_TRUE(getState.get_agents().agents().empty());
EXPECT_TRUE(getState.get_tasks().tasks().empty());
EXPECT_TRUE(getState.get_executors().executors().empty());
- event = decoder.read();
-
- AWAIT_READY(event);
-
- EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+ // Expect AGENT_ADDED message after starting an agent
+ Future<v1::master::Event::AgentAdded> agentAdded;
+ EXPECT_CALL(subscriber, agentAdded(_))
+ .WillOnce(FutureArg<0>(&agentAdded));
// Start one agent.
Future<SlaveRegisteredMessage> agentRegisteredMessage =
@@ -1981,16 +1958,12 @@ TEST_P(MasterAPITest, SubscribeAgentEvents)
AWAIT_READY(agentRegisteredMessage);
- event = decoder.read();
- AWAIT_READY(event);
+ AWAIT_READY(agentAdded);
SlaveID agentID = agentRegisteredMessage->slave_id();
{
- ASSERT_EQ(v1::master::Event::AGENT_ADDED, event->get().type());
-
- const v1::master::Response::GetAgents::Agent& agent =
- event->get().agent_added().agent();
+ const v1::master::Response::GetAgents::Agent& agent = agentAdded->agent();
ASSERT_EQ("host", agent.agent_info().hostname());
ASSERT_EQ(evolve(agentID), agent.agent_info().id());
@@ -1999,17 +1972,18 @@ TEST_P(MasterAPITest, SubscribeAgentEvents)
ASSERT_EQ(4, agent.total_resources_size());
}
+ // Expect AGENT_REMOVED message after agent removal.
+ Future<v1::master::Event::AgentRemoved> agentRemoved;
+ EXPECT_CALL(subscriber, agentRemoved(_))
+ .WillOnce(FutureArg<0>(&agentRemoved));
+
// Forcefully trigger a shutdown on the slave so that master will remove it.
slave.get()->shutdown();
slave->reset();
- event = decoder.read();
- AWAIT_READY(event);
+ AWAIT_READY(agentRemoved);
- {
- ASSERT_EQ(v1::master::Event::AGENT_REMOVED, event->get().type());
- ASSERT_EQ(evolve(agentID), event->get().agent_removed().agent_id());
- }
+ ASSERT_EQ(evolve(agentID), agentRemoved->agent_id());
}
@@ -2419,53 +2393,36 @@ TEST_P(MasterAPITest, Subscribe)
// Create event stream after seeing first offer but before first task is
// launched. We should see one framework, one agent and zero task/executor.
- v1::master::Call v1Call;
- v1Call.set_type(v1::master::Call::SUBSCRIBE);
-
- http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
-
- headers["Accept"] = stringify(contentType);
-
- Future<http::Response> response = http::streaming::post(
- master.get()->pid,
- "api/v1",
- headers,
- serialize(contentType, v1Call),
- stringify(contentType));
-
- AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
- AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
- ASSERT_EQ(http::Response::PIPE, response->type);
- ASSERT_SOME(response->reader);
-
- http::Pipe::Reader reader = response->reader.get();
-
- auto deserializer =
- lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
+ v1::MockMasterAPISubscriber subscriber;
- Reader<v1::master::Event> decoder(
- Decoder<v1::master::Event>(deserializer), reader);
+ Future<v1::master::Event::Subscribed> subscribedToMasterEvents;
+ EXPECT_CALL(subscriber, subscribed(_))
+ .WillOnce(FutureArg<0>(&subscribedToMasterEvents));
- Future<Result<v1::master::Event>> event = decoder.read();
- AWAIT_READY(event);
+ AWAIT_READY(subscriber.subscribe(master.get()->pid, contentType));
+ AWAIT_READY(subscribedToMasterEvents);
- EXPECT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
const v1::master::Response::GetState& getState =
- event->get().subscribed().get_state();
+ subscribedToMasterEvents->get_state();
EXPECT_EQ(1, getState.get_frameworks().frameworks_size());
EXPECT_EQ(1, getState.get_agents().agents_size());
EXPECT_TRUE(getState.get_tasks().tasks().empty());
EXPECT_TRUE(getState.get_executors().executors().empty());
- event = decoder.read();
-
- AWAIT_READY(event);
+ // Expect TASK_ADDED and TASK_UPDATED events, in sequence.
+ Sequence masterEventsSequence;
- EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+ Future<v1::master::Event::TaskAdded> taskAdded;
+ EXPECT_CALL(subscriber, taskAdded(_))
+ .InSequence(masterEventsSequence)
+ .WillOnce(FutureArg<0>(&taskAdded));
- event = decoder.read();
- EXPECT_TRUE(event.isPending());
+ // There should be only one taskUpdated event
+ Future<v1::master::Event::TaskUpdated> taskUpdated;
+ EXPECT_CALL(subscriber, taskUpdated(_))
+ .InSequence(masterEventsSequence)
+ .WillOnce(FutureArg<0>(&taskUpdated));
const v1::Offer& offer = offers->offers(0);
@@ -2518,33 +2475,23 @@ TEST_P(MasterAPITest, Subscribe)
mesos.send(call);
}
- AWAIT_READY(event);
+ AWAIT_READY(taskAdded);
AWAIT_READY(update);
- ASSERT_EQ(v1::master::Event::TASK_ADDED, event->get().type());
- ASSERT_EQ(evolve(task.task_id()),
- event->get().task_added().task().task_id());
- ASSERT_TRUE(event->get().task_added().task().has_health_check());
+ ASSERT_EQ(evolve(task.task_id()), taskAdded->task().task_id());
+ ASSERT_TRUE(taskAdded->task().has_health_check());
ASSERT_EQ(
v1::HealthCheck::COMMAND,
- event->get().task_added().task().health_check().type());
+ taskAdded->task().health_check().type());
ASSERT_EQ(
checkCommandValue,
- event->get().task_added().task().health_check().command().value());
+ taskAdded->task().health_check().command().value());
- event = decoder.read();
+ AWAIT_READY(taskUpdated);
- AWAIT_READY(event);
-
- ASSERT_EQ(v1::master::Event::TASK_UPDATED, event->get().type());
- ASSERT_EQ(v1::TASK_RUNNING,
- event->get().task_updated().state());
- ASSERT_EQ(v1::TASK_RUNNING,
- event->get().task_updated().status().state());
- ASSERT_EQ(evolve(task.task_id()),
- event->get().task_updated().status().task_id());
-
- event = decoder.read();
+ ASSERT_EQ(v1::TASK_RUNNING, taskUpdated->state());
+ ASSERT_EQ(v1::TASK_RUNNING, taskUpdated->status().state());
+ ASSERT_EQ(evolve(task.task_id()), taskUpdated->status().task_id());
Future<Nothing> update2;
EXPECT_CALL(*scheduler, update(_, _))
@@ -2559,10 +2506,6 @@ TEST_P(MasterAPITest, Subscribe)
AWAIT_READY(update2);
- EXPECT_TRUE(event.isPending());
-
- EXPECT_TRUE(reader.close());
-
EXPECT_CALL(*executor, shutdown(_))
.Times(AtMost(1));
@@ -2571,6 +2514,23 @@ TEST_P(MasterAPITest, Subscribe)
}
+// Helper functor for SubscribersReceiveHealthUpdates test.
+// NOTE: We cannot use a lambda due to testing::ResultOf requiring result_type.
+struct IsFailedHealthCheckTaskUpdate
+{
+ TaskID taskId;
+ using result_type = bool;
+ bool operator() (const v1::master::Event::TaskUpdated& event) const {
+ const v1::TaskStatus& status = event.status();
+ return status.task_id() == evolve(taskId) &&
+ status.state() == v1::TASK_RUNNING &&
+ status.reason() ==
+ v1::TaskStatus::REASON_TASK_HEALTH_CHECK_STATUS_UPDATED &&
+ !status.healthy();
+ };
+};
+
+
// This test verifies that subscribers eventually receive events with the
// task health information.
TEST_P(MasterAPITest, SubscribersReceiveHealthUpdates)
@@ -2616,34 +2576,10 @@ TEST_P(MasterAPITest, SubscribersReceiveHealthUpdates)
v1::FrameworkID frameworkId(subscribed->framework_id());
// Subscribe to the master's event stream via the v1 API.
- v1::master::Call v1Call;
- v1Call.set_type(v1::master::Call::SUBSCRIBE);
-
- http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
-
- headers["Accept"] = stringify(contentType);
-
- Future<http::Response> response = http::streaming::post(
- master.get()->pid,
- "api/v1",
- headers,
- serialize(contentType, v1Call),
- stringify(contentType));
+ v1::MockMasterAPISubscriber subscriber;
+ AWAIT_READY(subscriber.subscribe(master.get()->pid, contentType));
- AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
- AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
- ASSERT_EQ(http::Response::PIPE, response->type);
- ASSERT_SOME(response->reader);
- http::Pipe::Reader reader = response->reader.get();
-
- auto deserializer =
- lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
-
- Reader<v1::master::Event> decoder(
- Decoder<v1::master::Event>(deserializer), reader);
-
- // Prepare and launch a task with a failing health check using the scheduler.
AWAIT_READY(offers);
ASSERT_FALSE(offers->offers().empty());
@@ -2652,6 +2588,14 @@ TEST_P(MasterAPITest, SubscribersReceiveHealthUpdates)
TaskInfo task = createTask(devolve(offer), SLEEP_COMMAND(10000));
+ // Expect taskUpdated event caused by a failed health check.
+ Future<Nothing> taskUpdateWithFailedHealthCheckObserved;
+ EXPECT_CALL(
+ subscriber,
+ taskUpdated(
+ testing::ResultOf(IsFailedHealthCheckTaskUpdate{task.task_id()}, true)))
+ .WillOnce(FutureSatisfy(&taskUpdateWithFailedHealthCheckObserved));
+
// This describes a single health check that will fail.
HealthCheck healthCheck;
healthCheck.set_type(HealthCheck::HTTP);
@@ -2668,39 +2612,12 @@ TEST_P(MasterAPITest, SubscribersReceiveHealthUpdates)
mesos.send(
v1::createCallAccept(frameworkId, offer, {v1::LAUNCH({evolve(task)})}));
- // Expect to get a task health update after a _small_ number of other events,
- // e.g., `SUBSCRIBED`, initial `HEARTBEAT`, `TASK_STARTING` and
`TASK_RUNNING`
- // task status updates, etc; 10 seems to be a reasonable number.
- //
- // TODO(alexr): Instead of guessing the number of events we should filter
- // out all uninteresting events. This can be done for example by introducing
- // a way to inject a "matcher" for the event, something like:
- // AWAIT_READY(event, [](const auto& event ) -> bool {
- // return (event->get().type() == ...); });
- int maxEventsToSkip = 10;
- bool taskHealthUpdateObserved = false;
- while (maxEventsToSkip--) {
- Future<Result<v1::master::Event>> event = decoder.read();
- AWAIT_READY(event);
-
- if (event->get().type() == v1::master::Event::TASK_UPDATED &&
- event->get().task_updated().status().task_id() ==
- evolve(task.task_id()) &&
- event->get().task_updated().status().state() == v1::TASK_RUNNING &&
- event->get().task_updated().status().reason() ==
- v1::TaskStatus::REASON_TASK_HEALTH_CHECK_STATUS_UPDATED &&
- event->get().task_updated().status().healthy() == false) {
- taskHealthUpdateObserved = true;
- break;
- }
- }
-
- ASSERT_TRUE(taskHealthUpdateObserved)
- << "Health update for task '" << task.task_id()
- << "' has not been received";
+ AWAIT_READY(taskUpdateWithFailedHealthCheckObserved);
// Kill task before test tear-down to avoid race between scheduler
// tear-down and scheduler getting/acknowledging status update.
+ //
+ // TODO(asekretenko): Check if we still need this.
Future<Nothing> killed;
EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateStateEq(v1::TASK_KILLED)))
.WillOnce(FutureSatisfy(&killed));
@@ -2809,53 +2726,36 @@ TEST_P(MasterAPITest, MasterFailover)
detector.appoint(master.get()->pid);
- // Create event stream after the master failover but before the agent
- // re-registration. We should see no framework, agent, task and
- // executor at all.
- v1::master::Call v1Call;
- v1Call.set_type(v1::master::Call::SUBSCRIBE);
-
- http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+ v1::MockMasterAPISubscriber subscriber;
- headers["Accept"] = stringify(contentType);
-
- Future<http::Response> response = http::streaming::post(
- master.get()->pid,
- "api/v1",
- headers,
- serialize(contentType, v1Call),
- stringify(contentType));
+ Future<v1::master::Event::Subscribed> subscribedToMasterAPIEvents;
+ EXPECT_CALL(subscriber, subscribed(_))
+ .WillOnce(FutureArg<0>(&subscribedToMasterAPIEvents));
- AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
- AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
- ASSERT_EQ(http::Response::PIPE, response->type);
- ASSERT_SOME(response->reader);
-
- http::Pipe::Reader reader = response->reader.get();
-
- auto deserializer =
- lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
+ // The agent re-registration should result in an `AGENT_ADDED` event
+ // and a `TASK_ADDED` event.
+ Future<Nothing> taskAdded;
+ EXPECT_CALL(subscriber, taskAdded(_))
+ .WillOnce(FutureSatisfy(&taskAdded));
- Reader<v1::master::Event> decoder(
- Decoder<v1::master::Event>(deserializer), reader);
+ Future<Nothing> agentAdded;
+ EXPECT_CALL(subscriber, agentAdded(_))
+ .WillOnce(FutureSatisfy(&agentAdded));
- Future<Result<v1::master::Event>> event = decoder.read();
- AWAIT_READY(event);
+ // Create event stream after the master failover but before the agent
+ // re-registration. We should see no framework, agent, task and
+ // executor at all.
+ AWAIT_READY(subscriber.subscribe(master.get()->pid, contentType));
+ AWAIT_READY(subscribedToMasterAPIEvents);
- EXPECT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
const v1::master::Response::GetState& getState =
- event->get().subscribed().get_state();
+ subscribedToMasterAPIEvents->get_state();
EXPECT_EQ(0, getState.get_frameworks().frameworks_size());
EXPECT_EQ(0, getState.get_agents().agents_size());
EXPECT_EQ(0, getState.get_tasks().tasks_size());
EXPECT_EQ(0, getState.get_executors().executors_size());
- event = decoder.read();
- AWAIT_READY(event);
-
- EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
-
Future<SlaveReregisteredMessage> slaveReregisteredMessage =
FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
@@ -2863,22 +2763,8 @@ TEST_P(MasterAPITest, MasterFailover)
Clock::advance(slaveFlags.registration_backoff_factor);
AWAIT_READY(slaveReregisteredMessage);
-
- // The agent re-registration should result in an `AGENT_ADDED` event
- // and a `TASK_ADDED` event.
- set<v1::master::Event::Type> expectedEvents =
- {v1::master::Event::AGENT_ADDED, v1::master::Event::TASK_ADDED};
- set<v1::master::Event::Type> observedEvents;
-
- event = decoder.read();
- AWAIT_READY(event);
- observedEvents.insert(event->get().type());
-
- event = decoder.read();
- AWAIT_READY(event);
- observedEvents.insert(event->get().type());
-
- EXPECT_EQ(expectedEvents, observedEvents);
+ AWAIT_READY(taskAdded);
+ AWAIT_READY(agentAdded);
}