This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new c6caf50  Updated master::Call::SUBSCRIBE to be served in parallel.
c6caf50 is described below

commit c6caf5017cc03a150293a2cc7fa897029e1de833
Author: Benjamin Mahler <[email protected]>
AuthorDate: Thu Jan 16 17:14:50 2020 -0500

    Updated master::Call::SUBSCRIBE to be served in parallel.
    
    This call is not entirely read-only, unlike the other GET_* v1 master
    calls, and therefore it warranted its own patch.
    
    The approach used is to add a post-processing "write" step to the
    handler return type. The post-processing step gets executed
    synchronously. In order to deal with different potential post-
    processing steps, we use a Variant.
    
    Note that SUBSCRIBE cannot asynchronously register the subscriber
    after the read-only state is served, because it will miss events
    in the interim!
    
    Review: https://reviews.apache.org/r/72019
---
 src/common/http.hpp             |  23 +--
 src/master/http.cpp             | 265 +++++++++-------------------
 src/master/master.hpp           |  87 ++++++---
 src/master/readonly_handler.cpp | 380 +++++++++++++++++++++++++++++++---------
 src/tests/master_load_tests.cpp |  10 +-
 5 files changed, 455 insertions(+), 310 deletions(-)

diff --git a/src/common/http.hpp b/src/common/http.hpp
index 47a4d6a..5fc19fd 100644
--- a/src/common/http.hpp
+++ b/src/common/http.hpp
@@ -264,8 +264,10 @@ public:
   bool approved(const Args&... args)
   {
     if (!approvers.contains(action)) {
-      LOG(WARNING) << "Attempted to authorize " << principal
-                   << " for unexpected action " << stringify(action);
+      LOG(WARNING)
+        << "Attempted to authorize principal "
+        << " '" << (principal.isSome() ? stringify(*principal) : "") << "'"
+        << " for unexpected action " << stringify(action);
       return false;
     }
 
@@ -274,29 +276,28 @@ public:
 
     if (approved.isError()) {
       // TODO(joerg84): Expose these errors back to the caller.
-      LOG(WARNING) << "Failed to authorize principal " << principal
-                   << "for action " << stringify(action) << ": "
-                   << approved.error();
+      LOG(WARNING)
+          << "Failed to authorize principal "
+          << " '" << (principal.isSome() ? stringify(*principal) : "") << "'"
+          << " for action " << stringify(action) << ": " << approved.error();
       return false;
     }
 
     return approved.get();
   }
 
+  const Option<process::http::authentication::Principal> principal;
+
 private:
   ObjectApprovers(
       hashmap<
           authorization::Action,
           process::Owned<ObjectApprover>>&& _approvers,
       const Option<process::http::authentication::Principal>& _principal)
-    : approvers(std::move(_approvers)),
-      principal(_principal.isSome()
-          ? "'" + stringify(_principal.get()) + "'"
-          : "")
-    {}
+    : principal(_principal),
+      approvers(std::move(_approvers)) {}
 
   hashmap<authorization::Action, process::Owned<ObjectApprover>> approvers;
-  const std::string principal; // Only used for logging.
 };
 
 
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 8a58863..eeaac88 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -132,6 +132,7 @@ using std::copy_if;
 using std::function;
 using std::list;
 using std::map;
+using std::pair;
 using std::set;
 using std::string;
 using std::tie;
