Repository: mesos Updated Branches: refs/heads/master cbb209991 -> dc2e130a7
Added a new API call `updateAvailable` to the allocator. It is needed to implement the master HTTP endpoints: `/reserve`, `/unreserve`, `/create` and `/destroy`. This API is similar to `updateSlave`, which is currently very specific to oversubscription. I considered consolidating `updateAvailable` and `updateSlave`, but doing so would require offers to be generated within the allocator. Specifically, `updateAvailable` can fail if there are insufficient available resources to update, whereas `updateSlave` avoids failing by leaving the allocator in an over-allocated state. For `updateSlave`, leaving the allocator in an over-allocated state is acceptable: it does not modify resources, so `total - allocated` still works out to __empty__. `updateAvailable` cannot leave the allocator in an over-allocated state: because it modifies resources, `total - allocated` is not guaranteed to yield __empty__. Review: https://reviews.apache.org/r/35947 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dc2e130a Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dc2e130a Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dc2e130a Branch: refs/heads/master Commit: dc2e130a7faccb6ee28c207c8337cb58dfc3ca5c Parents: cbb2099 Author: Michael Park <[email protected]> Authored: Fri Jul 24 15:43:00 2015 -0700 Committer: Jie Yu <[email protected]> Committed: Fri Jul 24 15:43:01 2015 -0700 ---------------------------------------------------------------------- include/mesos/master/allocator.hpp | 6 ++ src/master/allocator/mesos/allocator.hpp | 23 +++++++ src/master/allocator/mesos/hierarchical.hpp | 46 ++++++++++++++ src/tests/hierarchical_allocator_tests.cpp | 80 ++++++++++++++++++++++++ src/tests/mesos.hpp | 15 +++++ 5 files changed, 170 insertions(+) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/mesos/blob/dc2e130a/include/mesos/master/allocator.hpp ---------------------------------------------------------------------- diff --git a/include/mesos/master/allocator.hpp b/include/mesos/master/allocator.hpp index 22992c0..659f37b 100644 --- a/include/mesos/master/allocator.hpp +++ b/include/mesos/master/allocator.hpp @@ -27,6 +27,8 @@ #include <mesos/resources.hpp> +#include <process/future.hpp> + #include <stout/duration.hpp> #include <stout/hashmap.hpp> #include <stout/hashset.hpp> @@ -128,6 +130,10 @@ public: const SlaveID& slaveId, const std::vector<Offer::Operation>& operations) = 0; + virtual process::Future<Nothing> updateAvailable( + const SlaveID& slaveId, + const std::vector<Offer::Operation>& operations) = 0; + // Informs the Allocator to recover resources that are considered // used by the framework. virtual void recoverResources( http://git-wip-us.apache.org/repos/asf/mesos/blob/dc2e130a/src/master/allocator/mesos/allocator.hpp ---------------------------------------------------------------------- diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp index 72470ec..aa55755 100644 --- a/src/master/allocator/mesos/allocator.hpp +++ b/src/master/allocator/mesos/allocator.hpp @@ -22,6 +22,7 @@ #include <mesos/master/allocator.hpp> #include <process/dispatch.hpp> +#include <process/future.hpp> #include <process/process.hpp> #include <stout/try.hpp> @@ -102,6 +103,10 @@ public: const SlaveID& slaveId, const std::vector<Offer::Operation>& operations); + process::Future<Nothing> updateAvailable( + const SlaveID& slaveId, + const std::vector<Offer::Operation>& operations); + void recoverResources( const FrameworkID& frameworkId, const SlaveID& slaveId, @@ -188,6 +193,10 @@ public: const SlaveID& slaveId, const std::vector<Offer::Operation>& operations) = 0; + virtual process::Future<Nothing> updateAvailable( + const SlaveID& slaveId, + const std::vector<Offer::Operation>& 
operations) = 0; + virtual void recoverResources( const FrameworkID& frameworkId, const SlaveID& slaveId, @@ -406,6 +415,20 @@ inline void MesosAllocator<AllocatorProcess>::updateAllocation( template <typename AllocatorProcess> +inline process::Future<Nothing> +MesosAllocator<AllocatorProcess>::updateAvailable( + const SlaveID& slaveId, + const std::vector<Offer::Operation>& operations) +{ + return process::dispatch( + process, + &MesosAllocatorProcess::updateAvailable, + slaveId, + operations); +} + + +template <typename AllocatorProcess> inline void MesosAllocator<AllocatorProcess>::recoverResources( const FrameworkID& frameworkId, const SlaveID& slaveId, http://git-wip-us.apache.org/repos/asf/mesos/blob/dc2e130a/src/master/allocator/mesos/hierarchical.hpp ---------------------------------------------------------------------- diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp index 3264d14..eaf9c6a 100644 --- a/src/master/allocator/mesos/hierarchical.hpp +++ b/src/master/allocator/mesos/hierarchical.hpp @@ -27,6 +27,7 @@ #include <process/event.hpp> #include <process/delay.hpp> +#include <process/future.hpp> #include <process/id.hpp> #include <process/metrics/gauge.hpp> #include <process/metrics/metrics.hpp> @@ -139,6 +140,10 @@ public: const SlaveID& slaveId, const std::vector<Offer::Operation>& operations); + process::Future<Nothing> updateAvailable( + const SlaveID& slaveId, + const std::vector<Offer::Operation>& operations); + void recoverResources( const FrameworkID& frameworkId, const SlaveID& slaveId, @@ -716,6 +721,47 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateAllocation( template <class RoleSorter, class FrameworkSorter> +process::Future<Nothing> +HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateAvailable( + const SlaveID& slaveId, + const std::vector<Offer::Operation>& operations) +{ + CHECK(initialized); + CHECK(slaves.contains(slaveId)); + + Resources 
available = slaves[slaveId].total - slaves[slaveId].allocated; + + // It's possible for this 'apply' to fail here because a call to + // 'allocate' could have been enqueued by the allocator itself + // just before master's request to enqueue 'updateAvailable' + // arrives at the allocator. + // + // Master -------R------------ + // \___ + // \ + // Allocator --A-----A-U---A-- + // \___/ \___/ + // + // where A = allocate, R = reserve, U = updateAvailable + Try<Resources> updatedAvailable = available.apply(operations); + if (updatedAvailable.isError()) { + return process::Failure(updatedAvailable.error()); + } + + // Validation passed; apply the same operations to the slave's total. + Try<Resources> updatedTotal = slaves[slaveId].total.apply(operations); + CHECK_SOME(updatedTotal); + + slaves[slaveId].total = updatedTotal.get(); + + // Now, update the total resources in the role sorter. + roleSorter->update(slaveId, slaves[slaveId].total.unreserved()); + + return Nothing(); +} + + +template <class RoleSorter, class FrameworkSorter> void HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources( const FrameworkID& frameworkId, http://git-wip-us.apache.org/repos/asf/mesos/blob/dc2e130a/src/tests/hierarchical_allocator_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp index 3258840..c92d47a 100644 --- a/src/tests/hierarchical_allocator_tests.cpp +++ b/src/tests/hierarchical_allocator_tests.cpp @@ -782,6 +782,86 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocation) } +// This test ensures that a call to 'updateAvailable' succeeds when the +// allocator has sufficient available resources.
+TEST_F(HierarchicalAllocatorTest, UpdateAvailableSuccess) +{ + initialize(vector<string>{"role1"}); + + hashmap<FrameworkID, Resources> EMPTY; + + SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100"); + allocator->addSlave(slave.id(), slave, slave.resources(), EMPTY); + + // Construct an offer operation reserving part of the available resources. + Resources unreserved = Resources::parse("cpus:25;mem:50").get(); + Resources dynamicallyReserved = + unreserved.flatten("role1", createReservationInfo("ops")); + + Offer::Operation reserve = RESERVE(dynamicallyReserved); + + // Apply the operation to the slave's available (unallocated) resources. + Future<Nothing> update = allocator->updateAvailable(slave.id(), {reserve}); + AWAIT_EXPECT_READY(update); + + // Expect to receive the updated available resources. + FrameworkInfo framework = createFrameworkInfo("role1"); + allocator->addFramework( + framework.id(), framework, hashmap<SlaveID, Resources>()); + + Future<Allocation> allocation = queue.get(); + AWAIT_READY(allocation); + EXPECT_EQ(framework.id(), allocation.get().frameworkId); + EXPECT_EQ(1u, allocation.get().resources.size()); + EXPECT_TRUE(allocation.get().resources.contains(slave.id())); + + // The allocation should be the slave's resources with the offer + // operation applied. + Try<Resources> updated = Resources(slave.resources()).apply(reserve); + ASSERT_SOME(updated); + + EXPECT_NE(Resources(slave.resources()), + Resources::sum(allocation.get().resources)); + + EXPECT_EQ(updated.get(), Resources::sum(allocation.get().resources)); +} + + +// This test ensures that a call to 'updateAvailable' fails when the +// allocator has insufficient available resources.
+TEST_F(HierarchicalAllocatorTest, UpdateAvailableFail) +{ + initialize(vector<string>{"role1"}); + + hashmap<FrameworkID, Resources> EMPTY; + + SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100"); + allocator->addSlave(slave.id(), slave, slave.resources(), EMPTY); + + // Expect to receive all of the available resources. + FrameworkInfo framework = createFrameworkInfo("role1"); + allocator->addFramework( + framework.id(), framework, hashmap<SlaveID, Resources>()); + + Future<Allocation> allocation = queue.get(); + AWAIT_READY(allocation); + EXPECT_EQ(framework.id(), allocation.get().frameworkId); + EXPECT_EQ(1u, allocation.get().resources.size()); + EXPECT_TRUE(allocation.get().resources.contains(slave.id())); + EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources)); + + // Construct an offer operation on resources that are now all allocated. + Resources unreserved = Resources::parse("cpus:25;mem:50").get(); + Resources dynamicallyReserved = + unreserved.flatten("role1", createReservationInfo("ops")); + + Offer::Operation reserve = RESERVE(dynamicallyReserved); + + // Nothing is available on the slave, so updating must fail. + Future<Nothing> update = allocator->updateAvailable(slave.id(), {reserve}); + AWAIT_EXPECT_FAILED(update); +} + // This test ensures that when oversubscribed resources are updated // subsequent allocations properly account for that.
TEST_F(HierarchicalAllocatorTest, UpdateSlave) http://git-wip-us.apache.org/repos/asf/mesos/blob/dc2e130a/src/tests/mesos.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index 69134e1..8a76b4f 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -1036,6 +1036,12 @@ ACTION_P(InvokeUpdateAllocation, allocator) } +ACTION_P(InvokeUpdateAvailable, allocator) +{ + return allocator->real->updateAvailable(arg0, arg1); +} + + ACTION_P(InvokeRecoverResources, allocator) { allocator->real->recoverResources(arg0, arg1, arg2, arg3); @@ -1154,6 +1160,11 @@ public: EXPECT_CALL(*this, updateAllocation(_, _, _)) .WillRepeatedly(DoDefault()); + ON_CALL(*this, updateAvailable(_, _)) + .WillByDefault(InvokeUpdateAvailable(this)); + EXPECT_CALL(*this, updateAvailable(_, _)) + .WillRepeatedly(DoDefault()); + ON_CALL(*this, recoverResources(_, _, _, _)) .WillByDefault(InvokeRecoverResources(this)); EXPECT_CALL(*this, recoverResources(_, _, _, _)) @@ -1223,6 +1234,10 @@ public: const SlaveID&, const std::vector<Offer::Operation>&)); + MOCK_METHOD2(updateAvailable, process::Future<Nothing>( + const SlaveID&, + const std::vector<Offer::Operation>&)); + MOCK_METHOD4(recoverResources, void( const FrameworkID&, const SlaveID&,
