Updated Allocator::slaveAdded to take the total resources explicitly. Review: https://reviews.apache.org/r/28665
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/31317b35 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/31317b35 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/31317b35 Branch: refs/heads/master Commit: 31317b35ba68b423565e4eeb1c3e92f69251402e Parents: 0cb4c9c Author: Benjamin Mahler <[email protected]> Authored: Tue Dec 2 18:04:32 2014 -0800 Committer: Benjamin Mahler <[email protected]> Committed: Wed Dec 3 14:59:28 2014 -0800 ---------------------------------------------------------------------- src/master/allocator.hpp | 12 +++- src/master/hierarchical_allocator_process.hpp | 4 +- src/master/master.cpp | 9 ++- src/tests/allocator_tests.cpp | 56 ++++++++-------- src/tests/mesos.hpp | 76 +++++++++++++++------- src/tests/slave_recovery_tests.cpp | 2 +- 6 files changed, 99 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/31317b35/src/master/allocator.hpp ---------------------------------------------------------------------- diff --git a/src/master/allocator.hpp b/src/master/allocator.hpp index 02d20d0..04eb2a3 100644 --- a/src/master/allocator.hpp +++ b/src/master/allocator.hpp @@ -57,9 +57,6 @@ public: virtual ~AllocatorProcess() {} - // Explicitely use 'initialize' since we're overloading below. - using process::ProcessBase::initialize; - virtual void initialize( const Flags& flags, const process::PID<Master>& master, @@ -80,9 +77,15 @@ public: virtual void frameworkDeactivated( const FrameworkID& frameworkId) = 0; + // Note that the 'total' resources are passed explicitly because it + // includes resources that are dynamically "persisted" on the slave + // (e.g. persistent volumes, dynamic reservations, etc). + // The slaveInfo resources, on the other hand, correspond directly + // to the static --resources flag value on the slave. virtual void slaveAdded( const SlaveID& slaveId, const SlaveInfo& slaveInfo, + const Resources& total, const hashmap<FrameworkID, Resources>& used) = 0; virtual void slaveRemoved( @@ -156,6 +159,7 @@ public: void slaveAdded( const SlaveID& slaveId, const SlaveInfo& slaveInfo, + const Resources& total, const hashmap<FrameworkID, Resources>& used); void slaveRemoved( @@ -268,6 +272,7 @@ inline void Allocator::frameworkDeactivated( inline void Allocator::slaveAdded( const SlaveID& slaveId, const SlaveInfo& slaveInfo, + const Resources& total, const hashmap<FrameworkID, Resources>& used) { process::dispatch( @@ -275,6 +280,7 @@ inline void Allocator::slaveAdded( &AllocatorProcess::slaveAdded, slaveId, slaveInfo, + total, used); } http://git-wip-us.apache.org/repos/asf/mesos/blob/31317b35/src/master/hierarchical_allocator_process.hpp ---------------------------------------------------------------------- diff --git a/src/master/hierarchical_allocator_process.hpp b/src/master/hierarchical_allocator_process.hpp index c71739b..f4577bd 100644 --- a/src/master/hierarchical_allocator_process.hpp +++ b/src/master/hierarchical_allocator_process.hpp @@ -92,6 +92,7 @@ public: void slaveAdded( const SlaveID& slaveId, const SlaveInfo& slaveInfo, + const Resources& total, const hashmap<FrameworkID, Resources>& used); void slaveRemoved( @@ -394,13 +395,12 @@ void HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::slaveAdded( const SlaveID& slaveId, const SlaveInfo& slaveInfo, + const Resources& total, const hashmap<FrameworkID, Resources>& used) { CHECK(initialized); CHECK(!slaves.contains(slaveId)); - const Resources& total = slaveInfo.resources(); - roleSorter->add(total); foreachpair (const FrameworkID& frameworkId, http://git-wip-us.apache.org/repos/asf/mesos/blob/31317b35/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 99b5a20..9d92f1a 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -4408,7 +4408,14 @@ void Master::addSlave( } } - allocator->slaveAdded(slave->id, slave->info, slave->usedResources); + // TODO(bmahler): This will need to include resources that + // are "persisted" on the slave (e.g. persistent volumes, + // dynamic reservations, etc). + allocator->slaveAdded( + slave->id, + slave->info, + slave->info.resources(), + slave->usedResources); } http://git-wip-us.apache.org/repos/asf/mesos/blob/31317b35/src/tests/allocator_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/allocator_tests.cpp b/src/tests/allocator_tests.cpp index 1fcbb4a..a7ffa39 100644 --- a/src/tests/allocator_tests.cpp +++ b/src/tests/allocator_tests.cpp @@ -92,7 +92,7 @@ TEST_F(DRFAllocatorTest, DRFAllocatorProcess) slave::Flags flags1 = CreateSlaveFlags(); flags1.resources = Some("cpus:2;mem:1024;disk:0"); - EXPECT_CALL(allocator, slaveAdded(_, _, _)); + EXPECT_CALL(allocator, slaveAdded(_, _, _, _)); Try<PID<Slave> > slave1 = StartSlave(flags1); ASSERT_SOME(slave1); @@ -150,7 +150,7 @@ TEST_F(DRFAllocatorTest, DRFAllocatorProcess) slave::Flags flags2 = CreateSlaveFlags(); flags2.resources = Some("cpus:1;mem:512;disk:0"); - EXPECT_CALL(allocator, slaveAdded(_, _, _)); + EXPECT_CALL(allocator, slaveAdded(_, _, _, _)); Future<vector<Offer> > offers2; EXPECT_CALL(sched2, resourceOffers(_, _)) @@ -177,7 +177,7 @@ TEST_F(DRFAllocatorTest, DRFAllocatorProcess) slave::Flags flags3 = CreateSlaveFlags(); flags3.resources = Some("cpus:3;mem:2048;disk:0"); - EXPECT_CALL(allocator, slaveAdded(_, _, _)); + EXPECT_CALL(allocator, slaveAdded(_, _, _, _)); Future<vector<Offer> > offers3; EXPECT_CALL(sched2, resourceOffers(_, _)) @@ -225,7 +225,7 @@ TEST_F(DRFAllocatorTest, DRFAllocatorProcess) slave::Flags flags4 = CreateSlaveFlags(); flags4.resources = Some("cpus:4;mem:4096;disk:0"); - EXPECT_CALL(allocator, slaveAdded(_, _, _)); + EXPECT_CALL(allocator, slaveAdded(_, _, _, _)); Future<vector<Offer> > offers4; EXPECT_CALL(sched3, resourceOffers(_, _)) @@ -275,7 +275,7 @@ TEST_F(DRFAllocatorTest, DRFAllocatorProcess) slave::Flags flags5 = CreateSlaveFlags(); flags5.resources = Some("cpus:1;mem:512;disk:0"); - EXPECT_CALL(allocator, slaveAdded(_, _, _)); + EXPECT_CALL(allocator, slaveAdded(_, _, _, _)); Future<vector<Offer> > offers5; EXPECT_CALL(sched2, resourceOffers(_, _)) @@ -350,7 +350,7 @@ TEST_F(DRFAllocatorTest, PerSlaveAllocation) flags1.resources = Some("cpus:2;mem:1024;disk:0"); Future<Nothing> slaveAdded1; - EXPECT_CALL(allocator, slaveAdded(_, _, _)) + EXPECT_CALL(allocator, slaveAdded(_, _, _, _)) .WillOnce(DoAll(InvokeSlaveAdded(&allocator), FutureSatisfy(&slaveAdded1))); @@ -364,7 +364,7 @@ TEST_F(DRFAllocatorTest, PerSlaveAllocation) flags2.resources = Some("cpus:2;mem:1024;disk:0"); Future<Nothing> slaveAdded2; - EXPECT_CALL(allocator, slaveAdded(_, _, _)) + EXPECT_CALL(allocator, slaveAdded(_, _, _, _)) .WillOnce(DoAll(InvokeSlaveAdded(&allocator), FutureSatisfy(&slaveAdded2))); @@ -549,7 +549,7 @@ TEST_F(DRFAllocatorTest, SameShareAllocations) .WillRepeatedly(DoDefault()); // Start the slave. - EXPECT_CALL(allocator, slaveAdded(_, _, _)); + EXPECT_CALL(allocator, slaveAdded(_, _, _, _)); Try<PID<Slave> > slave = StartSlave(); ASSERT_SOME(slave); @@ -594,7 +594,7 @@ TEST_F(ReservationAllocatorTest, ReservedResources) ASSERT_SOME(master); Future<Nothing> slaveAdded; - EXPECT_CALL(allocator, slaveAdded(_, _, _)) + EXPECT_CALL(allocator, slaveAdded(_, _, _, _)) .WillOnce(DoDefault()) .WillOnce(DoDefault()) .WillOnce(DoDefault()) @@ -700,7 +700,7 @@ TEST_F(ReservationAllocatorTest, ReservedResources) flags5.default_role = "role1"; flags5.resources = Some("cpus:1;mem:512;disk:0"); - EXPECT_CALL(allocator, slaveAdded(_, _, _)); + EXPECT_CALL(allocator, slaveAdded(_, _, _, _)); Future<Nothing> resourceOffers4; EXPECT_CALL(sched1, resourceOffers(_, OfferEq(1, 512))) @@ -751,7 +751,7 @@ TEST_F(ReservationAllocatorTest, ResourcesReturned) MockExecutor exec(DEFAULT_EXECUTOR_ID); - EXPECT_CALL(allocator, slaveAdded(_, _, _)) + EXPECT_CALL(allocator, slaveAdded(_, _, _, _)) .Times(2); Future<Nothing> slaveAdded1 = FUTURE_DISPATCH( @@ -926,7 +926,7 @@ TYPED_TEST(AllocatorTest, MockAllocator) slave::Flags flags = this->CreateSlaveFlags(); flags.resources = Some("cpus:2;mem:1024;disk:0"); - EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); + EXPECT_CALL(this->allocator, slaveAdded(_, _, _, _)); Try<PID<Slave> > slave = this->StartSlave(flags); ASSERT_SOME(slave); @@ -984,7 +984,7 @@ TYPED_TEST(AllocatorTest, ResourcesUnused) slave::Flags flags1 = this->CreateSlaveFlags(); flags1.resources = Some("cpus:2;mem:1024"); - EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); + EXPECT_CALL(this->allocator, slaveAdded(_, _, _, _)); Try<PID<Slave> > slave1 = this->StartSlave(&exec, flags1); ASSERT_SOME(slave1); @@ -1095,7 +1095,7 @@ TYPED_TEST(AllocatorTest, OutOfOrderDispatch) slave::Flags flags1 = this->CreateSlaveFlags(); flags1.resources = Some("cpus:2;mem:1024"); - EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); + EXPECT_CALL(this->allocator, slaveAdded(_, _, _, _)); Try<PID<Slave> > slave1 = this->StartSlave(flags1); ASSERT_SOME(slave1); @@ -1228,7 +1228,7 @@ TYPED_TEST(AllocatorTest, SchedulerFailover) slave::Flags flags = this->CreateSlaveFlags(); flags.resources = Some("cpus:3;mem:1024"); - EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); + EXPECT_CALL(this->allocator, slaveAdded(_, _, _, _)); Try<PID<Slave> > slave = this->StartSlave(&exec, flags); ASSERT_SOME(slave); @@ -1372,7 +1372,7 @@ TYPED_TEST(AllocatorTest, FrameworkExited) flags.resources = Some("cpus:3;mem:1024"); - EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); + EXPECT_CALL(this->allocator, slaveAdded(_, _, _, _)); Try<PID<Slave> > slave = this->StartSlave(&containerizer, flags); ASSERT_SOME(slave); @@ -1513,7 +1513,7 @@ TYPED_TEST(AllocatorTest, SlaveLost) slave::Flags flags1 = this->CreateSlaveFlags(); flags1.resources = Some("cpus:2;mem:1024"); - EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); + EXPECT_CALL(this->allocator, slaveAdded(_, _, _, _)); Try<PID<Slave> > slave1 = this->StartSlave(&exec, flags1); ASSERT_SOME(slave1); @@ -1578,7 +1578,7 @@ TYPED_TEST(AllocatorTest, SlaveLost) slave::Flags flags2 = this->CreateSlaveFlags(); flags2.resources = string("cpus:3;mem:256;disk:1024;ports:[31000-32000]"); - EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); + EXPECT_CALL(this->allocator, slaveAdded(_, _, _, _)); // Eventually after slave2 is launched, we should get // an offer that contains all of slave2's resources @@ -1632,7 +1632,7 @@ TYPED_TEST(AllocatorTest, SlaveAdded) slave::Flags flags1 = this->CreateSlaveFlags(); flags1.resources = Some("cpus:3;mem:1024"); - EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); + EXPECT_CALL(this->allocator, slaveAdded(_, _, _, _)); Try<PID<Slave> > slave1 = this->StartSlave(&exec, flags1); ASSERT_SOME(slave1); @@ -1681,7 +1681,7 @@ TYPED_TEST(AllocatorTest, SlaveAdded) slave::Flags flags2 = this->CreateSlaveFlags(); flags2.resources = Some("cpus:4;mem:2048"); - EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); + EXPECT_CALL(this->allocator, slaveAdded(_, _, _, _)); // After slave2 launches, all of its resources are combined with the // resources on slave1 that the task isn't using. @@ -1733,7 +1733,7 @@ TYPED_TEST(AllocatorTest, TaskFinished) slave::Flags flags = this->CreateSlaveFlags(); flags.resources = Some("cpus:3;mem:1024"); - EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); + EXPECT_CALL(this->allocator, slaveAdded(_, _, _, _)); Try<PID<Slave> > slave = this->StartSlave(&exec, flags); ASSERT_SOME(slave); @@ -1842,7 +1842,7 @@ TYPED_TEST(AllocatorTest, CpusOnlyOfferedAndTaskLaunched) slave::Flags flags = this->CreateSlaveFlags(); flags.resources = Some("cpus:2;mem:0"); - EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); + EXPECT_CALL(this->allocator, slaveAdded(_, _, _, _)); Try<PID<Slave> > slave = this->StartSlave(&exec, flags); ASSERT_SOME(slave); @@ -1930,7 +1930,7 @@ TYPED_TEST(AllocatorTest, MemoryOnlyOfferedAndTaskLaunched) slave::Flags flags = this->CreateSlaveFlags(); flags.resources = Some("cpus:0;mem:200"); - EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); + EXPECT_CALL(this->allocator, slaveAdded(_, _, _, _)); Try<PID<Slave> > slave = this->StartSlave(&exec, flags); ASSERT_SOME(slave); @@ -2024,7 +2024,7 @@ TYPED_TEST(AllocatorTest, WhitelistSlave) Try<PID<Master> > master = this->StartMaster(&this->allocator, masterFlags); ASSERT_SOME(master); - EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); + EXPECT_CALL(this->allocator, slaveAdded(_, _, _, _)); slave::Flags flags = this->CreateSlaveFlags(); flags.resources = Some("cpus:2;mem:1024"); @@ -2194,7 +2194,7 @@ TYPED_TEST(AllocatorTest, FrameworkReregistersFirst) MockExecutor exec(DEFAULT_EXECUTOR_ID); - EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); + EXPECT_CALL(this->allocator, slaveAdded(_, _, _, _)); StandaloneMasterDetector slaveDetector(master.get()); @@ -2269,7 +2269,7 @@ TYPED_TEST(AllocatorTest, FrameworkReregistersFirst) AWAIT_READY(frameworkAdded); - EXPECT_CALL(allocator2, slaveAdded(_, _, _)); + EXPECT_CALL(allocator2, slaveAdded(_, _, _, _)); Future<vector<Offer> > resourceOffers2; EXPECT_CALL(sched, resourceOffers(&driver, _)) @@ -2321,7 +2321,7 @@ TYPED_TEST(AllocatorTest, SlaveReregistersFirst) MockExecutor exec(DEFAULT_EXECUTOR_ID); StandaloneMasterDetector slaveDetector(master.get()); - EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); + EXPECT_CALL(this->allocator, slaveAdded(_, _, _, _)); slave::Flags flags = this->CreateSlaveFlags(); flags.resources = Some("cpus:2;mem:1024"); @@ -2378,7 +2378,7 @@ TYPED_TEST(AllocatorTest, SlaveReregistersFirst) EXPECT_CALL(allocator2, initialize(_, _, _)); Future<Nothing> slaveAdded; - EXPECT_CALL(allocator2, slaveAdded(_, _, _)) + EXPECT_CALL(allocator2, slaveAdded(_, _, _, _)) .WillOnce(DoAll(InvokeSlaveAdded(&allocator2), FutureSatisfy(&slaveAdded))); http://git-wip-us.apache.org/repos/asf/mesos/blob/31317b35/src/tests/mesos.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index f132c6c..94fd753 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -670,7 +670,7 @@ public: ON_CALL(*this, frameworkDeactivated(_)) .WillByDefault(InvokeFrameworkDeactivated(this)); - ON_CALL(*this, slaveAdded(_, _, _)) + ON_CALL(*this, slaveAdded(_, _, _, _)) .WillByDefault(InvokeSlaveAdded(this)); ON_CALL(*this, slaveRemoved(_)) @@ -701,29 +701,54 @@ public: process::wait(real); } - MOCK_METHOD3(initialize, void(const master::Flags&, - const process::PID<master::Master>&, - const hashmap<std::string, RoleInfo>&)); - MOCK_METHOD3(frameworkAdded, void(const FrameworkID&, - const FrameworkInfo&, - const Resources&)); - MOCK_METHOD1(frameworkRemoved, void(const FrameworkID&)); - MOCK_METHOD2(frameworkActivated, void(const FrameworkID&, - const FrameworkInfo&)); - MOCK_METHOD1(frameworkDeactivated, void(const FrameworkID&)); - MOCK_METHOD3(slaveAdded, void(const SlaveID&, - const SlaveInfo&, - const hashmap<FrameworkID, Resources>&)); - MOCK_METHOD1(slaveRemoved, void(const SlaveID&)); - MOCK_METHOD1(slaveDeactivated, void(const SlaveID&)); - MOCK_METHOD1(slaveActivated, void(const SlaveID&)); - MOCK_METHOD1(updateWhitelist, void(const Option<hashset<std::string> >&)); - MOCK_METHOD2(resourcesRequested, void(const FrameworkID&, - const std::vector<Request>&)); - MOCK_METHOD4(resourcesRecovered, void(const FrameworkID&, - const SlaveID&, - const Resources&, - const Option<Filters>& filters)); + MOCK_METHOD3(initialize, void( + const master::Flags&, + const process::PID<master::Master>&, + const hashmap<std::string, RoleInfo>&)); + + MOCK_METHOD3(frameworkAdded, void( + const FrameworkID&, + const FrameworkInfo&, + const Resources&)); + + MOCK_METHOD1(frameworkRemoved, void( + const FrameworkID&)); + + MOCK_METHOD2(frameworkActivated, void( + const FrameworkID&, + const FrameworkInfo&)); + + MOCK_METHOD1(frameworkDeactivated, void( + const FrameworkID&)); + + MOCK_METHOD4(slaveAdded, void( + const SlaveID&, + const SlaveInfo&, + const Resources&, + const hashmap<FrameworkID, Resources>&)); + + MOCK_METHOD1(slaveRemoved, void( + const SlaveID&)); + + MOCK_METHOD1(slaveDeactivated, void( + const SlaveID&)); + + MOCK_METHOD1(slaveActivated, void( + const SlaveID&)); + + MOCK_METHOD1(updateWhitelist, void( + const Option<hashset<std::string> >&)); + + MOCK_METHOD2(resourcesRequested, void( + const FrameworkID&, + const std::vector<Request>&)); + + MOCK_METHOD4(resourcesRecovered, void( + const FrameworkID&, + const SlaveID&, + const Resources&, + const Option<Filters>& filters)); + MOCK_METHOD1(offersRevived, void(const FrameworkID&)); T real; @@ -796,7 +821,8 @@ ACTION_P(InvokeSlaveAdded, allocator) &master::allocator::AllocatorProcess::slaveAdded, arg0, arg1, - arg2); + arg2, + arg3); } http://git-wip-us.apache.org/repos/asf/mesos/blob/31317b35/src/tests/slave_recovery_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp index 2b6c76a..6cf5fb8 100644 --- a/src/tests/slave_recovery_tests.cpp +++ b/src/tests/slave_recovery_tests.cpp @@ -2207,7 +2207,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave) slave::Flags flags = this->CreateSlaveFlags(); - EXPECT_CALL(allocator, slaveAdded(_, _, _)); + EXPECT_CALL(allocator, slaveAdded(_, _, _, _)); Try<TypeParam*> containerizer1 = TypeParam::create(flags, true); ASSERT_SOME(containerizer1);
