Repository: mesos Updated Branches: refs/heads/master dbf35da46 -> 586307e54
Revert "Add subscribe-> subscribed workflow for http frameworks" This reverts commit 1709e8a82fdae6a10893f0cfb9d2fae144d6a7a8. Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/586307e5 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/586307e5 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/586307e5 Branch: refs/heads/master Commit: 586307e546955c23aadf0ed5bfae9e00c0228f86 Parents: dbf35da Author: Benjamin Mahler <[email protected]> Authored: Wed Aug 5 16:47:36 2015 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Wed Aug 5 16:47:36 2015 -0700 ---------------------------------------------------------------------- src/master/http.cpp | 21 +-- src/master/master.cpp | 343 ++++++--------------------------------------- src/master/master.hpp | 36 ----- 3 files changed, 42 insertions(+), 358 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/586307e5/src/master/http.cpp ---------------------------------------------------------------------- diff --git a/src/master/http.cpp b/src/master/http.cpp index 181db46..76e7080 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -70,7 +70,6 @@ using process::http::InternalServerError; using process::http::NotFound; using process::http::NotImplemented; using process::http::OK; -using process::http::Pipe; using process::http::TemporaryRedirect; using process::http::Unauthorized; using process::http::UnsupportedMediaType; @@ -376,24 +375,10 @@ Future<Response> Master::Http::call(const Request& request) const responseContentType = ContentType::PROTOBUF; } - switch (call.type()) { - case scheduler::Call::SUBSCRIBE: { - Pipe pipe; - OK ok; + // Silence unused warning for now. + (void)responseContentType; - ok.type = Response::PIPE; - ok.reader = pipe.reader(); - - master->subscribe( - call.subscribe(), - responseContentType, - pipe.writer()); - - return ok; - } - default: - break; - } + // TODO(anand): Handle the call. return NotImplemented(); } http://git-wip-us.apache.org/repos/asf/mesos/blob/586307e5/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index e738607..50b9824 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -1771,227 +1771,6 @@ void Master::reregisterFramework( void Master::subscribe( - const scheduler::Call::Subscribe& subscribe, - ContentType contentType, - Pipe::Writer writer) -{ - // TODO(anand): We need to ensure that framework is authenticated - // before calling into this function. If not, we need to build the - // authentication logic here before invoking subscribe(...) - - const FrameworkInfo& frameworkInfo = subscribe.framework_info(); - - LOG(INFO) << "Received registration request for" - << " http framework '" << frameworkInfo.name() << "'"; - - // Assign a new FrameworkID. - FrameworkInfo frameworkInfo_ = frameworkInfo; - frameworkInfo_.mutable_id()->CopyFrom(newFrameworkId()); - - Framework* framework = new Framework( - this, - frameworkInfo_, - writer, - contentType); - - // TODO(anand): Add validation/authorization logic before we invoke - // the continuation function. - this->_subscribe(framework, subscribe); -} - - -void Master::_subscribe( - Framework* framework, - const scheduler::Call::Subscribe& subscribe) -{ - const FrameworkInfo& frameworkInfo = subscribe.framework_info(); - - if (!frameworkInfo.has_id() || frameworkInfo.id() == "") { - // TODO(anand): Make '(re-)registerFramework()' also call into - // this method (MESOS-3182). - LOG(INFO) << "Registering framework " << *framework - << " with checkpointing " - << (framework->info.checkpoint() ? "enabled" : "disabled") - << " and capabilities " << framework->info.capabilities(); - - // TODO(vinod): Deprecate this in favor of authorization. - bool rootSubmissions = flags.root_submissions; - - if (framework->info.user() == "root" && rootSubmissions == false) { - LOG(INFO) << "Framework " << *framework << " registering as root, but " - << "root submissions are disabled on this cluster"; - FrameworkErrorMessage message; - message.set_message("User 'root' is not allowed to run frameworks"); - framework->send(message); - delete framework; - return; - } - - addFramework(framework); - - FrameworkRegisteredMessage message; - message.mutable_framework_id()->MergeFrom(framework->id()); - message.mutable_master_info()->MergeFrom(info_); - framework->send(message); - return; - } - - const bool force = subscribe.has_force() ? subscribe.force() : false; - const bool isRegistered = frameworks.registered.count(frameworkInfo.id()); - - // TODO(anand): Completed frameworks might try to subscribe - // again later. This is part of validation checks in the - // reregister(...) function. We need to refactor those checks into a - // separate function and then use them here. - - LOG(INFO) << "Re-registering framework " << frameworkInfo.id() - << " (" << frameworkInfo.name() << ") with checkpointing " - << (frameworkInfo.checkpoint() ? "enabled" : "disabled") - << " and capabilities " << frameworkInfo.capabilities(); - - if (isRegistered) { - // Using the "failover" of the scheduler allows us to keep a - // scheduler that got partitioned but didn't die (in ZooKeeper - // speak this means didn't lose their session) and then - // eventually tried to connect to this master even though - // another instance of their scheduler has reconnected. This - // might not be an issue in the future when the - // master/allocator launches the scheduler can get restarted - // (if necessary) by the master and the master will always - // know which scheduler is the correct one. - - Framework* registeredFramework = - CHECK_NOTNULL(frameworks.registered[frameworkInfo.id()]); - - // Update the framework's info fields based on those passed during - // re-registration. - LOG(INFO) << "Updating info for framework " << framework->id(); - - registeredFramework->updateFrameworkInfo(framework->info); - allocator->updateFramework( - registeredFramework->id(), - registeredFramework->info); - - registeredFramework->reregisteredTime = Clock::now(); - - if (force) { - // TODO(benh): Should we check whether the new scheduler has - // given us a different framework name, user name or executor - // info? - LOG(INFO) << "Framework " << *registeredFramework << " failed over"; - failoverFramework(registeredFramework, framework); - } else { - if (registeredFramework->connected) { - // It's very hard to diffrentiate between retries from the - // same scheduler or a old scheduler that got failed over to a - // new one and is now retrying. An example scenario can be: - // A scheduler sends us a SUBCRIBE request, we process it and - // the stream breaks just before we could send a SUBSCRIBED - // event back. If the scheduler retries now, we would see it - // as connected. This scenario is no different from a old - // scheduler retrying after it has already failed over. - // Eventually the master would detect the disconnection, start - // the failover clock and then the framework can connect. This - // was slightly easier for pid frameworks as we used to check - // if the pid values matched. - FrameworkErrorMessage message; - message.set_message("Framework is already connected"); - framework->send(message); - delete framework; - return; - } - - LOG(INFO) << "Allowing framework " << *registeredFramework - << " to re-register with an already used id"; - - // Convert the framework to a http framework if it was pid based - // in the past. Also, we don't need to set the readerClosed(...) - // callbacks again as it was done already when the framework - // object was created. - if (registeredFramework->pid.isSome()) { - registeredFramework->pid = None(); - registeredFramework->http = framework->http; - } else { - registeredFramework->updateWriter(framework->http.get().writer); - } - - // Remove any offers sent to this framework and reactivate it. - // NOTE: We need to do this because the scheduler might have - // replied to the offers but the driver might have dropped - // those messages since it wasn't connected to the master. - rescindOffersAndReactivate(registeredFramework); - - FrameworkReregisteredMessage message; - message.mutable_framework_id()->MergeFrom(registeredFramework->id()); - message.mutable_master_info()->MergeFrom(info_); - registeredFramework->send(message); - } - } else { - // We don't have a framework with this ID, so we must be a newly - // elected Mesos master to which either an existing scheduler or a - // failed-over one is connecting. Add any tasks it has that have - // been reported by reconnecting slaves. - - // TODO(benh): Check for root submissions like above! - - // Add active tasks and executors to the framework. - foreachvalue (Slave* slave, slaves.registered) { - foreachvalue (Task* task, slave->tasks[framework->id()]) { - framework->addTask(task); - } - foreachvalue (const ExecutorInfo& executor, - slave->executors[framework->id()]) { - framework->addExecutor(slave->id, executor); - } - } - - // N.B. Need to add the framework _after_ we add its tasks - // (above) so that we can properly determine the resources it's - // currently using! - addFramework(framework); - - // TODO(bmahler): We have to send a registered message here for - // the re-registering framework, per the API contract. Send - // re-register here per MESOS-786; requires deprecation or it - // will break frameworks. - FrameworkRegisteredMessage message; - message.mutable_framework_id()->MergeFrom(framework->id()); - message.mutable_master_info()->MergeFrom(info_); - framework->send(message); - } - - CHECK(frameworks.registered.contains(frameworkInfo.id())) - << "Unknown framework " << frameworkInfo.id() - << " (" << frameworkInfo.name() << ")"; - - // Broadcast the new framework pid to all the slaves. We have to - // broadcast because an executor might be running on a slave but - // it currently isn't running any tasks. This could be a - // potential scalability issue ... - foreachvalue (Slave* slave, slaves.registered) { - UpdateFrameworkMessage message; - message.mutable_framework_id()->MergeFrom(frameworkInfo.id()); - - // TODO(anand): We set 'pid' to UPID() for http frameworks - // as 'pid' was made optional in 0.24.0. In 0.25.0, we - // no longer have to set pid here for http frameowrks. - message.set_pid(framework->pid.getOrElse(UPID())); - send(slave->pid, message); - } - - if (isRegistered) { - // Mark the framework as disconnected before deleting so as not - // to trigger closing the writer. - framework->connected = false; - - // We only updated attributes in the existing stored framework. - // Delete the optimistically created framework object. - delete framework; - } -} - - -void Master::subscribe( const UPID& from, const scheduler::Call::Subscribe& subscribe) { @@ -2202,7 +1981,7 @@ void Master::_subscribe( << " because it is not expected from " << from; FrameworkErrorMessage message; message.set_message("Framework failed over"); - framework->send(message); + send(from, message); return; } else { LOG(INFO) << "Allowing framework " << *framework @@ -4886,11 +4665,11 @@ void Master::addFramework(Framework* framework) CHECK(!frameworks.registered.contains(framework->id())) << "Framework " << *framework << " already exists!"; + CHECK_SOME(framework->pid) << "adding http framework not implemented"; + frameworks.registered[framework->id()] = framework; - if (framework->pid.isSome()) { - link(framework->pid.get()); - } + link(framework->pid.get()); // TODO(anand): For http frameworks, add a readerClosed() // callback to invoke Master::exited() when the connection @@ -4916,87 +4695,24 @@ void Master::addFramework(Framework* framework) // If the framework is authenticated, its principal should be in // 'authenticated'. Otherwise look if it's supplied in // FrameworkInfo. - if (framework->pid.isSome()) { - Option<string> principal = authenticated.get(framework->pid.get()); - if (principal.isNone() && framework->info.has_principal()) { - principal = framework->info.principal(); - } - - CHECK(!frameworks.principals.contains(framework->pid.get())); - frameworks.principals.put(framework->pid.get(), principal); - - // Export framework metrics if a principal is specified. - if (principal.isSome()) { - // Create new framework metrics if this framework is the first - // one of this principal. Otherwise existing metrics are reused. - if (!metrics->frameworks.contains(principal.get())) { - metrics->frameworks.put( - principal.get(), - Owned<Metrics::Frameworks>( - new Metrics::Frameworks(principal.get()))); - } - } + Option<string> principal = authenticated.get(framework->pid.get()); + if (principal.isNone() && framework->info.has_principal()) { + principal = framework->info.principal(); } -} + CHECK(!frameworks.principals.contains(framework->pid.get())); + frameworks.principals.put(framework->pid.get(), principal); -void Master::rescindOffersAndReactivate(Framework* framework) -{ - // Remove the framework's offers (if they weren't removed before). - // We do this after we have updated the pid and sent the framework - // registered message so that the allocator can immediately re-offer - // these resources to this framework if it wants. - foreach (Offer* offer, utils::copy(framework->offers)) { - allocator->recoverResources( - offer->framework_id(), offer->slave_id(), offer->resources(), None()); - removeOffer(offer); - } - - // Reactivate the framework. - // NOTE: We do this after recovering resources (above) so that - // the allocator has the correct view of the framework's share. - if (!framework->active) { - framework->active = true; - allocator->activateFramework(framework->id()); + // Export framework metrics if a principal is specified. + if (principal.isSome()) { + // Create new framework metrics if this framework is the first + // one of this principal. Otherwise existing metrics are reused. + if (!metrics->frameworks.contains(principal.get())) { + metrics->frameworks.put( + principal.get(), + Owned<Metrics::Frameworks>(new Metrics::Frameworks(principal.get()))); + } } - - // Mark the framework as connected now after reactivating it. - framework->connected = true; -} - - -// TODO(anand): Currently this function is only used to failover http -// frameworks. Extend this function to handle both pid/http. -// We can then get rid of the failoverFramework(...) overload for pid -// based frameworks. -void Master::failoverFramework(Framework* framework, Framework* newFramework) -{ - // Notify the old connected framework that it has failed over. - // It might be a duplicated message too, but it is virtually - // impossible for us to distinguish between them. We try to - // do a best effort logic here of letting the old scheduler - // know. - if (framework->connected) { - FrameworkErrorMessage message; - message.set_message("Framework failed over"); - framework->send(message); - } - - // The framework was previously registered as pid based and is - // now trying to failover as a http framework. Set it's pid - // to None and convert it to a http framework. - if (framework->pid.isSome()) { - framework->pid = None(); - framework->http = newFramework->http; - } - - framework->updateWriter(newFramework->http.get().writer); - rescindOffersAndReactivate(framework); - - FrameworkRegisteredMessage message; - message.mutable_framework_id()->MergeFrom(framework->id()); - message.mutable_master_info()->MergeFrom(info_); - framework->send(message); } @@ -5004,6 +4720,8 @@ void Master::failoverFramework(Framework* framework, Framework* newFramework) // event of a scheduler failover. void Master::failoverFramework(Framework* framework, const UPID& newPid) { + CHECK_SOME(framework->pid) << "http framework failover not implemented"; + const UPID oldPid = framework->pid.get(); // There are a few failover cases to consider: @@ -5026,8 +4744,7 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid) framework->pid = newPid; link(newPid); - - rescindOffersAndReactivate(framework); + framework->connected = true; // The scheduler driver safely ignores any duplicate registration // messages, so we don't need to compare the old and new pids here. @@ -5036,6 +4753,24 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid) message.mutable_master_info()->MergeFrom(info_); framework->send(message); + // Remove the framework's offers (if they weren't removed before). + // We do this after we have updated the pid and sent the framework + // registered message so that the allocator can immediately re-offer + // these resources to this framework if it wants. + foreach (Offer* offer, utils::copy(framework->offers)) { + allocator->recoverResources( + offer->framework_id(), offer->slave_id(), offer->resources(), None()); + removeOffer(offer); + } + + // Reactivate the framework. + // NOTE: We do this after recovering resources (above) so that + // the allocator has the correct view of the framework's share. + if (!framework->active) { + framework->active = true; + allocator->activateFramework(framework->id()); + } + // 'Failover' the framework's metrics. i.e., change the lookup key // for its metrics to 'newPid'. if (oldPid != newPid && frameworks.principals.contains(oldPid)) { http://git-wip-us.apache.org/repos/asf/mesos/blob/586307e5/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index c71e343..e441749 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -584,14 +584,6 @@ protected: // the event of a scheduler failover. void failoverFramework(Framework* framework, const process::UPID& newPid); - // Failover the old framework. Copies various attributes from the - // new framework passed as argument. - void failoverFramework(Framework* framework, Framework* newFramework); - - // Rescinds the framework's offers and reactivates the framework - // marking it as connected. - void rescindOffersAndReactivate(Framework* framework); - // Kill all of a framework's tasks, delete the framework object, and // reschedule offers that were assigned to this framework. void removeFramework(Framework* framework); @@ -706,23 +698,11 @@ private: const Offer::Operation& operation, const std::string& message); - // Subscribes a http framework. - void subscribe( - const scheduler::Call::Subscribe& subscribe, - ContentType contentType, - process::http::Pipe::Writer writer); - - // Continuation of subscribe(). - void _subscribe( - Framework* framework, - const scheduler::Call::Subscribe& subscribe); - // Call handlers. void receive( const process::UPID& from, const scheduler::Call& call); - // Subscribes a pid framework. void subscribe( const process::UPID& from, const scheduler::Call::Subscribe& subscribe); @@ -1523,22 +1503,6 @@ struct Framework } } - void updateWriter(process::http::Pipe::Writer writer) - { - // Don't update with the same writer. - if (http.get().writer == writer) { - return; - } - - // Close the existing connection and update the writer. - http.get().writer.close(); - http.get().writer = writer; - - // Set up the reader closed callback. - http.get().writer.readerClosed() - .onAny(defer(master->self(), &Master::exited, id(), writer)); - } - Master* const master; FrameworkInfo info;
