Added filter support for Inverse Offers. Review: https://reviews.apache.org/r/38324
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9d03297a Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9d03297a Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9d03297a Branch: refs/heads/master Commit: 9d03297a9064dcde3ec920db4ef66003b4d323da Parents: eec3fec Author: Artem Harutyunyan <[email protected]> Authored: Sat Sep 19 14:24:35 2015 -0400 Committer: Joris Van Remoortere <[email protected]> Committed: Sun Sep 20 14:21:15 2015 -0400 ---------------------------------------------------------------------- include/mesos/master/allocator.hpp | 3 +- src/master/allocator/mesos/allocator.hpp | 12 +- src/master/allocator/mesos/hierarchical.hpp | 205 +++++++++++++++++++++-- src/master/master.cpp | 6 +- src/tests/mesos.hpp | 11 +- 5 files changed, 215 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/include/mesos/master/allocator.hpp ---------------------------------------------------------------------- diff --git a/include/mesos/master/allocator.hpp b/include/mesos/master/allocator.hpp index 7301058..3fea47f 100644 --- a/include/mesos/master/allocator.hpp +++ b/include/mesos/master/allocator.hpp @@ -160,7 +160,8 @@ public: const SlaveID& slaveId, const FrameworkID& frameworkId, const Option<UnavailableResources>& unavailableResources, - const Option<InverseOfferStatus>& status) = 0; + const Option<InverseOfferStatus>& status, + const Option<Filters>& filters = None()) = 0; // Informs the Allocator to recover resources that are considered // used by the framework. 
http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/src/master/allocator/mesos/allocator.hpp ---------------------------------------------------------------------- diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp index 4f02dd1..904dc62 100644 --- a/src/master/allocator/mesos/allocator.hpp +++ b/src/master/allocator/mesos/allocator.hpp @@ -120,7 +120,8 @@ public: const SlaveID& slaveId, const FrameworkID& frameworkId, const Option<UnavailableResources>& unavailableResources, - const Option<mesos::master::InverseOfferStatus>& status); + const Option<mesos::master::InverseOfferStatus>& status, + const Option<Filters>& filters); void recoverResources( const FrameworkID& frameworkId, @@ -228,7 +229,8 @@ public: const SlaveID& slaveId, const FrameworkID& frameworkId, const Option<UnavailableResources>& unavailableResources, - const Option<mesos::master::InverseOfferStatus>& status) = 0; + const Option<mesos::master::InverseOfferStatus>& status, + const Option<Filters>& filters = None()) = 0; virtual void recoverResources( const FrameworkID& frameworkId, @@ -489,7 +491,8 @@ inline void MesosAllocator<AllocatorProcess>::updateInverseOffer( const SlaveID& slaveId, const FrameworkID& frameworkId, const Option<UnavailableResources>& unavailableResources, - const Option<mesos::master::InverseOfferStatus>& status) + const Option<mesos::master::InverseOfferStatus>& status, + const Option<Filters>& filters) { return process::dispatch( process, @@ -497,7 +500,8 @@ inline void MesosAllocator<AllocatorProcess>::updateInverseOffer( slaveId, frameworkId, unavailableResources, - status); + status, + filters); } http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/src/master/allocator/mesos/hierarchical.hpp ---------------------------------------------------------------------- diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp index a4c4107..d3496bc 100644 --- 
a/src/master/allocator/mesos/hierarchical.hpp +++ b/src/master/allocator/mesos/hierarchical.hpp @@ -52,7 +52,7 @@ namespace allocator { // Forward declarations. class OfferFilter; - +class InverseOfferFilter; // We forward declare the hierarchical allocator process so that we // can typedef an instantiation of it with DRF sorters. @@ -158,7 +158,8 @@ public: const SlaveID& slaveId, const FrameworkID& frameworkId, const Option<UnavailableResources>& unavailableResources, - const Option<mesos::master::InverseOfferStatus>& status); + const Option<mesos::master::InverseOfferStatus>& status, + const Option<Filters>& filters); void recoverResources( const FrameworkID& frameworkId, @@ -192,12 +193,18 @@ protected: // Send inverse offers from the specified slaves. void deallocate(const hashset<SlaveID>& slaveIds); - // Remove a filter for the specified framework. + // Remove an offer filter for the specified framework. void expire( const FrameworkID& frameworkId, const SlaveID& slaveId, OfferFilter* offerFilter); + // Remove an inverse offer filter for the specified framework. + void expire( + const FrameworkID& frameworkId, + const SlaveID& slaveId, + InverseOfferFilter* inverseOfferFilter); + // Checks whether the slave is whitelisted. bool isWhitelisted(const SlaveID& slaveId); @@ -208,6 +215,12 @@ protected: const SlaveID& slaveId, const Resources& resources); + // Returns true if there is an inverse offer filter for this framework + // on this slave. + bool isFiltered( + const FrameworkID& frameworkID, + const SlaveID& slaveID); + bool allocatable(const Resources& resources); bool initialized; @@ -251,8 +264,9 @@ protected: // Whether the framework desires revocable resources. bool revocable; - // Active filters on offers for the framework. + // Active offer and inverse offer filters for the framework. 
hashmap<SlaveID, hashset<OfferFilter*>> offerFilters; + hashmap<SlaveID, hashset<InverseOfferFilter*>> inverseOfferFilters; }; double _event_queue_dispatches() @@ -382,6 +396,41 @@ public: }; +// Used to represent "filters" for inverse offers. +// NOTE: Since this specific allocator implementation only sends inverse offers +// for maintenance primitives, and those are at the whole slave level, we only +// need to filter based on the time-out. +// If this allocator implementation starts sending out more resource specific +// inverse offers, then we can capture the `unavailableResources` in the filter +// function. +class InverseOfferFilter +{ +public: + virtual ~InverseOfferFilter() {} + + virtual bool filter() = 0; +}; + + +// NOTE: See comment above `InverseOfferFilter` regarding capturing +// `unavailableResources` if this allocator starts sending fine-grained inverse +// offers. +class RefusedInverseOfferFilter: public InverseOfferFilter +{ +public: + RefusedInverseOfferFilter(const process::Timeout& _timeout) + : timeout(_timeout) {} + + virtual bool filter() + { + // See comment above why we currently don't do more fine-grained filtering. + return timeout.remaining() > Seconds(0); + } + + const process::Timeout timeout; +}; + + template <class RoleSorter, class FrameworkSorter> void HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::initialize( @@ -541,6 +590,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deactivateFramework( // HierarchicalAllocatorProcess::reviveOffers and // HierarchicalAllocatorProcess::expire. frameworks[frameworkId].offerFilters.clear(); + frameworks[frameworkId].inverseOfferFilters.clear(); LOG(INFO) << "Deactivated framework " << frameworkId; } @@ -860,6 +910,15 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateUnavailability( // NOTE: We currently implement maintenance in the allocator to be able to // leverage state and features such as the FrameworkSorter and OfferFilter. 
+ // We explicitly remove all filters for the inverse offers of this slave. We + // do this because we want to force frameworks to reassess the calculations + // they have made to respond to the inverse offer. Unavailability of a slave + // can have a large effect on failure domain calculations and inter-leaved + // unavailability schedules. + foreachvalue (Framework& framework, frameworks) { + framework.inverseOfferFilters.erase(slaveId); + } + // Remove any old unavailability. slaves[slaveId].maintenance = None(); @@ -879,7 +938,8 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateInverseOffer( const SlaveID& slaveId, const FrameworkID& frameworkId, const Option<UnavailableResources>& unavailableResources, - const Option<mesos::master::InverseOfferStatus>& status) + const Option<mesos::master::InverseOfferStatus>& status, + const Option<Filters>& filters) { CHECK(initialized); CHECK(frameworks.contains(frameworkId)); @@ -915,6 +975,58 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateInverseOffer( maintenance.statuses[frameworkId].CopyFrom(status.get()); } } + + // No need to install filters if `filters` is none. + if (filters.isNone()) { + return; + } + + // Create a refused resource filter. 
+ Try<Duration> seconds = Duration::create(filters.get().refuse_seconds()); + + if (seconds.isError()) { + LOG(WARNING) << "Using the default value of 'refuse_seconds' to create " + << "the refused inverse offer filter because the input value " + << "is invalid: " << seconds.error(); + + seconds = Duration::create(Filters().refuse_seconds()); + } else if (seconds.get() < Duration::zero()) { + LOG(WARNING) << "Using the default value of 'refuse_seconds' to create " + << "the refused inverse offer filter because the input value " + << "is negative"; + + seconds = Duration::create(Filters().refuse_seconds()); + } + + CHECK_SOME(seconds); + + if (seconds.get() != Duration::zero()) { + VLOG(1) << "Framework " << frameworkId + << " filtered inverse offers from slave " << slaveId + << " for " << seconds.get(); + + // Create a new inverse offer filter and delay its expiration. + InverseOfferFilter* inverseOfferFilter = + new RefusedInverseOfferFilter(process::Timeout::in(seconds.get())); + + frameworks[frameworkId] + .inverseOfferFilters[slaveId].insert(inverseOfferFilter); + + // We need to disambiguate the function call to pick the correct + // expire() overload. + void (Self::*expireInverseOffer)( + const FrameworkID&, + const SlaveID&, + InverseOfferFilter*) = &Self::expire; + + delay( + seconds.get(), + self(), + expireInverseOffer, + frameworkId, + slaveId, + inverseOfferFilter); + } } @@ -1009,10 +1121,17 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources( frameworks[frameworkId].offerFilters[slaveId].insert(offerFilter); + // We need to disambiguate the function call to pick the correct + // expire() overload. 
+ void (Self::*expireOffer)( + const FrameworkID&, + const SlaveID&, + OfferFilter*) = &Self::expire; + delay( seconds.get(), self(), - &Self::expire, + expireOffer, frameworkId, slaveId, offerFilter); @@ -1038,6 +1157,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::reviveOffers( CHECK(initialized); frameworks[frameworkId].offerFilters.clear(); + frameworks[frameworkId].inverseOfferFilters.clear(); frameworks[frameworkId].quiesced = false; // We delete each actual `OfferFilter` when @@ -1239,14 +1359,26 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deallocate( // If there isn't already an outstanding inverse offer to this // framework for the specified slave. if (!maintenance.offersOutstanding.contains(frameworkId)) { + // Ignore in case the framework filters inverse offers for this + // slave. + // NOTE: Since this specific allocator implementation only sends + // inverse offers for maintenance primitives, and those are at the + // whole slave level, we only need to filter based on the + // time-out. + if (isFiltered(frameworkId, slaveId)) { + continue; + } + + const UnavailableResources unavailableResources = + UnavailableResources{ + Resources(), + maintenance.unavailability}; + // For now we send inverse offers with empty resources when the // inverse offer represents maintenance on the machine. In the // future we could be more specific about the resources on the // host, as we have the information available. - offerable[frameworkId][slaveId] = - UnavailableResources{ - Resources(), - maintenance.unavailability}; + offerable[frameworkId][slaveId] = unavailableResources; // Mark this framework as having an offer oustanding for the // specified slave. 
@@ -1295,6 +1427,34 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::expire( template <class RoleSorter, class FrameworkSorter> +void +HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::expire( + const FrameworkID& frameworkId, + const SlaveID& slaveId, + InverseOfferFilter* inverseOfferFilter) +{ + // The filter might have already been removed (e.g., if the + // framework no longer exists or in + // HierarchicalAllocatorProcess::reviveOffers) but not yet deleted (to + // keep the address from getting reused possibly causing premature + // expiration). + if (frameworks.contains(frameworkId) && + frameworks[frameworkId].inverseOfferFilters.contains(slaveId) && + frameworks[frameworkId].inverseOfferFilters[slaveId] + .contains(inverseOfferFilter)) { + frameworks[frameworkId].inverseOfferFilters[slaveId] + .erase(inverseOfferFilter); + + if(frameworks[frameworkId].inverseOfferFilters[slaveId].empty()) { + frameworks[frameworkId].inverseOfferFilters.erase(slaveId); + } + } + + delete inverseOfferFilter; +} + + +template <class RoleSorter, class FrameworkSorter> bool HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isWhitelisted( const SlaveID& slaveId) @@ -1345,6 +1505,31 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isFiltered( template <class RoleSorter, class FrameworkSorter> +bool HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isFiltered( + const FrameworkID& frameworkId, + const SlaveID& slaveId) +{ + CHECK(frameworks.contains(frameworkId)); + CHECK(slaves.contains(slaveId)); + + if (frameworks[frameworkId].inverseOfferFilters.contains(slaveId)) { + foreach ( + InverseOfferFilter* inverseOfferFilter, + frameworks[frameworkId].inverseOfferFilters[slaveId]) { + if (inverseOfferFilter->filter()) { + VLOG(1) << "Filtered unavailability on slave " << slaveId + << " for framework " << frameworkId; + + return true; + } + } + } + + return false; +} + + +template <class RoleSorter, class FrameworkSorter> bool 
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocatable( const Resources& resources) http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 5393ee8..6c0db21 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -2850,7 +2850,8 @@ void Master::accept( UnavailableResources{ inverseOffer->resources(), inverseOffer->unavailability()}, - status); + status, + accept.filters()); removeInverseOffer(inverseOffer); continue; @@ -3324,7 +3325,8 @@ void Master::decline( UnavailableResources{ inverseOffer->resources(), inverseOffer->unavailability()}, - status); + status, + decline.filters()); removeInverseOffer(inverseOffer); continue; http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/src/tests/mesos.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index dd587bb..e1c0635 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -1361,7 +1361,7 @@ ACTION_P(InvokeUpdateUnavailability, allocator) ACTION_P(InvokeUpdateInverseOffer, allocator) { - return allocator->real->updateInverseOffer(arg0, arg1, arg2, arg3); + return allocator->real->updateInverseOffer(arg0, arg1, arg2, arg3, arg4); } @@ -1499,9 +1499,9 @@ public: EXPECT_CALL(*this, updateUnavailability(_, _)) .WillRepeatedly(DoDefault()); - ON_CALL(*this, updateInverseOffer(_, _, _, _)) + ON_CALL(*this, updateInverseOffer(_, _, _, _, _)) .WillByDefault(InvokeUpdateInverseOffer(this)); - EXPECT_CALL(*this, updateInverseOffer(_, _, _, _)) + EXPECT_CALL(*this, updateInverseOffer(_, _, _, _, _)) .WillRepeatedly(DoDefault()); ON_CALL(*this, recoverResources(_, _, _, _)) @@ -1590,11 +1590,12 @@ public: const SlaveID&, const Option<Unavailability>&)); - MOCK_METHOD4(updateInverseOffer, void( + MOCK_METHOD5(updateInverseOffer, void( const SlaveID&, const 
FrameworkID&, const Option<UnavailableResources>&, - const Option<mesos::master::InverseOfferStatus>&)); + const Option<mesos::master::InverseOfferStatus>&, + const Option<Filters>&)); MOCK_METHOD4(recoverResources, void( const FrameworkID&,