@@ -419,7 +420,7 @@ Future<Response> Master::Http::api(
 Future<Response> Master::Http::subscribe(
     const mesos::master::Call& call,
     const Option<Principal>& principal,
-    ContentType contentType) const
+    ContentType outputContentType) const
 {
   CHECK_EQ(mesos::master::Call::SUBSCRIBE, call.type());
 
@@ -428,160 +429,16 @@ Future<Response> Master::Http::subscribe(
       principal,
       {VIEW_FRAMEWORK, VIEW_TASK, VIEW_EXECUTOR, VIEW_ROLE})
     .then(defer(
-        master->self(),
-        [=](const Owned<ObjectApprovers>& approvers) -> Future<Response> {
-          Pipe pipe;
-          OK ok;
-
-          ok.headers["Content-Type"] = stringify(contentType);
-          ok.type = Response::PIPE;
-          ok.reader = pipe.reader();
-
-          StreamingHttpConnection<v1::master::Event> http(
-              pipe.writer(), contentType);
-
-          // Serialize the following event:
-          //
-          //   mesos::master::Event event;
-          //   event.set_type(mesos::master::Event::SUBSCRIBED);
-          //   *event.mutable_subscribed()->mutable_get_state() =
-          //     _getState(approvers);
-          //   event.mutable_subscribed()->set_heartbeat_interval_seconds(
-          //       DEFAULT_HEARTBEAT_INTERVAL.secs());
-          //
-          //   http.send(event);
-
-          switch (contentType) {
-            case ContentType::PROTOBUF: {
-              string serialized;
-              google::protobuf::io::StringOutputStream stream(&serialized);
-              google::protobuf::io::CodedOutputStream writer(&stream);
-
-              WireFormatLite::WriteEnum(
-                  mesos::v1::master::Event::kTypeFieldNumber,
-                  mesos::v1::master::Event::SUBSCRIBED,
-                  &writer);
-
-              WireFormatLite::WriteBytes(
-                  mesos::v1::master::Event::kSubscribedFieldNumber,
-                  serializeSubscribe(approvers),
-                  &writer);
-
-              // We must manually trim the unused buffer space since
-              // we use the string before the coded output stream is
-              // destructed.
-              writer.Trim();
-
-              http.send(serialized);
-
-              break;
-            }
-
-            case ContentType::JSON: {
-              string serialized = jsonify([&](JSON::ObjectWriter* writer) {
-                const google::protobuf::Descriptor* descriptor =
-                  v1::master::Event::descriptor();
-
-                int field;
-
-                field = v1::master::Event::kTypeFieldNumber;
-                writer->field(
-                    descriptor->FindFieldByNumber(field)->name(),
-                    v1::master::Event::Type_Name(
-                        v1::master::Event::SUBSCRIBED));
-
-                field = v1::master::Event::kSubscribedFieldNumber;
-                writer->field(
-                    descriptor->FindFieldByNumber(field)->name(),
-                    jsonifySubscribe(master, approvers));
-              });
-
-              http.send(serialized);
-
-              break;
-            }
-
-            default:
-              return NotAcceptable("Request must accept json or protobuf");
-          }
-
-          mesos::master::Event heartbeatEvent;
-          heartbeatEvent.set_type(mesos::master::Event::HEARTBEAT);
-          http.send(heartbeatEvent);
-
-          // Master::subscribe will start the heartbeater process, which should
-          // only happen after `SUBSCRIBED` event is sent.
-          master->subscribe(http, principal);
-
-          return ok;
-        }));
-}
-
-
-function<void(JSON::ObjectWriter*)> Master::Http::jsonifySubscribe(
-    const Master* master,
-    const Owned<ObjectApprovers>& approvers)
-{
-  // Jsonify the following message:
-  //
-  //   mesos::master::Event::Subscribed subscribed;
-  //   *subscribed.mutable_get_state() = _getState(approvers);
-  //   subscribed.set_heartbeat_interval_seconds(
-  //       DEFAULT_HEARTBEAT_INTERVAL.secs());
-
-  // TODO(bmahler): This copies the Owned object approvers.
-  return [=](JSON::ObjectWriter* writer) {
-    const google::protobuf::Descriptor* descriptor =
-      v1::master::Event::Subscribed::descriptor();
-
-    int field;
-
-    field = v1::master::Event::Subscribed::kGetStateFieldNumber;
-    writer->field(
-        descriptor->FindFieldByNumber(field)->name(),
-        Master::ReadOnlyHandler(master).jsonifyGetState(approvers));
-
-    field = v1::master::Event::Subscribed::kHeartbeatIntervalSecondsFieldNumber;
-    writer->field(
-        descriptor->FindFieldByNumber(field)->name(),
-        DEFAULT_HEARTBEAT_INTERVAL.secs());
-  };
-}
-
-
-string Master::Http::serializeSubscribe(
-    const Owned<ObjectApprovers>& approvers) const
-{
-  // Serialize the following message:
-  //
-  //   mesos::master::Event::Subscribed subscribed;
-  //   *subscribed.mutable_get_state() = _getState(approvers);
-  //   subscribed.set_heartbeat_interval_seconds(
-  //       DEFAULT_HEARTBEAT_INTERVAL.secs());
-
-  string output;
-  google::protobuf::io::StringOutputStream stream(&output);
-  google::protobuf::io::CodedOutputStream writer(&stream);
-
-  WireFormatLite::WriteBytes(
-      mesos::v1::master::Event::Subscribed::kGetStateFieldNumber,
-      Master::ReadOnlyHandler(master).serializeGetState(approvers),
-      &writer);
-
-  WireFormatLite::WriteDouble(
-      mesos::v1::master::Event::Subscribed
-        ::kHeartbeatIntervalSecondsFieldNumber,
-      DEFAULT_HEARTBEAT_INTERVAL.secs(),
-      &writer);
-
-  // While an explicit Trim() isn't necessary (since the coded
-  // output stream is destructed before the string is returned),
-  // it's a quite tricky bug to diagnose if Trim() is missed, so
-  // we always do it explicitly to signal the reader about this
-  // subtlety.
-  writer.Trim();
-
-  return output;
+          master->self(),
+          [this, principal, outputContentType](
+              const Owned<ObjectApprovers>& approvers) {
+            return deferBatchedRequest(
+                &Master::ReadOnlyHandler::subscribe,
+                principal,
+                outputContentType,
+                {},
+                approvers);
+          }));
 }
 
 
@@ -2712,16 +2569,12 @@ Future<Response> Master::Http::deferBatchedRequest(
       });
 
   Future<Response> future;
-  if (it != batchedRequests.end()) {
-    // Return the existing future if we have a matching request.
-    // NOTE: This is effectively adding a layer of authorization permissions
-    // caching since we only checked the equality of principals, not the
-    // equality of the approvers themselves.
-    // On heavily-loaded masters, this could lead to a delay of several seconds
-    // before permission changes for a principal take effect.
-    future = it->promise.future();
-    ++master->metrics->http_cache_hits;
-  } else {
+
+  // Note that we do not de-duplicate the SUBSCRIBE responses,
+  // since the http server in libprocess assumes there's only
+  // 1 reader of the pipe.
+  if (handler == &Master::ReadOnlyHandler::subscribe ||
+      it == batchedRequests.end()) {
     // Add an element to the batched state requests.
     Promise<Response> promise;
     future = promise.future();
@@ -2732,6 +2585,23 @@ Future<Response> Master::Http::deferBatchedRequest(
         principal,
         approvers,
         std::move(promise)});
+  } else {
+    // Return the existing future if we have a matching request.
+    // NOTE: This is effectively adding a layer of authorization permissions
+    // caching since we only checked the equality of principals, not the
+    // equality of the approvers themselves.
+    // On heavily-loaded masters, this could lead to a delay of several seconds
+    // before permission changes for a principal take effect.
+    future = it->promise.future();
+    ++master->metrics->http_cache_hits;
+
+    // NOTE: The returned response should be either of type
+    // `BODY` or `PATH`, since `PIPE`-type responses cannot
+    // be de-duplicated currently.
+    it->promise.future()
+      .onReady([](const Response& r) {
+        CHECK_NE(r.type, Response::PIPE);
+      });
   }
 
   // Schedule processing of batched requests if not yet scheduled.
