Updated the master to handle frameworks that change their roles. Review: https://reviews.apache.org/r/57110
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/68bb8d1f Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/68bb8d1f Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/68bb8d1f Branch: refs/heads/master Commit: 68bb8d1f0eb13e910906fde2879900ac433b3500 Parents: 98c7722 Author: Michael Park <[email protected]> Authored: Sun Mar 5 19:01:29 2017 -0800 Committer: Michael Park <[email protected]> Committed: Mon Mar 6 16:06:15 2017 -0800 ---------------------------------------------------------------------- src/master/master.cpp | 168 +++++++++++++++--------------- src/master/master.hpp | 252 ++++++++++++++++++++++++++++++--------------- 2 files changed, 256 insertions(+), 164 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/68bb8d1f/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index d6d954e..dd1e4cd 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -382,6 +382,56 @@ struct BoundedRateLimiter }; +bool Framework::isTrackedUnderRole(const std::string& role) const +{ + CHECK(master->isWhitelistedRole(role)) + << "Unknown role '" << role << "'" << " of framework " << *this; + + return master->roles.contains(role) && + master->roles.at(role)->frameworks.contains(id()); +} + +void Framework::trackUnderRole(const std::string& role) +{ + CHECK(master->isWhitelistedRole(role)) + << "Unknown role '" << role << "'" << " of framework " << *this; + + CHECK(!isTrackedUnderRole(role)); + + CHECK(roles.count(role) > 0); + + if (!master->roles.contains(role)) { + master->roles[role] = new Role(role); + } + master->roles.at(role)->addFramework(this); +} + +void Framework::untrackUnderRole(const std::string& role) +{ + CHECK(master->isWhitelistedRole(role)) + << "Unknown role '" << role << "'" << " of 
framework " << *this; + + CHECK(isTrackedUnderRole(role)); + + // NOTE: Ideally we would also `CHECK` that we're not currently subscribed + // to the role. We don't do this currently because this function is used in + // `Master::removeFramework` where we're still subscribed to `roles`. + + auto allocatedToRole = [&role](const Resource& resource) { + return resource.allocation_info().role() == role; + }; + + CHECK(totalUsedResources.filter(allocatedToRole).empty()); + CHECK(totalOfferedResources.filter(allocatedToRole).empty()); + + master->roles.at(role)->removeFramework(this); + if (master->roles.at(role)->frameworks.empty()) { + delete master->roles.at(role); + master->roles.erase(role); + } +} + + void Master::initialize() { LOG(INFO) << "Master " << info_.id() << " (" << info_.hostname() << ")" @@ -2641,24 +2691,8 @@ void Master::_subscribe( if (!framework->recovered()) { // The framework has previously been registered with this master; // it may or may not currently be connected. - LOG(INFO) << "Updating info for framework " << framework->id(); - - Try<Nothing> updateFrameworkInfo = - framework->updateFrameworkInfo(frameworkInfo); - - if (updateFrameworkInfo.isError()) { - LOG(INFO) << "Could not update FrameworkInfo of framework '" - << frameworkInfo.name() << "': " << updateFrameworkInfo.error(); - - FrameworkErrorMessage message; - message.set_message(updateFrameworkInfo.error()); - http.send(message); - http.close(); - return; - } - - allocator->updateFramework(framework->id(), framework->info); + updateFramework(framework, frameworkInfo); framework->reregisteredTime = Clock::now(); // Always failover the old framework connection. See MESOS-4712 for details. @@ -2951,22 +2985,7 @@ void Master::_subscribe( // It is now safe to update the framework fields since the request is now // guaranteed to be successful. We use the fields passed in during // re-registration. 
- LOG(INFO) << "Updating info for framework " << framework->id(); - - Try<Nothing> updateFrameworkInfo = - framework->updateFrameworkInfo(frameworkInfo); - - if (updateFrameworkInfo.isError()) { - LOG(INFO) << "Could not update frameworkInfo of framework '" << *framework - << "': " << updateFrameworkInfo.error(); - - FrameworkErrorMessage message; - message.set_message(updateFrameworkInfo.error()); - send(from, message); - return; - } - - allocator->updateFramework(framework->id(), framework->info); + updateFramework(framework, frameworkInfo); framework->reregisteredTime = Clock::now(); @@ -6036,6 +6055,36 @@ void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId) } +void Master::updateFramework( + Framework* framework, + const FrameworkInfo& frameworkInfo) +{ + LOG(INFO) << "Updating info for framework " << framework->id(); + + // NOTE: The allocator takes care of activating/deactivating + // the frameworks from the added/removed roles, respectively. + allocator->updateFramework(framework->id(), frameworkInfo); + + // First, remove the offers allocated to roles being removed. + foreach (Offer* offer, utils::copy(framework->offers)) { + set<string> newRoles = protobuf::framework::getRoles(frameworkInfo); + if (newRoles.count(offer->allocation_info().role()) > 0) { + continue; + } + + allocator->recoverResources( + offer->framework_id(), + offer->slave_id(), + offer->resources(), + None()); + + removeOffer(offer, true); // Rescind! 
+ } + + framework->update(frameworkInfo); +} + + void Master::updateSlave( const SlaveID& slaveId, const Resources& oversubscribedResources) @@ -7458,25 +7507,6 @@ void Master::addFramework(Framework* framework) } } - auto addFrameworkRole = [this](Framework* framework, const string& role) { - CHECK(isWhitelistedRole(role)) - << "Unknown role '" << role << "'" - << " of framework " << *framework; - - if (!roles.contains(role)) { - roles[role] = new Role(role); - } - roles.at(role)->addFramework(framework); - }; - - if (framework->capabilities.multiRole) { - foreach (const string& role, framework->info.roles()) { - addFrameworkRole(framework, role); - } - } else { - addFrameworkRole(framework, framework->info.role()); - } - // There should be no offered resources yet! CHECK_EQ(Resources(), framework->totalOfferedResources); @@ -7552,17 +7582,7 @@ Try<Nothing> Master::activateRecoveredFramework( CHECK(framework->pid.isNone()); CHECK(framework->http.isNone()); - // The `FrameworkInfo` might have changed. 
- LOG(INFO) << "Updating info for framework " << framework->id(); - - Try<Nothing> updateFrameworkInfo = - framework->updateFrameworkInfo(frameworkInfo); - - if (updateFrameworkInfo.isError()) { - return updateFrameworkInfo; - } - - allocator->updateFramework(framework->id(), framework->info); + updateFramework(framework, frameworkInfo); // Updating `registeredTime` here is debatable: ideally, // `registeredTime` would be the time at which the framework first @@ -7897,26 +7917,8 @@ void Master::removeFramework(Framework* framework) framework->unregisteredTime = Clock::now(); - auto removeFrameworkRole = [this](Framework* framework, const string& role) { - CHECK(isWhitelistedRole(role)) - << "Unknown role '" << role << "'" - << " of framework " << *framework; - - CHECK(roles.contains(role)); - - roles[role]->removeFramework(framework); - if (roles[role]->frameworks.empty()) { - delete roles[role]; - roles.erase(role); - } - }; - - if (framework->capabilities.multiRole) { - foreach (const string& role, framework->info.roles()) { - removeFrameworkRole(framework, role); - } - } else { - removeFrameworkRole(framework, framework->info.role()); + foreach (const string& role, framework->roles) { + framework->untrackUnderRole(role); } // TODO(anand): This only works for pid based frameworks. We would http://git-wip-us.apache.org/repos/asf/mesos/blob/68bb8d1f/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 1738eeb..d92c8ad 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -602,6 +602,10 @@ protected: // executors and recover the resources. 
void removeFramework(Slave* slave, Framework* framework); + void updateFramework( + Framework* framework, + const FrameworkInfo& frameworkInfo); + void disconnect(Framework* framework); void deactivate(Framework* framework, bool rescind); @@ -1820,7 +1824,10 @@ private: hashmap<OfferID, InverseOffer*> inverseOffers; hashmap<OfferID, process::Timer> inverseOfferTimers; - // Roles with > 0 frameworks currently registered. + // We track information about roles that we're aware of in the system. + // Specifically, we keep track of the roles when a framework subscribes to + // the role, and/or when there are resources allocated to the role + // (e.g. some tasks and/or executors are consuming resources under the role). hashmap<std::string, Role*> roles; // Configured role whitelist if using the (deprecated) "explicit @@ -2210,48 +2217,30 @@ struct Framework ACTIVE }; - Framework(Master* const _master, + Framework(Master* const master, const Flags& masterFlags, - const FrameworkInfo& _info, + const FrameworkInfo& info, const process::UPID& _pid, const process::Time& time = process::Clock::now()) - : master(_master), - info(_info), - roles(protobuf::framework::getRoles(_info)), - capabilities(_info.capabilities()), - pid(_pid), - state(ACTIVE), - registeredTime(time), - reregisteredTime(time), - completedTasks(masterFlags.max_completed_tasks_per_framework), - unreachableTasks(masterFlags.max_unreachable_tasks_per_framework) {} + : Framework(master, masterFlags, info, ACTIVE, time) + { + pid = _pid; + } - Framework(Master* const _master, + Framework(Master* const master, const Flags& masterFlags, - const FrameworkInfo& _info, + const FrameworkInfo& info, const HttpConnection& _http, const process::Time& time = process::Clock::now()) - : master(_master), - info(_info), - roles(protobuf::framework::getRoles(_info)), - capabilities(_info.capabilities()), - http(_http), - state(ACTIVE), - registeredTime(time), - reregisteredTime(time), - 
completedTasks(masterFlags.max_completed_tasks_per_framework), - unreachableTasks(masterFlags.max_unreachable_tasks_per_framework) {} + : Framework(master, masterFlags, info, ACTIVE, time) + { + http = _http; + } - Framework(Master* const _master, + Framework(Master* const master, const Flags& masterFlags, - const FrameworkInfo& _info) - : master(_master), - info(_info), - roles(protobuf::framework::getRoles(_info)), - capabilities(_info.capabilities()), - state(RECOVERED), - completedTasks(masterFlags.max_completed_tasks_per_framework), - unreachableTasks(masterFlags.max_unreachable_tasks_per_framework) {} + const FrameworkInfo& info) + : Framework(master, masterFlags, info, RECOVERED, process::Time()) {} ~Framework() { @@ -2286,6 +2275,18 @@ struct Framework if (!Master::isRemovable(task->state())) { totalUsedResources += task->resources(); usedResources[task->slave_id()] += task->resources(); + + // It's possible that we're not tracking the task's role for + // this framework if the role is absent from the framework's + // set of roles. In this case, we track the role's allocation + // for this framework. + CHECK(!task->resources().empty()); + const std::string& role = + task->resources().begin()->allocation_info().role(); + + if (!isTrackedUnderRole(role)) { + trackUnderRole(role); + } } } @@ -2297,7 +2298,6 @@ struct Framework // functionally for all tasks is expensive, for now. void recoverResources(Task* task) { - CHECK(Master::isRemovable(task->state())); CHECK(tasks.contains(task->task_id())) << "Unknown task " << task->task_id() << " of framework " << task->framework_id(); @@ -2307,6 +2307,23 @@ struct Framework if (usedResources[task->slave_id()].empty()) { usedResources.erase(task->slave_id()); } + + // If we are no longer subscribed to the role to which these resources are + // being returned to, and we have no more resources allocated to us for that + // role, stop tracking the framework under the role. 
+ CHECK(!task->resources().empty()); + const std::string& role = + task->resources().begin()->allocation_info().role(); + + auto allocatedToRole = [&role](const Resource& resource) { + return resource.allocation_info().role() == role; + }; + + if (roles.count(role) == 0 && + totalUsedResources.filter(allocatedToRole).empty()) { + CHECK(totalOfferedResources.filter(allocatedToRole).empty()); + untrackUnderRole(role); + } } // Sends a message to the connected framework. @@ -2355,11 +2372,7 @@ struct Framework << " of framework " << task->framework_id(); if (!Master::isRemovable(task->state())) { - totalUsedResources -= task->resources(); - usedResources[task->slave_id()] -= task->resources(); - if (usedResources[task->slave_id()].empty()) { - usedResources.erase(task->slave_id()); - } + recoverResources(task); } if (task->state() == TASK_UNREACHABLE) { @@ -2431,6 +2444,19 @@ struct Framework executors[slaveId][executorInfo.executor_id()] = executorInfo; totalUsedResources += executorInfo.resources(); usedResources[slaveId] += executorInfo.resources(); + + // It's possible that we're not tracking the task's role for + // this framework if the role is absent from the framework's + // set of roles. In this case, we track the role's allocation + // for this framework. 
+ if (!executorInfo.resources().empty()) { + const std::string& role = + executorInfo.resources().begin()->allocation_info().role(); + + if (!isTrackedUnderRole(role)) { + trackUnderRole(role); + } + } } void removeExecutor(const SlaveID& slaveId, @@ -2441,12 +2467,32 @@ struct Framework << "' of framework " << id() << " of agent " << slaveId; - totalUsedResources -= executors[slaveId][executorId].resources(); - usedResources[slaveId] -= executors[slaveId][executorId].resources(); + const ExecutorInfo& executorInfo = executors[slaveId][executorId]; + + totalUsedResources -= executorInfo.resources(); + usedResources[slaveId] -= executorInfo.resources(); if (usedResources[slaveId].empty()) { usedResources.erase(slaveId); } + // If we are no longer subscribed to the role to which these resources are + // being returned to, and we have no more resources allocated to us for that + // role, stop tracking the framework under the role. + if (!executorInfo.resources().empty()) { + const std::string& role = + executorInfo.resources().begin()->allocation_info().role(); + + auto allocatedToRole = [&role](const Resource& resource) { + return resource.allocation_info().role() == role; + }; + + if (roles.count(role) == 0 && + totalUsedResources.filter(allocatedToRole).empty()) { + CHECK(totalOfferedResources.filter(allocatedToRole).empty()); + untrackUnderRole(role); + } + } + executors[slaveId].erase(executorId); if (executors[slaveId].empty()) { executors.erase(slaveId); @@ -2456,55 +2502,32 @@ struct Framework const FrameworkID id() const { return info.id(); } // Update fields in 'info' using those in 'newInfo'. Currently this - // only updates 'name', 'failover_timeout', 'hostname', 'webui_url', - // 'capabilities', and 'labels'. - Try<Nothing> updateFrameworkInfo(const FrameworkInfo& newInfo) + // only updates `role`/`roles`, 'name', 'failover_timeout', 'hostname', + // 'webui_url', 'capabilities', and 'labels'. 
+ void update(const FrameworkInfo& newInfo) { // We only merge 'info' from the same framework 'id'. CHECK_EQ(info.id(), newInfo.id()); + // Save the old list of roles for later. + std::set<std::string> oldRoles = roles; + // TODO(jmlvanre): Merge other fields as per design doc in // MESOS-703. - // We currently do not allow frameworks to add or remove roles. We - // do however allow frameworks to opt in and out of `MULTI_ROLE` - // capability, given that the `role` and `roles` field contain the - // same number of roles. - if (capabilities.multiRole || protobuf::frameworkHasCapability( - newInfo, FrameworkInfo::Capability::MULTI_ROLE)) { - // Two `roles` sets are equivalent if they contain the same - // elements. A `role` `*` is not equivalent to an empty `roles` - // set, but to the set `{*}`. Since we might be dealing with a - // framework upgrading to `MULTI_ROLE` capability or dropping - // it, we need to examine either `role` or `roles` in order to - // determine the roles a framework is subscribed to. - const std::set<std::string> newRoles = - protobuf::framework::getRoles(newInfo); - - if (roles != newRoles) { - return Error( - "Frameworks cannot change their roles: expected '" + - stringify(roles) + "', but got '" + stringify(newRoles) + "'"); - } - - info.clear_role(); - info.clear_roles(); + info.clear_role(); + info.clear_roles(); - if (newInfo.has_role()) { - info.set_role(newInfo.role()); - } + if (newInfo.has_role()) { + info.set_role(newInfo.role()); + } - if (newInfo.roles_size() > 0) { - info.mutable_roles()->CopyFrom(newInfo.roles()); - } - } else { - if (newInfo.role() != info.role()) { - LOG(WARNING) << "Cannot update FrameworkInfo.role to '" - << newInfo.role() << "' for framework " << id() - << ". 
Check MESOS-703"; - } + if (newInfo.roles_size() > 0) { + info.mutable_roles()->CopyFrom(newInfo.roles()); } + roles = protobuf::framework::getRoles(newInfo); + if (newInfo.user() != info.user()) { LOG(WARNING) << "Cannot update FrameworkInfo.user to '" << newInfo.user() << "' for framework " << id() << ". Check MESOS-703"; @@ -2555,7 +2578,45 @@ struct Framework info.clear_labels(); } - return Nothing(); + const std::set<std::string>& newRoles = roles; + + const std::set<std::string> removedRoles = [&]() { + std::set<std::string> result = oldRoles; + foreach (const std::string& role, newRoles) { + result.erase(role); + } + return result; + }(); + + foreach (const std::string& role, removedRoles) { + auto allocatedToRole = [&role](const Resource& resource) { + return resource.allocation_info().role() == role; + }; + + // Stop tracking the framework under this role if there are + // no longer any resources allocated to it. + if (totalUsedResources.filter(allocatedToRole).empty()) { + CHECK(totalOfferedResources.filter(allocatedToRole).empty()); + untrackUnderRole(role); + } + } + + const std::set<std::string> addedRoles = [&]() { + std::set<std::string> result = newRoles; + foreach (const std::string& role, oldRoles) { + result.erase(role); + } + return result; + }(); + + foreach (const std::string& role, addedRoles) { + // NOTE: It's possible that we're already tracking this framework + // under the role because a framework can unsubscribe from a role + // while it still has resources allocated to the role. 
+ if (!isTrackedUnderRole(role)) { + trackUnderRole(role); + } + } } void updateConnection(const process::UPID& newPid) @@ -2628,6 +2689,10 @@ struct Framework bool connected() const { return state == ACTIVE || state == INACTIVE; } bool recovered() const { return state == RECOVERED; } + bool isTrackedUnderRole(const std::string& role) const; + void trackUnderRole(const std::string& role); + void untrackUnderRole(const std::string& role); + Master* const master; FrameworkInfo info; @@ -2719,6 +2784,31 @@ struct Framework Option<process::Owned<Heartbeater>> heartbeater; private: + Framework(Master* const _master, + const Flags& masterFlags, + const FrameworkInfo& _info, + State state, + const process::Time& time) + : master(_master), + info(_info), + roles(protobuf::framework::getRoles(_info)), + capabilities(_info.capabilities()), + state(state), + registeredTime(time), + reregisteredTime(time), + completedTasks(masterFlags.max_completed_tasks_per_framework), + unreachableTasks(masterFlags.max_unreachable_tasks_per_framework) + { + foreach (const std::string& role, roles) { + // NOTE: It's possible that we're already being tracked under the role + // because a framework can unsubscribe from a role while it still has + // resources allocated to the role. + if (!isTrackedUnderRole(role)) { + trackUnderRole(role); + } + } + } + Framework(const Framework&); // No copying. Framework& operator=(const Framework&); // No assigning. };
