Repository: mesos Updated Branches: refs/heads/1.5.x f541ec0ff -> 5602cd2d0
Refactored `struct Slave` in the allocator for better performance. This patch refactors the `struct Slave` in the allocator. In particular, it addresses the slowness of computing agents' available resources. Instead of calculating them every time on the fly, this patch "denormalizes" the agent available resources by updating and persisting the field each time an agent's allocated or total resources change. Review: https://reviews.apache.org/r/67561/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/762c78d5 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/762c78d5 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/762c78d5 Branch: refs/heads/1.5.x Commit: 762c78d5351e2cbef4dc5deb58389ee48e56ef4f Parents: f541ec0 Author: Meng Zhu <[email protected]> Authored: Thu Jun 21 09:09:36 2018 -0700 Committer: Greg Mann <[email protected]> Committed: Tue Jul 3 07:27:26 2018 -0700 ---------------------------------------------------------------------- src/master/allocator/mesos/hierarchical.cpp | 75 +++++++++-------- src/master/allocator/mesos/hierarchical.hpp | 103 +++++++++++++++++------ 2 files changed, 114 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/762c78d5/src/master/allocator/mesos/hierarchical.cpp ---------------------------------------------------------------------- diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp index a483dfb..381e359 100644 --- a/src/master/allocator/mesos/hierarchical.cpp +++ b/src/master/allocator/mesos/hierarchical.cpp @@ -508,16 +508,16 @@ void HierarchicalAllocatorProcess::addSlave( CHECK_EQ(slaveId, slaveInfo.id()); CHECK(!paused || expectedAgentCount.isSome()); - slaves[slaveId] = Slave(); + slaves.insert({slaveId, + Slave( + slaveInfo, + protobuf::slave::Capabilities(capabilities), + true, + total, + Resources::sum(used))}); Slave& slave = slaves.at(slaveId); - slave.total = total; - slave.allocated = Resources::sum(used); - slave.activated = true; - slave.info = slaveInfo; - slave.capabilities = protobuf::slave::Capabilities(capabilities); - // NOTE: We currently implement maintenance in the allocator to be able to // leverage state and features such as the FrameworkSorter and OfferFilter. if (unavailability.isSome()) { @@ -574,8 +574,8 @@ void HierarchicalAllocatorProcess::addSlave( } LOG(INFO) << "Added agent " << slaveId << " (" << slave.info.hostname() << ")" - << " with " << slave.total - << " (allocated: " << slave.allocated << ")"; + << " with " << slave.getTotal() + << " (allocated: " << slave.getAllocated() << ")"; allocate(slaveId); } @@ -593,12 +593,13 @@ void HierarchicalAllocatorProcess::removeSlave( // all the resources. Fixing this would require more information // than what we currently track in the allocator. - roleSorter->remove(slaveId, slaves.at(slaveId).total); + roleSorter->remove(slaveId, slaves.at(slaveId).getTotal()); // See comment at `quotaRoleSorter` declaration regarding non-revocable. - quotaRoleSorter->remove(slaveId, slaves.at(slaveId).total.nonRevocable()); + quotaRoleSorter->remove( + slaveId, slaves.at(slaveId).getTotal().nonRevocable()); - untrackReservations(slaves.at(slaveId).total.reservations()); + untrackReservations(slaves.at(slaveId).getTotal().reservations()); slaves.erase(slaveId); allocationCandidates.erase(slaveId); @@ -708,8 +709,8 @@ void HierarchicalAllocatorProcess::addResourceProvider( } Slave& slave = slaves.at(slaveId); - updateSlaveTotal(slaveId, slave.total + total); - slave.allocated += Resources::sum(used); + updateSlaveTotal(slaveId, slave.getTotal() + total); + slave.allocate(Resources::sum(used)); VLOG(1) << "Grew agent " << slaveId << " by " @@ -846,8 +847,8 @@ void HierarchicalAllocatorProcess::updateAllocation( const Resources& updatedOfferedResources = _updatedOfferedResources.get(); // Update the per-slave allocation. - slave.allocated -= offeredResources; - slave.allocated += updatedOfferedResources; + slave.unallocate(offeredResources); + slave.allocate(updatedOfferedResources); // Update the allocation in the framework sorter. frameworkSorter->update( @@ -906,7 +907,7 @@ void HierarchicalAllocatorProcess::updateAllocation( strippedConversions.emplace_back(consumed, converted); } - Try<Resources> updatedTotal = slave.total.apply(strippedConversions); + Try<Resources> updatedTotal = slave.getTotal().apply(strippedConversions); CHECK_SOME(updatedTotal); updateSlaveTotal(slaveId, updatedTotal.get()); @@ -957,7 +958,7 @@ Future<Nothing> HierarchicalAllocatorProcess::updateAvailable( // \___/ \___/ // // where A = allocate, R = reserve, U = updateAvailable - Try<Resources> updatedAvailable = slave.available().apply(operations); + Try<Resources> updatedAvailable = slave.getAvailable().apply(operations); if (updatedAvailable.isError()) { VLOG(1) << "Failed to update available resources on agent " << slaveId << ": " << updatedAvailable.error(); @@ -965,7 +966,7 @@ Future<Nothing> HierarchicalAllocatorProcess::updateAvailable( } // Update the total resources. - Try<Resources> updatedTotal = slave.total.apply(operations); + Try<Resources> updatedTotal = slave.getTotal().apply(operations); CHECK_SOME(updatedTotal); // Update the total resources in the allocator and role and quota sorters. @@ -1186,14 +1187,14 @@ void HierarchicalAllocatorProcess::recoverResources( if (slaves.contains(slaveId)) { Slave& slave = slaves.at(slaveId); - CHECK(slave.allocated.contains(resources)) - << slave.allocated << " does not contain " << resources; + CHECK(slave.getAllocated().contains(resources)) + << slave.getAllocated() << " does not contain " << resources; - slave.allocated -= resources; + slave.unallocate(resources); VLOG(1) << "Recovered " << resources - << " (total: " << slave.total - << ", allocated: " << slave.allocated << ")" + << " (total: " << slave.getTotal() + << ", allocated: " << slave.getAllocated() << ")" << " on agent " << slaveId << " from framework " << frameworkId; } @@ -1708,7 +1709,7 @@ void HierarchicalAllocatorProcess::__allocate() // NOTE: `totalScalarQuantities` omits dynamic reservation, // persistent volume info, and allocation info. We additionally // remove the static reservations here via `toUnreserved()`. - availableHeadroom -= slave.available().revocable() + availableHeadroom -= slave.getAvailable().revocable() .createStrippedScalarQuantity().toUnreserved(); } @@ -1791,7 +1792,7 @@ void HierarchicalAllocatorProcess::__allocate() // See MESOS-5634. if (filterGpuResources && !framework.capabilities.gpuResources && - slave.total.gpus().getOrElse(0) > 0) { + slave.getTotal().gpus().getOrElse(0) > 0) { continue; } @@ -1804,14 +1805,14 @@ void HierarchicalAllocatorProcess::__allocate() // Calculate the currently available resources on the slave, which // is the difference in non-shared resources between total and // allocated, plus all shared resources on the agent (if applicable). - Resources available = slave.available().nonShared(); + Resources available = slave.getAvailable().nonShared(); // Since shared resources are offerable even when they are in use, we // make one copy of the shared resources available regardless of the // past allocations. Offer a shared resource only if it has not been // offered in this offer cycle to a framework. if (framework.capabilities.sharedResources) { - available += slave.total.shared(); + available += slave.getTotal().shared(); if (offeredSharedResources.contains(slaveId)) { available -= offeredSharedResources[slaveId]; } @@ -2002,7 +2003,7 @@ void HierarchicalAllocatorProcess::__allocate() // multiple copies of the same shared resources. const Resources newShared = resources.shared() .filter([this, &slaveId](const Resources& resource) { - return !slaves.at(slaveId).allocated.contains(resource); + return !slaves.at(slaveId).getAllocated().contains(resource); }); // We remove the static reservation metadata here via `toUnreserved()`. @@ -2010,7 +2011,7 @@ void HierarchicalAllocatorProcess::__allocate() (resources.reserved(role).nonShared() + newShared) .createStrippedScalarQuantity().toUnreserved(); - slave.allocated += resources; + slave.allocate(resources); trackAllocatedResources(slaveId, frameworkId, resources); } @@ -2051,7 +2052,7 @@ void HierarchicalAllocatorProcess::__allocate() // See MESOS-5634. if (filterGpuResources && !framework.capabilities.gpuResources && - slave.total.gpus().getOrElse(0) > 0) { + slave.getTotal().gpus().getOrElse(0) > 0) { continue; } @@ -2064,14 +2065,14 @@ void HierarchicalAllocatorProcess::__allocate() // Calculate the currently available resources on the slave, which // is the difference in non-shared resources between total and // allocated, plus all shared resources on the agent (if applicable). - Resources available = slave.available().nonShared(); + Resources available = slave.getAvailable().nonShared(); // Since shared resources are offerable even when they are in use, we // make one copy of the shared resources available regardless of the // past allocations. Offer a shared resource only if it has not been // offered in this offer cycle to a framework. if (framework.capabilities.sharedResources) { - available += slave.total.shared(); + available += slave.getTotal().shared(); if (offeredSharedResources.contains(slaveId)) { available -= offeredSharedResources[slaveId]; } @@ -2165,7 +2166,7 @@ void HierarchicalAllocatorProcess::__allocate() headroomToAllocate.createStrippedScalarQuantity(); } - slave.allocated += resources; + slave.allocate(resources); trackAllocatedResources(slaveId, frameworkId, resources); } @@ -2497,7 +2498,7 @@ double HierarchicalAllocatorProcess::_resources_offered_or_allocated( foreachvalue (const Slave& slave, slaves) { Option<Value::Scalar> value = - slave.allocated.get<Value::Scalar>(resource); + slave.getAllocated().get<Value::Scalar>(resource); if (value.isSome()) { offered_or_allocated += value->value(); @@ -2673,13 +2674,13 @@ bool HierarchicalAllocatorProcess::updateSlaveTotal( Slave& slave = slaves.at(slaveId); - const Resources oldTotal = slave.total; + const Resources oldTotal = slave.getTotal(); if (oldTotal == total) { return false; } - slave.total = total; + slave.updateTotal(total); hashmap<std::string, Resources> oldReservations = oldTotal.reservations(); hashmap<std::string, Resources> newReservations = total.reservations(); http://git-wip-us.apache.org/repos/asf/mesos/blob/762c78d5/src/master/allocator/mesos/hierarchical.hpp ---------------------------------------------------------------------- diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp index 37d7b75..ec45e16 100644 --- a/src/master/allocator/mesos/hierarchical.hpp +++ b/src/master/allocator/mesos/hierarchical.hpp @@ -354,42 +354,54 @@ protected: hashmap<FrameworkID, Framework> frameworks; - struct Slave + class Slave { - // Total amount of regular *and* oversubscribed resources. - Resources total; - - // Regular *and* oversubscribed resources that are allocated. - // - // NOTE: We maintain multiple copies of each shared resource allocated - // to a slave, where the number of copies represents the number of times - // this shared resource has been allocated to (and has not been recovered - // from) a specific framework. - // - // NOTE: We keep track of slave's allocated resources despite - // having that information in sorters. This is because the - // information in sorters is not accurate if some framework - // hasn't reregistered. See MESOS-2919 for details. - Resources allocated; - - // We track the total and allocated resources on the slave, the - // available resources are computed as follows: - // - // available = total - allocated - // - // Note that it's possible for the slave to be over-allocated! - // In this case, allocated > total. - Resources available() const + public: + Slave( + const SlaveInfo& _info, + const protobuf::slave::Capabilities& _capabilities, + bool _activated, + const Resources& _total, + const Resources& _allocated) + : info(_info), + capabilities(_capabilities), + activated(_activated), + total(_total), + allocated(_allocated) { // In order to subtract from the total, // we strip the allocation information. Resources allocated_ = allocated; allocated_.unallocate(); - return total - allocated_; + available = total - allocated_; + } + + Resources getTotal() const { return total; } + + Resources getAllocated() const { return allocated; } + + Resources getAvailable() const { return available; } + + void updateTotal(const Resources& newTotal) { + total = newTotal; + + updateAvailable(); + } + + void allocate(const Resources& toAllocate) + { + allocated += toAllocate; + + updateAvailable(); } - bool activated; // Whether to offer resources. + void unallocate(const Resources& toUnallocate) + { + allocated -= toUnallocate; + + updateAvailable(); + } // The `SlaveInfo` that was passed to the allocator when the slave was added // or updated. Currently only two fields are used: `hostname` for host @@ -399,6 +411,8 @@ protected: protobuf::slave::Capabilities capabilities; + bool activated; // Whether to offer resources. + // Represents a scheduled unavailability due to maintenance for a specific // slave, and the responses from frameworks as to whether they will be able // to gracefully handle this unavailability. @@ -434,6 +448,41 @@ protected: // a given point in time, for an optional duration. This information is used // to send out `InverseOffers`. Option<Maintenance> maintenance; + + private: + void updateAvailable() { + // In order to subtract from the total, + // we strip the allocation information. + Resources allocated_ = allocated; + allocated_.unallocate(); + + available = total - allocated_; + } + + // Total amount of regular *and* oversubscribed resources. + Resources total; + + // Regular *and* oversubscribed resources that are allocated. + // + // NOTE: We maintain multiple copies of each shared resource allocated + // to a slave, where the number of copies represents the number of times + // this shared resource has been allocated to (and has not been recovered + // from) a specific framework. + // + // NOTE: We keep track of the slave's allocated resources despite + // having that information in sorters. This is because the + // information in sorters is not accurate if some framework + // hasn't reregistered. See MESOS-2919 for details. + Resources allocated; + + // We track the total and allocated resources on the slave, the + // available resources are computed as follows: + // + // available = total - allocated + // + // Note that it's possible for the slave to be over-allocated! + // In this case, allocated > total. + Resources available; }; hashmap<SlaveID, Slave> slaves;