@@ -2750,6 +2620,9 @@ void Master::Http::processRequestsBatch() const
   CHECK(!batchedRequests.empty())
     << "Bug in state batching logic: No requests to process";
 
+  vector<Future<pair<Response, Option<ReadOnlyHandler::PostProcessing>>>>
+    results;
+
   // Produce the responses in parallel.
   //
   // TODO(alexr): Consider abstracting this into `parallel_async` or
@@ -2758,20 +2631,30 @@ void Master::Http::processRequestsBatch() const
   // TODO(alexr): Consider moving `BatchedStateRequest`'s fields into
   // `process::async` once it supports moving.
   foreach (BatchedRequest& request, batchedRequests) {
-    request.promise.associate(process::async(
-        [this](ReadOnlyRequestHandler handler,
-               ContentType outputContentType,
-               const hashmap<std::string, std::string>& queryParameters,
-               const process::Owned<ObjectApprovers>& approvers) {
-          return (readonlyHandler.*handler)(
-              outputContentType,
-              queryParameters,
-              approvers);
-        },
-        request.handler,
-        request.outputContentType,
-        request.queryParameters,
-        request.approvers));
+    Future<pair<Response, Option<ReadOnlyHandler::PostProcessing>>>
+      f = process::async(
+          [this](ReadOnlyRequestHandler handler,
+                 ContentType outputContentType,
+                 const hashmap<std::string, std::string>& queryParameters,
+                 const process::Owned<ObjectApprovers>& approvers) {
+            return (readonlyHandler.*handler)(
+                outputContentType,
+                queryParameters,
+                approvers);
+          },
+          request.handler,
+          request.outputContentType,
+          request.queryParameters,
+          request.approvers);
+
+    request.promise.associate(
+      f.then([](const pair<
+          Response,
+          Option<ReadOnlyHandler::PostProcessing>>& result) {
+        return result.first;
+      }));
+
+    results.push_back(f);
   }
 
   // Block the master actor until all workers have generated state responses.
@@ -2780,13 +2663,27 @@ void Master::Http::processRequestsBatch() const
   //
   // NOTE: There is the potential for deadlock since we are blocking 1 working
   // thread here, see MESOS-8256.
-  vector<Future<Response>> responses;
-  foreach (const BatchedRequest& request, batchedRequests) {
-    responses.push_back(request.promise.future());
-  }
-  process::await(responses).await();
+  process::await(results).await();
 
   batchedRequests.clear();
+
+  // Now perform the post-processing "writes" synchronously.
+  for (const auto& result : results) {
+    CHECK(!result.isPending()) << result;
+
+    // Response failed or was discarded.
+    if (!result.isReady()) continue;
+
+    // No post-processing needed.
+    if (result->second.isNone()) continue;
+
+    const ReadOnlyHandler::PostProcessing& postProcessing = *result->second;
+
+    postProcessing.state.visit(
+        [&](const ReadOnlyHandler::PostProcessing::Subscribe& s) {
+          master->subscribe(s.connection, s.principal);
+        });
+  }
 }
 
 
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 3074918..c813e9f 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -24,6 +24,7 @@
 #include <memory>
 #include <set>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include <mesos/mesos.hpp>
@@ -1290,12 +1291,21 @@ private:
   };
 
 public:
-  // Inner class used to namespace HTTP handlers that do not change the
-  // underlying master object.
+  // Inner class used to namespace read-only HTTP handlers; these handlers
+  // may be executed in parallel.
   //
-  // Endpoints served by this handler are only permitted to depend on
-  // the request query parameters and the authorization filters to
-  // make caching of responses possible.
+  // A synchronously executed post-processing step is provided for any
+  // cases where the handler is not purely read-only and requires a
+  // synchronous write (i.e. it's not feasible to perform the write
+  // asynchronously (e.g. SUBSCRIBE cannot have a gap between serving
+  // the initial state and registering the subscriber, or else events
+  // will be missed in the interim)).
+  //
+  // The handlers are only permitted to depend on the output content
+  // type (derived from the request headers), the request query
+  // parameters and the authorization filters to de-duplicate identical
+  // responses (this does not de-duplicate all identical responses, e.g.
+  // different authz principal but same permissions).
   //
   // NOTE: Most member functions of this class are not routed directly but
   // dispatched from their corresponding handlers in the outer `Http` class.
