Added a new allocator method to add resources to agents. The added method complements 'Allocator::addSlave'. While in 'addSlave' the total agent resources and used resources are passed, the method 'addResourceProvider' added here allows to add additional, potentially used resources to an existing agent.
Review: https://reviews.apache.org/r/63997/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b220abcc Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b220abcc Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b220abcc Branch: refs/heads/master Commit: b220abcc0118204a6fa63a3c287ea7272af42b50 Parents: 8c0f8a4 Author: Benjamin Bannier <[email protected]> Authored: Thu Nov 30 17:03:50 2017 +0100 Committer: Benjamin Bannier <[email protected]> Committed: Thu Nov 30 18:33:58 2017 +0100 ---------------------------------------------------------------------- include/mesos/allocator/allocator.hpp | 13 +++ src/master/allocator/mesos/allocator.hpp | 25 +++++ src/master/allocator/mesos/hierarchical.cpp | 21 ++++ src/master/allocator/mesos/hierarchical.hpp | 5 + src/tests/allocator.hpp | 16 +++ src/tests/hierarchical_allocator_tests.cpp | 135 +++++++++++++++++++++++ 6 files changed, 215 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/b220abcc/include/mesos/allocator/allocator.hpp ---------------------------------------------------------------------- diff --git a/include/mesos/allocator/allocator.hpp b/include/mesos/allocator/allocator.hpp index ae12200..acb9e4f 100644 --- a/include/mesos/allocator/allocator.hpp +++ b/include/mesos/allocator/allocator.hpp @@ -215,6 +215,19 @@ public: capabilities = None()) = 0; /** + * Add resources from a local resource provider to an agent. + * + * @param slave Id of the agent to modify. + * @param total The resources to add to the agent's total resources. + * @param used The resources to add to the resources tracked as used + * for this agent. + */ + virtual void addResourceProvider( + const SlaveID& slave, + const Resources& total, + const hashmap<FrameworkID, Resources>& used) = 0; + + /** * Activates an agent. This is invoked when an agent reregisters. Offers * are only sent for activated agents. */ http://git-wip-us.apache.org/repos/asf/mesos/blob/b220abcc/src/master/allocator/mesos/allocator.hpp ---------------------------------------------------------------------- diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp index 8fa4fde..48254b6 100644 --- a/src/master/allocator/mesos/allocator.hpp +++ b/src/master/allocator/mesos/allocator.hpp @@ -102,6 +102,11 @@ public: const Option<Resources>& total = None(), const Option<std::vector<SlaveInfo::Capability>>& capabilities = None()); + void addResourceProvider( + const SlaveID& slave, + const Resources& total, + const hashmap<FrameworkID, Resources>& used); + void activateSlave( const SlaveID& slaveId); @@ -243,6 +248,11 @@ public: const Option<std::vector<SlaveInfo::Capability>>& capabilities = None()) = 0; + virtual void addResourceProvider( + const SlaveID& slave, + const Resources& total, + const hashmap<FrameworkID, Resources>& used) = 0; + virtual void activateSlave( const SlaveID& slaveId) = 0; @@ -490,6 +500,21 @@ inline void MesosAllocator<AllocatorProcess>::updateSlave( template <typename AllocatorProcess> +void MesosAllocator<AllocatorProcess>::addResourceProvider( + const SlaveID& slave, + const Resources& total, + const hashmap<FrameworkID, Resources>& used) +{ + process::dispatch( + process, + &MesosAllocatorProcess::addResourceProvider, + slave, + total, + used); +} + + +template <typename AllocatorProcess> inline void MesosAllocator<AllocatorProcess>::activateSlave( const SlaveID& slaveId) { http://git-wip-us.apache.org/repos/asf/mesos/blob/b220abcc/src/master/allocator/mesos/hierarchical.cpp ---------------------------------------------------------------------- diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp index 6b0be6a..ab2abf8 100644 --- a/src/master/allocator/mesos/hierarchical.cpp +++ b/src/master/allocator/mesos/hierarchical.cpp @@ -627,6 +627,27 @@ void HierarchicalAllocatorProcess::updateSlave( } +void HierarchicalAllocatorProcess::addResourceProvider( + const SlaveID& slaveId, + const Resources& total, + const hashmap<FrameworkID, Resources>& used) +{ + CHECK(initialized); + CHECK(slaves.contains(slaveId)); + + trackAllocatedResources(slaveId, used); + + Slave& slave = slaves.at(slaveId); + updateSlaveTotal(slaveId, slave.total + total); + slave.allocated += Resources::sum(used); + + VLOG(1) + << "Grew agent " << slaveId << " by " + << total << " (total), " + << used << " (used)"; +} + + void HierarchicalAllocatorProcess::activateSlave( const SlaveID& slaveId) { http://git-wip-us.apache.org/repos/asf/mesos/blob/b220abcc/src/master/allocator/mesos/hierarchical.hpp ---------------------------------------------------------------------- diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp index fd604c9..3c87dc7 100644 --- a/src/master/allocator/mesos/hierarchical.hpp +++ b/src/master/allocator/mesos/hierarchical.hpp @@ -147,6 +147,11 @@ public: const Option<Resources>& total = None(), const Option<std::vector<SlaveInfo::Capability>>& capabilities = None()); + void addResourceProvider( + const SlaveID& slave, + const Resources& total, + const hashmap<FrameworkID, Resources>& used); + void deactivateSlave( const SlaveID& slaveId); http://git-wip-us.apache.org/repos/asf/mesos/blob/b220abcc/src/tests/allocator.hpp ---------------------------------------------------------------------- diff --git a/src/tests/allocator.hpp b/src/tests/allocator.hpp index 6a84f1b..fc5d9ef 100644 --- a/src/tests/allocator.hpp +++ b/src/tests/allocator.hpp @@ -103,6 +103,12 @@ ACTION_P(InvokeUpdateSlave, allocator) } +ACTION_P(InvokeAddResourceProvider, allocator) +{ + allocator->real->addResourceProvider(arg0, arg1, arg2); +} + + ACTION_P(InvokeActivateSlave, allocator) { allocator->real->activateSlave(arg0); @@ -279,6 +285,11 @@ public: EXPECT_CALL(*this, updateSlave(_, _, _)) .WillRepeatedly(DoDefault()); + ON_CALL(*this, addResourceProvider(_, _, _)) + .WillByDefault(InvokeAddResourceProvider(this)); + EXPECT_CALL(*this, addResourceProvider(_, _, _)) + .WillRepeatedly(DoDefault()); + ON_CALL(*this, activateSlave(_)) .WillByDefault(InvokeActivateSlave(this)); EXPECT_CALL(*this, activateSlave(_)) @@ -410,6 +421,11 @@ public: const Option<Resources>&, const Option<std::vector<SlaveInfo::Capability>>&)); + MOCK_METHOD3(addResourceProvider, void( + const SlaveID&, + const Resources&, + const hashmap<FrameworkID, Resources>&)); + MOCK_METHOD1(activateSlave, void( const SlaveID&)); http://git-wip-us.apache.org/repos/asf/mesos/blob/b220abcc/src/tests/hierarchical_allocator_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp index f0f95ba..0309074 100644 --- a/src/tests/hierarchical_allocator_tests.cpp +++ b/src/tests/hierarchical_allocator_tests.cpp @@ -1402,6 +1402,141 @@ TEST_F(HierarchicalAllocatorTest, RecoverResources) } +// Checks that resource provider resources can be added to an agent +// and that the added used resources are correctly taken into account +// when computing fair share. +TEST_F(HierarchicalAllocatorTest, AddResourceProvider) +{ + Clock::pause(); + + initialize(); + + // Register two deactivated frameworks. + FrameworkInfo framework1 = createFrameworkInfo({"role1"}); + allocator->addFramework(framework1.id(), framework1, {}, false, {}); + + FrameworkInfo framework2 = createFrameworkInfo({"role2"}); + allocator->addFramework(framework2.id(), framework2, {}, false, {}); + + // Add a single agent with `resources` resources. + const Resources resources = Resources::parse("cpus:1;mem:100;disk:10").get(); + + SlaveInfo slave1 = createSlaveInfo(resources); + allocator->addSlave( + slave1.id(), + slave1, + AGENT_CAPABILITIES(), + None(), + slave1.resources(), + {}); + + { + // Add a resource provider with `resources*2` to the agent, all in + // use by `framework1`. + Resources allocation = resources + resources; + allocation.allocate("role1"); + allocator->addResourceProvider( + slave1.id(), + resources + resources, + {{framework1.id(), allocation}}); + } + + // Activate `framework2`. The next allocation will be to + // `framework2` which is the only active framework. After that + // `framework1`'s dominant share is 2/3 and `framework2`'s is 1/3. + allocator->activateFramework(framework2.id()); + + { + Resources allocation = slave1.resources(); + allocation.allocate("role2"); + Allocation expected = Allocation( + framework2.id(), + {{"role2", {{slave1.id(), allocation}}}}); + + AWAIT_EXPECT_EQ(expected, allocations.get()); + } + + // Activate `framework1` so it can receive offers. Currently all + // available resources are allocated. + allocator->activateFramework(framework1.id()); + + // Add another agent with `resources` resources. With that + // `framework1` no has a dominant share of 2/4 and `framework2` of + // 1/4. + SlaveInfo slave2 = createSlaveInfo(resources); + allocator->addSlave( + slave2.id(), + slave2, + AGENT_CAPABILITIES(), + None(), + slave2.resources(), + {}); + + { + // The next allocation will be to `framework2` since it is + // furthest below fair share. + Resources allocation = slave2.resources(); + allocation.allocate("role2"); + Allocation expected = Allocation( + framework2.id(), + {{"role2", {{slave2.id(), allocation}}}}); + + AWAIT_EXPECT_EQ(expected, allocations.get()); + } +} + + +// Check that even if as an overallocated resource provider is added to an +// agent, new allocations are only made for unused agent resources. +TEST_F(HierarchicalAllocatorTest, AddResourceProviderOverallocated) +{ + Clock::pause(); + + initialize(); + + const Resources resources = Resources::parse("cpus:1;mem:100;disk:10").get(); + + // Register an agent. + SlaveInfo slave = createSlaveInfo(resources + resources); + allocator->addSlave( + slave.id(), + slave, + AGENT_CAPABILITIES(), + None(), + slave.resources(), + {}); + + // Register a framework in deactivated state + // so it initially does not receive offers. + FrameworkInfo framework = createFrameworkInfo({"role"}); + allocator->addFramework(framework.id(), framework, {}, false, {}); + + // Track an allocation to the framework of half the agent's resources. We add + // no new resources to the total, but just increment the used resources. + Resources allocation = resources; + allocation.allocate("role"); + allocator->addResourceProvider( + slave.id(), + Resources(), + {{framework.id(), allocation}}); + + // Activate framework so it receives offers. + allocator->activateFramework(framework.id()); + + // Trigger a batch allocation. In the subsequent offer we expect the + // framework to receive the other half of the agent's resources so + // that it now has all its resources allocated to it. + Clock::advance(flags.allocation_interval); + Clock::settle(); + + Allocation expected = Allocation( + framework.id(), + {{"role", {{slave.id(), allocation}}}}); + + AWAIT_EXPECT_EQ(expected, allocations.get()); +} + + TEST_F(HierarchicalAllocatorTest, Allocatable) { // Pausing the clock is not necessary, but ensures that the test
