Maintenance Primitives: Added inverse offers. Review: https://reviews.apache.org/r/37177
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a1de99f4 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a1de99f4 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a1de99f4 Branch: refs/heads/master Commit: a1de99f42323d8eb1396fcd10884eaac32a93eab Parents: 8e04258 Author: Joris Van Remoortere <[email protected]> Authored: Tue Aug 25 18:41:21 2015 -0400 Committer: Joris Van Remoortere <[email protected]> Committed: Mon Sep 14 13:58:37 2015 -0400 ---------------------------------------------------------------------- include/mesos/maintenance/maintenance.hpp | 15 +++ include/mesos/master/allocator.hpp | 6 + src/master/allocator/mesos/allocator.hpp | 13 ++ src/master/allocator/mesos/hierarchical.hpp | 99 ++++++++++++++ src/master/master.cpp | 9 ++ src/master/master.hpp | 4 + src/tests/hierarchical_allocator_tests.cpp | 157 +++++++++++++---------- src/tests/master_allocator_tests.cpp | 32 ++--- src/tests/mesos.hpp | 11 +- src/tests/reservation_endpoints_tests.cpp | 20 +-- src/tests/reservation_tests.cpp | 4 +- src/tests/resource_offers_tests.cpp | 2 +- src/tests/slave_recovery_tests.cpp | 2 +- 13 files changed, 270 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/include/mesos/maintenance/maintenance.hpp ---------------------------------------------------------------------- diff --git a/include/mesos/maintenance/maintenance.hpp b/include/mesos/maintenance/maintenance.hpp index 7fec3ff..f676d01 100644 --- a/include/mesos/maintenance/maintenance.hpp +++ b/include/mesos/maintenance/maintenance.hpp @@ -22,4 +22,19 @@ // ONLY USEFUL AFTER RUNNING PROTOC. 
#include <mesos/maintenance/maintenance.pb.h> +#include <mesos/resources.hpp> + +namespace mesos { + +// A wrapper for resources and unavailability used to communicate between the +// Allocator and Master in order to let the Master create InverseOffers from the +// Allocator. +struct UnavailableResources +{ + Resources resources; + Unavailability unavailability; +}; + +} // namespace mesos { + #endif // __MAINTENANCE_PROTO_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/include/mesos/master/allocator.hpp ---------------------------------------------------------------------- diff --git a/include/mesos/master/allocator.hpp b/include/mesos/master/allocator.hpp index b5bfc28..18d31ef 100644 --- a/include/mesos/master/allocator.hpp +++ b/include/mesos/master/allocator.hpp @@ -25,6 +25,8 @@ // ONLY USEFUL AFTER RUNNING PROTOC. #include <mesos/master/allocator.pb.h> +#include <mesos/maintenance/maintenance.hpp> + #include <mesos/resources.hpp> #include <process/future.hpp> @@ -67,6 +69,10 @@ public: const lambda::function< void(const FrameworkID&, const hashmap<SlaveID, Resources>&)>& offerCallback, + const lambda::function< + void(const FrameworkID&, + const hashmap<SlaveID, UnavailableResources>&)>& + inverseOfferCallback, const hashmap<std::string, RoleInfo>& roles) = 0; virtual void addFramework( http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/master/allocator/mesos/allocator.hpp ---------------------------------------------------------------------- diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp index ee6ec58..124dd3d 100644 --- a/src/master/allocator/mesos/allocator.hpp +++ b/src/master/allocator/mesos/allocator.hpp @@ -52,6 +52,10 @@ public: const lambda::function< void(const FrameworkID&, const hashmap<SlaveID, Resources>&)>& offerCallback, + const lambda::function< + void(const FrameworkID&, + const hashmap<SlaveID, UnavailableResources>&)>& + inverseOfferCallback, const 
hashmap<std::string, mesos::master::RoleInfo>& roles); void addFramework( @@ -147,6 +151,10 @@ public: const lambda::function< void(const FrameworkID&, const hashmap<SlaveID, Resources>&)>& offerCallback, + const lambda::function< + void(const FrameworkID&, + const hashmap<SlaveID, UnavailableResources>&)>& + inverseOfferCallback, const hashmap<std::string, mesos::master::RoleInfo>& roles) = 0; virtual void addFramework( @@ -250,6 +258,10 @@ inline void MesosAllocator<AllocatorProcess>::initialize( const lambda::function< void(const FrameworkID&, const hashmap<SlaveID, Resources>&)>& offerCallback, + const lambda::function< + void(const FrameworkID&, + const hashmap<SlaveID, UnavailableResources>&)>& + inverseOfferCallback, const hashmap<std::string, mesos::master::RoleInfo>& roles) { process::dispatch( @@ -257,6 +269,7 @@ inline void MesosAllocator<AllocatorProcess>::initialize( &MesosAllocatorProcess::initialize, allocationInterval, offerCallback, + inverseOfferCallback, roles); } http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/master/allocator/mesos/hierarchical.hpp ---------------------------------------------------------------------- diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp index 77a5b4c..8ae7475 100644 --- a/src/master/allocator/mesos/hierarchical.hpp +++ b/src/master/allocator/mesos/hierarchical.hpp @@ -90,6 +90,10 @@ public: const lambda::function< void(const FrameworkID&, const hashmap<SlaveID, Resources>&)>& offerCallback, + const lambda::function< + void(const FrameworkID&, + const hashmap<SlaveID, UnavailableResources>&)>& + inverseOfferCallback, const hashmap<std::string, mesos::master::RoleInfo>& roles); void addFramework( @@ -176,6 +180,9 @@ protected: // Allocate resources from the specified slaves. void allocate(const hashset<SlaveID>& slaveIds); + // Send inverse offers from the specified slaves. 
+ void deallocate(const hashset<SlaveID>& slaveIds); + // Remove a filter for the specified framework. void expire( const FrameworkID& frameworkId, @@ -202,6 +209,10 @@ protected: void(const FrameworkID&, const hashmap<SlaveID, Resources>&)> offerCallback; + lambda::function< + void(const FrameworkID&, + const hashmap<SlaveID, UnavailableResources>&)> inverseOfferCallback; + struct Metrics { explicit Metrics(const Self& process) @@ -366,10 +377,15 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::initialize( const lambda::function< void(const FrameworkID&, const hashmap<SlaveID, Resources>&)>& _offerCallback, + const lambda::function< + void(const FrameworkID&, + const hashmap<SlaveID, UnavailableResources>&)>& + _inverseOfferCallback, const hashmap<std::string, mesos::master::RoleInfo>& _roles) { allocationInterval = _allocationInterval; offerCallback = _offerCallback; + inverseOfferCallback = _inverseOfferCallback; roles = _roles; initialized = true; @@ -1086,6 +1102,89 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate( offerCallback(frameworkId, offerable[frameworkId]); } } + + // NOTE: For now, we implement maintenance inverse offers within the + // allocator. We leverage the existing timer/cycle of offers to also do any + // "deallocation" (inverse offers) necessary to satisfy maintenance needs. + deallocate(slaveIds_); +} + + +template <class RoleSorter, class FrameworkSorter> +void +HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deallocate( + const hashset<SlaveID>& slaveIds_) +{ + if (frameworkSorters.empty()) { + LOG(ERROR) << "No frameworks specified, cannot send inverse offers!"; + return; + } + + // In this case, `offerable` is actually the slaves and/or resources that we + // want the master to create `InverseOffer`s from. 
+ hashmap<FrameworkID, hashmap<SlaveID, UnavailableResources>> offerable; + + // For maintenance, we use the framework sorters to determine which frameworks + // have (1) reserved and / or (2) unreserved resources on the specified + // slaveIds. This way we only send inverse offers to frameworks that have the + // potential to lose something. We keep track of which frameworks already have + // an outstanding inverse offer for the given slave in the + // UnavailabilityStatus of the specific slave using the `offersOutstanding` + // set. This is equivalent to the accounting we do for resources when we send + // regular offers. If we didn't keep track of outstanding offers then we would + // keep generating new inverse offers even though the framework had not + // responded yet. + + foreachvalue (FrameworkSorter* frameworkSorter, frameworkSorters) { + foreach (const SlaveID& slaveId, slaveIds_) { + CHECK(slaves.contains(slaveId)); + + if (slaves[slaveId].maintenance.isSome()) { + // We use a reference by alias because we intend to modify the + // `maintenance` and to improve readability. + typename Slave::Maintenance& maintenance = + slaves[slaveId].maintenance.get(); + + hashmap<std::string, Resources> allocation = + frameworkSorter->allocation(slaveId); + + foreachkey (const std::string& frameworkId_, allocation) { + FrameworkID frameworkId; + frameworkId.set_value(frameworkId_); + + // If this framework doesn't already have inverse offers for the + // specified slave. + if (!offerable[frameworkId].contains(slaveId)) { + // If there isn't already an outstanding inverse offer to this + // framework for the specified slave. + if (!maintenance.offersOutstanding.contains(frameworkId)) { + // For now we send inverse offers with empty resources when the + // inverse offer represents maintenance on the machine. In the + // future we could be more specific about the resources on the + // host, as we have the information available. 
+ offerable[frameworkId][slaveId] = + UnavailableResources{ + Resources(), + maintenance.unavailability}; + + // Mark this framework as having an offer outstanding for the + // specified slave. + maintenance.offersOutstanding.insert(frameworkId); + } + } + } + } + } + } + + if (offerable.empty()) { + VLOG(1) << "No inverse offers to send out!"; + } else { + // Now send inverse offers to each framework. + foreachkey (const FrameworkID& frameworkId, offerable) { + inverseOfferCallback(frameworkId, offerable[frameworkId]); + } + } } http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 0b3ba56..8471735 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -621,6 +621,7 @@ void Master::initialize() allocator->initialize( flags.allocation_interval, defer(self(), &Master::offer, lambda::_1, lambda::_2), + defer(self(), &Master::inverseOffer, lambda::_1, lambda::_2), roleInfos); // Parse the whitelist. Passing Allocator::updateWhitelist() @@ -4781,6 +4782,14 @@ void Master::offer(const FrameworkID& frameworkId, } +void Master::inverseOffer( + const FrameworkID& frameworkId, + const hashmap<SlaveID, UnavailableResources>& resources) +{ + // TODO(jmlvanre): Implement this function. 
+} + + // TODO(vinod): If due to network partition there are two instances // of the framework that think they are leaders and try to // authenticate with master they would be stepping on each other's http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index cd71a25..1ba0837 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -491,6 +491,10 @@ public: const FrameworkID& framework, const hashmap<SlaveID, Resources>& resources); + void inverseOffer( + const FrameworkID& framework, + const hashmap<SlaveID, UnavailableResources>& resources); + // Invoked when there is a newly elected leading master. // Made public for testing purposes. void detected(const process::Future<Option<MasterInfo>>& pid); http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/tests/hierarchical_allocator_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp index 0a24b6b..2f37c98 100644 --- a/src/tests/hierarchical_allocator_tests.cpp +++ b/src/tests/hierarchical_allocator_tests.cpp @@ -79,6 +79,13 @@ struct Allocation }; +struct Deallocation +{ + FrameworkID frameworkId; + hashmap<SlaveID, UnavailableResources> resources; +}; + + class HierarchicalAllocatorTestBase : public ::testing::Test { protected: @@ -95,9 +102,13 @@ protected: void initialize( const vector<string>& _roles, const master::Flags& _flags = master::Flags(), - const Option<lambda::function< + Option<lambda::function< void(const FrameworkID&, - const hashmap<SlaveID, Resources>&)>>& offerCallback = None()) + const hashmap<SlaveID, Resources>&)>> offerCallback = None(), + Option<lambda::function< + void(const FrameworkID&, + const hashmap<SlaveID, UnavailableResources>&)>> + inverseOfferCallback = None()) { flags = _flags; 
@@ -111,17 +122,35 @@ protected: roles[role] = info; } - if (offerCallback.isSome()) { - allocator->initialize( - flags.allocation_interval, - offerCallback.get(), - roles); - } else { - allocator->initialize( - flags.allocation_interval, - lambda::bind(&put, &queue, lambda::_1, lambda::_2), - roles); + if (offerCallback.isNone()) { + offerCallback = + [this](const FrameworkID& frameworkId, + const hashmap<SlaveID, Resources>& resources) { + Allocation allocation; + allocation.frameworkId = frameworkId; + allocation.resources = resources; + + allocations.put(allocation); + }; } + + if (inverseOfferCallback.isNone()) { + inverseOfferCallback = + [this](const FrameworkID& frameworkId, + const hashmap<SlaveID, UnavailableResources>& resources) { + Deallocation deallocation; + deallocation.frameworkId = frameworkId; + deallocation.resources = resources; + + deallocations.put(deallocation); + }; + } + + allocator->initialize( + flags.allocation_interval, + offerCallback.get(), + inverseOfferCallback.get(), + roles); } SlaveInfo createSlaveInfo(const string& resources) @@ -158,25 +187,13 @@ protected: return resource; } -private: - static void put( - process::Queue<Allocation>* queue, - const FrameworkID& frameworkId, - const hashmap<SlaveID, Resources>& resources) - { - Allocation allocation; - allocation.frameworkId = frameworkId; - allocation.resources = resources; - - queue->put(allocation); - } - protected: master::Flags flags; Allocator* allocator; - process::Queue<Allocation> queue; + process::Queue<Allocation> allocations; + process::Queue<Deallocation> deallocations; hashmap<string, RoleInfo> roles; @@ -224,7 +241,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF) allocator->addFramework( framework1.id(), framework1, hashmap<SlaveID, Resources>()); - Future<Allocation> allocation = queue.get(); + Future<Allocation> allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(framework1.id(), allocation.get().frameworkId); EXPECT_EQ(slave1.resources(), 
Resources::sum(allocation.get().resources)); @@ -246,7 +263,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF) // framework2 will be offered all of slave2's resources since role2 // has the lowest user share, and framework2 is its only framework. - allocation = queue.get(); + allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(framework2.id(), allocation.get().frameworkId); EXPECT_EQ(slave2.resources(), Resources::sum(allocation.get().resources)); @@ -266,7 +283,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF) // framework2 will be offered all of slave3's resources since role2 // has the lowest share. - allocation = queue.get(); + allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(framework2.id(), allocation.get().frameworkId); EXPECT_EQ(slave3.resources(), Resources::sum(allocation.get().resources)); @@ -292,7 +309,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF) // framework3 will be offered all of slave4's resources since role1 // has the lowest user share, and framework3 has the lowest share of // role1's frameworks. - allocation = queue.get(); + allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(framework3.id(), allocation.get().frameworkId); EXPECT_EQ(slave4.resources(), Resources::sum(allocation.get().resources)); @@ -319,7 +336,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF) // Even though framework4 doesn't have any resources, role2 has a // lower share than role1, so framework2 receives slave5's resources. 
- allocation = queue.get(); + allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(framework2.id(), allocation.get().frameworkId); EXPECT_EQ(slave5.resources(), Resources::sum(allocation.get().resources)); @@ -350,7 +367,7 @@ TEST_F(HierarchicalAllocatorTest, ReservedDRF) allocator->addFramework( framework1.id(), framework1, hashmap<SlaveID, Resources>()); - Future<Allocation> allocation = queue.get(); + Future<Allocation> allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(framework1.id(), allocation.get().frameworkId); EXPECT_EQ(slave1.resources(), Resources::sum(allocation.get().resources)); @@ -363,7 +380,7 @@ TEST_F(HierarchicalAllocatorTest, ReservedDRF) SlaveInfo slave2 = createSlaveInfo("cpus:2;mem:512;disk:0"); allocator->addSlave(slave2.id(), slave2, None(), slave2.resources(), EMPTY); - allocation = queue.get(); + allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(framework2.id(), allocation.get().frameworkId); EXPECT_EQ(slave2.resources(), Resources::sum(allocation.get().resources)); @@ -375,7 +392,7 @@ TEST_F(HierarchicalAllocatorTest, ReservedDRF) SlaveInfo slave3 = createSlaveInfo("cpus:2;mem:512;disk:0"); allocator->addSlave(slave3.id(), slave3, None(), slave3.resources(), EMPTY); - allocation = queue.get(); + allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(framework1.id(), allocation.get().frameworkId); EXPECT_EQ(slave3.resources(), Resources::sum(allocation.get().resources)); @@ -392,7 +409,7 @@ TEST_F(HierarchicalAllocatorTest, ReservedDRF) "cpus(role1):2;mem(role1):1024;disk(role1):0"); allocator->addSlave(slave4.id(), slave4, None(), slave4.resources(), EMPTY); - allocation = queue.get(); + allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(framework3.id(), allocation.get().frameworkId); EXPECT_EQ(slave4.resources(), Resources::sum(allocation.get().resources)); @@ -425,7 +442,7 @@ TEST_F(HierarchicalAllocatorTest, CoarseGrained) allocator->addFramework( 
framework1.id(), framework1, hashmap<SlaveID, Resources>()); - Future<Allocation> allocation = queue.get(); + Future<Allocation> allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(framework1.id(), allocation.get().frameworkId); EXPECT_EQ(slave1.resources() + slave2.resources(), @@ -448,28 +465,28 @@ TEST_F(HierarchicalAllocatorTest, CoarseGrained) allocator->addFramework( framework2.id(), framework2, hashmap<SlaveID, Resources>()); - hashmap<FrameworkID, Allocation> allocations; + hashmap<FrameworkID, Allocation> frameworkAllocations; - allocation = queue.get(); + allocation = allocations.get(); AWAIT_READY(allocation); - allocations[allocation.get().frameworkId] = allocation.get(); + frameworkAllocations[allocation.get().frameworkId] = allocation.get(); - allocation = queue.get(); + allocation = allocations.get(); AWAIT_READY(allocation); - allocations[allocation.get().frameworkId] = allocation.get(); + frameworkAllocations[allocation.get().frameworkId] = allocation.get(); // Note that slave1 and slave2 have the same resources, we don't // care which framework received which slave.. only that they each // received one. 
- ASSERT_TRUE(allocations.contains(framework1.id())); - ASSERT_EQ(1u, allocations[framework1.id()].resources.size()); + ASSERT_TRUE(frameworkAllocations.contains(framework1.id())); + ASSERT_EQ(1u, frameworkAllocations[framework1.id()].resources.size()); EXPECT_EQ(slave1.resources(), - Resources::sum(allocations[framework1.id()].resources)); + Resources::sum(frameworkAllocations[framework1.id()].resources)); - ASSERT_TRUE(allocations.contains(framework2.id())); - ASSERT_EQ(1u, allocations[framework1.id()].resources.size()); + ASSERT_TRUE(frameworkAllocations.contains(framework2.id())); + ASSERT_EQ(1u, frameworkAllocations[framework1.id()].resources.size()); EXPECT_EQ(slave2.resources(), - Resources::sum(allocations[framework1.id()].resources)); + Resources::sum(frameworkAllocations[framework1.id()].resources)); } @@ -501,7 +518,7 @@ TEST_F(HierarchicalAllocatorTest, SameShareFairness) hashmap<FrameworkID, size_t> counts; for (int i = 0; i < 10; i++) { - Future<Allocation> allocation = queue.get(); + Future<Allocation> allocation = allocations.get(); AWAIT_READY(allocation); counts[allocation.get().frameworkId]++; @@ -552,7 +569,7 @@ TEST_F(HierarchicalAllocatorTest, Reservations) allocator->addFramework( framework1.id(), framework1, hashmap<SlaveID, Resources>()); - Future<Allocation> allocation = queue.get(); + Future<Allocation> allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(framework1.id(), allocation.get().frameworkId); EXPECT_EQ(2u, allocation.get().resources.size()); @@ -566,7 +583,7 @@ TEST_F(HierarchicalAllocatorTest, Reservations) allocator->addFramework( framework2.id(), framework2, hashmap<SlaveID, Resources>()); - allocation = queue.get(); + allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(framework2.id(), allocation.get().frameworkId); EXPECT_EQ(1u, allocation.get().resources.size()); @@ -595,7 +612,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverResources) allocator->addFramework( framework1.id(), framework1, 
hashmap<SlaveID, Resources>()); - Future<Allocation> allocation = queue.get(); + Future<Allocation> allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(framework1.id(), allocation.get().frameworkId); EXPECT_EQ(1u, allocation.get().resources.size()); @@ -613,7 +630,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverResources) Clock::advance(flags.allocation_interval); - allocation = queue.get(); + allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(framework1.id(), allocation.get().frameworkId); EXPECT_EQ(1u, allocation.get().resources.size()); @@ -631,7 +648,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverResources) Clock::advance(flags.allocation_interval); - allocation = queue.get(); + allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(framework1.id(), allocation.get().frameworkId); EXPECT_EQ(1u, allocation.get().resources.size()); @@ -669,7 +686,7 @@ TEST_F(HierarchicalAllocatorTest, Allocatable) "disk:128"); allocator->addSlave(slave2.id(), slave2, None(), slave2.resources(), EMPTY); - Future<Allocation> allocation = queue.get(); + Future<Allocation> allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(framework.id(), allocation.get().frameworkId); EXPECT_EQ(1u, allocation.get().resources.size()); @@ -683,7 +700,7 @@ TEST_F(HierarchicalAllocatorTest, Allocatable) "disk:128"); allocator->addSlave(slave3.id(), slave3, None(), slave3.resources(), EMPTY); - allocation = queue.get(); + allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(framework.id(), allocation.get().frameworkId); EXPECT_EQ(1u, allocation.get().resources.size()); @@ -700,7 +717,7 @@ TEST_F(HierarchicalAllocatorTest, Allocatable) "disk:128"); allocator->addSlave(slave4.id(), slave4, None(), slave4.resources(), EMPTY); - allocation = queue.get(); + allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(framework.id(), allocation.get().frameworkId); EXPECT_EQ(1u, allocation.get().resources.size()); @@ -726,7 +743,7 @@ 
TEST_F(HierarchicalAllocatorTest, UpdateAllocation) allocator->addFramework( framework.id(), framework, hashmap<SlaveID, Resources>()); - Future<Allocation> allocation = queue.get(); + Future<Allocation> allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(framework.id(), allocation.get().frameworkId); EXPECT_EQ(1u, allocation.get().resources.size()); @@ -764,7 +781,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocation) Clock::advance(flags.allocation_interval); - allocation = queue.get(); + allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(framework.id(), allocation.get().frameworkId); EXPECT_EQ(1u, allocation.get().resources.size()); @@ -809,7 +826,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAvailableSuccess) allocator->addFramework( framework.id(), framework, hashmap<SlaveID, Resources>()); - Future<Allocation> allocation = queue.get(); + Future<Allocation> allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(framework.id(), allocation.get().frameworkId); EXPECT_EQ(1u, allocation.get().resources.size()); @@ -843,7 +860,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAvailableFail) allocator->addFramework( framework.id(), framework, hashmap<SlaveID, Resources>()); - Future<Allocation> allocation = queue.get(); + Future<Allocation> allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(framework.id(), allocation.get().frameworkId); EXPECT_EQ(1u, allocation.get().resources.size()); @@ -884,7 +901,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlave) framework.id(), framework, hashmap<SlaveID, Resources>()); // Initially, all the resources are allocated. 
- Future<Allocation> allocation = queue.get(); + Future<Allocation> allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources)); @@ -893,7 +910,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlave) allocator->updateSlave(slave.id(), oversubscribed); // The next allocation should be for 10 oversubscribed resources. - allocation = queue.get(); + allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(oversubscribed, Resources::sum(allocation.get().resources)); @@ -902,7 +919,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlave) allocator->updateSlave(slave.id(), oversubscribed2); // The next allocation should be for 2 oversubscribed cpus. - allocation = queue.get(); + allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(oversubscribed2 - oversubscribed, Resources::sum(allocation.get().resources)); @@ -914,7 +931,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlave) // Since there are no more available oversubscribed resources there // shouldn't be an allocation. Clock::settle(); - allocation = queue.get(); + allocation = allocations.get(); ASSERT_TRUE(allocation.isPending()); } @@ -938,7 +955,7 @@ TEST_F(HierarchicalAllocatorTest, OversubscribedNotAllocated) framework.id(), framework, hashmap<SlaveID, Resources>()); // Initially, all the resources are allocated. - Future<Allocation> allocation = queue.get(); + Future<Allocation> allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources)); @@ -949,7 +966,7 @@ TEST_F(HierarchicalAllocatorTest, OversubscribedNotAllocated) // No allocation should be made for oversubscribed resources because // the framework has not opted in for them. 
Clock::settle(); - allocation = queue.get(); + allocation = allocations.get(); ASSERT_TRUE(allocation.isPending()); } @@ -976,7 +993,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverOversubscribedResources) framework.id(), framework, hashmap<SlaveID, Resources>()); // Initially, all the resources are allocated. - Future<Allocation> allocation = queue.get(); + Future<Allocation> allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources)); @@ -985,7 +1002,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverOversubscribedResources) allocator->updateSlave(slave.id(), oversubscribed); // The next allocation should be for 10 oversubscribed cpus. - allocation = queue.get(); + allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(oversubscribed, Resources::sum(allocation.get().resources)); @@ -999,7 +1016,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverOversubscribedResources) // The next allocation should be for 6 oversubscribed and 2 regular // cpus. - allocation = queue.get(); + allocation = allocations.get(); AWAIT_READY(allocation); EXPECT_EQ(recovered, Resources::sum(allocation.get().resources)); } @@ -1028,7 +1045,7 @@ TEST_F(HierarchicalAllocatorTest, Whitelist) allocator->addFramework( framework.id(), framework, hashmap<SlaveID, Resources>()); - Future<Allocation> allocation = queue.get(); + Future<Allocation> allocation = allocations.get(); // Ensure a batch allocation is triggered. 
Clock::advance(flags.allocation_interval); http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/tests/master_allocator_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_allocator_tests.cpp b/src/tests/master_allocator_tests.cpp index c6a419b..1fe3757 100644 --- a/src/tests/master_allocator_tests.cpp +++ b/src/tests/master_allocator_tests.cpp @@ -95,7 +95,7 @@ TYPED_TEST(MasterAllocatorTest, SingleFramework) { TestAllocator<TypeParam> allocator; - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); Try<PID<Master>> master = this->StartMaster(&allocator); ASSERT_SOME(master); @@ -141,7 +141,7 @@ TYPED_TEST(MasterAllocatorTest, ResourcesUnused) { TestAllocator<TypeParam> allocator; - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); Try<PID<Master>> master = this->StartMaster(&allocator); ASSERT_SOME(master); @@ -246,7 +246,7 @@ TYPED_TEST(MasterAllocatorTest, OutOfOrderDispatch) { TestAllocator<TypeParam> allocator; - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); Try<PID<Master>> master = this->StartMaster(&allocator); ASSERT_SOME(master); @@ -374,7 +374,7 @@ TYPED_TEST(MasterAllocatorTest, SchedulerFailover) { TestAllocator<TypeParam> allocator; - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); Try<PID<Master>> master = this->StartMaster(&allocator); ASSERT_SOME(master); @@ -496,7 +496,7 @@ TYPED_TEST(MasterAllocatorTest, FrameworkExited) { TestAllocator<TypeParam> allocator; - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); master::Flags masterFlags = this->CreateMasterFlags(); masterFlags.allocation_interval = Milliseconds(50); @@ -642,7 +642,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveLost) { TestAllocator<TypeParam> allocator; - EXPECT_CALL(allocator, initialize(_, _, 
_)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); Try<PID<Master>> master = this->StartMaster(&allocator); ASSERT_SOME(master); @@ -757,7 +757,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveAdded) { TestAllocator<TypeParam> allocator; - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); master::Flags masterFlags = this->CreateMasterFlags(); masterFlags.allocation_interval = Milliseconds(50); @@ -851,7 +851,7 @@ TYPED_TEST(MasterAllocatorTest, TaskFinished) { TestAllocator<TypeParam> allocator; - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); master::Flags masterFlags = this->CreateMasterFlags(); masterFlags.allocation_interval = Milliseconds(50); @@ -952,7 +952,7 @@ TYPED_TEST(MasterAllocatorTest, CpusOnlyOfferedAndTaskLaunched) { TestAllocator<TypeParam> allocator; - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); master::Flags masterFlags = this->CreateMasterFlags(); masterFlags.allocation_interval = Milliseconds(50); @@ -1030,7 +1030,7 @@ TYPED_TEST(MasterAllocatorTest, MemoryOnlyOfferedAndTaskLaunched) { TestAllocator<TypeParam> allocator; - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); master::Flags masterFlags = this->CreateMasterFlags(); masterFlags.allocation_interval = Milliseconds(50); @@ -1121,7 +1121,7 @@ TYPED_TEST(MasterAllocatorTest, Whitelist) TestAllocator<TypeParam> allocator; - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); Future<Nothing> updateWhitelist1; EXPECT_CALL(allocator, updateWhitelist(Option<hashset<string>>(hosts))) @@ -1161,7 +1161,7 @@ TYPED_TEST(MasterAllocatorTest, RoleTest) { TestAllocator<TypeParam> allocator; - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); master::Flags masterFlags = this->CreateMasterFlags(); masterFlags.roles = Some("role2"); @@ 
-1253,7 +1253,7 @@ TYPED_TEST(MasterAllocatorTest, FrameworkReregistersFirst) { TestAllocator<TypeParam> allocator; - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); Try<PID<Master>> master = this->StartMaster(&allocator); ASSERT_SOME(master); @@ -1311,7 +1311,7 @@ TYPED_TEST(MasterAllocatorTest, FrameworkReregistersFirst) { TestAllocator<TypeParam> allocator2; - EXPECT_CALL(allocator2, initialize(_, _, _)); + EXPECT_CALL(allocator2, initialize(_, _, _, _)); Future<Nothing> addFramework; EXPECT_CALL(allocator2, addFramework(_, _, _)) @@ -1378,7 +1378,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveReregistersFirst) { TestAllocator<TypeParam> allocator; - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); Try<PID<Master>> master = this->StartMaster(&allocator); ASSERT_SOME(master); @@ -1435,7 +1435,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveReregistersFirst) { TestAllocator<TypeParam> allocator2; - EXPECT_CALL(allocator2, initialize(_, _, _)); + EXPECT_CALL(allocator2, initialize(_, _, _, _)); Future<Nothing> addSlave; EXPECT_CALL(allocator2, addSlave(_, _, _, _, _)) http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/tests/mesos.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index 477b7e4..858618f 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -1265,7 +1265,7 @@ public: ACTION_P(InvokeInitialize, allocator) { - allocator->real->initialize(arg0, arg1, arg2); + allocator->real->initialize(arg0, arg1, arg2, arg3); } @@ -1407,9 +1407,9 @@ public: // to get the best of both worlds: the ability to use 'DoDefault' // and no warnings when expectations are not explicit. 
- ON_CALL(*this, initialize(_, _, _)) + ON_CALL(*this, initialize(_, _, _, _)) .WillByDefault(InvokeInitialize(this)); - EXPECT_CALL(*this, initialize(_, _, _)) + EXPECT_CALL(*this, initialize(_, _, _, _)) .WillRepeatedly(DoDefault()); ON_CALL(*this, addFramework(_, _, _)) @@ -1500,11 +1500,14 @@ public: virtual ~TestAllocator() {} - MOCK_METHOD3(initialize, void( + MOCK_METHOD4(initialize, void( const Duration&, const lambda::function< void(const FrameworkID&, const hashmap<SlaveID, Resources>&)>&, + const lambda::function< + void(const FrameworkID&, + const hashmap<SlaveID, UnavailableResources>&)>&, const hashmap<std::string, mesos::master::RoleInfo>&)); MOCK_METHOD3(addFramework, void( http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/tests/reservation_endpoints_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/reservation_endpoints_tests.cpp b/src/tests/reservation_endpoints_tests.cpp index 572a8d6..398a2e1 100644 --- a/src/tests/reservation_endpoints_tests.cpp +++ b/src/tests/reservation_endpoints_tests.cpp @@ -132,7 +132,7 @@ TEST_F(ReservationEndpointsTest, AvailableResources) { TestAllocator<> allocator; - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); Try<PID<Master>> master = StartMaster(&allocator); ASSERT_SOME(master); @@ -225,7 +225,7 @@ TEST_F(ReservationEndpointsTest, ReserveOfferedResources) { TestAllocator<> allocator; - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); Try<PID<Master>> master = StartMaster(&allocator); ASSERT_SOME(master); @@ -299,7 +299,7 @@ TEST_F(ReservationEndpointsTest, UnreserveOfferedResources) { TestAllocator<> allocator; - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); Try<PID<Master>> master = StartMaster(&allocator); ASSERT_SOME(master); @@ -381,7 +381,7 @@ TEST_F(ReservationEndpointsTest, 
ReserveAvailableAndOfferedResources) { TestAllocator<> allocator; - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); master::Flags masterFlags = CreateMasterFlags(); // Turn off allocation. We're doing it manually. @@ -527,7 +527,7 @@ TEST_F(ReservationEndpointsTest, UnreserveAvailableAndOfferedResources) // Turn off allocation. We're doing it manually. masterFlags.allocation_interval = Seconds(1000); - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); Try<PID<Master>> master = StartMaster(&allocator, masterFlags); ASSERT_SOME(master); @@ -678,7 +678,7 @@ TEST_F(ReservationEndpointsTest, InsufficientResources) { TestAllocator<> allocator; - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); Try<PID<Master>> master = StartMaster(&allocator); ASSERT_SOME(master); @@ -720,7 +720,7 @@ TEST_F(ReservationEndpointsTest, NoHeader) { TestAllocator<> allocator; - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); Try<PID<Master>> master = StartMaster(&allocator); ASSERT_SOME(master); @@ -770,7 +770,7 @@ TEST_F(ReservationEndpointsTest, BadCredentials) { TestAllocator<> allocator; - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); Try<PID<Master>> master = StartMaster(&allocator); ASSERT_SOME(master); @@ -847,7 +847,7 @@ TEST_F(ReservationEndpointsTest, NoResources) { TestAllocator<> allocator; - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); Try<PID<Master>> master = StartMaster(&allocator); ASSERT_SOME(master); @@ -882,7 +882,7 @@ TEST_F(ReservationEndpointsTest, NonMatchingPrincipal) { TestAllocator<> allocator; - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); Try<PID<Master>> master = StartMaster(&allocator); ASSERT_SOME(master); 
http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/tests/reservation_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/reservation_tests.cpp b/src/tests/reservation_tests.cpp index 91fcf0d..6b7c43c 100644 --- a/src/tests/reservation_tests.cpp +++ b/src/tests/reservation_tests.cpp @@ -410,7 +410,7 @@ TEST_F(ReservationTest, DropReserveTooLarge) masterFlags.allocation_interval = Milliseconds(50); masterFlags.roles = frameworkInfo.role(); - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); Try<PID<Master>> master = StartMaster(&allocator, masterFlags); ASSERT_SOME(master); @@ -501,7 +501,7 @@ TEST_F(ReservationTest, DropReserveStaticReservation) masterFlags.allocation_interval = Milliseconds(50); masterFlags.roles = frameworkInfo.role(); - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); Try<PID<Master>> master = StartMaster(&allocator, masterFlags); ASSERT_SOME(master); http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/tests/resource_offers_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp index 882a9ff..af40a07 100644 --- a/src/tests/resource_offers_tests.cpp +++ b/src/tests/resource_offers_tests.cpp @@ -283,7 +283,7 @@ TEST_F(ResourceOffersTest, Request) { TestAllocator<master::allocator::HierarchicalDRFAllocator> allocator; - EXPECT_CALL(allocator, initialize(_, _, _)) + EXPECT_CALL(allocator, initialize(_, _, _, _)) .Times(1); Try<PID<Master>> master = StartMaster(&allocator); http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/tests/slave_recovery_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp index b636986..dd8f823 100644 --- 
a/src/tests/slave_recovery_tests.cpp +++ b/src/tests/slave_recovery_tests.cpp @@ -2143,7 +2143,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave) { TestAllocator<master::allocator::HierarchicalDRFAllocator> allocator; - EXPECT_CALL(allocator, initialize(_, _, _)); + EXPECT_CALL(allocator, initialize(_, _, _, _)); Try<PID<Master> > master = this->StartMaster(&allocator); ASSERT_SOME(master);