@@ -1305,91 +1315,106 @@ public:
   class ReadOnlyHandler
   {
   public:
+    struct PostProcessing
+    {
+      struct Subscribe
+      {
+        Option<process::http::authentication::Principal> principal;
+        StreamingHttpConnection<v1::master::Event> connection;
+      };
+
+      // Any additional post-processing cases will add additional
+      // cases into this variant.
+      Variant<Subscribe> state;
+    };
+
     explicit ReadOnlyHandler(const Master* _master) : master(_master) {}
 
     // /frameworks
-    process::http::Response frameworks(
+    std::pair<process::http::Response, Option<PostProcessing>> frameworks(
         ContentType outputContentType,
         const hashmap<std::string, std::string>& queryParameters,
         const process::Owned<ObjectApprovers>& approvers) const;
 
     // /roles
-    process::http::Response roles(
+    std::pair<process::http::Response, Option<PostProcessing>> roles(
         ContentType outputContentType,
         const hashmap<std::string, std::string>& queryParameters,
         const process::Owned<ObjectApprovers>& approvers) const;
 
     // /slaves
-    process::http::Response slaves(
+    std::pair<process::http::Response, Option<PostProcessing>> slaves(
         ContentType outputContentType,
         const hashmap<std::string, std::string>& queryParameters,
         const process::Owned<ObjectApprovers>& approvers) const;
 
     // /state
-    process::http::Response state(
+    std::pair<process::http::Response, Option<PostProcessing>> state(
         ContentType outputContentType,
         const hashmap<std::string, std::string>& queryParameters,
         const process::Owned<ObjectApprovers>& approvers) const;
 
     // /state-summary
-    process::http::Response stateSummary(
+    std::pair<process::http::Response, Option<PostProcessing>> stateSummary(
         ContentType outputContentType,
         const hashmap<std::string, std::string>& queryParameters,
         const process::Owned<ObjectApprovers>& approvers) const;
 
     // /tasks
-    process::http::Response tasks(
+    std::pair<process::http::Response, Option<PostProcessing>> tasks(
         ContentType outputContentType,
         const hashmap<std::string, std::string>& queryParameters,
         const process::Owned<ObjectApprovers>& approvers) const;
 
     // master::Call::GET_STATE
-    process::http::Response getState(
+    std::pair<process::http::Response, Option<PostProcessing>> getState(
         ContentType outputContentType,
         const hashmap<std::string, std::string>& queryParameters,
         const process::Owned<ObjectApprovers>& approvers) const;
 
     // master::Call::GET_AGENTS
-    process::http::Response getAgents(
+    std::pair<process::http::Response, Option<PostProcessing>> getAgents(
         ContentType outputContentType,
         const hashmap<std::string, std::string>& queryParameters,
         const process::Owned<ObjectApprovers>& approvers) const;
 
     // master::Call::GET_FRAMEWORKS
-    process::http::Response getFrameworks(
+    std::pair<process::http::Response, Option<PostProcessing>> getFrameworks(
         ContentType outputContentType,
         const hashmap<std::string, std::string>& queryParameters,
         const process::Owned<ObjectApprovers>& approvers) const;
 
     // master::Call::GET_EXECUTORS
-    process::http::Response getExecutors(
+    std::pair<process::http::Response, Option<PostProcessing>> getExecutors(
         ContentType outputContentType,
         const hashmap<std::string, std::string>& queryParameters,
         const process::Owned<ObjectApprovers>& approvers) const;
 
     // master::Call::GET_OPERATIONS
-    process::http::Response getOperations(
+    std::pair<process::http::Response, Option<PostProcessing>> getOperations(
         ContentType outputContentType,
         const hashmap<std::string, std::string>& queryParameters,
         const process::Owned<ObjectApprovers>& approvers) const;
 
     // master::Call::GET_TASKS
-    process::http::Response getTasks(
+    std::pair<process::http::Response, Option<PostProcessing>> getTasks(
         ContentType outputContentType,
         const hashmap<std::string, std::string>& queryParameters,
         const process::Owned<ObjectApprovers>& approvers) const;
 
     // master::Call::GET_ROLES
-    process::http::Response getRoles(
+    std::pair<process::http::Response, Option<PostProcessing>> getRoles(
+        ContentType outputContentType,
+        const hashmap<std::string, std::string>& queryParameters,
+        const process::Owned<ObjectApprovers>& approvers) const;
+
+    // master::Call::SUBSCRIBE
+    std::pair<process::http::Response, Option<PostProcessing>> subscribe(
         ContentType outputContentType,
         const hashmap<std::string, std::string>& queryParameters,
         const process::Owned<ObjectApprovers>& approvers) const;
 
-    // TODO(bmahler): These could just live in the .cpp file,
-    // however they are shared with SUBSCRIBE which currently
-    // is not implemented as a read only handler here. Make these
-    // private or only in the .cpp file once SUBSCRIBE is moved
-    // into readonly_handler.cpp.
+  private:
     std::string serializeGetState(
         const process::Owned<ObjectApprovers>& approvers) const;
     std::string serializeGetAgents(
@@ -1404,6 +1429,8 @@ public:
         const process::Owned<ObjectApprovers>& approvers) const;
     std::string serializeGetRoles(
         const process::Owned<ObjectApprovers>& approvers) const;
+    std::string serializeSubscribe(
+        const process::Owned<ObjectApprovers>& approvers) const;
 
     std::function<void(JSON::ObjectWriter*)> jsonifyGetState(
         const process::Owned<ObjectApprovers>& approvers) const;
@@ -1419,8 +1446,9 @@ public:
         const process::Owned<ObjectApprovers>& approvers) const;
     std::function<void(JSON::ObjectWriter*)> jsonifyGetRoles(
         const process::Owned<ObjectApprovers>& approvers) const;
+    std::function<void(JSON::ObjectWriter*)> jsonifySubscribe(
+        const process::Owned<ObjectApprovers>& approvers) const;
 
-  private:
     const Master* master;
   };
 
@@ -1914,8 +1942,13 @@ private:
     // installation, we take some extra care to keep the backlog small.
     // In particular, all read-only requests are batched and executed in
     // parallel, instead of going through the master queue separately.
+    // The post-processing step, that depends on the handler, will be
+    // executed synchronously and serially after the parallel executions
+    // complete.
 
-    typedef process::http::Response
+    typedef std::pair<
+        process::http::Response,
+        Option<ReadOnlyHandler::PostProcessing>>
       (Master::ReadOnlyHandler::*ReadOnlyRequestHandler)(
           ContentType,
           const hashmap<std::string, std::string>&,
@@ -1937,10 +1970,6 @@ private:
       hashmap<std::string, std::string> queryParameters;
       Option<process::http::authentication::Principal> principal;
       process::Owned<ObjectApprovers> approvers;
-
-      // NOTE: The returned response should be either of type
-      // `BODY` or `PATH`, since `PIPE`-type responses would
-      // break the deduplication mechanism.
       process::Promise<process::http::Response> promise;
     };
 
diff --git a/src/master/readonly_handler.cpp b/src/master/readonly_handler.cpp
index fbe748d..40005a2 100644
--- a/src/master/readonly_handler.cpp
+++ b/src/master/readonly_handler.cpp
@@ -48,6 +48,7 @@ using process::Owned;
 
 using process::http::NotAcceptable;
 using process::http::OK;
+using process::http::Response;
 
 using mesos::authorization::VIEW_EXECUTOR;
 using mesos::authorization::VIEW_FLAGS;
@@ -58,8 +59,9 @@ using mesos::authorization::VIEW_TASK;
 using mesos::internal::protobuf::WireFormatLite2;
 
 using std::function;
-using std::vector;
+using std::pair;
 using std::string;
+using std::vector;
 
 namespace mesos {
 namespace internal {
@@ -672,10 +674,11 @@ private:
 };
 
 
-process::http::Response Master::ReadOnlyHandler::frameworks(
-    ContentType outputContentType,
-    const hashmap<std::string, std::string>& query,
-    const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+  Master::ReadOnlyHandler::frameworks(
+      ContentType outputContentType,
+      const hashmap<std::string, std::string>& query,
+      const process::Owned<ObjectApprovers>& approvers) const
 {
   CHECK_EQ(outputContentType, ContentType::JSON);
 
@@ -729,14 +732,17 @@ process::http::Response Master::ReadOnlyHandler::frameworks(
     writer->field("unregistered_frameworks", [](JSON::ArrayWriter*) {});
   };
 
-  return OK(jsonify(frameworks), query.get("jsonp"));
+  return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+      OK(jsonify(frameworks), query.get("jsonp")),
+      None());
 }
 
 
-process::http::Response Master::ReadOnlyHandler::roles(
-    ContentType outputContentType,
-    const hashmap<std::string, std::string>& query,
-    const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+  Master::ReadOnlyHandler::roles(
+      ContentType outputContentType,
+      const hashmap<std::string, std::string>& query,
+      const process::Owned<ObjectApprovers>& approvers) const
 {
   CHECK_EQ(outputContentType, ContentType::JSON);
 
@@ -809,29 +815,34 @@ process::http::Response Master::ReadOnlyHandler::roles(
         });
   };
 
-  return OK(jsonify(roles), query.get("jsonp"));
+  return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+      OK(jsonify(roles), query.get("jsonp")),
+      None());
 }
 
 
-process::http::Response Master::ReadOnlyHandler::slaves(
-    ContentType outputContentType,
-    const hashmap<std::string, std::string>& query,
-    const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+  Master::ReadOnlyHandler::slaves(
+      ContentType outputContentType,
+      const hashmap<std::string, std::string>& query,
+      const process::Owned<ObjectApprovers>& approvers) const
 {
   CHECK_EQ(outputContentType, ContentType::JSON);
 
   IDAcceptor<SlaveID> selectSlaveId(query.get("slave_id"));
 
-  return process::http::OK(
-      jsonify(SlavesWriter(master->slaves, approvers, selectSlaveId)),
-      query.get("jsonp"));
+  return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+      OK(jsonify(SlavesWriter(master->slaves, approvers, selectSlaveId)),
+         query.get("jsonp")),
+      None());
 }
 
 
-process::http::Response Master::ReadOnlyHandler::state(
-    ContentType outputContentType,
-    const hashmap<std::string, std::string>& query,
-    const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+  Master::ReadOnlyHandler::state(
+      ContentType outputContentType,
+      const hashmap<std::string, std::string>& query,
+      const process::Owned<ObjectApprovers>& approvers) const
 {
   CHECK_EQ(outputContentType, ContentType::JSON);
 
@@ -973,14 +984,17 @@ process::http::Response Master::ReadOnlyHandler::state(
     writer->field("unregistered_frameworks", [](JSON::ArrayWriter*) {});
   };
 
-  return OK(jsonify(calculateState), query.get("jsonp"));
+  return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+      OK(jsonify(calculateState), query.get("jsonp")),
+      None());
 }
 
 
-process::http::Response Master::ReadOnlyHandler::stateSummary(
-    ContentType outputContentType,
-    const hashmap<std::string, std::string>& query,
-    const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+  Master::ReadOnlyHandler::stateSummary(
+      ContentType outputContentType,
+      const hashmap<std::string, std::string>& query,
+      const process::Owned<ObjectApprovers>& approvers) const
 {
   CHECK_EQ(outputContentType, ContentType::JSON);
 
@@ -1128,7 +1142,9 @@ process::http::Response Master::ReadOnlyHandler::stateSummary(
         });
     };
 
-  return OK(jsonify(stateSummary), query.get("jsonp"));
+  return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+      OK(jsonify(stateSummary), query.get("jsonp")),
+      None());
 }
 
 
@@ -1176,10 +1192,11 @@ struct TaskComparator
 };
 
 
-process::http::Response Master::ReadOnlyHandler::tasks(
-    ContentType outputContentType,
-    const hashmap<std::string, std::string>& query,
-    const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+  Master::ReadOnlyHandler::tasks(
+      ContentType outputContentType,
+      const hashmap<std::string, std::string>& query,
+      const process::Owned<ObjectApprovers>& approvers) const
 {
   CHECK_EQ(outputContentType, ContentType::JSON);
 
@@ -1284,7 +1301,9 @@ process::http::Response Master::ReadOnlyHandler::tasks(
           });
   };
 
-  return OK(jsonify(tasksWriter), query.get("jsonp"));
+  return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+      OK(jsonify(tasksWriter), query.get("jsonp")),
+      None());
 }
 
 
@@ -1421,10 +1440,11 @@ string Master::ReadOnlyHandler::serializeGetAgents(
 
 
 
-process::http::Response Master::ReadOnlyHandler::getAgents(
-    ContentType outputContentType,
-    const hashmap<std::string, std::string>& query,
-    const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+  Master::ReadOnlyHandler::getAgents(
+      ContentType outputContentType,
+      const hashmap<std::string, std::string>& query,
+      const process::Owned<ObjectApprovers>& approvers) const
 {
   // Serialize the following message:
   //
@@ -1453,7 +1473,9 @@ process::http::Response Master::ReadOnlyHandler::getAgents(
       // destructed.
       writer.Trim();
 
-      return OK(std::move(output), stringify(outputContentType));
+      return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+          OK(std::move(output), stringify(outputContentType)),
+          None());
     }
 
     case ContentType::JSON: {
@@ -1476,11 +1498,14 @@ process::http::Response Master::ReadOnlyHandler::getAgents(
       });
 
       // TODO(bmahler): Pass jsonp query parameter through here.
-      return OK(std::move(body), stringify(outputContentType));
+      return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+          OK(std::move(body), stringify(outputContentType)),
+          None());
     }
 
     default:
-      return NotAcceptable("Request must accept json or protobuf");
+      return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+          NotAcceptable("Request must accept json or protobuf"), None());
   }
 }
 
@@ -1602,10 +1627,11 @@ string Master::ReadOnlyHandler::serializeGetFrameworks(
 }
 
 
-process::http::Response Master::ReadOnlyHandler::getFrameworks(
-    ContentType outputContentType,
-    const hashmap<std::string, std::string>& query,
-    const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+  Master::ReadOnlyHandler::getFrameworks(
+      ContentType outputContentType,
+      const hashmap<std::string, std::string>& query,
+      const process::Owned<ObjectApprovers>& approvers) const
 {
   // Serialize the following message:
   //
@@ -1634,7 +1660,9 @@ process::http::Response Master::ReadOnlyHandler::getFrameworks(
       // destructed.
       writer.Trim();
 
-      return OK(std::move(output), stringify(outputContentType));
+      return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+          OK(std::move(output), stringify(outputContentType)),
+          None());
     }
 
     case ContentType::JSON: {
@@ -1657,11 +1685,14 @@ process::http::Response Master::ReadOnlyHandler::getFrameworks(
       });
 
       // TODO(bmahler): Pass jsonp query parameter through here.
-      return OK(std::move(body), stringify(outputContentType));
+      return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+          OK(std::move(body), stringify(outputContentType)),
+          None());
     }
 
     default:
-      return NotAcceptable("Request must accept json or protobuf");
+      return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+          NotAcceptable("Request must accept json or protobuf"), None());
   }
 }
 
