This is an automated email from the ASF dual-hosted git repository. bmahler pushed a commit to branch 1.7.x in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 1d850597a3ecd671ab93ee8dba1be7f428247d0b Author: Andrei Sekretenko <[email protected]> AuthorDate: Wed Oct 30 19:53:36 2019 -0400 Fixed allocator performance issue in updateAllocation(). This patch addresses the poor performance of `HierarchicalAllocatorProcess::updateAllocation()` for agents with a huge number of non-addable resources in the many-framework case (see MESOS-10015). The Sorter methods for tracking totals, which modified the `Resources` of an agent in the Sorter, are replaced with methods that add/remove the resource quantities of an agent as a whole (which was actually the only way the old methods were used). Thus, the `Resources` of a whole agent are no longer subtracted and re-added when the resources of an agent are updated in a Sorter. Further, this patch completely removes the agent resource tracking logic from the random sorter (which itself makes no use of the tracked totals) by implementing cluster totals tracking in the allocator instead. Results of `*BENCHMARK_WithReservationParam.UpdateAllocation*` (for the DRF sorter): 1.7.x branch: Agent resources size: 200 (50 frameworks) Made 20 reserve and unreserve operations in 2.014081646secs Agent resources size: 400 (100 frameworks) Made 20 reserve and unreserve operations in 13.623513239secs Agent resources size: 800 (200 frameworks) Made 20 reserve and unreserve operations in 2.14100063438333mins Agent resources size: 1600 (400 frameworks) (killed after several minutes) 1.7.x branch + this patch: Agent resources size: 200 (50 frameworks) Made 20 reserve and unreserve operations in 236.706615ms Agent resources size: 400 (100 frameworks) Made 20 reserve and unreserve operations in 483.544585ms Agent resources size: 800 (200 frameworks) Made 20 reserve and unreserve operations in 1.095224322secs ... 
Agent resources size: 6400 (1600 frameworks) Made 20 reserve and unreserve operations in 50.369691741secs This is a backport of https://reviews.apache.org/r/71646 Review: https://reviews.apache.org/r/71698/ --- src/master/allocator/mesos/hierarchical.cpp | 69 +++++-- src/master/allocator/mesos/hierarchical.hpp | 3 + src/master/allocator/sorter/drf/sorter.cpp | 75 ++----- src/master/allocator/sorter/drf/sorter.hpp | 41 +--- src/master/allocator/sorter/random/sorter.cpp | 58 ------ src/master/allocator/sorter/random/sorter.hpp | 42 +--- src/master/allocator/sorter/sorter.hpp | 22 +- src/tests/sorter_tests.cpp | 278 +++++++++++--------------- 8 files changed, 220 insertions(+), 368 deletions(-) diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp index 3e8a8ce..116d3c4 100644 --- a/src/master/allocator/mesos/hierarchical.cpp +++ b/src/master/allocator/mesos/hierarchical.cpp @@ -537,14 +537,21 @@ void HierarchicalAllocatorProcess::addSlave( trackReservations(total.reservations()); - roleSorter->add(slaveId, total); + const Resources strippedScalars = total.createStrippedScalarQuantity(); + const ResourceQuantities agentScalarQuantities = + ResourceQuantities::fromScalarResources(strippedScalars); + + totalStrippedScalars += strippedScalars; + roleSorter->addSlave(slaveId, agentScalarQuantities); foreachvalue (const Owned<Sorter>& sorter, frameworkSorters) { - sorter->add(slaveId, total); + sorter->addSlave(slaveId, agentScalarQuantities); } // See comment at `quotaRoleSorter` declaration regarding non-revocable. - quotaRoleSorter->add(slaveId, total.nonRevocable()); + quotaRoleSorter->addSlave( + slaveId, + ResourceQuantities::fromScalarResources(total.nonRevocable().scalars())); foreachpair (const FrameworkID& frameworkId, const Resources& allocation, @@ -608,18 +615,22 @@ void HierarchicalAllocatorProcess::removeSlave( // all the resources. 
Fixing this would require more information // than what we currently track in the allocator. - roleSorter->remove(slaveId, slaves.at(slaveId).getTotal()); + roleSorter->removeSlave(slaveId); foreachvalue (const Owned<Sorter>& sorter, frameworkSorters) { - sorter->remove(slaveId, slaves.at(slaveId).getTotal()); + sorter->removeSlave(slaveId); } - // See comment at `quotaRoleSorter` declaration regarding non-revocable. - quotaRoleSorter->remove( - slaveId, slaves.at(slaveId).getTotal().nonRevocable()); + quotaRoleSorter->removeSlave(slaveId); untrackReservations(slaves.at(slaveId).getTotal().reservations()); + const Resources strippedScalars = + slaves.at(slaveId).getTotal().createStrippedScalarQuantity(); + + CHECK(totalStrippedScalars.contains(strippedScalars)); + totalStrippedScalars -= strippedScalars; + slaves.erase(slaveId); allocationCandidates.erase(slaveId); @@ -1727,7 +1738,11 @@ void HierarchicalAllocatorProcess::__allocate() // allocated resources - // unallocated reservations - // unallocated revocable resources - Resources availableHeadroom = roleSorter->totalScalarQuantities(); + Resources availableHeadroom = totalStrippedScalars; + + // NOTE: The role sorter does not return aggregated allocation + // information whereas `reservationScalarQuantities` does, so + // we need to loop over only top level roles for the latter. // Subtract allocated resources from the total. foreachkey (const string& role, roles) { @@ -2473,8 +2488,7 @@ double HierarchicalAllocatorProcess::_resources_total( const string& resource) { Option<Value::Scalar> total = - roleSorter->totalScalarQuantities() - .get<Value::Scalar>(resource); + totalStrippedScalars.get<Value::Scalar>(resource); return total.isSome() ? 
total->value() : 0; } @@ -2545,7 +2559,9 @@ void HierarchicalAllocatorProcess::trackFrameworkUnderRole( frameworkSorters.at(role)->initialize(fairnessExcludeResourceNames); foreachvalue (const Slave& slave, slaves) { - frameworkSorters.at(role)->add(slave.info.id(), slave.getTotal()); + frameworkSorters.at(role)->addSlave( + slave.info.id(), + ResourceQuantities::fromScalarResources(slave.getTotal().scalars())); } metrics.addRole(role); @@ -2662,18 +2678,33 @@ bool HierarchicalAllocatorProcess::updateSlaveTotal( trackReservations(newReservations); } - // Update the totals in the sorters. - roleSorter->remove(slaveId, oldTotal); - roleSorter->add(slaveId, total); + // Update the total in the allocator and totals in the sorters. + const Resources oldStrippedScalars = oldTotal.createStrippedScalarQuantity(); + const Resources strippedScalars = total.createStrippedScalarQuantity(); + + const ResourceQuantities oldAgentScalarQuantities = + ResourceQuantities::fromScalarResources(oldStrippedScalars); + + const ResourceQuantities agentScalarQuantities = + ResourceQuantities::fromScalarResources(strippedScalars); + + CHECK(totalStrippedScalars.contains(oldStrippedScalars)); + totalStrippedScalars -= oldStrippedScalars; + totalStrippedScalars += strippedScalars; + + roleSorter->removeSlave(slaveId); + roleSorter->addSlave(slaveId, agentScalarQuantities); foreachvalue (const Owned<Sorter>& sorter, frameworkSorters) { - sorter->remove(slaveId, oldTotal); - sorter->add(slaveId, total); + sorter->removeSlave(slaveId); + sorter->addSlave(slaveId, agentScalarQuantities); } // See comment at `quotaRoleSorter` declaration regarding non-revocable. 
- quotaRoleSorter->remove(slaveId, oldTotal.nonRevocable()); - quotaRoleSorter->add(slaveId, total.nonRevocable()); + quotaRoleSorter->removeSlave(slaveId); + quotaRoleSorter->addSlave( + slaveId, + ResourceQuantities::fromScalarResources(total.nonRevocable().scalars())); return true; } diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp index 1fce68f..97d4a71 100644 --- a/src/master/allocator/mesos/hierarchical.hpp +++ b/src/master/allocator/mesos/hierarchical.hpp @@ -523,6 +523,9 @@ protected: hashmap<SlaveID, Slave> slaves; + // Total stripped scalar resources on all agents. + Resources totalStrippedScalars; + // A set of agents that are kept as allocation candidates. Events // may add or remove candidates to the set. When an allocation is // processed, the set of candidates is cleared. diff --git a/src/master/allocator/sorter/drf/sorter.cpp b/src/master/allocator/sorter/drf/sorter.cpp index 43c9767..fb1baec 100644 --- a/src/master/allocator/sorter/drf/sorter.cpp +++ b/src/master/allocator/sorter/drf/sorter.cpp @@ -493,69 +493,36 @@ Resources DRFSorter::allocation( } -const Resources& DRFSorter::totalScalarQuantities() const +void DRFSorter::addSlave( + const SlaveID& slaveId, + const ResourceQuantities& scalarQuantities) { - return total_.scalarQuantities; -} + bool inserted = total_.agentResourceQuantities.emplace( + slaveId, scalarQuantities).second; + CHECK(inserted) << "Attempted to add already added agent " << slaveId; -void DRFSorter::add(const SlaveID& slaveId, const Resources& resources) -{ - if (!resources.empty()) { - // Add shared resources to the total quantities when the same - // resources don't already exist in the total. 
- const Resources newShared = resources.shared() - .filter([this, slaveId](const Resource& resource) { - return !total_.resources[slaveId].contains(resource); - }); - - total_.resources[slaveId] += resources; - - const Resources scalarQuantities = - (resources.nonShared() + newShared).createStrippedScalarQuantity(); - - total_.scalarQuantities += scalarQuantities; - total_.totals += ResourceQuantities::fromScalarResources(scalarQuantities); - - // We have to recalculate all shares when the total resources - // change, but we put it off until `sort` is called so that if - // something else changes before the next allocation we don't - // recalculate everything twice. - dirty = true; - } + total_.totals += scalarQuantities; + + // We have to recalculate all shares when the total resources + // change, but we put it off until `sort` is called so that if + // something else changes before the next allocation we don't + // recalculate everything twice. + dirty = true; } -void DRFSorter::remove(const SlaveID& slaveId, const Resources& resources) +void DRFSorter::removeSlave(const SlaveID& slaveId) { - if (!resources.empty()) { - CHECK(total_.resources.contains(slaveId)); - CHECK(total_.resources[slaveId].contains(resources)) - << total_.resources[slaveId] << " does not contain " << resources; - - total_.resources[slaveId] -= resources; + const auto agent = total_.agentResourceQuantities.find(slaveId); + CHECK(agent != total_.agentResourceQuantities.end()) + << "Attempted to remove unknown agent " << slaveId; - // Remove shared resources from the total quantities when there - // are no instances of same resources left in the total. 
- const Resources absentShared = resources.shared() - .filter([this, slaveId](const Resource& resource) { - return !total_.resources[slaveId].contains(resource); - }); + // CHECK(total_.totals.contains(agent->second)); + total_.totals -= agent->second; - const Resources scalarQuantities = - (resources.nonShared() + absentShared).createStrippedScalarQuantity(); - - CHECK(total_.scalarQuantities.contains(scalarQuantities)); - total_.scalarQuantities -= scalarQuantities; - - total_.totals -= ResourceQuantities::fromScalarResources(scalarQuantities); - - if (total_.resources[slaveId].empty()) { - total_.resources.erase(slaveId); - } - - dirty = true; - } + total_.agentResourceQuantities.erase(agent); + dirty = true; } diff --git a/src/master/allocator/sorter/drf/sorter.hpp b/src/master/allocator/sorter/drf/sorter.hpp index 75f90f3..bfbb9a3 100644 --- a/src/master/allocator/sorter/drf/sorter.hpp +++ b/src/master/allocator/sorter/drf/sorter.hpp @@ -96,11 +96,11 @@ public: const std::string& clientPath, const SlaveID& slaveId) const override; - const Resources& totalScalarQuantities() const override; - - void add(const SlaveID& slaveId, const Resources& resources) override; + void addSlave( + const SlaveID& slaveId, + const ResourceQuantities& scalarQuantities) override; - void remove(const SlaveID& slaveId, const Resources& resources) override; + void removeSlave(const SlaveID& slaveId) override; std::vector<std::string> sort() override; @@ -150,34 +150,15 @@ private: // Total resources. struct Total { - // We need to keep track of the resources (and not just scalar - // quantities) to account for multiple copies of the same shared - // resources. We need to ensure that we do not update the scalar - // quantities for shared resources when the change is only in the - // number of copies in the sorter. - hashmap<SlaveID, Resources> resources; - - // NOTE: Scalars can be safely aggregated across slaves. We keep - // that to speed up the calculation of shares. 
See MESOS-2891 for - // the reasons why we want to do that. - // - // NOTE: We omit information about dynamic reservations and - // persistent volumes here to enable resources to be aggregated - // across slaves more effectively. See MESOS-4833 for more - // information. - // - // Sharedness info is also stripped out when resource identities - // are omitted because sharedness inherently refers to the - // identities of resources and not quantities. - Resources scalarQuantities; + ResourceQuantities totals; - // To improve the performance of calculating shares, we store - // a redundant but more efficient version of `scalarQuantities`. - // See MESOS-4694. + // We keep track of per-agent resource quantities to handle agent removal. // - // TODO(bmahler): Can we remove `scalarQuantities` in favor of - // using this type whenever scalar quantities are needed? - ResourceQuantities totals; + // Note that the only way to change the stored resource quantities + // is to remove the agent from the sorter and add it with new resources. + // Thus, when a resource shared count on an agent changes, multiple copies + // of the same shared resource are still accounted for exactly once. + hashmap<SlaveID, const ResourceQuantities> agentResourceQuantities; } total_; // Metrics are optionally exposed by the sorter. diff --git a/src/master/allocator/sorter/random/sorter.cpp b/src/master/allocator/sorter/random/sorter.cpp index 6fcfc41..95070cc 100644 --- a/src/master/allocator/sorter/random/sorter.cpp +++ b/src/master/allocator/sorter/random/sorter.cpp @@ -418,64 +418,6 @@ Resources RandomSorter::allocation( } -const Resources& RandomSorter::totalScalarQuantities() const -{ - return total_.scalarQuantities; -} - - -void RandomSorter::add(const SlaveID& slaveId, const Resources& resources) -{ - if (!resources.empty()) { - // Add shared resources to the total quantities when the same - // resources don't already exist in the total. 
- const Resources newShared = resources.shared() - .filter([this, slaveId](const Resource& resource) { - return !total_.resources[slaveId].contains(resource); - }); - - total_.resources[slaveId] += resources; - - const Resources scalarQuantities = - (resources.nonShared() + newShared).createStrippedScalarQuantity(); - - total_.scalarQuantities += scalarQuantities; - total_.totals += ResourceQuantities::fromScalarResources(scalarQuantities); - } -} - - -void RandomSorter::remove(const SlaveID& slaveId, const Resources& resources) -{ - if (!resources.empty()) { - CHECK(total_.resources.contains(slaveId)); - CHECK(total_.resources[slaveId].contains(resources)) - << total_.resources[slaveId] << " does not contain " << resources; - - total_.resources[slaveId] -= resources; - - // Remove shared resources from the total quantities when there - // are no instances of same resources left in the total. - const Resources absentShared = resources.shared() - .filter([this, slaveId](const Resource& resource) { - return !total_.resources[slaveId].contains(resource); - }); - - const Resources scalarQuantities = - (resources.nonShared() + absentShared).createStrippedScalarQuantity(); - - CHECK(total_.scalarQuantities.contains(scalarQuantities)); - total_.scalarQuantities -= scalarQuantities; - - total_.totals -= ResourceQuantities::fromScalarResources(scalarQuantities); - - if (total_.resources[slaveId].empty()) { - total_.resources.erase(slaveId); - } - } -} - - vector<string> RandomSorter::sort() { std::function<void (Node*)> shuffleTree = [this, &shuffleTree](Node* node) { diff --git a/src/master/allocator/sorter/random/sorter.hpp b/src/master/allocator/sorter/random/sorter.hpp index 2031cb2..2aa4ee8 100644 --- a/src/master/allocator/sorter/random/sorter.hpp +++ b/src/master/allocator/sorter/random/sorter.hpp @@ -95,11 +95,12 @@ public: const std::string& clientPath, const SlaveID& slaveId) const override; - const Resources& totalScalarQuantities() const override; - - void 
add(const SlaveID& slaveId, const Resources& resources) override; + // NOTE: addSlave/removeSlave is a no-op for this sorter. + void addSlave( + const SlaveID& slaveId, + const ResourceQuantities& scalarQuantities) override {}; - void remove(const SlaveID& slaveId, const Resources& resources) override; + void removeSlave(const SlaveID& slaveId) override {}; // This will perform a weighted random shuffle on each call. // @@ -144,39 +145,6 @@ private: // rooted at that path. This hashmap might include weights for paths // that are not currently in the sorter tree. hashmap<std::string, double> weights; - - // Total resources. - struct Total - { - // We need to keep track of the resources (and not just scalar - // quantities) to account for multiple copies of the same shared - // resources. We need to ensure that we do not update the scalar - // quantities for shared resources when the change is only in the - // number of copies in the sorter. - hashmap<SlaveID, Resources> resources; - - // NOTE: Scalars can be safely aggregated across slaves. We keep - // that to speed up the calculation of shares. See MESOS-2891 for - // the reasons why we want to do that. - // - // NOTE: We omit information about dynamic reservations and - // persistent volumes here to enable resources to be aggregated - // across slaves more effectively. See MESOS-4833 for more - // information. - // - // Sharedness info is also stripped out when resource identities - // are omitted because sharedness inherently refers to the - // identities of resources and not quantities. - Resources scalarQuantities; - - // To improve the performance of calculating shares, we store - // a redundant but more efficient version of `scalarQuantities`. - // See MESOS-4694. - // - // TODO(bmahler): Can we remove `scalarQuantities` in favor of - // using this type whenever scalar quantities are needed? 
- ResourceQuantities totals; - } total_; }; diff --git a/src/master/allocator/sorter/sorter.hpp b/src/master/allocator/sorter/sorter.hpp index 25ad48d..20f877a 100644 --- a/src/master/allocator/sorter/sorter.hpp +++ b/src/master/allocator/sorter/sorter.hpp @@ -124,17 +124,19 @@ public: const std::string& client, const SlaveID& slaveId) const = 0; - // Returns the total scalar resource quantities in this sorter. This - // omits metadata about dynamic reservations and persistent volumes; see - // `Resources::createStrippedScalarQuantity`. - virtual const Resources& totalScalarQuantities() const = 0; - - // Add resources to the total pool of resources this - // Sorter should consider. - virtual void add(const SlaveID& slaveId, const Resources& resources) = 0; + // Add/remove total scalar resource quantities of an agent to/from the + // total pool of resources this Sorter should consider. + // + // NOTE: Updating resources of an agent in the Sorter is done by first calling + // `removeSlave()` and then `addSlave()` with new resource quantities. + // + // NOTE: Attempt to add the same agent twice or remove an agent not added + // to the Sorter may crash the program. + virtual void addSlave( + const SlaveID& slaveId, + const ResourceQuantities& scalarQuantities) = 0; - // Remove resources from the total pool. - virtual void remove(const SlaveID& slaveId, const Resources& resources) = 0; + virtual void removeSlave(const SlaveID& slaveId) = 0; // Returns all of the clients in the order that they should // be allocated to, according to this Sorter's policy. 
diff --git a/src/tests/sorter_tests.cpp b/src/tests/sorter_tests.cpp index 1e2791f..40b4e38 100644 --- a/src/tests/sorter_tests.cpp +++ b/src/tests/sorter_tests.cpp @@ -109,8 +109,8 @@ TEST(DRFSorterTest, DRF) SlaveID slaveId; slaveId.set_value("agentId"); - Resources totalResources = Resources::parse("cpus:100;mem:100").get(); - sorter.add(slaveId, totalResources); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get()); EXPECT_EQ(vector<string>({}), sorter.sort()); @@ -153,16 +153,16 @@ TEST(DRFSorterTest, DRF) sorter.activate("e"); sorter.allocated("e", slaveId, eResources); - Resources removedResources = Resources::parse("cpus:50;mem:0").get(); - sorter.remove(slaveId, removedResources); - // total resources is now cpus = 50, mem = 100 + sorter.removeSlave(slaveId); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:50;mem:100").get()); // shares: b = .04, c = .02, d = .06, e = .05 EXPECT_EQ(vector<string>({"c", "b", "e", "d"}), sorter.sort()); - Resources addedResources = Resources::parse("cpus:0;mem:100").get(); - sorter.add(slaveId, addedResources); - // total resources is now cpus = 50, mem = 200 + sorter.removeSlave(slaveId); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:50;mem:200").get()); Resources fResources = Resources::parse("cpus:5;mem:1").get(); sorter.add("f"); @@ -202,7 +202,8 @@ TEST(DRFSorterTest, WDRF) SlaveID slaveId; slaveId.set_value("agentId"); - sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get()); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get()); sorter.add("a"); sorter.activate("a"); @@ -262,9 +263,8 @@ TEST(DRFSorterTest, UpdateWeight) SlaveID slaveId; slaveId.set_value("agentId"); - Resources totalResources = Resources::parse("cpus:100;mem:100").get(); - - sorter.add(slaveId, totalResources); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get()); sorter.add("a"); sorter.activate("a"); 
@@ -294,9 +294,8 @@ TEST(DRFSorterTest, AllocationCountTieBreak) SlaveID slaveId; slaveId.set_value("agentId"); - Resources totalResources = Resources::parse("cpus:100;mem:100").get(); - - sorter.add(slaveId, totalResources); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get()); sorter.add("a"); sorter.add("b"); @@ -377,8 +376,8 @@ TEST(DRFSorterTest, ShallowHierarchy) SlaveID slaveId; slaveId.set_value("agentId"); - Resources totalResources = Resources::parse("cpus:100;mem:100").get(); - sorter.add(slaveId, totalResources); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get()); sorter.add("a/a"); sorter.activate("a/a"); @@ -420,16 +419,16 @@ TEST(DRFSorterTest, ShallowHierarchy) sorter.activate("e/e"); sorter.allocated("e/e", slaveId, eResources); - Resources removedResources = Resources::parse("cpus:50;mem:0").get(); - sorter.remove(slaveId, removedResources); - // total resources is now cpus = 50, mem = 100 + sorter.removeSlave(slaveId); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:50;mem:100").get()); // shares: b/b = .04, c/c = .02, d/d = .06, e/e = .05 EXPECT_EQ(vector<string>({"c/c", "b/b", "e/e", "d/d"}), sorter.sort()); - Resources addedResources = Resources::parse("cpus:0;mem:100").get(); - sorter.add(slaveId, addedResources); - // total resources is now cpus = 50, mem = 200 + sorter.removeSlave(slaveId); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:50;mem:200").get()); Resources fResources = Resources::parse("cpus:5;mem:1").get(); sorter.add("f/f"); @@ -472,8 +471,8 @@ TEST(DRFSorterTest, DeepHierarchy) SlaveID slaveId; slaveId.set_value("agentId"); - Resources totalResources = Resources::parse("cpus:100;mem:100").get(); - sorter.add(slaveId, totalResources); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get()); sorter.add("a/a/a/a/a"); sorter.activate("a/a/a/a/a"); @@ -515,17 +514,17 @@ TEST(DRFSorterTest, 
DeepHierarchy) sorter.activate("e/e/e/e/e/e"); sorter.allocated("e/e/e/e/e/e", slaveId, eResources); - Resources removedResources = Resources::parse("cpus:50;mem:0").get(); - sorter.remove(slaveId, removedResources); - // total resources is now cpus = 50, mem = 100 + sorter.removeSlave(slaveId); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:50;mem:100").get()); // shares: b/b/b/b = .04, c/c/c = .02, d/d = .06, e/e/e/e/e/e = .05 EXPECT_EQ(vector<string>({"c/c/c", "b/b/b/b", "e/e/e/e/e/e", "d/d"}), sorter.sort()); - Resources addedResources = Resources::parse("cpus:0;mem:100").get(); - sorter.add(slaveId, addedResources); - // total resources is now cpus = 50, mem = 200 + sorter.removeSlave(slaveId); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:50;mem:200").get()); Resources fResources = Resources::parse("cpus:5;mem:1").get(); sorter.add("f/f"); @@ -568,8 +567,8 @@ TEST(DRFSorterTest, HierarchicalAllocation) SlaveID slaveId; slaveId.set_value("agentId"); - Resources totalResources = Resources::parse("cpus:100;mem:100").get(); - sorter.add(slaveId, totalResources); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get()); sorter.add("a"); sorter.add("b/c"); @@ -675,8 +674,8 @@ TEST(DRFSorterTest, HierarchicalIterationOrder) SlaveID slaveId; slaveId.set_value("agentId"); - Resources totalResources = Resources::parse("cpus:100;mem:100").get(); - sorter.add(slaveId, totalResources); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get()); sorter.add("a/b"); sorter.add("c"); @@ -723,7 +722,8 @@ TEST(DRFSorterTest, AddChildToLeaf) SlaveID slaveId; slaveId.set_value("agentId"); - sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get()); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get()); sorter.add("a"); sorter.activate("a"); @@ -794,7 +794,8 @@ TEST(DRFSorterTest, AddChildToInternal) SlaveID slaveId; 
slaveId.set_value("agentId"); - sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get()); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get()); sorter.add("x/a"); sorter.activate("x/a"); @@ -837,7 +838,8 @@ TEST(DRFSorterTest, AddChildToInactiveLeaf) SlaveID slaveId; slaveId.set_value("agentId"); - sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get()); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get()); sorter.add("a"); sorter.activate("a"); @@ -872,7 +874,8 @@ TEST(DRFSorterTest, RemoveLeafCollapseParent) SlaveID slaveId; slaveId.set_value("agentId"); - sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get()); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get()); sorter.add("a"); sorter.activate("a"); @@ -907,7 +910,8 @@ TEST(DRFSorterTest, RemoveLeafCollapseParentInactive) SlaveID slaveId; slaveId.set_value("agentId"); - sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get()); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get()); sorter.add("a"); sorter.activate("a"); @@ -943,7 +947,8 @@ TEST(DRFSorterTest, ChangeWeightOnSubtree) SlaveID slaveId; slaveId.set_value("agentId"); - sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get()); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:100;mem:100").get()); sorter.updateWeight("b", 3); sorter.updateWeight("a", 2); @@ -1009,9 +1014,9 @@ TEST(DRFSorterTest, SplitResourceShares) disk2.mutable_disk()->mutable_persistence()->set_id("ID2"); disk2.mutable_disk()->mutable_volume()->set_container_path("data"); - sorter.add( + sorter.addSlave( slaveId, - Resources::parse("cpus:100;mem:100;disk:95").get() + disk1 + disk2); + ResourceQuantities::fromString("cpus:100;mem:100;disk:100").get()); // Now, allocate resources to "a" and "b". Note that "b" will have // more disk if the shares are accounted correctly! 
@@ -1036,7 +1041,8 @@ TYPED_TEST(CommonSorterTest, UpdateAllocation) sorter.activate("a"); sorter.activate("b"); - sorter.add(slaveId, Resources::parse("cpus:10;mem:10;disk:10").get()); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:10;mem:10;disk:10").get()); sorter.allocated( "a", slaveId, Resources::parse("cpus:10;mem:10;disk:10").get()); @@ -1073,7 +1079,8 @@ TYPED_TEST(CommonSorterTest, UpdateAllocationNestedClient) sorter.activate("a/x"); sorter.activate("b/y"); - sorter.add(slaveId, Resources::parse("cpus:10;mem:10;disk:10").get()); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:10;mem:10;disk:10").get()); sorter.allocated( "a/x", slaveId, Resources::parse("cpus:10;mem:10;disk:10").get()); @@ -1107,7 +1114,8 @@ TYPED_TEST(CommonSorterTest, AllocationForInactiveClient) SlaveID slaveId; slaveId.set_value("agentId"); - sorter.add(slaveId, Resources::parse("cpus:10;mem:10").get()); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:10;mem:10").get()); sorter.add("a"); sorter.add("b"); @@ -1153,11 +1161,14 @@ TYPED_TEST(CommonSorterTest, MultipleSlaves) sorter.add("framework"); sorter.activate("framework"); - Resources slaveResources = + const Resources slaveResources = Resources::parse("cpus:2;mem:512;ports:[31000-32000]").get(); - sorter.add(slaveA, slaveResources); - sorter.add(slaveB, slaveResources); + const ResourceQuantities slaveResourceQuantities = + ResourceQuantities::fromScalarResources(slaveResources.scalars()); + + sorter.addSlave(slaveA, slaveResourceQuantities); + sorter.addSlave(slaveB, slaveResourceQuantities); sorter.allocated("framework", slaveA, slaveResources); sorter.allocated("framework", slaveB, slaveResources); @@ -1186,11 +1197,14 @@ TYPED_TEST(CommonSorterTest, MultipleSlavesUpdateAllocation) sorter.add("framework"); sorter.activate("framework"); - Resources slaveResources = + const Resources slaveResources = Resources::parse("cpus:2;mem:512;disk:10;ports:[31000-32000]").get(); - 
sorter.add(slaveA, slaveResources); - sorter.add(slaveB, slaveResources); + const ResourceQuantities slaveResourceQuantities = + ResourceQuantities::fromScalarResources(slaveResources.scalars()); + + sorter.addSlave(slaveA, slaveResourceQuantities); + sorter.addSlave(slaveB, slaveResourceQuantities); sorter.allocated("framework", slaveA, slaveResources); sorter.allocated("framework", slaveB, slaveResources); @@ -1228,7 +1242,8 @@ TEST(DRFSorterTest, UpdateTotal) sorter.activate("a"); sorter.activate("b"); - sorter.add(slaveId, Resources::parse("cpus:10;mem:100").get()); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:10;mem:100").get()); // Dominant share of "a" is 0.2 (cpus). sorter.allocated( @@ -1242,8 +1257,9 @@ TEST(DRFSorterTest, UpdateTotal) // Update the total resources by removing the previous total and // adding back the new total. - sorter.remove(slaveId, Resources::parse("cpus:10;mem:100").get()); - sorter.add(slaveId, Resources::parse("cpus:100;mem:10").get()); + sorter.removeSlave(slaveId); + sorter.addSlave( + slaveId, ResourceQuantities::fromString("cpus:100;mem:10").get()); // Now the dominant share of "a" is 0.1 (mem) and "b" is 0.2 (mem), // which should change the sort order. @@ -1268,8 +1284,11 @@ TEST(DRFSorterTest, MultipleSlavesUpdateTotal) sorter.activate("a"); sorter.activate("b"); - sorter.add(slaveA, Resources::parse("cpus:5;mem:50").get()); - sorter.add(slaveB, Resources::parse("cpus:5;mem:50").get()); + sorter.addSlave( + slaveA, ResourceQuantities::fromString("cpus:5;mem:50").get()); + + sorter.addSlave( + slaveB, ResourceQuantities::fromString("cpus:5;mem:50").get()); // Dominant share of "a" is 0.2 (cpus). sorter.allocated( @@ -1281,10 +1300,10 @@ TEST(DRFSorterTest, MultipleSlavesUpdateTotal) EXPECT_EQ(vector<string>({"b", "a"}), sorter.sort()); - // Update the total resources of slaveA by removing the previous - // total and adding the new total. 
- sorter.remove(slaveA, Resources::parse("cpus:5;mem:50").get()); - sorter.add(slaveA, Resources::parse("cpus:95;mem:50").get()); + // Update the total resources of slaveA. + sorter.removeSlave(slaveA); + sorter.addSlave( + slaveA, ResourceQuantities::fromString("cpus:95;mem:50").get()); // Now the dominant share of "a" is 0.02 (cpus) and "b" is 0.03 // (mem), which should change the sort order. @@ -1308,7 +1327,7 @@ TEST(DRFSorterTest, RemoveResources) sorter.activate("b"); Resources slaveTotal = Resources::parse("cpus", "10", "*").get(); - sorter.add(slaveId, slaveTotal); + sorter.addSlave(slaveId, ResourceQuantities::fromScalarResources(slaveTotal)); // Dominant share of "a" is 0.6 (cpus). Resources allocatedForA = Resources::parse("cpus", "6", "*").get(); @@ -1321,8 +1340,10 @@ TEST(DRFSorterTest, RemoveResources) // Remove cpus from the total resources as well as the allocation of "a". Resources removed = Resources::parse("cpus", "5", "*").get(); - sorter.remove(slaveId, slaveTotal); - sorter.add(slaveId, slaveTotal - removed); + sorter.removeSlave(slaveId); + sorter.addSlave( + slaveId, ResourceQuantities::fromScalarResources(slaveTotal - removed)); + sorter.update("a", slaveId, allocatedForA, allocatedForA - removed); // Now the dominant share of "a" is 0.2 (cpus) and that of "b" is 0.8 (cpus), @@ -1351,7 +1372,7 @@ TEST(DRFSorterTest, RevocableResources) revocable.mutable_revocable(); Resources total = Resources::parse("cpus:10;mem:100").get() + revocable; - sorter.add(slaveId, total); + sorter.addSlave(slaveId, ResourceQuantities::fromScalarResources(total)); // Dominant share of "a" is 0.1 (cpus). 
Resources a = Resources::parse("cpus:2;mem:1").get(); @@ -1381,15 +1402,14 @@ TEST(DRFSorterTest, SharedResources) SlaveID slaveId; slaveId.set_value("agentId"); - Resource sharedDisk = createDiskResource( + const Resource sharedDisk = createDiskResource( "100", "role1", "id1", "path1", None(), true); - // Set totalResources to have disk of 1000, with disk 100 being shared. - Resources totalResources = Resources::parse( - "cpus:100;mem:100;disk(role1):900").get(); - totalResources += sharedDisk; - - sorter.add(slaveId, totalResources); + sorter.addSlave( + slaveId, + ResourceQuantities::fromScalarResources( + Resources::parse("cpus:100;mem:100;disk(role1):900").get() + + sharedDisk)); // Verify sort() works when shared resources are in the allocations. sorter.add("a"); @@ -1435,28 +1455,6 @@ TEST(DRFSorterTest, SharedResources) // d = .1 (dominant: disk). EXPECT_EQ(vector<string>({"b", "c", "d"}), sorter.sort()); - // Verify other basic allocator methods work when shared resources - // are in the allocations. - Resources removedResources = Resources::parse("cpus:50;mem:0").get(); - sorter.remove(slaveId, removedResources); - - // Total resources is now: - // cpus:50;mem:100;disk(role1):900;disk(role1)[id1]:100 - - // Shares: b = .04 (dominant: cpus), c = .1 (dominant: disk), - // d = .1 (dominant: disk). - EXPECT_EQ(vector<string>({"b", "c", "d"}), sorter.sort()); - - Resources addedResources = Resources::parse("cpus:0;mem:100").get(); - sorter.add(slaveId, addedResources); - - // Total resources is now: - // cpus:50;mem:200;disk(role1):900;disk(role1)[id1]:100 - - // Shares: b = .04 (dominant: cpus), c = .1 (dominant: disk), - // d = .1 (dominant: disk). 
- EXPECT_EQ(vector<string>({"b", "c", "d"}), sorter.sort()); - EXPECT_TRUE(sorter.contains("b")); EXPECT_FALSE(sorter.contains("a")); @@ -1475,15 +1473,14 @@ TEST(DRFSorterTest, SameDominantSharedResourcesAcrossClients) SlaveID slaveId; slaveId.set_value("agentId"); - Resource sharedDisk = createDiskResource( + const Resource sharedDisk = createDiskResource( "900", "role1", "id1", "path1", None(), true); - // Set totalResources to have disk of 1000, with disk 900 being shared. - Resources totalResources = Resources::parse( - "cpus:100;mem:100;disk(role1):100").get(); - totalResources += sharedDisk; - - sorter.add(slaveId, totalResources); + sorter.addSlave( + slaveId, + ResourceQuantities::fromScalarResources( + Resources::parse("cpus:100;mem:100;disk(role1):100").get() + + sharedDisk)); // Add 2 clients each with the same shared disk, but with varying // cpus and mem. @@ -1524,15 +1521,14 @@ TEST(DRFSorterTest, SameSharedResourcesSameClient) SlaveID slaveId; slaveId.set_value("agentId"); - Resource sharedDisk = createDiskResource( + const Resource sharedDisk = createDiskResource( "50", "role1", "id1", "path1", None(), true); - // Set totalResources to have disk of 1000, with disk of 50 being shared. - Resources totalResources = Resources::parse( - "cpus:100;mem:100;disk(role1):950").get(); - totalResources += sharedDisk; - - sorter.add(slaveId, totalResources); + sorter.addSlave( + slaveId, + ResourceQuantities::fromScalarResources( + Resources::parse("cpus:100;mem:100;disk(role1):950").get() + + sharedDisk)); // Verify sort() works when shared resources are in the allocations. sorter.add("a"); @@ -1569,15 +1565,14 @@ TEST(DRFSorterTest, SharedResourcesUnallocated) SlaveID slaveId; slaveId.set_value("agentId"); - Resource sharedDisk = createDiskResource( + const Resource sharedDisk = createDiskResource( "100", "role1", "id1", "path1", None(), true); - // Set totalResources to have disk of 1000, with disk 100 being shared. 
- Resources totalResources = Resources::parse( - "cpus:100;mem:100;disk(role1):900").get(); - totalResources += sharedDisk; - - sorter.add(slaveId, totalResources); + sorter.addSlave( + slaveId, + ResourceQuantities::fromScalarResources( + Resources::parse("cpus:100;mem:100;disk(role1):900").get() + + sharedDisk)); // Allocate 3 copies of shared resources to client 'a', but allocate no // shared resource to client 'b'. @@ -1613,43 +1608,6 @@ TEST(DRFSorterTest, SharedResourcesUnallocated) } -// This test verifies that shared resources are removed from the sorter -// only when all instances of the the same shared resource are removed. -TYPED_TEST(CommonSorterTest, RemoveSharedResources) -{ - TypeParam sorter; - - SlaveID slaveId; - slaveId.set_value("agentId"); - - Resource sharedDisk = createDiskResource( - "100", "role1", "id1", "path1", None(), true); - - sorter.add( - slaveId, Resources::parse("cpus:100;mem:100;disk(role1):900").get()); - - Resources quantity1 = sorter.totalScalarQuantities(); - - sorter.add(slaveId, sharedDisk); - Resources quantity2 = sorter.totalScalarQuantities(); - - EXPECT_EQ(Resources::parse("disk:100").get(), quantity2 - quantity1); - - sorter.add(slaveId, sharedDisk); - Resources quantity3 = sorter.totalScalarQuantities(); - - EXPECT_NE(quantity1, quantity3); - EXPECT_EQ(quantity2, quantity3); - - // The quantity of the shared disk is removed when the last copy is removed. - sorter.remove(slaveId, sharedDisk); - EXPECT_EQ(sorter.totalScalarQuantities(), quantity3); - - sorter.remove(slaveId, sharedDisk); - EXPECT_EQ(sorter.totalScalarQuantities(), quantity1); -} - - // This benchmark simulates sorting a number of clients that have // different amount of allocations. 
// @@ -1690,8 +1648,8 @@ TYPED_TEST(CommonSorterTest, BENCHMARK_FullSort) cout << "Added " << clientCount << " clients in " << watch.elapsed() << endl; - Resources agentResources = Resources::parse( - "cpus:24;mem:4096;disk:4096;ports:[31000-32000]").get(); + const ResourceQuantities agentScalarQuantities = + ResourceQuantities::fromString("cpus:24;mem:4096;disk:4096").get(); watch.start(); { @@ -1701,7 +1659,7 @@ TYPED_TEST(CommonSorterTest, BENCHMARK_FullSort) agents.push_back(slaveId); - sorter.add(slaveId, agentResources); + sorter.addSlave(slaveId, agentScalarQuantities); } } watch.stop(); @@ -1769,7 +1727,7 @@ TYPED_TEST(CommonSorterTest, BENCHMARK_FullSort) watch.start(); { foreach (const SlaveID& slaveId, agents) { - sorter.remove(slaveId, agentResources); + sorter.removeSlave(slaveId); } } watch.stop(); @@ -1870,8 +1828,8 @@ TYPED_TEST(CommonSorterTest, BENCHMARK_HierarchyFullSort) cout << "Added " << clientCount << " clients in " << watch.elapsed() << endl; - Resources agentResources = Resources::parse( - "cpus:24;mem:4096;disk:4096;ports:[31000-32000]").get(); + const ResourceQuantities agentScalarQuantities = + ResourceQuantities::fromString("cpus:24;mem:4096;disk:4096").get(); watch.start(); { @@ -1881,7 +1839,7 @@ TYPED_TEST(CommonSorterTest, BENCHMARK_HierarchyFullSort) agents.push_back(slaveId); - sorter.add(slaveId, agentResources); + sorter.addSlave(slaveId, agentScalarQuantities); } } watch.stop(); @@ -1949,7 +1907,7 @@ TYPED_TEST(CommonSorterTest, BENCHMARK_HierarchyFullSort) watch.start(); { foreach (const SlaveID& slaveId, agents) { - sorter.remove(slaveId, agentResources); + sorter.removeSlave(slaveId); } } watch.stop();
