My bad, I accidentally pushed a patch that I had applied earlier. It is reverted by the commit quoted below, so the build should be back to normal now!
On Wed, Aug 5, 2015 at 4:47 PM, <bmah...@apache.org> wrote: > Repository: mesos > Updated Branches: > refs/heads/master dbf35da46 -> 586307e54 > > > Revert "Add subscribe-> subscribed workflow for http frameworks" > > This reverts commit 1709e8a82fdae6a10893f0cfb9d2fae144d6a7a8. > > > Project: http://git-wip-us.apache.org/repos/asf/mesos/repo > Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/586307e5 > Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/586307e5 > Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/586307e5 > > Branch: refs/heads/master > Commit: 586307e546955c23aadf0ed5bfae9e00c0228f86 > Parents: dbf35da > Author: Benjamin Mahler <benjamin.mah...@gmail.com> > Authored: Wed Aug 5 16:47:36 2015 -0700 > Committer: Benjamin Mahler <benjamin.mah...@gmail.com> > Committed: Wed Aug 5 16:47:36 2015 -0700 > > ---------------------------------------------------------------------- > src/master/http.cpp | 21 +-- > src/master/master.cpp | 343 ++++++--------------------------------------- > src/master/master.hpp | 36 ----- > 3 files changed, 42 insertions(+), 358 deletions(-) > ---------------------------------------------------------------------- > > > > http://git-wip-us.apache.org/repos/asf/mesos/blob/586307e5/src/master/http.cpp > ---------------------------------------------------------------------- > diff --git a/src/master/http.cpp b/src/master/http.cpp > index 181db46..76e7080 100644 > --- a/src/master/http.cpp > +++ b/src/master/http.cpp > @@ -70,7 +70,6 @@ using process::http::InternalServerError; > using process::http::NotFound; > using process::http::NotImplemented; > using process::http::OK; > -using process::http::Pipe; > using process::http::TemporaryRedirect; > using process::http::Unauthorized; > using process::http::UnsupportedMediaType; > @@ -376,24 +375,10 @@ Future<Response> Master::Http::call(const Request& > request) const > responseContentType = ContentType::PROTOBUF; > } > > - switch (call.type()) { > - case 
scheduler::Call::SUBSCRIBE: { > - Pipe pipe; > - OK ok; > + // Silence unused warning for now. > + (void)responseContentType; > > - ok.type = Response::PIPE; > - ok.reader = pipe.reader(); > - > - master->subscribe( > - call.subscribe(), > - responseContentType, > - pipe.writer()); > - > - return ok; > - } > - default: > - break; > - } > + // TODO(anand): Handle the call. > > return NotImplemented(); > } > > > http://git-wip-us.apache.org/repos/asf/mesos/blob/586307e5/src/master/master.cpp > ---------------------------------------------------------------------- > diff --git a/src/master/master.cpp b/src/master/master.cpp > index e738607..50b9824 100644 > --- a/src/master/master.cpp > +++ b/src/master/master.cpp > @@ -1771,227 +1771,6 @@ void Master::reregisterFramework( > > > void Master::subscribe( > - const scheduler::Call::Subscribe& subscribe, > - ContentType contentType, > - Pipe::Writer writer) > -{ > - // TODO(anand): We need to ensure that framework is authenticated > - // before calling into this function. If not, we need to build the > - // authentication logic here before invoking subscribe(...) > - > - const FrameworkInfo& frameworkInfo = subscribe.framework_info(); > - > - LOG(INFO) << "Received registration request for" > - << " http framework '" << frameworkInfo.name() << "'"; > - > - // Assign a new FrameworkID. > - FrameworkInfo frameworkInfo_ = frameworkInfo; > - frameworkInfo_.mutable_id()->CopyFrom(newFrameworkId()); > - > - Framework* framework = new Framework( > - this, > - frameworkInfo_, > - writer, > - contentType); > - > - // TODO(anand): Add validation/authorization logic before we invoke > - // the continuation function. 
> - this->_subscribe(framework, subscribe); > -} > - > - > -void Master::_subscribe( > - Framework* framework, > - const scheduler::Call::Subscribe& subscribe) > -{ > - const FrameworkInfo& frameworkInfo = subscribe.framework_info(); > - > - if (!frameworkInfo.has_id() || frameworkInfo.id() == "") { > - // TODO(anand): Make '(re-)registerFramework()' also call into > - // this method (MESOS-3182). > - LOG(INFO) << "Registering framework " << *framework > - << " with checkpointing " > - << (framework->info.checkpoint() ? "enabled" : "disabled") > - << " and capabilities " << framework->info.capabilities(); > - > - // TODO(vinod): Deprecate this in favor of authorization. > - bool rootSubmissions = flags.root_submissions; > - > - if (framework->info.user() == "root" && rootSubmissions == false) { > - LOG(INFO) << "Framework " << *framework << " registering as root, > but " > - << "root submissions are disabled on this cluster"; > - FrameworkErrorMessage message; > - message.set_message("User 'root' is not allowed to run frameworks"); > - framework->send(message); > - delete framework; > - return; > - } > - > - addFramework(framework); > - > - FrameworkRegisteredMessage message; > - message.mutable_framework_id()->MergeFrom(framework->id()); > - message.mutable_master_info()->MergeFrom(info_); > - framework->send(message); > - return; > - } > - > - const bool force = subscribe.has_force() ? subscribe.force() : false; > - const bool isRegistered = > frameworks.registered.count(frameworkInfo.id()); > - > - // TODO(anand): Completed frameworks might try to subscribe > - // again later. This is part of validation checks in the > - // reregister(...) function. We need to refactor those checks into a > - // separate function and then use them here. > - > - LOG(INFO) << "Re-registering framework " << frameworkInfo.id() > - << " (" << frameworkInfo.name() << ") with checkpointing " > - << (frameworkInfo.checkpoint() ? 
"enabled" : "disabled") > - << " and capabilities " << frameworkInfo.capabilities(); > - > - if (isRegistered) { > - // Using the "failover" of the scheduler allows us to keep a > - // scheduler that got partitioned but didn't die (in ZooKeeper > - // speak this means didn't lose their session) and then > - // eventually tried to connect to this master even though > - // another instance of their scheduler has reconnected. This > - // might not be an issue in the future when the > - // master/allocator launches the scheduler can get restarted > - // (if necessary) by the master and the master will always > - // know which scheduler is the correct one. > - > - Framework* registeredFramework = > - CHECK_NOTNULL(frameworks.registered[frameworkInfo.id()]); > - > - // Update the framework's info fields based on those passed during > - // re-registration. > - LOG(INFO) << "Updating info for framework " << framework->id(); > - > - registeredFramework->updateFrameworkInfo(framework->info); > - allocator->updateFramework( > - registeredFramework->id(), > - registeredFramework->info); > - > - registeredFramework->reregisteredTime = Clock::now(); > - > - if (force) { > - // TODO(benh): Should we check whether the new scheduler has > - // given us a different framework name, user name or executor > - // info? > - LOG(INFO) << "Framework " << *registeredFramework << " failed over"; > - failoverFramework(registeredFramework, framework); > - } else { > - if (registeredFramework->connected) { > - // It's very hard to diffrentiate between retries from the > - // same scheduler or a old scheduler that got failed over to a > - // new one and is now retrying. An example scenario can be: > - // A scheduler sends us a SUBCRIBE request, we process it and > - // the stream breaks just before we could send a SUBSCRIBED > - // event back. If the scheduler retries now, we would see it > - // as connected. 
This scenario is no different from a old > - // scheduler retrying after it has already failed over. > - // Eventually the master would detect the disconnection, start > - // the failover clock and then the framework can connect. This > - // was slightly easier for pid frameworks as we used to check > - // if the pid values matched. > - FrameworkErrorMessage message; > - message.set_message("Framework is already connected"); > - framework->send(message); > - delete framework; > - return; > - } > - > - LOG(INFO) << "Allowing framework " << *registeredFramework > - << " to re-register with an already used id"; > - > - // Convert the framework to a http framework if it was pid based > - // in the past. Also, we don't need to set the readerClosed(...) > - // callbacks again as it was done already when the framework > - // object was created. > - if (registeredFramework->pid.isSome()) { > - registeredFramework->pid = None(); > - registeredFramework->http = framework->http; > - } else { > - registeredFramework->updateWriter(framework->http.get().writer); > - } > - > - // Remove any offers sent to this framework and reactivate it. > - // NOTE: We need to do this because the scheduler might have > - // replied to the offers but the driver might have dropped > - // those messages since it wasn't connected to the master. > - rescindOffersAndReactivate(registeredFramework); > - > - FrameworkReregisteredMessage message; > - > message.mutable_framework_id()->MergeFrom(registeredFramework->id()); > - message.mutable_master_info()->MergeFrom(info_); > - registeredFramework->send(message); > - } > - } else { > - // We don't have a framework with this ID, so we must be a newly > - // elected Mesos master to which either an existing scheduler or a > - // failed-over one is connecting. Add any tasks it has that have > - // been reported by reconnecting slaves. > - > - // TODO(benh): Check for root submissions like above! > - > - // Add active tasks and executors to the framework. 
> - foreachvalue (Slave* slave, slaves.registered) { > - foreachvalue (Task* task, slave->tasks[framework->id()]) { > - framework->addTask(task); > - } > - foreachvalue (const ExecutorInfo& executor, > - slave->executors[framework->id()]) { > - framework->addExecutor(slave->id, executor); > - } > - } > - > - // N.B. Need to add the framework _after_ we add its tasks > - // (above) so that we can properly determine the resources it's > - // currently using! > - addFramework(framework); > - > - // TODO(bmahler): We have to send a registered message here for > - // the re-registering framework, per the API contract. Send > - // re-register here per MESOS-786; requires deprecation or it > - // will break frameworks. > - FrameworkRegisteredMessage message; > - message.mutable_framework_id()->MergeFrom(framework->id()); > - message.mutable_master_info()->MergeFrom(info_); > - framework->send(message); > - } > - > - CHECK(frameworks.registered.contains(frameworkInfo.id())) > - << "Unknown framework " << frameworkInfo.id() > - << " (" << frameworkInfo.name() << ")"; > - > - // Broadcast the new framework pid to all the slaves. We have to > - // broadcast because an executor might be running on a slave but > - // it currently isn't running any tasks. This could be a > - // potential scalability issue ... > - foreachvalue (Slave* slave, slaves.registered) { > - UpdateFrameworkMessage message; > - message.mutable_framework_id()->MergeFrom(frameworkInfo.id()); > - > - // TODO(anand): We set 'pid' to UPID() for http frameworks > - // as 'pid' was made optional in 0.24.0. In 0.25.0, we > - // no longer have to set pid here for http frameowrks. > - message.set_pid(framework->pid.getOrElse(UPID())); > - send(slave->pid, message); > - } > - > - if (isRegistered) { > - // Mark the framework as disconnected before deleting so as not > - // to trigger closing the writer. > - framework->connected = false; > - > - // We only updated attributes in the existing stored framework. 
> - // Delete the optimistically created framework object. > - delete framework; > - } > -} > - > - > -void Master::subscribe( > const UPID& from, > const scheduler::Call::Subscribe& subscribe) > { > @@ -2202,7 +1981,7 @@ void Master::_subscribe( > << " because it is not expected from " << from; > FrameworkErrorMessage message; > message.set_message("Framework failed over"); > - framework->send(message); > + send(from, message); > return; > } else { > LOG(INFO) << "Allowing framework " << *framework > @@ -4886,11 +4665,11 @@ void Master::addFramework(Framework* framework) > CHECK(!frameworks.registered.contains(framework->id())) > << "Framework " << *framework << " already exists!"; > > + CHECK_SOME(framework->pid) << "adding http framework not implemented"; > + > frameworks.registered[framework->id()] = framework; > > - if (framework->pid.isSome()) { > - link(framework->pid.get()); > - } > + link(framework->pid.get()); > > // TODO(anand): For http frameworks, add a readerClosed() > // callback to invoke Master::exited() when the connection > @@ -4916,87 +4695,24 @@ void Master::addFramework(Framework* framework) > // If the framework is authenticated, its principal should be in > // 'authenticated'. Otherwise look if it's supplied in > // FrameworkInfo. > - if (framework->pid.isSome()) { > - Option<string> principal = authenticated.get(framework->pid.get()); > - if (principal.isNone() && framework->info.has_principal()) { > - principal = framework->info.principal(); > - } > - > - CHECK(!frameworks.principals.contains(framework->pid.get())); > - frameworks.principals.put(framework->pid.get(), principal); > - > - // Export framework metrics if a principal is specified. > - if (principal.isSome()) { > - // Create new framework metrics if this framework is the first > - // one of this principal. Otherwise existing metrics are reused. 
> - if (!metrics->frameworks.contains(principal.get())) { > - metrics->frameworks.put( > - principal.get(), > - Owned<Metrics::Frameworks>( > - new Metrics::Frameworks(principal.get()))); > - } > - } > + Option<string> principal = authenticated.get(framework->pid.get()); > + if (principal.isNone() && framework->info.has_principal()) { > + principal = framework->info.principal(); > } > -} > > + CHECK(!frameworks.principals.contains(framework->pid.get())); > + frameworks.principals.put(framework->pid.get(), principal); > > -void Master::rescindOffersAndReactivate(Framework* framework) > -{ > - // Remove the framework's offers (if they weren't removed before). > - // We do this after we have updated the pid and sent the framework > - // registered message so that the allocator can immediately re-offer > - // these resources to this framework if it wants. > - foreach (Offer* offer, utils::copy(framework->offers)) { > - allocator->recoverResources( > - offer->framework_id(), offer->slave_id(), offer->resources(), > None()); > - removeOffer(offer); > - } > - > - // Reactivate the framework. > - // NOTE: We do this after recovering resources (above) so that > - // the allocator has the correct view of the framework's share. > - if (!framework->active) { > - framework->active = true; > - allocator->activateFramework(framework->id()); > + // Export framework metrics if a principal is specified. > + if (principal.isSome()) { > + // Create new framework metrics if this framework is the first > + // one of this principal. Otherwise existing metrics are reused. > + if (!metrics->frameworks.contains(principal.get())) { > + metrics->frameworks.put( > + principal.get(), > + Owned<Metrics::Frameworks>(new > Metrics::Frameworks(principal.get()))); > + } > } > - > - // Mark the framework as connected now after reactivating it. > - framework->connected = true; > -} > - > - > -// TODO(anand): Currently this function is only used to failover http > -// frameworks. 
Extend this function to handle both pid/http. > -// We can then get rid of the failoverFramework(...) overload for pid > -// based frameworks. > -void Master::failoverFramework(Framework* framework, Framework* > newFramework) > -{ > - // Notify the old connected framework that it has failed over. > - // It might be a duplicated message too, but it is virtually > - // impossible for us to distinguish between them. We try to > - // do a best effort logic here of letting the old scheduler > - // know. > - if (framework->connected) { > - FrameworkErrorMessage message; > - message.set_message("Framework failed over"); > - framework->send(message); > - } > - > - // The framework was previously registered as pid based and is > - // now trying to failover as a http framework. Set it's pid > - // to None and convert it to a http framework. > - if (framework->pid.isSome()) { > - framework->pid = None(); > - framework->http = newFramework->http; > - } > - > - framework->updateWriter(newFramework->http.get().writer); > - rescindOffersAndReactivate(framework); > - > - FrameworkRegisteredMessage message; > - message.mutable_framework_id()->MergeFrom(framework->id()); > - message.mutable_master_info()->MergeFrom(info_); > - framework->send(message); > } > > > @@ -5004,6 +4720,8 @@ void Master::failoverFramework(Framework* framework, > Framework* newFramework) > // event of a scheduler failover. 
> void Master::failoverFramework(Framework* framework, const UPID& newPid) > { > + CHECK_SOME(framework->pid) << "http framework failover not implemented"; > + > const UPID oldPid = framework->pid.get(); > > // There are a few failover cases to consider: > @@ -5026,8 +4744,7 @@ void Master::failoverFramework(Framework* framework, > const UPID& newPid) > > framework->pid = newPid; > link(newPid); > - > - rescindOffersAndReactivate(framework); > + framework->connected = true; > > // The scheduler driver safely ignores any duplicate registration > // messages, so we don't need to compare the old and new pids here. > @@ -5036,6 +4753,24 @@ void Master::failoverFramework(Framework* > framework, const UPID& newPid) > message.mutable_master_info()->MergeFrom(info_); > framework->send(message); > > + // Remove the framework's offers (if they weren't removed before). > + // We do this after we have updated the pid and sent the framework > + // registered message so that the allocator can immediately re-offer > + // these resources to this framework if it wants. > + foreach (Offer* offer, utils::copy(framework->offers)) { > + allocator->recoverResources( > + offer->framework_id(), offer->slave_id(), offer->resources(), > None()); > + removeOffer(offer); > + } > + > + // Reactivate the framework. > + // NOTE: We do this after recovering resources (above) so that > + // the allocator has the correct view of the framework's share. > + if (!framework->active) { > + framework->active = true; > + allocator->activateFramework(framework->id()); > + } > + > // 'Failover' the framework's metrics. i.e., change the lookup key > // for its metrics to 'newPid'. 
> if (oldPid != newPid && frameworks.principals.contains(oldPid)) { > > > http://git-wip-us.apache.org/repos/asf/mesos/blob/586307e5/src/master/master.hpp > ---------------------------------------------------------------------- > diff --git a/src/master/master.hpp b/src/master/master.hpp > index c71e343..e441749 100644 > --- a/src/master/master.hpp > +++ b/src/master/master.hpp > @@ -584,14 +584,6 @@ protected: > // the event of a scheduler failover. > void failoverFramework(Framework* framework, const process::UPID& > newPid); > > - // Failover the old framework. Copies various attributes from the > - // new framework passed as argument. > - void failoverFramework(Framework* framework, Framework* newFramework); > - > - // Rescinds the framework's offers and reactivates the framework > - // marking it as connected. > - void rescindOffersAndReactivate(Framework* framework); > - > // Kill all of a framework's tasks, delete the framework object, and > // reschedule offers that were assigned to this framework. > void removeFramework(Framework* framework); > @@ -706,23 +698,11 @@ private: > const Offer::Operation& operation, > const std::string& message); > > - // Subscribes a http framework. > - void subscribe( > - const scheduler::Call::Subscribe& subscribe, > - ContentType contentType, > - process::http::Pipe::Writer writer); > - > - // Continuation of subscribe(). > - void _subscribe( > - Framework* framework, > - const scheduler::Call::Subscribe& subscribe); > - > // Call handlers. > void receive( > const process::UPID& from, > const scheduler::Call& call); > > - // Subscribes a pid framework. > void subscribe( > const process::UPID& from, > const scheduler::Call::Subscribe& subscribe); > @@ -1523,22 +1503,6 @@ struct Framework > } > } > > - void updateWriter(process::http::Pipe::Writer writer) > - { > - // Don't update with the same writer. > - if (http.get().writer == writer) { > - return; > - } > - > - // Close the existing connection and update the writer. 
> - http.get().writer.close(); > - http.get().writer = writer; > - > - // Set up the reader closed callback. > - http.get().writer.readerClosed() > - .onAny(defer(master->self(), &Master::exited, id(), writer)); > - } > - > Master* const master; > > FrameworkInfo info; > >