This is an automated email from the ASF dual-hosted git repository. bmahler pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit b6c87d7c44346b2497ace65b1d2060ee423aa772 Author: Benjamin Mahler <[email protected]> AuthorDate: Wed Aug 21 20:10:42 2019 -0400 Eliminated double lookups in the allocator. Review: https://reviews.apache.org/r/71345 --- src/master/allocator/mesos/hierarchical.cpp | 307 +++++++++++++++------------- src/master/allocator/mesos/hierarchical.hpp | 11 +- 2 files changed, 170 insertions(+), 148 deletions(-) diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp index d09f10f..af9efd9 100644 --- a/src/master/allocator/mesos/hierarchical.cpp +++ b/src/master/allocator/mesos/hierarchical.cpp @@ -572,18 +572,18 @@ void HierarchicalAllocatorProcess::addFramework( active, options.publishPerFrameworkMetrics)}); - const Framework& framework = frameworks.at(frameworkId); + const Framework& framework = *CHECK_NOTNONE(getFramework(frameworkId)); foreach (const string& role, framework.roles) { trackFrameworkUnderRole(frameworkId, role); - CHECK_CONTAINS(frameworkSorters, role); + Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role)); if (suppressedRoles.count(role)) { - frameworkSorters.at(role)->deactivate(frameworkId.value()); + frameworkSorter->deactivate(frameworkId.value()); framework.metrics->suppressRole(role); } else { - frameworkSorters.at(role)->activate(frameworkId.value()); + frameworkSorter->activate(frameworkId.value()); framework.metrics->reviveRole(role); } } @@ -616,22 +616,23 @@ void HierarchicalAllocatorProcess::removeFramework( const FrameworkID& frameworkId) { CHECK(initialized); - CHECK_CONTAINS(frameworks, frameworkId); - Framework& framework = frameworks.at(frameworkId); + Framework& framework = *CHECK_NOTNONE(getFramework(frameworkId)); foreach (const string& role, framework.roles) { // Might not be in 'frameworkSorters[role]' because it // was previously deactivated and never re-added. // // TODO(mzhu): This check may no longer be necessary. - if (!frameworkSorters.contains(role) || - !frameworkSorters.at(role)->contains(frameworkId.value())) { + Option<Sorter*> frameworkSorter = getFrameworkSorter(role); + + if (frameworkSorter.isNone() || + !(*frameworkSorter)->contains(frameworkId.value())) { continue; } hashmap<SlaveID, Resources> allocation = - frameworkSorters.at(role)->allocation(frameworkId.value()); + (*frameworkSorter)->allocation(frameworkId.value()); // Update the allocation for this framework. foreachpair (const SlaveID& slaveId, @@ -659,9 +660,8 @@ void HierarchicalAllocatorProcess::activateFramework( const FrameworkID& frameworkId) { CHECK(initialized); - CHECK_CONTAINS(frameworks, frameworkId); - Framework& framework = frameworks.at(frameworkId); + Framework& framework = *CHECK_NOTNONE(getFramework(frameworkId)); framework.active = true; @@ -671,10 +671,10 @@ void HierarchicalAllocatorProcess::activateFramework( // role is specified in `suppressed_roles` during framework // (re)registration, or via a subsequent `SUPPRESS` call. foreach (const string& role, framework.roles) { - CHECK_CONTAINS(frameworkSorters, role); + Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role)); if (!framework.suppressedRoles.count(role)) { - frameworkSorters.at(role)->activate(frameworkId.value()); + frameworkSorter->activate(frameworkId.value()); } } @@ -688,14 +688,13 @@ void HierarchicalAllocatorProcess::deactivateFramework( const FrameworkID& frameworkId) { CHECK(initialized); - CHECK_CONTAINS(frameworks, frameworkId); - Framework& framework = frameworks.at(frameworkId); + Framework& framework = *CHECK_NOTNONE(getFramework(frameworkId)); foreach (const string& role, framework.roles) { - CHECK_CONTAINS(frameworkSorters, role); + Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role)); - frameworkSorters.at(role)->deactivate(frameworkId.value()); + frameworkSorter->deactivate(frameworkId.value()); // Note that the Sorter *does not* remove the resources allocated // to this framework. For now, this is important because if the @@ -719,9 +718,8 @@ void HierarchicalAllocatorProcess::updateFramework( const set<string>& suppressedRoles) { CHECK(initialized); - CHECK_CONTAINS(frameworks, frameworkId); - Framework& framework = frameworks.at(frameworkId); + Framework& framework = *CHECK_NOTNONE(getFramework(frameworkId)); const set<string> oldRoles = framework.roles; const set<string> newRoles = protobuf::framework::getRoles(frameworkInfo); @@ -741,13 +739,13 @@ void HierarchicalAllocatorProcess::updateFramework( } foreach (const string& role, oldRoles - newRoles) { - CHECK_CONTAINS(frameworkSorters, role); + Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role)); - frameworkSorters.at(role)->deactivate(frameworkId.value()); + frameworkSorter->deactivate(frameworkId.value()); // Stop tracking the framework under this role if there are // no longer any resources allocated to it. - if (frameworkSorters.at(role)->allocation(frameworkId.value()).empty()) { + if (frameworkSorter->allocation(frameworkId.value()).empty()) { untrackFrameworkUnderRole(frameworkId, role); } @@ -795,7 +793,7 @@ void HierarchicalAllocatorProcess::addSlave( total, Resources::sum(used))}); - Slave& slave = slaves.at(slaveId); + Slave& slave = *CHECK_NOTNONE(getSlave(slaveId)); // NOTE: We currently implement maintenance in the allocator to be able to // leverage state and features such as the FrameworkSorter and OfferFilter. @@ -865,21 +863,24 @@ void HierarchicalAllocatorProcess::removeSlave( const SlaveID& slaveId) { CHECK(initialized); - CHECK_CONTAINS(slaves, slaveId); - // TODO(bmahler): Per MESOS-621, this should remove the allocations - // that any frameworks have on this slave. Otherwise the caller may - // "leak" allocated resources accidentally if they forget to recover - // all the resources. Fixing this would require more information - // than what we currently track in the allocator. + { + const Slave& slave = *CHECK_NOTNONE(getSlave(slaveId)); - roleSorter->remove(slaveId, slaves.at(slaveId).getTotal()); + // TODO(bmahler): Per MESOS-621, this should remove the allocations + // that any frameworks have on this slave. Otherwise the caller may + // "leak" allocated resources accidentally if they forget to recover + // all the resources. Fixing this would require more information + // than what we currently track in the allocator. - foreachvalue (const Owned<Sorter>& sorter, frameworkSorters) { - sorter->remove(slaveId, slaves.at(slaveId).getTotal()); - } + roleSorter->remove(slaveId, slave.getTotal()); - roleTree.untrackReservations(slaves.at(slaveId).getTotal().reserved()); + foreachvalue (const Owned<Sorter>& sorter, frameworkSorters) { + sorter->remove(slaveId, slave.getTotal()); + } + + roleTree.untrackReservations(slave.getTotal().reserved()); + } slaves.erase(slaveId); allocationCandidates.erase(slaveId); @@ -897,10 +898,9 @@ void HierarchicalAllocatorProcess::updateSlave( const Option<vector<SlaveInfo::Capability>>& capabilities) { CHECK(initialized); - CHECK_CONTAINS(slaves, slaveId); CHECK_EQ(slaveId, info.id()); - Slave& slave = slaves.at(slaveId); + Slave& slave = *CHECK_NOTNONE(getSlave(slaveId)); bool updated = false; @@ -962,7 +962,6 @@ void HierarchicalAllocatorProcess::addResourceProvider( const hashmap<FrameworkID, Resources>& used) { CHECK(initialized); - CHECK_CONTAINS(slaves, slaveId); foreachpair (const FrameworkID& frameworkId, const Resources& allocation, @@ -985,7 +984,7 @@ void HierarchicalAllocatorProcess::addResourceProvider( trackAllocatedResources(slaveId, frameworkId, allocation); } - Slave& slave = slaves.at(slaveId); + Slave& slave = *CHECK_NOTNONE(getSlave(slaveId)); updateSlaveTotal(slaveId, slave.getTotal() + total); slave.allocate(Resources::sum(used)); @@ -1019,9 +1018,9 @@ void HierarchicalAllocatorProcess::activateSlave( const SlaveID& slaveId) { CHECK(initialized); - CHECK_CONTAINS(slaves, slaveId); - slaves.at(slaveId).activated = true; + Slave& slave = *CHECK_NOTNONE(getSlave(slaveId)); + slave.activated = true; LOG(INFO) << "Agent " << slaveId << " reactivated"; } @@ -1031,9 +1030,9 @@ void HierarchicalAllocatorProcess::deactivateSlave( const SlaveID& slaveId) { CHECK(initialized); - CHECK_CONTAINS(slaves, slaveId); - slaves.at(slaveId).activated = false; + Slave& slave = *CHECK_NOTNONE(getSlave(slaveId)); + slave.activated = false; LOG(INFO) << "Agent " << slaveId << " deactivated"; } @@ -1075,10 +1074,9 @@ void HierarchicalAllocatorProcess::updateAllocation( const vector<ResourceConversion>& conversions) { CHECK(initialized); - CHECK_CONTAINS(slaves, slaveId); CHECK_CONTAINS(frameworks, frameworkId); - Slave& slave = slaves.at(slaveId); + Slave& slave = *CHECK_NOTNONE(getSlave(slaveId)); // We require that an allocation is tied to a single role. // @@ -1091,9 +1089,8 @@ void HierarchicalAllocatorProcess::updateAllocation( string role = allocations.begin()->first; - CHECK_CONTAINS(frameworkSorters, role); + Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role)); - const Owned<Sorter>& frameworkSorter = frameworkSorters.at(role); const Resources frameworkAllocation = frameworkSorter->allocation(frameworkId.value(), slaveId); @@ -1209,9 +1206,8 @@ Future<Nothing> HierarchicalAllocatorProcess::updateAvailable( // for the operations to contain only unallocated resources. CHECK(initialized); - CHECK_CONTAINS(slaves, slaveId); - Slave& slave = slaves.at(slaveId); + Slave& slave = *CHECK_NOTNONE(getSlave(slaveId)); // It's possible for this 'apply' to fail here because a call to // 'allocate' could have been enqueued by the allocator itself @@ -1248,9 +1244,8 @@ void HierarchicalAllocatorProcess::updateUnavailability( const Option<Unavailability>& unavailability) { CHECK(initialized); - CHECK_CONTAINS(slaves, slaveId); - Slave& slave = slaves.at(slaveId); + Slave& slave = *CHECK_NOTNONE(getSlave(slaveId)); // NOTE: We currently implement maintenance in the allocator to be able to // leverage state and features such as the FrameworkSorter and OfferFilter. @@ -1284,11 +1279,9 @@ void HierarchicalAllocatorProcess::updateInverseOffer( const Option<Filters>& filters) { CHECK(initialized); - CHECK_CONTAINS(frameworks, frameworkId); - CHECK_CONTAINS(slaves, slaveId); - Framework& framework = frameworks.at(frameworkId); - Slave& slave = slaves.at(slaveId); + Framework& framework = *CHECK_NOTNONE(getFramework(frameworkId)); + Slave& slave = *CHECK_NOTNONE(getSlave(slaveId)); CHECK(slave.maintenance.isSome()) << "Agent " << slaveId @@ -1425,17 +1418,17 @@ void HierarchicalAllocatorProcess::recoverResources( // MesosAllocatorProcess::removeFramework or // MesosAllocatorProcess::deactivateFramework, in which case we will // have already recovered all of its resources). - if (frameworks.contains(frameworkId)) { - CHECK_CONTAINS(frameworkSorters, role); + Option<Framework*> framework = getFramework(frameworkId); - const Owned<Sorter>& frameworkSorter = frameworkSorters.at(role); + if (framework.isSome()) { + Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role)); if (frameworkSorter->contains(frameworkId.value())) { untrackAllocatedResources(slaveId, frameworkId, resources); // Stop tracking the framework under this role if it's no longer // subscribed and no longer has resources allocated to the role. - if (frameworks.at(frameworkId).roles.count(role) == 0 && + if ((*framework)->roles.count(role) == 0 && frameworkSorter->allocation(frameworkId.value()).empty()) { untrackFrameworkUnderRole(frameworkId, role); } @@ -1445,18 +1438,18 @@ void HierarchicalAllocatorProcess::recoverResources( // Update resources allocated on slave (if slave still exists, // which it might not in the event that we dispatched Master::offer // before we received Allocator::removeSlave). - if (slaves.contains(slaveId)) { - Slave& slave = slaves.at(slaveId); + Option<Slave*> slave = getSlave(slaveId); - CHECK(slave.getAllocated().contains(resources)) + if (slave.isSome()) { + CHECK((*slave)->getAllocated().contains(resources)) << "agent " << slaveId << " resources " - << slave.getAllocated() << " do not contain " << resources; + << (*slave)->getAllocated() << " do not contain " << resources; - slave.unallocate(resources); + (*slave)->unallocate(resources); VLOG(1) << "Recovered " << resources - << " (total: " << slave.getTotal() - << ", allocated: " << slave.getAllocated() << ")" + << " (total: " << (*slave)->getTotal() + << ", allocated: " << (*slave)->getAllocated() << ")" << " on agent " << slaveId << " from framework " << frameworkId; } @@ -1467,7 +1460,7 @@ void HierarchicalAllocatorProcess::recoverResources( } // No need to install the filter if slave/framework does not exist. - if (!frameworks.contains(frameworkId) || !slaves.contains(slaveId)) { + if (framework.isNone() || slave.isNone()) { return; } @@ -1525,8 +1518,7 @@ void HierarchicalAllocatorProcess::recoverResources( shared_ptr<RefusedOfferFilter> offerFilter = make_shared<RefusedOfferFilter>(unallocated, *timeout); - frameworks.at(frameworkId) - .offerFilters[role][slaveId].insert(offerFilter); + (*framework)->offerFilters[role][slaveId].insert(offerFilter); weak_ptr<OfferFilter> weakPtr = offerFilter; @@ -1548,9 +1540,9 @@ void HierarchicalAllocatorProcess::suppressRoles( // we have to differentiate between the cases here. foreach (const string& role, roles) { - CHECK_CONTAINS(frameworkSorters, role); + Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role)); - frameworkSorters.at(role)->deactivate(framework.frameworkId.value()); + frameworkSorter->deactivate(framework.frameworkId.value()); framework.suppressedRoles.insert(role); framework.metrics->suppressRole(role); } @@ -1566,9 +1558,7 @@ void HierarchicalAllocatorProcess::suppressOffers( const FrameworkID& frameworkId, const set<string>& roles_) { - CHECK_CONTAINS(frameworks, frameworkId); - - Framework& framework = frameworks.at(frameworkId); + Framework& framework = *CHECK_NOTNONE(getFramework(frameworkId)); const set<string>& roles = roles_.empty() ? framework.roles : roles_; suppressRoles(framework, roles); @@ -1590,9 +1580,9 @@ void HierarchicalAllocatorProcess::reviveRoles( // SUPPRESS is not parameterized. When parameterization is added, // we may need to differentiate between the cases here. foreach (const string& role, roles) { - CHECK_CONTAINS(frameworkSorters, role); + Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role)); - frameworkSorters.at(role)->activate(framework.frameworkId.value()); + frameworkSorter->activate(framework.frameworkId.value()); framework.suppressedRoles.erase(role); framework.metrics->reviveRole(role); } @@ -1609,9 +1599,8 @@ void HierarchicalAllocatorProcess::reviveOffers( const set<string>& roles) { CHECK(initialized); - CHECK_CONTAINS(frameworks, frameworkId); - Framework& framework = frameworks.at(frameworkId); + Framework& framework = *CHECK_NOTNONE(getFramework(frameworkId)); reviveRoles(framework, roles.empty() ? framework.roles : roles); @@ -1762,9 +1751,9 @@ void HierarchicalAllocatorProcess::__allocate() // Filter out non-whitelisted, removed, and deactivated slaves // in order not to send offers for them. foreach (const SlaveID& slaveId, allocationCandidates) { - if (isWhitelisted(slaveId) && - slaves.contains(slaveId) && - slaves.at(slaveId).activated) { + Option<Slave*> slave = getSlave(slaveId); + + if (isWhitelisted(slaveId) && slave.isSome() && (*slave)->activated) { slaveIds.push_back(slaveId); } } @@ -1944,13 +1933,13 @@ void HierarchicalAllocatorProcess::__allocate() // those roles with unsatisfied guarantees can have more choices and higher // probability in getting their guarantees satisfied. foreach (const SlaveID& slaveId, slaveIds) { - CHECK_CONTAINS(slaves, slaveId); - - Slave& slave = slaves.at(slaveId); + Slave& slave = *CHECK_NOTNONE(getSlave(slaveId)); foreach (const string& role, roleSorter->sort()) { - const ResourceQuantities& quotaGuarantees = getQuota(role).guarantees; - const ResourceLimits& quotaLimits = getQuota(role).limits; + const Quota& quota = getQuota(role); + + const ResourceQuantities& quotaGuarantees = quota.guarantees; + const ResourceLimits& quotaLimits = quota.limits; // We only allocate to roles with non-default guarantees // in the first stage. @@ -1960,8 +1949,13 @@ void HierarchicalAllocatorProcess::__allocate() // If there are no active frameworks in this role, we do not // need to do any allocations for this role. - if (roleTree.get(role).isNone() || - (*roleTree.get(role))->frameworks().empty()) { + bool noFrameworks = [&]() { + Option<const Role*> r = roleTree.get(role); + + return r.isNone() || (*r)->frameworks().empty(); + }(); + + if (noFrameworks) { continue; } @@ -1973,8 +1967,7 @@ void HierarchicalAllocatorProcess::__allocate() // Fetch frameworks in the order provided by the sorter. // NOTE: Suppressed frameworks are not included in the sort. - CHECK_CONTAINS(frameworkSorters, role); - const Owned<Sorter>& frameworkSorter = frameworkSorters.at(role); + Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role)); foreach (const string& frameworkId_, frameworkSorter->sort()) { Resources available = slave.getAvailable(); @@ -1990,9 +1983,7 @@ void HierarchicalAllocatorProcess::__allocate() FrameworkID frameworkId; frameworkId.set_value(frameworkId_); - CHECK_CONTAINS(frameworks, frameworkId); - - const Framework& framework = frameworks.at(frameworkId); + const Framework& framework = *CHECK_NOTNONE(getFramework(frameworkId)); CHECK(framework.active) << frameworkId; // An early `continue` optimization. @@ -2130,7 +2121,7 @@ void HierarchicalAllocatorProcess::__allocate() // If the framework filters these resources, ignore. if (!allocatable(toAllocate, role, framework) || - isFiltered(framework, role, slaveId, toAllocate)) { + isFiltered(framework, role, slave, toAllocate)) { continue; } @@ -2190,9 +2181,7 @@ void HierarchicalAllocatorProcess::__allocate() std::random_shuffle(slaveIds.begin(), slaveIds.end()); foreach (const SlaveID& slaveId, slaveIds) { - CHECK_CONTAINS(slaves, slaveId); - - Slave& slave = slaves.at(slaveId); + Slave& slave = *CHECK_NOTNONE(getSlave(slaveId)); foreach (const string& role, roleSorter->sort()) { // TODO(bmahler): Handle shared volumes, which are always available but @@ -2204,9 +2193,7 @@ void HierarchicalAllocatorProcess::__allocate() const ResourceLimits& quotaLimits = getQuota(role).limits; // NOTE: Suppressed frameworks are not included in the sort. - CHECK_CONTAINS(frameworkSorters, role); - - const Owned<Sorter>& frameworkSorter = frameworkSorters.at(role); + Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role)); foreach (const string& frameworkId_, frameworkSorter->sort()) { Resources available = slave.getAvailable(); @@ -2222,9 +2209,7 @@ void HierarchicalAllocatorProcess::__allocate() FrameworkID frameworkId; frameworkId.set_value(frameworkId_); - CHECK_CONTAINS(frameworks, frameworkId); - - const Framework& framework = frameworks.at(frameworkId); + const Framework& framework = *CHECK_NOTNONE(getFramework(frameworkId)); // An early `continue` optimization. if (!allocatable(available, role, framework)) { @@ -2282,7 +2267,7 @@ void HierarchicalAllocatorProcess::__allocate() // If the framework filters these resources, ignore. if (!allocatable(toAllocate, role, framework) || - isFiltered(framework, role, slaveId, toAllocate)) { + isFiltered(framework, role, slave, toAllocate)) { continue; } @@ -2353,9 +2338,7 @@ void HierarchicalAllocatorProcess::deallocate() foreachvalue (const Owned<Sorter>& frameworkSorter, frameworkSorters) { foreach (const SlaveID& slaveId, allocationCandidates) { - CHECK_CONTAINS(slaves, slaveId); - - Slave& slave = slaves.at(slaveId); + Slave& slave = *CHECK_NOTNONE(getSlave(slaveId)); if (slave.maintenance.isSome()) { // We use a reference by alias because we intend to modify the @@ -2369,9 +2352,8 @@ void HierarchicalAllocatorProcess::deallocate() FrameworkID frameworkId; frameworkId.set_value(frameworkId_); - CHECK_CONTAINS(frameworks, frameworkId); - - const Framework& framework = frameworks.at(frameworkId); + const Framework& framework = + *CHECK_NOTNONE(getFramework(frameworkId)); // No need to deallocate for an inactive framework as the master // will not send it inverse offers. @@ -2392,7 +2374,7 @@ void HierarchicalAllocatorProcess::deallocate() // inverse offers for maintenance primitives, and those are at the // whole slave level, we only need to filter based on the // time-out. - if (isFiltered(framework, slaveId)) { + if (isFiltered(framework, slave)) { continue; } @@ -2522,23 +2504,17 @@ void HierarchicalAllocatorProcess::expire( bool HierarchicalAllocatorProcess::isWhitelisted( const SlaveID& slaveId) const { - CHECK_CONTAINS(slaves, slaveId); - - const Slave& slave = slaves.at(slaveId); - - return whitelist.isNone() || whitelist->contains(slave.info.hostname()); + return whitelist.isNone() || + whitelist->contains(CHECK_NOTNONE(getSlave(slaveId))->info.hostname()); } bool HierarchicalAllocatorProcess::isFiltered( const Framework& framework, const string& role, - const SlaveID& slaveId, + const Slave& slave, const Resources& resources) const { - CHECK_CONTAINS(slaves, slaveId); - const Slave& slave = slaves.at(slaveId); - // TODO(mpark): Consider moving these filter logic out and into the master, // since they are not specific to the hierarchical allocator but rather are // global allocation constraints. @@ -2547,7 +2523,7 @@ bool HierarchicalAllocatorProcess::isFiltered( // to MULTI_ROLE frameworks. if (framework.capabilities.multiRole && !slave.capabilities.multiRole) { - LOG(WARNING) << "Implicitly filtering agent " << slaveId + LOG(WARNING) << "Implicitly filtering agent " << slave.info.id() << " from framework " << framework.frameworkId << " because the framework is MULTI_ROLE capable" << " but the agent is not"; @@ -2558,7 +2534,7 @@ bool HierarchicalAllocatorProcess::isFiltered( // Prevent offers from non-HIERARCHICAL_ROLE agents to be allocated // to hierarchical roles. if (!slave.capabilities.hierarchicalRole && strings::contains(role, "/")) { - LOG(WARNING) << "Implicitly filtering agent " << slaveId + LOG(WARNING) << "Implicitly filtering agent " << slave.info.id() << " from role " << role << " because the role is hierarchical but the agent is not" << " HIERARCHICAL_ROLE capable"; @@ -2573,7 +2549,7 @@ bool HierarchicalAllocatorProcess::isFiltered( return false; } - auto agentFilters = roleFilters->second.find(slaveId); + auto agentFilters = roleFilters->second.find(slave.info.id()); if (agentFilters == roleFilters->second.end()) { return false; } @@ -2581,7 +2557,7 @@ bool HierarchicalAllocatorProcess::isFiltered( foreach (const shared_ptr<OfferFilter>& offerFilter, agentFilters->second) { if (offerFilter->filter(resources)) { VLOG(1) << "Filtered offer with " << resources - << " on agent " << slaveId + << " on agent " << slave.info.id() << " for role " << role << " of framework " << framework.frameworkId; @@ -2594,15 +2570,13 @@ bool HierarchicalAllocatorProcess::isFiltered( bool HierarchicalAllocatorProcess::isFiltered( - const Framework& framework, const SlaveID& slaveId) const + const Framework& framework, const Slave& slave) const { - CHECK_CONTAINS(slaves, slaveId); - - if (framework.inverseOfferFilters.contains(slaveId)) { + if (framework.inverseOfferFilters.contains(slave.info.id())) { foreach (const shared_ptr<InverseOfferFilter>& inverseOfferFilter, - framework.inverseOfferFilters.at(slaveId)) { + framework.inverseOfferFilters.at(slave.info.id())) { if (inverseOfferFilter->filter()) { - VLOG(1) << "Filtered unavailability on agent " << slaveId + VLOG(1) << "Filtered unavailability on agent " << slave.info.id() << " for framework " << framework.frameworkId; return true; @@ -2717,6 +2691,39 @@ bool HierarchicalAllocatorProcess::isFrameworkTrackedUnderRole( } +Option<Slave*> HierarchicalAllocatorProcess::getSlave( + const SlaveID& slaveId) const +{ + auto it = slaves.find(slaveId); + + if (it == slaves.end()) return None(); + + return const_cast<Slave*>(&it->second); +} + + +Option<Framework*> HierarchicalAllocatorProcess::getFramework( + const FrameworkID& frameworkId) const +{ + auto it = frameworks.find(frameworkId); + + if (it == frameworks.end()) return None(); + + return const_cast<Framework*>(&it->second); +} + + +Option<Sorter*> HierarchicalAllocatorProcess::getFrameworkSorter( + const string& role) const +{ + auto it = frameworkSorters.find(role); + + if (it == frameworkSorters.end()) return None(); + + return const_cast<Sorter*>(it->second.get()); +} + + const Quota& HierarchicalAllocatorProcess::getQuota(const string& role) const { Option<const Role*> r = roleTree.get(role); @@ -2740,18 +2747,23 @@ void HierarchicalAllocatorProcess::trackFrameworkUnderRole( CHECK_NOT_CONTAINS(frameworkSorters, role); frameworkSorters.insert({role, Owned<Sorter>(frameworkSorterFactory())}); - frameworkSorters.at(role)->initialize(options.fairnessExcludeResourceNames); + + Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role)); + + frameworkSorter->initialize(options.fairnessExcludeResourceNames); foreachvalue (const Slave& slave, slaves) { - frameworkSorters.at(role)->add(slave.info.id(), slave.getTotal()); + frameworkSorter->add(slave.info.id(), slave.getTotal()); } } roleTree.trackFramework(frameworkId, role); - CHECK_NOT_CONTAINS(*frameworkSorters.at(role), frameworkId.value()) + Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role)); + + CHECK_NOT_CONTAINS(*frameworkSorter, frameworkId.value()) << " for role " << role; - frameworkSorters.at(role)->add(frameworkId.value()); + frameworkSorter->add(frameworkId.value()); } @@ -2762,14 +2774,15 @@ void HierarchicalAllocatorProcess::untrackFrameworkUnderRole( roleTree.untrackFramework(frameworkId, role); - CHECK_CONTAINS(frameworkSorters, role); - CHECK_CONTAINS(*frameworkSorters.at(role), frameworkId.value()) + Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role)); + + CHECK_CONTAINS(*frameworkSorter, frameworkId.value()) << " for role " << role; - frameworkSorters.at(role)->remove(frameworkId.value()); + frameworkSorter->remove(frameworkId.value()); if (roleTree.get(role).isNone() || (*roleTree.get(role))->frameworks().empty()) { - CHECK_EQ(frameworkSorters.at(role)->count(), 0u); + CHECK_EQ(frameworkSorter->count(), 0u); roleSorter->remove(role); frameworkSorters.erase(role); } @@ -2780,9 +2793,7 @@ bool HierarchicalAllocatorProcess::updateSlaveTotal( const SlaveID& slaveId, const Resources& total) { - CHECK_CONTAINS(slaves, slaveId); - - Slave& slave = slaves.at(slaveId); + Slave& slave = *CHECK_NOTNONE(getSlave(slaveId)); const Resources oldTotal = slave.getTotal(); @@ -2919,12 +2930,14 @@ void HierarchicalAllocatorProcess::trackAllocatedResources( } CHECK_CONTAINS(*roleSorter, role); - CHECK_CONTAINS(frameworkSorters, role); - CHECK_CONTAINS(*frameworkSorters.at(role), frameworkId.value()) + + Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role)); + + CHECK_CONTAINS(*frameworkSorter, frameworkId.value()) << " for role " << role; roleSorter->allocated(role, slaveId, allocation); - frameworkSorters.at(role)->allocated( + frameworkSorter->allocated( frameworkId.value(), slaveId, allocation); } } @@ -2949,11 +2962,13 @@ void HierarchicalAllocatorProcess::untrackAllocatedResources( const Resources& allocation, allocated.allocations()) { CHECK_CONTAINS(*roleSorter, role); - CHECK_CONTAINS(frameworkSorters, role); - CHECK_CONTAINS(*frameworkSorters.at(role), frameworkId.value()) + + Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role)); + + CHECK_CONTAINS(*frameworkSorter, frameworkId.value()) << "for role " << role; - frameworkSorters.at(role)->unallocated( + frameworkSorter->unallocated( frameworkId.value(), slaveId, allocation); roleSorter->unallocated(role, slaveId, allocation); diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp index 22dd881..65d103e 100644 --- a/src/master/allocator/mesos/hierarchical.hpp +++ b/src/master/allocator/mesos/hierarchical.hpp @@ -600,12 +600,14 @@ protected: bool isFiltered( const Framework& framework, const std::string& role, - const SlaveID& slaveId, + const Slave& slave, const Resources& resources) const; // Returns true if there is an inverse offer filter for this framework // on this slave. - bool isFiltered(const Framework& framework, const SlaveID& slaveID) const; + bool isFiltered( + const Framework& framework, + const Slave& slave) const; bool allocatable( const Resources& resources, @@ -728,6 +730,11 @@ private: const FrameworkID& frameworkId, const std::string& role) const; + Option<Slave*> getSlave(const SlaveID& slaveId) const; + Option<Framework*> getFramework(const FrameworkID& frameworkId) const; + + Option<Sorter*> getFrameworkSorter(const std::string& role) const; + const Quota& getQuota(const std::string& role) const; void trackFrameworkUnderRole(
