Repository: mesos
Updated Branches:
  refs/heads/1.6.x 61f66d395 -> 0ba5cbf0f


Sent task (health) check updates over the operator streaming API.

With this patch, subscribers to the master operator streaming API
start receiving task health (check) updates. This allows subscribers
to maintain a more accurate view of the cluster's state, closer to
what the traditional `state.json` endpoint offers.

Review: https://reviews.apache.org/r/67320/
(cherry picked from commit 4e6d8047fd127745ef10463818dadc2399732961)


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/047ae01a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/047ae01a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/047ae01a

Branch: refs/heads/1.6.x
Commit: 047ae01ae8533025cd8415a6efa420951ff98659
Parents: 61f66d3
Author: Alexander Rukletsov <[email protected]>
Authored: Mon Jun 4 16:31:38 2018 +0200
Committer: Alexander Rukletsov <[email protected]>
Committed: Mon Jun 4 16:36:56 2018 +0200

----------------------------------------------------------------------
 src/master/master.cpp   |   7 +++
 src/tests/api_tests.cpp | 130 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 137 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/047ae01a/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 41862db..f237964 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -10810,6 +10810,13 @@ void Master::updateTask(Task* task, const StatusUpdate& update)
     task->set_state(latestState.getOrElse(status.state()));
   }
 
+  // If this is a (health) check status update, always forward it to
+  // subscribers.
+  if (status.reason() == TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED ||
+      status.reason() == TaskStatus::REASON_TASK_HEALTH_CHECK_STATUS_UPDATED) {
+    sendSubscribersUpdate = true;
+  }
+
   // TODO(brenden): Consider wiping the `message` field?
   if (task->statuses_size() > 0 &&
       task->statuses(task->statuses_size() - 1).state() == status.state()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/047ae01a/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 1ed26a7..64e4237 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -2382,6 +2382,136 @@ TEST_P(MasterAPITest, Subscribe)
 }
 
 
+// This test verifies that subscribers to the master operator API receive
+// `TASK_UPDATED` events that carry the task's health check information.
+TEST_P(MasterAPITest, SubscribersReceiveHealthUpdates)
+{
+  ContentType contentType = GetParam();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
+  ASSERT_SOME(slave);
+
+  // Connect the scheduler.
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected));
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid, contentType, scheduler);
+
+  AWAIT_READY(connected);
+
+  // Subscribe the scheduler. This will trigger a resource offer.
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  mesos.send(v1::createCallSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  // Subscribe to the master's event stream via the v1 API.
+  v1::master::Call v1Call;
+  v1Call.set_type(v1::master::Call::SUBSCRIBE);
+
+  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+
+  headers["Accept"] = stringify(contentType);
+
+  Future<http::Response> response = http::streaming::post(
+      master.get()->pid,
+      "api/v1",
+      headers,
+      serialize(contentType, v1Call),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+  AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
+  ASSERT_EQ(http::Response::PIPE, response->type);
+  ASSERT_SOME(response->reader);
+
+  http::Pipe::Reader reader = response->reader.get();
+
+  auto deserializer =
+    lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
+
+  Reader<v1::master::Event> decoder(
+      Decoder<v1::master::Event>(deserializer), reader);
+
+  // Prepare and launch a task with a failing health check using the scheduler.
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID agentId(offer.agent_id());
+
+  TaskInfo task = createTask(devolve(offer), SLEEP_COMMAND(10000));
+
+  // This describes a single health check that will fail.
+  HealthCheck healthCheck;
+  healthCheck.set_type(HealthCheck::HTTP);
+  healthCheck.mutable_http()->set_port(80);
+  healthCheck.mutable_http()->set_path("/help");
+  healthCheck.set_delay_seconds(0);
+  healthCheck.set_interval_seconds(1000);
+  healthCheck.set_grace_period_seconds(0);
+  task.mutable_health_check()->CopyFrom(healthCheck);
+
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId));
+
+  mesos.send(
+      v1::createCallAccept(frameworkId, offer, {v1::LAUNCH({evolve(task)})}));
+
+  // Expect to get a task health update after a _small_ number of other events,
+  // e.g., `SUBSCRIBED`, initial `HEARTBEAT`, `TASK_STARTING` and `TASK_RUNNING`
+  // task status updates, etc; 10 seems to be a reasonable number.
+  //
+  // TODO(alexr): Instead of guessing the number of events we should filter
+  // out all uninteresting events. This can be done for example by introducing
+  // a way to inject a "matcher" for the event, something like:
+  //   AWAIT_READY(event, [](const auto& event ) -> bool {
+  //     return (event->get().type() == ...); });
+  int maxEventsToSkip = 10;
+  bool taskHealthUpdateObserved = false;
+  while (maxEventsToSkip--) {
+    Future<Result<v1::master::Event>> event = decoder.read();
+    AWAIT_READY(event);
+    ASSERT_SOME(event.get()); // Avoid aborting in `Result::get()` below.
+
+    // `healthy()` reads `false` when unset, hence `has_healthy()` below.
+    const v1::TaskStatus& status = event->get().task_updated().status();
+    if (event->get().type() == v1::master::Event::TASK_UPDATED &&
+        status.task_id() == evolve(task.task_id()) &&
+        status.state() == v1::TASK_RUNNING &&
+        status.reason() ==
+          v1::TaskStatus::REASON_TASK_HEALTH_CHECK_STATUS_UPDATED &&
+        status.has_healthy() && !status.healthy()) {
+      taskHealthUpdateObserved = true;
+      break;
+    }
+  }
+
+  ASSERT_TRUE(taskHealthUpdateObserved)
+      << "Health update for task '" << task.task_id()
+      << "' has not been received";
+}
+
+
 // This test verifies that subscribing to the 'api/v1' endpoint between
 // a master failover and an agent re-registration won't cause the master
 // to crash. See MESOS-8601.

Reply via email to