This is an automated email from the ASF dual-hosted git repository. mzhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit ec6b7b34215e821a63cb79e7d52d94ff08c1e110 Author: Meng Zhu <[email protected]> AuthorDate: Thu Aug 22 17:54:25 2019 -0700 Optimized the allocation loop. Master: HierarchicalAllocator_WithQuotaParam.LargeAndSmallQuota/2 Made 3500 allocations in 23.37 secs Made 0 allocation in 19.72 secs Master + this patch: HierarchicalAllocator_WithQuotaParam.LargeAndSmallQuota/2 Made 3500 allocations in 16.831380548secs Made 0 allocation in 15.102885644secs Review: https://reviews.apache.org/r/71359 --- src/master/allocator/mesos/hierarchical.cpp | 86 ++++++++++++++++++----------- 1 file changed, 53 insertions(+), 33 deletions(-) diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp index 649de3b..dd73d5b 100644 --- a/src/master/allocator/mesos/hierarchical.cpp +++ b/src/master/allocator/mesos/hierarchical.cpp @@ -1965,16 +1965,30 @@ void HierarchicalAllocatorProcess::__allocate() break; // Nothing left on this agent. } + ResourceQuantities unsatisfiedQuotaGuarantees = + quotaGuarantees - + rolesConsumedQuota.get(role).getOrElse(ResourceQuantities()); + + // We only allocate to roles with unsatisfied guarantees + // in the first stage. + if (unsatisfiedQuotaGuarantees.empty()) { + continue; + } + // Fetch frameworks in the order provided by the sorter. // NOTE: Suppressed frameworks are not included in the sort. Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role)); foreach (const string& frameworkId_, frameworkSorter->sort()) { - Resources available = slave.getAvailable().allocatableTo(role); + if (unsatisfiedQuotaGuarantees.empty()) { + break; + } // Offer a shared resource only if it has not been offered in this // offer cycle to a framework. - available -= offeredSharedResources.get(slaveId).getOrElse(Resources()); + Resources available = + slave.getAvailable().allocatableTo(role) - + offeredSharedResources.get(slaveId).getOrElse(Resources()); if (available.empty()) { break; // Nothing left for the role. @@ -2048,13 +2062,16 @@ void HierarchicalAllocatorProcess::__allocate() // which only enforces quantity. Nevertheless, We choose to allow // such bursting for less resource fragmentation. - ResourceQuantities unsatisfiedQuotaGuarantees = - quotaGuarantees - - rolesConsumedQuota.get(role).getOrElse(ResourceQuantities()); - // Resources that can be used to to increase a role's quota consumption. + // + // This is hot path, we use explicit filter calls to avoid + // multiple traversal. Resources quotaResources = - available.scalars().unreserved().nonRevocable(); + available.filter([&](const Resource& resource) { + return resource.type() == Value::SCALAR && + Resources::isUnreserved(resource) && + !Resources::isRevocable(resource); + }); Resources guaranteesAllocation = shrinkResources(quotaResources, unsatisfiedQuotaGuarantees); @@ -2076,17 +2093,23 @@ void HierarchicalAllocatorProcess::__allocate() continue; } - // First, reservations and guarantees are always allocated. + // This role's reservations, non-scalar resources and revocable + // resources, as well as guarantees are always allocated. // // We need to allocate guarantees unconditionally here so that // even the cluster is overcommitted by guarantees (thus deficit in // headroom), this role's guarantees can still be allocated. - Resources toAllocate = available.reserved(role) + guaranteesAllocation; + Resources toAllocate = guaranteesAllocation + + available.filter([&](const Resource& resource) { + return Resources::isReserved(resource, role) || + resource.type() != Value::SCALAR || + Resources::isRevocable(resource); + }); Resources additionalScalarAllocation = quotaResources - guaranteesAllocation; - // Second, non-guaranteed quota resources are subject to quota limits + // Then, non-guaranteed quota resources are subject to quota limits // and global headroom enforcements. // Limits enforcement. @@ -2112,13 +2135,6 @@ void HierarchicalAllocatorProcess::__allocate() toAllocate += additionalScalarAllocation; - // Lastly, non-scalar resources and revocable resources - // are all allocated. - toAllocate += available.filter([&](const Resource& resource) { - return resource.type() != Value::SCALAR || - Resources::isRevocable(resource); - }); - // If the framework filters these resources, ignore. if (!allocatable(toAllocate, role, framework) || isFiltered(framework, role, slave, toAllocate)) { @@ -2140,6 +2156,7 @@ void HierarchicalAllocatorProcess::__allocate() ResourceQuantities::fromScalarResources( guaranteesAllocation + additionalScalarAllocation); + unsatisfiedQuotaGuarantees -= increasedQuotaConsumption; rolesConsumedQuota[role] += increasedQuotaConsumption; for (const string& ancestor : roles::ancestors(role)) { rolesConsumedQuota[ancestor] += increasedQuotaConsumption; @@ -2196,11 +2213,11 @@ void HierarchicalAllocatorProcess::__allocate() Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role)); foreach (const string& frameworkId_, frameworkSorter->sort()) { - Resources available = slave.getAvailable().allocatableTo(role); - // Offer a shared resource only if it has not been offered in this // offer cycle to a framework. - available -= offeredSharedResources.get(slaveId).getOrElse(Resources()); + Resources available = + slave.getAvailable().allocatableTo(role) - + offeredSharedResources.get(slaveId).getOrElse(Resources()); if (available.empty()) { break; // Nothing left for the role. @@ -2222,15 +2239,25 @@ void HierarchicalAllocatorProcess::__allocate() available = stripIncapableResources(available, framework.capabilities); - // First, reservations are always allocated. This also includes the - // roles ancestors' reservations. - Resources toAllocate = available.reserved(); + // Reservations (including the roles ancestors' reservations), + // non-scalar resources and revocable resources are always allocated. + Resources toAllocate = available.filter([&](const Resource& resource) { + return Resources::isReserved(resource) || + resource.type() != Value::SCALAR || + Resources::isRevocable(resource); + }); - // Second, unreserved scalar resources are subject to quota limits + // Then, unreserved scalar resources are subject to quota limits // and global headroom enforcement. - + // + // This is hot path, we use explicit filter calls to avoid + // multiple traversal. Resources additionalScalarAllocation = - available.scalars().unreserved().nonRevocable(); + available.filter([&](const Resource& resource) { + return resource.type() == Value::SCALAR && + Resources::isUnreserved(resource) && + !Resources::isRevocable(resource); + }); // Limits enforcement. if (!quotaLimits.empty()) { @@ -2258,13 +2285,6 @@ void HierarchicalAllocatorProcess::__allocate() toAllocate += additionalScalarAllocation; - // Lastly, non-scalar resources and revocable resources - // are all allocated. - toAllocate += available.filter([&](const Resource& resource) { - return resource.type() != Value::SCALAR || - Resources::isRevocable(resource); - }); - // If the framework filters these resources, ignore. if (!allocatable(toAllocate, role, framework) || isFiltered(framework, role, slave, toAllocate)) {
