Repository: mesos Updated Branches: refs/heads/master ad1d6fca2 -> d24b3ad64
Implemented the SUBSCRIBE call for http schedulers in the master. Review: https://reviews.apache.org/r/36720 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d24b3ad6 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d24b3ad6 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d24b3ad6 Branch: refs/heads/master Commit: d24b3ad6496cafec8f8ea4b02aa106a30e5c1d75 Parents: ad1d6fc Author: Anand Mazumdar <[email protected]> Authored: Thu Aug 6 22:04:01 2015 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Fri Aug 7 00:13:57 2015 -0700 ---------------------------------------------------------------------- src/master/http.cpp | 21 ++- src/master/master.cpp | 336 ++++++++++++++++++++++++++++++++++++++------- src/master/master.hpp | 39 +++++- 3 files changed, 342 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/d24b3ad6/src/master/http.cpp ---------------------------------------------------------------------- diff --git a/src/master/http.cpp b/src/master/http.cpp index 76e7080..7d7e562 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -70,6 +70,7 @@ using process::http::InternalServerError; using process::http::NotFound; using process::http::NotImplemented; using process::http::OK; +using process::http::Pipe; using process::http::TemporaryRedirect; using process::http::Unauthorized; using process::http::UnsupportedMediaType; @@ -375,10 +376,24 @@ Future<Response> Master::Http::call(const Request& request) const responseContentType = ContentType::PROTOBUF; } - // Silence unused warning for now. - (void)responseContentType; + switch (call.type()) { + case scheduler::Call::SUBSCRIBE: { + Pipe pipe; + OK ok; - // TODO(anand): Handle the call. 
+ ok.type = Response::PIPE; + ok.reader = pipe.reader(); + + HttpConnection http {pipe.writer(), responseContentType}; + master->subscribe(http, call.subscribe()); + + return ok; + } + default: + // TODO(bmahler): Log fatally here once all calls are + // implemented, since validation should catch this. + break; + } return NotImplemented(); } http://git-wip-us.apache.org/repos/asf/mesos/blob/d24b3ad6/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index a7b8527..0330f94 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -1772,6 +1772,208 @@ void Master::reregisterFramework( void Master::subscribe( + HttpConnection http, + const scheduler::Call::Subscribe& subscribe) +{ + // TODO(anand): Authenticate the framework. + + const FrameworkInfo& frameworkInfo = subscribe.framework_info(); + Option<Error> validationError = None(); + + // TODO(vinod): Deprecate this in favor of ACLs. + if (validationError.isNone() && !roles.contains(frameworkInfo.role())) { + validationError = Error("Role '" + frameworkInfo.role() + "' is not" + + " present in the master's --roles"); + } + + // TODO(vinod): Deprecate this in favor of authorization. + if (validationError.isNone() && + frameworkInfo.user() == "root" && !flags.root_submissions) { + validationError = Error("User 'root' is not allowed to run frameworks" + " without --root_submissions set"); + } + + if (validationError.isNone() && frameworkInfo.has_id()) { + foreach (const shared_ptr<Framework>& framework, frameworks.completed) { + if (framework->id() == frameworkInfo.id()) { + // This could happen if a framework tries to subscribe after + // its failover timeout has elapsed or it unregistered itself + // by calling 'stop()' on the scheduler driver. + // + // TODO(vinod): Master should persist admitted frameworks to the + // registry and remove them from it after failover timeout. 
+ validationError = Error("Framework has been removed"); + break; + } + } + } + + LOG(INFO) << "Received registration request for" + << " http framework '" << frameworkInfo.name() << "'"; + + if (validationError.isSome()) { + LOG(INFO) << "Refusing subscription of framework" + << " '" << frameworkInfo.name() << "': " + << validationError.get().message; + + FrameworkErrorMessage message; + message.set_message(validationError.get().message); + + http.send(message); + http.close(); + return; + } + + // TODO(anand): Authorize the framework. + this->_subscribe(http, subscribe); +} + + +void Master::_subscribe( + HttpConnection http, + const scheduler::Call::Subscribe& subscribe) +{ + const FrameworkInfo& frameworkInfo = subscribe.framework_info(); + + LOG(INFO) << "Subscribing framework " << frameworkInfo.name() + << " with checkpointing " + << (frameworkInfo.checkpoint() ? "enabled" : "disabled") + << " and capabilities " << frameworkInfo.capabilities(); + + if (!frameworkInfo.has_id() || frameworkInfo.id() == "") { + // If we are here the framework is subscribing for the first time. + // Assign a new FrameworkID. + FrameworkInfo frameworkInfo_ = frameworkInfo; + frameworkInfo_.mutable_id()->CopyFrom(newFrameworkId()); + + Framework* framework = new Framework(this, frameworkInfo_, http); + + addFramework(framework); + + FrameworkRegisteredMessage message; + message.mutable_framework_id()->MergeFrom(framework->id()); + message.mutable_master_info()->MergeFrom(info_); + framework->send(message); + return; + } + + // If we are here framework has already been assigned an id. 
+ CHECK(!frameworkInfo.id().value().empty()); + + if (frameworks.registered.contains(frameworkInfo.id())) { + // Using the "force" field of the scheduler allows us to keep a + // scheduler that got partitioned but didn't die (in ZooKeeper + // speak this means didn't lose their session) and then + // eventually tried to connect to this master even though + // another instance of their scheduler has reconnected. + + Framework* framework = + CHECK_NOTNULL(frameworks.registered[frameworkInfo.id()]); + + // Update the framework's info fields based on those passed during + // re-registration. + LOG(INFO) << "Updating info for framework " << framework->id(); + + framework->updateFrameworkInfo(frameworkInfo); + allocator->updateFramework(framework->id(), framework->info); + + framework->reregisteredTime = Clock::now(); + + if (subscribe.force()) { + LOG(INFO) << "Framework " << *framework << " failed over"; + failoverFramework(framework, http); + } else if (framework->connected) { + // Note that if the scheduler is retrying we expect it + // to close its old connection. But, the master may not + // realize that the connection is closed before the retry + // occurs so we may kick off a scheduler unnecessarily. + LOG(ERROR) << "Disallowing subscription attempt" + << " of framework " << *framework + << " because it is already connected"; + + FrameworkErrorMessage message; + message.set_message("Framework is already connected"); + + http.send(message); + http.close(); + return; + } else { + LOG(INFO) << "Allowing framework " << *framework + << " to subcribe with an already used id"; + + // Convert the framework to an http framework if it was + // pid based in the past. NOTE(review): unlike failoverFramework(framework, http), this does not erase the old pid from 'authenticated' / 'frameworks.principals', so those entries may be left behind -- confirm. + if (framework->pid.isSome()) { + framework->pid = None(); + } + + framework->connected = true; + framework->updateConnection(http); + + http.closed() + .onAny(defer(self(), &Self::exited, framework->id(), http)); + + // Reactivate the framework.
+ if (!framework->active) { + framework->active = true; + allocator->activateFramework(framework->id()); + } + + FrameworkReregisteredMessage message; + message.mutable_framework_id()->MergeFrom(framework->id()); + message.mutable_master_info()->MergeFrom(info_); + framework->send(message); + } + } else { + // We don't have a framework with this ID, so we must be a newly + // elected Mesos master to which either an existing scheduler or a + // failed-over one is connecting. Create a Framework object and add + // any tasks it has that have been reported by reconnecting slaves. + Framework* framework = new Framework(this, frameworkInfo, http); + + // Add active tasks and executors to the framework. + foreachvalue (Slave* slave, slaves.registered) { + foreachvalue (Task* task, slave->tasks[framework->id()]) { + framework->addTask(task); + } + foreachvalue (const ExecutorInfo& executor, + slave->executors[framework->id()]) { + framework->addExecutor(slave->id, executor); + } + } + + // N.B. Need to add the framework _after_ we add its tasks + // (above) so that we can properly determine the resources it's + // currently using! + addFramework(framework); + + FrameworkReregisteredMessage message; + message.mutable_framework_id()->MergeFrom(framework->id()); + message.mutable_master_info()->MergeFrom(info_); + framework->send(message); + } + + CHECK(frameworks.registered.contains(frameworkInfo.id())) + << "Unknown framework " << frameworkInfo.id() + << " (" << frameworkInfo.name() << ")"; + + // Broadcast the new framework pid to all the slaves. We have to + // broadcast because an executor might be running on a slave but + // it currently isn't running any tasks. + foreachvalue (Slave* slave, slaves.registered) { + UpdateFrameworkMessage message; + message.mutable_framework_id()->MergeFrom(frameworkInfo.id()); + + // TODO(anand): We set 'pid' to UPID() for http frameworks + // as 'pid' was made optional in 0.24.0. 
In 0.25.0, we + // no longer have to set pid here for http frameworks. + message.set_pid(UPID()); + send(slave->pid, message); + } +} + + +void Master::subscribe( const UPID& from, const scheduler::Call::Subscribe& subscribe) { @@ -1787,8 +1989,12 @@ void Master::subscribe( << " framework '" << frameworkInfo.name() << "' at " << from << " because authentication is still in progress"; + // Need to disambiguate for the compiler. + void (Master::*f)(const UPID&, const scheduler::Call::Subscribe&) + = &Self::subscribe; + authenticating[from] - .onReady(defer(self(), &Self::subscribe, from, subscribe)); + .onReady(defer(self(), f, from, subscribe)); return; } @@ -1909,7 +2115,6 @@ void Master::_subscribe( if (!frameworkInfo.has_id() || frameworkInfo.id().value().empty()) { // If we are here the framework is subscribing for the first time. - // Check if this framework is already subscribed (because it retries). foreachvalue (Framework* framework, frameworks.registered) { if (framework->pid == from) { @@ -1942,16 +2147,12 @@ void Master::_subscribe( // If we are here framework has already been assigned an id. CHECK(!frameworkInfo.id().value().empty()); - if (frameworks.registered.count(frameworkInfo.id()) > 0) { + if (frameworks.registered.contains(frameworkInfo.id())) { // Using the "force" field of the scheduler allows us to keep a // scheduler that got partitioned but didn't die (in ZooKeeper // speak this means didn't lose their session) and then // eventually tried to connect to this master even though - // another instance of their scheduler has reconnected. This - // might not be an issue in the future when the - // master/allocator launches the scheduler can get restarted - // (if necessary) by the master and the master will always - // know which scheduler is the correct one. + // another instance of their scheduler has reconnected.
Framework* framework = CHECK_NOTNULL(frameworks.registered[frameworkInfo.id()]); @@ -1959,8 +2160,8 @@ void Master::_subscribe( // Update the framework's info fields based on those passed during // subscription. LOG(INFO) << "Updating info for framework " << framework->id(); - framework->updateFrameworkInfo(frameworkInfo); + framework->updateFrameworkInfo(frameworkInfo); allocator->updateFramework(framework->id(), framework->info); framework->reregisteredTime = Clock::now(); @@ -1974,9 +2175,9 @@ void Master::_subscribe( LOG(INFO) << "Framework " << *framework << " failed over"; failoverFramework(framework, from); } else if (framework->pid != from) { - LOG(ERROR) - << "Disallowing subscription attempt of framework " << *framework - << " because it is not expected from " << from; + LOG(ERROR) << "Disallowing subscription attempt of" + << " framework " << *framework + << " because it is not expected from " << from; FrameworkErrorMessage message; message.set_message("Framework failed over"); send(from, message); @@ -1998,6 +2199,7 @@ void Master::_subscribe( removeOffer(offer, true); // Rescind. } + // TODO(bmahler): Shouldn't this re-link with the scheduler? framework->connected = true; // Reactivate the framework. @@ -2053,8 +2255,7 @@ void Master::_subscribe( // Broadcast the new framework pid to all the slaves. We have to // broadcast because an executor might be running on a slave but - // it currently isn't running any tasks. This could be a - // potential scalability issue ... + // it currently isn't running any tasks. foreachvalue (Slave* slave, slaves.registered) { UpdateFrameworkMessage message; message.mutable_framework_id()->MergeFrom(frameworkInfo.id()); @@ -2128,7 +2329,7 @@ void Master::disconnect(Framework* framework) // Close the HTTP connection, which may already have // been closed due to scheduler disconnection. 
- framework->http.get().writer.close(); + framework->http.get().close(); } deactivate(framework); @@ -4720,13 +4921,48 @@ void Master::addFramework(Framework* framework) } +void Master::failoverFramework(Framework* framework, const HttpConnection& http) +{ + // Notify the old connected framework that it has failed over. + // Note that this may be a retry in which case we'll shut down + // the scheduler unnecessarily. + if (framework->connected) { + FrameworkErrorMessage message; + message.set_message("Framework failed over"); + framework->send(message); + } + + // If this is an upgrade, clear the authentication related data. + if (framework->pid.isSome()) { + authenticated.erase(framework->pid.get()); + + CHECK(frameworks.principals.contains(framework->pid.get())); + Option<string> principal = frameworks.principals[framework->pid.get()]; + + frameworks.principals.erase(framework->pid.get()); + + // Remove the metrics for the principal if this framework is the + // last one with this principal. + if (principal.isSome() && + !frameworks.principals.containsValue(principal.get())) { + CHECK(metrics->frameworks.contains(principal.get())); + metrics->frameworks.erase(principal.get()); + } + } + + framework->updateConnection(http); + http.closed() + .onAny(defer(self(), &Self::exited, framework->id(), http)); + + _failoverFramework(framework); +} + + // Replace the scheduler for a framework with a new process ID, in the // event of a scheduler failover. void Master::failoverFramework(Framework* framework, const UPID& newPid) { - CHECK_SOME(framework->pid) << "http framework failover not implemented"; - - const UPID oldPid = framework->pid.get(); + const Option<UPID> oldPid = framework->pid; // There are a few failover cases to consider: // 1. The pid has changed. In this case we definitely want to @@ -4738,25 +4974,31 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid) // scheduler as it is necessarily dead. // 2.2 This is a duplicate message. 
In this case, the scheduler // has not failed over, so we do not want to shut it down. - if (oldPid != newPid) { + if (oldPid.isSome() && oldPid != newPid) { FrameworkErrorMessage message; message.set_message("Framework failed over"); - send(oldPid, message); + framework->send(message); } - // TODO(benh): unlink(oldPid); - - framework->pid = newPid; + framework->updateConnection(newPid); link(newPid); - framework->connected = true; - // The scheduler driver safely ignores any duplicate registration - // messages, so we don't need to compare the old and new pids here. - FrameworkRegisteredMessage message; - message.mutable_framework_id()->MergeFrom(framework->id()); - message.mutable_master_info()->MergeFrom(info_); - framework->send(message); + _failoverFramework(framework); + + CHECK_SOME(framework->pid); + + // Update the principal mapping for this framework, which is + // needed to keep the per-principal framework metrics accurate. + if (oldPid.isSome() && frameworks.principals.contains(oldPid.get())) { + frameworks.principals.erase(oldPid.get()); + } + + frameworks.principals[newPid] = authenticated.get(newPid); +} + +void Master::_failoverFramework(Framework* framework) +{ // Remove the framework's offers (if they weren't removed before). // We do this after we have updated the pid and sent the framework // registered message so that the allocator can immediately re-offer @@ -4767,7 +5009,9 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid) removeOffer(offer); } - // Reactivate the framework. + // Reconnect and reactivate the framework. + framework->connected = true; + // NOTE: We do this after recovering resources (above) so that // the allocator has the correct view of the framework's share. if (!framework->active) { @@ -4775,12 +5019,12 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid) allocator->activateFramework(framework->id()); } - // 'Failover' the framework's metrics. 
i.e., change the lookup key - // for its metrics to 'newPid'. - if (oldPid != newPid && frameworks.principals.contains(oldPid)) { - frameworks.principals[newPid] = frameworks.principals[oldPid]; - frameworks.principals.erase(oldPid); - } + // The scheduler driver safely ignores any duplicate registration + // messages, so we don't need to compare the old and new pids here. + FrameworkRegisteredMessage message; + message.mutable_framework_id()->MergeFrom(framework->id()); + message.mutable_master_info()->MergeFrom(info_); + framework->send(message); } @@ -4886,23 +5130,19 @@ void Master::removeFramework(Framework* framework) // TODO(anand): This only works for pid based frameworks. We would // need similar authentication logic for http frameworks. if (framework->pid.isSome()) { - // Remove the framework from authenticated. authenticated.erase(framework->pid.get()); CHECK(frameworks.principals.contains(framework->pid.get())); - const Option<string> principal = - frameworks.principals[framework->pid.get()]; + Option<string> principal = frameworks.principals[framework->pid.get()]; frameworks.principals.erase(framework->pid.get()); - // Remove the framework's message counters. - if (principal.isSome()) { - // Remove the metrics for the principal if this framework is the - // last one with this principal. - if (!frameworks.principals.containsValue(principal.get())) { - CHECK(metrics->frameworks.contains(principal.get())); - metrics->frameworks.erase(principal.get()); - } + // Remove the metrics for the principal if this framework is the + // last one with this principal. 
+ if (principal.isSome() && + !frameworks.principals.containsValue(principal.get())) { + CHECK(metrics->frameworks.contains(principal.get())); + metrics->frameworks.erase(principal.get()); } } http://git-wip-us.apache.org/repos/asf/mesos/blob/d24b3ad6/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 53420ca..b288b8a 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -583,6 +583,12 @@ protected: // the event of a scheduler failover. void failoverFramework(Framework* framework, const process::UPID& newPid); + // Replace the scheduler for a framework with a new http connection, + // in the event of a scheduler failover. + void failoverFramework(Framework* framework, const HttpConnection& http); + + void _failoverFramework(Framework* framework); + // Kill all of a framework's tasks, delete the framework object, and // reschedule offers that were assigned to this framework. void removeFramework(Framework* framework); @@ -703,6 +709,14 @@ private: const scheduler::Call& call); void subscribe( + HttpConnection http, + const scheduler::Call::Subscribe& subscribe); + + void _subscribe( + HttpConnection http, + const scheduler::Call::Subscribe& subscribe); + + void subscribe( const process::UPID& from, const scheduler::Call::Subscribe& subscribe); @@ -1508,14 +1522,33 @@ struct Framework } } - void updateConnection(const HttpConnection& other) + void updateConnection(const process::UPID& newPid) { + // Remove the http connection if this is a downgrade from + // http to pid; note the connection may already be closed. + if (http.isSome()) { + http.get().close(); + http = None(); + } + + // TODO(benh): unlink(oldPid); + pid = newPid; + } + + void updateConnection(const HttpConnection& newHttp) + { + // Wipe the pid if this is an upgrade from pid to http.
+ if (pid.isSome()) { + // TODO(benh): unlink(oldPid); + pid = None(); + } + // Close the existing connection if it has changed. - if (http.isSome() && http.get().writer != other.writer) { + if (http.isSome() && http.get().writer != newHttp.writer) { http.get().close(); } - http = other; + http = newHttp; } Master* const master;