@@ -1844,10 +1875,11 @@ string Master::ReadOnlyHandler::serializeGetExecutors(
 }
 
 
-process::http::Response Master::ReadOnlyHandler::getExecutors(
-    ContentType outputContentType,
-    const hashmap<std::string, std::string>& query,
-    const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+  Master::ReadOnlyHandler::getExecutors(
+      ContentType outputContentType,
+      const hashmap<std::string, std::string>& query,
+      const process::Owned<ObjectApprovers>& approvers) const
 {
   // Serialize the following message:
   //
@@ -1876,7 +1908,9 @@ process::http::Response Master::ReadOnlyHandler::getExecutors(
       // destructed.
       writer.Trim();
 
-      return OK(std::move(output), stringify(outputContentType));
+      return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+          OK(std::move(output), stringify(outputContentType)),
+          None());
     }
 
     case ContentType::JSON: {
@@ -1899,11 +1933,14 @@ process::http::Response Master::ReadOnlyHandler::getExecutors(
       });
 
       // TODO(bmahler): Pass jsonp query parameter through here.
-      return OK(std::move(body), stringify(outputContentType));
+      return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+          OK(std::move(body), stringify(outputContentType)),
+          None());
     }
 
     default:
-      return NotAcceptable("Request must accept json or protobuf");
+      return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+          NotAcceptable("Request must accept json or protobuf"), None());
   }
 }
 
