This is an automated email from the ASF dual-hosted git repository.
bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push:
new 903cab6 Fixed a memory "leak" of filters in the allocator.
903cab6 is described below
commit 903cab66adaac3365d2e552ddad9f55139613bae
Author: Benjamin Mahler <[email protected]>
AuthorDate: Fri Jun 21 18:49:55 2019 -0400
Fixed a memory "leak" of filters in the allocator.
Per MESOS-9852, the allocator does not keep a handle to the offer
filter timer, which means it cannot remove the timer if it's no
longer relevant (e.g. due to revive). This means that timers build
up in memory.
Also, the offer filter is allocated on the heap, and is not deleted
until the expiry of the timer (which may take forever!), even if the
offer filter is no longer relevant (e.g. due to revive). This means
that offer filters build up in memory.
The fix applied is to manage offer filters using a shared_ptr in the
maps, which means that they get deleted when erased. The expiration
functions now need to use a weak_ptr in case they run after the
offer filter has been erased (which is possible due to a race between
the expiry timer firing and the timer being discarded).
To discard the timers when no longer needed, the destructors of the
filters perform the discard.
This was verified with a test that spins in a revive + long decline
loop. Previously, the RSS continued to grow; after this fix
it remains constant.
Review: https://reviews.apache.org/r/70927
---
src/master/allocator/mesos/hierarchical.cpp | 220 ++++++++++++++--------------
src/master/allocator/mesos/hierarchical.hpp | 19 ++-
2 files changed, 122 insertions(+), 117 deletions(-)
diff --git a/src/master/allocator/mesos/hierarchical.cpp
b/src/master/allocator/mesos/hierarchical.cpp
index 7076e96..a9e2b7b 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -45,9 +45,12 @@
#include "common/protobuf_utils.hpp"
#include "common/resources_utils.hpp"
+using std::make_shared;
using std::set;
+using std::shared_ptr;
using std::string;
using std::vector;
+using std::weak_ptr;
using mesos::allocator::InverseOfferStatus;
using mesos::allocator::Options;
@@ -82,20 +85,38 @@ public:
class RefusedOfferFilter : public OfferFilter
{
public:
- RefusedOfferFilter(const Resources& _resources) : resources(_resources) {}
+ RefusedOfferFilter(
+ const Resources& _resources,
+ const Duration& timeout)
+ : _resources(_resources),
+ _expired(after(timeout)) {}
- bool filter(const Resources& _resources) const override
+ virtual ~RefusedOfferFilter()
{
+ // Cancel the timeout upon destruction to avoid lingering timers.
+ _expired.discard();
+ }
+
+ Future<Nothing> expired() const { return _expired; };
+
+ bool filter(const Resources& resources) const override
+ {
+ // NOTE: We do not check for the filter being expired here
+ // because `recoverResources()` expects the filter to apply
+ // until the filter is removed, see:
+ // https://github.com/apache/mesos/commit/2f170f302fe94c4
+ //
// TODO(jieyu): Consider separating the superset check for regular
// and revocable resources. For example, frameworks might want
// more revocable resources only or non-revocable resources only,
// but currently the filter only expires if there is more of both
// revocable and non-revocable resources.
- return resources.contains(_resources); // Refused resources are superset.
+ return _resources.contains(resources); // Refused resources are superset.
}
private:
- const Resources resources;
+ const Resources _resources;
+ Future<Nothing> _expired;
};
@@ -122,17 +143,25 @@ public:
class RefusedInverseOfferFilter : public InverseOfferFilter
{
public:
- RefusedInverseOfferFilter(const Timeout& _timeout)
- : timeout(_timeout) {}
+ RefusedInverseOfferFilter(const Duration& timeout)
+ : _expired(after(timeout)) {}
+
+ virtual ~RefusedInverseOfferFilter()
+ {
+ // Cancel the timeout upon destruction to avoid lingering timers.
+ _expired.discard();
+ }
+
+ Future<Nothing> expired() const { return _expired; };
bool filter() const override
{
// See comment above why we currently don't do more fine-grained filtering.
- return timeout.remaining() > Seconds(0);
+ return _expired.isPending();
}
private:
- const Timeout timeout;
+ Future<Nothing> _expired;
};
@@ -378,10 +407,6 @@ void HierarchicalAllocatorProcess::removeFramework(
frameworkId,
Owned<FrameworkMetrics>(framework.metrics.release()));
- // Do not delete the filters contained in this
- // framework's `offerFilters` hashset yet, see comments in
- // HierarchicalAllocatorProcess::reviveOffers and
- // HierarchicalAllocatorProcess::expire.
frameworks.erase(frameworkId);
LOG(INFO) << "Removed framework " << frameworkId;
@@ -439,10 +464,6 @@ void HierarchicalAllocatorProcess::deactivateFramework(
framework.active = false;
- // Do not delete the filters contained in this
- // framework's `offerFilters` hashset yet, see comments in
- // HierarchicalAllocatorProcess::reviveOffers and
- // HierarchicalAllocatorProcess::expire.
framework.offerFilters.clear();
framework.inverseOfferFilters.clear();
@@ -663,10 +684,7 @@ void HierarchicalAllocatorProcess::removeSlave(
slaves.erase(slaveId);
allocationCandidates.erase(slaveId);
- // Note that we DO NOT actually delete any filters associated with
- // this slave, that will occur when the delayed
- // HierarchicalAllocatorProcess::expire gets invoked (or the framework
- // that applied the filters gets removed).
+ removeFilters(slaveId);
LOG(INFO) << "Removed agent " << slaveId;
}
@@ -787,7 +805,7 @@ void HierarchicalAllocatorProcess::removeFilters(const
SlaveID& slaveId)
// Need a typedef here, otherwise the preprocessor gets confused
// by the comma in the template argument list.
- typedef hashmap<SlaveID, hashset<OfferFilter*>> Filters;
+ typedef hashmap<SlaveID, hashset<shared_ptr<OfferFilter>>> Filters;
foreachvalue (Filters& filters, framework.offerFilters) {
filters.erase(slaveId);
}
@@ -1143,25 +1161,17 @@ void HierarchicalAllocatorProcess::updateInverseOffer(
<< " for " << timeout.get();
// Create a new inverse offer filter and delay its expiration.
- InverseOfferFilter* inverseOfferFilter =
- new RefusedInverseOfferFilter(Timeout::in(timeout.get()));
+ shared_ptr<RefusedInverseOfferFilter> inverseOfferFilter =
+ make_shared<RefusedInverseOfferFilter>(*timeout);
framework.inverseOfferFilters[slaveId].insert(inverseOfferFilter);
- // We need to disambiguate the function call to pick the correct
- // `expire()` overload.
- void (Self::*expireInverseOffer)(
- const FrameworkID&,
- const SlaveID&,
- InverseOfferFilter*) = &Self::expire;
-
- delay(
- timeout.get(),
- self(),
- expireInverseOffer,
- frameworkId,
- slaveId,
- inverseOfferFilter);
+ weak_ptr<InverseOfferFilter> weakPtr = inverseOfferFilter;
+
+ inverseOfferFilter->expired()
+ .onReady(defer(self(), [=](Nothing) {
+ expire(frameworkId, slaveId, weakPtr);
+ }));
}
}
@@ -1294,15 +1304,6 @@ void HierarchicalAllocatorProcess::recoverResources(
<< " filtered agent " << slaveId
<< " for " << timeout.get();
- // Create a new filter. Note that we unallocate the resources
- // since filters are applied per-role already.
- Resources unallocated = resources;
- unallocated.unallocate();
-
- OfferFilter* offerFilter = new RefusedOfferFilter(unallocated);
- frameworks.at(frameworkId)
- .offerFilters[role][slaveId].insert(offerFilter);
-
// Expire the filter after both an `allocationInterval` and the
// `timeout` have elapsed. This ensures that the filter does not
// expire before we perform the next allocation for this agent,
@@ -1316,21 +1317,23 @@ void HierarchicalAllocatorProcess::recoverResources(
// (MESOS-3078), we would not need to increase the timeout here.
timeout = std::max(options.allocationInterval, timeout.get());
- // We need to disambiguate the function call to pick the correct
- // `expire()` overload.
- void (Self::*expireOffer)(
- const FrameworkID&,
- const string&,
- const SlaveID&,
- OfferFilter*) = &Self::expire;
-
- delay(timeout.get(),
- self(),
- expireOffer,
- frameworkId,
- role,
- slaveId,
- offerFilter);
+ // Create a new filter. Note that we unallocate the resources
+ // since filters are applied per-role already.
+ Resources unallocated = resources;
+ unallocated.unallocate();
+
+ shared_ptr<RefusedOfferFilter> offerFilter =
+ make_shared<RefusedOfferFilter>(unallocated, *timeout);
+
+ frameworks.at(frameworkId)
+ .offerFilters[role][slaveId].insert(offerFilter);
+
+ weak_ptr<OfferFilter> weakPtr = offerFilter;
+
+ offerFilter->expired()
+ .onReady(defer(self(), [=](Nothing) {
+ expire(frameworkId, role, slaveId, weakPtr);
+ }));
}
}
@@ -1387,13 +1390,6 @@ void HierarchicalAllocatorProcess::reviveOffers(
framework.metrics->reviveRole(role);
}
- // We delete each actual `OfferFilter` when
- // `HierarchicalAllocatorProcess::expire` gets invoked. If we delete the
- // `OfferFilter` here it's possible that the same `OfferFilter` (i.e., same
- // address) could get reused and `HierarchicalAllocatorProcess::expire`
- // would expire that filter too soon. Note that this only works
- // right now because ALL Filter types "expire".
-
LOG(INFO) << "Revived offers for roles " << stringify(roles)
<< " of framework " << frameworkId;
@@ -2277,36 +2273,40 @@ void HierarchicalAllocatorProcess::_expire(
const FrameworkID& frameworkId,
const string& role,
const SlaveID& slaveId,
- OfferFilter* offerFilter)
+ const weak_ptr<OfferFilter>& offerFilter)
{
// The filter might have already been removed (e.g., if the
- // framework no longer exists or in `reviveOffers()`) but not
- // yet deleted (to keep the address from getting reused
- // possibly causing premature expiration).
- //
+ // framework no longer exists or in `reviveOffers()`) but
+ // we may land here if the cancelation of the expiry timeout
+ // did not succeed (due to the dispatch already being in the
+ // queue).
+ shared_ptr<OfferFilter> filter = offerFilter.lock();
+
+ if (filter.get() == nullptr) {
+ return;
+ }
+
// Since this is a performance-sensitive piece of code,
// we use find to avoid the doing any redundant lookups.
-
auto frameworkIterator = frameworks.find(frameworkId);
- if (frameworkIterator != frameworks.end()) {
- Framework& framework = frameworkIterator->second;
+ CHECK(frameworkIterator != frameworks.end());
- auto roleFilters = framework.offerFilters.find(role);
- if (roleFilters != framework.offerFilters.end()) {
- auto agentFilters = roleFilters->second.find(slaveId);
+ Framework& framework = frameworkIterator->second;
- if (agentFilters != roleFilters->second.end()) {
- // Erase the filter (may be a no-op per the comment above).
- agentFilters->second.erase(offerFilter);
+ auto roleFilters = framework.offerFilters.find(role);
+ CHECK(roleFilters != framework.offerFilters.end());
- if (agentFilters->second.empty()) {
- roleFilters->second.erase(slaveId);
- }
- }
- }
- }
+ auto agentFilters = roleFilters->second.find(slaveId);
+ CHECK(agentFilters != roleFilters->second.end());
- delete offerFilter;
+ // Erase the filter (may be a no-op per the comment above).
+ agentFilters->second.erase(filter);
+ if (agentFilters->second.empty()) {
+ roleFilters->second.erase(slaveId);
+ }
+ if (roleFilters->second.empty()) {
+ framework.offerFilters.erase(role);
+ }
}
@@ -2314,7 +2314,7 @@ void HierarchicalAllocatorProcess::expire(
const FrameworkID& frameworkId,
const string& role,
const SlaveID& slaveId,
- OfferFilter* offerFilter)
+ const weak_ptr<OfferFilter>& offerFilter)
{
dispatch(
self(),
@@ -2329,32 +2329,34 @@ void HierarchicalAllocatorProcess::expire(
void HierarchicalAllocatorProcess::expire(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
- InverseOfferFilter* inverseOfferFilter)
+ const weak_ptr<InverseOfferFilter>& inverseOfferFilter)
{
// The filter might have already been removed (e.g., if the
// framework no longer exists or in
- // HierarchicalAllocatorProcess::reviveOffers) but not yet deleted (to
- // keep the address from getting reused possibly causing premature
- // expiration).
- //
+ // HierarchicalAllocatorProcess::reviveOffers) but
+ // we may land here if the cancelation of the expiry timeout
+ // did not succeed (due to the dispatch already being in the
+ // queue).
+ shared_ptr<InverseOfferFilter> filter = inverseOfferFilter.lock();
+
+ if (filter.get() == nullptr) {
+ return;
+ }
+
// Since this is a performance-sensitive piece of code,
// we use find to avoid the doing any redundant lookups.
-
auto frameworkIterator = frameworks.find(frameworkId);
- if (frameworkIterator != frameworks.end()) {
- Framework& framework = frameworkIterator->second;
+ CHECK(frameworkIterator != frameworks.end());
- auto filters = framework.inverseOfferFilters.find(slaveId);
- if (filters != framework.inverseOfferFilters.end()) {
- filters->second.erase(inverseOfferFilter);
+ Framework& framework = frameworkIterator->second;
- if (filters->second.empty()) {
- framework.inverseOfferFilters.erase(slaveId);
- }
- }
- }
+ auto filters = framework.inverseOfferFilters.find(slaveId);
+ CHECK(filters != framework.inverseOfferFilters.end());
- delete inverseOfferFilter;
+ filters->second.erase(filter);
+ if (filters->second.empty()) {
+ framework.inverseOfferFilters.erase(slaveId);
+ }
}
@@ -2420,7 +2422,7 @@ bool HierarchicalAllocatorProcess::isFiltered(
return false;
}
- foreach (OfferFilter* offerFilter, agentFilters->second) {
+ foreach (const shared_ptr<OfferFilter>& offerFilter, agentFilters->second) {
if (offerFilter->filter(resources)) {
VLOG(1) << "Filtered offer with " << resources
<< " on agent " << slaveId
@@ -2445,7 +2447,7 @@ bool HierarchicalAllocatorProcess::isFiltered(
const Framework& framework = frameworks.at(frameworkId);
if (framework.inverseOfferFilters.contains(slaveId)) {
- foreach (InverseOfferFilter* inverseOfferFilter,
+ foreach (const shared_ptr<InverseOfferFilter>& inverseOfferFilter,
framework.inverseOfferFilters.at(slaveId)) {
if (inverseOfferFilter->filter()) {
VLOG(1) << "Filtered unavailability on agent " << slaveId
diff --git a/src/master/allocator/mesos/hierarchical.hpp
b/src/master/allocator/mesos/hierarchical.hpp
index 25472eb..f14b1fa 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -17,6 +17,7 @@
#ifndef __MASTER_ALLOCATOR_MESOS_HIERARCHICAL_HPP__
#define __MASTER_ALLOCATOR_MESOS_HIERARCHICAL_HPP__
+#include <memory>
#include <set>
#include <string>
@@ -89,11 +90,13 @@ struct Framework
protobuf::framework::Capabilities capabilities;
- // Active offer and inverse offer filters for the framework.
- // Offer filters are tied to the role the filtered resources
- // were allocated to.
- hashmap<std::string, hashmap<SlaveID, hashset<OfferFilter*>>> offerFilters;
- hashmap<SlaveID, hashset<InverseOfferFilter*>> inverseOfferFilters;
+ // Offer filters are tied to the role the filtered
+ // resources were allocated to.
+ hashmap<std::string, hashmap<SlaveID, hashset<std::shared_ptr<OfferFilter>>>>
+ offerFilters;
+
+ hashmap<SlaveID, hashset<std::shared_ptr<InverseOfferFilter>>>
+ inverseOfferFilters;
bool active;
@@ -469,19 +472,19 @@ protected:
const FrameworkID& frameworkId,
const std::string& role,
const SlaveID& slaveId,
- OfferFilter* offerFilter);
+ const std::weak_ptr<OfferFilter>& offerFilter);
void _expire(
const FrameworkID& frameworkId,
const std::string& role,
const SlaveID& slaveId,
- OfferFilter* offerFilter);
+ const std::weak_ptr<OfferFilter>& offerFilter);
// Remove an inverse offer filter for the specified framework.
void expire(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
- InverseOfferFilter* inverseOfferFilter);
+ const std::weak_ptr<InverseOfferFilter>& inverseOfferFilter);
// Checks whether the slave is whitelisted.
bool isWhitelisted(const SlaveID& slaveId) const;