This is an automated email from the ASF dual-hosted git repository. mzhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 76868f09bc8fbb5e3a23bd3328b8ac39de5261ae Author: Meng Zhu <[email protected]> AuthorDate: Sat Aug 10 15:07:27 2019 -0700 Added a role tree class in the allocator. The role concept in Mesos fits into a tree structure naturally. However, the role state in the allocator is currently stored in a hashmap. This is less efficient and harder to use and reason about. This patch introduces a `RoleTree` structure in the allocator and organizes all the roles into a tree. This should simplify the code logic and open up further refactoring and optimization opportunities. In addition, the master code also lacks a proper tree structure for tracking roles. We should leverage the same role tree code in the master to simplify that as well. Review: https://reviews.apache.org/r/71269 --- src/master/allocator/mesos/hierarchical.cpp | 363 ++++++++++++++++++---------- src/master/allocator/mesos/hierarchical.hpp | 164 +++++++++---- 2 files changed, 351 insertions(+), 176 deletions(-) diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp index 580d35a..5127dfb 100644 --- a/src/master/allocator/mesos/hierarchical.cpp +++ b/src/master/allocator/mesos/hierarchical.cpp @@ -201,6 +201,195 @@ static hashmap<string, vector<ResourceQuantities>> unpackFrameworkOfferFilters( } +Role::Role(const string& _role, Role* _parent) + : role(_role), + basename(strings::split(role, "/").back()), + parent(_parent), + quota_(DEFAULT_QUOTA), + weight_(DEFAULT_WEIGHT) {} + + +void Role::addChild(Role* child) +{ + CHECK_NOT_CONTAINS(children_, child->basename); + children_.put(child->basename, child); +} + + +void Role::removeChild(Role* child) +{ + CHECK_CONTAINS(children_, child->basename); + children_.erase(child->basename); +} + + +RoleTree::RoleTree(Metrics* metrics_) + : root_(new Role("", nullptr)), metrics(metrics_) {} + + +RoleTree::~RoleTree() +{ + delete root_; +} + + +Option<const Role*> RoleTree::get(const std::string& role) const +{ + auto found = roles_.find(role);
+ if (found == roles_.end()) { + return None(); + } else { + return &(found->second); + } +} + + +Role& RoleTree::operator[](const std::string& rolePath) +{ + if (roles_.contains(rolePath)) { + return roles_.at(rolePath); + } + + // We go through the path from top to bottom and create any missing + // node along the way. + Role* current = root_; + foreach (const string& token, strings::split(rolePath, "/")) { + Option<Role*> child = current->children_.get(token); + + if (child.isSome()) { + current = *child; + continue; + } + + // Create a new role. + string newRolePath = + current == root_ ? token : strings::join("/", current->role, token); + CHECK_NOT_CONTAINS(roles_, newRolePath); + roles_.put(newRolePath, Role(newRolePath, current)); + metrics->addRole(newRolePath); + + Role& role = roles_.at(newRolePath); + current->addChild(&role); + current = &role; + } + + return roles_.at(rolePath); +} + + +bool RoleTree::tryRemove(const std::string& role) +{ + CHECK_CONTAINS(roles_, role); + Role* current = &(roles_.at(role)); + + if (!current->isEmpty()) { + return false; + } + + // We go through the path from bottom to top and remove empty nodes + // along the way. + vector<string> tokens = strings::split(role, "/"); + for (auto token = tokens.crbegin(); token != tokens.crend(); ++token) { + CHECK_EQ(current->basename, *token); + if (!current->isEmpty()) { + break; + } + + Role* parent = CHECK_NOTNULL(current->parent); + + parent->removeChild(current); + metrics->removeRole(current->role); + roles_.erase(current->role); + + current = parent; + } + + return true; +} + + +void RoleTree::trackReservations(const Resources& resources) +{ + foreach (const Resource& r, resources.scalars()) { + CHECK(Resources::isReserved(r)); + + const string& reservationRole = Resources::reservationRole(r); + + Role* current = &(*this)[reservationRole]; + ResourceQuantities quantities = ResourceQuantities::fromScalarResources(r); + + // Track it hierarchically up to the root. 
+ // Create new role tree node if necessary. + for (; current != nullptr; current = current->parent) { + current->reservationScalarQuantities_ += quantities; + } + } +} + + +void RoleTree::untrackReservations(const Resources& resources) +{ + foreach (const Resource& r, resources.scalars()) { + CHECK(Resources::isReserved(r)); + + const string& reservationRole = Resources::reservationRole(r); + CHECK_CONTAINS(roles_, reservationRole); + + ResourceQuantities quantities = ResourceQuantities::fromScalarResources(r); + + // Track it hierarchically up to the root. + for (Role* current = &(roles_.at(reservationRole)); current != nullptr; + current = current->parent) { + CHECK_CONTAINS(current->reservationScalarQuantities_, quantities); + current->reservationScalarQuantities_ -= quantities; + } + + tryRemove(reservationRole); + } +} + + +void RoleTree::trackFramework( + const FrameworkID& frameworkId, const string& rolePath) +{ + Role* role = &(*this)[rolePath]; + + CHECK_NOT_CONTAINS(role->frameworks_, frameworkId) + << " for role " << rolePath; + role->frameworks_.insert(frameworkId); +} + + +void RoleTree::untrackFramework( + const FrameworkID& frameworkId, const string& rolePath) +{ + CHECK_CONTAINS(roles_, rolePath); + Role& role = roles_.at(rolePath); + + CHECK_CONTAINS(role.frameworks_, frameworkId) << " for role " << rolePath; + role.frameworks_.erase(frameworkId); + + tryRemove(rolePath); +} + + +void RoleTree::updateQuota(const string& role, const Quota& quota) +{ + (*this)[role].quota_ = quota; + + tryRemove(role); +} + + +void RoleTree::updateWeight(const string& role, double weight) +{ + (*this)[role].weight_ = weight; + + tryRemove(role); +} + + Framework::Framework( const FrameworkInfo& frameworkInfo, const set<string>& _suppressedRoles, @@ -568,7 +757,7 @@ void HierarchicalAllocatorProcess::addSlave( slave.maintenance = Slave::Maintenance(unavailability.get()); } - trackReservations(total.reservations()); + roleTree.trackReservations(total.reserved()); 
roleSorter->add(slaveId, total); @@ -644,7 +833,7 @@ void HierarchicalAllocatorProcess::removeSlave( sorter->remove(slaveId, slaves.at(slaveId).getTotal()); } - untrackReservations(slaves.at(slaveId).getTotal().reservations()); + roleTree.untrackReservations(slaves.at(slaveId).getTotal().reserved()); slaves.erase(slaveId); allocationCandidates.erase(slaveId); @@ -1393,13 +1582,11 @@ void HierarchicalAllocatorProcess::reviveOffers( void HierarchicalAllocatorProcess::updateQuota( - const string& role, - const Quota& quota) + const string& role, const Quota& quota) { CHECK(initialized); - roles[role].quota = quota; - + roleTree.updateQuota(role, quota); metrics.updateQuota(role, quota); LOG(INFO) << "Updated quota for role '" << role << "', " @@ -1415,7 +1602,7 @@ void HierarchicalAllocatorProcess::updateWeights( foreach (const WeightInfo& weightInfo, weightInfos) { CHECK(weightInfo.has_role()); - roles[weightInfo.role()].weight = weightInfo.weight(); + roleTree.updateWeight(weightInfo.role(), weightInfo.weight()); roleSorter->updateWeight(weightInfo.role(), weightInfo.weight()); } @@ -1591,25 +1778,28 @@ void HierarchicalAllocatorProcess::__allocate() // // Currently, only top level roles can have quota set and thus // we only track consumed quota for top level roles. - foreachpair (const string& role, const Role& r, roles) { + foreach (const Role* r, roleTree.root()->children()) { // TODO(mzhu): Track all role consumed quota. We may want to expose // these as metrics. - if (r.quota != DEFAULT_QUOTA) { + if (r->quota() != DEFAULT_QUOTA) { logHeadroomInfo = true; // Note, `reservationScalarQuantities` in `struct role` // is hierarchical aware, thus it also includes subrole reservations. - rolesConsumedQuota[role] += r.reservationScalarQuantities; + rolesConsumedQuota[r->role] += r->reservationScalarQuantities(); } } // Then add the unreserved allocation. 
- foreachpair (const string& role, const Role& r, roles) { - if (r.frameworks.empty()) { + // + // TODO(mzhu): make allocation tracking hierarchical, so that we only + // need to look at the top-level node. + foreachpair (const string& role, const Role& r, roleTree.roles()) { + if (r.frameworks().empty()) { continue; } - const string& topLevelRole = strings::contains(role, "/") ? - role.substr(0, role.find('/')) : role; + const string& topLevelRole = + strings::contains(role, "/") ? role.substr(0, role.find('/')) : role; if (getQuota(topLevelRole) == DEFAULT_QUOTA) { continue; @@ -1635,10 +1825,10 @@ void HierarchicalAllocatorProcess::__allocate() // consumed quota) than quota guarantee, we don't need to hold back any // unreserved headroom for it. ResourceQuantities requiredHeadroom; - foreachpair (const string& role, const Role& r, roles) { + foreach (const Role* r, roleTree.root()->children()) { requiredHeadroom += - r.quota.guarantees - - rolesConsumedQuota.get(role).getOrElse(ResourceQuantities()); + r->quota().guarantees - + rolesConsumedQuota.get(r->role).getOrElse(ResourceQuantities()); } // We will allocate resources while ensuring that the required @@ -1662,8 +1852,10 @@ void HierarchicalAllocatorProcess::__allocate() // Subtract allocated resources from the total. availableHeadroom -= roleSorter->allocationScalarQuantities(); + // TODO(mzhu): make allocation tracking hierarchical, so that we only + // need to look at the top-level node. ResourceQuantities totalAllocatedReservation; - foreachkey (const string& role, roles) { + foreachkey (const string& role, roleTree.roles()) { if (!roleSorter->contains(role)) { continue; // This role has no allocation. 
} @@ -1674,15 +1866,10 @@ void HierarchicalAllocatorProcess::__allocate() } } - ResourceQuantities totalReservation; - foreachpair (const string& role, const Role& r, roles) { - if (!strings::contains(role, "/")) { - totalReservation += r.reservationScalarQuantities; - } - } - // Subtract total unallocated reservations. - availableHeadroom -= totalReservation - totalAllocatedReservation; + // unallocated reservations = total reservations - allocated reservations + availableHeadroom -= roleTree.root()->reservationScalarQuantities() - + totalAllocatedReservation; // Subtract revocable resources. foreachvalue (const Slave& slave, slaves) { @@ -1735,7 +1922,8 @@ void HierarchicalAllocatorProcess::__allocate() // If there are no active frameworks in this role, we do not // need to do any allocations for this role. - if (!roles.contains(role) || roles.at(role).frameworks.empty()) { + if (roleTree.get(role).isNone() || + (*roleTree.get(role))->frameworks().empty()) { continue; } @@ -2491,150 +2679,68 @@ double HierarchicalAllocatorProcess::_offer_filters_active( bool HierarchicalAllocatorProcess::isFrameworkTrackedUnderRole( - const FrameworkID& frameworkId, - const string& role) const + const FrameworkID& frameworkId, const string& role) const { - return roles.contains(role) && - roles.at(role).frameworks.contains(frameworkId); + Option<const Role*> r = roleTree.get(role); + return r.isSome() && (*r)->frameworks().contains(frameworkId); } const Quota& HierarchicalAllocatorProcess::getQuota(const string& role) const { - auto it = roles.find(role); + Option<const Role*> r = roleTree.get(role); - return it == roles.end() ? DEFAULT_QUOTA : it->second.quota; + return r.isSome() ? 
(*r)->quota() : DEFAULT_QUOTA; } void HierarchicalAllocatorProcess::trackFrameworkUnderRole( - const FrameworkID& frameworkId, - const string& role) + const FrameworkID& frameworkId, const string& role) { CHECK(initialized); - // If this is the first framework to subscribe to this role, or have - // resources allocated to this role, initialize state as necessary. - if (roles[role].frameworks.empty()) { + // If this is the first framework to subscribe to this role, + // initialize state as necessary. + if (roleTree.get(role).isNone() || + (*roleTree.get(role))->frameworks().empty()) { CHECK_NOT_CONTAINS(*roleSorter, role); - roleSorter->add(role); roleSorter->activate(role); CHECK_NOT_CONTAINS(frameworkSorters, role); - frameworkSorters.insert({role, Owned<Sorter>(frameworkSorterFactory())}); frameworkSorters.at(role)->initialize(options.fairnessExcludeResourceNames); foreachvalue (const Slave& slave, slaves) { frameworkSorters.at(role)->add(slave.info.id(), slave.getTotal()); } - - metrics.addRole(role); } - CHECK_NOT_CONTAINS(roles.at(role).frameworks, frameworkId) - << " for role " << role; - roles.at(role).frameworks.insert(frameworkId); + roleTree.trackFramework(frameworkId, role); CHECK_NOT_CONTAINS(*frameworkSorters.at(role), frameworkId.value()) << " for role " << role; - frameworkSorters.at(role)->add(frameworkId.value()); } void HierarchicalAllocatorProcess::untrackFrameworkUnderRole( - const FrameworkID& frameworkId, - const string& role) + const FrameworkID& frameworkId, const string& role) { CHECK(initialized); - CHECK_CONTAINS(roles, role); - CHECK_CONTAINS(roles.at(role).frameworks, frameworkId) - << " for role " << role; + roleTree.untrackFramework(frameworkId, role); CHECK_CONTAINS(frameworkSorters, role); CHECK_CONTAINS(*frameworkSorters.at(role), frameworkId.value()) << " for role " << role; - - roles.at(role).frameworks.erase(frameworkId); frameworkSorters.at(role)->remove(frameworkId.value()); - // If no more frameworks are subscribed to this 
role or have resources - // allocated to this role, cleanup associated state. This is not necessary - // for correctness (roles with no registered frameworks will not be offered - // any resources), but since many different role names might be used over - // time, we want to avoid leaking resources for no-longer-used role names. - - if (roles.at(role).frameworks.empty()) { + if (roleTree.get(role).isNone() || + (*roleTree.get(role))->frameworks().empty()) { CHECK_EQ(frameworkSorters.at(role)->count(), 0u); - roleSorter->remove(role); - frameworkSorters.erase(role); - - metrics.removeRole(role); - } - - if (roles.at(role).isEmpty()) { - roles.erase(role); - } -} - - -void HierarchicalAllocatorProcess::trackReservations( - const hashmap<std::string, Resources>& reservations) -{ - foreachpair (const string& role, - const Resources& resources, reservations) { - const ResourceQuantities quantities = - ResourceQuantities::fromScalarResources(resources.scalars()); - - if (quantities.empty()) { - continue; // Do not insert an empty entry. - } - - // Track it hierarchically up to the top level role. - roles[role].reservationScalarQuantities += quantities; - for (const string& ancestor : roles::ancestors(role)) { - roles[ancestor].reservationScalarQuantities += quantities; - } - } -} - - -void HierarchicalAllocatorProcess::untrackReservations( - const hashmap<std::string, Resources>& reservations) -{ - foreachpair (const string& role, - const Resources& resources, reservations) { - const ResourceQuantities quantities = - ResourceQuantities::fromScalarResources(resources.scalars()); - - if (quantities.empty()) { - continue; // Do not CHECK for the role if there's nothing to untrack. 
- } - - auto untrack = [&](const string& r) { - CHECK_CONTAINS(roles, r); - - CHECK_CONTAINS(roles.at(r).reservationScalarQuantities, quantities) - << "current reservation " << roles.at(r).reservationScalarQuantities - << " does not contain " << quantities; - - roles.at(r).reservationScalarQuantities -= quantities; - - if (roles.at(r).isEmpty()) { - roles.erase(r); - } - }; - - // Untrack it hierarchically up to the top level role. - untrack(role); - for (const string& ancestor : roles::ancestors(role)) { - untrack(ancestor); - } } } @@ -2655,13 +2761,8 @@ bool HierarchicalAllocatorProcess::updateSlaveTotal( slave.updateTotal(total); - hashmap<std::string, Resources> oldReservations = oldTotal.reservations(); - hashmap<std::string, Resources> newReservations = total.reservations(); - - if (oldReservations != newReservations) { - untrackReservations(oldReservations); - trackReservations(newReservations); - } + roleTree.untrackReservations(oldTotal.reserved()); + roleTree.trackReservations(total.reserved()); // Update the totals in the sorters. roleSorter->remove(slaveId, oldTotal); diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp index 8be8dce..46ce5fc 100644 --- a/src/master/allocator/mesos/hierarchical.hpp +++ b/src/master/allocator/mesos/hierarchical.hpp @@ -74,6 +74,7 @@ namespace internal { // Forward declarations. class OfferFilter; class InverseOfferFilter; +class RoleTree; struct Framework @@ -109,34 +110,131 @@ struct Framework }; -struct Role +class Role { - Role() : weight(DEFAULT_WEIGHT) {} +public: + Role(const std::string& name, Role* parent); - // IDs of the frameworks susbscibed to the role, if any. - hashset<FrameworkID> frameworks; + const ResourceQuantities& reservationScalarQuantities() const + { + return reservationScalarQuantities_; + } - // Aggregated reserved scalar resource quantities on all agents tied to this - // role, if any. 
This includes both its own reservations as well as - // reservations of any of its subroles (i.e. it is hierarchical aware). - // Note that non-scalar resources, such as ports, are excluded. - ResourceQuantities reservationScalarQuantities; + const hashset<FrameworkID>& frameworks() const { return frameworks_; } + + const Quota& quota() const { return quota_; } + + double weight() const { return weight_; } + + bool isEmpty() const + { + return children_.empty() && + frameworks_.empty() && + reservationScalarQuantities_.empty() && + quota_ == DEFAULT_QUOTA && + weight_ == DEFAULT_WEIGHT; + } + + std::vector<Role*> children() const { return children_.values(); } + + const std::string role; // E.g. "a/b/c" + const std::string basename; // E.g. "c" + +private: + // We keep fields that are related to the tree structure as private + // and only allow mutations through the RoleTree structure. + friend class RoleTree; + + // Add a child to the role, the child must not already exist. + void addChild(Role* child); + + // Remove a child from the role, the child must be present. + void removeChild(Role* child); + + Role* parent; // Configured guaranteed resource quantities and resource limits for // this role. By default, a role has no guarantee and no limit. - Quota quota; + Quota quota_; // Configured weight for the role. This affects sorting precedence. // By default, weights == DEFAULT_WEIGHT == 1.0. - double weight; + double weight_; - bool isEmpty() const - { - return frameworks.empty() && - reservationScalarQuantities.empty() && - quota == DEFAULT_QUOTA && - weight == DEFAULT_WEIGHT; - } + // IDs of the frameworks subscribed to the role, if any. + hashset<FrameworkID> frameworks_; + + // Aggregated reserved scalar resource quantities on all agents tied to this + // role, if any. This includes both its own reservations as well as + // reservations of any of its subroles (i.e. it is hierarchical aware). + // Note that non-scalar resources, such as ports, are excluded. 
+ ResourceQuantities reservationScalarQuantities_; + + hashmap<std::string, Role*> children_; +}; + + +// A tree abstraction for organizing `class Role` hierarchically. +// +// We track a role when it has: +// +// * a non-default weight, or +// * a non-default quota, or +// * frameworks subscribed to it, or +// * reservations, or +// * descendent roles meeting any of the above conditions. +// +// Any roles that do not meet these conditions are not tracked in the role tree. +class RoleTree +{ +public: + RoleTree(Metrics* metrics); + + ~RoleTree(); + + Option<const Role*> get(const std::string& role) const; + + // Return a hashmap of all known roles. Root is not included. + const hashmap<std::string, Role>& roles() const { return roles_; } + + const Role* root() const { return root_; } + + // We keep track of reservations to enforce role quota limit + // in the presence of unallocated reservations. See MESOS-4527. + void trackReservations(const Resources& resources); + void untrackReservations(const Resources& resources); + + void trackFramework( + const FrameworkID& frameworkId, const std::string& role); + void untrackFramework( + const FrameworkID& frameworkId, const std::string& role); + + void updateQuota(const std::string& role, const Quota& quota); + + void updateWeight(const std::string& role, double weight); + +private: + // Lookup or add the role struct associated with the role. Ancestor roles + // along the tree path will be created if necessary. + Role& operator[](const std::string& role); + + // Try to remove the role associated with the given role. + // The role must exist. The role and its ancestors will be removed + // if they become "empty". See "Role:isEmpty()". + // Return true if the role instance associated with the role is removed. + // This should be called whenever a role's state (that defines its emptiness) + // gets updated, such as quota, weight, reservation and tracked frameworks. 
+ // Otherwise the "tracking only non-empty" tree invariant may break. + bool tryRemove(const std::string& role); + + // Root node of the tree, its `basename` == `role` == "". + Role* root_; + + // Allocator's metrics handle for publishing role related metrics. + Metrics* metrics; + + // A map of role and `Role` pairs for quick lookup. + hashmap<std::string, Role> roles_; }; @@ -310,6 +408,7 @@ public: paused(true), metrics(*this), completedFrameworkMetrics(0), + roleTree(&metrics), roleSorter(roleSorterFactory()), frameworkSorterFactory(_frameworkSorterFactory) {} @@ -554,6 +653,8 @@ protected: hashmap<SlaveID, Slave> slaves; + RoleTree roleTree; + // A set of agents that are kept as allocation candidates. Events // may add or remove candidates to the set. When an allocation is // processed, the set of candidates is cleared. @@ -563,13 +664,6 @@ protected: // ready after the allocation run is complete. Option<process::Future<Nothing>> allocation; - // We track information about roles that we're aware of in the system. - // Specifically, we keep track of the roles when a framework subscribes to - // the role, and/or when there are resources allocated to the role - // (e.g. some tasks and/or executors are consuming resources under the role), - // and/or when there are reservations tied to this role. - hashmap<std::string, Role> roles; - // Slaves to send offers for. Option<hashset<std::string>> whitelist; @@ -649,26 +743,6 @@ private: const FrameworkID& frameworkId, const std::set<std::string>& roles); - // `trackReservations` and `untrackReservations` are helpers - // to track role resource reservations. We need to keep - // track of reservations to enforce role quota limit - // in the presence of unallocated reservations. See MESOS-4527. - // - // TODO(mzhu): Ideally, we want these helpers to instead track the - // reservations as *allocated* in the sorters even when the - // reservations have not been allocated yet. 
This will help to: - // - // (1) Solve the fairness issue when roles with unallocated - // reservations may game the allocator (See MESOS-8299). - // - // (2) Simplify the quota enforcement logic -- the allocator - // would no longer need to track reservations separately. - void trackReservations( - const hashmap<std::string, Resources>& reservations); - - void untrackReservations( - const hashmap<std::string, Resources>& reservations); - // Helper to update the agent's total resources maintained in the allocator // and the role and quota sorters (whose total resources match the agent's // total resources). Returns true iff the stored agent total was changed.
