Repository: mesos Updated Branches: refs/heads/master ebb8b590b -> 87d8bd08e
Updated allocator to properly handle oversubscribed resources. Review: https://reviews.apache.org/r/34616 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/87d8bd08 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/87d8bd08 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/87d8bd08 Branch: refs/heads/master Commit: 87d8bd08e45771b40e238a787a7adee53a244946 Parents: ebb8b59 Author: Vinod Kone <[email protected]> Authored: Fri May 22 13:15:09 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Fri May 22 17:33:41 2015 -0700 ---------------------------------------------------------------------- src/master/allocator/mesos/hierarchical.hpp | 64 +++++++++- src/tests/hierarchical_allocator_tests.cpp | 153 +++++++++++++++++++++++ 2 files changed, 214 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/87d8bd08/src/master/allocator/mesos/hierarchical.hpp ---------------------------------------------------------------------- diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp index 9c949b4..44fbcca 100644 --- a/src/master/allocator/mesos/hierarchical.hpp +++ b/src/master/allocator/mesos/hierarchical.hpp @@ -179,6 +179,9 @@ protected: std::string role; bool checkpoint; // Whether the framework desires checkpointing. + // Whether the framework desires revocable resources. + bool revocable; + hashset<Filter*> filters; // Active filters for the framework. }; @@ -186,7 +189,10 @@ protected: struct Slave { + // Total amount of regular *and* oversubscribed resources. Resources total; + + // Available regular *and* oversubscribed resources. Resources available; bool activated; // Whether to offer resources. @@ -210,6 +216,11 @@ protected: // Both reserved resources and unreserved resources are used // in the fairness calculation. 
This is because reserved // resources can be allocated to any framework in the role. + // + // Note that the hierarchical allocator considers oversubscribed + // resources as regular resources when doing fairness calculations. + // TODO(vinod): Consider using a different fairness algorithm for + // oversubscribed resources. RoleSorter* roleSorter; hashmap<std::string, FrameworkSorter*> frameworkSorters; }; @@ -327,6 +338,15 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::addFramework( frameworks[frameworkId].role = frameworkInfo.role(); frameworks[frameworkId].checkpoint = frameworkInfo.checkpoint(); + // Check if the framework desires revocable resources. + frameworks[frameworkId].revocable = false; + foreach (const FrameworkInfo::Capability& capability, + frameworkInfo.capabilities()) { + if (capability.type() == FrameworkInfo::Capability::REVOCABLE_RESOURCES) { + frameworks[frameworkId].revocable = true; + } + } + LOG(INFO) << "Added framework " << frameworkId; allocate(); @@ -494,10 +514,42 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateSlave( CHECK(initialized); CHECK(slaves.contains(slaveId)); - LOG(INFO) << "Slave " << slaveId << " updated with oversubscribed resources " - << oversubscribed; + // Check that all the oversubscribed resources are revocable. + CHECK_EQ(oversubscribed, oversubscribed.revocable()); + + // Update the total resources. + + // First remove the old oversubscribed resources from the total. + slaves[slaveId].total -= slaves[slaveId].total.revocable(); + + // Now add the new estimate of oversubscribed resources. + slaves[slaveId].total += oversubscribed; + + // Now, update the total resources in the role sorter. + roleSorter->update( + slaveId, + slaves[slaveId].total.unreserved()); + + // Calculate the current allocation of oversubscribed resources. 
+ Resources allocation; + foreachkey (const std::string& role, roles) { + allocation += roleSorter->allocation(role)[slaveId].revocable(); + } + + // Update the available resources. + + // First remove the old oversubscribed resources from available. + slaves[slaveId].available -= slaves[slaveId].available.revocable(); - // TODO(vinod): Implement this. + // Now add the new estimate of available oversubscribed resources. + slaves[slaveId].available += oversubscribed - allocation; + + LOG(INFO) << "Slave " << slaveId << " (" << slaves[slaveId].hostname + << ") updated with oversubscribed resources " << oversubscribed + << " (total: " << slaves[slaveId].total + << ", available: " << slaves[slaveId].available << ")"; + + allocate(slaveId); } @@ -819,6 +871,12 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate( slaves[slaveId].available.unreserved() + slaves[slaveId].available.reserved(role); + // Remove revocable resources if the framework has not opted + // for them. + if (!frameworks[frameworkId].revocable) { + resources -= resources.revocable(); + } + // If the resources are not allocatable, ignore. 
if (!allocatable(resources)) { continue; http://git-wip-us.apache.org/repos/asf/mesos/blob/87d8bd08/src/tests/hierarchical_allocator_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp index 1a43dc7..85bb29e 100644 --- a/src/tests/hierarchical_allocator_tests.cpp +++ b/src/tests/hierarchical_allocator_tests.cpp @@ -128,6 +128,16 @@ protected: return frameworkInfo; } + Resources createRevocableResources( + const string& name, + const string& value, + const string& role = "*") + { + Resource resource = Resources::parse(name, value, role).get(); + resource.mutable_revocable(); + return resource; + } + private: static void put( process::Queue<Allocation>* queue, @@ -749,6 +759,149 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocation) } +// This test ensures that when oversubscribed resources are updated +// subsequent allocations properly account for that. +TEST_F(HierarchicalAllocatorTest, UpdateSlave) +{ + // Pause clock to disable periodic allocation. + Clock::pause(); + initialize(vector<string>{"role1"}); + + hashmap<FrameworkID, Resources> EMPTY; + + SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100"); + allocator->addSlave(slave.id(), slave, slave.resources(), EMPTY); + + // Add a framework that can accept revocable resources. + FrameworkInfo framework = createFrameworkInfo("role1"); + framework.add_capabilities()->set_type( + FrameworkInfo::Capability::REVOCABLE_RESOURCES); + + allocator->addFramework( + framework.id(), framework, hashmap<SlaveID, Resources>()); + + // Initially, all the resources are allocated. + Future<Allocation> allocation = queue.get(); + AWAIT_READY(allocation); + EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources)); + + // Update the slave with 10 oversubscribed cpus. 
+ Resources oversubscribed = createRevocableResources("cpus", "10"); + allocator->updateSlave(slave.id(), oversubscribed); + + // The next allocation should be for 10 oversubscribed cpus. + allocation = queue.get(); + AWAIT_READY(allocation); + EXPECT_EQ(oversubscribed, Resources::sum(allocation.get().resources)); + + // Update the slave again with 12 oversubscribed cpus. + Resources oversubscribed2 = createRevocableResources("cpus", "12"); + allocator->updateSlave(slave.id(), oversubscribed2); + + // The next allocation should be for 2 oversubscribed cpus. + allocation = queue.get(); + AWAIT_READY(allocation); + EXPECT_EQ(oversubscribed2 - oversubscribed, + Resources::sum(allocation.get().resources)); + + // Update the slave again with 5 oversubscribed cpus. + Resources oversubscribed3 = createRevocableResources("cpus", "5"); + allocator->updateSlave(slave.id(), oversubscribed3); + + // Since there are no more available oversubscribed resources, there + // shouldn't be an allocation. + Clock::settle(); + allocation = queue.get(); + ASSERT_TRUE(allocation.isPending()); +} + + +// This test verifies that a framework that has not opted in to +// revocable resources does not get allocated oversubscribed resources. +TEST_F(HierarchicalAllocatorTest, OversubscribedNotAllocated) +{ + // Pause clock to disable periodic allocation. + Clock::pause(); + initialize(vector<string>{"role1"}); + + hashmap<FrameworkID, Resources> EMPTY; + + SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100"); + allocator->addSlave(slave.id(), slave, slave.resources(), EMPTY); + + // Add a framework that does *not* accept revocable resources. + FrameworkInfo framework = createFrameworkInfo("role1"); + allocator->addFramework( + framework.id(), framework, hashmap<SlaveID, Resources>()); + + // Initially, all the resources are allocated.
+ Future<Allocation> allocation = queue.get(); + AWAIT_READY(allocation); + EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources)); + + // Update the slave with 10 oversubscribed cpus. + Resources oversubscribed = createRevocableResources("cpus", "10"); + allocator->updateSlave(slave.id(), oversubscribed); + + // No allocation should be made for oversubscribed resources because + // the framework has not opted in to them. + Clock::settle(); + allocation = queue.get(); + ASSERT_TRUE(allocation.isPending()); +} + + +// This test verifies that when oversubscribed resources are partially +// recovered, subsequent allocation properly accounts for that. +TEST_F(HierarchicalAllocatorTest, RecoverOversubscribedResources) +{ + // Pause clock to disable periodic allocation. + Clock::pause(); + initialize(vector<string>{"role1"}); + + hashmap<FrameworkID, Resources> EMPTY; + + SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100"); + allocator->addSlave(slave.id(), slave, slave.resources(), EMPTY); + + // Add a framework that can accept revocable resources. + FrameworkInfo framework = createFrameworkInfo("role1"); + framework.add_capabilities()->set_type( + FrameworkInfo::Capability::REVOCABLE_RESOURCES); + + allocator->addFramework( + framework.id(), framework, hashmap<SlaveID, Resources>()); + + // Initially, all the resources are allocated. + Future<Allocation> allocation = queue.get(); + AWAIT_READY(allocation); + EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources)); + + // Update the slave with 10 oversubscribed cpus. + Resources oversubscribed = createRevocableResources("cpus", "10"); + allocator->updateSlave(slave.id(), oversubscribed); + + // The next allocation should be for 10 oversubscribed cpus. + allocation = queue.get(); + AWAIT_READY(allocation); + EXPECT_EQ(oversubscribed, Resources::sum(allocation.get().resources)); + + // Recover 6 oversubscribed cpus and 2 regular cpus.
+ Resources recovered = createRevocableResources("cpus", "6"); + recovered += Resources::parse("cpus:2").get(); + + allocator->recoverResources(framework.id(), slave.id(), recovered, None()); + + Clock::advance(flags.allocation_interval); + + // The next allocation should be for 6 oversubscribed and 2 regular + // cpus. + allocation = queue.get(); + AWAIT_READY(allocation); + EXPECT_EQ(recovered, Resources::sum(allocation.get().resources)); +} + + // Checks that a slave that is not whitelisted will not have its // resources get offered, and that if the whitelist is updated so // that it is whitelisted, its resources will then be offered.
