Repository: mesos Updated Branches: refs/heads/master 4b4cba24d -> 90b107a24
Updated Framework struct in master for the http api. This change refactors the Framework struct in master to introduce support for http frameworks: * 'pid' becomes an optional field. * Added optional 'http' field. Review: https://reviews.apache.org/r/36318 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/90b107a2 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/90b107a2 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/90b107a2 Branch: refs/heads/master Commit: 90b107a249169c6fc8b8d398b675ab9bd2df633b Parents: 4b4cba2 Author: Anand Mazumdar <[email protected]> Authored: Tue Jul 28 11:53:45 2015 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Tue Jul 28 12:19:31 2015 -0700 ---------------------------------------------------------------------- src/common/http.hpp | 8 ++ src/master/http.cpp | 6 +- src/master/master.cpp | 183 +++++++++++++++++++++++++------------------ src/master/master.hpp | 105 ++++++++++++++++++++++++-- 4 files changed, 221 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/90b107a2/src/common/http.hpp ---------------------------------------------------------------------- diff --git a/src/common/http.hpp b/src/common/http.hpp index 765860f..9e4290f 100644 --- a/src/common/http.hpp +++ b/src/common/http.hpp @@ -38,6 +38,14 @@ class Task; extern const char APPLICATION_JSON[]; extern const char APPLICATION_PROTOBUF[]; +// Possible content-types that can be used as responses for +// the mesos Http API. 
+enum class ContentType +{ + PROTOBUF, + JSON +}; + JSON::Object model(const Resources& resources); JSON::Object model(const hashmap<std::string, Resources>& roleResources); JSON::Object model(const Attributes& attributes); http://git-wip-us.apache.org/repos/asf/mesos/blob/90b107a2/src/master/http.cpp ---------------------------------------------------------------------- diff --git a/src/master/http.cpp b/src/master/http.cpp index 3a1598f..3772e39 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -113,7 +113,11 @@ JSON::Object summarize(const Framework& framework) JSON::Object object; object.values["id"] = framework.id().value(); object.values["name"] = framework.info.name(); - object.values["pid"] = string(framework.pid); + + // Omit pid for http frameworks. + if (framework.pid.isSome()) { + object.values["pid"] = string(framework.pid.get()); + } // TODO(bmahler): Use these in the webui. object.values["used_resources"] = model(framework.totalUsedResources); http://git-wip-us.apache.org/repos/asf/mesos/blob/90b107a2/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index a8a195d..3e63184 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -1659,7 +1659,7 @@ void Master::receive( return; } - if (from != framework->pid) { + if (framework->pid != from) { drop(from, call, "Call is not from registered framework"); return; } @@ -1844,41 +1844,41 @@ void Master::_registerFramework( FrameworkRegisteredMessage message; message.mutable_framework_id()->MergeFrom(framework->id()); message.mutable_master_info()->MergeFrom(info_); - send(from, message); + framework->send(message); return; } } + // TODO(vinod): Deprecate this in favor of authorization. 
+ bool rootSubmissions = flags.root_submissions; + + if (frameworkInfo.user() == "root" && rootSubmissions == false) { + LOG(INFO) << "Framework " << frameworkInfo.name() << " at " << from + << " registering as root, but root submissions are disabled" + << " on this cluster"; + FrameworkErrorMessage message; + message.set_message("User 'root' is not allowed to run frameworks"); + send(from, message); + return; + } + // Assign a new FrameworkID. FrameworkInfo frameworkInfo_ = frameworkInfo; frameworkInfo_.mutable_id()->CopyFrom(newFrameworkId()); - Framework* framework = new Framework(frameworkInfo_, from); + Framework* framework = new Framework(this, frameworkInfo_, from); LOG(INFO) << "Registering framework " << *framework << " with checkpointing " << (framework->info.checkpoint() ? "enabled" : "disabled") << " and capabilities " << framework->info.capabilities(); - // TODO(vinod): Deprecate this in favor of authorization. - bool rootSubmissions = flags.root_submissions; - - if (framework->info.user() == "root" && rootSubmissions == false) { - LOG(INFO) << "Framework " << *framework << " registering as root, but " - << "root submissions are disabled on this cluster"; - FrameworkErrorMessage message; - message.set_message("User 'root' is not allowed to run frameworks"); - send(from, message); - delete framework; - return; - } - addFramework(framework); FrameworkRegisteredMessage message; message.mutable_framework_id()->MergeFrom(framework->id()); message.mutable_master_info()->MergeFrom(info_); - send(framework->pid, message); + framework->send(message); } @@ -1952,6 +1952,7 @@ void Master::_reregisterFramework( const Future<Option<Error>>& validationError) { CHECK_READY(validationError); + if (validationError.get().isSome()) { LOG(INFO) << "Refusing re-registration of framework " << frameworkInfo.id() << " (" << frameworkInfo.name() << ") " << " at " << from @@ -2025,7 +2026,7 @@ void Master::_reregisterFramework( // info? 
LOG(INFO) << "Framework " << *framework << " failed over"; failoverFramework(framework, from); - } else if (from != framework->pid) { + } else if (framework->pid != from) { LOG(ERROR) << "Disallowing re-registration attempt of framework " << *framework << " because it is not expected from " << from; @@ -2063,7 +2064,7 @@ void Master::_reregisterFramework( FrameworkReregisteredMessage message; message.mutable_framework_id()->MergeFrom(frameworkInfo.id()); message.mutable_master_info()->MergeFrom(info_); - send(from, message); + framework->send(message); return; } } else { @@ -2071,7 +2072,7 @@ void Master::_reregisterFramework( // elected Mesos master to which either an existing scheduler or a // failed-over one is connecting. Create a Framework object and add // any tasks it has that have been reported by reconnecting slaves. - Framework* framework = new Framework(frameworkInfo, from); + Framework* framework = new Framework(this, frameworkInfo, from); // TODO(benh): Check for root submissions like above! @@ -2098,7 +2099,7 @@ void Master::_reregisterFramework( FrameworkRegisteredMessage message; message.mutable_framework_id()->MergeFrom(framework->id()); message.mutable_master_info()->MergeFrom(info_); - send(framework->pid, message); + framework->send(message); } CHECK(frameworks.registered.contains(frameworkInfo.id())) @@ -2172,7 +2173,7 @@ void Master::deactivateFramework( return; } - if (from != framework->pid) { + if (framework->pid != from) { LOG(WARNING) << "Ignoring deactivate framework message for framework " << *framework << " because it is not expected from " << from; @@ -2191,9 +2192,17 @@ void Master::disconnect(Framework* framework) framework->connected = false; - // Remove the framework from authenticated. This is safe because - // a framework will always reauthenticate before (re-)registering. - authenticated.erase(framework->pid); + if (framework->pid.isSome()) { + // Remove the framework from authenticated. 
This is safe because + // a framework will always reauthenticate before (re-)registering. + authenticated.erase(framework->pid.get()); + } else { + CHECK_SOME(framework->http); + + // Close the HTTP connection, which may already have + // been closed due to scheduler disconnection. + framework->http.get().writer.close(); + } deactivate(framework); } @@ -2275,7 +2284,7 @@ void Master::resourceRequest( return; } - if (from != framework->pid) { + if (framework->pid != from) { LOG(WARNING) << "Ignoring resource request message from framework " << *framework << " because it is not expected from " << from; @@ -2329,12 +2338,11 @@ void Master::launchTasks( return; } - if (from != framework->pid) { + if (framework->pid != from) { LOG(WARNING) << "Ignoring launch tasks message for offers " << stringify(offerIds) - << " of framework " << frameworkId << " from '" << from - << "' because it is not from the registered framework '" - << framework->pid << "'"; + << " from '" << from << "' because it is not from the" + << " registered framework " << *framework; return; } @@ -2882,7 +2890,11 @@ void Master::_accept( RunTaskMessage message; message.mutable_framework()->MergeFrom(framework->info); message.mutable_framework_id()->MergeFrom(framework->id()); - message.set_pid(framework->pid); + + // TODO(anand): We set 'pid' to UPID() for http frameworks + // as 'pid' was made optional in 0.24.0. In 0.25.0, we + // no longer have to set pid here for http frameowrks. 
+ message.set_pid(framework->pid.getOrElse(UPID())); message.mutable_task()->MergeFrom(task_); if (HookManager::hooksAvailable()) { @@ -2958,7 +2970,7 @@ void Master::reviveOffers(const UPID& from, const FrameworkID& frameworkId) return; } - if (from != framework->pid) { + if (framework->pid != from) { LOG(WARNING) << "Ignoring revive offers message for framework " << *framework << " because it is not expected from " << from; @@ -2997,7 +3009,7 @@ void Master::killTask( return; } - if (from != framework->pid) { + if (framework->pid != from) { LOG(WARNING) << "Ignoring kill task message for task " << taskId << " of framework " << *framework << " because it is not expected from " << from; @@ -3117,7 +3129,7 @@ void Master::statusUpdateAcknowledgement( return; } - if (from != framework->pid) { + if (framework->pid != from) { LOG(WARNING) << "Ignoring status update acknowledgement " << UUID::fromBytes(uuid) << " for task " << taskId << " of framework " << *framework @@ -3232,11 +3244,11 @@ void Master::schedulerMessage( return; } - if (from != framework->pid) { - LOG(WARNING) << "Ignoring framework message" - << " for executor '" << executorId << "'" - << " of framework " << *framework - << " because it is not expected from " << from; + if (framework->pid != from) { + LOG(WARNING) + << "Ignoring framework message for executor " << executorId + << " of framework " << *framework + << " because it is not expected from " << from; metrics->invalid_framework_to_executor_messages++; return; } @@ -3307,7 +3319,8 @@ void Master::executorMessage( message.mutable_framework_id()->MergeFrom(frameworkId); message.mutable_executor_id()->MergeFrom(executorId); message.set_data(data); - send(framework->pid, message); + + framework->send(message); metrics->valid_executor_to_framework_messages++; } @@ -3712,16 +3725,23 @@ void Master::__reregisterSlave(Slave* slave, const vector<Task>& tasks) CHECK_NOTNULL(slave); // Send the latest framework pids to the slave. 
- hashset<UPID> pids; + hashset<FrameworkID> ids; + foreach (const Task& task, tasks) { Framework* framework = getFramework(task.framework_id()); - if (framework != NULL && !pids.contains(framework->pid)) { + + if (framework != NULL && !ids.contains(framework->id())) { UpdateFrameworkMessage message; message.mutable_framework_id()->MergeFrom(framework->id()); - message.set_pid(framework->pid); + + // TODO(anand): We set 'pid' to UPID() for http frameworks + // as 'pid' was made optional in 0.24.0. In 0.25.0, we + // no longer have to set pid here for http frameowrks. + message.set_pid(framework->pid.getOrElse(UPID())); + send(slave->pid, message); - pids.insert(framework->pid); + ids.insert(framework->id()); } } @@ -3905,7 +3925,7 @@ void Master::forward( StatusUpdateMessage message; message.mutable_update()->MergeFrom(update); message.set_pid(acknowledgee); - send(framework->pid, message); + framework->send(message); } @@ -3977,7 +3997,7 @@ void Master::exitedExecutor( message.mutable_slave_id()->CopyFrom(slaveId); message.set_status(status); - send(framework->pid, message); + framework->send(message); } @@ -4064,7 +4084,7 @@ void Master::reconcileTasks( return; } - if (from != framework->pid) { + if (framework->pid != from) { LOG(WARNING) << "Ignoring reconcile tasks message for framework " << *framework << " because it is not expected from " << from; @@ -4106,7 +4126,7 @@ void Master::_reconcileTasks( // much logging. StatusUpdateMessage message; message.mutable_update()->CopyFrom(update); - send(framework->pid, message); + framework->send(message); } foreachvalue (Task* task, framework->tasks) { @@ -4139,7 +4159,7 @@ void Master::_reconcileTasks( // much logging. StatusUpdateMessage message; message.mutable_update()->CopyFrom(update); - send(framework->pid, message); + framework->send(message); } return; @@ -4243,7 +4263,7 @@ void Master::_reconcileTasks( // much logging. 
StatusUpdateMessage message; message.mutable_update()->CopyFrom(update.get()); - send(framework->pid, message); + framework->send(message); } } } @@ -4405,7 +4425,7 @@ void Master::offer(const FrameworkID& frameworkId, LOG(INFO) << "Sending " << message.offers().size() << " offers to framework " << *framework; - send(framework->pid, message); + framework->send(message); } @@ -4718,9 +4738,15 @@ void Master::addFramework(Framework* framework) CHECK(!frameworks.registered.contains(framework->id())) << "Framework " << *framework << " already exists!"; + CHECK_SOME(framework->pid) << "adding http framework not implemented"; + frameworks.registered[framework->id()] = framework; - link(framework->pid); + link(framework->pid.get()); + + // TODO(anand): For http frameworks, add a readerClosed() + // callback to invoke Master::exited() when the connection + // closes. // Enforced by Master::registerFramework. CHECK(roles.contains(framework->info.role())) @@ -4742,13 +4768,13 @@ void Master::addFramework(Framework* framework) // If the framework is authenticated, its principal should be in // 'authenticated'. Otherwise look if it's supplied in // FrameworkInfo. - Option<string> principal = authenticated.get(framework->pid); + Option<string> principal = authenticated.get(framework->pid.get()); if (principal.isNone() && framework->info.has_principal()) { principal = framework->info.principal(); } - CHECK(!frameworks.principals.contains(framework->pid)); - frameworks.principals.put(framework->pid, principal); + CHECK(!frameworks.principals.contains(framework->pid.get())); + frameworks.principals.put(framework->pid.get(), principal); // Export framework metrics if a principal is specified. if (principal.isSome()) { @@ -4767,7 +4793,9 @@ void Master::addFramework(Framework* framework) // event of a scheduler failover. 
void Master::failoverFramework(Framework* framework, const UPID& newPid) { - const UPID oldPid = framework->pid; + CHECK_SOME(framework->pid) << "http framework failover not implemented"; + + const UPID oldPid = framework->pid.get(); // There are a few failover cases to consider: // 1. The pid has changed. In this case we definitely want to @@ -4795,7 +4823,7 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid) FrameworkRegisteredMessage message; message.mutable_framework_id()->MergeFrom(framework->id()); message.mutable_master_info()->MergeFrom(info_); - send(newPid, message); + framework->send(message); // Remove the framework's offers (if they weren't removed before). // We do this after we have updated the pid and sent the framework @@ -4912,6 +4940,8 @@ void Master::removeFramework(Framework* framework) // TODO(benh): unlink(framework->pid); + // TODO(anand): For http frameworks, close the connection. + framework->unregisteredTime = Clock::now(); // The completedFramework buffer now owns the framework pointer. @@ -4923,21 +4953,26 @@ void Master::removeFramework(Framework* framework) roles[framework->info.role()]->removeFramework(framework); - // Remove the framework from authenticated. - authenticated.erase(framework->pid); - - CHECK(frameworks.principals.contains(framework->pid)); - const Option<string> principal = frameworks.principals[framework->pid]; - - frameworks.principals.erase(framework->pid); - - // Remove the framework's message counters. - if (principal.isSome()) { - // Remove the metrics for the principal if this framework is the - // last one with this principal. - if (!frameworks.principals.containsValue(principal.get())) { - CHECK(metrics->frameworks.contains(principal.get())); - metrics->frameworks.erase(principal.get()); + // TODO(anand): This only works for pid based frameworks. We would + // need similar authentication logic for http frameworks. 
+ if (framework->pid.isSome()) { + // Remove the framework from authenticated. + authenticated.erase(framework->pid.get()); + + CHECK(frameworks.principals.contains(framework->pid.get())); + const Option<string> principal = + frameworks.principals[framework->pid.get()]; + + frameworks.principals.erase(framework->pid.get()); + + // Remove the framework's message counters. + if (principal.isSome()) { + // Remove the metrics for the principal if this framework is the + // last one with this principal. + if (!frameworks.principals.containsValue(principal.get())) { + CHECK(metrics->frameworks.contains(principal.get())); + metrics->frameworks.erase(principal.get()); + } } } @@ -5220,7 +5255,7 @@ void Master::_removeSlave( << "after recovering"; LostSlaveMessage message; message.mutable_slave_id()->MergeFrom(slaveInfo.id()); - send(framework->pid, message); + framework->send(message); } } @@ -5465,7 +5500,7 @@ void Master::removeOffer(Offer* offer, bool rescind) if (rescind) { RescindResourceOfferMessage message; message.mutable_offer_id()->MergeFrom(offer->id()); - send(framework->pid, message); + framework->send(message); } // Remove and cancel offer removal timers. Canceling the Timers is http://git-wip-us.apache.org/repos/asf/mesos/blob/90b107a2/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 2c924ad..879e3d8 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -52,7 +52,9 @@ #include <stout/hashset.hpp> #include <stout/multihashmap.hpp> #include <stout/option.hpp> +#include <stout/recordio.hpp> +#include "common/http.hpp" #include "common/protobuf_utils.hpp" #include "common/resources_utils.hpp" @@ -830,6 +832,7 @@ private: Master(const Master&); // No copying. Master& operator = (const Master&); // No assigning. 
+ friend struct Framework; friend struct Metrics; // NOTE: Since 'getOffer' and 'slaves' are protected, @@ -1211,15 +1214,22 @@ private: }; +inline std::ostream& operator << ( + std::ostream& stream, + const Framework& framework); + + // Information about a connected or completed framework. // TODO(bmahler): Keeping the task and executor information in sync // across the Slave and Framework structs is error prone! struct Framework { - Framework(const FrameworkInfo& _info, + Framework(Master* const _master, + const FrameworkInfo& _info, const process::UPID& _pid, const process::Time& time = process::Clock::now()) - : info(_info), + : master(_master), + info(_info), pid(_pid), connected(true), active(true), @@ -1227,7 +1237,49 @@ struct Framework reregisteredTime(time), completedTasks(MAX_COMPLETED_TASKS_PER_FRAMEWORK) {} - ~Framework() {} + Framework(Master* const _master, + const FrameworkInfo& _info, + const process::http::Pipe::Writer& writer, + ContentType contentType, + const process::Time& time = process::Clock::now()) + : master(_master), + info(_info), + connected(true), + active(true), + registeredTime(time), + reregisteredTime(time), + completedTasks(MAX_COMPLETED_TASKS_PER_FRAMEWORK) + { + // TODO(anand): This logic needs to be invoked each + // time the framework connects via http. Move it to + // a method instead, that gets invoked from + // addFramework and failoverFrameowrk. 
+ + auto serialize = [contentType](const scheduler::Event& event) { + switch (contentType) { + case ContentType::PROTOBUF: { + return event.SerializeAsString(); + } + case ContentType::JSON: { + JSON::Object object = JSON::Protobuf(event); + return stringify(object); + } + } + }; + + auto encoder = recordio::Encoder<scheduler::Event>(serialize); + + http = Http {writer, encoder}; + } + + ~Framework() + { + if (http.isSome() && connected) { + if (!http.get().writer.close()) { + LOG(WARNING) << "Failed to close HTTP pipe for " << *this; + } + } + } Task* getTask(const TaskID& taskId) { @@ -1270,6 +1322,29 @@ struct Framework } } + // Sends a message to the connected framework. + template <typename Message> + void send(const Message& message) + { + if (!connected) { + LOG(WARNING) << "Master attempted to send message to disconnected" + << " framework " << *this; + return; + } + + if (http.isSome()) { + const scheduler::Event event = protobuf::scheduler::event(message); + + if (!http.get().writer.write(http.get().encoder.encode(event))) { + LOG(WARNING) << "Unable to send event to framework " << *this << ":" + << " connection closed"; + } + } else { + CHECK_SOME(pid); + master->send(pid.get(), message); + } + } + void addCompletedTask(const Task& task) { // TODO(adam-mesos): Check if completed task already exists. @@ -1420,9 +1495,22 @@ struct Framework } } + Master* const master; + FrameworkInfo info; - process::UPID pid; + struct Http + { + process::http::Pipe::Writer writer; + recordio::Encoder<scheduler::Event> encoder; + }; + + // Frameworks can either be connected via HTTP or by message + // passing (scheduler driver). Exactly one of 'http' and 'pid' + // will be set according to the last connection made by the + // framework. + Option<Http> http; + Option<process::UPID> pid; // Framework becomes disconnected when the socket closes. 
bool connected; @@ -1493,8 +1581,13 @@ inline std::ostream& operator << ( { // TODO(vinod): Also log the hostname once FrameworkInfo is properly // updated on framework failover (MESOS-1784). - return stream << framework.id() << " (" << framework.info.name() - << ") at " << framework.pid; + stream << framework.id() << " (" << framework.info.name() << ")"; + + if (framework.pid.isSome()) { + stream << " at " << framework.pid.get(); + } + + return stream; }
