This is an automated email from the ASF dual-hosted git repository.
bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push:
new c6caf50 Updated master::Call::SUBSCRIBE to be served in parallel.
c6caf50 is described below
commit c6caf5017cc03a150293a2cc7fa897029e1de833
Author: Benjamin Mahler <[email protected]>
AuthorDate: Thu Jan 16 17:14:50 2020 -0500
Updated master::Call::SUBSCRIBE to be served in parallel.
This call is not entirely read-only, unlike the other GET_* v1 master
calls, and therefore it warranted its own patch.
The approach used is to add a post-processing "write" step to the
handler return type. The post-processing step gets executed
synchronously. In order to deal with different potential
post-processing steps, we use a Variant.
Note that SUBSCRIBE cannot asynchronously register the subscriber
after the read-only state is served, because it will miss events
in the interim!
Review: https://reviews.apache.org/r/72019
---
src/common/http.hpp | 23 +--
src/master/http.cpp | 265 +++++++++-------------------
src/master/master.hpp | 87 ++++++---
src/master/readonly_handler.cpp | 380 +++++++++++++++++++++++++++++++---------
src/tests/master_load_tests.cpp | 10 +-
5 files changed, 455 insertions(+), 310 deletions(-)
diff --git a/src/common/http.hpp b/src/common/http.hpp
index 47a4d6a..5fc19fd 100644
--- a/src/common/http.hpp
+++ b/src/common/http.hpp
@@ -264,8 +264,10 @@ public:
bool approved(const Args&... args)
{
if (!approvers.contains(action)) {
- LOG(WARNING) << "Attempted to authorize " << principal
- << " for unexpected action " << stringify(action);
+ LOG(WARNING)
+ << "Attempted to authorize principal "
+ << " '" << (principal.isSome() ? stringify(*principal) : "") << "'"
+ << " for unexpected action " << stringify(action);
return false;
}
@@ -274,29 +276,28 @@ public:
if (approved.isError()) {
// TODO(joerg84): Expose these errors back to the caller.
- LOG(WARNING) << "Failed to authorize principal " << principal
- << "for action " << stringify(action) << ": "
- << approved.error();
+ LOG(WARNING)
+ << "Failed to authorize principal "
+ << " '" << (principal.isSome() ? stringify(*principal) : "") << "'"
+ << " for action " << stringify(action) << ": " << approved.error();
return false;
}
return approved.get();
}
+ const Option<process::http::authentication::Principal> principal;
+
private:
ObjectApprovers(
hashmap<
authorization::Action,
process::Owned<ObjectApprover>>&& _approvers,
const Option<process::http::authentication::Principal>& _principal)
- : approvers(std::move(_approvers)),
- principal(_principal.isSome()
- ? "'" + stringify(_principal.get()) + "'"
- : "")
- {}
+ : principal(_principal),
+ approvers(std::move(_approvers)) {}
hashmap<authorization::Action, process::Owned<ObjectApprover>> approvers;
- const std::string principal; // Only used for logging.
};
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 8a58863..eeaac88 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -132,6 +132,7 @@ using std::copy_if;
using std::function;
using std::list;
using std::map;
+using std::pair;
using std::set;
using std::string;
using std::tie;
@@ -419,7 +420,7 @@ Future<Response> Master::Http::api(
Future<Response> Master::Http::subscribe(
const mesos::master::Call& call,
const Option<Principal>& principal,
- ContentType contentType) const
+ ContentType outputContentType) const
{
CHECK_EQ(mesos::master::Call::SUBSCRIBE, call.type());
@@ -428,160 +429,16 @@ Future<Response> Master::Http::subscribe(
principal,
{VIEW_FRAMEWORK, VIEW_TASK, VIEW_EXECUTOR, VIEW_ROLE})
.then(defer(
- master->self(),
- [=](const Owned<ObjectApprovers>& approvers) -> Future<Response> {
- Pipe pipe;
- OK ok;
-
- ok.headers["Content-Type"] = stringify(contentType);
- ok.type = Response::PIPE;
- ok.reader = pipe.reader();
-
- StreamingHttpConnection<v1::master::Event> http(
- pipe.writer(), contentType);
-
- // Serialize the following event:
- //
- // mesos::master::Event event;
- // event.set_type(mesos::master::Event::SUBSCRIBED);
- // *event.mutable_subscribed()->mutable_get_state() =
- // _getState(approvers);
- // event.mutable_subscribed()->set_heartbeat_interval_seconds(
- // DEFAULT_HEARTBEAT_INTERVAL.secs());
- //
- // http.send(event);
-
- switch (contentType) {
- case ContentType::PROTOBUF: {
- string serialized;
- google::protobuf::io::StringOutputStream stream(&serialized);
- google::protobuf::io::CodedOutputStream writer(&stream);
-
- WireFormatLite::WriteEnum(
- mesos::v1::master::Event::kTypeFieldNumber,
- mesos::v1::master::Event::SUBSCRIBED,
- &writer);
-
- WireFormatLite::WriteBytes(
- mesos::v1::master::Event::kSubscribedFieldNumber,
- serializeSubscribe(approvers),
- &writer);
-
- // We must manually trim the unused buffer space since
- // we use the string before the coded output stream is
- // destructed.
- writer.Trim();
-
- http.send(serialized);
-
- break;
- }
-
- case ContentType::JSON: {
- string serialized = jsonify([&](JSON::ObjectWriter* writer) {
- const google::protobuf::Descriptor* descriptor =
- v1::master::Event::descriptor();
-
- int field;
-
- field = v1::master::Event::kTypeFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- v1::master::Event::Type_Name(
- v1::master::Event::SUBSCRIBED));
-
- field = v1::master::Event::kSubscribedFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- jsonifySubscribe(master, approvers));
- });
-
- http.send(serialized);
-
- break;
- }
-
- default:
- return NotAcceptable("Request must accept json or protobuf");
- }
-
- mesos::master::Event heartbeatEvent;
- heartbeatEvent.set_type(mesos::master::Event::HEARTBEAT);
- http.send(heartbeatEvent);
-
- // Master::subscribe will start the heartbeater process, which should
- // only happen after `SUBSCRIBED` event is sent.
- master->subscribe(http, principal);
-
- return ok;
- }));
-}
-
-
-function<void(JSON::ObjectWriter*)> Master::Http::jsonifySubscribe(
- const Master* master,
- const Owned<ObjectApprovers>& approvers)
-{
- // Jsonify the following message:
- //
- // mesos::master::Event::Subscribed subscribed;
- // *subscribed.mutable_get_state() = _getState(approvers);
- // subscribed.set_heartbeat_interval_seconds(
- // DEFAULT_HEARTBEAT_INTERVAL.secs());
-
- // TODO(bmahler): This copies the Owned object approvers.
- return [=](JSON::ObjectWriter* writer) {
- const google::protobuf::Descriptor* descriptor =
- v1::master::Event::Subscribed::descriptor();
-
- int field;
-
- field = v1::master::Event::Subscribed::kGetStateFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- Master::ReadOnlyHandler(master).jsonifyGetState(approvers));
-
- field = v1::master::Event::Subscribed::kHeartbeatIntervalSecondsFieldNumber;
- writer->field(
- descriptor->FindFieldByNumber(field)->name(),
- DEFAULT_HEARTBEAT_INTERVAL.secs());
- };
-}
-
-
-string Master::Http::serializeSubscribe(
- const Owned<ObjectApprovers>& approvers) const
-{
- // Serialize the following message:
- //
- // mesos::master::Event::Subscribed subscribed;
- // *subscribed.mutable_get_state() = _getState(approvers);
- // subscribed.set_heartbeat_interval_seconds(
- // DEFAULT_HEARTBEAT_INTERVAL.secs());
-
- string output;
- google::protobuf::io::StringOutputStream stream(&output);
- google::protobuf::io::CodedOutputStream writer(&stream);
-
- WireFormatLite::WriteBytes(
- mesos::v1::master::Event::Subscribed::kGetStateFieldNumber,
- Master::ReadOnlyHandler(master).serializeGetState(approvers),
- &writer);
-
- WireFormatLite::WriteDouble(
- mesos::v1::master::Event::Subscribed
- ::kHeartbeatIntervalSecondsFieldNumber,
- DEFAULT_HEARTBEAT_INTERVAL.secs(),
- &writer);
-
- // While an explicit Trim() isn't necessary (since the coded
- // output stream is destructed before the string is returned),
- // it's a quite tricky bug to diagnose if Trim() is missed, so
- // we always do it explicitly to signal the reader about this
- // subtlety.
- writer.Trim();
-
- return output;
+ master->self(),
+ [this, principal, outputContentType](
+ const Owned<ObjectApprovers>& approvers) {
+ return deferBatchedRequest(
+ &Master::ReadOnlyHandler::subscribe,
+ principal,
+ outputContentType,
+ {},
+ approvers);
+ }));
}
@@ -2712,16 +2569,12 @@ Future<Response> Master::Http::deferBatchedRequest(
});
Future<Response> future;
- if (it != batchedRequests.end()) {
- // Return the existing future if we have a matching request.
- // NOTE: This is effectively adding a layer of authorization permissions
- // caching since we only checked the equality of principals, not the
- // equality of the approvers themselves.
- // On heavily-loaded masters, this could lead to a delay of several seconds
- // before permission changes for a principal take effect.
- future = it->promise.future();
- ++master->metrics->http_cache_hits;
- } else {
+
+ // Note that we do not de-duplicate the SUBSCRIBE responses,
+ // since the http server in libprocess assumes there's only
+ // 1 reader of the pipe.
+ if (handler == &Master::ReadOnlyHandler::subscribe ||
+ it == batchedRequests.end()) {
// Add an element to the batched state requests.
Promise<Response> promise;
future = promise.future();
@@ -2732,6 +2585,23 @@ Future<Response> Master::Http::deferBatchedRequest(
principal,
approvers,
std::move(promise)});
+ } else {
+ // Return the existing future if we have a matching request.
+ // NOTE: This is effectively adding a layer of authorization permissions
+ // caching since we only checked the equality of principals, not the
+ // equality of the approvers themselves.
+ // On heavily-loaded masters, this could lead to a delay of several seconds
+ // before permission changes for a principal take effect.
+ future = it->promise.future();
+ ++master->metrics->http_cache_hits;
+
+ // NOTE: The returned response should be either of type
+ // `BODY` or `PATH`, since `PIPE`-type responses cannot
+ // be de-duplicated currently.
+ it->promise.future()
+ .onReady([](const Response& r) {
+ CHECK_NE(r.type, Response::PIPE);
+ });
}
// Schedule processing of batched requests if not yet scheduled.
@@ -2750,6 +2620,9 @@ void Master::Http::processRequestsBatch() const
CHECK(!batchedRequests.empty())
<< "Bug in state batching logic: No requests to process";
+ vector<Future<pair<Response, Option<ReadOnlyHandler::PostProcessing>>>>
+ results;
+
// Produce the responses in parallel.
//
// TODO(alexr): Consider abstracting this into `parallel_async` or
@@ -2758,20 +2631,30 @@ void Master::Http::processRequestsBatch() const
// TODO(alexr): Consider moving `BatchedStateRequest`'s fields into
// `process::async` once it supports moving.
foreach (BatchedRequest& request, batchedRequests) {
- request.promise.associate(process::async(
- [this](ReadOnlyRequestHandler handler,
- ContentType outputContentType,
- const hashmap<std::string, std::string>& queryParameters,
- const process::Owned<ObjectApprovers>& approvers) {
- return (readonlyHandler.*handler)(
- outputContentType,
- queryParameters,
- approvers);
- },
- request.handler,
- request.outputContentType,
- request.queryParameters,
- request.approvers));
+ Future<pair<Response, Option<ReadOnlyHandler::PostProcessing>>>
+ f = process::async(
+ [this](ReadOnlyRequestHandler handler,
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& queryParameters,
+ const process::Owned<ObjectApprovers>& approvers) {
+ return (readonlyHandler.*handler)(
+ outputContentType,
+ queryParameters,
+ approvers);
+ },
+ request.handler,
+ request.outputContentType,
+ request.queryParameters,
+ request.approvers);
+
+ request.promise.associate(
+ f.then([](const pair<
+ Response,
+ Option<ReadOnlyHandler::PostProcessing>>& result) {
+ return result.first;
+ }));
+
+ results.push_back(f);
}
// Block the master actor until all workers have generated state responses.
@@ -2780,13 +2663,27 @@ void Master::Http::processRequestsBatch() const
//
// NOTE: There is the potential for deadlock since we are blocking 1 working
// thread here, see MESOS-8256.
- vector<Future<Response>> responses;
- foreach (const BatchedRequest& request, batchedRequests) {
- responses.push_back(request.promise.future());
- }
- process::await(responses).await();
+ process::await(results).await();
batchedRequests.clear();
+
+ // Now perform the post-processing "writes" synchronously.
+ for (const auto& result : results) {
+ CHECK(!result.isPending()) << result;
+
+ // Response failed or was discarded.
+ if (!result.isReady()) continue;
+
+ // No post-processing needed.
+ if (result->second.isNone()) continue;
+
+ const ReadOnlyHandler::PostProcessing& postProcessing = *result->second;
+
+ postProcessing.state.visit(
+ [&](const ReadOnlyHandler::PostProcessing::Subscribe& s) {
+ master->subscribe(s.connection, s.principal);
+ });
+ }
}
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 3074918..c813e9f 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -24,6 +24,7 @@
#include <memory>
#include <set>
#include <string>
+#include <utility>
#include <vector>
#include <mesos/mesos.hpp>
@@ -1290,12 +1291,21 @@ private:
};
public:
- // Inner class used to namespace HTTP handlers that do not change the
- // underlying master object.
+ // Inner class used to namespace read-only HTTP handlers; these handlers
+ // may be executed in parallel.
//
- // Endpoints served by this handler are only permitted to depend on
- // the request query parameters and the authorization filters to
- // make caching of responses possible.
+ // A synchronously executed post-processing step is provided for any
+ // cases where the handler is not purely read-only and requires a
+ // synchronous write (i.e. it's not feasible to perform the write
+ // asynchronously (e.g. SUBSCRIBE cannot have a gap between serving
+ // the initial state and registering the subscriber, or else events
+ // will be missed in the interim)).
+ //
+ // The handlers are only permitted to depend on the output content
+ // type (derived from the request headers), the request query
+ // parameters and the authorization filters to de-duplicate identical
+ // responses (this does not de-duplicate all identical responses, e.g.
+ // different authz principal but same permissions).
//
// NOTE: Most member functions of this class are not routed directly but
// dispatched from their corresponding handlers in the outer `Http` class.
@@ -1305,91 +1315,106 @@ public:
class ReadOnlyHandler
{
public:
+ struct PostProcessing
+ {
+ struct Subscribe
+ {
+ Option<process::http::authentication::Principal> principal;
+ StreamingHttpConnection<v1::master::Event> connection;
+ };
+
+ // Any additional post-processing cases will add additional
+ // cases into this variant.
+ Variant<Subscribe> state;
+ };
+
explicit ReadOnlyHandler(const Master* _master) : master(_master) {}
// /frameworks
- process::http::Response frameworks(
+ std::pair<process::http::Response, Option<PostProcessing>> frameworks(
ContentType outputContentType,
const hashmap<std::string, std::string>& queryParameters,
const process::Owned<ObjectApprovers>& approvers) const;
// /roles
- process::http::Response roles(
+ std::pair<process::http::Response, Option<PostProcessing>> roles(
ContentType outputContentType,
const hashmap<std::string, std::string>& queryParameters,
const process::Owned<ObjectApprovers>& approvers) const;
// /slaves
- process::http::Response slaves(
+ std::pair<process::http::Response, Option<PostProcessing>> slaves(
ContentType outputContentType,
const hashmap<std::string, std::string>& queryParameters,
const process::Owned<ObjectApprovers>& approvers) const;
// /state
- process::http::Response state(
+ std::pair<process::http::Response, Option<PostProcessing>> state(
ContentType outputContentType,
const hashmap<std::string, std::string>& queryParameters,
const process::Owned<ObjectApprovers>& approvers) const;
// /state-summary
- process::http::Response stateSummary(
+ std::pair<process::http::Response, Option<PostProcessing>> stateSummary(
ContentType outputContentType,
const hashmap<std::string, std::string>& queryParameters,
const process::Owned<ObjectApprovers>& approvers) const;
// /tasks
- process::http::Response tasks(
+ std::pair<process::http::Response, Option<PostProcessing>> tasks(
ContentType outputContentType,
const hashmap<std::string, std::string>& queryParameters,
const process::Owned<ObjectApprovers>& approvers) const;
// master::Call::GET_STATE
- process::http::Response getState(
+ std::pair<process::http::Response, Option<PostProcessing>> getState(
ContentType outputContentType,
const hashmap<std::string, std::string>& queryParameters,
const process::Owned<ObjectApprovers>& approvers) const;
// master::Call::GET_AGENTS
- process::http::Response getAgents(
+ std::pair<process::http::Response, Option<PostProcessing>> getAgents(
ContentType outputContentType,
const hashmap<std::string, std::string>& queryParameters,
const process::Owned<ObjectApprovers>& approvers) const;
// master::Call::GET_FRAMEWORKS
- process::http::Response getFrameworks(
+ std::pair<process::http::Response, Option<PostProcessing>> getFrameworks(
ContentType outputContentType,
const hashmap<std::string, std::string>& queryParameters,
const process::Owned<ObjectApprovers>& approvers) const;
// master::Call::GET_EXECUTORS
- process::http::Response getExecutors(
+ std::pair<process::http::Response, Option<PostProcessing>> getExecutors(
ContentType outputContentType,
const hashmap<std::string, std::string>& queryParameters,
const process::Owned<ObjectApprovers>& approvers) const;
// master::Call::GET_OPERATIONS
- process::http::Response getOperations(
+ std::pair<process::http::Response, Option<PostProcessing>> getOperations(
ContentType outputContentType,
const hashmap<std::string, std::string>& queryParameters,
const process::Owned<ObjectApprovers>& approvers) const;
// master::Call::GET_TASKS
- process::http::Response getTasks(
+ std::pair<process::http::Response, Option<PostProcessing>> getTasks(
ContentType outputContentType,
const hashmap<std::string, std::string>& queryParameters,
const process::Owned<ObjectApprovers>& approvers) const;
// master::Call::GET_ROLES
- process::http::Response getRoles(
+ std::pair<process::http::Response, Option<PostProcessing>> getRoles(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& queryParameters,
+ const process::Owned<ObjectApprovers>& approvers) const;
+
+ // master::Call::SUBSCRIBE
+ std::pair<process::http::Response, Option<PostProcessing>> subscribe(
ContentType outputContentType,
const hashmap<std::string, std::string>& queryParameters,
const process::Owned<ObjectApprovers>& approvers) const;
- // TODO(bmahler): These could just live in the .cpp file,
- // however they are shared with SUBSCRIBE which currently
- // is not implemented as a read only handler here. Make these
- // private or only in the .cpp file once SUBSCRIBE is moved
- // into readonly_handler.cpp.
+ private:
std::string serializeGetState(
const process::Owned<ObjectApprovers>& approvers) const;
std::string serializeGetAgents(
@@ -1404,6 +1429,8 @@ public:
const process::Owned<ObjectApprovers>& approvers) const;
std::string serializeGetRoles(
const process::Owned<ObjectApprovers>& approvers) const;
+ std::string serializeSubscribe(
+ const process::Owned<ObjectApprovers>& approvers) const;
std::function<void(JSON::ObjectWriter*)> jsonifyGetState(
const process::Owned<ObjectApprovers>& approvers) const;
@@ -1419,8 +1446,9 @@ public:
const process::Owned<ObjectApprovers>& approvers) const;
std::function<void(JSON::ObjectWriter*)> jsonifyGetRoles(
const process::Owned<ObjectApprovers>& approvers) const;
+ std::function<void(JSON::ObjectWriter*)> jsonifySubscribe(
+ const process::Owned<ObjectApprovers>& approvers) const;
- private:
const Master* master;
};
@@ -1914,8 +1942,13 @@ private:
// installation, we take some extra care to keep the backlog small.
// In particular, all read-only requests are batched and executed in
// parallel, instead of going through the master queue separately.
+ // The post-processing step, that depends on the handler, will be
+ // executed synchronously and serially after the parallel executions
+ // complete.
- typedef process::http::Response
+ typedef std::pair<
+ process::http::Response,
+ Option<ReadOnlyHandler::PostProcessing>>
(Master::ReadOnlyHandler::*ReadOnlyRequestHandler)(
ContentType,
const hashmap<std::string, std::string>&,
@@ -1937,10 +1970,6 @@ private:
hashmap<std::string, std::string> queryParameters;
Option<process::http::authentication::Principal> principal;
process::Owned<ObjectApprovers> approvers;
-
- // NOTE: The returned response should be either of type
- // `BODY` or `PATH`, since `PIPE`-type responses would
- // break the deduplication mechanism.
process::Promise<process::http::Response> promise;
};
diff --git a/src/master/readonly_handler.cpp b/src/master/readonly_handler.cpp
index fbe748d..40005a2 100644
--- a/src/master/readonly_handler.cpp
+++ b/src/master/readonly_handler.cpp
@@ -48,6 +48,7 @@ using process::Owned;
using process::http::NotAcceptable;
using process::http::OK;
+using process::http::Response;
using mesos::authorization::VIEW_EXECUTOR;
using mesos::authorization::VIEW_FLAGS;
@@ -58,8 +59,9 @@ using mesos::authorization::VIEW_TASK;
using mesos::internal::protobuf::WireFormatLite2;
using std::function;
-using std::vector;
+using std::pair;
using std::string;
+using std::vector;
namespace mesos {
namespace internal {
@@ -672,10 +674,11 @@ private:
};
-process::http::Response Master::ReadOnlyHandler::frameworks(
- ContentType outputContentType,
- const hashmap<std::string, std::string>& query,
- const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+ Master::ReadOnlyHandler::frameworks(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& query,
+ const process::Owned<ObjectApprovers>& approvers) const
{
CHECK_EQ(outputContentType, ContentType::JSON);
@@ -729,14 +732,17 @@ process::http::Response Master::ReadOnlyHandler::frameworks(
writer->field("unregistered_frameworks", [](JSON::ArrayWriter*) {});
};
- return OK(jsonify(frameworks), query.get("jsonp"));
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ OK(jsonify(frameworks), query.get("jsonp")),
+ None());
}
-process::http::Response Master::ReadOnlyHandler::roles(
- ContentType outputContentType,
- const hashmap<std::string, std::string>& query,
- const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+ Master::ReadOnlyHandler::roles(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& query,
+ const process::Owned<ObjectApprovers>& approvers) const
{
CHECK_EQ(outputContentType, ContentType::JSON);
@@ -809,29 +815,34 @@ process::http::Response Master::ReadOnlyHandler::roles(
});
};
- return OK(jsonify(roles), query.get("jsonp"));
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ OK(jsonify(roles), query.get("jsonp")),
+ None());
}
-process::http::Response Master::ReadOnlyHandler::slaves(
- ContentType outputContentType,
- const hashmap<std::string, std::string>& query,
- const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+ Master::ReadOnlyHandler::slaves(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& query,
+ const process::Owned<ObjectApprovers>& approvers) const
{
CHECK_EQ(outputContentType, ContentType::JSON);
IDAcceptor<SlaveID> selectSlaveId(query.get("slave_id"));
- return process::http::OK(
- jsonify(SlavesWriter(master->slaves, approvers, selectSlaveId)),
- query.get("jsonp"));
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ OK(jsonify(SlavesWriter(master->slaves, approvers, selectSlaveId)),
+ query.get("jsonp")),
+ None());
}
-process::http::Response Master::ReadOnlyHandler::state(
- ContentType outputContentType,
- const hashmap<std::string, std::string>& query,
- const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+ Master::ReadOnlyHandler::state(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& query,
+ const process::Owned<ObjectApprovers>& approvers) const
{
CHECK_EQ(outputContentType, ContentType::JSON);
@@ -973,14 +984,17 @@ process::http::Response Master::ReadOnlyHandler::state(
writer->field("unregistered_frameworks", [](JSON::ArrayWriter*) {});
};
- return OK(jsonify(calculateState), query.get("jsonp"));
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ OK(jsonify(calculateState), query.get("jsonp")),
+ None());
}
-process::http::Response Master::ReadOnlyHandler::stateSummary(
- ContentType outputContentType,
- const hashmap<std::string, std::string>& query,
- const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+ Master::ReadOnlyHandler::stateSummary(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& query,
+ const process::Owned<ObjectApprovers>& approvers) const
{
CHECK_EQ(outputContentType, ContentType::JSON);
@@ -1128,7 +1142,9 @@ process::http::Response Master::ReadOnlyHandler::stateSummary(
});
};
- return OK(jsonify(stateSummary), query.get("jsonp"));
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ OK(jsonify(stateSummary), query.get("jsonp")),
+ None());
}
@@ -1176,10 +1192,11 @@ struct TaskComparator
};
-process::http::Response Master::ReadOnlyHandler::tasks(
- ContentType outputContentType,
- const hashmap<std::string, std::string>& query,
- const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+ Master::ReadOnlyHandler::tasks(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& query,
+ const process::Owned<ObjectApprovers>& approvers) const
{
CHECK_EQ(outputContentType, ContentType::JSON);
@@ -1284,7 +1301,9 @@ process::http::Response Master::ReadOnlyHandler::tasks(
});
};
- return OK(jsonify(tasksWriter), query.get("jsonp"));
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ OK(jsonify(tasksWriter), query.get("jsonp")),
+ None());
}
@@ -1421,10 +1440,11 @@ string Master::ReadOnlyHandler::serializeGetAgents(
-process::http::Response Master::ReadOnlyHandler::getAgents(
- ContentType outputContentType,
- const hashmap<std::string, std::string>& query,
- const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+ Master::ReadOnlyHandler::getAgents(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& query,
+ const process::Owned<ObjectApprovers>& approvers) const
{
// Serialize the following message:
//
@@ -1453,7 +1473,9 @@ process::http::Response Master::ReadOnlyHandler::getAgents(
// destructed.
writer.Trim();
- return OK(std::move(output), stringify(outputContentType));
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ OK(std::move(output), stringify(outputContentType)),
+ None());
}
case ContentType::JSON: {
@@ -1476,11 +1498,14 @@ process::http::Response Master::ReadOnlyHandler::getAgents(
});
// TODO(bmahler): Pass jsonp query parameter through here.
- return OK(std::move(body), stringify(outputContentType));
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ OK(std::move(body), stringify(outputContentType)),
+ None());
}
default:
- return NotAcceptable("Request must accept json or protobuf");
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ NotAcceptable("Request must accept json or protobuf"), None());
}
}
@@ -1602,10 +1627,11 @@ string Master::ReadOnlyHandler::serializeGetFrameworks(
}
-process::http::Response Master::ReadOnlyHandler::getFrameworks(
- ContentType outputContentType,
- const hashmap<std::string, std::string>& query,
- const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+ Master::ReadOnlyHandler::getFrameworks(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& query,
+ const process::Owned<ObjectApprovers>& approvers) const
{
// Serialize the following message:
//
@@ -1634,7 +1660,9 @@ process::http::Response Master::ReadOnlyHandler::getFrameworks(
// destructed.
writer.Trim();
- return OK(std::move(output), stringify(outputContentType));
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ OK(std::move(output), stringify(outputContentType)),
+ None());
}
case ContentType::JSON: {
@@ -1657,11 +1685,14 @@ process::http::Response Master::ReadOnlyHandler::getFrameworks(
});
// TODO(bmahler): Pass jsonp query parameter through here.
- return OK(std::move(body), stringify(outputContentType));
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ OK(std::move(body), stringify(outputContentType)),
+ None());
}
default:
- return NotAcceptable("Request must accept json or protobuf");
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ NotAcceptable("Request must accept json or protobuf"), None());
}
}
@@ -1844,10 +1875,11 @@ string Master::ReadOnlyHandler::serializeGetExecutors(
}
-process::http::Response Master::ReadOnlyHandler::getExecutors(
- ContentType outputContentType,
- const hashmap<std::string, std::string>& query,
- const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+ Master::ReadOnlyHandler::getExecutors(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& query,
+ const process::Owned<ObjectApprovers>& approvers) const
{
// Serialize the following message:
//
@@ -1876,7 +1908,9 @@ process::http::Response Master::ReadOnlyHandler::getExecutors(
// destructed.
writer.Trim();
- return OK(std::move(output), stringify(outputContentType));
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ OK(std::move(output), stringify(outputContentType)),
+ None());
}
case ContentType::JSON: {
@@ -1899,11 +1933,14 @@ process::http::Response Master::ReadOnlyHandler::getExecutors(
});
// TODO(bmahler): Pass jsonp query parameter through here.
- return OK(std::move(body), stringify(outputContentType));
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ OK(std::move(body), stringify(outputContentType)),
+ None());
}
default:
- return NotAcceptable("Request must accept json or protobuf");
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ NotAcceptable("Request must accept json or protobuf"), None());
}
}
@@ -2132,10 +2169,11 @@ string Master::ReadOnlyHandler::serializeGetTasks(
}
-process::http::Response Master::ReadOnlyHandler::getTasks(
- ContentType outputContentType,
- const hashmap<std::string, std::string>& query,
- const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+ Master::ReadOnlyHandler::getTasks(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& query,
+ const process::Owned<ObjectApprovers>& approvers) const
{
// Serialize the following message:
//
@@ -2164,7 +2202,9 @@ process::http::Response Master::ReadOnlyHandler::getTasks(
// destructed.
writer.Trim();
- return OK(std::move(output), stringify(outputContentType));
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ OK(std::move(output), stringify(outputContentType)),
+ None());
}
case ContentType::JSON: {
@@ -2187,19 +2227,23 @@ process::http::Response Master::ReadOnlyHandler::getTasks(
});
// TODO(bmahler): Pass jsonp query parameter through here.
- return OK(std::move(body), stringify(outputContentType));
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ OK(std::move(body), stringify(outputContentType)),
+ None());
}
default:
- return NotAcceptable("Request must accept json or protobuf");
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ NotAcceptable("Request must accept json or protobuf"), None());
}
}
-process::http::Response Master::ReadOnlyHandler::getOperations(
- ContentType outputContentType,
- const hashmap<std::string, std::string>& query,
- const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+ Master::ReadOnlyHandler::getOperations(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& query,
+ const process::Owned<ObjectApprovers>& approvers) const
{
// We consider a principal to be authorized to view an operation if it
// is authorized to view the resources the operation is performed on.
@@ -2249,16 +2293,18 @@ process::http::Response Master::ReadOnlyHandler::getOperations(
}
}
- return OK(
- serialize(outputContentType, evolve(response)),
- stringify(outputContentType));
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ OK(serialize(outputContentType, evolve(response)),
+ stringify(outputContentType)),
+ None());
}
-process::http::Response Master::ReadOnlyHandler::getRoles(
- ContentType outputContentType,
- const hashmap<std::string, std::string>& query,
- const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+ Master::ReadOnlyHandler::getRoles(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& query,
+ const process::Owned<ObjectApprovers>& approvers) const
{
const vector<string> knownRoles = master->knownRoles();
@@ -2308,8 +2354,10 @@ process::http::Response Master::ReadOnlyHandler::getRoles(
}
}
- return OK(serialize(outputContentType, evolve(response)),
- stringify(outputContentType));
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ OK(serialize(outputContentType, evolve(response)),
+ stringify(outputContentType)),
+ None());
}
@@ -2400,10 +2448,11 @@ string Master::ReadOnlyHandler::serializeGetState(
}
-process::http::Response Master::ReadOnlyHandler::getState(
- ContentType outputContentType,
- const hashmap<std::string, std::string>& query,
- const process::Owned<ObjectApprovers>& approvers) const
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+ Master::ReadOnlyHandler::getState(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& query,
+ const process::Owned<ObjectApprovers>& approvers) const
{
// Serialize the following message:
//
@@ -2432,7 +2481,9 @@ process::http::Response Master::ReadOnlyHandler::getState(
// destructed.
writer.Trim();
- return OK(std::move(output), stringify(outputContentType));
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ OK(std::move(output), stringify(outputContentType)),
+ None());
}
case ContentType::JSON: {
@@ -2455,12 +2506,179 @@ process::http::Response Master::ReadOnlyHandler::getState(
});
// TODO(bmahler): Pass jsonp query parameter through here.
- return OK(std::move(body), stringify(outputContentType));
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ OK(std::move(body), stringify(outputContentType)),
+ None());
+ }
+
+ default:
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ NotAcceptable("Request must accept json or protobuf"), None());
+ }
+}
+
+
+pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
+ Master::ReadOnlyHandler::subscribe(
+ ContentType outputContentType,
+ const hashmap<std::string, std::string>& query,
+ const process::Owned<ObjectApprovers>& approvers) const
+{
+ process::http::Pipe pipe;
+ OK ok;
+
+ ok.headers["Content-Type"] = stringify(outputContentType);
+ ok.type = process::http::Response::PIPE;
+ ok.reader = pipe.reader();
+
+ StreamingHttpConnection<v1::master::Event> http(
+ pipe.writer(), outputContentType);
+
+ // Serialize the following event:
+ //
+ // mesos::master::Event event;
+ // event.set_type(mesos::master::Event::SUBSCRIBED);
+ // *event.mutable_subscribed()->mutable_get_state() =
+ // _getState(approvers);
+ // event.mutable_subscribed()->set_heartbeat_interval_seconds(
+ // DEFAULT_HEARTBEAT_INTERVAL.secs());
+ //
+ // http.send(event);
+
+ switch (outputContentType) {
+ case ContentType::PROTOBUF: {
+ string serialized;
+ google::protobuf::io::StringOutputStream stream(&serialized);
+ google::protobuf::io::CodedOutputStream writer(&stream);
+
+ WireFormatLite::WriteEnum(
+ mesos::v1::master::Event::kTypeFieldNumber,
+ mesos::v1::master::Event::SUBSCRIBED,
+ &writer);
+
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Event::kSubscribedFieldNumber,
+ serializeSubscribe(approvers),
+ &writer);
+
+ // We must manually trim the unused buffer space since
+ // we use the string before the coded output stream is
+ // destructed.
+ writer.Trim();
+
+ http.send(serialized);
+
+ break;
+ }
+
+ case ContentType::JSON: {
+ string serialized = jsonify([&](JSON::ObjectWriter* writer) {
+ const google::protobuf::Descriptor* descriptor =
+ v1::master::Event::descriptor();
+
+ int field;
+
+ field = v1::master::Event::kTypeFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ v1::master::Event::Type_Name(
+ v1::master::Event::SUBSCRIBED));
+
+ field = v1::master::Event::kSubscribedFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ jsonifySubscribe(approvers));
+ });
+
+ http.send(serialized);
+
+ break;
}
default:
- return NotAcceptable("Request must accept json or protobuf");
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ NotAcceptable("Request must accept json or protobuf"), None());
}
+
+ mesos::master::Event heartbeatEvent;
+ heartbeatEvent.set_type(mesos::master::Event::HEARTBEAT);
+ http.send(heartbeatEvent);
+
+ // This new subscriber needs to be added in the post-processing step.
+ Master::ReadOnlyHandler::PostProcessing::Subscribe s =
+ { approvers->principal, http };
+
+ Master::ReadOnlyHandler::PostProcessing postProcessing = { std::move(s) };
+
+ return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
+ ok,
+ std::move(postProcessing));
+}
+
+
+function<void(JSON::ObjectWriter*)> Master::ReadOnlyHandler::jsonifySubscribe(
+ const Owned<ObjectApprovers>& approvers) const
+{
+ // Jsonify the following message:
+ //
+ // mesos::master::Event::Subscribed subscribed;
+ // *subscribed.mutable_get_state() = _getState(approvers);
+ // subscribed.set_heartbeat_interval_seconds(
+ // DEFAULT_HEARTBEAT_INTERVAL.secs());
+
+ // TODO(bmahler): This copies the Owned object approvers.
+ return [=](JSON::ObjectWriter* writer) {
+ const google::protobuf::Descriptor* descriptor =
+ v1::master::Event::Subscribed::descriptor();
+
+ int field;
+
+ field = v1::master::Event::Subscribed::kGetStateFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ jsonifyGetState(approvers));
+
+ field = v1::master::Event::Subscribed::kHeartbeatIntervalSecondsFieldNumber;
+ writer->field(
+ descriptor->FindFieldByNumber(field)->name(),
+ DEFAULT_HEARTBEAT_INTERVAL.secs());
+ };
+}
+
+
+string Master::ReadOnlyHandler::serializeSubscribe(
+ const Owned<ObjectApprovers>& approvers) const
+{
+ // Serialize the following message:
+ //
+ // mesos::master::Event::Subscribed subscribed;
+ // *subscribed.mutable_get_state() = _getState(approvers);
+ // subscribed.set_heartbeat_interval_seconds(
+ // DEFAULT_HEARTBEAT_INTERVAL.secs());
+
+ string output;
+ google::protobuf::io::StringOutputStream stream(&output);
+ google::protobuf::io::CodedOutputStream writer(&stream);
+
+ WireFormatLite::WriteBytes(
+ mesos::v1::master::Event::Subscribed::kGetStateFieldNumber,
+ serializeGetState(approvers),
+ &writer);
+
+ WireFormatLite::WriteDouble(
+ mesos::v1::master::Event::Subscribed
+ ::kHeartbeatIntervalSecondsFieldNumber,
+ DEFAULT_HEARTBEAT_INTERVAL.secs(),
+ &writer);
+
+ // While an explicit Trim() isn't necessary (since the coded
+ // output stream is destructed before the string is returned),
+ // it's a quite tricky bug to diagnose if Trim() is missed, so
+ // we always do it explicitly to signal the reader about this
+ // subtlety.
+ writer.Trim();
+
+ return output;
}
} // namespace master {
diff --git a/src/tests/master_load_tests.cpp b/src/tests/master_load_tests.cpp
index 6cee248..6bbc1c0 100644
--- a/src/tests/master_load_tests.cpp
+++ b/src/tests/master_load_tests.cpp
@@ -391,19 +391,19 @@ TEST_F(MasterLoadTest, SimultaneousBatchedRequests)
Response reference;
if (request.endpoint == "/state") {
reference = readOnlyHandler.state(
- ContentType::JSON, queryParameters, approvers);
+ ContentType::JSON, queryParameters, approvers).first;
} else if (request.endpoint == "/state-summary") {
reference = readOnlyHandler.stateSummary(
- ContentType::JSON, queryParameters, approvers);
+ ContentType::JSON, queryParameters, approvers).first;
} else if (request.endpoint == "/roles") {
reference = readOnlyHandler.roles(
- ContentType::JSON, queryParameters, approvers);
+ ContentType::JSON, queryParameters, approvers).first;
} else if (request.endpoint == "/frameworks") {
reference = readOnlyHandler.frameworks(
- ContentType::JSON, queryParameters, approvers);
+ ContentType::JSON, queryParameters, approvers).first;
} else if (request.endpoint == "/slaves") {
reference = readOnlyHandler.slaves(
- ContentType::JSON, queryParameters, approvers);
+ ContentType::JSON, queryParameters, approvers).first;
} else {
UNREACHABLE();
}