Repository: mesos Updated Branches: refs/heads/master 38dbadc94 -> 2d0b65ede
Added static->dynamic transformation to Allocator. This improves the compilation time of Mesos significantly, allowing developers to iterate more quickly on allocator changes. Review: https://reviews.apache.org/r/38869 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2d0b65ed Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2d0b65ed Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2d0b65ed Branch: refs/heads/master Commit: 2d0b65edeea129b427e60a3ca360fc29f77aa0d5 Parents: 38dbadc Author: Joris Van Remoortere <[email protected]> Authored: Tue Sep 29 17:01:13 2015 -0700 Committer: Joris Van Remoortere <[email protected]> Committed: Tue Oct 13 20:03:04 2015 +0200 ---------------------------------------------------------------------- src/CMakeLists.txt | 1 + src/Makefile.am | 1 + src/master/allocator/mesos/hierarchical.cpp | 1191 ++++++++++++++++++++ src/master/allocator/mesos/hierarchical.hpp | 1273 +--------------------- 4 files changed, 1249 insertions(+), 1217 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/2d0b65ed/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 536a99f..98e76ce 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -142,6 +142,7 @@ set(MASTER_SRC master/repairer.cpp master/validation.cpp master/allocator/allocator.cpp + master/allocator/mesos/hierarchical.cpp master/allocator/sorter/drf/sorter.cpp ) http://git-wip-us.apache.org/repos/asf/mesos/blob/2d0b65ed/src/Makefile.am ---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index d855cb8..4a0eeb8 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -495,6 +495,7 @@ libmesos_no_3rdparty_la_SOURCES = \ master/repairer.cpp \ master/validation.cpp \ master/allocator/allocator.cpp \ + master/allocator/mesos/hierarchical.cpp \ master/allocator/sorter/drf/sorter.cpp \ messages/flags.proto \ messages/messages.cpp \ http://git-wip-us.apache.org/repos/asf/mesos/blob/2d0b65ed/src/master/allocator/mesos/hierarchical.cpp ---------------------------------------------------------------------- diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp new file mode 100644 index 0000000..0a6f8a6 --- /dev/null +++ b/src/master/allocator/mesos/hierarchical.cpp @@ -0,0 +1,1191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "master/allocator/mesos/hierarchical.hpp" + +#include <algorithm> +#include <vector> + +#include <mesos/resources.hpp> +#include <mesos/type_utils.hpp> + +#include <process/event.hpp> +#include <process/delay.hpp> +#include <process/id.hpp> +#include <process/timeout.hpp> + +#include <stout/check.hpp> +#include <stout/hashset.hpp> +#include <stout/stopwatch.hpp> +#include <stout/stringify.hpp> + +namespace mesos { +namespace internal { +namespace master { +namespace allocator { +namespace internal { + +// Used to represent "filters" for resources unused in offers. +class OfferFilter +{ +public: + virtual ~OfferFilter() {} + + virtual bool filter(const Resources& resources) = 0; +}; + + +class RefusedOfferFilter : public OfferFilter +{ +public: + RefusedOfferFilter( + const Resources& _resources, + const process::Timeout& _timeout) + : resources(_resources), timeout(_timeout) {} + + virtual bool filter(const Resources& _resources) + { + // TODO(jieyu): Consider separating the superset check for regular + // and revocable resources. For example, frameworks might want + // more revocable resources only or non-revocable resources only, + // but currently the filter only expires if there is more of both + // revocable and non-revocable resources. + return resources.contains(_resources) && // Refused resources are superset. + timeout.remaining() > Seconds(0); + } + + const Resources resources; + const process::Timeout timeout; +}; + + +// Used to represent "filters" for inverse offers. +// NOTE: Since this specific allocator implementation only sends inverse offers +// for maintenance primitives, and those are at the whole slave level, we only +// need to filter based on the time-out. +// If this allocator implementation starts sending out more resource specific +// inverse offers, then we can capture the `unavailableResources` in the filter +// function. +class InverseOfferFilter +{ +public: + virtual ~InverseOfferFilter() {} + + virtual bool filter() = 0; +}; + + +// NOTE: See comment above `InverseOfferFilter` regarding capturing +// `unavailableResources` if this allocator starts sending fine-grained inverse +// offers. +class RefusedInverseOfferFilter : public InverseOfferFilter +{ +public: + RefusedInverseOfferFilter(const process::Timeout& _timeout) + : timeout(_timeout) {} + + virtual bool filter() + { + // See comment above why we currently don't do more fine-grained filtering. + return timeout.remaining() > Seconds(0); + } + + const process::Timeout timeout; +}; + + +void HierarchicalAllocatorProcess::initialize( + const Duration& _allocationInterval, + const lambda::function< + void(const FrameworkID&, + const hashmap<SlaveID, Resources>&)>& _offerCallback, + const lambda::function< + void(const FrameworkID&, + const hashmap<SlaveID, UnavailableResources>&)>& + _inverseOfferCallback, + const hashmap<std::string, mesos::master::RoleInfo>& _roles) +{ + allocationInterval = _allocationInterval; + offerCallback = _offerCallback; + inverseOfferCallback = _inverseOfferCallback; + roles = _roles; + initialized = true; + + roleSorter = sorterFactory->createRoleSorter(); + foreachpair ( + const std::string& name, const mesos::master::RoleInfo& roleInfo, roles) { + roleSorter->add(name, roleInfo.weight()); + frameworkSorters[name] = sorterFactory->createFrameworkSorter(); + } + + if (roleSorter->count() == 0) { + LOG(ERROR) << "No roles specified, cannot allocate resources!"; + } + + VLOG(1) << "Initialized hierarchical allocator process"; + + delay(allocationInterval, self(), &Self::batch); +} + + +void HierarchicalAllocatorProcess::addFramework( + const FrameworkID& frameworkId, + const FrameworkInfo& frameworkInfo, + const hashmap<SlaveID, Resources>& used) +{ + CHECK(initialized); + + const std::string& role = frameworkInfo.role(); + + CHECK(roles.contains(role)); + + CHECK(!frameworkSorters[role]->contains(frameworkId.value())); + frameworkSorters[role]->add(frameworkId.value()); + + // TODO(bmahler): Validate that the reserved resources have the + // framework's role. + + // Update the allocation to this framework. + foreachpair (const SlaveID& slaveId, const Resources& allocated, used) { + roleSorter->allocated(role, slaveId, allocated.unreserved()); + frameworkSorters[role]->add(slaveId, allocated); + frameworkSorters[role]->allocated(frameworkId.value(), slaveId, allocated); + } + + frameworks[frameworkId] = Framework(); + frameworks[frameworkId].role = frameworkInfo.role(); + frameworks[frameworkId].checkpoint = frameworkInfo.checkpoint(); + + // Check if the framework desires revocable resources. + frameworks[frameworkId].revocable = false; + foreach (const FrameworkInfo::Capability& capability, + frameworkInfo.capabilities()) { + if (capability.type() == FrameworkInfo::Capability::REVOCABLE_RESOURCES) { + frameworks[frameworkId].revocable = true; + } + } + + frameworks[frameworkId].suppressed = false; + + LOG(INFO) << "Added framework " << frameworkId; + + allocate(); +} + + +void HierarchicalAllocatorProcess::removeFramework( + const FrameworkID& frameworkId) +{ + CHECK(initialized); + + CHECK(frameworks.contains(frameworkId)); + const std::string& role = frameworks[frameworkId].role; + + // Might not be in 'frameworkSorters[role]' because it was previously + // deactivated and never re-added. + if (frameworkSorters[role]->contains(frameworkId.value())) { + hashmap<SlaveID, Resources> allocation = + frameworkSorters[role]->allocation(frameworkId.value()); + + foreachpair ( + const SlaveID& slaveId, const Resources& allocated, allocation) { + roleSorter->unallocated(role, slaveId, allocated.unreserved()); + frameworkSorters[role]->remove(slaveId, allocated); + } + + frameworkSorters[role]->remove(frameworkId.value()); + } + + // Do not delete the filters contained in this + // framework's `offerFilters` hashset yet, see comments in + // HierarchicalAllocatorProcess::reviveOffers and + // HierarchicalAllocatorProcess::expire. + frameworks.erase(frameworkId); + + LOG(INFO) << "Removed framework " << frameworkId; +} + + +void HierarchicalAllocatorProcess::activateFramework( + const FrameworkID& frameworkId) +{ + CHECK(initialized); + + CHECK(frameworks.contains(frameworkId)); + const std::string& role = frameworks[frameworkId].role; + + frameworkSorters[role]->activate(frameworkId.value()); + + LOG(INFO) << "Activated framework " << frameworkId; + + allocate(); +} + + +void HierarchicalAllocatorProcess::deactivateFramework( + const FrameworkID& frameworkId) +{ + CHECK(initialized); + + CHECK(frameworks.contains(frameworkId)); + const std::string& role = frameworks[frameworkId].role; + + frameworkSorters[role]->deactivate(frameworkId.value()); + + // Note that the Sorter *does not* remove the resources allocated + // to this framework. For now, this is important because if the + // framework fails over and is activated, we still want a record + // of the resources that it is using. We might be able to collapse + // the added/removed and activated/deactivated in the future. + + // Do not delete the filters contained in this + // framework's `offerFilters` hashset yet, see comments in + // HierarchicalAllocatorProcess::reviveOffers and + // HierarchicalAllocatorProcess::expire. + frameworks[frameworkId].offerFilters.clear(); + frameworks[frameworkId].inverseOfferFilters.clear(); + + LOG(INFO) << "Deactivated framework " << frameworkId; +} + + +void HierarchicalAllocatorProcess::updateFramework( + const FrameworkID& frameworkId, + const FrameworkInfo& frameworkInfo) +{ + CHECK(initialized); + + CHECK(frameworks.contains(frameworkId)); + + // TODO(jmlvanre): Once we allow frameworks to re-register with a + // new 'role' or 'checkpoint' flag, we need to update our internal + // 'frameworks' structure. See MESOS-703 for progress on allowing + // these fields to be updated. + CHECK_EQ(frameworks[frameworkId].role, frameworkInfo.role()); + CHECK_EQ(frameworks[frameworkId].checkpoint, frameworkInfo.checkpoint()); + + frameworks[frameworkId].revocable = false; + + foreach (const FrameworkInfo::Capability& capability, + frameworkInfo.capabilities()) { + if (capability.type() == FrameworkInfo::Capability::REVOCABLE_RESOURCES) { + frameworks[frameworkId].revocable = true; + } + } +} + + +void HierarchicalAllocatorProcess::addSlave( + const SlaveID& slaveId, + const SlaveInfo& slaveInfo, + const Option<Unavailability>& unavailability, + const Resources& total, + const hashmap<FrameworkID, Resources>& used) +{ + CHECK(initialized); + CHECK(!slaves.contains(slaveId)); + + roleSorter->add(slaveId, total.unreserved()); + + foreachpair (const FrameworkID& frameworkId, + const Resources& allocated, + used) { + if (frameworks.contains(frameworkId)) { + const std::string& role = frameworks[frameworkId].role; + + // TODO(bmahler): Validate that the reserved resources have the + // framework's role. + + roleSorter->allocated(role, slaveId, allocated.unreserved()); + frameworkSorters[role]->add(slaveId, allocated); + frameworkSorters[role]->allocated( + frameworkId.value(), slaveId, allocated); + } + } + + slaves[slaveId] = Slave(); + slaves[slaveId].total = total; + slaves[slaveId].allocated = Resources::sum(used); + slaves[slaveId].activated = true; + slaves[slaveId].checkpoint = slaveInfo.checkpoint(); + slaves[slaveId].hostname = slaveInfo.hostname(); + + // NOTE: We currently implement maintenance in the allocator to be able to + // leverage state and features such as the FrameworkSorter and OfferFilter. + if (unavailability.isSome()) { + slaves[slaveId].maintenance = + typename Slave::Maintenance(unavailability.get()); + } + + LOG(INFO) << "Added slave " << slaveId << " (" << slaves[slaveId].hostname + << ") with " << slaves[slaveId].total + << " (allocated: " << slaves[slaveId].allocated << ")"; + + allocate(slaveId); +} + + +void HierarchicalAllocatorProcess::removeSlave( + const SlaveID& slaveId) +{ + CHECK(initialized); + CHECK(slaves.contains(slaveId)); + + // TODO(bmahler): Per MESOS-621, this should remove the allocations + // that any frameworks have on this slave. Otherwise the caller may + // "leak" allocated resources accidentally if they forget to recover + // all the resources. Fixing this would require more information + // than what we currently track in the allocator. + + roleSorter->remove(slaveId, slaves[slaveId].total.unreserved()); + + slaves.erase(slaveId); + + // Note that we DO NOT actually delete any filters associated with + // this slave, that will occur when the delayed + // HierarchicalAllocatorProcess::expire gets invoked (or the framework + // that applied the filters gets removed). + + LOG(INFO) << "Removed slave " << slaveId; +} + + +void HierarchicalAllocatorProcess::updateSlave( + const SlaveID& slaveId, + const Resources& oversubscribed) +{ + CHECK(initialized); + CHECK(slaves.contains(slaveId)); + + // Check that all the oversubscribed resources are revocable. + CHECK_EQ(oversubscribed, oversubscribed.revocable()); + + // Update the total resources. + + // First remove the old oversubscribed resources from the total. + slaves[slaveId].total -= slaves[slaveId].total.revocable(); + + // Now add the new estimate of oversubscribed resources. + slaves[slaveId].total += oversubscribed; + + // Now, update the total resources in the role sorter. + roleSorter->update( + slaveId, + slaves[slaveId].total.unreserved()); + + LOG(INFO) << "Slave " << slaveId << " (" << slaves[slaveId].hostname << ")" + << " updated with oversubscribed resources " << oversubscribed + << " (total: " << slaves[slaveId].total + << ", allocated: " << slaves[slaveId].allocated << ")"; + + allocate(slaveId); +} + + +void HierarchicalAllocatorProcess::activateSlave( + const SlaveID& slaveId) +{ + CHECK(initialized); + CHECK(slaves.contains(slaveId)); + + slaves[slaveId].activated = true; + + LOG(INFO)<< "Slave " << slaveId << " reactivated"; +} + + +void HierarchicalAllocatorProcess::deactivateSlave( + const SlaveID& slaveId) +{ + CHECK(initialized); + CHECK(slaves.contains(slaveId)); + + slaves[slaveId].activated = false; + + LOG(INFO) << "Slave " << slaveId << " deactivated"; +} + + +void HierarchicalAllocatorProcess::updateWhitelist( + const Option<hashset<std::string>>& _whitelist) +{ + CHECK(initialized); + + whitelist = _whitelist; + + if (whitelist.isSome()) { + LOG(INFO) << "Updated slave whitelist: " << stringify(whitelist.get()); + + if (whitelist.get().empty()) { + LOG(WARNING) << "Whitelist is empty, no offers will be made!"; + } + } else { + LOG(INFO) << "Advertising offers for all slaves"; + } +} + + +void HierarchicalAllocatorProcess::requestResources( + const FrameworkID& frameworkId, + const std::vector<Request>& requests) +{ + CHECK(initialized); + + LOG(INFO) << "Received resource request from framework " << frameworkId; +} + + +void HierarchicalAllocatorProcess::updateAllocation( + const FrameworkID& frameworkId, + const SlaveID& slaveId, + const std::vector<Offer::Operation>& operations) +{ + CHECK(initialized); + CHECK(slaves.contains(slaveId)); + CHECK(frameworks.contains(frameworkId)); + + // Here we apply offer operations to the allocated resources, which + // in turns leads to an update of the total. The available resources + // remain unchanged. + + // Update the allocated resources. + Sorter* frameworkSorter = frameworkSorters[frameworks[frameworkId].role]; + + Resources frameworkAllocation = + frameworkSorter->allocation(frameworkId.value(), slaveId); + + Try<Resources> updatedFrameworkAllocation = + frameworkAllocation.apply(operations); + + CHECK_SOME(updatedFrameworkAllocation); + + frameworkSorter->update( + frameworkId.value(), + slaveId, + frameworkAllocation, + updatedFrameworkAllocation.get()); + + roleSorter->update( + frameworks[frameworkId].role, + slaveId, + frameworkAllocation.unreserved(), + updatedFrameworkAllocation.get().unreserved()); + + Try<Resources> updatedSlaveAllocation = + slaves[slaveId].allocated.apply(operations); + + CHECK_SOME(updatedSlaveAllocation); + + slaves[slaveId].allocated = updatedSlaveAllocation.get(); + + // Update the total resources. + Try<Resources> updatedTotal = slaves[slaveId].total.apply(operations); + CHECK_SOME(updatedTotal); + + slaves[slaveId].total = updatedTotal.get(); + + LOG(INFO) << "Updated allocation of framework " << frameworkId + << " on slave " << slaveId + << " from " << frameworkAllocation + << " to " << updatedFrameworkAllocation.get(); +} + + +process::Future<Nothing> +HierarchicalAllocatorProcess::updateAvailable( + const SlaveID& slaveId, + const std::vector<Offer::Operation>& operations) +{ + CHECK(initialized); + CHECK(slaves.contains(slaveId)); + + Resources available = slaves[slaveId].total - slaves[slaveId].allocated; + + // It's possible for this 'apply' to fail here because a call to + // 'allocate' could have been enqueued by the allocator itself + // just before master's request to enqueue 'updateAvailable' + // arrives to the allocator. + // + // Master -------R------------ + // \----+ + // | + // Allocator --A-----A-U---A-- + // \___/ \___/ + // + // where A = allocate, R = reserve, U = updateAvailable + Try<Resources> updatedAvailable = available.apply(operations); + if (updatedAvailable.isError()) { + return process::Failure(updatedAvailable.error()); + } + + // Update the total resources. + Try<Resources> updatedTotal = slaves[slaveId].total.apply(operations); + CHECK_SOME(updatedTotal); + + slaves[slaveId].total = updatedTotal.get(); + + // Now, update the total resources in the role sorter. + roleSorter->update(slaveId, slaves[slaveId].total.unreserved()); + + return Nothing(); +} + + +void HierarchicalAllocatorProcess::updateUnavailability( + const SlaveID& slaveId, + const Option<Unavailability>& unavailability) +{ + CHECK(initialized); + CHECK(slaves.contains(slaveId)); + + // NOTE: We currently implement maintenance in the allocator to be able to + // leverage state and features such as the FrameworkSorter and OfferFilter. + + // We explicitly remove all filters for the inverse offers of this slave. We + // do this because we want to force frameworks to reassess the calculations + // they have made to respond to the inverse offer. Unavailability of a slave + // can have a large effect on failure domain calculations and inter-leaved + // unavailability schedules. + foreachvalue (Framework& framework, frameworks) { + framework.inverseOfferFilters.erase(slaveId); + } + + // Remove any old unavailability. + slaves[slaveId].maintenance = None(); + + // If we have a new unavailability. + if (unavailability.isSome()) { + slaves[slaveId].maintenance = + typename Slave::Maintenance(unavailability.get()); + } + + allocate(slaveId); +} + + +void HierarchicalAllocatorProcess::updateInverseOffer( + const SlaveID& slaveId, + const FrameworkID& frameworkId, + const Option<UnavailableResources>& unavailableResources, + const Option<mesos::master::InverseOfferStatus>& status, + const Option<Filters>& filters) +{ + CHECK(initialized); + CHECK(frameworks.contains(frameworkId)); + CHECK(slaves.contains(slaveId)); + CHECK(slaves[slaveId].maintenance.isSome()); + + // NOTE: We currently implement maintenance in the allocator to be able to + // leverage state and features such as the FrameworkSorter and OfferFilter. + + // We use a reference by alias because we intend to modify the + // `maintenance` and to improve readability. + typename Slave::Maintenance& maintenance = slaves[slaveId].maintenance.get(); + + // Only handle inverse offers that we currently have outstanding. If it is not + // currently outstanding this means it is old and can be safely ignored. + if (maintenance.offersOutstanding.contains(frameworkId)) { + // We always remove the outstanding offer so that we will send a new offer + // out the next time we schedule inverse offers. + maintenance.offersOutstanding.erase(frameworkId); + + // If the response is `Some`, this means the framework responded. Otherwise + // if it is `None` the inverse offer timed out or was rescinded. + if (status.isSome()) { + // For now we don't allow frameworks to respond with `UNKNOWN`. The caller + // should guard against this. This goes against the pattern of not + // checking external invariants; however, the allocator and master are + // currently so tightly coupled that this check is valuable. + CHECK_NE( + status.get().status(), + mesos::master::InverseOfferStatus::UNKNOWN); + + // If the framework responded, we update our state to match. + maintenance.statuses[frameworkId].CopyFrom(status.get()); + } + } + + // No need to install filters if `filters` is none. + if (filters.isNone()) { + return; + } + + // Create a refused resource filter. + Try<Duration> seconds = Duration::create(filters.get().refuse_seconds()); + + if (seconds.isError()) { + LOG(WARNING) << "Using the default value of 'refuse_seconds' to create " + << "the refused inverse offer filter because the input value " + << "is invalid: " << seconds.error(); + + seconds = Duration::create(Filters().refuse_seconds()); + } else if (seconds.get() < Duration::zero()) { + LOG(WARNING) << "Using the default value of 'refuse_seconds' to create " + << "the refused inverse offer filter because the input value " + << "is negative"; + + seconds = Duration::create(Filters().refuse_seconds()); + } + + CHECK_SOME(seconds); + + if (seconds.get() != Duration::zero()) { + VLOG(1) << "Framework " << frameworkId + << " filtered inverse offers from slave " << slaveId + << " for " << seconds.get(); + + // Create a new inverse offer filter and delay its expiration. + InverseOfferFilter* inverseOfferFilter = + new RefusedInverseOfferFilter(process::Timeout::in(seconds.get())); + + frameworks[frameworkId] + .inverseOfferFilters[slaveId].insert(inverseOfferFilter); + + // We need to disambiguate the function call to pick the correct + // expire() overload. + void (Self::*expireInverseOffer)( + const FrameworkID&, + const SlaveID&, + InverseOfferFilter*) = &Self::expire; + + delay( + seconds.get(), + self(), + expireInverseOffer, + frameworkId, + slaveId, + inverseOfferFilter); + } +} + + +process::Future< + hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>> +HierarchicalAllocatorProcess::getInverseOfferStatuses() +{ + CHECK(initialized); + + hashmap< + SlaveID, + hashmap<FrameworkID, mesos::master::InverseOfferStatus>> result; + + // Make a copy of the most recent statuses. + foreachpair (const SlaveID& id, const Slave& slave, slaves) { + if (slave.maintenance.isSome()) { + result[id] = slave.maintenance.get().statuses; + } + } + + return result; +} + + +void HierarchicalAllocatorProcess::recoverResources( + const FrameworkID& frameworkId, + const SlaveID& slaveId, + const Resources& resources, + const Option<Filters>& filters) +{ + CHECK(initialized); + + if (resources.empty()) { + return; + } + + // Updated resources allocated to framework (if framework still + // exists, which it might not in the event that we dispatched + // Master::offer before we received + // MesosAllocatorProcess::removeFramework or + // MesosAllocatorProcess::deactivateFramework, in which case we will + // have already recovered all of its resources). + if (frameworks.contains(frameworkId)) { + const std::string& role = frameworks[frameworkId].role; + + CHECK(frameworkSorters.contains(role)); + + if (frameworkSorters[role]->contains(frameworkId.value())) { + frameworkSorters[role]->unallocated( + frameworkId.value(), slaveId, resources); + frameworkSorters[role]->remove(slaveId, resources); + roleSorter->unallocated(role, slaveId, resources.unreserved()); + } + } + + // Update resources allocated on slave (if slave still exists, + // which it might not in the event that we dispatched Master::offer + // before we received Allocator::removeSlave). + if (slaves.contains(slaveId)) { + // NOTE: We cannot add the following CHECK due to the double + // precision errors. See MESOS-1187 for details. + // CHECK(slaves[slaveId].allocated.contains(resources)); + + slaves[slaveId].allocated -= resources; + + LOG(INFO) << "Recovered " << resources + << " (total: " << slaves[slaveId].total + << ", allocated: " << slaves[slaveId].allocated + << ") on slave " << slaveId + << " from framework " << frameworkId; + } + + // No need to install the filter if 'filters' is none. + if (filters.isNone()) { + return; + } + + // No need to install the filter if slave/framework does not exist. + if (!frameworks.contains(frameworkId) || !slaves.contains(slaveId)) { + return; + } + + // Create a refused resources filter. + Try<Duration> seconds = Duration::create(filters.get().refuse_seconds()); + + if (seconds.isError()) { + LOG(WARNING) << "Using the default value of 'refuse_seconds' to create " + << "the refused resources filter because the input value " + << "is invalid: " << seconds.error(); + + seconds = Duration::create(Filters().refuse_seconds()); + } else if (seconds.get() < Duration::zero()) { + LOG(WARNING) << "Using the default value of 'refuse_seconds' to create " + << "the refused resources filter because the input value " + << "is negative"; + + seconds = Duration::create(Filters().refuse_seconds()); + } + + CHECK_SOME(seconds); + + if (seconds.get() != Duration::zero()) { + VLOG(1) << "Framework " << frameworkId + << " filtered slave " << slaveId + << " for " << seconds.get(); + + // Create a new filter and delay its expiration. + OfferFilter* offerFilter = new RefusedOfferFilter( + resources, + process::Timeout::in(seconds.get())); + + frameworks[frameworkId].offerFilters[slaveId].insert(offerFilter); + + // We need to disambiguate the function call to pick the correct + // expire() overload. + void (Self::*expireOffer)( + const FrameworkID&, + const SlaveID&, + OfferFilter*) = &Self::expire; + + delay( + seconds.get(), + self(), + expireOffer, + frameworkId, + slaveId, + offerFilter); + } +} + + +void HierarchicalAllocatorProcess::suppressOffers( + const FrameworkID& frameworkId) +{ + CHECK(initialized); + frameworks[frameworkId].suppressed = true; + + LOG(INFO) << "Suppressed offers for framework " << frameworkId; +} + + +void HierarchicalAllocatorProcess::reviveOffers( + const FrameworkID& frameworkId) +{ + CHECK(initialized); + + frameworks[frameworkId].offerFilters.clear(); + frameworks[frameworkId].inverseOfferFilters.clear(); + frameworks[frameworkId].suppressed = false; + + // We delete each actual `OfferFilter` when + // `HierarchicalAllocatorProcess::expire` gets invoked. If we delete the + // `OfferFilter` here it's possible that the same `OfferFilter` (i.e., same + // address) could get reused and `HierarchicalAllocatorProcess::expire` + // would expire that filter too soon. Note that this only works + // right now because ALL Filter types "expire". + + LOG(INFO) << "Removed offer filters for framework " << frameworkId; + + allocate(); +} + + +void HierarchicalAllocatorProcess::batch() +{ + allocate(); + delay(allocationInterval, self(), &Self::batch); +} + + +void HierarchicalAllocatorProcess::allocate() +{ + Stopwatch stopwatch; + stopwatch.start(); + + allocate(slaves.keys()); + + VLOG(1) << "Performed allocation for " << slaves.size() << " slaves in " + << stopwatch.elapsed(); +} + + +void HierarchicalAllocatorProcess::allocate( + const SlaveID& slaveId) +{ + Stopwatch stopwatch; + stopwatch.start(); + + // TODO(bmahler): Add initializer list constructor for hashset. + hashset<SlaveID> slaves; + slaves.insert(slaveId); + allocate(slaves); + + VLOG(1) << "Performed allocation for slave " << slaveId << " in " + << stopwatch.elapsed(); +} + + +void HierarchicalAllocatorProcess::allocate( + const hashset<SlaveID>& slaveIds_) +{ + if (roleSorter->count() == 0) { + LOG(ERROR) << "No roles specified, cannot allocate resources!"; + return; + } + + // Compute the offerable resources, per framework: + // (1) For reserved resources on the slave, allocate these to a + // framework having the corresponding role. + // (2) For unreserved resources on the slave, allocate these + // to a framework of any role. + hashmap<FrameworkID, hashmap<SlaveID, Resources>> offerable; + + // Randomize the order in which slaves' resources are allocated. + // TODO(vinod): Implement a smarter sorting algorithm. + std::vector<SlaveID> slaveIds(slaveIds_.begin(), slaveIds_.end()); + std::random_shuffle(slaveIds.begin(), slaveIds.end()); + + foreach (const SlaveID& slaveId, slaveIds) { + // Don't send offers for non-whitelisted and deactivated slaves. + if (!isWhitelisted(slaveId) || !slaves[slaveId].activated) { + continue; + } + + foreach (const std::string& role, roleSorter->sort()) { + foreach (const std::string& frameworkId_, + frameworkSorters[role]->sort()) { + FrameworkID frameworkId; + frameworkId.set_value(frameworkId_); + + // If the framework has suppressed offers, ignore. + if (frameworks[frameworkId].suppressed) { + continue; + } + + // Calculate the currently available resources on the slave. + Resources available = slaves[slaveId].total - slaves[slaveId].allocated; + + // NOTE: Currently, frameworks are allowed to have '*' role. + // Calling reserved('*') returns an empty Resources object. + Resources resources = available.unreserved() + available.reserved(role); + + // Remove revocable resources if the framework has not opted + // for them. + if (!frameworks[frameworkId].revocable) { + resources -= resources.revocable(); + } + + // If the resources are not allocatable, ignore. + if (!allocatable(resources)) { + continue; + } + + // If the framework filters these resources, ignore. + if (isFiltered(frameworkId, slaveId, resources)) { + continue; + } + + VLOG(2) << "Allocating " << resources << " on slave " << slaveId + << " to framework " << frameworkId; + + // Note that we perform "coarse-grained" allocation, + // meaning that we always allocate the entire remaining + // slave resources to a single framework. + offerable[frameworkId][slaveId] = resources; + slaves[slaveId].allocated += resources; + + // Reserved resources are only accounted for in the framework + // sorter, since the reserved resources are not shared across + // roles. + frameworkSorters[role]->add(slaveId, resources); + frameworkSorters[role]->allocated(frameworkId_, slaveId, resources); + roleSorter->allocated(role, slaveId, resources.unreserved()); + } + } + } + + if (offerable.empty()) { + VLOG(1) << "No resources available to allocate!"; + } else { + // Now offer the resources to each framework. + foreachkey (const FrameworkID& frameworkId, offerable) { + offerCallback(frameworkId, offerable[frameworkId]); + } + } + + // NOTE: For now, we implement maintenance inverse offers within the + // allocator. We leverage the existing timer/cycle of offers to also do any + // "deallocation" (inverse offers) necessary to satisfy maintenance needs. + deallocate(slaveIds_); +} + + +void HierarchicalAllocatorProcess::deallocate( + const hashset<SlaveID>& slaveIds_) +{ + if (frameworkSorters.empty()) { + LOG(ERROR) << "No frameworks specified, cannot send inverse offers!"; + return; + } + + // In this case, `offerable` is actually the slaves and/or resources that we + // want the master to create `InverseOffer`s from. + hashmap<FrameworkID, hashmap<SlaveID, UnavailableResources>> offerable; + + // For maintenance, we use the framework sorters to determine which frameworks + // have (1) reserved and / or (2) unreserved resource on the specified + // slaveIds. This way we only send inverse offers to frameworks that have the + // potential to lose something. We keep track of which frameworks already have + // an outstanding inverse offer for the given slave in the + // UnavailabilityStatus of the specific slave using the `offerOutstanding` + // flag. This is equivalent to the accounting we do for resources when we send + // regular offers. If we didn't keep track of outstanding offers then we would + // keep generating new inverse offers even though the framework had not + // responded yet. + + foreachvalue (Sorter* frameworkSorter, frameworkSorters) { + foreach (const SlaveID& slaveId, slaveIds_) { + CHECK(slaves.contains(slaveId)); + + if (slaves[slaveId].maintenance.isSome()) { + // We use a reference by alias because we intend to modify the + // `maintenance` and to improve readability. + typename Slave::Maintenance& maintenance = + slaves[slaveId].maintenance.get(); + + hashmap<std::string, Resources> allocation = + frameworkSorter->allocation(slaveId); + + foreachkey (const std::string& frameworkId_, allocation) { + FrameworkID frameworkId; + frameworkId.set_value(frameworkId_); + + // If this framework doesn't already have inverse offers for the + // specified slave. + if (!offerable[frameworkId].contains(slaveId)) { + // If there isn't already an outstanding inverse offer to this + // framework for the specified slave. + if (!maintenance.offersOutstanding.contains(frameworkId)) { + // Ignore in case the framework filters inverse offers for this + // slave. + // NOTE: Since this specific allocator implementation only sends + // inverse offers for maintenance primitives, and those are at the + // whole slave level, we only need to filter based on the + // time-out. + if (isFiltered(frameworkId, slaveId)) { + continue; + } + + const UnavailableResources unavailableResources = + UnavailableResources{ + Resources(), + maintenance.unavailability}; + + // For now we send inverse offers with empty resources when the + // inverse offer represents maintenance on the machine. In the + // future we could be more specific about the resources on the + // host, as we have the information available. + offerable[frameworkId][slaveId] = unavailableResources; + + // Mark this framework as having an offer oustanding for the + // specified slave. + maintenance.offersOutstanding.insert(frameworkId); + } + } + } + } + } + } + + if (offerable.empty()) { + VLOG(1) << "No inverse offers to send out!"; + } else { + // Now send inverse offers to each framework. + foreachkey (const FrameworkID& frameworkId, offerable) { + inverseOfferCallback(frameworkId, offerable[frameworkId]); + } + } +} + + +void HierarchicalAllocatorProcess::expire( + const FrameworkID& frameworkId, + const SlaveID& slaveId, + OfferFilter* offerFilter) +{ + // The filter might have already been removed (e.g., if the + // framework no longer exists or in + // HierarchicalAllocatorProcess::reviveOffers) but not yet deleted (to + // keep the address from getting reused possibly causing premature + // expiration). + if (frameworks.contains(frameworkId) && + frameworks[frameworkId].offerFilters.contains(slaveId) && + frameworks[frameworkId].offerFilters[slaveId].contains(offerFilter)) { + frameworks[frameworkId].offerFilters[slaveId].erase(offerFilter); + if (frameworks[frameworkId].offerFilters[slaveId].empty()) { + frameworks[frameworkId].offerFilters.erase(slaveId); + } + } + + delete offerFilter; +} + + +void HierarchicalAllocatorProcess::expire( + const FrameworkID& frameworkId, + const SlaveID& slaveId, + InverseOfferFilter* inverseOfferFilter) +{ + // The filter might have already been removed (e.g., if the + // framework no longer exists or in + // HierarchicalAllocatorProcess::reviveOffers) but not yet deleted (to + // keep the address from getting reused possibly causing premature + // expiration). + if (frameworks.contains(frameworkId) && + frameworks[frameworkId].inverseOfferFilters.contains(slaveId) && + frameworks[frameworkId].inverseOfferFilters[slaveId] + .contains(inverseOfferFilter)) { + frameworks[frameworkId].inverseOfferFilters[slaveId] + .erase(inverseOfferFilter); + + if(frameworks[frameworkId].inverseOfferFilters[slaveId].empty()) { + frameworks[frameworkId].inverseOfferFilters.erase(slaveId); + } + } + + delete inverseOfferFilter; +} + + +bool +HierarchicalAllocatorProcess::isWhitelisted( + const SlaveID& slaveId) +{ + CHECK(slaves.contains(slaveId)); + + return whitelist.isNone() || + whitelist.get().contains(slaves[slaveId].hostname); +} + + +bool +HierarchicalAllocatorProcess::isFiltered( + const FrameworkID& frameworkId, + const SlaveID& slaveId, + const Resources& resources) +{ + CHECK(frameworks.contains(frameworkId)); + CHECK(slaves.contains(slaveId)); + + // Do not offer a non-checkpointing slave's resources to a checkpointing + // framework. This is a short term fix until the following is resolved: + // https://issues.apache.org/jira/browse/MESOS-444. + if (frameworks[frameworkId].checkpoint && !slaves[slaveId].checkpoint) { + VLOG(1) << "Filtered offer with " << resources + << " on non-checkpointing slave " << slaveId + << " for checkpointing framework " << frameworkId; + + return true; + } + + if (frameworks[frameworkId].offerFilters.contains(slaveId)) { + foreach ( + OfferFilter* offerFilter, frameworks[frameworkId].offerFilters[slaveId]) { + if (offerFilter->filter(resources)) { + VLOG(1) << "Filtered offer with " << resources + << " on slave " << slaveId + << " for framework " << frameworkId; + + return true; + } + } + } + + return false; +} + + +bool HierarchicalAllocatorProcess::isFiltered( + const FrameworkID& frameworkId, + const SlaveID& slaveId) +{ + CHECK(frameworks.contains(frameworkId)); + CHECK(slaves.contains(slaveId)); + + if (frameworks[frameworkId].inverseOfferFilters.contains(slaveId)) { + foreach ( + InverseOfferFilter* inverseOfferFilter, + frameworks[frameworkId].inverseOfferFilters[slaveId]) { + if (inverseOfferFilter->filter()) { + VLOG(1) << "Filtered unavailability on slave " << slaveId + << " for framework " << frameworkId; + + return true; + } + } + } + + return false; +} + + +bool +HierarchicalAllocatorProcess::allocatable( + const Resources& resources) +{ + Option<double> cpus = resources.cpus(); + Option<Bytes> mem = resources.mem(); + + return (cpus.isSome() && cpus.get() >= MIN_CPUS) || + (mem.isSome() && mem.get() >= MIN_MEM); +} + +} // namespace internal { +} // namespace allocator { +} // namespace master { +} // namespace internal { +} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/2d0b65ed/src/master/allocator/mesos/hierarchical.hpp ---------------------------------------------------------------------- diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp index d57c55e..e468b5a 100644 --- a/src/master/allocator/mesos/hierarchical.hpp +++ b/src/master/allocator/mesos/hierarchical.hpp @@ -19,26 +19,19 @@ #ifndef __MASTER_ALLOCATOR_MESOS_HIERARCHICAL_HPP__ #define __MASTER_ALLOCATOR_MESOS_HIERARCHICAL_HPP__ -#include <algorithm> -#include <vector> +#include <string> -#include <mesos/resources.hpp> -#include <mesos/type_utils.hpp> +#include <mesos/mesos.hpp> -#include <process/event.hpp> -#include <process/delay.hpp> #include <process/future.hpp> #include <process/id.hpp> #include <process/metrics/gauge.hpp> #include <process/metrics/metrics.hpp> -#include <process/timeout.hpp> -#include <stout/check.hpp> #include <stout/duration.hpp> #include <stout/hashmap.hpp> #include <stout/hashset.hpp> -#include <stout/stopwatch.hpp> -#include <stout/stringify.hpp> +#include <stout/option.hpp> #include "master/allocator/mesos/allocator.hpp" #include "master/allocator/sorter/drf/sorter.hpp" @@ -50,10 +43,6 @@ namespace internal { namespace master { namespace allocator { -// Forward declarations. -class OfferFilter; -class InverseOfferFilter; - // We forward declare the hierarchical allocator process so that we // can typedef an instantiation of it with DRF sorters. template <typename RoleSorter, typename FrameworkSorter> @@ -66,16 +55,52 @@ typedef MesosAllocator<HierarchicalDRFAllocatorProcess> HierarchicalDRFAllocator; +namespace internal { + +// Forward declarations. +class OfferFilter; +class InverseOfferFilter; + + +// A level of indirection that allows us to keep the allocator implementation +// in an implementation file: `hierarchical.cpp`. This maps the static +// templatization of `HierarchicalAllocatorProcess` to a polymorphic +// implementation in the internal namespace. +struct SorterFactoryBase +{ + virtual ~SorterFactoryBase() {} + + virtual Sorter* createRoleSorter() const = 0; + + virtual Sorter* createFrameworkSorter() const = 0; +}; + + +template <typename RoleSorter, typename FrameworkSorter> +struct SorterFactory : public SorterFactoryBase +{ + virtual Sorter* createRoleSorter() const + { + return new RoleSorter(); + } + + virtual Sorter* createFrameworkSorter() const + { + return new FrameworkSorter(); + } +}; + + // Implements the basic allocator algorithm - first pick a role by // some criteria, then pick one of their frameworks to allocate to. -template <typename RoleSorter, typename FrameworkSorter> class HierarchicalAllocatorProcess : public MesosAllocatorProcess { public: - HierarchicalAllocatorProcess() + HierarchicalAllocatorProcess(SorterFactoryBase* _sorterFactory) : ProcessBase(process::ID::generate("hierarchical-allocator")), initialized(false), metrics(*this), + sorterFactory(_sorterFactory), roleSorter(NULL) {} virtual ~HierarchicalAllocatorProcess() {} @@ -179,8 +204,8 @@ public: protected: // Useful typedefs for dispatch/delay/defer to self()/this. - typedef HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter> Self; - typedef HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter> This; + typedef HierarchicalAllocatorProcess Self; + typedef HierarchicalAllocatorProcess This; // Callback for doing batch allocations. void batch(); @@ -361,1215 +386,29 @@ protected: // resources as regular resources when doing fairness calculations. // TODO(vinod): Consider using a different fairness algorithm for // oversubscribed resources. - RoleSorter* roleSorter; - hashmap<std::string, FrameworkSorter*> frameworkSorters; -}; - - -// Used to represent "filters" for resources unused in offers. -class OfferFilter -{ -public: - virtual ~OfferFilter() {} - - virtual bool filter(const Resources& resources) = 0; -}; - - -class RefusedOfferFilter: public OfferFilter -{ -public: - RefusedOfferFilter( - const Resources& _resources, - const process::Timeout& _timeout) - : resources(_resources), timeout(_timeout) {} - - virtual bool filter(const Resources& _resources) - { - // TODO(jieyu): Consider separating the superset check for regular - // and revocable resources. For example, frameworks might want - // more revocable resources only or non-revocable resources only, - // but currently the filter only expires if there is more of both - // revocable and non-revocable resources. - return resources.contains(_resources) && // Refused resources are superset. - timeout.remaining() > Seconds(0); - } - - const Resources resources; - const process::Timeout timeout; + SorterFactoryBase* sorterFactory; + Sorter* roleSorter; + hashmap<std::string, Sorter*> frameworkSorters; }; -// Used to represent "filters" for inverse offers. -// NOTE: Since this specific allocator implementation only sends inverse offers -// for maintenance primitives, and those are at the whole slave level, we only -// need to filter based on the time-out. -// If this allocator implementation starts sending out more resource specific -// inverse offers, then we can capture the `unavailableResources` in the filter -// function. -class InverseOfferFilter -{ -public: - virtual ~InverseOfferFilter() {} - - virtual bool filter() = 0; -}; +} // namespace internal { -// NOTE: See comment above `InverseOfferFilter` regarding capturing -// `unavailableResources` if this allocator starts sending fine-grained inverse -// offers. -class RefusedInverseOfferFilter: public InverseOfferFilter +// We map the templatized version of the `HierarchicalAllocatorProcess` to one +// that relies on polymorphic sorters in the internal namespace. This allows us +// to keep the implemention of the allocator in the implementation file. +template <typename RoleSorter, typename FrameworkSorter> +class HierarchicalAllocatorProcess + : public internal::HierarchicalAllocatorProcess { public: - RefusedInverseOfferFilter(const process::Timeout& _timeout) - : timeout(_timeout) {} - - virtual bool filter() - { - // See comment above why we currently don't do more fine-grained filtering. - return timeout.remaining() > Seconds(0); - } - - const process::Timeout timeout; + HierarchicalAllocatorProcess() + : internal::HierarchicalAllocatorProcess( + new internal::SorterFactory<RoleSorter, FrameworkSorter>()) {} }; -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::initialize( - const Duration& _allocationInterval, - const lambda::function< - void(const FrameworkID&, - const hashmap<SlaveID, Resources>&)>& _offerCallback, - const lambda::function< - void(const FrameworkID&, - const hashmap<SlaveID, UnavailableResources>&)>& - _inverseOfferCallback, - const hashmap<std::string, mesos::master::RoleInfo>& _roles) -{ - allocationInterval = _allocationInterval; - offerCallback = _offerCallback; - inverseOfferCallback = _inverseOfferCallback; - roles = _roles; - initialized = true; - - roleSorter = new RoleSorter(); - foreachpair ( - const std::string& name, const mesos::master::RoleInfo& roleInfo, roles) { - roleSorter->add(name, roleInfo.weight()); - frameworkSorters[name] = new FrameworkSorter(); - } - - if (roleSorter->count() == 0) { - LOG(ERROR) << "No roles specified, cannot allocate resources!"; - } - - VLOG(1) << "Initialized hierarchical allocator process"; - - delay(allocationInterval, self(), &Self::batch); -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::addFramework( - const FrameworkID& frameworkId, - const FrameworkInfo& frameworkInfo, - const hashmap<SlaveID, Resources>& used) -{ - CHECK(initialized); - - const std::string& role = frameworkInfo.role(); - - CHECK(roles.contains(role)); - - CHECK(!frameworkSorters[role]->contains(frameworkId.value())); - frameworkSorters[role]->add(frameworkId.value()); - - // TODO(bmahler): Validate that the reserved resources have the - // framework's role. - - // Update the allocation to this framework. - foreachpair (const SlaveID& slaveId, const Resources& allocated, used) { - roleSorter->allocated(role, slaveId, allocated.unreserved()); - frameworkSorters[role]->add(slaveId, allocated); - frameworkSorters[role]->allocated(frameworkId.value(), slaveId, allocated); - } - - frameworks[frameworkId] = Framework(); - frameworks[frameworkId].role = frameworkInfo.role(); - frameworks[frameworkId].checkpoint = frameworkInfo.checkpoint(); - - // Check if the framework desires revocable resources. - frameworks[frameworkId].revocable = false; - foreach (const FrameworkInfo::Capability& capability, - frameworkInfo.capabilities()) { - if (capability.type() == FrameworkInfo::Capability::REVOCABLE_RESOURCES) { - frameworks[frameworkId].revocable = true; - } - } - - frameworks[frameworkId].suppressed = false; - - LOG(INFO) << "Added framework " << frameworkId; - - allocate(); -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::removeFramework( - const FrameworkID& frameworkId) -{ - CHECK(initialized); - - CHECK(frameworks.contains(frameworkId)); - const std::string& role = frameworks[frameworkId].role; - - // Might not be in 'frameworkSorters[role]' because it was previously - // deactivated and never re-added. - if (frameworkSorters[role]->contains(frameworkId.value())) { - hashmap<SlaveID, Resources> allocation = - frameworkSorters[role]->allocation(frameworkId.value()); - - foreachpair ( - const SlaveID& slaveId, const Resources& allocated, allocation) { - roleSorter->unallocated(role, slaveId, allocated.unreserved()); - frameworkSorters[role]->remove(slaveId, allocated); - } - - frameworkSorters[role]->remove(frameworkId.value()); - } - - // Do not delete the filters contained in this - // framework's `offerFilters` hashset yet, see comments in - // HierarchicalAllocatorProcess::reviveOffers and - // HierarchicalAllocatorProcess::expire. - frameworks.erase(frameworkId); - - LOG(INFO) << "Removed framework " << frameworkId; -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::activateFramework( - const FrameworkID& frameworkId) -{ - CHECK(initialized); - - CHECK(frameworks.contains(frameworkId)); - const std::string& role = frameworks[frameworkId].role; - - frameworkSorters[role]->activate(frameworkId.value()); - - LOG(INFO) << "Activated framework " << frameworkId; - - allocate(); -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deactivateFramework( - const FrameworkID& frameworkId) -{ - CHECK(initialized); - - CHECK(frameworks.contains(frameworkId)); - const std::string& role = frameworks[frameworkId].role; - - frameworkSorters[role]->deactivate(frameworkId.value()); - - // Note that the Sorter *does not* remove the resources allocated - // to this framework. For now, this is important because if the - // framework fails over and is activated, we still want a record - // of the resources that it is using. We might be able to collapse - // the added/removed and activated/deactivated in the future. - - // Do not delete the filters contained in this - // framework's `offerFilters` hashset yet, see comments in - // HierarchicalAllocatorProcess::reviveOffers and - // HierarchicalAllocatorProcess::expire. - frameworks[frameworkId].offerFilters.clear(); - frameworks[frameworkId].inverseOfferFilters.clear(); - - LOG(INFO) << "Deactivated framework " << frameworkId; -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateFramework( - const FrameworkID& frameworkId, - const FrameworkInfo& frameworkInfo) -{ - CHECK(initialized); - - CHECK(frameworks.contains(frameworkId)); - - // TODO(jmlvanre): Once we allow frameworks to re-register with a - // new 'role' or 'checkpoint' flag, we need to update our internal - // 'frameworks' structure. See MESOS-703 for progress on allowing - // these fields to be updated. - CHECK_EQ(frameworks[frameworkId].role, frameworkInfo.role()); - CHECK_EQ(frameworks[frameworkId].checkpoint, frameworkInfo.checkpoint()); - - frameworks[frameworkId].revocable = false; - - foreach (const FrameworkInfo::Capability& capability, - frameworkInfo.capabilities()) { - if (capability.type() == FrameworkInfo::Capability::REVOCABLE_RESOURCES) { - frameworks[frameworkId].revocable = true; - } - } -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::addSlave( - const SlaveID& slaveId, - const SlaveInfo& slaveInfo, - const Option<Unavailability>& unavailability, - const Resources& total, - const hashmap<FrameworkID, Resources>& used) -{ - CHECK(initialized); - CHECK(!slaves.contains(slaveId)); - - roleSorter->add(slaveId, total.unreserved()); - - foreachpair (const FrameworkID& frameworkId, - const Resources& allocated, - used) { - if (frameworks.contains(frameworkId)) { - const std::string& role = frameworks[frameworkId].role; - - // TODO(bmahler): Validate that the reserved resources have the - // framework's role. - - roleSorter->allocated(role, slaveId, allocated.unreserved()); - frameworkSorters[role]->add(slaveId, allocated); - frameworkSorters[role]->allocated( - frameworkId.value(), slaveId, allocated); - } - } - - slaves[slaveId] = Slave(); - slaves[slaveId].total = total; - slaves[slaveId].allocated = Resources::sum(used); - slaves[slaveId].activated = true; - slaves[slaveId].checkpoint = slaveInfo.checkpoint(); - slaves[slaveId].hostname = slaveInfo.hostname(); - - // NOTE: We currently implement maintenance in the allocator to be able to - // leverage state and features such as the FrameworkSorter and OfferFilter. - if (unavailability.isSome()) { - slaves[slaveId].maintenance = - typename Slave::Maintenance(unavailability.get()); - } - - LOG(INFO) << "Added slave " << slaveId << " (" << slaves[slaveId].hostname - << ") with " << slaves[slaveId].total - << " (allocated: " << slaves[slaveId].allocated << ")"; - - allocate(slaveId); -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::removeSlave( - const SlaveID& slaveId) -{ - CHECK(initialized); - CHECK(slaves.contains(slaveId)); - - // TODO(bmahler): Per MESOS-621, this should remove the allocations - // that any frameworks have on this slave. Otherwise the caller may - // "leak" allocated resources accidentally if they forget to recover - // all the resources. Fixing this would require more information - // than what we currently track in the allocator. - - roleSorter->remove(slaveId, slaves[slaveId].total.unreserved()); - - slaves.erase(slaveId); - - // Note that we DO NOT actually delete any filters associated with - // this slave, that will occur when the delayed - // HierarchicalAllocatorProcess::expire gets invoked (or the framework - // that applied the filters gets removed). - - LOG(INFO) << "Removed slave " << slaveId; -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateSlave( - const SlaveID& slaveId, - const Resources& oversubscribed) -{ - CHECK(initialized); - CHECK(slaves.contains(slaveId)); - - // Check that all the oversubscribed resources are revocable. - CHECK_EQ(oversubscribed, oversubscribed.revocable()); - - // Update the total resources. - - // First remove the old oversubscribed resources from the total. - slaves[slaveId].total -= slaves[slaveId].total.revocable(); - - // Now add the new estimate of oversubscribed resources. - slaves[slaveId].total += oversubscribed; - - // Now, update the total resources in the role sorter. - roleSorter->update( - slaveId, - slaves[slaveId].total.unreserved()); - - LOG(INFO) << "Slave " << slaveId << " (" << slaves[slaveId].hostname << ")" - << " updated with oversubscribed resources " << oversubscribed - << " (total: " << slaves[slaveId].total - << ", allocated: " << slaves[slaveId].allocated << ")"; - - allocate(slaveId); -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::activateSlave( - const SlaveID& slaveId) -{ - CHECK(initialized); - CHECK(slaves.contains(slaveId)); - - slaves[slaveId].activated = true; - - LOG(INFO)<< "Slave " << slaveId << " reactivated"; -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deactivateSlave( - const SlaveID& slaveId) -{ - CHECK(initialized); - CHECK(slaves.contains(slaveId)); - - slaves[slaveId].activated = false; - - LOG(INFO) << "Slave " << slaveId << " deactivated"; -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateWhitelist( - const Option<hashset<std::string>>& _whitelist) -{ - CHECK(initialized); - - whitelist = _whitelist; - - if (whitelist.isSome()) { - LOG(INFO) << "Updated slave whitelist: " << stringify(whitelist.get()); - - if (whitelist.get().empty()) { - LOG(WARNING) << "Whitelist is empty, no offers will be made!"; - } - } else { - LOG(INFO) << "Advertising offers for all slaves"; - } -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::requestResources( - const FrameworkID& frameworkId, - const std::vector<Request>& requests) -{ - CHECK(initialized); - - LOG(INFO) << "Received resource request from framework " << frameworkId; -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateAllocation( - const FrameworkID& frameworkId, - const SlaveID& slaveId, - const std::vector<Offer::Operation>& operations) -{ - CHECK(initialized); - CHECK(slaves.contains(slaveId)); - CHECK(frameworks.contains(frameworkId)); - - // Here we apply offer operations to the allocated resources, which - // in turns leads to an update of the total. The available resources - // remain unchanged. - - // Update the allocated resources. - FrameworkSorter* frameworkSorter = - frameworkSorters[frameworks[frameworkId].role]; - - Resources frameworkAllocation = - frameworkSorter->allocation(frameworkId.value(), slaveId); - - Try<Resources> updatedFrameworkAllocation = - frameworkAllocation.apply(operations); - - CHECK_SOME(updatedFrameworkAllocation); - - frameworkSorter->update( - frameworkId.value(), - slaveId, - frameworkAllocation, - updatedFrameworkAllocation.get()); - - roleSorter->update( - frameworks[frameworkId].role, - slaveId, - frameworkAllocation.unreserved(), - updatedFrameworkAllocation.get().unreserved()); - - Try<Resources> updatedSlaveAllocation = - slaves[slaveId].allocated.apply(operations); - - CHECK_SOME(updatedSlaveAllocation); - - slaves[slaveId].allocated = updatedSlaveAllocation.get(); - - // Update the total resources. - Try<Resources> updatedTotal = slaves[slaveId].total.apply(operations); - CHECK_SOME(updatedTotal); - - slaves[slaveId].total = updatedTotal.get(); - - LOG(INFO) << "Updated allocation of framework " << frameworkId - << " on slave " << slaveId - << " from " << frameworkAllocation - << " to " << updatedFrameworkAllocation.get(); -} - - -template <class RoleSorter, class FrameworkSorter> -process::Future<Nothing> -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateAvailable( - const SlaveID& slaveId, - const std::vector<Offer::Operation>& operations) -{ - CHECK(initialized); - CHECK(slaves.contains(slaveId)); - - Resources available = slaves[slaveId].total - slaves[slaveId].allocated; - - // It's possible for this 'apply' to fail here because a call to - // 'allocate' could have been enqueued by the allocator itself - // just before master's request to enqueue 'updateAvailable' - // arrives to the allocator. - // - // Master -------R------------ - // \----+ - // | - // Allocator --A-----A-U---A-- - // \___/ \___/ - // - // where A = allocate, R = reserve, U = updateAvailable - Try<Resources> updatedAvailable = available.apply(operations); - if (updatedAvailable.isError()) { - return process::Failure(updatedAvailable.error()); - } - - // Update the total resources. - Try<Resources> updatedTotal = slaves[slaveId].total.apply(operations); - CHECK_SOME(updatedTotal); - - slaves[slaveId].total = updatedTotal.get(); - - // Now, update the total resources in the role sorter. - roleSorter->update(slaveId, slaves[slaveId].total.unreserved()); - - return Nothing(); -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateUnavailability( - const SlaveID& slaveId, - const Option<Unavailability>& unavailability) -{ - CHECK(initialized); - CHECK(slaves.contains(slaveId)); - - // NOTE: We currently implement maintenance in the allocator to be able to - // leverage state and features such as the FrameworkSorter and OfferFilter. - - // We explicitly remove all filters for the inverse offers of this slave. We - // do this because we want to force frameworks to reassess the calculations - // they have made to respond to the inverse offer. Unavailability of a slave - // can have a large effect on failure domain calculations and inter-leaved - // unavailability schedules. - foreachvalue (Framework& framework, frameworks) { - framework.inverseOfferFilters.erase(slaveId); - } - - // Remove any old unavailability. - slaves[slaveId].maintenance = None(); - - // If we have a new unavailability. - if (unavailability.isSome()) { - slaves[slaveId].maintenance = - typename Slave::Maintenance(unavailability.get()); - } - - allocate(slaveId); -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateInverseOffer( - const SlaveID& slaveId, - const FrameworkID& frameworkId, - const Option<UnavailableResources>& unavailableResources, - const Option<mesos::master::InverseOfferStatus>& status, - const Option<Filters>& filters) -{ - CHECK(initialized); - CHECK(frameworks.contains(frameworkId)); - CHECK(slaves.contains(slaveId)); - CHECK(slaves[slaveId].maintenance.isSome()); - - // NOTE: We currently implement maintenance in the allocator to be able to - // leverage state and features such as the FrameworkSorter and OfferFilter. - - // We use a reference by alias because we intend to modify the - // `maintenance` and to improve readability. - typename Slave::Maintenance& maintenance = slaves[slaveId].maintenance.get(); - - // Only handle inverse offers that we currently have outstanding. If it is not - // currently outstanding this means it is old and can be safely ignored. - if (maintenance.offersOutstanding.contains(frameworkId)) { - // We always remove the outstanding offer so that we will send a new offer - // out the next time we schedule inverse offers. - maintenance.offersOutstanding.erase(frameworkId); - - // If the response is `Some`, this means the framework responded. Otherwise - // if it is `None` the inverse offer timed out or was rescinded. - if (status.isSome()) { - // For now we don't allow frameworks to respond with `UNKNOWN`. The caller - // should guard against this. This goes against the pattern of not - // checking external invariants; however, the allocator and master are - // currently so tightly coupled that this check is valuable. - CHECK_NE( - status.get().status(), - mesos::master::InverseOfferStatus::UNKNOWN); - - // If the framework responded, we update our state to match. - maintenance.statuses[frameworkId].CopyFrom(status.get()); - } - } - - // No need to install filters if `filters` is none. - if (filters.isNone()) { - return; - } - - // Create a refused resource filter. - Try<Duration> seconds = Duration::create(filters.get().refuse_seconds()); - - if (seconds.isError()) { - LOG(WARNING) << "Using the default value of 'refuse_seconds' to create " - << "the refused inverse offer filter because the input value " - << "is invalid: " << seconds.error(); - - seconds = Duration::create(Filters().refuse_seconds()); - } else if (seconds.get() < Duration::zero()) { - LOG(WARNING) << "Using the default value of 'refuse_seconds' to create " - << "the refused inverse offer filter because the input value " - << "is negative"; - - seconds = Duration::create(Filters().refuse_seconds()); - } - - CHECK_SOME(seconds); - - if (seconds.get() != Duration::zero()) { - VLOG(1) << "Framework " << frameworkId - << " filtered inverse offers from slave " << slaveId - << " for " << seconds.get(); - - // Create a new inverse offer filter and delay its expiration. - InverseOfferFilter* inverseOfferFilter = - new RefusedInverseOfferFilter(process::Timeout::in(seconds.get())); - - frameworks[frameworkId] - .inverseOfferFilters[slaveId].insert(inverseOfferFilter); - - // We need to disambiguate the function call to pick the correct - // expire() overload. - void (Self::*expireInverseOffer)( - const FrameworkID&, - const SlaveID&, - InverseOfferFilter*) = &Self::expire; - - delay( - seconds.get(), - self(), - expireInverseOffer, - frameworkId, - slaveId, - inverseOfferFilter); - } -} - - -template <class RoleSorter, class FrameworkSorter> -process::Future< - hashmap<SlaveID, hashmap<FrameworkID, mesos::master::InverseOfferStatus>>> -HierarchicalAllocatorProcess< - RoleSorter, FrameworkSorter>::getInverseOfferStatuses() -{ - CHECK(initialized); - - hashmap< - SlaveID, - hashmap<FrameworkID, mesos::master::InverseOfferStatus>> result; - - // Make a copy of the most recent statuses. - foreachpair (const SlaveID& id, const Slave& slave, slaves) { - if (slave.maintenance.isSome()) { - result[id] = slave.maintenance.get().statuses; - } - } - - return result; -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources( - const FrameworkID& frameworkId, - const SlaveID& slaveId, - const Resources& resources, - const Option<Filters>& filters) -{ - CHECK(initialized); - - if (resources.empty()) { - return; - } - - // Updated resources allocated to framework (if framework still - // exists, which it might not in the event that we dispatched - // Master::offer before we received - // MesosAllocatorProcess::removeFramework or - // MesosAllocatorProcess::deactivateFramework, in which case we will - // have already recovered all of its resources). - if (frameworks.contains(frameworkId)) { - const std::string& role = frameworks[frameworkId].role; - - CHECK(frameworkSorters.contains(role)); - - if (frameworkSorters[role]->contains(frameworkId.value())) { - frameworkSorters[role]->unallocated( - frameworkId.value(), slaveId, resources); - frameworkSorters[role]->remove(slaveId, resources); - roleSorter->unallocated(role, slaveId, resources.unreserved()); - } - } - - // Update resources allocated on slave (if slave still exists, - // which it might not in the event that we dispatched Master::offer - // before we received Allocator::removeSlave). - if (slaves.contains(slaveId)) { - // NOTE: We cannot add the following CHECK due to the double - // precision errors. See MESOS-1187 for details. - // CHECK(slaves[slaveId].allocated.contains(resources)); - - slaves[slaveId].allocated -= resources; - - LOG(INFO) << "Recovered " << resources - << " (total: " << slaves[slaveId].total - << ", allocated: " << slaves[slaveId].allocated - << ") on slave " << slaveId - << " from framework " << frameworkId; - } - - // No need to install the filter if 'filters' is none. - if (filters.isNone()) { - return; - } - - // No need to install the filter if slave/framework does not exist. - if (!frameworks.contains(frameworkId) || !slaves.contains(slaveId)) { - return; - } - - // Create a refused resources filter. - Try<Duration> seconds = Duration::create(filters.get().refuse_seconds()); - - if (seconds.isError()) { - LOG(WARNING) << "Using the default value of 'refuse_seconds' to create " - << "the refused resources filter because the input value " - << "is invalid: " << seconds.error(); - - seconds = Duration::create(Filters().refuse_seconds()); - } else if (seconds.get() < Duration::zero()) { - LOG(WARNING) << "Using the default value of 'refuse_seconds' to create " - << "the refused resources filter because the input value " - << "is negative"; - - seconds = Duration::create(Filters().refuse_seconds()); - } - - CHECK_SOME(seconds); - - if (seconds.get() != Duration::zero()) { - VLOG(1) << "Framework " << frameworkId - << " filtered slave " << slaveId - << " for " << seconds.get(); - - // Create a new filter and delay its expiration. - OfferFilter* offerFilter = new RefusedOfferFilter( - resources, - process::Timeout::in(seconds.get())); - - frameworks[frameworkId].offerFilters[slaveId].insert(offerFilter); - - // We need to disambiguate the function call to pick the correct - // expire() overload. - void (Self::*expireOffer)( - const FrameworkID&, - const SlaveID&, - OfferFilter*) = &Self::expire; - - delay( - seconds.get(), - self(), - expireOffer, - frameworkId, - slaveId, - offerFilter); - } -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::suppressOffers( - const FrameworkID& frameworkId) -{ - CHECK(initialized); - frameworks[frameworkId].suppressed = true; - - LOG(INFO) << "Suppressed offers for framework " << frameworkId; -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::reviveOffers( - const FrameworkID& frameworkId) -{ - CHECK(initialized); - - frameworks[frameworkId].offerFilters.clear(); - frameworks[frameworkId].inverseOfferFilters.clear(); - frameworks[frameworkId].suppressed = false; - - // We delete each actual `OfferFilter` when - // `HierarchicalAllocatorProcess::expire` gets invoked. If we delete the - // `OfferFilter` here it's possible that the same `OfferFilter` (i.e., same - // address) could get reused and `HierarchicalAllocatorProcess::expire` - // would expire that filter too soon. Note that this only works - // right now because ALL Filter types "expire". - - LOG(INFO) << "Removed offer filters for framework " << frameworkId; - - allocate(); -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::batch() -{ - allocate(); - delay(allocationInterval, self(), &Self::batch); -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate() -{ - Stopwatch stopwatch; - stopwatch.start(); - - allocate(slaves.keys()); - - VLOG(1) << "Performed allocation for " << slaves.size() << " slaves in " - << stopwatch.elapsed(); -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate( - const SlaveID& slaveId) -{ - Stopwatch stopwatch; - stopwatch.start(); - - // TODO(bmahler): Add initializer list constructor for hashset. - hashset<SlaveID> slaves; - slaves.insert(slaveId); - allocate(slaves); - - VLOG(1) << "Performed allocation for slave " << slaveId << " in " - << stopwatch.elapsed(); -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate( - const hashset<SlaveID>& slaveIds_) -{ - if (roleSorter->count() == 0) { - LOG(ERROR) << "No roles specified, cannot allocate resources!"; - return; - } - - // Compute the offerable resources, per framework: - // (1) For reserved resources on the slave, allocate these to a - // framework having the corresponding role. - // (2) For unreserved resources on the slave, allocate these - // to a framework of any role. - hashmap<FrameworkID, hashmap<SlaveID, Resources>> offerable; - - // Randomize the order in which slaves' resources are allocated. - // TODO(vinod): Implement a smarter sorting algorithm. - std::vector<SlaveID> slaveIds(slaveIds_.begin(), slaveIds_.end()); - std::random_shuffle(slaveIds.begin(), slaveIds.end()); - - foreach (const SlaveID& slaveId, slaveIds) { - // Don't send offers for non-whitelisted and deactivated slaves. - if (!isWhitelisted(slaveId) || !slaves[slaveId].activated) { - continue; - } - - foreach (const std::string& role, roleSorter->sort()) { - foreach (const std::string& frameworkId_, - frameworkSorters[role]->sort()) { - FrameworkID frameworkId; - frameworkId.set_value(frameworkId_); - - // If the framework has suppressed offers, ignore. - if (frameworks[frameworkId].suppressed) { - continue; - } - - // Calculate the currently available resources on the slave. - Resources available = slaves[slaveId].total - slaves[slaveId].allocated; - - // NOTE: Currently, frameworks are allowed to have '*' role. - // Calling reserved('*') returns an empty Resources object. - Resources resources = available.unreserved() + available.reserved(role); - - // Remove revocable resources if the framework has not opted - // for them. - if (!frameworks[frameworkId].revocable) { - resources -= resources.revocable(); - } - - // If the resources are not allocatable, ignore. - if (!allocatable(resources)) { - continue; - } - - // If the framework filters these resources, ignore. - if (isFiltered(frameworkId, slaveId, resources)) { - continue; - } - - VLOG(2) << "Allocating " << resources << " on slave " << slaveId - << " to framework " << frameworkId; - - // Note that we perform "coarse-grained" allocation, - // meaning that we always allocate the entire remaining - // slave resources to a single framework. - offerable[frameworkId][slaveId] = resources; - slaves[slaveId].allocated += resources; - - // Reserved resources are only accounted for in the framework - // sorter, since the reserved resources are not shared across - // roles. - frameworkSorters[role]->add(slaveId, resources); - frameworkSorters[role]->allocated(frameworkId_, slaveId, resources); - roleSorter->allocated(role, slaveId, resources.unreserved()); - } - } - } - - if (offerable.empty()) { - VLOG(1) << "No resources available to allocate!"; - } else { - // Now offer the resources to each framework. - foreachkey (const FrameworkID& frameworkId, offerable) { - offerCallback(frameworkId, offerable[frameworkId]); - } - } - - // NOTE: For now, we implement maintenance inverse offers within the - // allocator. We leverage the existing timer/cycle of offers to also do any - // "deallocation" (inverse offers) necessary to satisfy maintenance needs. - deallocate(slaveIds_); -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deallocate( - const hashset<SlaveID>& slaveIds_) -{ - if (frameworkSorters.empty()) { - LOG(ERROR) << "No frameworks specified, cannot send inverse offers!"; - return; - } - - // In this case, `offerable` is actually the slaves and/or resources that we - // want the master to create `InverseOffer`s from. - hashmap<FrameworkID, hashmap<SlaveID, UnavailableResources>> offerable; - - // For maintenance, we use the framework sorters to determine which frameworks - // have (1) reserved and / or (2) unreserved resource on the specified - // slaveIds. This way we only send inverse offers to frameworks that have the - // potential to lose something. We keep track of which frameworks already have - // an outstanding inverse offer for the given slave in the - // UnavailabilityStatus of the specific slave using the `offerOutstanding` - // flag. This is equivalent to the accounting we do for resources when we send - // regular offers. If we didn't keep track of outstanding offers then we would - // keep generating new inverse offers even though the framework had not - // responded yet. - - foreachvalue (FrameworkSorter* frameworkSorter, frameworkSorters) { - foreach (const SlaveID& slaveId, slaveIds_) { - CHECK(slaves.contains(slaveId)); - - if (slaves[slaveId].maintenance.isSome()) { - // We use a reference by alias because we intend to modify the - // `maintenance` and to improve readability. - typename Slave::Maintenance& maintenance = - slaves[slaveId].maintenance.get(); - - hashmap<std::string, Resources> allocation = - frameworkSorter->allocation(slaveId); - - foreachkey (const std::string& frameworkId_, allocation) { - FrameworkID frameworkId; - frameworkId.set_value(frameworkId_); - - // If this framework doesn't already have inverse offers for the - // specified slave. - if (!offerable[frameworkId].contains(slaveId)) { - // If there isn't already an outstanding inverse offer to this - // framework for the specified slave. - if (!maintenance.offersOutstanding.contains(frameworkId)) { - // Ignore in case the framework filters inverse offers for this - // slave. - // NOTE: Since this specific allocator implementation only sends - // inverse offers for maintenance primitives, and those are at the - // whole slave level, we only need to filter based on the - // time-out. - if (isFiltered(frameworkId, slaveId)) { - continue; - } - - const UnavailableResources unavailableResources = - UnavailableResources{ - Resources(), - maintenance.unavailability}; - - // For now we send inverse offers with empty resources when the - // inverse offer represents maintenance on the machine. In the - // future we could be more specific about the resources on the - // host, as we have the information available. - offerable[frameworkId][slaveId] = unavailableResources; - - // Mark this framework as having an offer oustanding for the - // specified slave. - maintenance.offersOutstanding.insert(frameworkId); - } - } - } - } - } - } - - if (offerable.empty()) { - VLOG(1) << "No inverse offers to send out!"; - } else { - // Now send inverse offers to each framework. - foreachkey (const FrameworkID& frameworkId, offerable) { - inverseOfferCallback(frameworkId, offerable[frameworkId]); - } - } -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::expire( - const FrameworkID& frameworkId, - const SlaveID& slaveId, - OfferFilter* offerFilter) -{ - // The filter might have already been removed (e.g., if the - // framework no longer exists or in - // HierarchicalAllocatorProcess::reviveOffers) but not yet deleted (to - // keep the address from getting reused possibly causing premature - // expiration). - if (frameworks.contains(frameworkId) && - frameworks[frameworkId].offerFilters.contains(slaveId) && - frameworks[frameworkId].offerFilters[slaveId].contains(offerFilter)) { - frameworks[frameworkId].offerFilters[slaveId].erase(offerFilter); - if (frameworks[frameworkId].offerFilters[slaveId].empty()) { - frameworks[frameworkId].offerFilters.erase(slaveId); - } - } - - delete offerFilter; -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::expire( - const FrameworkID& frameworkId, - const SlaveID& slaveId, - InverseOfferFilter* inverseOfferFilter) -{ - // The filter might have already been removed (e.g., if the - // framework no longer exists or in - // HierarchicalAllocatorProcess::reviveOffers) but not yet deleted (to - // keep the address from getting reused possibly causing premature - // expiration). - if (frameworks.contains(frameworkId) && - frameworks[frameworkId].inverseOfferFilters.contains(slaveId) && - frameworks[frameworkId].inverseOfferFilters[slaveId] - .contains(inverseOfferFilter)) { - frameworks[frameworkId].inverseOfferFilters[slaveId] - .erase(inverseOfferFilter); - - if(frameworks[frameworkId].inverseOfferFilters[slaveId].empty()) { - frameworks[frameworkId].inverseOfferFilters.erase(slaveId); - } - } - - delete inverseOfferFilter; -} - - -template <class RoleSorter, class FrameworkSorter> -bool -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isWhitelisted( - const SlaveID& slaveId) -{ - CHECK(slaves.contains(slaveId)); - - return whitelist.isNone() || - whitelist.get().contains(slaves[slaveId].hostname); -} - - -template <class RoleSorter, class FrameworkSorter> -bool -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isFiltered( - const FrameworkID& frameworkId, - const SlaveID& slaveId, - const Resources& resources) -{ - CHECK(frameworks.contains(frameworkId)); - CHECK(slaves.contains(slaveId)); - - // Do not offer a non-checkpointing slave's resources to a checkpointing - // framework. This is a short term fix until the following is resolved: - // https://issues.apache.org/jira/browse/MESOS-444. - if (frameworks[frameworkId].checkpoint && !slaves[slaveId].checkpoint) { - VLOG(1) << "Filtered offer with " << resources - << " on non-checkpointing slave " << slaveId - << " for checkpointing framework " << frameworkId; - - return true; - } - - if (frameworks[frameworkId].offerFilters.contains(slaveId)) { - foreach ( - OfferFilter* offerFilter, frameworks[frameworkId].offerFilters[slaveId]) { - if (offerFilter->filter(resources)) { - VLOG(1) << "Filtered offer with " << resources - << " on slave " << slaveId - << " for framework " << frameworkId; - - return true; - } - } - } - - return false; -} - - -template <class RoleSorter, class FrameworkSorter> -bool HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isFiltered( - const FrameworkID& frameworkId, - const SlaveID& slaveId) -{ - CHECK(frameworks.contains(frameworkId)); - CHECK(slaves.contains(slaveId)); - - if (frameworks[frameworkId].inverseOfferFilters.contains(slaveId)) { - foreach ( - InverseOfferFilter* inverseOfferFilter, - frameworks[frameworkId].inverseOfferFilters[slaveId]) { - if (inverseOfferFilter->filter()) { - VLOG(1) << "Filtered unavailability on slave " << slaveId - << " for framework " << frameworkId; - - return true; - } - } - } - - return false; -} - - -template <class RoleSorter, class FrameworkSorter> -bool -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocatable( - const Resources& resources) -{ - Option<double> cpus = resources.cpus(); - Option<Bytes> mem = resources.mem(); - - return (cpus.isSome() && cpus.get() >= MIN_CPUS) || - (mem.isSome() && mem.get() >= MIN_MEM); -} - } // namespace allocator { } // namespace master { } // namespace internal {
