This is an automated email from the ASF dual-hosted git repository. mzhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit a5af0162dbe77ba3d76d94b2a2b4beb92d63f83a Author: Meng Zhu <[email protected]> AuthorDate: Fri Mar 29 15:39:17 2019 -0700 Used `ResourceQuantities` in `__allocate()` when appropriate. Replaced `Resources` with `ResourceQuantities` when appropriate in `__allocate()`. This simplifies the code and improves performance. Review: https://reviews.apache.org/r/70320 --- src/master/allocator/mesos/hierarchical.cpp | 164 ++++++++++++++-------------- src/master/allocator/mesos/hierarchical.hpp | 12 +- 2 files changed, 89 insertions(+), 87 deletions(-) diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp index f2393ed..047e799 100644 --- a/src/master/allocator/mesos/hierarchical.cpp +++ b/src/master/allocator/mesos/hierarchical.cpp @@ -925,7 +925,7 @@ void HierarchicalAllocatorProcess::updateAllocation( // Update the allocated resources in the quota sorter. We only update // the allocated resources if this role has quota set. - if (quotas.contains(role)) { + if (quotaGuarantees.contains(role)) { // See comment at `quotaRoleSorter` declaration regarding non-revocable. quotaRoleSorter->update( role, @@ -1428,11 +1428,12 @@ void HierarchicalAllocatorProcess::setQuota( // the role is not set. Setting quota differs from updating it because // the former moves the role to a different allocation group with a // dedicated sorter, while the later just updates the actual quota. - CHECK(!quotas.contains(role)); + CHECK(!quotaGuarantees.contains(role)); // Persist quota in memory and add the role into the corresponding // allocation group. - quotas[role] = quota; + quotaGuarantees[role] = + ResourceQuantities::fromScalarResources(quota.info.guarantee()); quotaRoleSorter->add(role); quotaRoleSorter->activate(role); @@ -1468,15 +1469,15 @@ void HierarchicalAllocatorProcess::removeQuota( CHECK(initialized); // Do not allow removing quota if it is not set. - CHECK(quotas.contains(role)); + CHECK(quotaGuarantees.contains(role)); CHECK(quotaRoleSorter->contains(role)); // TODO(alexr): Print all quota info for the role. - LOG(INFO) << "Removed quota " << quotas[role].info.guarantee() - << " for role '" << role << "'"; + LOG(INFO) << "Removed quota " << quotaGuarantees[role] << " for role '" + << role << "'"; // Remove the role from the quota'ed allocation group. - quotas.erase(role); + quotaGuarantees.erase(role); quotaRoleSorter->remove(role); metrics.removeQuota(role); @@ -1690,9 +1691,7 @@ void HierarchicalAllocatorProcess::__allocate() // // (2) Simplify the quota enforcement logic -- the allocator // would no longer need to track reservations separately. - // - // Note these are __quantities__ with no meta-data. - hashmap<string, Resources> rolesConsumedQuotaScalarQuantites; + hashmap<string, ResourceQuantities> rolesConsumedQuota; // We charge a role against its quota by considering its allocation as well // as any unallocated reservations since reservations are bound to the role. @@ -1702,22 +1701,24 @@ void HierarchicalAllocatorProcess::__allocate() // // Consumed Quota = reservations + unreserved allocation // = reservations + allocation - allocated reservations - foreachkey (const string& role, quotas) { + foreachkey (const string& role, quotaGuarantees) { // First add reservations. - rolesConsumedQuotaScalarQuantites[role] += - reservationScalarQuantities.get(role).getOrElse(Resources()); + rolesConsumedQuota[role] += + reservationScalarQuantities.get(role).getOrElse(ResourceQuantities()); // Then add allocated resource _quantities_ . // NOTE: Revocable resources are excluded in `quotaRoleSorter`. - CHECK(quotas.contains(role)); - rolesConsumedQuotaScalarQuantites[role] += - quotaRoleSorter->allocationScalarQuantities(role); + // + // TODO(mzhu): Update sorter to return `ResourceQuantities` directly. + CHECK(quotaGuarantees.contains(role)); + rolesConsumedQuota[role] += ResourceQuantities::fromScalarResources( + quotaRoleSorter->allocationScalarQuantities(role)); // Lastly subtract allocated reservations on each agent. foreachvalue ( const Resources& resources, quotaRoleSorter->allocation(role)) { - rolesConsumedQuotaScalarQuantites[role] -= - resources.reserved().createStrippedScalarQuantity(); + rolesConsumedQuota[role] -= + ResourceQuantities::fromScalarResources(resources.reserved().scalars()); } } @@ -1731,13 +1732,13 @@ void HierarchicalAllocatorProcess::__allocate() // Given the above, if a role has more reservations (which count towards // consumed quota) than quota guarantee, we don't need to hold back any // unreserved headroom for it. - Resources requiredHeadroom; - foreachpair (const string& role, const Quota& quota, quotas) { - // We can safely subtract resources without checking inclusion. If the - // minuend resource is less than the subtrahend resource, the result is an - // empty resource. - requiredHeadroom += Resources(quota.info.guarantee()) - - rolesConsumedQuotaScalarQuantites.get(role).getOrElse(Resources()); + ResourceQuantities requiredHeadroom; + foreachpair ( + const string& role, + const ResourceQuantities& guarantee, + quotaGuarantees) { + requiredHeadroom += + guarantee - rolesConsumedQuota.get(role).getOrElse(ResourceQuantities()); } // We will allocate resources while ensuring that the required @@ -1752,17 +1753,24 @@ void HierarchicalAllocatorProcess::__allocate() // allocated resources - // unallocated reservations - // unallocated revocable resources - Resources availableHeadroom = roleSorter->totalScalarQuantities(); + // + // TODO(mzhu): Update sorter to return `ResourceQuantities` directly. + ResourceQuantities availableHeadroom = + ResourceQuantities::fromScalarResources( + roleSorter->totalScalarQuantities()); // Subtract allocated resources from the total. + // + // TODO(mzhu): Update sorter to return `ResourceQuantities` directly. foreachkey (const string& role, roles) { - availableHeadroom -= roleSorter->allocationScalarQuantities(role); + availableHeadroom -= ResourceQuantities::fromScalarResources( + roleSorter->allocationScalarQuantities(role)); } // Calculate total allocated reservations. Note that we need to ensure // we count a reservation for "a" being allocated to "a/b", therefore // we cannot simply loop over the reservations' roles. - Resources totalAllocatedReservationScalarQuantities; + ResourceQuantities totalAllocatedReservation; foreachkey (const string& role, roles) { const hashmap<SlaveID, Resources>* allocations; if (quotaRoleSorter->contains(role)) { @@ -1774,20 +1782,19 @@ void HierarchicalAllocatorProcess::__allocate() } foreachvalue (const Resources& resources, *CHECK_NOTNULL(allocations)) { - totalAllocatedReservationScalarQuantities += - resources.reserved().createStrippedScalarQuantity(); + totalAllocatedReservation += + ResourceQuantities::fromScalarResources(resources.reserved().scalars()); } } // Subtract total unallocated reservations. - availableHeadroom -= - Resources::sum(reservationScalarQuantities) - - totalAllocatedReservationScalarQuantities; + availableHeadroom -= ResourceQuantities::sum(reservationScalarQuantities) - + totalAllocatedReservation; // Subtract revocable resources. foreachvalue (const Slave& slave, slaves) { - availableHeadroom -= - slave.getAvailable().revocable().createStrippedScalarQuantity(); + availableHeadroom -= ResourceQuantities::fromScalarResources( + slave.getAvailable().revocable().scalars()); } // Due to the two stages in the allocation algorithm and the nature of @@ -1819,9 +1826,9 @@ void HierarchicalAllocatorProcess::__allocate() Slave& slave = slaves.at(slaveId); foreach (const string& role, quotaRoleSorter->sort()) { - CHECK(quotas.contains(role)); + CHECK(quotaGuarantees.contains(role)); - const Quota& quota = quotas.at(role); + const ResourceQuantities& quotaGuarantee = quotaGuarantees.at(role); // If there are no active frameworks in this role, we do not // need to do any allocations for this role. @@ -1901,17 +1908,15 @@ void HierarchicalAllocatorProcess::__allocate() // have quota, there are no ancestor reservations involved here. Resources toAllocate = available.reserved(role).nonRevocable(); - // This is a scalar quantity with no meta-data. - Resources unsatisfiedQuotaGuarantee = - Resources(quota.info.guarantee()) - - rolesConsumedQuotaScalarQuantites.get(role).getOrElse(Resources()); + ResourceQuantities unsatisfiedQuotaGuarantee = + quotaGuarantee - + rolesConsumedQuota.get(role).getOrElse(ResourceQuantities()); Resources unreserved = available.nonRevocable().unreserved(); // First, allocate resources up to a role's quota guarantee. - Resources newQuotaAllocation = shrinkResources( - unreserved, - ResourceQuantities::fromScalarResources(unsatisfiedQuotaGuarantee)); + Resources newQuotaAllocation = + shrinkResources(unreserved, unsatisfiedQuotaGuarantee); toAllocate += newQuotaAllocation; @@ -1927,21 +1932,16 @@ void HierarchicalAllocatorProcess::__allocate() // Second, allocate scalar resources with unset quota while maintaining // the quota headroom. - set<string> quotaGuaranteeResourceNames = - Resources(quota.info.guarantee()).names(); - Resources nonQuotaGuaranteeResources = - unreserved.filter( - ["aGuaranteeResourceNames] (const Resource& resource) { - return quotaGuaranteeResourceNames.count(resource.name()) == 0; - } - ); + unreserved.filter([&](const Resource& resource) { + return quotaGuarantee.get(resource.name()) == Value::Scalar(); + }); - Resources surplusHeadroom = availableHeadroom - requiredHeadroom; + ResourceQuantities surplusHeadroom = + availableHeadroom - requiredHeadroom; - nonQuotaGuaranteeResources = shrinkResources( - nonQuotaGuaranteeResources, - ResourceQuantities::fromScalarResources(surplusHeadroom.scalars())); + nonQuotaGuaranteeResources = + shrinkResources(nonQuotaGuaranteeResources, surplusHeadroom); toAllocate += nonQuotaGuaranteeResources; @@ -1968,11 +1968,12 @@ void HierarchicalAllocatorProcess::__allocate() offerable[frameworkId][role][slaveId] += toAllocate; offeredSharedResources[slaveId] += toAllocate.shared(); - Resources allocatedUnreserved = - toAllocate.unreserved().createStrippedScalarQuantity(); + ResourceQuantities allocatedUnreserved = + ResourceQuantities::fromScalarResources( + toAllocate.unreserved().scalars()); // Update role consumed quota. - rolesConsumedQuotaScalarQuantites[role] += allocatedUnreserved; + rolesConsumedQuota[role] += allocatedUnreserved; // Track quota guarantee headroom change. @@ -1981,7 +1982,8 @@ void HierarchicalAllocatorProcess::__allocate() // role's guarantee should be subtracted. Allocation of reserved // resources or resources that this role has unset guarantee do not // affect `requiredHeadroom`. - requiredHeadroom -= newQuotaAllocation.createStrippedScalarQuantity(); + requiredHeadroom -= + ResourceQuantities::fromScalarResources(newQuotaAllocation.scalars()); // `availableHeadroom` counts total unreserved non-revocable resources // in the cluster. @@ -2009,7 +2011,7 @@ void HierarchicalAllocatorProcess::__allocate() foreach (const string& role, roleSorter->sort()) { // In the second allocation stage, we only allocate // for non-quota roles. - if (quotas.contains(role)) { + if (quotaGuarantees.contains(role)) { continue; } @@ -2059,16 +2061,17 @@ void HierarchicalAllocatorProcess::__allocate() // If allocating these resources would reduce the headroom // below what is required, we will hold them back. - const Resources headroomToAllocate = toAllocate - .scalars().unreserved().nonRevocable(); + const Resources headroomResources = + toAllocate.scalars().unreserved().nonRevocable(); + const ResourceQuantities headroomToAllocate = + ResourceQuantities::fromScalarResources(headroomResources); bool sufficientHeadroom = - (availableHeadroom - - headroomToAllocate.createStrippedScalarQuantity()) + (availableHeadroom - headroomToAllocate) .contains(requiredHeadroom); if (!sufficientHeadroom) { - toAllocate -= headroomToAllocate; + toAllocate -= headroomResources; } // If the framework filters these resources, ignore. @@ -2088,8 +2091,7 @@ void HierarchicalAllocatorProcess::__allocate() offeredSharedResources[slaveId] += toAllocate.shared(); if (sufficientHeadroom) { - availableHeadroom -= - headroomToAllocate.createStrippedScalarQuantity(); + availableHeadroom -= headroomToAllocate; } slave.allocate(toAllocate); @@ -2583,14 +2585,14 @@ void HierarchicalAllocatorProcess::trackReservations( { foreachpair (const string& role, const Resources& resources, reservations) { - const Resources scalarQuantitesToTrack = - resources.createStrippedScalarQuantity(); + const ResourceQuantities quantities = + ResourceQuantities::fromScalarResources(resources.scalars()); - if (scalarQuantitesToTrack.empty()) { + if (quantities.empty()) { continue; // Do not insert an empty entry. } - reservationScalarQuantities[role] += scalarQuantitesToTrack; + reservationScalarQuantities[role] += quantities; } } @@ -2600,21 +2602,21 @@ void HierarchicalAllocatorProcess::untrackReservations( { foreachpair (const string& role, const Resources& resources, reservations) { - const Resources scalarQuantitesToUntrack = - resources.createStrippedScalarQuantity(); + const ResourceQuantities quantities = + ResourceQuantities::fromScalarResources(resources.scalars()); - if (scalarQuantitesToUntrack.empty()) { + if (quantities.empty()) { continue; // Do not CHECK for the role if there's nothing to untrack. } CHECK(reservationScalarQuantities.contains(role)); - Resources& currentReservationQuantity = + ResourceQuantities& currentReservationQuantities = reservationScalarQuantities.at(role); - CHECK(currentReservationQuantity.contains(scalarQuantitesToUntrack)); - currentReservationQuantity -= scalarQuantitesToUntrack; + CHECK(currentReservationQuantities.contains(quantities)); + currentReservationQuantities -= quantities; - if (currentReservationQuantity.empty()) { + if (currentReservationQuantities.empty()) { reservationScalarQuantities.erase(role); } } @@ -2780,7 +2782,7 @@ void HierarchicalAllocatorProcess::trackAllocatedResources( frameworkSorters.at(role)->allocated( frameworkId.value(), slaveId, allocation); - if (quotas.contains(role)) { + if (quotaGuarantees.contains(role)) { // See comment at `quotaRoleSorter` declaration regarding non-revocable. quotaRoleSorter->allocated(role, slaveId, allocation.nonRevocable()); } @@ -2815,7 +2817,7 @@ void HierarchicalAllocatorProcess::untrackAllocatedResources( roleSorter->unallocated(role, slaveId, allocation); - if (quotas.contains(role)) { + if (quotaGuarantees.contains(role)) { // See comment at `quotaRoleSorter` declaration regarding non-revocable. quotaRoleSorter->unallocated(role, slaveId, allocation.nonRevocable()); } diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp index 36bf90b..9d51aeb 100644 --- a/src/master/allocator/mesos/hierarchical.hpp +++ b/src/master/allocator/mesos/hierarchical.hpp @@ -541,17 +541,17 @@ protected: // (e.g. some tasks and/or executors are consuming resources under the role). hashmap<std::string, hashset<FrameworkID>> roles; - // Configured quota for each role, if any. If a role does not have - // an entry here it has the default quota of (no guarantee, no limit). - hashmap<std::string, Quota> quotas; + // Configured guaranteed resource quantities for each role, if any. + // If a role does not have an entry here it has (the default) + // no guarantee. + hashmap<std::string, ResourceQuantities> quotaGuarantees; // Aggregated resource reservations on all agents tied to a - // particular role, if any. These are stripped scalar quantities - // that contain no meta-data. + // particular role, if any. // // Only roles with non-empty scalar reservation quantities will // be stored in the map. - hashmap<std::string, Resources> reservationScalarQuantities; + hashmap<std::string, ResourceQuantities> reservationScalarQuantities; // Slaves to send offers for. Option<hashset<std::string>> whitelist;
