Added HttpConnection to support http frameworks in the master. Review: https://reviews.apache.org/r/36720
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7f352ef8 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7f352ef8 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7f352ef8 Branch: refs/heads/master Commit: 7f352ef886f3116e4bef23b235d87b3182354908 Parents: d44419a Author: Anand Mazumdar <[email protected]> Authored: Thu Aug 6 10:37:21 2015 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Thu Aug 6 11:25:17 2015 -0700 ---------------------------------------------------------------------- src/common/http.cpp | 19 ++++++++++ src/common/http.hpp | 8 +++++ src/master/master.cpp | 51 ++++++++++++++------------ src/master/master.hpp | 89 +++++++++++++++++++++++++--------------------- 4 files changed, 105 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/7f352ef8/src/common/http.cpp ---------------------------------------------------------------------- diff --git a/src/common/http.cpp b/src/common/http.cpp index a74c51d..e2ff48c 100644 --- a/src/common/http.cpp +++ b/src/common/http.cpp @@ -27,6 +27,7 @@ #include <stout/foreach.hpp> #include <stout/protobuf.hpp> #include <stout/stringify.hpp> +#include <stout/unreachable.hpp> #include "common/attributes.hpp" #include "common/http.hpp" @@ -45,6 +46,24 @@ const char APPLICATION_JSON[] = "application/json"; const char APPLICATION_PROTOBUF[] = "application/x-protobuf"; +string serialize( + ContentType contentType, + const google::protobuf::Message& message) +{ + switch (contentType) { + case ContentType::PROTOBUF: { + return message.SerializeAsString(); + } + case ContentType::JSON: { + JSON::Object object = JSON::Protobuf(message); + return stringify(object); + } + } + + UNREACHABLE(); +} + + // TODO(bmahler): Kill these in favor of automatic Proto->JSON // Conversion (when it becomes available). http://git-wip-us.apache.org/repos/asf/mesos/blob/7f352ef8/src/common/http.hpp ---------------------------------------------------------------------- diff --git a/src/common/http.hpp b/src/common/http.hpp index 9e4290f..98a1270 100644 --- a/src/common/http.hpp +++ b/src/common/http.hpp @@ -46,6 +46,14 @@ enum class ContentType JSON }; + +// Serializes a protobuf message for transmission +// based on the HTTP content type. +std::string serialize( + ContentType contentType, + const google::protobuf::Message& message); + + JSON::Object model(const Resources& resources); JSON::Object model(const hashmap<std::string, Resources>& roleResources); JSON::Object model(const Attributes& attributes); http://git-wip-us.apache.org/repos/asf/mesos/blob/7f352ef8/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 50b9824..d699e4b 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -960,10 +960,11 @@ void Master::finalize() } -void Master::exited(const FrameworkID& frameworkId, Pipe::Writer writer) +void Master::exited(const FrameworkID& frameworkId, const HttpConnection& http) { foreachvalue (Framework* framework, frameworks.registered) { - if (framework->http.isSome() && framework->http.get().writer == writer) { + if (framework->http.isSome() && + framework->http.get().writer == http.writer) { CHECK_EQ(frameworkId, framework->id()); _exited(framework); return; @@ -4665,15 +4666,18 @@ void Master::addFramework(Framework* framework) CHECK(!frameworks.registered.contains(framework->id())) << "Framework " << *framework << " already exists!"; - CHECK_SOME(framework->pid) << "adding http framework not implemented"; - frameworks.registered[framework->id()] = framework; - link(framework->pid.get()); + if (framework->pid.isSome()) { + link(framework->pid.get()); + } else { + CHECK_SOME(framework->http); + + HttpConnection http = framework->http.get(); - // TODO(anand): For http frameworks, add a readerClosed() - // callback to invoke Master::exited() when the connection - // closes. + http.closed() + .onAny(defer(self(), &Self::exited, framework->id(), http)); + } // Enforced by Master::registerFramework. CHECK(roles.contains(framework->info.role())) @@ -4695,22 +4699,25 @@ void Master::addFramework(Framework* framework) // If the framework is authenticated, its principal should be in // 'authenticated'. Otherwise look if it's supplied in // FrameworkInfo. - Option<string> principal = authenticated.get(framework->pid.get()); - if (principal.isNone() && framework->info.has_principal()) { - principal = framework->info.principal(); - } + if (framework->pid.isSome()) { + Option<string> principal = authenticated.get(framework->pid.get()); + if (principal.isNone() && framework->info.has_principal()) { + principal = framework->info.principal(); + } - CHECK(!frameworks.principals.contains(framework->pid.get())); - frameworks.principals.put(framework->pid.get(), principal); + CHECK(!frameworks.principals.contains(framework->pid.get())); + frameworks.principals.put(framework->pid.get(), principal); - // Export framework metrics if a principal is specified. - if (principal.isSome()) { - // Create new framework metrics if this framework is the first - // one of this principal. Otherwise existing metrics are reused. - if (!metrics->frameworks.contains(principal.get())) { - metrics->frameworks.put( - principal.get(), - Owned<Metrics::Frameworks>(new Metrics::Frameworks(principal.get()))); + // Export framework metrics if a principal is specified. + if (principal.isSome()) { + // Create new framework metrics if this framework is the first + // one of this principal. Otherwise existing metrics are reused. + if (!metrics->frameworks.contains(principal.get())) { + metrics->frameworks.put( + principal.get(), + Owned<Metrics::Frameworks>( + new Metrics::Frameworks(principal.get()))); + } } } } http://git-wip-us.apache.org/repos/asf/mesos/blob/7f352ef8/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 30a2550..53420ca 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -92,6 +92,7 @@ class SlaveObserver; struct BoundedRateLimiter; struct Framework; +struct HttpConnection; struct Role; @@ -512,9 +513,7 @@ protected: virtual void visit(const process::ExitedEvent& event); virtual void exited(const process::UPID& pid); - void exited( - const FrameworkID& frameworkId, - process::http::Pipe::Writer writer); + void exited(const FrameworkID& frameworkId, const HttpConnection& http); void _exited(Framework* framework); // Invoked when the message is ready to be executed after @@ -1225,6 +1224,37 @@ inline std::ostream& operator << ( const Framework& framework); +// Represents the streaming HTTP connection to a framework. +struct HttpConnection +{ + HttpConnection(const process::http::Pipe::Writer& _writer, + ContentType _contentType) + : writer(_writer), + contentType(_contentType), + encoder(lambda::bind(serialize, contentType, lambda::_1)) {} + + // Converts the message to an Event before sending. + template <typename Message> + bool send(const Message& message) { + return writer.write(encoder.encode(protobuf::scheduler::event(message))); + } + + bool close() + { + return writer.close(); + } + + process::Future<Nothing> closed() const + { + return writer.readerClosed(); + } + + process::http::Pipe::Writer writer; + ContentType contentType; + recordio::Encoder<scheduler::Event> encoder; +}; + + // Information about a connected or completed framework. // TODO(bmahler): Keeping the task and executor information in sync // across the Slave and Framework structs is error prone! @@ -1245,46 +1275,21 @@ struct Framework Framework(Master* const _master, const FrameworkInfo& _info, - const process::http::Pipe::Writer& writer, - ContentType contentType, + const HttpConnection& _http, const process::Time& time = process::Clock::now()) : master(_master), info(_info), + http(_http), connected(true), active(true), registeredTime(time), reregisteredTime(time), - completedTasks(MAX_COMPLETED_TASKS_PER_FRAMEWORK) - { - // TODO(anand): This logic needs to be invoked each - // time the framework connects via http. Move it to - // a method instead, that gets invoked from - // addFramework and failoverFrameowrk. - - auto serialize = [contentType](const scheduler::Event& event) { - switch (contentType) { - case ContentType::PROTOBUF: { - return event.SerializeAsString(); - } - case ContentType::JSON: { - JSON::Object object = JSON::Protobuf(event); - return stringify(object); - } - } - }; - - auto encoder = recordio::Encoder<scheduler::Event>(serialize); - - http = Http {writer, encoder}; - - http.get().writer.readerClosed(). - onAny(defer(master->self(), &Master::exited, id(), writer)); - } + completedTasks(MAX_COMPLETED_TASKS_PER_FRAMEWORK) {} ~Framework() { if (http.isSome() && connected) { - if (!http.get().writer.close()) { + if (!http.get().close()) { LOG(WARNING) << "Failed to close HTTP pipe for " << *this; } } @@ -1343,7 +1348,7 @@ struct Framework if (http.isSome()) { const scheduler::Event event = protobuf::scheduler::event(message); - if (!http.get().writer.write(http.get().encoder.encode(event))) { + if (!http.get().send(message)) { LOG(WARNING) << "Unable to send event to framework " << *this << ":" << " connection closed"; } @@ -1503,21 +1508,25 @@ struct Framework } } + void updateConnection(const HttpConnection& other) + { + // Close the existing connection if it has changed. + if (http.isSome() && http.get().writer != other.writer) { + http.get().close(); + } + + http = other; + } + Master* const master; FrameworkInfo info; - struct Http - { - process::http::Pipe::Writer writer; - recordio::Encoder<scheduler::Event> encoder; - }; - // Frameworks can either be connected via HTTP or by message // passing (scheduler driver). Exactly one of 'http' and 'pid' // will be set according to the last connection made by the // framework. - Option<Http> http; + Option<HttpConnection> http; Option<process::UPID> pid; // Framework becomes disconnected when the socket closes.