@@ -2132,10 +2169,11 @@ string Master::ReadOnlyHandler::serializeGetTasks(
 }
 
 
-process::http::Response Master::ReadOnlyHandler::getTasks(
-    ContentType outputContentType,
-    const hashmap<std::string, std::string>& query,
-    const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+  Master::ReadOnlyHandler::getTasks(
+      ContentType outputContentType,
+      const hashmap<std::string, std::string>& query,
+      const process::Owned<ObjectApprovers>& approvers) const
 {
   // Serialize the following message:
   //
@@ -2164,7 +2202,9 @@ process::http::Response Master::ReadOnlyHandler::getTasks(
       // destructed.
       writer.Trim();
 
-      return OK(std::move(output), stringify(outputContentType));
+      return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+          OK(std::move(output), stringify(outputContentType)),
+          None());
     }
 
     case ContentType::JSON: {
@@ -2187,19 +2227,23 @@ process::http::Response Master::ReadOnlyHandler::getTasks(
       });
 
       // TODO(bmahler): Pass jsonp query parameter through here.
-      return OK(std::move(body), stringify(outputContentType));
+      return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+          OK(std::move(body), stringify(outputContentType)),
+          None());
     }
 
     default:
-      return NotAcceptable("Request must accept json or protobuf");
+      return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+          NotAcceptable("Request must accept json or protobuf"), None());
   }
 }
 
 
