Repository: mesos Updated Branches: refs/heads/master bc26a44a5 -> dbf35da46
Add subscribe-> subscribed workflow for http frameworks Split review out of r36318. This change adds the functionality of making a http call for subscribe and the master responding with a subscribed event on the persistent stream. Also added functionality for framework failover equivalent of re-register. It should now be possible to merge the subscribed(...) introduced in this review and the re-factor that happened in MESOS-3182. - Made a new function for exited()/failoverFramework for http frameworks that invoke into the common continuation function for pid/http frameworks thereafter. - The re-register functionality equivalent goes in _subscribe(...) Review: https://reviews.apache.org/r/36720 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1709e8a8 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1709e8a8 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1709e8a8 Branch: refs/heads/master Commit: 1709e8a82fdae6a10893f0cfb9d2fae144d6a7a8 Parents: bc26a44 Author: Anand Mazumdar <[email protected]> Authored: Wed Aug 5 14:01:13 2015 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Wed Aug 5 14:01:15 2015 -0700 ---------------------------------------------------------------------- src/master/http.cpp | 21 ++- src/master/master.cpp | 343 +++++++++++++++++++++++++++++++++++++++------ src/master/master.hpp | 36 +++++ 3 files changed, 358 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/1709e8a8/src/master/http.cpp ---------------------------------------------------------------------- diff --git a/src/master/http.cpp b/src/master/http.cpp index 76e7080..181db46 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -70,6 +70,7 @@ using process::http::InternalServerError; using process::http::NotFound; using process::http::NotImplemented; using process::http::OK; 
+using process::http::Pipe; using process::http::TemporaryRedirect; using process::http::Unauthorized; using process::http::UnsupportedMediaType; @@ -375,10 +376,24 @@ Future<Response> Master::Http::call(const Request& request) const responseContentType = ContentType::PROTOBUF; } - // Silence unused warning for now. - (void)responseContentType; + switch (call.type()) { + case scheduler::Call::SUBSCRIBE: { + Pipe pipe; + OK ok; - // TODO(anand): Handle the call. + ok.type = Response::PIPE; + ok.reader = pipe.reader(); + + master->subscribe( + call.subscribe(), + responseContentType, + pipe.writer()); + + return ok; + } + default: + break; + } return NotImplemented(); } http://git-wip-us.apache.org/repos/asf/mesos/blob/1709e8a8/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 50b9824..e738607 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -1771,6 +1771,227 @@ void Master::reregisterFramework( void Master::subscribe( + const scheduler::Call::Subscribe& subscribe, + ContentType contentType, + Pipe::Writer writer) +{ + // TODO(anand): We need to ensure that framework is authenticated + // before calling into this function. If not, we need to build the + // authentication logic here before invoking subscribe(...) + + const FrameworkInfo& frameworkInfo = subscribe.framework_info(); + + LOG(INFO) << "Received registration request for" + << " http framework '" << frameworkInfo.name() << "'"; + + // Assign a new FrameworkID. + FrameworkInfo frameworkInfo_ = frameworkInfo; + frameworkInfo_.mutable_id()->CopyFrom(newFrameworkId()); + + Framework* framework = new Framework( + this, + frameworkInfo_, + writer, + contentType); + + // TODO(anand): Add validation/authorization logic before we invoke + // the continuation function. 
+ this->_subscribe(framework, subscribe); +} + + +void Master::_subscribe( + Framework* framework, + const scheduler::Call::Subscribe& subscribe) +{ + const FrameworkInfo& frameworkInfo = subscribe.framework_info(); + + if (!frameworkInfo.has_id() || frameworkInfo.id() == "") { + // TODO(anand): Make '(re-)registerFramework()' also call into + // this method (MESOS-3182). + LOG(INFO) << "Registering framework " << *framework + << " with checkpointing " + << (framework->info.checkpoint() ? "enabled" : "disabled") + << " and capabilities " << framework->info.capabilities(); + + // TODO(vinod): Deprecate this in favor of authorization. + bool rootSubmissions = flags.root_submissions; + + if (framework->info.user() == "root" && rootSubmissions == false) { + LOG(INFO) << "Framework " << *framework << " registering as root, but " + << "root submissions are disabled on this cluster"; + FrameworkErrorMessage message; + message.set_message("User 'root' is not allowed to run frameworks"); + framework->send(message); + delete framework; + return; + } + + addFramework(framework); + + FrameworkRegisteredMessage message; + message.mutable_framework_id()->MergeFrom(framework->id()); + message.mutable_master_info()->MergeFrom(info_); + framework->send(message); + return; + } + + const bool force = subscribe.has_force() ? subscribe.force() : false; + const bool isRegistered = frameworks.registered.count(frameworkInfo.id()); + + // TODO(anand): Completed frameworks might try to subscribe + // again later. This is part of validation checks in the + // reregister(...) function. We need to refactor those checks into a + // separate function and then use them here. + + LOG(INFO) << "Re-registering framework " << frameworkInfo.id() + << " (" << frameworkInfo.name() << ") with checkpointing " + << (frameworkInfo.checkpoint() ? 
"enabled" : "disabled") + << " and capabilities " << frameworkInfo.capabilities(); + + if (isRegistered) { + // Using the "failover" of the scheduler allows us to keep a + // scheduler that got partitioned but didn't die (in ZooKeeper + // speak this means didn't lose their session) and then + // eventually tried to connect to this master even though + // another instance of their scheduler has reconnected. This + // might not be an issue in the future when the + // master/allocator launches the scheduler can get restarted + // (if necessary) by the master and the master will always + // know which scheduler is the correct one. + + Framework* registeredFramework = + CHECK_NOTNULL(frameworks.registered[frameworkInfo.id()]); + + // Update the framework's info fields based on those passed during + // re-registration. + LOG(INFO) << "Updating info for framework " << framework->id(); + + registeredFramework->updateFrameworkInfo(framework->info); + allocator->updateFramework( + registeredFramework->id(), + registeredFramework->info); + + registeredFramework->reregisteredTime = Clock::now(); + + if (force) { + // TODO(benh): Should we check whether the new scheduler has + // given us a different framework name, user name or executor + // info? + LOG(INFO) << "Framework " << *registeredFramework << " failed over"; + failoverFramework(registeredFramework, framework); + } else { + if (registeredFramework->connected) { + // It's very hard to differentiate between retries from the + // same scheduler or an old scheduler that got failed over to a + // new one and is now retrying. An example scenario can be: + // A scheduler sends us a SUBSCRIBE request, we process it and + // the stream breaks just before we could send a SUBSCRIBED + // event back. If the scheduler retries now, we would see it + // as connected. This scenario is no different from an old + // scheduler retrying after it has already failed over. 
+ // Eventually the master would detect the disconnection, start + // the failover clock and then the framework can connect. This + // was slightly easier for pid frameworks as we used to check + // if the pid values matched. + FrameworkErrorMessage message; + message.set_message("Framework is already connected"); + framework->send(message); + delete framework; + return; + } + + LOG(INFO) << "Allowing framework " << *registeredFramework + << " to re-register with an already used id"; + + // Convert the framework to a http framework if it was pid based + // in the past. Also, we don't need to set the readerClosed(...) + // callbacks again as it was done already when the framework + // object was created. + if (registeredFramework->pid.isSome()) { + registeredFramework->pid = None(); + registeredFramework->http = framework->http; + } else { + registeredFramework->updateWriter(framework->http.get().writer); + } + + // Remove any offers sent to this framework and reactivate it. + // NOTE: We need to do this because the scheduler might have + // replied to the offers but the driver might have dropped + // those messages since it wasn't connected to the master. + rescindOffersAndReactivate(registeredFramework); + + FrameworkReregisteredMessage message; + message.mutable_framework_id()->MergeFrom(registeredFramework->id()); + message.mutable_master_info()->MergeFrom(info_); + registeredFramework->send(message); + } + } else { + // We don't have a framework with this ID, so we must be a newly + // elected Mesos master to which either an existing scheduler or a + // failed-over one is connecting. Add any tasks it has that have + // been reported by reconnecting slaves. + + // TODO(benh): Check for root submissions like above! + + // Add active tasks and executors to the framework. 
+ foreachvalue (Slave* slave, slaves.registered) { + foreachvalue (Task* task, slave->tasks[framework->id()]) { + framework->addTask(task); + } + foreachvalue (const ExecutorInfo& executor, + slave->executors[framework->id()]) { + framework->addExecutor(slave->id, executor); + } + } + + // N.B. Need to add the framework _after_ we add its tasks + // (above) so that we can properly determine the resources it's + // currently using! + addFramework(framework); + + // TODO(bmahler): We have to send a registered message here for + // the re-registering framework, per the API contract. Send + // re-register here per MESOS-786; requires deprecation or it + // will break frameworks. + FrameworkRegisteredMessage message; + message.mutable_framework_id()->MergeFrom(framework->id()); + message.mutable_master_info()->MergeFrom(info_); + framework->send(message); + } + + CHECK(frameworks.registered.contains(frameworkInfo.id())) + << "Unknown framework " << frameworkInfo.id() + << " (" << frameworkInfo.name() << ")"; + + // Broadcast the new framework pid to all the slaves. We have to + // broadcast because an executor might be running on a slave but + // it currently isn't running any tasks. This could be a + // potential scalability issue ... + foreachvalue (Slave* slave, slaves.registered) { + UpdateFrameworkMessage message; + message.mutable_framework_id()->MergeFrom(frameworkInfo.id()); + + // TODO(anand): We set 'pid' to UPID() for http frameworks + // as 'pid' was made optional in 0.24.0. In 0.25.0, we + // no longer have to set pid here for http frameworks. + message.set_pid(framework->pid.getOrElse(UPID())); + send(slave->pid, message); + } + + if (isRegistered) { + // Mark the framework as disconnected before deleting so as not + // to trigger closing the writer. + framework->connected = false; + + // We only updated attributes in the existing stored framework. + // Delete the optimistically created framework object. 
+ delete framework; + } +} + + +void Master::subscribe( const UPID& from, const scheduler::Call::Subscribe& subscribe) { @@ -1981,7 +2202,7 @@ void Master::_subscribe( << " because it is not expected from " << from; FrameworkErrorMessage message; message.set_message("Framework failed over"); - send(from, message); + framework->send(message); return; } else { LOG(INFO) << "Allowing framework " << *framework @@ -4665,11 +4886,11 @@ void Master::addFramework(Framework* framework) CHECK(!frameworks.registered.contains(framework->id())) << "Framework " << *framework << " already exists!"; - CHECK_SOME(framework->pid) << "adding http framework not implemented"; - frameworks.registered[framework->id()] = framework; - link(framework->pid.get()); + if (framework->pid.isSome()) { + link(framework->pid.get()); + } // TODO(anand): For http frameworks, add a readerClosed() // callback to invoke Master::exited() when the connection @@ -4695,33 +4916,94 @@ void Master::addFramework(Framework* framework) // If the framework is authenticated, its principal should be in // 'authenticated'. Otherwise look if it's supplied in // FrameworkInfo. - Option<string> principal = authenticated.get(framework->pid.get()); - if (principal.isNone() && framework->info.has_principal()) { - principal = framework->info.principal(); - } + if (framework->pid.isSome()) { + Option<string> principal = authenticated.get(framework->pid.get()); + if (principal.isNone() && framework->info.has_principal()) { + principal = framework->info.principal(); + } - CHECK(!frameworks.principals.contains(framework->pid.get())); - frameworks.principals.put(framework->pid.get(), principal); + CHECK(!frameworks.principals.contains(framework->pid.get())); + frameworks.principals.put(framework->pid.get(), principal); - // Export framework metrics if a principal is specified. - if (principal.isSome()) { - // Create new framework metrics if this framework is the first - // one of this principal. 
Otherwise existing metrics are reused. - if (!metrics->frameworks.contains(principal.get())) { - metrics->frameworks.put( - principal.get(), - Owned<Metrics::Frameworks>(new Metrics::Frameworks(principal.get()))); + // Export framework metrics if a principal is specified. + if (principal.isSome()) { + // Create new framework metrics if this framework is the first + // one of this principal. Otherwise existing metrics are reused. + if (!metrics->frameworks.contains(principal.get())) { + metrics->frameworks.put( + principal.get(), + Owned<Metrics::Frameworks>( + new Metrics::Frameworks(principal.get()))); + } } } } +void Master::rescindOffersAndReactivate(Framework* framework) +{ + // Remove the framework's offers (if they weren't removed before). + // We do this after we have updated the pid and sent the framework + // registered message so that the allocator can immediately re-offer + // these resources to this framework if it wants. + foreach (Offer* offer, utils::copy(framework->offers)) { + allocator->recoverResources( + offer->framework_id(), offer->slave_id(), offer->resources(), None()); + removeOffer(offer); + } + + // Reactivate the framework. + // NOTE: We do this after recovering resources (above) so that + // the allocator has the correct view of the framework's share. + if (!framework->active) { + framework->active = true; + allocator->activateFramework(framework->id()); + } + + // Mark the framework as connected now after reactivating it. + framework->connected = true; +} + + +// TODO(anand): Currently this function is only used to failover http +// frameworks. Extend this function to handle both pid/http. +// We can then get rid of the failoverFramework(...) overload for pid +// based frameworks. +void Master::failoverFramework(Framework* framework, Framework* newFramework) +{ + // Notify the old connected framework that it has failed over. 
+ // It might be a duplicated message too, but it is virtually + // impossible for us to distinguish between them. We try to + // do a best effort logic here of letting the old scheduler + // know. + if (framework->connected) { + FrameworkErrorMessage message; + message.set_message("Framework failed over"); + framework->send(message); + } + + // The framework was previously registered as pid based and is + // now trying to failover as a http framework. Set its pid + // to None and convert it to a http framework. + if (framework->pid.isSome()) { + framework->pid = None(); + framework->http = newFramework->http; + } + + framework->updateWriter(newFramework->http.get().writer); + rescindOffersAndReactivate(framework); + + FrameworkRegisteredMessage message; + message.mutable_framework_id()->MergeFrom(framework->id()); + message.mutable_master_info()->MergeFrom(info_); + framework->send(message); +} + + // Replace the scheduler for a framework with a new process ID, in the // event of a scheduler failover. void Master::failoverFramework(Framework* framework, const UPID& newPid) { - CHECK_SOME(framework->pid) << "http framework failover not implemented"; - const UPID oldPid = framework->pid.get(); // There are a few failover cases to consider: @@ -4744,7 +5026,8 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid) framework->pid = newPid; link(newPid); - framework->connected = true; + + rescindOffersAndReactivate(framework); // The scheduler driver safely ignores any duplicate registration // messages, so we don't need to compare the old and new pids here. @@ -4753,24 +5036,6 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid) message.mutable_master_info()->MergeFrom(info_); framework->send(message); - // Remove the framework's offers (if they weren't removed before). 
- // We do this after we have updated the pid and sent the framework - // registered message so that the allocator can immediately re-offer - // these resources to this framework if it wants. - foreach (Offer* offer, utils::copy(framework->offers)) { - allocator->recoverResources( - offer->framework_id(), offer->slave_id(), offer->resources(), None()); - removeOffer(offer); - } - - // Reactivate the framework. - // NOTE: We do this after recovering resources (above) so that - // the allocator has the correct view of the framework's share. - if (!framework->active) { - framework->active = true; - allocator->activateFramework(framework->id()); - } - // 'Failover' the framework's metrics. i.e., change the lookup key // for its metrics to 'newPid'. if (oldPid != newPid && frameworks.principals.contains(oldPid)) { http://git-wip-us.apache.org/repos/asf/mesos/blob/1709e8a8/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index e441749..c71e343 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -584,6 +584,14 @@ protected: // the event of a scheduler failover. void failoverFramework(Framework* framework, const process::UPID& newPid); + // Failover the old framework. Copies various attributes from the + // new framework passed as argument. + void failoverFramework(Framework* framework, Framework* newFramework); + + // Rescinds the framework's offers and reactivates the framework + // marking it as connected. + void rescindOffersAndReactivate(Framework* framework); + // Kill all of a framework's tasks, delete the framework object, and // reschedule offers that were assigned to this framework. void removeFramework(Framework* framework); @@ -698,11 +706,23 @@ private: const Offer::Operation& operation, const std::string& message); + // Subscribes a http framework. 
+ void subscribe( + const scheduler::Call::Subscribe& subscribe, + ContentType contentType, + process::http::Pipe::Writer writer); + + // Continuation of subscribe(). + void _subscribe( + Framework* framework, + const scheduler::Call::Subscribe& subscribe); + // Call handlers. void receive( const process::UPID& from, const scheduler::Call& call); + // Subscribes a pid framework. void subscribe( const process::UPID& from, const scheduler::Call::Subscribe& subscribe); @@ -1503,6 +1523,22 @@ struct Framework } } + void updateWriter(process::http::Pipe::Writer writer) + { + // Don't update with the same writer. + if (http.get().writer == writer) { + return; + } + + // Close the existing connection and update the writer. + http.get().writer.close(); + http.get().writer = writer; + + // Set up the reader closed callback. + http.get().writer.readerClosed() + .onAny(defer(master->self(), &Master::exited, id(), writer)); + } + Master* const master; FrameworkInfo info;
