Updated the scheduler library to communicate with the master over HTTP. Review: https://reviews.apache.org/r/37303
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/69704a28 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/69704a28 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/69704a28 Branch: refs/heads/master Commit: 69704a28f115a23c4a8e1119bdda3d452674e679 Parents: 138ca69 Author: Anand Mazumdar <[email protected]> Authored: Wed Aug 12 23:06:07 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Wed Aug 12 23:30:41 2015 -0700 ---------------------------------------------------------------------- src/common/http.hpp | 45 +++++++++ src/scheduler/scheduler.cpp | 203 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 243 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/69704a28/src/common/http.hpp ---------------------------------------------------------------------- diff --git a/src/common/http.hpp b/src/common/http.hpp index 98a1270..0eab8ba 100644 --- a/src/common/http.hpp +++ b/src/common/http.hpp @@ -25,6 +25,7 @@ #include <stout/hashmap.hpp> #include <stout/json.hpp> +#include <stout/protobuf.hpp> namespace mesos { @@ -47,6 +48,21 @@ enum class ContentType }; +inline std::ostream& operator<<(std::ostream& stream, ContentType contentType) +{ + switch (contentType) { + case ContentType::PROTOBUF: { + return stream << APPLICATION_PROTOBUF; + } + case ContentType::JSON: { + return stream << APPLICATION_JSON; + } + } + + UNREACHABLE(); +} + + // Serializes a protobuf message for transmission // based on the HTTP content type. std::string serialize( @@ -54,6 +70,35 @@ std::string serialize( const google::protobuf::Message& message); +// Deserializes a string message into a protobuf message based on the +// HTTP content type. 
+template <typename Message> +Try<Message> deserialize( + ContentType contentType, + const std::string& body) +{ + switch (contentType) { + case ContentType::PROTOBUF: { + Message message; + if (!message.ParseFromString(body)) { + return Error("Failed to parse body into a protobuf object"); + } + return message; + } + case ContentType::JSON: { + Try<JSON::Value> value = JSON::parse(body); + if (value.isError()) { + return Error("Failed to parse body into JSON: " + value.error()); + } + + return ::protobuf::parse<Message>(value.get()); + } + } + + UNREACHABLE(); +} + + JSON::Object model(const Resources& resources); JSON::Object model(const hashmap<std::string, Resources>& roleResources); JSON::Object model(const Attributes& attributes); http://git-wip-us.apache.org/repos/asf/mesos/blob/69704a28/src/scheduler/scheduler.cpp ---------------------------------------------------------------------- diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp index 36d7052..37b5457 100644 --- a/src/scheduler/scheduler.cpp +++ b/src/scheduler/scheduler.cpp @@ -26,6 +26,7 @@ #include <arpa/inet.h> #include <iostream> +#include <queue> #include <string> #include <sstream> @@ -47,13 +48,18 @@ #include <stout/duration.hpp> #include <stout/error.hpp> #include <stout/flags.hpp> +#include <stout/hashmap.hpp> #include <stout/ip.hpp> #include <stout/lambda.hpp> #include <stout/nothing.hpp> #include <stout/option.hpp> #include <stout/os.hpp> +#include <stout/recordio.hpp> #include <stout/uuid.hpp> +#include "common/http.hpp" +#include "common/recordio.hpp" + #include "internal/devolve.hpp" #include "internal/evolve.hpp" @@ -77,8 +83,17 @@ using std::queue; using std::string; using std::vector; +using mesos::internal::recordio::Reader; + +using process::Owned; using process::wait; // Necessary on some OS's to disambiguate. 
+using process::http::Pipe; +using process::http::post; +using process::http::Response; + +using ::recordio::Decoder; + namespace mesos { namespace v1 { namespace scheduler { @@ -91,10 +106,12 @@ class MesosProcess : public ProtobufProcess<MesosProcess> public: MesosProcess( const string& master, + ContentType _contentType, const lambda::function<void(void)>& _connected, const lambda::function<void(void)>& _disconnected, lambda::function<void(const queue<Event>&)> _received) : ProcessBase(ID::generate("scheduler")), + contentType(_contentType), connected(_connected), disconnected(_disconnected), received(_received), @@ -160,6 +177,8 @@ public: virtual ~MesosProcess() { + disconnect(); + // Check and see if we need to shutdown a local cluster. if (local) { local::shutdown(); @@ -172,7 +191,7 @@ public: // TODO(benh): Move this to 'protected'. using ProtobufProcess<MesosProcess>::send; - void send(Call call) + void send(const Call& call) { if (master.isNone()) { drop(call, "Disconnected"); @@ -189,7 +208,34 @@ public: // TODO(vinod): Add support for sending MESSAGE calls directly // to the slave, instead of relaying it through the master, as // the scheduler driver does. - send(master.get(), devolve(call)); + + const string body = serialize(contentType, call); + const hashmap<string, string> headers{{"Accept", stringify(contentType)}}; + + Future<Response> response; + + if (call.type() == Call::SUBSCRIBE) { + // Each subscription requires a new connection. + disconnect(); + + // Send a streaming request for Subscribe call. + response = process::http::streaming::post( + master.get(), + "api/v1/scheduler", + headers, + body, + stringify(contentType)); + } else { + response = post( + master.get(), + "api/v1/scheduler", + headers, + body, + stringify(contentType)); + } + + response + .onAny(defer(self(), &Self::_send, call, lambda::_1)); } protected: @@ -209,6 +255,9 @@ protected: return; } + // Disconnect the reader upon a master detection callback. 
+ disconnect(); + if (future.get().isNone()) { master = None(); @@ -260,7 +309,7 @@ protected: error->set_message(message); - receive(None(), event); + receive(event, true); } void drop(const Call& call, const string& message) @@ -268,7 +317,145 @@ protected: LOG(WARNING) << "Dropping " << call.type() << ": " << message; } + void _send(const Call& call, const Future<Response>& response) + { + CHECK(!response.isDiscarded()); + + // This can happen during a master failover or a network blip + // causing the socket to timeout. Eventually, the scheduler would + // detect the disconnection via ZK(disconnect()) or lack of heartbeats. + if (response.isFailed()) { + LOG(ERROR) << "Request for call type " << call.type() << " failed: " + << response.failure(); + return; + } + + if (call.type() == Call::SUBSCRIBE && + response.get().status == process::http::statuses[200] ) { + CHECK_EQ(response.get().type, http::Response::PIPE); + CHECK_SOME(response.get().reader); + + Pipe::Reader reader = response.get().reader.get(); + + auto deserializer = + lambda::bind(deserialize<Event>, contentType, lambda::_1); + + Owned<Reader<Event>> decoder( + new Reader<Event>(Decoder<Event>(deserializer), reader)); + + connection = Connection {reader, decoder}; + + read(); + return; + } + + if (call.type() != Call::SUBSCRIBE && + response.get().status == process::http::statuses[202] ) { + return; + } + + // We should be able to get here only for AuthN errors which is not + // yet supported for HTTP frameworks. The other possible scenario + // can be that the master was not able to de-serialize the Call + // message. Since we validate the Call messages before sending, this + // can only happen when a packet corruption happens. 
+ error("Received unexpected '" + response.get().status + "' for " + + stringify(call.type()) + " call: " + response.get().body); + } + + void read() + { + connection.get().decoder->read() + .onAny(defer(self(), + &Self::_read, + connection.get().reader, + lambda::_1)); + } + + void _read(const Pipe::Reader& reader, const Future<Result<Event>>& event) + { + CHECK(!event.isDiscarded()); + + // Ignore enqueued events from the previous Subscribe call reader. + if (!connection.isSome() || connection.get().reader != reader) { + VLOG(1) << "Ignoring event from old stale connection"; + return; + } + + // This could happen if the master failed over while sending a response. + // It's fine to drop this as the scheduler would detect the + // disconnection via ZK(disconnect) or lack of heartbeats. + if (event.isFailed()) { + LOG(ERROR) << "Failed to decode the stream of events: " + << event.failure(); + return; + } + + if (!event.get().isSome()) { + // It's fine to drop this as the scheduler would detect the + // disconnection via ZK(disconnect) or lack of heartbeats. + LOG(ERROR) << "End-Of-File received from master." + << " The master closed the event stream"; + return; + } + + if (event.get().isError()) { + error("Failed to de-serialize event: " + event.get().error()); + } else { + receive(event.get().get(), false); + } + + read(); + } + + void receive(const Event& event, bool isLocallyInjected) + { + // Check if we're disconnected but received an event. + if (!isLocallyInjected && master.isNone()) { + LOG(WARNING) << "Ignoring " << stringify(event.type()) + << " event because we're disconnected"; + return; + } + + if (isLocallyInjected) { + VLOG(1) << "Enqueuing locally injected event " + << stringify(event.type()); + } + + // Queue up the event and invoke the 'received' callback if this + // is the first event (between now and when the 'received' + // callback actually gets invoked more events might get queued). 
+ events.push(event); + + if (events.size() == 1) { + mutex.lock() + .then(defer(self(), &Self::_receive)) + .onAny(lambda::bind(&Mutex::unlock, mutex)); + } + } + + void disconnect() + { + if (connection.isSome()) { + if (!connection.get().reader.close()) { + LOG(WARNING) << "HTTP connection was already closed"; + } + } + + connection = None(); + } + private: + struct Connection + { + Pipe::Reader reader; + process::Owned<Reader<Event>> decoder; + }; + + Option<Connection> connection; + + ContentType contentType; + Mutex mutex; // Used to serialize the callback invocations. lambda::function<void(void)> connected; @@ -291,8 +478,14 @@ Mesos::Mesos( const lambda::function<void(void)>& disconnected, const lambda::function<void(const queue<Event>&)>& received) { - process = - new MesosProcess(master, connected, disconnected, received); + // TODO(anand): Make ContentType as a constructor argument. + process = new MesosProcess( + master, + ContentType::PROTOBUF, + connected, + disconnected, + received); + spawn(process); }