-process::http::Response Master::ReadOnlyHandler::getOperations(
-    ContentType outputContentType,
-    const hashmap<std::string, std::string>& query,
-    const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+  Master::ReadOnlyHandler::getOperations(
+      ContentType outputContentType,
+      const hashmap<std::string, std::string>& query,
+      const process::Owned<ObjectApprovers>& approvers) const
 {
   // We consider a principal to be authorized to view an operation if it
   // is authorized to view the resources the operation is performed on.
@@ -2249,16 +2293,18 @@ process::http::Response Master::ReadOnlyHandler::getOperations(
     }
   }
 
-  return OK(
-      serialize(outputContentType, evolve(response)),
-      stringify(outputContentType));
+  return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+      OK(serialize(outputContentType, evolve(response)),
+         stringify(outputContentType)),
+      None());
 }
 
 
-process::http::Response Master::ReadOnlyHandler::getRoles(
-    ContentType outputContentType,
-    const hashmap<std::string, std::string>& query,
-    const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+  Master::ReadOnlyHandler::getRoles(
+      ContentType outputContentType,
+      const hashmap<std::string, std::string>& query,
+      const process::Owned<ObjectApprovers>& approvers) const
 {
   const vector<string> knownRoles = master->knownRoles();
 
@@ -2308,8 +2354,10 @@ process::http::Response Master::ReadOnlyHandler::getRoles(
     }
   }
 
-  return OK(serialize(outputContentType, evolve(response)),
-            stringify(outputContentType));
+  return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+      OK(serialize(outputContentType, evolve(response)),
+         stringify(outputContentType)),
+      None());
 }
 
 
@@ -2400,10 +2448,11 @@ string Master::ReadOnlyHandler::serializeGetState(
 }
 
 
-process::http::Response Master::ReadOnlyHandler::getState(
-    ContentType outputContentType,
-    const hashmap<std::string, std::string>& query,
-    const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+  Master::ReadOnlyHandler::getState(
+      ContentType outputContentType,
+      const hashmap<std::string, std::string>& query,
+      const process::Owned<ObjectApprovers>& approvers) const
 {
   // Serialize the following message:
   //
@@ -2432,7 +2481,9 @@ process::http::Response Master::ReadOnlyHandler::getState(
       // destructed.
       writer.Trim();
 
-      return OK(std::move(output), stringify(outputContentType));
+      return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+          OK(std::move(output), stringify(outputContentType)),
+          None());
     }
 
     case ContentType::JSON: {
@@ -2455,12 +2506,179 @@ process::http::Response Master::ReadOnlyHandler::getState(
       });
 
       // TODO(bmahler): Pass jsonp query parameter through here.
-      return OK(std::move(body), stringify(outputContentType));
+      return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+          OK(std::move(body), stringify(outputContentType)),
+          None());
+    }
+
+    default:
+      return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+          NotAcceptable("Request must accept json or protobuf"), None());
+  }
+}
+
+
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+  Master::ReadOnlyHandler::subscribe(
+      ContentType outputContentType,
+      const hashmap<std::string, std::string>& query,
+      const process::Owned<ObjectApprovers>& approvers) const
+{
+  process::http::Pipe pipe;
+  OK ok;
+
+  ok.headers["Content-Type"] = stringify(outputContentType);
+  ok.type = process::http::Response::PIPE;
+  ok.reader = pipe.reader();
+
+  StreamingHttpConnection<v1::master::Event> http(
+      pipe.writer(), outputContentType);
+
+  // Serialize the following event:
+  //
+  //   mesos::master::Event event;
+  //   event.set_type(mesos::master::Event::SUBSCRIBED);
+  //   *event.mutable_subscribed()->mutable_get_state() =
+  //     _getState(approvers);
+  //   event.mutable_subscribed()->set_heartbeat_interval_seconds(
+  //       DEFAULT_HEARTBEAT_INTERVAL.secs());
+  //
+  //   http.send(event);
+
+  switch (outputContentType) {
+    case ContentType::PROTOBUF: {
+      string serialized;
+      google::protobuf::io::StringOutputStream stream(&serialized);
+      google::protobuf::io::CodedOutputStream writer(&stream);
+
+      WireFormatLite::WriteEnum(
+          mesos::v1::master::Event::kTypeFieldNumber,
+          mesos::v1::master::Event::SUBSCRIBED,
+          &writer);
+
+      WireFormatLite::WriteBytes(
+          mesos::v1::master::Event::kSubscribedFieldNumber,
+          serializeSubscribe(approvers),
+          &writer);
+
+      // We must manually trim the unused buffer space since
+      // we use the string before the coded output stream is
+      // destructed.
+      writer.Trim();
+
+      http.send(serialized);
+
+      break;
+    }
+
+    case ContentType::JSON: {
+      string serialized = jsonify([&](JSON::ObjectWriter* writer) {
+        const google::protobuf::Descriptor* descriptor =
+          v1::master::Event::descriptor();
+
+        int field;
+
+        field = v1::master::Event::kTypeFieldNumber;
+        writer->field(
+            descriptor->FindFieldByNumber(field)->name(),
+            v1::master::Event::Type_Name(
+                v1::master::Event::SUBSCRIBED));
+
+        field = v1::master::Event::kSubscribedFieldNumber;
+        writer->field(
+            descriptor->FindFieldByNumber(field)->name(),
+            jsonifySubscribe(approvers));
+      });
+
+      http.send(serialized);
+
+      break;
     }
 
     default:
-      return NotAcceptable("Request must accept json or protobuf");
+      return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+          NotAcceptable("Request must accept json or protobuf"), None());
   }
