This is an automated email from the ASF dual-hosted git repository. bmahler pushed a commit to branch 1.8.x in repository https://gitbox.apache.org/repos/asf/mesos.git
commit b751a6f9067d519585df8ba1d0e688bd57df7c4e Author: Andrei Sekretenko <[email protected]> AuthorDate: Wed Oct 30 19:50:26 2019 -0400 Fixed allocator performance issue in updateAllocation(). This patch addresses poor performance of `HierarchicalAllocatorProcess::updateAllocation()` for agents with a huge number of non-addable resources in the many-framework case (see MESOS-10015). Sorter methods for totals tracking that modify `Resources` of an agent in the Sorter are replaced with methods that add/remove resource quantities of an agent as a whole (which was actually the only use case of the old methods). Thus, subtracting/adding `Resources` of a whole agent no longer occurs when updating resources of an agent in a Sorter. Further, this patch completely removes agent resource tracking logic from the random sorter (which itself never uses the tracked totals) by implementing cluster totals tracking in the allocator. Results of `*BENCHMARK_WithReservationParam.UpdateAllocation*` (for the DRF sorter): 1.8.x branch: Agent resources size: 200 (50 frameworks) Made 20 reserve and unreserve operations in 1.938801227secs Agent resources size: 400 (100 frameworks) Made 20 reserve and unreserve operations in 13.861857374secs Agent resources size: 800 (200 frameworks) Made 20 reserve and unreserve operations in 2.13412983136667mins 1.8.x branch + this patch: Agent resources size: 200 (50 frameworks) Made 20 reserve and unreserve operations in 214.063821ms Agent resources size: 400 (100 frameworks) Made 20 reserve and unreserve operations in 425.278671ms Agent resources size: 800 (200 frameworks) Made 20 reserve and unreserve operations in 1.136214374secs ... 
Agent resources size: 6400 (1600 frameworks) Made 20 reserve and unreserve operations in 50.094194999secs This is a backport of https://reviews.apache.org/r/71646 Review: https://reviews.apache.org/r/71697/ --- src/master/allocator/mesos/hierarchical.cpp | 61 ++++-- src/master/allocator/mesos/hierarchical.hpp | 3 + src/master/allocator/sorter/drf/sorter.cpp | 74 +++----- src/master/allocator/sorter/drf/sorter.hpp | 23 +-- src/master/allocator/sorter/random/sorter.cpp | 57 ------ src/master/allocator/sorter/random/sorter.hpp | 26 +-- src/master/allocator/sorter/sorter.hpp | 21 ++- src/tests/sorter_tests.cpp | 257 +++++++++----------------- 8 files changed, 185 insertions(+), 337 deletions(-) diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp index 061b702..e264b72 100644 --- a/src/master/allocator/mesos/hierarchical.cpp +++ b/src/master/allocator/mesos/hierarchical.cpp @@ -570,14 +570,20 @@ void HierarchicalAllocatorProcess::addSlave( trackReservations(total.reservations()); - roleSorter->add(slaveId, total); + const ResourceQuantities agentScalarQuantities = + ResourceQuantities::fromScalarResources(total.scalars()); + + totalScalarQuantities += agentScalarQuantities; + roleSorter->addSlave(slaveId, agentScalarQuantities); foreachvalue (const Owned<Sorter>& sorter, frameworkSorters) { - sorter->add(slaveId, total); + sorter->addSlave(slaveId, agentScalarQuantities); } // See comment at `quotaRoleSorter` declaration regarding non-revocable. - quotaRoleSorter->add(slaveId, total.nonRevocable()); + quotaRoleSorter->addSlave( + slaveId, + ResourceQuantities::fromScalarResources(total.nonRevocable().scalars())); foreachpair (const FrameworkID& frameworkId, const Resources& allocation, @@ -641,18 +647,23 @@ void HierarchicalAllocatorProcess::removeSlave( // all the resources. Fixing this would require more information // than what we currently track in the allocator. 
- roleSorter->remove(slaveId, slaves.at(slaveId).getTotal()); + roleSorter->removeSlave(slaveId); foreachvalue (const Owned<Sorter>& sorter, frameworkSorters) { - sorter->remove(slaveId, slaves.at(slaveId).getTotal()); + sorter->removeSlave(slaveId); } - // See comment at `quotaRoleSorter` declaration regarding non-revocable. - quotaRoleSorter->remove( - slaveId, slaves.at(slaveId).getTotal().nonRevocable()); + quotaRoleSorter->removeSlave(slaveId); untrackReservations(slaves.at(slaveId).getTotal().reservations()); + const ResourceQuantities agentScalarQuantities = + ResourceQuantities::fromScalarResources( + slaves.at(slaveId).getTotal().scalars()); + + CHECK(totalScalarQuantities.contains(agentScalarQuantities)); + totalScalarQuantities -= agentScalarQuantities; + slaves.erase(slaveId); allocationCandidates.erase(slaveId); @@ -1752,7 +1763,7 @@ void HierarchicalAllocatorProcess::__allocate() // allocated resources - // unallocated reservations - // unallocated revocable resources - ResourceQuantities availableHeadroom = roleSorter->totalScalarQuantities(); + ResourceQuantities availableHeadroom = totalScalarQuantities; // NOTE: The role sorter does not return aggregated allocation // information whereas `reservationScalarQuantities` does, so @@ -2486,7 +2497,7 @@ double HierarchicalAllocatorProcess::_resources_offered_or_allocated( double HierarchicalAllocatorProcess::_resources_total( const string& resource) { - return roleSorter->totalScalarQuantities().get(resource).value(); + return totalScalarQuantities.get(resource).value(); } @@ -2551,7 +2562,9 @@ void HierarchicalAllocatorProcess::trackFrameworkUnderRole( frameworkSorters.at(role)->initialize(options.fairnessExcludeResourceNames); foreachvalue (const Slave& slave, slaves) { - frameworkSorters.at(role)->add(slave.info.id(), slave.getTotal()); + frameworkSorters.at(role)->addSlave( + slave.info.id(), + ResourceQuantities::fromScalarResources(slave.getTotal().scalars())); } metrics.addRole(role); @@ -2678,18 
+2691,30 @@ bool HierarchicalAllocatorProcess::updateSlaveTotal( trackReservations(newReservations); } - // Update the totals in the sorters. - roleSorter->remove(slaveId, oldTotal); - roleSorter->add(slaveId, total); + // Update the total in the allocator and totals in the sorters. + const ResourceQuantities oldAgentScalarQuantities = + ResourceQuantities::fromScalarResources(oldTotal.scalars()); + + const ResourceQuantities agentScalarQuantities = + ResourceQuantities::fromScalarResources(total.scalars()); + + CHECK(totalScalarQuantities.contains(oldAgentScalarQuantities)); + totalScalarQuantities -= oldAgentScalarQuantities; + totalScalarQuantities += agentScalarQuantities; + + roleSorter->removeSlave(slaveId); + roleSorter->addSlave(slaveId, agentScalarQuantities); foreachvalue (const Owned<Sorter>& sorter, frameworkSorters) { - sorter->remove(slaveId, oldTotal); - sorter->add(slaveId, total); + sorter->removeSlave(slaveId); + sorter->addSlave(slaveId, agentScalarQuantities); } // See comment at `quotaRoleSorter` declaration regarding non-revocable. - quotaRoleSorter->remove(slaveId, oldTotal.nonRevocable()); - quotaRoleSorter->add(slaveId, total.nonRevocable()); + quotaRoleSorter->removeSlave(slaveId); + quotaRoleSorter->addSlave( + slaveId, + ResourceQuantities::fromScalarResources(total.nonRevocable().scalars())); return true; } diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp index 4f71682..38ebcba 100644 --- a/src/master/allocator/mesos/hierarchical.hpp +++ b/src/master/allocator/mesos/hierarchical.hpp @@ -529,6 +529,9 @@ protected: hashmap<SlaveID, Slave> slaves; + // Total scalar resource quantities on all agents. + ResourceQuantities totalScalarQuantities; + // A set of agents that are kept as allocation candidates. Events // may add or remove candidates to the set. When an allocation is // processed, the set of candidates is cleared. 
diff --git a/src/master/allocator/sorter/drf/sorter.cpp b/src/master/allocator/sorter/drf/sorter.cpp index 9367469..96f0895 100644 --- a/src/master/allocator/sorter/drf/sorter.cpp +++ b/src/master/allocator/sorter/drf/sorter.cpp @@ -475,68 +475,36 @@ Resources DRFSorter::allocation( } -const ResourceQuantities& DRFSorter::totalScalarQuantities() const +void DRFSorter::addSlave( + const SlaveID& slaveId, + const ResourceQuantities& scalarQuantities) { - return total_.totals; -} + bool inserted = total_.agentResourceQuantities.emplace( + slaveId, scalarQuantities).second; + CHECK(inserted) << "Attempted to add already added agent " << slaveId; -void DRFSorter::add(const SlaveID& slaveId, const Resources& resources) -{ - if (!resources.empty()) { - // Add shared resources to the total quantities when the same - // resources don't already exist in the total. - const Resources newShared = resources.shared() - .filter([this, slaveId](const Resource& resource) { - return !total_.resources[slaveId].contains(resource); - }); - - total_.resources[slaveId] += resources; - - const ResourceQuantities scalarQuantities = - ResourceQuantities::fromScalarResources( - (resources.nonShared() + newShared).scalars()); - - total_.totals += scalarQuantities; - - // We have to recalculate all shares when the total resources - // change, but we put it off until `sort` is called so that if - // something else changes before the next allocation we don't - // recalculate everything twice. - dirty = true; - } + total_.totals += scalarQuantities; + + // We have to recalculate all shares when the total resources + // change, but we put it off until `sort` is called so that if + // something else changes before the next allocation we don't + // recalculate everything twice. 
+ dirty = true; } -void DRFSorter::remove(const SlaveID& slaveId, const Resources& resources) +void DRFSorter::removeSlave(const SlaveID& slaveId) { - if (!resources.empty()) { - CHECK(total_.resources.contains(slaveId)); - CHECK(total_.resources[slaveId].contains(resources)) - << total_.resources[slaveId] << " does not contain " << resources; - - total_.resources[slaveId] -= resources; + const auto agent = total_.agentResourceQuantities.find(slaveId); + CHECK(agent != total_.agentResourceQuantities.end()) + << "Attempted to remove unknown agent " << slaveId; - // Remove shared resources from the total quantities when there - // are no instances of same resources left in the total. - const Resources absentShared = resources.shared() - .filter([this, slaveId](const Resource& resource) { - return !total_.resources[slaveId].contains(resource); - }); + CHECK(total_.totals.contains(agent->second)); + total_.totals -= agent->second; - const ResourceQuantities scalarQuantities = - ResourceQuantities::fromScalarResources( - (resources.nonShared() + absentShared).scalars()); - - CHECK(total_.totals.contains(scalarQuantities)); - total_.totals -= scalarQuantities; - - if (total_.resources[slaveId].empty()) { - total_.resources.erase(slaveId); - } - - dirty = true; - } + total_.agentResourceQuantities.erase(agent); + dirty = true; } diff --git a/src/master/allocator/sorter/drf/sorter.hpp b/src/master/allocator/sorter/drf/sorter.hpp index 7daf1bf..dd40379 100644 --- a/src/master/allocator/sorter/drf/sorter.hpp +++ b/src/master/allocator/sorter/drf/sorter.hpp @@ -98,11 +98,11 @@ public: const std::string& clientPath, const SlaveID& slaveId) const override; - const ResourceQuantities& totalScalarQuantities() const override; - - void add(const SlaveID& slaveId, const Resources& resources) override; + void addSlave( + const SlaveID& slaveId, + const ResourceQuantities& scalarQuantities) override; - void remove(const SlaveID& slaveId, const Resources& resources) override; + void 
removeSlave(const SlaveID& slaveId) override; std::vector<std::string> sort() override; @@ -152,18 +152,19 @@ private: // Total resources. struct Total { - // We need to keep track of the resources (and not just scalar - // quantities) to account for multiple copies of the same shared - // resources. We need to ensure that we do not update the scalar - // quantities for shared resources when the change is only in the - // number of copies in the sorter. - hashmap<SlaveID, Resources> resources; - // We keep the aggregated scalar resource quantities to speed // up share calculation. Note, resources shared count are ignored. // Because sharedness inherently refers to the identities of resources // and not quantities. ResourceQuantities totals; + + // We keep track of per-agent resource quantities to handle agent removal. + // + // Note that the only way to change the stored resource quantities + // is to remove the agent from the sorter and add it with new resources. + // Thus, when a resource shared count on an agent changes, multiple copies + // of the same shared resource are still accounted for exactly once. + hashmap<SlaveID, const ResourceQuantities> agentResourceQuantities; } total_; // Metrics are optionally exposed by the sorter. diff --git a/src/master/allocator/sorter/random/sorter.cpp b/src/master/allocator/sorter/random/sorter.cpp index 9899cfd..abd619b 100644 --- a/src/master/allocator/sorter/random/sorter.cpp +++ b/src/master/allocator/sorter/random/sorter.cpp @@ -412,63 +412,6 @@ Resources RandomSorter::allocation( } -const ResourceQuantities& RandomSorter::totalScalarQuantities() const -{ - return total_.totals; -} - - -void RandomSorter::add(const SlaveID& slaveId, const Resources& resources) -{ - if (!resources.empty()) { - // Add shared resources to the total quantities when the same - // resources don't already exist in the total. 
- const Resources newShared = resources.shared() - .filter([this, slaveId](const Resource& resource) { - return !total_.resources[slaveId].contains(resource); - }); - - total_.resources[slaveId] += resources; - - const ResourceQuantities scalarQuantities = - ResourceQuantities::fromScalarResources( - (resources.nonShared() + newShared).scalars()); - - total_.totals += scalarQuantities; - } -} - - -void RandomSorter::remove(const SlaveID& slaveId, const Resources& resources) -{ - if (!resources.empty()) { - CHECK(total_.resources.contains(slaveId)); - CHECK(total_.resources[slaveId].contains(resources)) - << total_.resources[slaveId] << " does not contain " << resources; - - total_.resources[slaveId] -= resources; - - // Remove shared resources from the total quantities when there - // are no instances of same resources left in the total. - const Resources absentShared = resources.shared() - .filter([this, slaveId](const Resource& resource) { - return !total_.resources[slaveId].contains(resource); - }); - - const ResourceQuantities scalarQuantities = - ResourceQuantities::fromScalarResources( - (resources.nonShared() + absentShared).scalars()); - - CHECK(total_.totals.contains(scalarQuantities)); - total_.totals -= scalarQuantities; - - if (total_.resources[slaveId].empty()) { - total_.resources.erase(slaveId); - } - } -} - - vector<string> RandomSorter::sort() { pair<vector<string>, vector<double>> clientsAndWeights = diff --git a/src/master/allocator/sorter/random/sorter.hpp b/src/master/allocator/sorter/random/sorter.hpp index c8e777b..a119bb2 100644 --- a/src/master/allocator/sorter/random/sorter.hpp +++ b/src/master/allocator/sorter/random/sorter.hpp @@ -98,11 +98,12 @@ public: const std::string& clientPath, const SlaveID& slaveId) const override; - const ResourceQuantities& totalScalarQuantities() const override; - - void add(const SlaveID& slaveId, const Resources& resources) override; + // NOTE: addSlave/removeSlave is a no-op for this sorter. 
+ void addSlave( + const SlaveID& slaveId, + const ResourceQuantities& scalarQuantities) override {}; - void remove(const SlaveID& slaveId, const Resources& resources) override; + void removeSlave(const SlaveID& slaveId) override {}; // This will perform a weighted random shuffle on each call. // @@ -182,23 +183,6 @@ private: // rooted at that path. This hashmap might include weights for paths // that are not currently in the sorter tree. hashmap<std::string, double> weights; - - // Total resources. - struct Total - { - // We need to keep track of the resources (and not just scalar - // quantities) to account for multiple copies of the same shared - // resources. We need to ensure that we do not update the scalar - // quantities for shared resources when the change is only in the - // number of copies in the sorter. - hashmap<SlaveID, Resources> resources; - - // We keep the aggregated scalar resource quantities to speed - // up share calculation. Note, resources shared count are ignored. - // Because sharedness inherently refers to the identities of resources - // and not quantities. - ResourceQuantities totals; - } total_; }; diff --git a/src/master/allocator/sorter/sorter.hpp b/src/master/allocator/sorter/sorter.hpp index d56a116..93d7842 100644 --- a/src/master/allocator/sorter/sorter.hpp +++ b/src/master/allocator/sorter/sorter.hpp @@ -124,15 +124,18 @@ public: const std::string& client, const SlaveID& slaveId) const = 0; - // Returns the total scalar resource quantities in this sorter. - virtual const ResourceQuantities& totalScalarQuantities() const = 0; - - // Add resources to the total pool of resources this - // Sorter should consider. - virtual void add(const SlaveID& slaveId, const Resources& resources) = 0; - - // Remove resources from the total pool. 
- virtual void remove(const SlaveID& slaveId, const Resources& resources) = 0; + // Add/remove total scalar resource quantities of an agent to/from the + // total pool of resources this Sorter should consider. + // + // NOTE: Updating resources of an agent in the Sorter is done by first calling + // `removeSlave()` and then `addSlave()` with new resource quantities. + // + // NOTE: Attempt to add the same agent twice or remove an agent not added + // to the Sorter may crash the program. + virtual void addSlave( + const SlaveID& slaveId, const ResourceQuantities& scalarQuantities) = 0; + + virtual void removeSlave(const SlaveID& slaveId) = 0; // Returns all of the clients in the order that they should // be allocated to, according to this Sorter's policy. diff --git a/src/tests/sorter_tests.cpp b/src/tests/sorter_tests.cpp index 1e4a789..e734b81 100644 --- a/src/tests/sorter_tests.cpp +++ b/src/tests/sorter_tests.cpp @@ -109,8 +109,7 @@ TEST(DRFSorterTest, DRF) SlaveID slaveId; slaveId.set_value("agentId"); - Resources totalResources = Resources::parse("cpus:100;mem:100").get(); - sorter.add(slaveId, totalResources); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100")); EXPECT_EQ(vector<string>({}), sorter.sort()); @@ -153,16 +152,14 @@ TEST(DRFSorterTest, DRF) sorter.activate("e"); sorter.allocated("e", slaveId, eResources); - Resources removedResources = Resources::parse("cpus:50;mem:0").get(); - sorter.remove(slaveId, removedResources); - // total resources is now cpus = 50, mem = 100 + sorter.removeSlave(slaveId); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:50;mem:100")); // shares: b = .04, c = .02, d = .06, e = .05 EXPECT_EQ(vector<string>({"c", "b", "e", "d"}), sorter.sort()); - Resources addedResources = Resources::parse("cpus:0;mem:100").get(); - sorter.add(slaveId, addedResources); - // total resources is now cpus = 50, mem = 200 + sorter.removeSlave(slaveId); + sorter.addSlave(slaveId, 
*ResourceQuantities::fromString("cpus:50;mem:200")); Resources fResources = Resources::parse("cpus:5;mem:1").get(); sorter.add("f"); @@ -202,7 +199,7 @@ TEST(DRFSorterTest, WDRF) SlaveID slaveId; slaveId.set_value("agentId"); - sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get()); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100")); sorter.add("a"); sorter.activate("a"); @@ -262,9 +259,7 @@ TEST(DRFSorterTest, UpdateWeight) SlaveID slaveId; slaveId.set_value("agentId"); - Resources totalResources = Resources::parse("cpus:100;mem:100").get(); - - sorter.add(slaveId, totalResources); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100")); sorter.add("a"); sorter.activate("a"); @@ -294,9 +289,7 @@ TEST(DRFSorterTest, AllocationCountTieBreak) SlaveID slaveId; slaveId.set_value("agentId"); - Resources totalResources = Resources::parse("cpus:100;mem:100").get(); - - sorter.add(slaveId, totalResources); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100")); sorter.add("a"); sorter.add("b"); @@ -377,8 +370,7 @@ TEST(DRFSorterTest, ShallowHierarchy) SlaveID slaveId; slaveId.set_value("agentId"); - Resources totalResources = Resources::parse("cpus:100;mem:100").get(); - sorter.add(slaveId, totalResources); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100")); sorter.add("a/a"); sorter.activate("a/a"); @@ -420,16 +412,14 @@ TEST(DRFSorterTest, ShallowHierarchy) sorter.activate("e/e"); sorter.allocated("e/e", slaveId, eResources); - Resources removedResources = Resources::parse("cpus:50;mem:0").get(); - sorter.remove(slaveId, removedResources); - // total resources is now cpus = 50, mem = 100 + sorter.removeSlave(slaveId); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:50;mem:100")); // shares: b/b = .04, c/c = .02, d/d = .06, e/e = .05 EXPECT_EQ(vector<string>({"c/c", "b/b", "e/e", "d/d"}), sorter.sort()); - Resources addedResources = 
Resources::parse("cpus:0;mem:100").get(); - sorter.add(slaveId, addedResources); - // total resources is now cpus = 50, mem = 200 + sorter.removeSlave(slaveId); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:50;mem:200")); Resources fResources = Resources::parse("cpus:5;mem:1").get(); sorter.add("f/f"); @@ -472,8 +462,7 @@ TEST(DRFSorterTest, DeepHierarchy) SlaveID slaveId; slaveId.set_value("agentId"); - Resources totalResources = Resources::parse("cpus:100;mem:100").get(); - sorter.add(slaveId, totalResources); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100")); sorter.add("a/a/a/a/a"); sorter.activate("a/a/a/a/a"); @@ -515,17 +504,15 @@ TEST(DRFSorterTest, DeepHierarchy) sorter.activate("e/e/e/e/e/e"); sorter.allocated("e/e/e/e/e/e", slaveId, eResources); - Resources removedResources = Resources::parse("cpus:50;mem:0").get(); - sorter.remove(slaveId, removedResources); - // total resources is now cpus = 50, mem = 100 + sorter.removeSlave(slaveId); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:50;mem:100")); // shares: b/b/b/b = .04, c/c/c = .02, d/d = .06, e/e/e/e/e/e = .05 EXPECT_EQ(vector<string>({"c/c/c", "b/b/b/b", "e/e/e/e/e/e", "d/d"}), sorter.sort()); - Resources addedResources = Resources::parse("cpus:0;mem:100").get(); - sorter.add(slaveId, addedResources); - // total resources is now cpus = 50, mem = 200 + sorter.removeSlave(slaveId); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:50;mem:200")); Resources fResources = Resources::parse("cpus:5;mem:1").get(); sorter.add("f/f"); @@ -568,8 +555,7 @@ TEST(DRFSorterTest, HierarchicalAllocation) SlaveID slaveId; slaveId.set_value("agentId"); - Resources totalResources = Resources::parse("cpus:100;mem:100").get(); - sorter.add(slaveId, totalResources); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100")); sorter.add("a"); sorter.add("b/c"); @@ -690,8 +676,7 @@ TEST(DRFSorterTest, 
HierarchicalIterationOrder) SlaveID slaveId; slaveId.set_value("agentId"); - Resources totalResources = Resources::parse("cpus:100;mem:100").get(); - sorter.add(slaveId, totalResources); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100")); sorter.add("a/b"); sorter.add("c"); @@ -738,7 +723,7 @@ TEST(DRFSorterTest, AddChildToLeaf) SlaveID slaveId; slaveId.set_value("agentId"); - sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get()); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100")); sorter.add("a"); sorter.activate("a"); @@ -809,7 +794,7 @@ TEST(DRFSorterTest, AddChildToInternal) SlaveID slaveId; slaveId.set_value("agentId"); - sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get()); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100")); sorter.add("x/a"); sorter.activate("x/a"); @@ -852,7 +837,7 @@ TEST(DRFSorterTest, AddChildToInactiveLeaf) SlaveID slaveId; slaveId.set_value("agentId"); - sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get()); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100")); sorter.add("a"); sorter.activate("a"); @@ -887,7 +872,7 @@ TYPED_TEST(CommonSorterTest, RemoveLeafCollapseParent) SlaveID slaveId; slaveId.set_value("agentId"); - sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get()); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100")); sorter.add("a"); sorter.activate("a"); @@ -932,7 +917,7 @@ TYPED_TEST(CommonSorterTest, RemoveLeafCollapseParentInactive) SlaveID slaveId; slaveId.set_value("agentId"); - sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get()); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100")); sorter.add("a"); sorter.activate("a"); @@ -973,7 +958,7 @@ TEST(DRFSorterTest, ChangeWeightOnSubtree) SlaveID slaveId; slaveId.set_value("agentId"); - sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get()); + 
sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100")); sorter.updateWeight("b", 3); sorter.updateWeight("a", 2); @@ -1039,9 +1024,9 @@ TEST(DRFSorterTest, SplitResourceShares) disk2.mutable_disk()->mutable_persistence()->set_id("ID2"); disk2.mutable_disk()->mutable_volume()->set_container_path("data"); - sorter.add( + sorter.addSlave( slaveId, - Resources::parse("cpus:100;mem:100;disk:95").get() + disk1 + disk2); + *ResourceQuantities::fromString("cpus:100;mem:100;disk:100")); // Now, allocate resources to "a" and "b". Note that "b" will have // more disk if the shares are accounted correctly! @@ -1066,7 +1051,8 @@ TYPED_TEST(CommonSorterTest, UpdateAllocation) sorter.activate("a"); sorter.activate("b"); - sorter.add(slaveId, Resources::parse("cpus:10;mem:10;disk:10").get()); + sorter.addSlave( + slaveId, *ResourceQuantities::fromString("cpus:10;mem:10;disk:10")); sorter.allocated( "a", slaveId, Resources::parse("cpus:10;mem:10;disk:10").get()); @@ -1111,7 +1097,8 @@ TYPED_TEST(CommonSorterTest, UpdateAllocationNestedClient) sorter.activate("a/x"); sorter.activate("b/y"); - sorter.add(slaveId, Resources::parse("cpus:10;mem:10;disk:10").get()); + sorter.addSlave( + slaveId, *ResourceQuantities::fromString("cpus:10;mem:10;disk:10")); sorter.allocated( "a/x", slaveId, Resources::parse("cpus:10;mem:10;disk:10").get()); @@ -1153,7 +1140,7 @@ TYPED_TEST(CommonSorterTest, AllocationForInactiveClient) SlaveID slaveId; slaveId.set_value("agentId"); - sorter.add(slaveId, Resources::parse("cpus:10;mem:10").get()); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:10;mem:10")); sorter.add("a"); sorter.add("b"); @@ -1211,11 +1198,14 @@ TYPED_TEST(CommonSorterTest, MultipleSlaves) sorter.add("framework"); sorter.activate("framework"); - Resources slaveResources = + const Resources slaveResources = Resources::parse("cpus:2;mem:512;ports:[31000-32000]").get(); - sorter.add(slaveA, slaveResources); - sorter.add(slaveB, slaveResources); + const 
ResourceQuantities slaveResourceQuantities = + ResourceQuantities::fromScalarResources(slaveResources.scalars()); + + sorter.addSlave(slaveA, slaveResourceQuantities); + sorter.addSlave(slaveB, slaveResourceQuantities); sorter.allocated("framework", slaveA, slaveResources); sorter.allocated("framework", slaveB, slaveResources); @@ -1224,14 +1214,10 @@ TYPED_TEST(CommonSorterTest, MultipleSlaves) EXPECT_EQ(slaveResources, sorter.allocation("framework", slaveA)); EXPECT_EQ(slaveResources, sorter.allocation("framework", slaveB)); - EXPECT_EQ( - ResourceQuantities::fromScalarResources(slaveResources.scalars()) + - ResourceQuantities::fromScalarResources(slaveResources.scalars()), + EXPECT_EQ(slaveResourceQuantities + slaveResourceQuantities, sorter.allocationScalarQuantities("framework")); - EXPECT_EQ( - ResourceQuantities::fromScalarResources(slaveResources.scalars()) + - ResourceQuantities::fromScalarResources(slaveResources.scalars()), + EXPECT_EQ(slaveResourceQuantities + slaveResourceQuantities, sorter.allocationScalarQuantities()); } @@ -1254,11 +1240,14 @@ TYPED_TEST(CommonSorterTest, MultipleSlavesUpdateAllocation) sorter.add("framework"); sorter.activate("framework"); - Resources slaveResources = + const Resources slaveResources = Resources::parse("cpus:2;mem:512;disk:10;ports:[31000-32000]").get(); - sorter.add(slaveA, slaveResources); - sorter.add(slaveB, slaveResources); + const ResourceQuantities slaveResourceQuantities = + ResourceQuantities::fromScalarResources(slaveResources.scalars()); + + sorter.addSlave(slaveA, slaveResourceQuantities); + sorter.addSlave(slaveB, slaveResourceQuantities); sorter.allocated("framework", slaveA, slaveResources); sorter.allocated("framework", slaveB, slaveResources); @@ -1306,7 +1295,7 @@ TEST(DRFSorterTest, UpdateTotal) sorter.activate("a"); sorter.activate("b"); - sorter.add(slaveId, Resources::parse("cpus:10;mem:100").get()); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:10;mem:100")); // Dominant 
share of "a" is 0.2 (cpus). sorter.allocated( @@ -1320,8 +1309,8 @@ TEST(DRFSorterTest, UpdateTotal) // Update the total resources by removing the previous total and // adding back the new total. - sorter.remove(slaveId, Resources::parse("cpus:10;mem:100").get()); - sorter.add(slaveId, Resources::parse("cpus:100;mem:10").get()); + sorter.removeSlave(slaveId); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:10")); // Now the dominant share of "a" is 0.1 (mem) and "b" is 0.2 (mem), // which should change the sort order. @@ -1346,8 +1335,8 @@ TEST(DRFSorterTest, MultipleSlavesUpdateTotal) sorter.activate("a"); sorter.activate("b"); - sorter.add(slaveA, Resources::parse("cpus:5;mem:50").get()); - sorter.add(slaveB, Resources::parse("cpus:5;mem:50").get()); + sorter.addSlave(slaveA, *ResourceQuantities::fromString("cpus:5;mem:50")); + sorter.addSlave(slaveB, *ResourceQuantities::fromString("cpus:5;mem:50")); // Dominant share of "a" is 0.2 (cpus). sorter.allocated( @@ -1359,10 +1348,9 @@ TEST(DRFSorterTest, MultipleSlavesUpdateTotal) EXPECT_EQ(vector<string>({"b", "a"}), sorter.sort()); - // Update the total resources of slaveA by removing the previous - // total and adding the new total. - sorter.remove(slaveA, Resources::parse("cpus:5;mem:50").get()); - sorter.add(slaveA, Resources::parse("cpus:95;mem:50").get()); + // Update the total resources of slaveA. + sorter.removeSlave(slaveA); + sorter.addSlave(slaveA, *ResourceQuantities::fromString("cpus:95;mem:50")); // Now the dominant share of "a" is 0.02 (cpus) and "b" is 0.03 // (mem), which should change the sort order. @@ -1386,7 +1374,7 @@ TEST(DRFSorterTest, RemoveResources) sorter.activate("b"); Resources slaveTotal = Resources::parse("cpus", "10", "*").get(); - sorter.add(slaveId, slaveTotal); + sorter.addSlave(slaveId, ResourceQuantities::fromScalarResources(slaveTotal)); // Dominant share of "a" is 0.6 (cpus). 
Resources allocatedForA = Resources::parse("cpus", "6", "*").get(); @@ -1399,8 +1387,10 @@ TEST(DRFSorterTest, RemoveResources) // Remove cpus from the total resources as well as the allocation of "a". Resources removed = Resources::parse("cpus", "5", "*").get(); - sorter.remove(slaveId, slaveTotal); - sorter.add(slaveId, slaveTotal - removed); + sorter.removeSlave(slaveId); + sorter.addSlave( + slaveId, ResourceQuantities::fromScalarResources(slaveTotal - removed)); + sorter.update("a", slaveId, allocatedForA, allocatedForA - removed); // Now the dominant share of "a" is 0.2 (cpus) and that of "b" is 0.8 (cpus), @@ -1429,7 +1419,7 @@ TEST(DRFSorterTest, RevocableResources) revocable.mutable_revocable(); Resources total = Resources::parse("cpus:10;mem:100").get() + revocable; - sorter.add(slaveId, total); + sorter.addSlave(slaveId, ResourceQuantities::fromScalarResources(total)); // Dominant share of "a" is 0.1 (cpus). Resources a = Resources::parse("cpus:2;mem:1").get(); @@ -1459,15 +1449,13 @@ TEST(DRFSorterTest, SharedResources) SlaveID slaveId; slaveId.set_value("agentId"); - Resource sharedDisk = createDiskResource( + const Resource sharedDisk = createDiskResource( "100", "role1", "id1", "path1", None(), true); - // Set totalResources to have disk of 1000, with disk 100 being shared. - Resources totalResources = Resources::parse( - "cpus:100;mem:100;disk(role1):900").get(); - totalResources += sharedDisk; - - sorter.add(slaveId, totalResources); + sorter.addSlave( + slaveId, + ResourceQuantities::fromScalarResources( + *Resources::parse("cpus:100;mem:100;disk(role1):900") + sharedDisk)); // Verify sort() works when shared resources are in the allocations. sorter.add("a"); @@ -1513,28 +1501,6 @@ TEST(DRFSorterTest, SharedResources) // d = .1 (dominant: disk). EXPECT_EQ(vector<string>({"b", "c", "d"}), sorter.sort()); - // Verify other basic allocator methods work when shared resources - // are in the allocations. 
- Resources removedResources = Resources::parse("cpus:50;mem:0").get(); - sorter.remove(slaveId, removedResources); - - // Total resources is now: - // cpus:50;mem:100;disk(role1):900;disk(role1)[id1]:100 - - // Shares: b = .04 (dominant: cpus), c = .1 (dominant: disk), - // d = .1 (dominant: disk). - EXPECT_EQ(vector<string>({"b", "c", "d"}), sorter.sort()); - - Resources addedResources = Resources::parse("cpus:0;mem:100").get(); - sorter.add(slaveId, addedResources); - - // Total resources is now: - // cpus:50;mem:200;disk(role1):900;disk(role1)[id1]:100 - - // Shares: b = .04 (dominant: cpus), c = .1 (dominant: disk), - // d = .1 (dominant: disk). - EXPECT_EQ(vector<string>({"b", "c", "d"}), sorter.sort()); - EXPECT_TRUE(sorter.contains("b")); EXPECT_FALSE(sorter.contains("a")); @@ -1553,15 +1519,13 @@ TEST(DRFSorterTest, SameDominantSharedResourcesAcrossClients) SlaveID slaveId; slaveId.set_value("agentId"); - Resource sharedDisk = createDiskResource( + const Resource sharedDisk = createDiskResource( "900", "role1", "id1", "path1", None(), true); - // Set totalResources to have disk of 1000, with disk 900 being shared. - Resources totalResources = Resources::parse( - "cpus:100;mem:100;disk(role1):100").get(); - totalResources += sharedDisk; - - sorter.add(slaveId, totalResources); + sorter.addSlave( + slaveId, + ResourceQuantities::fromScalarResources( + *Resources::parse("cpus:100;mem:100;disk(role1):100") + sharedDisk)); // Add 2 clients each with the same shared disk, but with varying // cpus and mem. @@ -1602,15 +1566,13 @@ TEST(DRFSorterTest, SameSharedResourcesSameClient) SlaveID slaveId; slaveId.set_value("agentId"); - Resource sharedDisk = createDiskResource( + const Resource sharedDisk = createDiskResource( "50", "role1", "id1", "path1", None(), true); - // Set totalResources to have disk of 1000, with disk of 50 being shared. 
- Resources totalResources = Resources::parse( - "cpus:100;mem:100;disk(role1):950").get(); - totalResources += sharedDisk; - - sorter.add(slaveId, totalResources); + sorter.addSlave( + slaveId, + ResourceQuantities::fromScalarResources( + *Resources::parse("cpus:100;mem:100;disk(role1):950") + sharedDisk)); // Verify sort() works when shared resources are in the allocations. sorter.add("a"); @@ -1647,15 +1609,13 @@ TEST(DRFSorterTest, SharedResourcesUnallocated) SlaveID slaveId; slaveId.set_value("agentId"); - Resource sharedDisk = createDiskResource( + const Resource sharedDisk = createDiskResource( "100", "role1", "id1", "path1", None(), true); - // Set totalResources to have disk of 1000, with disk 100 being shared. - Resources totalResources = Resources::parse( - "cpus:100;mem:100;disk(role1):900").get(); - totalResources += sharedDisk; - - sorter.add(slaveId, totalResources); + sorter.addSlave( + slaveId, + ResourceQuantities::fromScalarResources( + *Resources::parse("cpus:100;mem:100;disk(role1):900") + sharedDisk)); // Allocate 3 copies of shared resources to client 'a', but allocate no // shared resource to client 'b'. @@ -1691,45 +1651,6 @@ TEST(DRFSorterTest, SharedResourcesUnallocated) } -// This test verifies that shared resources are removed from the sorter -// only when all instances of the the same shared resource are removed. 
-TYPED_TEST(CommonSorterTest, RemoveSharedResources) -{ - TypeParam sorter; - - SlaveID slaveId; - slaveId.set_value("agentId"); - - Resource sharedDisk = createDiskResource( - "100", "role1", "id1", "path1", None(), true); - - sorter.add( - slaveId, Resources::parse("cpus:100;mem:100;disk(role1):900").get()); - - ResourceQuantities quantity1 = sorter.totalScalarQuantities(); - - sorter.add(slaveId, sharedDisk); - ResourceQuantities quantity2 = sorter.totalScalarQuantities(); - - EXPECT_EQ( - CHECK_NOTERROR(ResourceQuantities::fromString("disk:100")), - quantity2 - quantity1); - - sorter.add(slaveId, sharedDisk); - ResourceQuantities quantity3 = sorter.totalScalarQuantities(); - - EXPECT_NE(quantity1, quantity3); - EXPECT_EQ(quantity2, quantity3); - - // The quantity of the shared disk is removed when the last copy is removed. - sorter.remove(slaveId, sharedDisk); - EXPECT_EQ(sorter.totalScalarQuantities(), quantity3); - - sorter.remove(slaveId, sharedDisk); - EXPECT_EQ(sorter.totalScalarQuantities(), quantity1); -} - - // This benchmark simulates sorting a number of clients that have // different amount of allocations. 
// @@ -1770,8 +1691,8 @@ TYPED_TEST(CommonSorterTest, BENCHMARK_FullSort) cout << "Added " << clientCount << " clients in " << watch.elapsed() << endl; - Resources agentResources = Resources::parse( - "cpus:24;mem:4096;disk:4096;ports:[31000-32000]").get(); + const ResourceQuantities agentScalarQuantities = + *ResourceQuantities::fromString("cpus:24;mem:4096;disk:4096"); watch.start(); { @@ -1781,7 +1702,7 @@ TYPED_TEST(CommonSorterTest, BENCHMARK_FullSort) agents.push_back(slaveId); - sorter.add(slaveId, agentResources); + sorter.addSlave(slaveId, agentScalarQuantities); } } watch.stop(); @@ -1849,7 +1770,7 @@ TYPED_TEST(CommonSorterTest, BENCHMARK_FullSort) watch.start(); { foreach (const SlaveID& slaveId, agents) { - sorter.remove(slaveId, agentResources); + sorter.removeSlave(slaveId); } } watch.stop(); @@ -1950,8 +1871,8 @@ TYPED_TEST(CommonSorterTest, BENCHMARK_HierarchyFullSort) cout << "Added " << clientCount << " clients in " << watch.elapsed() << endl; - Resources agentResources = Resources::parse( - "cpus:24;mem:4096;disk:4096;ports:[31000-32000]").get(); + const ResourceQuantities agentScalarQuantities = + *ResourceQuantities::fromString("cpus:24;mem:4096;disk:4096"); watch.start(); { @@ -1961,7 +1882,7 @@ TYPED_TEST(CommonSorterTest, BENCHMARK_HierarchyFullSort) agents.push_back(slaveId); - sorter.add(slaveId, agentResources); + sorter.addSlave(slaveId, agentScalarQuantities); } } watch.stop(); @@ -2029,7 +1950,7 @@ TYPED_TEST(CommonSorterTest, BENCHMARK_HierarchyFullSort) watch.start(); { foreach (const SlaveID& slaveId, agents) { - sorter.remove(slaveId, agentResources); + sorter.removeSlave(slaveId); } } watch.stop();
