Repository: mesos Updated Branches: refs/heads/master 0d08b2224 -> b8bec2027
Merged registerFramework() and reregisterFramework() into subscribe() in Master. Review: https://reviews.apache.org/r/37046 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b8bec202 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b8bec202 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b8bec202 Branch: refs/heads/master Commit: b8bec2027e96d08f3b8c1d3f2313b39bd20aa132 Parents: 0d08b22 Author: Vinod Kone <[email protected]> Authored: Fri Jul 31 15:03:10 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Tue Aug 4 10:53:53 2015 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 293 +++++++++++++++------------------------------ src/master/master.hpp | 18 +-- 2 files changed, 103 insertions(+), 208 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/b8bec202/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 351a3c2..87e11d5 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -1694,183 +1694,77 @@ void Master::registerFramework( { ++metrics->messages_register_framework; - if (authenticating.contains(from)) { - // TODO(vinod): Consider dropping this request and fix the tests - // to deal with the drop. Currently there is a race between master - // realizing the framework is authenticated and framework sending - // a registration request. Dropping this message will cause the - // framework to retry slowing down the tests. 
- LOG(INFO) << "Queuing up registration request for" - << " framework '" << frameworkInfo.name() << "' at " << from - << " because authentication is still in progress"; - - authenticating[from] - .onReady(defer(self(), &Self::registerFramework, from, frameworkInfo)); - return; - } - - Option<Error> validationError = None(); - - // TODO(vinod): Add "!=" operator for FrameworkID. - if (frameworkInfo.has_id() && !(frameworkInfo.id() == "")) { - validationError = Error("Registering with 'id' already set"); - } - - // TODO(vinod): Deprecate this in favor of ACLs. - if (validationError.isNone() && !roles.contains(frameworkInfo.role())) { - validationError = Error("Role '" + frameworkInfo.role() + "' is not" + - " present in the master's --roles"); - } - - // TODO(vinod): Deprecate this in favor of authorization. - if (validationError.isNone()) { - if (frameworkInfo.user() == "root" && !flags.root_submissions) { - validationError = Error("User 'root' is not allowed to run frameworks" - " without --root_submissions set"); - } - } + if (frameworkInfo.has_id() && !frameworkInfo.id().value().empty()) { + const string error = "Registering with 'id' already set"; - // Note that re-authentication errors are already handled above. 
- if (validationError.isNone()) { - validationError = validateFrameworkAuthentication(frameworkInfo, from); - } - - if (validationError.isSome()) { - LOG(INFO) << "Refusing registration of framework" - << " '" << frameworkInfo.name() << "' at " << from << ": " - << validationError.get().message; + LOG(INFO) << "Refusing registration request of framework" + << " '" << frameworkInfo.name() << "' at " << from + << ": " << error; FrameworkErrorMessage message; - message.set_message(validationError.get().message); + message.set_message(error); send(from, message); return; } - LOG(INFO) << "Received registration request for" - << " framework '" << frameworkInfo.name() << "' at " << from; + scheduler::Call::Subscribe call; + call.mutable_framework_info()->CopyFrom(frameworkInfo); - // We allow an authenticated framework to not specify a principal - // in FrameworkInfo but we'd prefer if it did so we log a WARNING - // here when it happens. - if (!frameworkInfo.has_principal() && authenticated.contains(from)) { - LOG(WARNING) << "Framework at " << from - << " (authenticated as '" << authenticated[from] << "')" - << " does not set 'principal' in FrameworkInfo"; - } - - authorizeFramework(frameworkInfo) - .onAny(defer(self(), - &Master::_registerFramework, - from, - frameworkInfo, - lambda::_1)); + subscribe(from, call); } -void Master::_registerFramework( +void Master::reregisterFramework( const UPID& from, const FrameworkInfo& frameworkInfo, - const Future<bool>& authorized) + bool failover) { - CHECK(!authorized.isDiscarded()); + ++metrics->messages_reregister_framework; - Option<Error> authorizationError = None(); + if (!frameworkInfo.has_id() || frameworkInfo.id().value().empty()) { + const string error = "Re-registering without an 'id'"; - if (authorized.isFailed()) { - authorizationError = - Error("Authorization failure: " + authorized.failure()); - } else if (!authorized.get()) { - authorizationError = - Error("Not authorized to use role '" + frameworkInfo.role() + 
"'"); - } - - if (authorizationError.isSome()) { - LOG(INFO) << "Refusing registration of framework" + LOG(INFO) << "Refusing re-registration request of framework" << " '" << frameworkInfo.name() << "' at " << from - << ": " << authorizationError.get().message; + << ": " << error; FrameworkErrorMessage message; - message.set_message(authorizationError.get().message); + message.set_message(error); send(from, message); return; } - // At this point, authentications errors will be due to - // re-authentication during the authorization process, - // so we drop the registration. - Option<Error> authenticationError = - validateFrameworkAuthentication(frameworkInfo, from); - - if (authenticationError.isSome()) { - LOG(INFO) << "Dropping registration request for framework" - << " '" << frameworkInfo.name() << "' at " << from - << ": " << authenticationError.get().message; - return; - } - - // Check if this framework is already registered (because it retries). - foreachvalue (Framework* framework, frameworks.registered) { - if (framework->pid == from) { - LOG(INFO) << "Framework " << *framework - << " already registered, resending acknowledgement"; - FrameworkRegisteredMessage message; - message.mutable_framework_id()->MergeFrom(framework->id()); - message.mutable_master_info()->MergeFrom(info_); - framework->send(message); - return; - } - } - - // Assign a new FrameworkID. - FrameworkInfo frameworkInfo_ = frameworkInfo; - frameworkInfo_.mutable_id()->CopyFrom(newFrameworkId()); - - Framework* framework = new Framework(this, frameworkInfo_, from); - - LOG(INFO) << "Registering framework " << *framework - << " with checkpointing " - << (framework->info.checkpoint() ? 
"enabled" : "disabled") - << " and capabilities " << framework->info.capabilities(); + scheduler::Call::Subscribe call; + call.mutable_framework_info()->CopyFrom(frameworkInfo); + call.set_force(failover); - addFramework(framework); - - FrameworkRegisteredMessage message; - message.mutable_framework_id()->MergeFrom(framework->id()); - message.mutable_master_info()->MergeFrom(info_); - framework->send(message); + subscribe(from, call); } -void Master::reregisterFramework( +void Master::subscribe( const UPID& from, - const FrameworkInfo& frameworkInfo, - bool failover) + const scheduler::Call::Subscribe& subscribe) { - ++metrics->messages_reregister_framework; + const FrameworkInfo& frameworkInfo = subscribe.framework_info(); if (authenticating.contains(from)) { - LOG(INFO) << "Queuing up re-registration request for framework " - << frameworkInfo.id() << " (" << frameworkInfo.name() << ") at " - << from << " because authentication is still in progress"; - // TODO(vinod): Consider dropping this request and fix the tests - // to deal with the drop. See 'Master::registerFramework()' for - // more details. + // to deal with the drop. Currently there is a race between master + // realizing the framework is authenticated and framework sending + // a subscribe call. Dropping this message will cause the + // framework to retry slowing down the tests. + LOG(INFO) << "Queuing up SUBSCRIBE call for" + << " framework '" << frameworkInfo.name() << "' at " << from + << " because authentication is still in progress"; + authenticating[from] - .onReady(defer(self(), - &Self::reregisterFramework, - from, - frameworkInfo, - failover)); + .onReady(defer(self(), &Self::subscribe, from, subscribe)); return; } Option<Error> validationError = None(); - if (!frameworkInfo.has_id() || frameworkInfo.id() == "") { - validationError = Error("Re-registering without 'id' set"); - } - // TODO(vinod): Deprecate this in favor of ACLs. 
if (validationError.isNone() && !roles.contains(frameworkInfo.role())) { validationError = Error("Role '" + frameworkInfo.role() + "' is not" + @@ -1878,19 +1772,19 @@ void Master::reregisterFramework( } // TODO(vinod): Deprecate this in favor of authorization. - if (validationError.isNone()) { - if (frameworkInfo.user() == "root" && !flags.root_submissions) { - validationError = Error("User 'root' is not allowed to run frameworks" - " without --root_submissions set"); - } + if (validationError.isNone() && + frameworkInfo.user() == "root" && !flags.root_submissions) { + validationError = Error("User 'root' is not allowed to run frameworks" + " without --root_submissions set"); } - if (validationError.isNone()) { + if (validationError.isNone() && frameworkInfo.has_id()) { foreach (const shared_ptr<Framework>& framework, frameworks.completed) { if (framework->id() == frameworkInfo.id()) { - // This could happen if a framework tries to re-register after + // This could happen if a framework tries to subscribe after // its failover timeout has elapsed or it unregistered itself // by calling 'stop()' on the scheduler driver. + // // TODO(vinod): Master should persist admitted frameworks to the // registry and remove them from it after failover timeout. 
validationError = Error("Framework has been removed"); @@ -1905,7 +1799,7 @@ void Master::reregisterFramework( } if (validationError.isSome()) { - LOG(INFO) << "Refusing re-registration of framework" + LOG(INFO) << "Refusing subscription of framework" << " '" << frameworkInfo.name() << "' at " << from << ": " << validationError.get().message; @@ -1915,9 +1809,8 @@ void Master::reregisterFramework( return; } - LOG(INFO) << "Received re-registration request from framework " - << frameworkInfo.id() << " (" << frameworkInfo.name() - << ") at " << from; + LOG(INFO) << "Received SUBSCRIBE call for" + << " framework '" << frameworkInfo.name() << "' at " << from; // We allow an authenticated framework to not specify a principal // in FrameworkInfo but we'd prefer if it did so we log a WARNING @@ -1930,20 +1823,20 @@ void Master::reregisterFramework( authorizeFramework(frameworkInfo) .onAny(defer(self(), - &Master::_reregisterFramework, + &Master::_subscribe, from, - frameworkInfo, - failover, + subscribe, lambda::_1)); } -void Master::_reregisterFramework( +void Master::_subscribe( const UPID& from, - const FrameworkInfo& frameworkInfo, - bool failover, + const scheduler::Call::Subscribe& subscribe, const Future<bool>& authorized) { + const FrameworkInfo& frameworkInfo = subscribe.framework_info(); + CHECK(!authorized.isDiscarded()); Option<Error> authorizationError = None(); @@ -1957,7 +1850,7 @@ void Master::_reregisterFramework( } if (authorizationError.isSome()) { - LOG(INFO) << "Refusing re-registration of framework" + LOG(INFO) << "Refusing subscription of framework" << " '" << frameworkInfo.name() << "' at " << from << ": " << authorizationError.get().message; @@ -1969,28 +1862,59 @@ void Master::_reregisterFramework( // At this point, authentications errors will be due to // re-authentication during the authorization process, - // so we drop the re-registration. 
It is important to - // drop this because if this request is from a failing - // over framework (pid = from) we don't want to failover - // the already registered framework (pid = framework->pid). + // so we drop the subscription. Option<Error> authenticationError = validateFrameworkAuthentication(frameworkInfo, from); if (authenticationError.isSome()) { - LOG(INFO) << "Dropping re-registration request for framework" + LOG(INFO) << "Dropping SUBSCRIBE call for framework" << " '" << frameworkInfo.name() << "' at " << from << ": " << authenticationError.get().message; return; } - LOG(INFO) << "Re-registering framework " << frameworkInfo.id() - << " (" << frameworkInfo.name() << ") " << " at " << from + LOG(INFO) << "Subscribing framework " << frameworkInfo.name() << " with checkpointing " << (frameworkInfo.checkpoint() ? "enabled" : "disabled") << " and capabilities " << frameworkInfo.capabilities(); + if (!frameworkInfo.has_id() || frameworkInfo.id().value().empty()) { + // If we are here the framework is subscribing for the first time. + + // Check if this framework is already subscribed (because it retries). + foreachvalue (Framework* framework, frameworks.registered) { + if (framework->pid == from) { + LOG(INFO) << "Framework " << *framework + << " already subscribed, resending acknowledgement"; + FrameworkRegisteredMessage message; + message.mutable_framework_id()->MergeFrom(framework->id()); + message.mutable_master_info()->MergeFrom(info_); + framework->send(message); + return; + } + } + + // Assign a new FrameworkID. 
+ FrameworkInfo frameworkInfo_ = frameworkInfo; + frameworkInfo_.mutable_id()->CopyFrom(newFrameworkId()); + + Framework* framework = new Framework(this, frameworkInfo_, from); + + addFramework(framework); + + FrameworkRegisteredMessage message; + message.mutable_framework_id()->MergeFrom(framework->id()); + message.mutable_master_info()->MergeFrom(info_); + framework->send(message); + + return; + } + + // If we are here framework has already been assigned an id. + CHECK(!frameworkInfo.id().value().empty()); + if (frameworks.registered.count(frameworkInfo.id()) > 0) { - // Using the "failover" of the scheduler allows us to keep a + // Using the "force" field of the scheduler allows us to keep a // scheduler that got partitioned but didn't die (in ZooKeeper // speak this means didn't lose their session) and then // eventually tried to connect to this master even though @@ -2004,7 +1928,7 @@ void Master::_reregisterFramework( CHECK_NOTNULL(frameworks.registered[frameworkInfo.id()]); // Update the framework's info fields based on those passed during - // re-registration. + // subscription. LOG(INFO) << "Updating info for framework " << framework->id(); framework->updateFrameworkInfo(frameworkInfo); @@ -2012,21 +1936,20 @@ void Master::_reregisterFramework( framework->reregisteredTime = Clock::now(); - if (failover) { - // We do not attempt to detect a duplicate re-registration - // message here because it is impossible to distinguish between - // a duplicate message, and a scheduler failover to the same - // pid, given the existing libprocess primitives (PID does not - // identify the libprocess Process instance). - + if (subscribe.force()) { // TODO(benh): Should we check whether the new scheduler has // given us a different framework name, user name or executor // info? + // TODO(vinod): Now that the scheduler pid is unique we don't + // need to call 'failoverFramework()' if the pid hasn't changed + // (i.e., duplicate message). 
Instead we can just send the + // FrameworkReregisteredMessage back and activate the framework + // if necessary. LOG(INFO) << "Framework " << *framework << " failed over"; failoverFramework(framework, from); } else if (framework->pid != from) { LOG(ERROR) - << "Disallowing re-registration attempt of framework " << *framework + << "Disallowing subscription attempt of framework " << *framework << " because it is not expected from " << from; FrameworkErrorMessage message; message.set_message("Framework failed over"); @@ -2034,7 +1957,7 @@ void Master::_reregisterFramework( return; } else { LOG(INFO) << "Allowing framework " << *framework - << " to re-register with an already used id"; + << " to subscribe with an already used id"; // Remove any offers sent to this framework. // NOTE: We need to do this because the scheduler might have @@ -2072,8 +1995,6 @@ void Master::_reregisterFramework( // any tasks it has that have been reported by reconnecting slaves. Framework* framework = new Framework(this, frameworkInfo, from); - // TODO(benh): Check for root submissions like above! - // Add active tasks and executors to the framework. foreachvalue (Slave* slave, slaves.registered) { foreachvalue (Task* task, slave->tasks[framework->id()]) { @@ -2114,24 +2035,6 @@ void Master::_reregisterFramework( message.set_pid(from); send(slave->pid, message); } - - return; -} - - -void Master::subscribe( - const UPID& from, - const scheduler::Call::Subscribe& subscribe) -{ - const FrameworkInfo& frameworkInfo = subscribe.framework_info(); - - // TODO(vinod): Instead of calling '(re-)registerFramework()' from - // here refactor those methods to call 'subscribe()'. 
- if (!frameworkInfo.has_id() || frameworkInfo.id() == "") { - registerFramework(from, frameworkInfo); - } else { - reregisterFramework(from, frameworkInfo, subscribe.force()); - } } http://git-wip-us.apache.org/repos/asf/mesos/blob/b8bec202/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index ea18c4e..cd0a5c8 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -571,19 +571,6 @@ protected: const std::vector<ExecutorInfo>& executors, const std::vector<Task>& tasks); - // 'registerFramework()' continuation. - void _registerFramework( - const process::UPID& from, - const FrameworkInfo& frameworkInfo, - const process::Future<bool>& authorized); - - // 'reregisterFramework()' continuation. - void _reregisterFramework( - const process::UPID& from, - const FrameworkInfo& frameworkInfo, - bool failover, - const process::Future<bool>& authorized); - // Add a framework. void addFramework(Framework* framework); @@ -714,6 +701,11 @@ private: const process::UPID& from, const scheduler::Call::Subscribe& subscribe); + void _subscribe( + const process::UPID& from, + const scheduler::Call::Subscribe& subscribe, + const process::Future<bool>& authorized); + void accept( Framework* framework, const scheduler::Call::Accept& accept);
