This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch 1.9.x
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit d44882fc8edd1c8d843ff1f47ede79ee12dada03
Author: Andrei Sekretenko <[email protected]>
AuthorDate: Wed Oct 30 19:45:33 2019 -0400

    Fixed allocator performance issue in updateAllocation().

    This patch addresses the poor performance of
    `HierarchicalAllocatorProcess::updateAllocation()` for agents with a
    huge number of non-addable resources in the many-framework case
    (see MESOS-10015).

    The Sorter's totals-tracking methods, which modified an agent's
    `Resources`, are replaced with methods that add/remove the resource
    quantities of an agent as a whole (which was, in fact, the only use
    case of the old methods). Thus, updating an agent's resources in a
    Sorter no longer requires subtracting and re-adding the `Resources`
    of the whole agent.

    Further, this patch completely removes the agent resource tracking
    logic from the random sorter (which makes no use of it itself) by
    implementing cluster totals tracking in the allocator.

    Results of `*BENCHMARK_WithReservationParam.UpdateAllocation*`
    (for the DRF sorter):

    Master:
      Agent resources size: 200 (50 frameworks)
        Made 20 reserve and unreserve operations in 2.08586secs
      Agent resources size: 400 (100 frameworks)
        Made 20 reserve and unreserve operations in 13.8449005secs
      Agent resources size: 800 (200 frameworks)
        Made 20 reserve and unreserve operations in 2.19253121188333mins

    Master + this patch:
      Agent resources size: 200 (50 frameworks)
        Made 20 reserve and unreserve operations in 468.482366ms
      Agent resources size: 400 (100 frameworks)
        Made 20 reserve and unreserve operations in 925.725947ms
      Agent resources size: 800 (200 frameworks)
        Made 20 reserve and unreserve operations in 2.110337109secs
      ...
      Agent resources size: 6400 (1600 frameworks)
        Made 20 reserve and unreserve operations in 1.50141861756667mins

    Review: https://reviews.apache.org/r/71646/
---
 src/master/allocator/mesos/hierarchical.cpp        |  46 +++-
 src/master/allocator/mesos/hierarchical.hpp        |   3 +
 src/master/allocator/mesos/sorter/drf/sorter.cpp   |  74 ++----
 src/master/allocator/mesos/sorter/drf/sorter.hpp   |  23 +-
 .../allocator/mesos/sorter/random/sorter.cpp       |  57 -----
 .../allocator/mesos/sorter/random/sorter.hpp       |  26 +--
 src/master/allocator/mesos/sorter/sorter.hpp       |  20 +-
 src/tests/sorter_tests.cpp                         | 259 +++++++--------------
 8 files changed, 177 insertions(+), 331 deletions(-)

diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index dd73d5b..93e1fb1 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -803,10 +803,14 @@ void HierarchicalAllocatorProcess::addSlave(
 
   roleTree.trackReservations(total.reserved());
 
-  roleSorter->add(slaveId, total);
+  const ResourceQuantities agentScalarQuantities =
+    ResourceQuantities::fromScalarResources(total.scalars());
+
+  totalScalarQuantities += agentScalarQuantities;
+  roleSorter->addSlave(slaveId, agentScalarQuantities);
 
   foreachvalue (const Owned<Sorter>& sorter, frameworkSorters) {
-    sorter->add(slaveId, total);
+    sorter->addSlave(slaveId, agentScalarQuantities);
   }
 
   foreachpair (const FrameworkID& frameworkId,
@@ -873,13 +877,19 @@ void HierarchicalAllocatorProcess::removeSlave(
     // all the resources. Fixing this would require more information
    // than what we currently track in the allocator.
 
-    roleSorter->remove(slaveId, slave.getTotal());
+    roleSorter->removeSlave(slaveId);
 
     foreachvalue (const Owned<Sorter>& sorter, frameworkSorters) {
-      sorter->remove(slaveId, slave.getTotal());
+      sorter->removeSlave(slaveId);
     }
 
     roleTree.untrackReservations(slave.getTotal().reserved());
+
+    const ResourceQuantities agentScalarQuantities =
+      ResourceQuantities::fromScalarResources(slave.getTotal().scalars());
+
+    CHECK_CONTAINS(totalScalarQuantities, agentScalarQuantities);
+    totalScalarQuantities -= agentScalarQuantities;
   }
 
   slaves.erase(slaveId);
@@ -1870,7 +1880,7 @@ void HierarchicalAllocatorProcess::__allocate()
   //    allocated resources -
   //    unallocated reservations -
   //    unallocated revocable resources
-  ResourceQuantities availableHeadroom = roleSorter->totalScalarQuantities();
+  ResourceQuantities availableHeadroom = totalScalarQuantities;
 
   // NOTE: The role sorter does not return aggregated allocation
   // information whereas `reservationScalarQuantities` does, so
@@ -2666,7 +2676,7 @@ double HierarchicalAllocatorProcess::_resources_offered_or_allocated(
 double HierarchicalAllocatorProcess::_resources_total(
     const string& resource)
 {
-  return roleSorter->totalScalarQuantities().get(resource).value();
+  return totalScalarQuantities.get(resource).value();
 }
 
@@ -2773,7 +2783,9 @@ void HierarchicalAllocatorProcess::trackFrameworkUnderRole(
     frameworkSorter->initialize(options.fairnessExcludeResourceNames);
 
     foreachvalue (const Slave& slave, slaves) {
-      frameworkSorter->add(slave.info.id(), slave.getTotal());
+      frameworkSorter->addSlave(
+          slave.info.id(),
+          ResourceQuantities::fromScalarResources(slave.getTotal().scalars()));
     }
   }
 
@@ -2826,13 +2838,23 @@ bool HierarchicalAllocatorProcess::updateSlaveTotal(
   roleTree.untrackReservations(oldTotal.reserved());
   roleTree.trackReservations(total.reserved());
 
-  // Update the totals in the sorters.
-  roleSorter->remove(slaveId, oldTotal);
-  roleSorter->add(slaveId, total);
+  // Update the total in the allocator and totals in the sorters.
+  const ResourceQuantities oldAgentScalarQuantities =
+    ResourceQuantities::fromScalarResources(oldTotal.scalars());
+
+  const ResourceQuantities agentScalarQuantities =
+    ResourceQuantities::fromScalarResources(total.scalars());
+
+  CHECK_CONTAINS(totalScalarQuantities, oldAgentScalarQuantities);
+  totalScalarQuantities -= oldAgentScalarQuantities;
+  totalScalarQuantities += agentScalarQuantities;
+
+  roleSorter->removeSlave(slaveId);
+  roleSorter->addSlave(slaveId, agentScalarQuantities);
 
   foreachvalue (const Owned<Sorter>& sorter, frameworkSorters) {
-    sorter->remove(slaveId, oldTotal);
-    sorter->add(slaveId, total);
+    sorter->removeSlave(slaveId);
+    sorter->addSlave(slaveId, agentScalarQuantities);
   }
 
   return true;

diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 65d103e..31bab01 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -660,6 +660,9 @@ protected:
 
   hashmap<SlaveID, Slave> slaves;
 
+  // Total scalar resource quantities on all agents.
+  ResourceQuantities totalScalarQuantities;
+
   RoleTree roleTree;
 
   // A set of agents that are kept as allocation candidates.
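In short: the allocator now owns a single running total of per-agent scalar
quantities, and an agent update is "subtract the old quantities, add the new
ones". Below is a minimal standalone sketch of that bookkeeping pattern; the
types are simplified stand-ins (a plain std::map instead of Mesos'
`ResourceQuantities` class), so this illustrates the idea rather than the
actual Mesos code:

// bookkeeping_sketch.cpp -- illustration only; simplified stand-in types.
#include <cassert>
#include <iostream>
#include <map>
#include <string>

// Stand-in for mesos::ResourceQuantities: resource name -> scalar quantity.
using Quantities = std::map<std::string, double>;

void add(Quantities& total, const Quantities& agent)
{
  for (const auto& [name, value] : agent) {
    total[name] += value;
  }
}

void subtract(Quantities& total, const Quantities& agent)
{
  for (const auto& [name, value] : agent) {
    assert(total[name] >= value);  // Mirrors CHECK_CONTAINS in the patch.
    total[name] -= value;
  }
}

int main()
{
  // The allocator keeps one running total across all agents
  // (`totalScalarQuantities` in hierarchical.hpp above).
  Quantities totalScalarQuantities;

  // addSlave: fold the whole agent in once.
  Quantities agent{{"cpus", 8}, {"mem", 4096}};
  add(totalScalarQuantities, agent);

  // updateSlaveTotal: subtract the old quantities, add the new ones.
  // This touches a handful of (name, value) pairs instead of doing
  // set arithmetic on full `Resources` collections.
  Quantities agentUpdated{{"cpus", 16}, {"mem", 4096}};
  subtract(totalScalarQuantities, agent);
  add(totalScalarQuantities, agentUpdated);

  std::cout << "cpus: " << totalScalarQuantities["cpus"] << std::endl;  // 16
}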
diff --git a/src/master/allocator/mesos/sorter/drf/sorter.cpp b/src/master/allocator/mesos/sorter/drf/sorter.cpp
index 09889cd..443195b 100644
--- a/src/master/allocator/mesos/sorter/drf/sorter.cpp
+++ b/src/master/allocator/mesos/sorter/drf/sorter.cpp
@@ -471,68 +471,36 @@ Resources DRFSorter::allocation(
 }
 
 
-const ResourceQuantities& DRFSorter::totalScalarQuantities() const
+void DRFSorter::addSlave(
+    const SlaveID& slaveId,
+    const ResourceQuantities& scalarQuantities)
 {
-  return total_.totals;
-}
+  bool inserted = total_.agentResourceQuantities.emplace(
+      slaveId, scalarQuantities).second;
+
+  CHECK(inserted) << "Attempted to add already added agent " << slaveId;
 
-
-void DRFSorter::add(const SlaveID& slaveId, const Resources& resources)
-{
-  if (!resources.empty()) {
-    // Add shared resources to the total quantities when the same
-    // resources don't already exist in the total.
-    const Resources newShared = resources.shared()
-      .filter([this, slaveId](const Resource& resource) {
-        return !total_.resources[slaveId].contains(resource);
-      });
-
-    total_.resources[slaveId] += resources;
-
-    const ResourceQuantities scalarQuantities =
-      ResourceQuantities::fromScalarResources(
-          (resources.nonShared() + newShared).scalars());
-
-    total_.totals += scalarQuantities;
-
-    // We have to recalculate all shares when the total resources
-    // change, but we put it off until `sort` is called so that if
-    // something else changes before the next allocation we don't
-    // recalculate everything twice.
-    dirty = true;
-  }
+  total_.totals += scalarQuantities;
+
+  // We have to recalculate all shares when the total resources
+  // change, but we put it off until `sort` is called so that if
+  // something else changes before the next allocation we don't
+  // recalculate everything twice.
+  dirty = true;
 }
 
 
-void DRFSorter::remove(const SlaveID& slaveId, const Resources& resources)
+void DRFSorter::removeSlave(const SlaveID& slaveId)
 {
-  if (!resources.empty()) {
-    CHECK(total_.resources.contains(slaveId));
-    CHECK(total_.resources[slaveId].contains(resources))
-      << total_.resources[slaveId] << " does not contain " << resources;
-
-    total_.resources[slaveId] -= resources;
+  const auto agent = total_.agentResourceQuantities.find(slaveId);
+  CHECK(agent != total_.agentResourceQuantities.end())
+    << "Attempted to remove unknown agent " << slaveId;
 
-    // Remove shared resources from the total quantities when there
-    // are no instances of same resources left in the total.
-    const Resources absentShared = resources.shared()
-      .filter([this, slaveId](const Resource& resource) {
-        return !total_.resources[slaveId].contains(resource);
-      });
+  CHECK_CONTAINS(total_.totals, agent->second);
+  total_.totals -= agent->second;
 
-    const ResourceQuantities scalarQuantities =
-      ResourceQuantities::fromScalarResources(
-          (resources.nonShared() + absentShared).scalars());
-
-    CHECK(total_.totals.contains(scalarQuantities));
-    total_.totals -= scalarQuantities;
-
-    if (total_.resources[slaveId].empty()) {
-      total_.resources.erase(slaveId);
-    }
-
-    dirty = true;
-  }
+  total_.agentResourceQuantities.erase(agent);
+  dirty = true;
 }

diff --git a/src/master/allocator/mesos/sorter/drf/sorter.hpp b/src/master/allocator/mesos/sorter/drf/sorter.hpp
index 248ca81..5cb48cd 100644
--- a/src/master/allocator/mesos/sorter/drf/sorter.hpp
+++ b/src/master/allocator/mesos/sorter/drf/sorter.hpp
@@ -97,11 +97,11 @@ public:
       const std::string& clientPath,
       const SlaveID& slaveId) const override;
 
-  const ResourceQuantities& totalScalarQuantities() const override;
-
-  void add(const SlaveID& slaveId, const Resources& resources) override;
+  void addSlave(
+      const SlaveID& slaveId,
+      const ResourceQuantities& scalarQuantities) override;
 
-  void remove(const SlaveID& slaveId, const Resources& resources) override;
+  void removeSlave(const SlaveID& slaveId) override;
 
   std::vector<std::string> sort() override;
 
@@ -151,18 +151,19 @@ private:
   // Total resources.
   struct Total
   {
-    // We need to keep track of the resources (and not just scalar
-    // quantities) to account for multiple copies of the same shared
-    // resources. We need to ensure that we do not update the scalar
-    // quantities for shared resources when the change is only in the
-    // number of copies in the sorter.
-    hashmap<SlaveID, Resources> resources;
-
     // We keep the aggregated scalar resource quantities to speed
     // up share calculation. Note, resources shared count are ignored.
     // Because sharedness inherently refers to the identities of resources
     // and not quantities.
     ResourceQuantities totals;
+
+    // We keep track of per-agent resource quantities to handle agent removal.
+    //
+    // Note that the only way to change the stored resource quantities
+    // is to remove the agent from the sorter and add it with new resources.
+    // Thus, when a resource shared count on an agent changes, multiple copies
+    // of the same shared resource are still accounted for exactly once.
+    hashmap<SlaveID, const ResourceQuantities> agentResourceQuantities;
   } total_;
 
   // Metrics are optionally exposed by the sorter.

diff --git a/src/master/allocator/mesos/sorter/random/sorter.cpp b/src/master/allocator/mesos/sorter/random/sorter.cpp
index 60a5797..b64ee8e 100644
--- a/src/master/allocator/mesos/sorter/random/sorter.cpp
+++ b/src/master/allocator/mesos/sorter/random/sorter.cpp
@@ -408,63 +408,6 @@ Resources RandomSorter::allocation(
 }
 
 
-const ResourceQuantities& RandomSorter::totalScalarQuantities() const
-{
-  return total_.totals;
-}
-
-
-void RandomSorter::add(const SlaveID& slaveId, const Resources& resources)
-{
-  if (!resources.empty()) {
-    // Add shared resources to the total quantities when the same
-    // resources don't already exist in the total.
-    const Resources newShared = resources.shared()
-      .filter([this, slaveId](const Resource& resource) {
-        return !total_.resources[slaveId].contains(resource);
-      });
-
-    total_.resources[slaveId] += resources;
-
-    const ResourceQuantities scalarQuantities =
-      ResourceQuantities::fromScalarResources(
-          (resources.nonShared() + newShared).scalars());
-
-    total_.totals += scalarQuantities;
-  }
-}
-
-
-void RandomSorter::remove(const SlaveID& slaveId, const Resources& resources)
-{
-  if (!resources.empty()) {
-    CHECK(total_.resources.contains(slaveId));
-    CHECK(total_.resources[slaveId].contains(resources))
-      << total_.resources[slaveId] << " does not contain " << resources;
-
-    total_.resources[slaveId] -= resources;
-
-    // Remove shared resources from the total quantities when there
-    // are no instances of same resources left in the total.
-    const Resources absentShared = resources.shared()
-      .filter([this, slaveId](const Resource& resource) {
-        return !total_.resources[slaveId].contains(resource);
-      });
-
-    const ResourceQuantities scalarQuantities =
-      ResourceQuantities::fromScalarResources(
-          (resources.nonShared() + absentShared).scalars());
-
-    CHECK(total_.totals.contains(scalarQuantities));
-    total_.totals -= scalarQuantities;
-
-    if (total_.resources[slaveId].empty()) {
-      total_.resources.erase(slaveId);
-    }
-  }
-}
-
-
 vector<string> RandomSorter::sort()
 {
   pair<vector<string>, vector<double>> clientsAndWeights =

diff --git a/src/master/allocator/mesos/sorter/random/sorter.hpp b/src/master/allocator/mesos/sorter/random/sorter.hpp
index df9c895..b334253 100644
--- a/src/master/allocator/mesos/sorter/random/sorter.hpp
+++ b/src/master/allocator/mesos/sorter/random/sorter.hpp
@@ -97,11 +97,12 @@ public:
       const std::string& clientPath,
       const SlaveID& slaveId) const override;
 
-  const ResourceQuantities& totalScalarQuantities() const override;
-
-  void add(const SlaveID& slaveId, const Resources& resources) override;
+  // NOTE: addSlave/removeSlave is a no-op for this sorter.
+  void addSlave(
+      const SlaveID& slaveId,
+      const ResourceQuantities& scalarQuantities) override {};
 
-  void remove(const SlaveID& slaveId, const Resources& resources) override;
+  void removeSlave(const SlaveID& slaveId) override {};
 
   // This will perform a weighted random shuffle on each call.
   //
@@ -181,23 +182,6 @@ private:
   // rooted at that path. This hashmap might include weights for paths
   // that are not currently in the sorter tree.
   hashmap<std::string, double> weights;
-
-  // Total resources.
-  struct Total
-  {
-    // We need to keep track of the resources (and not just scalar
-    // quantities) to account for multiple copies of the same shared
-    // resources. We need to ensure that we do not update the scalar
-    // quantities for shared resources when the change is only in the
-    // number of copies in the sorter.
-    hashmap<SlaveID, Resources> resources;
-
-    // We keep the aggregated scalar resource quantities to speed
-    // up share calculation. Note, resources shared count are ignored.
-    // Because sharedness inherently refers to the identities of resources
-    // and not quantities.
-    ResourceQuantities totals;
-  } total_;
 };

diff --git a/src/master/allocator/mesos/sorter/sorter.hpp b/src/master/allocator/mesos/sorter/sorter.hpp
index 52b8a7b..181cfaa 100644
--- a/src/master/allocator/mesos/sorter/sorter.hpp
+++ b/src/master/allocator/mesos/sorter/sorter.hpp
@@ -124,15 +124,19 @@ public:
       const std::string& client,
       const SlaveID& slaveId) const = 0;
 
-  // Returns the total scalar resource quantities in this sorter.
-  virtual const ResourceQuantities& totalScalarQuantities() const = 0;
-
-  // Add resources to the total pool of resources this
-  // Sorter should consider.
-  virtual void add(const SlaveID& slaveId, const Resources& resources) = 0;
+  // Add/remove total scalar resource quantities of an agent to/from the
+  // total pool of resources this Sorter should consider.
+  //
+  // NOTE: Updating resources of an agent in the Sorter is done by first calling
+  // `removeSlave()` and then `addSlave()` with new resource quantities.
+  //
+  // NOTE: Attempt to add the same agent twice or remove an agent not added
+  // to the Sorter may crash the program.
+  virtual void addSlave(
+      const SlaveID& slaveId,
+      const ResourceQuantities& scalarQuantities) = 0;
 
-  // Remove resources from the total pool.
-  virtual void remove(const SlaveID& slaveId, const Resources& resources) = 0;
+  virtual void removeSlave(const SlaveID& slaveId) = 0;
 
   // Returns all of the clients in the order that they should
   // be allocated to, according to this Sorter's policy.

diff --git a/src/tests/sorter_tests.cpp b/src/tests/sorter_tests.cpp
index 78d6dde..8b8774e 100644
--- a/src/tests/sorter_tests.cpp
+++ b/src/tests/sorter_tests.cpp
@@ -243,8 +243,7 @@ TEST(DRFSorterTest, DRF)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  Resources totalResources = Resources::parse("cpus:100;mem:100").get();
-  sorter.add(slaveId, totalResources);
+  sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100"));
 
   EXPECT_EQ(vector<string>({}), sorter.sort());
 
@@ -287,16 +286,14 @@ TEST(DRFSorterTest, DRF)
   sorter.activate("e");
   sorter.allocated("e", slaveId, eResources);
 
-  Resources removedResources = Resources::parse("cpus:50;mem:0").get();
-  sorter.remove(slaveId, removedResources);
-  // total resources is now cpus = 50, mem = 100
+  sorter.removeSlave(slaveId);
+  sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:50;mem:100"));
 
   // shares: b = .04, c = .02, d = .06, e = .05
   EXPECT_EQ(vector<string>({"c", "b", "e", "d"}), sorter.sort());
 
-  Resources addedResources = Resources::parse("cpus:0;mem:100").get();
-  sorter.add(slaveId, addedResources);
-  // total resources is now cpus = 50, mem = 200
+  sorter.removeSlave(slaveId);
+  sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:50;mem:200"));
 
   Resources fResources = Resources::parse("cpus:5;mem:1").get();
   sorter.add("f");
@@ -336,7 +333,7 @@ TEST(DRFSorterTest, WDRF)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get());
+  sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100"));
 
   sorter.add("a");
   sorter.activate("a");
@@ -396,9 +393,7 @@ TEST(DRFSorterTest, UpdateWeight)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  Resources totalResources = Resources::parse("cpus:100;mem:100").get();
-
-  sorter.add(slaveId, totalResources);
+  sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100"));
 
   sorter.add("a");
   sorter.activate("a");
@@ -428,9 +423,7 @@ TEST(DRFSorterTest, AllocationCountTieBreak)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  Resources totalResources = Resources::parse("cpus:100;mem:100").get();
-
-  sorter.add(slaveId, totalResources);
+  sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100"));
 
   sorter.add("a");
   sorter.add("b");
@@ -511,8 +504,7 @@ TEST(DRFSorterTest, ShallowHierarchy)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  Resources totalResources = Resources::parse("cpus:100;mem:100").get();
-  sorter.add(slaveId, totalResources);
+  sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100"));
 
   sorter.add("a/a");
   sorter.activate("a/a");
@@ -554,16 +546,14 @@ TEST(DRFSorterTest, ShallowHierarchy)
   sorter.activate("e/e");
   sorter.allocated("e/e", slaveId, eResources);
 
-  Resources removedResources = Resources::parse("cpus:50;mem:0").get();
-  sorter.remove(slaveId, removedResources);
-  // total resources is now cpus = 50, mem = 100
+  sorter.removeSlave(slaveId);
+  sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:50;mem:100"));
 
   // shares: b/b = .04, c/c = .02, d/d = .06, e/e = .05
   EXPECT_EQ(vector<string>({"c/c", "b/b", "e/e", "d/d"}), sorter.sort());
 
-  Resources addedResources = Resources::parse("cpus:0;mem:100").get();
-  sorter.add(slaveId, addedResources);
-  // total resources is now cpus = 50, mem = 200
+  sorter.removeSlave(slaveId);
+  sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:50;mem:200"));
 
   Resources fResources = Resources::parse("cpus:5;mem:1").get();
   sorter.add("f/f");
@@ -606,8 +596,7 @@ TEST(DRFSorterTest, DeepHierarchy)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  Resources totalResources = Resources::parse("cpus:100;mem:100").get();
-  sorter.add(slaveId, totalResources);
+  sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100"));
 
   sorter.add("a/a/a/a/a");
   sorter.activate("a/a/a/a/a");
@@ -649,17 +638,15 @@ TEST(DRFSorterTest, DeepHierarchy)
   sorter.activate("e/e/e/e/e/e");
   sorter.allocated("e/e/e/e/e/e", slaveId, eResources);
 
-  Resources removedResources = Resources::parse("cpus:50;mem:0").get();
-  sorter.remove(slaveId, removedResources);
-  // total resources is now cpus = 50, mem = 100
+  sorter.removeSlave(slaveId);
+  sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:50;mem:100"));
 
   // shares: b/b/b/b = .04, c/c/c = .02, d/d = .06, e/e/e/e/e/e = .05
   EXPECT_EQ(vector<string>({"c/c/c", "b/b/b/b", "e/e/e/e/e/e", "d/d"}),
             sorter.sort());
 
-  Resources addedResources = Resources::parse("cpus:0;mem:100").get();
-  sorter.add(slaveId, addedResources);
-  // total resources is now cpus = 50, mem = 200
+  sorter.removeSlave(slaveId);
+  sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:50;mem:200"));
 
   Resources fResources = Resources::parse("cpus:5;mem:1").get();
   sorter.add("f/f");
@@ -702,8 +689,7 @@ TEST(DRFSorterTest, HierarchicalAllocation)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  Resources totalResources = Resources::parse("cpus:100;mem:100").get();
-  sorter.add(slaveId, totalResources);
+  sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100"));
 
   sorter.add("a");
   sorter.add("b/c");
@@ -824,8 +810,7 @@ TEST(DRFSorterTest, HierarchicalIterationOrder)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  Resources totalResources = Resources::parse("cpus:100;mem:100").get();
-  sorter.add(slaveId, totalResources);
+  sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100"));
 
   sorter.add("a/b");
   sorter.add("c");
@@ -872,7 +857,7 @@ TEST(DRFSorterTest, AddChildToLeaf)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get());
+  sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100"));
 
   sorter.add("a");
   sorter.activate("a");
@@ -943,7 +928,7 @@ TEST(DRFSorterTest, AddChildToInternal)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get());
+  sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100"));
 
   sorter.add("x/a");
sorter.activate("x/a"); @@ -986,7 +971,7 @@ TEST(DRFSorterTest, AddChildToInactiveLeaf) SlaveID slaveId; slaveId.set_value("agentId"); - sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get()); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100")); sorter.add("a"); sorter.activate("a"); @@ -1021,7 +1006,7 @@ TYPED_TEST(CommonSorterTest, RemoveLeafCollapseParent) SlaveID slaveId; slaveId.set_value("agentId"); - sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get()); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100")); sorter.add("a"); sorter.activate("a"); @@ -1066,7 +1051,7 @@ TYPED_TEST(CommonSorterTest, RemoveLeafCollapseParentInactive) SlaveID slaveId; slaveId.set_value("agentId"); - sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get()); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100")); sorter.add("a"); sorter.activate("a"); @@ -1107,7 +1092,7 @@ TEST(DRFSorterTest, ChangeWeightOnSubtree) SlaveID slaveId; slaveId.set_value("agentId"); - sorter.add(slaveId, Resources::parse("cpus:100;mem:100").get()); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:100")); sorter.updateWeight("b", 3); sorter.updateWeight("a", 2); @@ -1173,9 +1158,9 @@ TEST(DRFSorterTest, SplitResourceShares) disk2.mutable_disk()->mutable_persistence()->set_id("ID2"); disk2.mutable_disk()->mutable_volume()->set_container_path("data"); - sorter.add( + sorter.addSlave( slaveId, - Resources::parse("cpus:100;mem:100;disk:95").get() + disk1 + disk2); + *ResourceQuantities::fromString("cpus:100;mem:100;disk:100")); // Now, allocate resources to "a" and "b". Note that "b" will have // more disk if the shares are accounted correctly! @@ -1200,7 +1185,8 @@ TYPED_TEST(CommonSorterTest, UpdateAllocation) sorter.activate("a"); sorter.activate("b"); - sorter.add(slaveId, Resources::parse("cpus:10;mem:10;disk:10").get()); + sorter.addSlave( + slaveId, *ResourceQuantities::fromString("cpus:10;mem:10;disk:10")); sorter.allocated( "a", slaveId, Resources::parse("cpus:10;mem:10;disk:10").get()); @@ -1241,7 +1227,7 @@ TYPED_TEST(CommonSorterTest, UpdateAllocation) sorter.add("c"); sorter.activate("c"); - sorter.add(slaveId, resourcesC); + sorter.addSlave(slaveId, ResourceQuantities::fromScalarResources(resourcesC)); sorter.allocated("c", slaveId2, resourcesC); sorter.update("c", slaveId2, resourcesC, Resources()); @@ -1261,7 +1247,8 @@ TYPED_TEST(CommonSorterTest, UpdateAllocationNestedClient) sorter.activate("a/x"); sorter.activate("b/y"); - sorter.add(slaveId, Resources::parse("cpus:10;mem:10;disk:10").get()); + sorter.addSlave( + slaveId, *ResourceQuantities::fromString("cpus:10;mem:10;disk:10")); sorter.allocated( "a/x", slaveId, Resources::parse("cpus:10;mem:10;disk:10").get()); @@ -1303,7 +1290,7 @@ TYPED_TEST(CommonSorterTest, AllocationForInactiveClient) SlaveID slaveId; slaveId.set_value("agentId"); - sorter.add(slaveId, Resources::parse("cpus:10;mem:10").get()); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:10;mem:10")); sorter.add("a"); sorter.add("b"); @@ -1361,11 +1348,14 @@ TYPED_TEST(CommonSorterTest, MultipleSlaves) sorter.add("framework"); sorter.activate("framework"); - Resources slaveResources = + const Resources slaveResources = Resources::parse("cpus:2;mem:512;ports:[31000-32000]").get(); - sorter.add(slaveA, slaveResources); - sorter.add(slaveB, slaveResources); + const ResourceQuantities slaveResourceQuantities = + 
ResourceQuantities::fromScalarResources(slaveResources.scalars()); + + sorter.addSlave(slaveA, slaveResourceQuantities); + sorter.addSlave(slaveB, slaveResourceQuantities); sorter.allocated("framework", slaveA, slaveResources); sorter.allocated("framework", slaveB, slaveResources); @@ -1374,14 +1364,10 @@ TYPED_TEST(CommonSorterTest, MultipleSlaves) EXPECT_EQ(slaveResources, sorter.allocation("framework", slaveA)); EXPECT_EQ(slaveResources, sorter.allocation("framework", slaveB)); - EXPECT_EQ( - ResourceQuantities::fromScalarResources(slaveResources.scalars()) + - ResourceQuantities::fromScalarResources(slaveResources.scalars()), + EXPECT_EQ(slaveResourceQuantities + slaveResourceQuantities, sorter.allocationScalarQuantities("framework")); - EXPECT_EQ( - ResourceQuantities::fromScalarResources(slaveResources.scalars()) + - ResourceQuantities::fromScalarResources(slaveResources.scalars()), + EXPECT_EQ(slaveResourceQuantities + slaveResourceQuantities, sorter.allocationScalarQuantities()); } @@ -1404,11 +1390,14 @@ TYPED_TEST(CommonSorterTest, MultipleSlavesUpdateAllocation) sorter.add("framework"); sorter.activate("framework"); - Resources slaveResources = + const Resources slaveResources = Resources::parse("cpus:2;mem:512;disk:10;ports:[31000-32000]").get(); - sorter.add(slaveA, slaveResources); - sorter.add(slaveB, slaveResources); + const ResourceQuantities slaveResourceQuantities = + ResourceQuantities::fromScalarResources(slaveResources.scalars()); + + sorter.addSlave(slaveA, slaveResourceQuantities); + sorter.addSlave(slaveB, slaveResourceQuantities); sorter.allocated("framework", slaveA, slaveResources); sorter.allocated("framework", slaveB, slaveResources); @@ -1456,7 +1445,7 @@ TEST(DRFSorterTest, UpdateTotal) sorter.activate("a"); sorter.activate("b"); - sorter.add(slaveId, Resources::parse("cpus:10;mem:100").get()); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:10;mem:100")); // Dominant share of "a" is 0.2 (cpus). sorter.allocated( @@ -1470,8 +1459,8 @@ TEST(DRFSorterTest, UpdateTotal) // Update the total resources by removing the previous total and // adding back the new total. - sorter.remove(slaveId, Resources::parse("cpus:10;mem:100").get()); - sorter.add(slaveId, Resources::parse("cpus:100;mem:10").get()); + sorter.removeSlave(slaveId); + sorter.addSlave(slaveId, *ResourceQuantities::fromString("cpus:100;mem:10")); // Now the dominant share of "a" is 0.1 (mem) and "b" is 0.2 (mem), // which should change the sort order. @@ -1496,8 +1485,8 @@ TEST(DRFSorterTest, MultipleSlavesUpdateTotal) sorter.activate("a"); sorter.activate("b"); - sorter.add(slaveA, Resources::parse("cpus:5;mem:50").get()); - sorter.add(slaveB, Resources::parse("cpus:5;mem:50").get()); + sorter.addSlave(slaveA, *ResourceQuantities::fromString("cpus:5;mem:50")); + sorter.addSlave(slaveB, *ResourceQuantities::fromString("cpus:5;mem:50")); // Dominant share of "a" is 0.2 (cpus). sorter.allocated( @@ -1509,10 +1498,9 @@ TEST(DRFSorterTest, MultipleSlavesUpdateTotal) EXPECT_EQ(vector<string>({"b", "a"}), sorter.sort()); - // Update the total resources of slaveA by removing the previous - // total and adding the new total. - sorter.remove(slaveA, Resources::parse("cpus:5;mem:50").get()); - sorter.add(slaveA, Resources::parse("cpus:95;mem:50").get()); + // Update the total resources of slaveA. 
+  sorter.removeSlave(slaveA);
+  sorter.addSlave(slaveA, *ResourceQuantities::fromString("cpus:95;mem:50"));
 
   // Now the dominant share of "a" is 0.02 (cpus) and "b" is 0.03
   // (mem), which should change the sort order.
@@ -1536,7 +1524,7 @@ TEST(DRFSorterTest, RemoveResources)
   sorter.activate("b");
 
   Resources slaveTotal = Resources::parse("cpus", "10", "*").get();
-  sorter.add(slaveId, slaveTotal);
+  sorter.addSlave(slaveId, ResourceQuantities::fromScalarResources(slaveTotal));
 
   // Dominant share of "a" is 0.6 (cpus).
   Resources allocatedForA = Resources::parse("cpus", "6", "*").get();
@@ -1549,8 +1537,10 @@ TEST(DRFSorterTest, RemoveResources)
 
   // Remove cpus from the total resources as well as the allocation of "a".
   Resources removed = Resources::parse("cpus", "5", "*").get();
-  sorter.remove(slaveId, slaveTotal);
-  sorter.add(slaveId, slaveTotal - removed);
+  sorter.removeSlave(slaveId);
+  sorter.addSlave(
+      slaveId, ResourceQuantities::fromScalarResources(slaveTotal - removed));
+
   sorter.update("a", slaveId, allocatedForA, allocatedForA - removed);
 
   // Now the dominant share of "a" is 0.2 (cpus) and that of "b" is 0.8 (cpus),
@@ -1579,7 +1569,7 @@ TEST(DRFSorterTest, RevocableResources)
   revocable.mutable_revocable();
   Resources total = Resources::parse("cpus:10;mem:100").get() + revocable;
 
-  sorter.add(slaveId, total);
+  sorter.addSlave(slaveId, ResourceQuantities::fromScalarResources(total));
 
   // Dominant share of "a" is 0.1 (cpus).
   Resources a = Resources::parse("cpus:2;mem:1").get();
@@ -1609,15 +1599,13 @@ TEST(DRFSorterTest, SharedResources)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  Resource sharedDisk = createDiskResource(
+  const Resource sharedDisk = createDiskResource(
      "100", "role1", "id1", "path1", None(), true);
 
-  // Set totalResources to have disk of 1000, with disk 100 being shared.
-  Resources totalResources = Resources::parse(
-      "cpus:100;mem:100;disk(role1):900").get();
-  totalResources += sharedDisk;
-
-  sorter.add(slaveId, totalResources);
+  sorter.addSlave(
+      slaveId,
+      ResourceQuantities::fromScalarResources(
+          *Resources::parse("cpus:100;mem:100;disk(role1):900") + sharedDisk));
 
   // Verify sort() works when shared resources are in the allocations.
   sorter.add("a");
@@ -1663,28 +1651,6 @@ TEST(DRFSorterTest, SharedResources)
   // d = .1 (dominant: disk).
   EXPECT_EQ(vector<string>({"b", "c", "d"}), sorter.sort());
 
-  // Verify other basic allocator methods work when shared resources
-  // are in the allocations.
-  Resources removedResources = Resources::parse("cpus:50;mem:0").get();
-  sorter.remove(slaveId, removedResources);
-
-  // Total resources is now:
-  // cpus:50;mem:100;disk(role1):900;disk(role1)[id1]:100
-
-  // Shares: b = .04 (dominant: cpus), c = .1 (dominant: disk),
-  // d = .1 (dominant: disk).
-  EXPECT_EQ(vector<string>({"b", "c", "d"}), sorter.sort());
-
   EXPECT_TRUE(sorter.contains("b"));
 
   EXPECT_FALSE(sorter.contains("a"));
@@ -1703,15 +1669,13 @@ TEST(DRFSorterTest, SameDominantSharedResourcesAcrossClients)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  Resource sharedDisk = createDiskResource(
+  const Resource sharedDisk = createDiskResource(
       "900", "role1", "id1", "path1", None(), true);
 
-  // Set totalResources to have disk of 1000, with disk 900 being shared.
-  Resources totalResources = Resources::parse(
-      "cpus:100;mem:100;disk(role1):100").get();
-  totalResources += sharedDisk;
-
-  sorter.add(slaveId, totalResources);
+  sorter.addSlave(
+      slaveId,
+      ResourceQuantities::fromScalarResources(
+          *Resources::parse("cpus:100;mem:100;disk(role1):100") + sharedDisk));
 
   // Add 2 clients each with the same shared disk, but with varying
   // cpus and mem.
@@ -1752,15 +1716,13 @@ TEST(DRFSorterTest, SameSharedResourcesSameClient)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  Resource sharedDisk = createDiskResource(
+  const Resource sharedDisk = createDiskResource(
      "50", "role1", "id1", "path1", None(), true);
 
-  // Set totalResources to have disk of 1000, with disk of 50 being shared.
-  Resources totalResources = Resources::parse(
-      "cpus:100;mem:100;disk(role1):950").get();
-  totalResources += sharedDisk;
-
-  sorter.add(slaveId, totalResources);
+  sorter.addSlave(
+      slaveId,
+      ResourceQuantities::fromScalarResources(
+          *Resources::parse("cpus:100;mem:100;disk(role1):950") + sharedDisk));
 
   // Verify sort() works when shared resources are in the allocations.
   sorter.add("a");
@@ -1797,15 +1759,13 @@ TEST(DRFSorterTest, SharedResourcesUnallocated)
   SlaveID slaveId;
   slaveId.set_value("agentId");
 
-  Resource sharedDisk = createDiskResource(
+  const Resource sharedDisk = createDiskResource(
      "100", "role1", "id1", "path1", None(), true);
 
-  // Set totalResources to have disk of 1000, with disk 100 being shared.
-  Resources totalResources = Resources::parse(
-      "cpus:100;mem:100;disk(role1):900").get();
-  totalResources += sharedDisk;
-
-  sorter.add(slaveId, totalResources);
+  sorter.addSlave(
+      slaveId,
+      ResourceQuantities::fromScalarResources(
+          *Resources::parse("cpus:100;mem:100;disk(role1):900") + sharedDisk));
 
   // Allocate 3 copies of shared resources to client 'a', but allocate no
   // shared resource to client 'b'.
@@ -1841,45 +1801,6 @@ TEST(DRFSorterTest, SharedResourcesUnallocated)
 }
 
-
-// This test verifies that shared resources are removed from the sorter
-// only when all instances of the the same shared resource are removed.
-TYPED_TEST(CommonSorterTest, RemoveSharedResources)
-{
-  TypeParam sorter;
-
-  SlaveID slaveId;
-  slaveId.set_value("agentId");
-
-  Resource sharedDisk = createDiskResource(
-      "100", "role1", "id1", "path1", None(), true);
-
-  sorter.add(
-      slaveId, Resources::parse("cpus:100;mem:100;disk(role1):900").get());
-
-  ResourceQuantities quantity1 = sorter.totalScalarQuantities();
-
-  sorter.add(slaveId, sharedDisk);
-  ResourceQuantities quantity2 = sorter.totalScalarQuantities();
-
-  EXPECT_EQ(
-      CHECK_NOTERROR(ResourceQuantities::fromString("disk:100")),
-      quantity2 - quantity1);
-
-  sorter.add(slaveId, sharedDisk);
-  ResourceQuantities quantity3 = sorter.totalScalarQuantities();
-
-  EXPECT_NE(quantity1, quantity3);
-  EXPECT_EQ(quantity2, quantity3);
-
-  // The quantity of the shared disk is removed when the last copy is removed.
-  sorter.remove(slaveId, sharedDisk);
-  EXPECT_EQ(sorter.totalScalarQuantities(), quantity3);
-
-  sorter.remove(slaveId, sharedDisk);
-  EXPECT_EQ(sorter.totalScalarQuantities(), quantity1);
-}
-
-
 // This benchmark simulates sorting a number of clients that have
 // different amount of allocations.
 //
@@ -1920,8 +1841,8 @@ TYPED_TEST(CommonSorterTest, BENCHMARK_FullSort)
 
   cout << "Added " << clientCount << " clients in " << watch.elapsed() << endl;
 
-  Resources agentResources = Resources::parse(
-      "cpus:24;mem:4096;disk:4096;ports:[31000-32000]").get();
+  const ResourceQuantities agentScalarQuantities =
+    *ResourceQuantities::fromString("cpus:24;mem:4096;disk:4096");
 
   watch.start();
   {
@@ -1931,7 +1852,7 @@ TYPED_TEST(CommonSorterTest, BENCHMARK_FullSort)
 
       agents.push_back(slaveId);
 
-      sorter.add(slaveId, agentResources);
+      sorter.addSlave(slaveId, agentScalarQuantities);
     }
   }
   watch.stop();
@@ -1999,7 +1920,7 @@ TYPED_TEST(CommonSorterTest, BENCHMARK_FullSort)
   watch.start();
   {
     foreach (const SlaveID& slaveId, agents) {
-      sorter.remove(slaveId, agentResources);
+      sorter.removeSlave(slaveId);
     }
   }
   watch.stop();
@@ -2100,8 +2021,8 @@ TYPED_TEST(CommonSorterTest, BENCHMARK_HierarchyFullSort)
 
   cout << "Added " << clientCount << " clients in " << watch.elapsed() << endl;
 
-  Resources agentResources = Resources::parse(
-      "cpus:24;mem:4096;disk:4096;ports:[31000-32000]").get();
+  const ResourceQuantities agentScalarQuantities =
+    *ResourceQuantities::fromString("cpus:24;mem:4096;disk:4096");
 
   watch.start();
   {
@@ -2111,7 +2032,7 @@ TYPED_TEST(CommonSorterTest, BENCHMARK_HierarchyFullSort)
 
      agents.push_back(slaveId);
 
-      sorter.add(slaveId, agentResources);
+      sorter.addSlave(slaveId, agentScalarQuantities);
     }
  }
   watch.stop();
@@ -2179,7 +2100,7 @@ TYPED_TEST(CommonSorterTest, BENCHMARK_HierarchyFullSort)
   watch.start();
   {
     foreach (const SlaveID& slaveId, agents) {
-      sorter.remove(slaveId, agentResources);
+      sorter.removeSlave(slaveId);
     }
   }
   watch.stop();
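For readers following the sorter changes: the new contract stores each
agent's quantities keyed by agent ID, so `removeSlave()` needs only the ID,
and updating an agent is remove-then-add. Below is a minimal standalone
sketch of that contract using simplified stand-in types (std::string agent
IDs, a plain std::map for quantities) rather than the real Mesos classes; it
illustrates the idea, not the actual DRFSorter implementation:

// sorter_totals_sketch.cpp -- illustration only; simplified stand-in types.
#include <cassert>
#include <map>
#include <string>
#include <unordered_map>

using Quantities = std::map<std::string, double>;

class SorterTotals
{
public:
  void addSlave(const std::string& slaveId, const Quantities& quantities)
  {
    // Adding the same agent twice is a programming error
    // (a CHECK crash in the real sorter).
    bool inserted = agentQuantities_.emplace(slaveId, quantities).second;
    assert(inserted);

    for (const auto& [name, value] : quantities) {
      totals_[name] += value;
    }
    dirty_ = true;  // Shares are lazily recomputed on the next sort().
  }

  void removeSlave(const std::string& slaveId)
  {
    // Only the agent ID is needed: the sorter remembers what it added.
    auto agent = agentQuantities_.find(slaveId);
    assert(agent != agentQuantities_.end());

    for (const auto& [name, value] : agent->second) {
      totals_[name] -= value;
    }
    agentQuantities_.erase(agent);
    dirty_ = true;
  }

  double total(const std::string& name) const
  {
    auto it = totals_.find(name);
    return it == totals_.end() ? 0.0 : it->second;
  }

private:
  Quantities totals_;                                            // Cluster totals.
  std::unordered_map<std::string, Quantities> agentQuantities_;  // Per agent.
  bool dirty_ = false;
};

int main()
{
  SorterTotals sorter;
  sorter.addSlave("agent1", {{"cpus", 4}, {"mem", 1024}});

  // Updating an agent's total is remove-then-add, per the new contract.
  sorter.removeSlave("agent1");
  sorter.addSlave("agent1", {{"cpus", 8}, {"mem", 1024}});

  assert(sorter.total("cpus") == 8);
}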
