Repository: mesos Updated Branches: refs/heads/master e02ae0b4e -> 68505cd0a
Improved the performance of DRF sorter by caching the scalars. Review: https://reviews.apache.org/r/35664 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ce1c6e2a Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ce1c6e2a Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ce1c6e2a Branch: refs/heads/master Commit: ce1c6e2aad748d9f999c09b9bb4897e19fc18175 Parents: 114d2aa Author: Jie Yu <[email protected]> Authored: Fri Jun 19 12:38:24 2015 -0700 Committer: Jie Yu <[email protected]> Committed: Fri Jun 19 15:43:59 2015 -0700 ---------------------------------------------------------------------- src/master/allocator/sorter/drf/sorter.cpp | 112 +++++++++++++----------- src/master/allocator/sorter/drf/sorter.hpp | 23 ++++- src/tests/sorter_tests.cpp | 55 ++++++++++-- 3 files changed, 128 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/ce1c6e2a/src/master/allocator/sorter/drf/sorter.cpp ---------------------------------------------------------------------- diff --git a/src/master/allocator/sorter/drf/sorter.cpp b/src/master/allocator/sorter/drf/sorter.cpp index c5f4caf..12434a0 100644 --- a/src/master/allocator/sorter/drf/sorter.cpp +++ b/src/master/allocator/sorter/drf/sorter.cpp @@ -24,7 +24,6 @@ using std::list; using std::set; using std::string; - namespace mesos { namespace internal { namespace master { @@ -47,7 +46,7 @@ void DRFSorter::add(const string& name, double weight) Client client(name, 0, 0); clients.insert(client); - allocations[name] = hashmap<SlaveID, Resources>(); + allocations[name] = Allocation(); weights[name] = weight; } @@ -107,7 +106,8 @@ void DRFSorter::allocated( clients.insert(client); } - allocations[name][slaveId] += resources; + allocations[name].resources[slaveId] += resources; + allocations[name].scalars += resources.scalars(); // If the total resources have changed, we're going to // recalculate all the shares, so don't bother just @@ -131,19 +131,23 @@ void DRFSorter::update( // Otherwise, we need to ensure we re-calculate the shares, as // is being currently done, for safety. - CHECK(resources[slaveId].contains(oldAllocation)); + CHECK(total.resources[slaveId].contains(oldAllocation)); + CHECK(total.scalars.contains(oldAllocation.scalars())); - resources[slaveId] -= oldAllocation; - resources[slaveId] += newAllocation; + total.resources[slaveId] -= oldAllocation; + total.resources[slaveId] += newAllocation; - CHECK(allocations[name][slaveId].contains(oldAllocation)); + total.scalars -= oldAllocation.scalars(); + total.scalars += newAllocation.scalars(); - allocations[name][slaveId] -= oldAllocation; - if (allocations[name][slaveId].empty()) { - allocations[name].erase(slaveId); - } + CHECK(allocations[name].resources[slaveId].contains(oldAllocation)); + CHECK(allocations[name].scalars.contains(oldAllocation.scalars())); + + allocations[name].resources[slaveId] -= oldAllocation; + allocations[name].resources[slaveId] += newAllocation; - allocations[name][slaveId] += newAllocation; + allocations[name].scalars -= oldAllocation.scalars(); + allocations[name].scalars += newAllocation.scalars(); // Just assume the total has changed, per the TODO above. dirty = true; @@ -152,7 +156,7 @@ void DRFSorter::update( hashmap<SlaveID, Resources> DRFSorter::allocation(const string& name) { - return allocations[name]; + return allocations[name].resources; } @@ -161,9 +165,11 @@ void DRFSorter::unallocated( const SlaveID& slaveId, const Resources& resources) { - allocations[name][slaveId] -= resources; - if (allocations[name][slaveId].empty()) { - allocations[name].erase(slaveId); + allocations[name].resources[slaveId] -= resources; + allocations[name].scalars -= resources.scalars(); + + if (allocations[name].resources[slaveId].empty()) { + allocations[name].resources.erase(slaveId); } if (!dirty) { @@ -172,10 +178,11 @@ void DRFSorter::unallocated( } -void DRFSorter::add(const SlaveID& slaveId, const Resources& _resources) +void DRFSorter::add(const SlaveID& slaveId, const Resources& resources) { - if (!_resources.empty()) { - resources[slaveId] += _resources; + if (!resources.empty()) { + total.resources[slaveId] += resources; + total.scalars += resources.scalars(); // We have to recalculate all shares when the total resources // change, but we put it off until sort is called so that if @@ -186,15 +193,16 @@ void DRFSorter::add(const SlaveID& slaveId, const Resources& _resources) } -void DRFSorter::remove(const SlaveID& slaveId, const Resources& _resources) +void DRFSorter::remove(const SlaveID& slaveId, const Resources& resources) { - if (!_resources.empty()) { - CHECK(resources.contains(slaveId)); + if (!resources.empty()) { + CHECK(total.resources.contains(slaveId)); - resources[slaveId] -= _resources; + total.resources[slaveId] -= resources; + total.scalars -= resources.scalars(); - if (resources[slaveId].empty()) { - resources.erase(slaveId); + if (total.resources[slaveId].empty()) { + total.resources.erase(slaveId); } dirty = true; @@ -202,12 +210,17 @@ void DRFSorter::remove(const SlaveID& slaveId, const Resources& _resources) } -void DRFSorter::update(const SlaveID& slaveId, const Resources& _resources) +void DRFSorter::update(const SlaveID& slaveId, const Resources& resources) { - resources[slaveId] = _resources; + CHECK(total.scalars.contains(total.resources[slaveId].scalars())); + + total.scalars -= total.resources[slaveId].scalars(); + total.scalars += resources.scalars(); - if (resources[slaveId].empty()) { - resources.erase(slaveId); + total.resources[slaveId] = resources; + + if (total.resources[slaveId].empty()) { + total.resources.erase(slaveId); } dirty = true; @@ -274,39 +287,34 @@ void DRFSorter::update(const string& name) double DRFSorter::calculateShare(const string& name) { - double share = 0; + double share = 0.0; // TODO(benh): This implementation of "dominant resource fairness" // currently does not take into account resources that are not // scalars. - // NOTE: Summation is incorrect for non-scalars, but since we - // only care about scalar resources, this is safe. - Resources totalResources = Resources::sum(resources); - Resources clientAllocation = Resources::sum(allocations[name]); - - // Scalar resources may be spread across multiple 'Resource' - // objects. E.g. persistent volumes. So we first collect the names - // of the scalar resources, before computing the totals. - hashset<string> scalars; - foreach (const Resource& resource, totalResources) { - if (resource.type() == Value::SCALAR) { - scalars.insert(resource.name()); - } - } + foreach (const string& scalar, total.scalars.names()) { + double _total = 0.0; - foreach (const string& scalar, scalars) { - Option<Value::Scalar> total = totalResources.get<Value::Scalar>(scalar); + // NOTE: Scalar resources may be spread across multiple + // 'Resource' objects. E.g. persistent volumes. + foreach (const Resource& resource, total.scalars.get(scalar)) { + CHECK_EQ(resource.type(), Value::SCALAR); + _total += resource.scalar().value(); + } - if (total.isSome() && total.get().value() > 0) { - Option<Value::Scalar> allocation = - clientAllocation.get<Value::Scalar>(scalar); + if (_total > 0.0) { + double allocation = 0.0; - if (allocation.isNone()) { - allocation = Value::Scalar(); + // NOTE: Scalar resources may be spread across multiple + // 'Resource' objects. E.g. persistent volumes. + foreach (const Resource& resource, + allocations[name].scalars.get(scalar)) { + CHECK_EQ(resource.type(), Value::SCALAR); + allocation += resource.scalar().value(); } - share = std::max(share, allocation.get().value() / total.get().value()); + share = std::max(share, allocation / _total); } } http://git-wip-us.apache.org/repos/asf/mesos/blob/ce1c6e2a/src/master/allocator/sorter/drf/sorter.hpp ---------------------------------------------------------------------- diff --git a/src/master/allocator/sorter/drf/sorter.hpp b/src/master/allocator/sorter/drf/sorter.hpp index 35dc1a4..d38925e 100644 --- a/src/master/allocator/sorter/drf/sorter.hpp +++ b/src/master/allocator/sorter/drf/sorter.hpp @@ -121,14 +121,29 @@ private: // A set of Clients (names and shares) sorted by share. std::set<Client, DRFComparator> clients; - // Maps client names to the resources they have been allocated. - hashmap<std::string, hashmap<SlaveID, Resources>> allocations; - // Maps client names to the weights that should be applied to their shares. hashmap<std::string, double> weights; // Total resources. - hashmap<SlaveID, Resources> resources; + struct Total { + hashmap<SlaveID, Resources> resources; + + // NOTE: Scalars can be safely aggregated across slaves. We keep + // that to speed up the calculation of shares. See MESOS-2891 for + // the reasons why we want to do that. + Resources scalars; + } total; + + // Allocation for a client. + struct Allocation { + hashmap<SlaveID, Resources> resources; + + // Similarly, we aggregated scalars across slaves. See note above. + Resources scalars; + }; + + // Maps client names to the resources they have been allocated. + hashmap<std::string, Allocation> allocations; }; } // namespace allocator { http://git-wip-us.apache.org/repos/asf/mesos/blob/ce1c6e2a/src/tests/sorter_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/sorter_tests.cpp b/src/tests/sorter_tests.cpp index 435e0bf..4013886 100644 --- a/src/tests/sorter_tests.cpp +++ b/src/tests/sorter_tests.cpp @@ -281,12 +281,12 @@ TEST(SorterTest, MultipleSlaves) } -// We aggregate resources from multiple slaves into the sorter. -// Since non-scalar resources don't aggregate well across slaves, -// we need to keep track of the SlaveIDs of the resources. -// This tests that no resources vanish in the process of aggregation -// by performing a updates from unreserved to reserved resources. -TEST(SorterTest, MultipleSlaveUpdates) +// We aggregate resources from multiple slaves into the sorter. Since +// non-scalar resources don't aggregate well across slaves, we need to +// keep track of the SlaveIDs of the resources. This tests that no +// resources vanish in the process of aggregation by performing update +// allocations from unreserved to reserved resources. +TEST(SorterTest, MultipleSlavesUpdateAllocation) { DRFSorter sorter; @@ -366,6 +366,49 @@ TEST(SorterTest, UpdateTotal) } +// Similar to the above 'UpdateTotal' test, but tests the scenario +// when there are multiple slaves. +TEST(SorterTest, MultipleSlavesUpdateTotal) +{ + DRFSorter sorter; + + SlaveID slaveA; + slaveA.set_value("slaveA"); + + SlaveID slaveB; + slaveB.set_value("slaveB"); + + sorter.add("a"); + sorter.add("b"); + + sorter.add(slaveA, Resources::parse("cpus:5;mem:50").get()); + sorter.add(slaveB, Resources::parse("cpus:5;mem:50").get()); + + // Dominant share of "a" is 0.2 (cpus). + sorter.allocated( + "a", slaveA, Resources::parse("cpus:2;mem:1").get()); + + // Dominant share of "b" is 0.1 (cpus). + sorter.allocated( + "b", slaveB, Resources::parse("cpus:1;mem:3").get()); + + list<string> sorted = sorter.sort(); + ASSERT_EQ(2u, sorted.size()); + EXPECT_EQ("b", sorted.front()); + EXPECT_EQ("a", sorted.back()); + + // Update the total resources of slaveA. + sorter.update(slaveA, Resources::parse("cpus:95;mem:50").get()); + + // Now the dominant share of "a" is 0.02 (cpus) and "b" is 0.03 + // (mem), which should change the sort order. + sorted = sorter.sort(); + ASSERT_EQ(2u, sorted.size()); + EXPECT_EQ("a", sorted.front()); + EXPECT_EQ("b", sorted.back()); +} + + // This test verifies that revocable resources are properly accounted // for in the DRF sorter. TEST(SorterTest, RevocableResources)