+
+  mesos::master::Event heartbeatEvent;
+  heartbeatEvent.set_type(mesos::master::Event::HEARTBEAT);
+  http.send(heartbeatEvent);
+
+  // This new subscriber needs to be added in the post-processing step.
+  Master::ReadOnlyHandler::PostProcessing::Subscribe s =
+    { approvers->principal, http };
+
+  Master::ReadOnlyHandler::PostProcessing postProcessing = { std::move(s) };
+
+  return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+      ok,
+      std::move(postProcessing));
+}
+
+
+function<void(JSON::ObjectWriter*)> Master::ReadOnlyHandler::jsonifySubscribe(
+    const Owned<ObjectApprovers>& approvers) const
+{
+  // Jsonify the following message:
+  //
+  //   mesos::master::Event::Subscribed subscribed;
+  //   *subscribed.mutable_get_state() = _getState(approvers);
+  //   subscribed.set_heartbeat_interval_seconds(
+  //       DEFAULT_HEARTBEAT_INTERVAL.secs());
+
+  // TODO(bmahler): This copies the Owned object approvers.
+  return [=](JSON::ObjectWriter* writer) {
+    const google::protobuf::Descriptor* descriptor =
+      v1::master::Event::Subscribed::descriptor();
+
+    int field;
+
+    field = v1::master::Event::Subscribed::kGetStateFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        jsonifyGetState(approvers));
+
+    field = v1::master::Event::Subscribed::kHeartbeatIntervalSecondsFieldNumber;
+    writer->field(
+        descriptor->FindFieldByNumber(field)->name(),
+        DEFAULT_HEARTBEAT_INTERVAL.secs());
+  };
+}
+
+
+string Master::ReadOnlyHandler::serializeSubscribe(
+    const Owned<ObjectApprovers>& approvers) const
+{
+  // Serialize the following message:
+  //
+  //   mesos::master::Event::Subscribed subscribed;
+  //   *subscribed.mutable_get_state() = _getState(approvers);
+  //   subscribed.set_heartbeat_interval_seconds(
+  //       DEFAULT_HEARTBEAT_INTERVAL.secs());
+
+  string output;
+  google::protobuf::io::StringOutputStream stream(&output);
+  google::protobuf::io::CodedOutputStream writer(&stream);
+
+  WireFormatLite::WriteBytes(
+      mesos::v1::master::Event::Subscribed::kGetStateFieldNumber,
+      serializeGetState(approvers),
+      &writer);
+
+  WireFormatLite::WriteDouble(
+      mesos::v1::master::Event::Subscribed
+        ::kHeartbeatIntervalSecondsFieldNumber,
+      DEFAULT_HEARTBEAT_INTERVAL.secs(),
+      &writer);
+
+  // While an explicit Trim() isn't necessary (since the coded
+  // output stream is destructed before the string is returned),
+  // it's a quite tricky bug to diagnose if Trim() is missed, so
+  // we always do it explicitly to signal the reader about this
+  // subtlety.
+  writer.Trim();
+
+  return output;
 }
 
 } // namespace master {
diff --git a/src/tests/master_load_tests.cpp b/src/tests/master_load_tests.cpp
index 6cee248..6bbc1c0 100644
--- a/src/tests/master_load_tests.cpp
+++ b/src/tests/master_load_tests.cpp
@@ -391,19 +391,19 @@ TEST_F(MasterLoadTest, SimultaneousBatchedRequests)
     Response reference;
     if (request.endpoint == "/state") {
       reference = readOnlyHandler.state(
-          ContentType::JSON, queryParameters, approvers);
+          ContentType::JSON, queryParameters, approvers).first;
     } else if (request.endpoint == "/state-summary") {
       reference = readOnlyHandler.stateSummary(
-          ContentType::JSON, queryParameters, approvers);
+          ContentType::JSON, queryParameters, approvers).first;
     } else if (request.endpoint == "/roles") {
       reference = readOnlyHandler.roles(
-          ContentType::JSON, queryParameters, approvers);
+          ContentType::JSON, queryParameters, approvers).first;
     } else if (request.endpoint == "/frameworks") {
       reference = readOnlyHandler.frameworks(
-          ContentType::JSON, queryParameters, approvers);
+          ContentType::JSON, queryParameters, approvers).first;
     } else if (request.endpoint == "/slaves") {
       reference = readOnlyHandler.slaves(
-          ContentType::JSON, queryParameters, approvers);
+          ContentType::JSON, queryParameters, approvers).first;
     } else {
       UNREACHABLE();
     }

Reply via email to