Repository: mesos Updated Branches: refs/heads/master 781414099 -> d376f05fe
Fixed allocator to do allocations per slave rather than per framework. Review: https://reviews.apache.org/r/24356 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d376f05f Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d376f05f Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d376f05f Branch: refs/heads/master Commit: d376f05fea7f7432a287a7b87c5d1fe44dae01c4 Parents: 7814140 Author: Vinod Kone <[email protected]> Authored: Fri Jul 18 14:11:14 2014 -0700 Committer: Vinod Kone <[email protected]> Committed: Thu Aug 7 23:32:32 2014 -0700 ---------------------------------------------------------------------- src/master/hierarchical_allocator_process.hpp | 79 ++++---- src/tests/allocator_tests.cpp | 201 ++++++++++++++++++--- 2 files changed, 220 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/d376f05f/src/master/hierarchical_allocator_process.hpp ---------------------------------------------------------------------- diff --git a/src/master/hierarchical_allocator_process.hpp b/src/master/hierarchical_allocator_process.hpp index d81082f..34f8cd6 100644 --- a/src/master/hierarchical_allocator_process.hpp +++ b/src/master/hierarchical_allocator_process.hpp @@ -19,6 +19,9 @@ #ifndef __HIERARCHICAL_ALLOCATOR_PROCESS_HPP__ #define __HIERARCHICAL_ALLOCATOR_PROCESS_HPP__ +#include <algorithm> +#include <vector> + #include <mesos/resources.hpp> #include <process/delay.hpp> @@ -675,7 +678,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate( template <class RoleSorter, class FrameworkSorter> void HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate( - const hashset<SlaveID>& slaveIds) + const hashset<SlaveID>& slaveIds_) { CHECK(initialized); @@ -684,57 +687,67 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate( return; } - if (slaveIds.empty()) { + if (slaveIds_.empty()) { VLOG(1) << "No resources available to allocate!"; return; } - foreach (const std::string& role, roleSorter->sort()) { - foreach (const std::string& frameworkIdValue, sorters[role]->sort()) { - FrameworkID frameworkId; - frameworkId.set_value(frameworkIdValue); + // Randomize the order in which slaves' resources are allocated. + // TODO(vinod): Implement a smarter sorting algorithm. + std::vector<SlaveID> slaveIds(slaveIds_.begin(), slaveIds_.end()); + std::random_shuffle(slaveIds.begin(), slaveIds.end()); + + hashmap<FrameworkID, hashmap<SlaveID, Resources> > offerable; + foreach (const SlaveID& slaveId, slaveIds) { + // If the slave is not activated or whitelisted, ignore it. + if (!slaves[slaveId].activated || !slaves[slaveId].whitelisted) { + continue; + } + + foreach (const std::string& role, roleSorter->sort()) { + foreach (const std::string& frameworkIdValue, sorters[role]->sort()) { + FrameworkID frameworkId; + frameworkId.set_value(frameworkIdValue); - Resources allocatedResources; - hashmap<SlaveID, Resources> offerable; - foreach (const SlaveID& slaveId, slaveIds) { Resources unreserved = slaves[slaveId].available.extract("*"); Resources resources = unreserved; - if (role != "*") { resources += slaves[slaveId].available.extract(role); } - // Check whether or not this framework filters this slave. - bool filtered = isFiltered(frameworkId, slaveId, resources); - - if (!filtered && - slaves[slaveId].activated && - slaves[slaveId].whitelisted && - allocatable(resources)) { - VLOG(1) - << "Offering " << resources << " on slave " << slaveId - << " to framework " << frameworkId; + // If the resources are not allocatable, ignore. + if (!allocatable(resources)) { + continue; + } - offerable[slaveId] = resources; + // If the framework filters these resources, ignore. + if (isFiltered(frameworkId, slaveId, resources)) { + continue; + } - // Update framework and slave resources. - slaves[slaveId].available -= resources; + VLOG(1) + << "Offering " << resources << " on slave " << slaveId + << " to framework " << frameworkId; - // We only count resources not reserved for this role - // in the share the sorter considers. - allocatedResources += unreserved; - } - } + offerable[frameworkId][slaveId] = resources; - if (!offerable.empty()) { - sorters[role]->add(allocatedResources); - sorters[role]->allocated(frameworkIdValue, allocatedResources); - roleSorter->allocated(role, allocatedResources); + // Update slave resources. + slaves[slaveId].available -= resources; - dispatch(master, &Master::offer, frameworkId, offerable); + // Update the sorters. + // We only count resources not reserved for this role + // in the share the sorter considers. + sorters[role]->add(unreserved); + sorters[role]->allocated(frameworkIdValue, unreserved); + roleSorter->allocated(role, unreserved); } } } + + // Now offer the resources to each framework. + foreachkey (const FrameworkID& frameworkId, offerable) { + dispatch(master, &Master::offer, frameworkId, offerable[frameworkId]); + } } http://git-wip-us.apache.org/repos/asf/mesos/blob/d376f05f/src/tests/allocator_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/allocator_tests.cpp b/src/tests/allocator_tests.cpp index f0226cb..774528a 100644 --- a/src/tests/allocator_tests.cpp +++ b/src/tests/allocator_tests.cpp @@ -30,6 +30,8 @@ #include <process/gmock.hpp> #include <process/pid.hpp> +#include <stout/some.hpp> + #include "master/allocator.hpp" #include "master/detector.hpp" #include "master/hierarchical_allocator_process.hpp" @@ -82,12 +84,12 @@ TEST_F(DRFAllocatorTest, DRFAllocatorProcess) EXPECT_CALL(allocator, initialize(_, _, _)); master::Flags masterFlags = CreateMasterFlags(); - masterFlags.roles = Option<string>("role1,role2"); + masterFlags.roles = Some("role1,role2"); Try<PID<Master> > master = StartMaster(&allocator, masterFlags); ASSERT_SOME(master); slave::Flags flags1 = CreateSlaveFlags(); - flags1.resources = Option<string>("cpus:2;mem:1024;disk:0"); + flags1.resources = Some("cpus:2;mem:1024;disk:0"); EXPECT_CALL(allocator, slaveAdded(_, _, _)); @@ -145,7 +147,7 @@ TEST_F(DRFAllocatorTest, DRFAllocatorProcess) AWAIT_READY(frameworkAdded2); slave::Flags flags2 = CreateSlaveFlags(); - flags2.resources = Option<string>("cpus:1;mem:512;disk:0"); + flags2.resources = Some("cpus:1;mem:512;disk:0"); EXPECT_CALL(allocator, slaveAdded(_, _, _)); @@ -172,7 +174,7 @@ TEST_F(DRFAllocatorTest, DRFAllocatorProcess) // framework2 share = 1 slave::Flags flags3 = CreateSlaveFlags(); - flags3.resources = Option<string>("cpus:3;mem:2048;disk:0"); + flags3.resources = Some("cpus:3;mem:2048;disk:0"); EXPECT_CALL(allocator, slaveAdded(_, _, _)); @@ -220,7 +222,7 @@ TEST_F(DRFAllocatorTest, DRFAllocatorProcess) AWAIT_READY(frameworkAdded3); slave::Flags flags4 = CreateSlaveFlags(); - flags4.resources = Option<string>("cpus:4;mem:4096;disk:0"); + flags4.resources = Some("cpus:4;mem:4096;disk:0"); EXPECT_CALL(allocator, slaveAdded(_, _, _)); @@ -270,7 +272,7 @@ TEST_F(DRFAllocatorTest, DRFAllocatorProcess) AWAIT_READY(frameworkAdded4); slave::Flags flags5 = CreateSlaveFlags(); - flags5.resources = Option<string>("cpus:1;mem:512;disk:0"); + flags5.resources = Some("cpus:1;mem:512;disk:0"); EXPECT_CALL(allocator, slaveAdded(_, _, _)); @@ -322,6 +324,151 @@ TEST_F(DRFAllocatorTest, DRFAllocatorProcess) } +// This test ensures that allocation is done per slave. This is done +// by having 2 slaves and 2 frameworks and making sure each framework +// gets only one slave's resources during an allocation. +TEST_F(DRFAllocatorTest, PerSlaveAllocation) +{ + MockAllocatorProcess<HierarchicalDRFAllocatorProcess> allocator; + + EXPECT_CALL(allocator, initialize(_, _, _)); + + // Start the master. + // NOTE: We set a high allocation interval, so that allocator does + // allocations only based on events (framework added, slave added) + // but not due to allocation interval. This lets us tightly control + // the test expectations. + master::Flags masterFlags = CreateMasterFlags(); + masterFlags.roles = Some("role1,role2"); + masterFlags.allocation_interval = Days(1); + Try<PID<Master> > master = StartMaster(&allocator, masterFlags); + ASSERT_SOME(master); + + // Start slave 1. + slave::Flags flags1 = CreateSlaveFlags(); + flags1.resources = Some("cpus:2;mem:1024;disk:0"); + + Future<Nothing> slaveAdded1; + EXPECT_CALL(allocator, slaveAdded(_, _, _)) + .WillOnce(DoAll(InvokeSlaveAdded(&allocator), + FutureSatisfy(&slaveAdded1))); + + Try<PID<Slave> > slave1 = StartSlave(flags1); + ASSERT_SOME(slave1); + + AWAIT_READY(slaveAdded1); + + // Start slave 2. + slave::Flags flags2 = CreateSlaveFlags(); + flags2.resources = Some("cpus:2;mem:1024;disk:0"); + + Future<Nothing> slaveAdded2; + EXPECT_CALL(allocator, slaveAdded(_, _, _)) + .WillOnce(DoAll(InvokeSlaveAdded(&allocator), + FutureSatisfy(&slaveAdded2))); + + Try<PID<Slave> > slave2 = StartSlave(flags2); + ASSERT_SOME(slave2); + + AWAIT_READY(slaveAdded2); + + // Start framework 1. + FrameworkInfo frameworkInfo1; // Bug in gcc 4.1.*, must assign on next line. + frameworkInfo1 = DEFAULT_FRAMEWORK_INFO; + frameworkInfo1.set_name("framework1"); + frameworkInfo1.set_user("user1"); + frameworkInfo1.set_role("role1"); + + MockScheduler sched1; + MesosSchedulerDriver driver1( + &sched1, frameworkInfo1, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(allocator, frameworkAdded(_, _, _)); + + EXPECT_CALL(sched1, registered(_, _, _)); + + Future<Nothing> resourcesRecovered1; + Future<Nothing> resourcesRecovered2; + EXPECT_CALL(allocator, resourcesRecovered(_, _, _, _)) + .WillOnce(DoAll(InvokeResourcesRecovered(&allocator), + FutureSatisfy(&resourcesRecovered1))) + .WillOnce(DoAll(InvokeResourcesRecovered(&allocator), + FutureSatisfy(&resourcesRecovered2))); + + // Decline the offers immediately so that resources for both slaves + // are eligible for allocation to this and other frameworks. + Filters filters; + filters.set_refuse_seconds(0); + EXPECT_CALL(sched1, resourceOffers(_, _)) + .WillOnce(DeclineOffers(filters)); + + driver1.start(); + + // Wait until the resources are returned to the allocator. + // NOTE: No allocations will be made after this point until a new + // framework registers because + // 1) 'resourcesRecovered' does not trigger an allocation and + // 2) 'flags.allocation_interval' is set to a very high value. + AWAIT_READY(resourcesRecovered1); + AWAIT_READY(resourcesRecovered2); + + // Start framework 2. + FrameworkInfo frameworkInfo2; // Bug in gcc 4.1.*, must assign on next line. + frameworkInfo2 = DEFAULT_FRAMEWORK_INFO; + frameworkInfo2.set_name("framework2"); + frameworkInfo2.set_user("user2"); + frameworkInfo2.set_role("role2"); + + MockScheduler sched2; + MesosSchedulerDriver driver2( + &sched2, frameworkInfo2, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(allocator, frameworkAdded(_, _, _)); + + EXPECT_CALL(sched2, registered(_, _, _)); + + // Offers to framework 1. + Future<vector<Offer> > offers1; + EXPECT_CALL(sched1, resourceOffers(_, _)) + .WillOnce(FutureArg<1>(&offers1)); + + // Offers to framework 2. + Future<vector<Offer> > offers2; + EXPECT_CALL(sched2, resourceOffers(_, _)) + .WillOnce(FutureArg<1>(&offers2)); + + driver2.start(); + + // Now each framework should receive offers for one slave each. + AWAIT_READY(offers1); + EXPECT_THAT(offers1.get(), OfferEq(2, 1024)); + + AWAIT_READY(offers2); + EXPECT_THAT(offers2.get(), OfferEq(2, 1024)); + + // Shut everything down. + EXPECT_CALL(allocator, resourcesRecovered(_, _, _, _)) + .WillRepeatedly(DoDefault()); + + EXPECT_CALL(allocator, frameworkDeactivated(_)) + .WillRepeatedly(DoDefault()); + + EXPECT_CALL(allocator, frameworkRemoved(_)) + .WillRepeatedly(DoDefault()); + + driver1.stop(); + driver1.join(); + + driver2.stop(); + driver2.join(); + + EXPECT_CALL(allocator, slaveRemoved(_)) + .WillRepeatedly(DoDefault()); + + Shutdown(); +} + + // Helper that simply increments the value by reference. ACTION_P(Increment, value) { *value += 1; } @@ -440,7 +587,7 @@ TEST_F(ReservationAllocatorTest, ReservedResources) EXPECT_CALL(allocator, initialize(_, _, _)); master::Flags masterFlags = CreateMasterFlags(); - masterFlags.roles = Option<string>("role1,role2,role3"); + masterFlags.roles = Some("role1,role2,role3"); Try<PID<Master> > master = StartMaster(&allocator, masterFlags); ASSERT_SOME(master); @@ -455,19 +602,19 @@ TEST_F(ReservationAllocatorTest, ReservedResources) slave::Flags flags1 = CreateSlaveFlags(); flags1.default_role = "role1"; - flags1.resources = Option<string>("cpus:2;mem:1024;disk:0"); + flags1.resources = Some("cpus:2;mem:1024;disk:0"); Try<PID<Slave> > slave1 = StartSlave(flags1); ASSERT_SOME(slave1); slave::Flags flags2 = CreateSlaveFlags(); flags2.resources = - Option<string>("cpus(role2):2;mem(role2):1024;cpus:1;mem:1024;disk:0"); + Some("cpus(role2):2;mem(role2):1024;cpus:1;mem:1024;disk:0"); Try<PID<Slave> > slave2 = StartSlave(flags2); ASSERT_SOME(slave2); slave::Flags flags3 = CreateSlaveFlags(); flags3.default_role = "role3"; - flags3.resources = Option<string>("cpus:4;mem:4096;disk:0"); + flags3.resources = Some("cpus:4;mem:4096;disk:0"); Try<PID<Slave> > slave3 = StartSlave(flags3); ASSERT_SOME(slave3); @@ -475,7 +622,7 @@ TEST_F(ReservationAllocatorTest, ReservedResources) // since there is no framework for role4. slave::Flags flags4 = CreateSlaveFlags(); flags4.default_role = "role4"; - flags4.resources = Option<string>("cpus:1;mem:1024;disk:0"); + flags4.resources = Some("cpus:1;mem:1024;disk:0"); Try<PID<Slave> > slave4 = StartSlave(flags4); ASSERT_SOME(slave4); @@ -550,7 +697,7 @@ TEST_F(ReservationAllocatorTest, ReservedResources) slave::Flags flags5 = CreateSlaveFlags(); flags5.default_role = "role1"; - flags5.resources = Option<string>("cpus:1;mem:512;disk:0"); + flags5.resources = Some("cpus:1;mem:512;disk:0"); EXPECT_CALL(allocator, slaveAdded(_, _, _)); @@ -595,7 +742,7 @@ TEST_F(ReservationAllocatorTest, ResourcesReturned) EXPECT_CALL(allocator, initialize(_, _, _)); master::Flags masterFlags = CreateMasterFlags(); - masterFlags.roles = Option<string>("role1,role2"); + masterFlags.roles = Some("role1,role2"); masterFlags.allocation_interval = Milliseconds(50); Try<PID<Master> > master = StartMaster(&allocator, masterFlags); @@ -625,7 +772,7 @@ TEST_F(ReservationAllocatorTest, ResourcesReturned) // because there is no framework with role3 and the unreserved // memory can't be offered without a cpu to go with it. slave::Flags flags2 = CreateSlaveFlags(); - flags2.resources = Option<string>("cpus(role3):4;mem:1024;disk:0"); + flags2.resources = Some("cpus(role3):4;mem:1024;disk:0"); Try<PID<Slave> > slave2 = StartSlave(flags2); ASSERT_SOME(slave2); @@ -777,7 +924,7 @@ TYPED_TEST(AllocatorTest, MockAllocator) ASSERT_SOME(master); slave::Flags flags = this->CreateSlaveFlags(); - flags.resources = Option<string>("cpus:2;mem:1024;disk:0"); + flags.resources = Some("cpus:2;mem:1024;disk:0"); EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); @@ -835,7 +982,7 @@ TYPED_TEST(AllocatorTest, ResourcesUnused) MockExecutor exec(DEFAULT_EXECUTOR_ID); slave::Flags flags1 = this->CreateSlaveFlags(); - flags1.resources = Option<string>("cpus:2;mem:1024"); + flags1.resources = Some("cpus:2;mem:1024"); EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); @@ -946,7 +1093,7 @@ TYPED_TEST(AllocatorTest, OutOfOrderDispatch) ASSERT_SOME(master); slave::Flags flags1 = this->CreateSlaveFlags(); - flags1.resources = Option<string>("cpus:2;mem:1024"); + flags1.resources = Some("cpus:2;mem:1024"); EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); @@ -1079,7 +1226,7 @@ TYPED_TEST(AllocatorTest, SchedulerFailover) MockExecutor exec(DEFAULT_EXECUTOR_ID); slave::Flags flags = this->CreateSlaveFlags(); - flags.resources = Option<string>("cpus:3;mem:1024"); + flags.resources = Some("cpus:3;mem:1024"); EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); @@ -1223,7 +1370,7 @@ TYPED_TEST(AllocatorTest, FrameworkExited) slave::Flags flags = this->CreateSlaveFlags(); - flags.resources = Option<string>("cpus:3;mem:1024"); + flags.resources = Some("cpus:3;mem:1024"); EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); @@ -1358,7 +1505,7 @@ TYPED_TEST(AllocatorTest, SlaveLost) MockExecutor exec(DEFAULT_EXECUTOR_ID); slave::Flags flags1 = this->CreateSlaveFlags(); - flags1.resources = Option<string>("cpus:2;mem:1024"); + flags1.resources = Some("cpus:2;mem:1024"); EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); @@ -1469,7 +1616,7 @@ TYPED_TEST(AllocatorTest, SlaveAdded) MockExecutor exec(DEFAULT_EXECUTOR_ID); slave::Flags flags1 = this->CreateSlaveFlags(); - flags1.resources = Option<string>("cpus:3;mem:1024"); + flags1.resources = Some("cpus:3;mem:1024"); EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); @@ -1518,7 +1665,7 @@ TYPED_TEST(AllocatorTest, SlaveAdded) AWAIT_READY(launchTask); slave::Flags flags2 = this->CreateSlaveFlags(); - flags2.resources = Option<string>("cpus:4;mem:2048"); + flags2.resources = Some("cpus:4;mem:2048"); EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); @@ -1570,7 +1717,7 @@ TYPED_TEST(AllocatorTest, TaskFinished) MockExecutor exec(DEFAULT_EXECUTOR_ID); slave::Flags flags = this->CreateSlaveFlags(); - flags.resources = Option<string>("cpus:3;mem:1024"); + flags.resources = Some("cpus:3;mem:1024"); EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); @@ -1690,7 +1837,7 @@ TYPED_TEST(AllocatorTest, WhitelistSlave) EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); slave::Flags flags = this->CreateSlaveFlags(); - flags.resources = Option<string>("cpus:2;mem:1024"); + flags.resources = Some("cpus:2;mem:1024"); Try<string> hostname = os::hostname(); ASSERT_SOME(hostname); @@ -1773,7 +1920,7 @@ TYPED_TEST(AllocatorTest, RoleTest) EXPECT_CALL(this->allocator, initialize(_, _, _)); master::Flags masterFlags = this->CreateMasterFlags(); - masterFlags.roles = Option<string>("role2"); + masterFlags.roles = Some("role2"); Try<PID<Master> > master = this->StartMaster(&this->allocator, masterFlags); ASSERT_SOME(master); @@ -1862,7 +2009,7 @@ TYPED_TEST(AllocatorTest, FrameworkReregistersFirst) StandaloneMasterDetector slaveDetector(master.get()); slave::Flags flags = this->CreateSlaveFlags(); - flags.resources = Option<string>("cpus:2;mem:1024"); + flags.resources = Some("cpus:2;mem:1024"); Try<PID<Slave> > slave = this->StartSlave(&exec, &slaveDetector, flags); ASSERT_SOME(slave); @@ -1987,7 +2134,7 @@ TYPED_TEST(AllocatorTest, SlaveReregistersFirst) EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); slave::Flags flags = this->CreateSlaveFlags(); - flags.resources = Option<string>("cpus:2;mem:1024"); + flags.resources = Some("cpus:2;mem:1024"); Try<PID<Slave> > slave = this->StartSlave(&exec, &slaveDetector, flags); ASSERT_SOME(slave);
