Updated the allocator to handle frameworks that change their roles. Review: https://reviews.apache.org/r/57111
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2a7b912e Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2a7b912e Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2a7b912e Branch: refs/heads/master Commit: 2a7b912e0b1c5b52862660209c34fec65c536c46 Parents: 68bb8d1 Author: Michael Park <[email protected]> Authored: Sat Mar 4 10:53:28 2017 -0800 Committer: Michael Park <[email protected]> Committed: Mon Mar 6 16:06:20 2017 -0800 ---------------------------------------------------------------------- src/master/allocator/mesos/hierarchical.cpp | 184 +++++++++++++++++------ src/master/allocator/mesos/hierarchical.hpp | 21 ++- 2 files changed, 154 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/2a7b912e/src/master/allocator/mesos/hierarchical.cpp ---------------------------------------------------------------------- diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp index dcafc79..0059cce 100644 --- a/src/master/allocator/mesos/hierarchical.cpp +++ b/src/master/allocator/mesos/hierarchical.cpp @@ -242,20 +242,7 @@ void HierarchicalAllocatorProcess::addFramework( const Framework& framework = frameworks.at(frameworkId); foreach (const string& role, framework.roles) { - // If this is the first framework to register as this role, - // initialize state as necessary. 
- if (!activeRoles.contains(role)) { - activeRoles[role] = 1; - roleSorter->add(role, roleWeight(role)); - frameworkSorters.insert({role, Owned<Sorter>(frameworkSorterFactory())}); - frameworkSorters.at(role)->initialize(fairnessExcludeResourceNames); - metrics.addRole(role); - } else { - activeRoles[role]++; - } - - CHECK(!frameworkSorters.at(role)->contains(frameworkId.value())); - frameworkSorters.at(role)->add(frameworkId.value()); + trackFrameworkUnderRole(frameworkId, role); } // TODO(bmahler): Validate that the reserved resources have the @@ -326,31 +313,7 @@ void HierarchicalAllocatorProcess::removeFramework( } } - frameworkSorters.at(role)->remove(frameworkId.value()); - } - - foreach (const string& role, framework.roles) { - CHECK(activeRoles.contains(role)); - - // If this is the last framework that was registered for this role, - // cleanup associated state. This is not necessary for correctness - // (roles with no registered frameworks will not be offered any - // resources), but since many different role names might be used - // over time, we want to avoid leaking resources for no-longer-used - // role names. Note that we don't remove the role from - // `quotaRoleSorter` if it exists there, since roles with a quota - // set still influence allocation even if they don't have any - // registered frameworks. 
- activeRoles[role]--; - if (activeRoles[role] == 0) { - activeRoles.erase(role); - roleSorter->remove(role); - - CHECK(frameworkSorters.contains(role)); - frameworkSorters.erase(role); - - metrics.removeRole(role); - } + untrackFrameworkUnderRole(frameworkId, role); } // Do not delete the filters contained in this @@ -424,14 +387,55 @@ void HierarchicalAllocatorProcess::updateFramework( CHECK(frameworks.contains(frameworkId)); Framework& framework = frameworks.at(frameworkId); + + set<string> oldRoles = framework.roles; set<string> newRoles = protobuf::framework::getRoles(frameworkInfo); - // TODO(bmahler): Allow frameworks to update their roles, see MESOS-6627. - CHECK(framework.roles == newRoles) - << "Expected: " << stringify(framework.roles) - << " vs Actual: " << stringify(newRoles); + const set<string> removedRoles = [&]() { + set<string> result = oldRoles; + foreach (const string& role, newRoles) { + result.erase(role); + } + return result; + }(); + + foreach (const string& role, removedRoles) { + CHECK(frameworkSorters.contains(role)); + frameworkSorters.at(role)->deactivate(frameworkId.value()); + + // Stop tracking the framework under this role if there are + // no longer any resources allocated to it. + if (frameworkSorters.at(role)->allocation(frameworkId.value()).empty()) { + untrackFrameworkUnderRole(frameworkId, role); + } + + if (framework.offerFilters.contains(role)) { + framework.offerFilters.erase(role); + } + } + + const set<string> addedRoles = [&]() { + set<string> result = newRoles; + foreach (const string& role, oldRoles) { + result.erase(role); + } + return result; + }(); + + foreach (const string& role, addedRoles) { + // NOTE: It's possible that we're already tracking this framework + // under the role because a framework can unsubscribe from a role + // while it still has resources allocated to the role. 
+ if (!isFrameworkTrackedUnderRole(frameworkId, role)) { + trackFrameworkUnderRole(frameworkId, role); + } + + CHECK(frameworkSorters.contains(role)); + frameworkSorters.at(role)->activate(frameworkId.value()); + } - framework.capabilities = Capabilities(frameworkInfo.capabilities()); + framework.roles = newRoles; + framework.capabilities = frameworkInfo.capabilities(); } @@ -463,6 +467,13 @@ void HierarchicalAllocatorProcess::addSlave( foreachpair (const string& role, const Resources& allocated, used_.allocations()) { + // The framework has resources allocated to this role but it may + // or may not be subscribed to the role. Either way, we need to + // track the framework under the role. + if (!isFrameworkTrackedUnderRole(frameworkId, role)) { + trackFrameworkUnderRole(frameworkId, role); + } + // TODO(bmahler): Validate that the reserved resources have the // framework's role. CHECK(roleSorter->contains(role)); @@ -1076,6 +1087,13 @@ void HierarchicalAllocatorProcess::recoverResources( quotaRoleSorter->unallocated( role, slaveId, resources.nonRevocable()); } + + // Stop tracking the framework under this role if it's no longer + // subscribed and no longer has resources allocated to the role. + if (frameworks.at(frameworkId).roles.count(role) == 0 && + frameworkSorter->allocation(frameworkId.value()).empty()) { + untrackFrameworkUnderRole(frameworkId, role); + } } } @@ -1500,7 +1518,7 @@ void HierarchicalAllocatorProcess::__allocate() // If there are no active frameworks in this role, we do not // need to do any allocations for this role. - if (!activeRoles.contains(role)) { + if (!roles.contains(role)) { continue; } @@ -1649,7 +1667,7 @@ void HierarchicalAllocatorProcess::__allocate() // argument to the `allocate()` call) so that frameworks in roles without // quota are not unnecessarily deprived of resources. 
Resources remainingClusterResources = roleSorter->totalScalarQuantities(); - foreachkey (const string& role, activeRoles) { + foreachkey (const string& role, roles) { remainingClusterResources -= roleSorter->allocationScalarQuantities(role); } @@ -1848,7 +1866,7 @@ void HierarchicalAllocatorProcess::__allocate() void HierarchicalAllocatorProcess::deallocate() { // If no frameworks are currently registered, no work to do. - if (activeRoles.empty()) { + if (roles.empty()) { return; } CHECK(!frameworkSorters.empty()); @@ -2188,6 +2206,78 @@ double HierarchicalAllocatorProcess::_offer_filters_active( } +bool HierarchicalAllocatorProcess::isFrameworkTrackedUnderRole( + const FrameworkID& frameworkId, + const std::string& role) const +{ + return roles.contains(role) && + roles.at(role).contains(frameworkId); +} + + +void HierarchicalAllocatorProcess::trackFrameworkUnderRole( + const FrameworkID& frameworkId, + const string& role) +{ + CHECK(initialized); + + // If this is the first framework to subscribe to this role, or have + // resources allocated to this role, initialize state as necessary. 
+ if (!roles.contains(role)) { + roles[role] = {}; + CHECK(!roleSorter->contains(role)); + roleSorter->add(role, roleWeight(role)); + + CHECK(!frameworkSorters.contains(role)); + frameworkSorters.insert({role, Owned<Sorter>(frameworkSorterFactory())}); + frameworkSorters.at(role)->initialize(fairnessExcludeResourceNames); + metrics.addRole(role); + } + + CHECK(!roles.at(role).contains(frameworkId)); + roles.at(role).insert(frameworkId); + + CHECK(!frameworkSorters.at(role)->contains(frameworkId.value())); + frameworkSorters.at(role)->add(frameworkId.value()); +} + + +void HierarchicalAllocatorProcess::untrackFrameworkUnderRole( + const FrameworkID& frameworkId, + const string& role) +{ + CHECK(initialized); + + CHECK(roles.contains(role)); + CHECK(roles.at(role).contains(frameworkId)); + CHECK(frameworkSorters.contains(role)); + CHECK(frameworkSorters.at(role)->contains(frameworkId.value())); + + roles.at(role).erase(frameworkId); + frameworkSorters.at(role)->remove(frameworkId.value()); + + // If no more frameworks are subscribed to this role or have resources + // allocated to this role, cleanup associated state. This is not necessary + // for correctness (roles with no registered frameworks will not be offered + // any resources), but since many different role names might be used over + // time, we want to avoid leaking resources for no-longer-used role names. + // Note that we don't remove the role from `quotaRoleSorter` if it exists + // there, since roles with a quota set still influence allocation even if + // they don't have any registered frameworks. 
+ + if (roles.at(role).empty()) { + CHECK_EQ(frameworkSorters.at(role)->count(), 0); + + roles.erase(role); + roleSorter->remove(role); + + frameworkSorters.erase(role); + + metrics.removeRole(role); + } +} + + void HierarchicalAllocatorProcess::updateSlaveTotal( const SlaveID& slaveId, const Resources& total) http://git-wip-us.apache.org/repos/asf/mesos/blob/2a7b912e/src/master/allocator/mesos/hierarchical.hpp ---------------------------------------------------------------------- diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp index 0bb24be..646f66e 100644 --- a/src/master/allocator/mesos/hierarchical.hpp +++ b/src/master/allocator/mesos/hierarchical.hpp @@ -427,10 +427,11 @@ protected: // ready after the allocation run is complete. Option<process::Future<Nothing>> allocation; - // Number of registered frameworks for each role. When a role's active - // count drops to zero, it is removed from this map; the role is also - // removed from `roleSorter` and its `frameworkSorter` is deleted. - hashmap<std::string, size_t> activeRoles; + // We track information about roles that we're aware of in the system. + // Specifically, we keep track of the roles when a framework subscribes to + // the role, and/or when there are resources allocated to the role + // (e.g. some tasks and/or executors are consuming resources under the role). + hashmap<std::string, hashset<FrameworkID>> roles; // Configured weight for each role, if any; if a role does not // appear here, it has the default weight of 1. 
@@ -514,6 +515,18 @@ protected: const std::function<Sorter*()> frameworkSorterFactory; private: + bool isFrameworkTrackedUnderRole( + const FrameworkID& frameworkId, + const std::string& role) const; + + void trackFrameworkUnderRole( + const FrameworkID& frameworkId, + const std::string& role); + + void untrackFrameworkUnderRole( + const FrameworkID& frameworkId, + const std::string& role); + // Helper to update the agent's total resources maintained in the allocator // and the role and quota sorters (whose total resources match the agent's // total resources).
