Added a master API test for agent re-registration after master failover.

This test verifies that subscribing to the 'api/v1' endpoint between a
master failover and an agent re-registration won't cause the master to
crash.

Review: https://reviews.apache.org/r/65775/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b4e21067
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b4e21067
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b4e21067

Branch: refs/heads/master
Commit: b4e210678c04e57c2fa9f277b44f6d011da1846a
Parents: f2ec2b2
Author: Chun-Hung Hsiao <chhs...@mesosphere.io>
Authored: Fri Feb 23 18:37:17 2018 -0800
Committer: Greg Mann <gregorywm...@gmail.com>
Committed: Fri Feb 23 18:37:17 2018 -0800

----------------------------------------------------------------------
 src/tests/api_tests.cpp | 170 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 170 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b4e21067/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 31906eb..9c172f7 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -94,6 +94,7 @@ using process::Promise;
 
 using recordio::Decoder;
 
+using std::set;
 using std::string;
 using std::tuple;
 using std::vector;
@@ -2389,6 +2390,175 @@ TEST_P(MasterAPITest, Subscribe)
 }
 
 
+// This test verifies that a SUBSCRIBE call to the 'api/v1' endpoint made
+// between a master failover and an agent re-registration won't crash the
+// master. See MESOS-8601.
+TEST_P(MasterAPITest, MasterFailover)
+{
+  ContentType contentType = GetParam();
+
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  Owned<v1::scheduler::TestMesos> mesos(new v1::scheduler::TestMesos(
+      master.get()->pid,
+      contentType,
+      scheduler));
+
+  AWAIT_READY(subscribed);
+  const v1::FrameworkID& frameworkId = subscribed->framework_id();
+
+  // Settle the clock to get the first offer.
+  Clock::settle();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+  const v1::AgentID& agentId = offers->offers(0).agent_id();
+
+  // Launch a task using the scheduler. This should result in a
+  // `TASK_STARTING` followed by a `TASK_RUNNING` status update.
+  Future<v1::scheduler::Event::Update> taskStarting;
+  Future<v1::scheduler::Event::Update> taskRunning;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(DoAll(
+        FutureArg<1>(&taskStarting),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillOnce(DoAll(
+        FutureArg<1>(&taskRunning),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  v1::TaskInfo task = v1::createTask(
+      agentId,
+      offers->offers(0).resources(),
+      "sleep 1000");
+
+  mesos->send(v1::createCallAccept(
+      frameworkId,
+      offers->offers(0),
+      {v1::LAUNCH({task})}));
+
+  AWAIT_READY(taskStarting);
+  EXPECT_EQ(v1::TASK_STARTING, taskStarting->status().state());
+
+  AWAIT_READY(taskRunning);
+  EXPECT_EQ(v1::TASK_RUNNING, taskRunning->status().state());
+
+  // Stop the scheduler and shut down the master.
+  mesos.reset();
+  master->reset();
+
+  // Restart the master and appoint it in the agent's detector.
+  master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  detector.appoint(master.get()->pid);
+
+  // Subscribe to the event stream after the master failover but before
+  // the agent re-registers. The state in the `SUBSCRIBED` event should
+  // contain no frameworks, agents, tasks, or executors.
+  v1::master::Call v1Call;
+  v1Call.set_type(v1::master::Call::SUBSCRIBE);
+
+  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+
+  headers["Accept"] = stringify(contentType);
+
+  Future<http::Response> response = http::streaming::post(
+      master.get()->pid,
+      "api/v1",
+      headers,
+      serialize(contentType, v1Call),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+  AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
+  ASSERT_EQ(http::Response::PIPE, response->type);
+  ASSERT_SOME(response->reader);
+
+  http::Pipe::Reader reader = response->reader.get();
+
+  auto deserializer =
+    lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
+
+  Reader<v1::master::Event> decoder(
+      Decoder<v1::master::Event>(deserializer), reader);
+
+  Future<Result<v1::master::Event>> event = decoder.read();
+  AWAIT_READY(event);
+
+  EXPECT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
+  const v1::master::Response::GetState& getState =
+      event->get().subscribed().get_state();
+
+  EXPECT_EQ(0, getState.get_frameworks().frameworks_size());
+  EXPECT_EQ(0, getState.get_agents().agents_size());
+  EXPECT_EQ(0, getState.get_tasks().tasks_size());
+  EXPECT_EQ(0, getState.get_executors().executors_size());
+
+  event = decoder.read();
+  AWAIT_READY(event);
+
+  EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // Advance the clock to trigger agent re-registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // The agent re-registration should result in an `AGENT_ADDED` event
+  // and a `TASK_ADDED` event, in either order (hence the set comparison).
+  set<v1::master::Event::Type> expectedEvents =
+    {v1::master::Event::AGENT_ADDED, v1::master::Event::TASK_ADDED};
+  set<v1::master::Event::Type> observedEvents;
+
+  event = decoder.read();
+  AWAIT_READY(event);
+  observedEvents.insert(event->get().type());
+
+  event = decoder.read();
+  AWAIT_READY(event);
+  observedEvents.insert(event->get().type());
+
+  EXPECT_EQ(expectedEvents, observedEvents);
+}
+
+
 // Verifies that operators subscribed to the master's operator API event
 // stream only receive events that they are authorized to see.
 TEST_P(MasterAPITest, EventAuthorizationFiltering)

Reply via email to