This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch 1.6.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 884cd98ab9480cbb9d281f118cb7ed028ec6bbf0
Author: Benjamin Mahler <[email protected]>
AuthorDate: Fri Jun 21 18:49:55 2019 -0400

    Fixed a memory "leak" of filters in the allocator.
    
    Per MESOS-9852, the allocator does not keep a handle to the offer
    filter timer, which means it cannot remove the timer if it's no
    longer relevant (e.g. due to revive). This means that timers build
    up in memory.
    
    Also, the offer filter is allocated on the heap, and is not deleted
    until the expiry of the timer (which may take forever!), even if the
    offer filter is no longer relevant (e.g. due to revive). This means
    that offer filters build up in memory.
    
    The fix applied is to manage offer filters using a shared_ptr in the
    maps, which means that they get deleted when erased. The expiration
    functions now need to use a weak_ptr in case they run after the
    offer filter has been erased (which is possible due to a race between
    the expiry timer firing and the timer being discarded).
    
    To discard the timers when no longer needed, the destructors of the
    filters perform the discard.
    
    This was verified with a test that spins in a revive + long decline
    loop. Previously, the RSS continued to grow; after this fix
    it remains stable.
    
    Review: https://reviews.apache.org/r/70927
---
 src/master/allocator/mesos/hierarchical.cpp | 220 ++++++++++++++--------------
 src/master/allocator/mesos/hierarchical.hpp |  22 +--
 2 files changed, 124 insertions(+), 118 deletions(-)

diff --git a/src/master/allocator/mesos/hierarchical.cpp 
b/src/master/allocator/mesos/hierarchical.cpp
index 278ab9d..f61ab4d 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -43,9 +43,12 @@
 #include "common/protobuf_utils.hpp"
 #include "common/resource_quantities.hpp"
 
+using std::make_shared;
 using std::set;
+using std::shared_ptr;
 using std::string;
 using std::vector;
+using std::weak_ptr;
 
 using mesos::allocator::InverseOfferStatus;
 
@@ -80,20 +83,38 @@ public:
 class RefusedOfferFilter : public OfferFilter
 {
 public:
-  RefusedOfferFilter(const Resources& _resources) : resources(_resources) {}
+  RefusedOfferFilter(
+      const Resources& _resources,
+      const Duration& timeout)
+    : _resources(_resources),
+      _expired(after(timeout)) {}
 
-  virtual bool filter(const Resources& _resources) const
+  virtual ~RefusedOfferFilter()
   {
+    // Cancel the timeout upon destruction to avoid lingering timers.
+    _expired.discard();
+  }
+
+  Future<Nothing> expired() const { return _expired; };
+
+  bool filter(const Resources& resources) const override
+  {
+    // NOTE: We do not check for the filter being expired here
+    // because `recoverResources()` expects the filter to apply
+    // until the filter is removed, see:
+    // https://github.com/apache/mesos/commit/2f170f302fe94c4
+    //
     // TODO(jieyu): Consider separating the superset check for regular
     // and revocable resources. For example, frameworks might want
     // more revocable resources only or non-revocable resources only,
     // but currently the filter only expires if there is more of both
     // revocable and non-revocable resources.
-    return resources.contains(_resources); // Refused resources are superset.
+    return _resources.contains(resources); // Refused resources are superset.
   }
 
 private:
-  const Resources resources;
+  const Resources _resources;
+  Future<Nothing> _expired;
 };
 
 
@@ -120,17 +141,25 @@ public:
 class RefusedInverseOfferFilter : public InverseOfferFilter
 {
 public:
-  RefusedInverseOfferFilter(const Timeout& _timeout)
-    : timeout(_timeout) {}
+  RefusedInverseOfferFilter(const Duration& timeout)
+    : _expired(after(timeout)) {}
+
+  virtual ~RefusedInverseOfferFilter()
+  {
+    // Cancel the timeout upon destruction to avoid lingering timers.
+    _expired.discard();
+  }
+
+  Future<Nothing> expired() const { return _expired; };
 
   virtual bool filter() const
   {
     // See comment above why we currently don't do more fine-grained filtering.
-    return timeout.remaining() > Seconds(0);
+    return _expired.isPending();
   }
 
 private:
-  const Timeout timeout;
+  Future<Nothing> _expired;
 };
 
 
@@ -339,10 +368,6 @@ void HierarchicalAllocatorProcess::removeFramework(
     untrackFrameworkUnderRole(frameworkId, role);
   }
 
-  // Do not delete the filters contained in this
-  // framework's `offerFilters` hashset yet, see comments in
-  // HierarchicalAllocatorProcess::reviveOffers and
-  // HierarchicalAllocatorProcess::expire.
   frameworks.erase(frameworkId);
 
   LOG(INFO) << "Removed framework " << frameworkId;
@@ -399,10 +424,6 @@ void HierarchicalAllocatorProcess::deactivateFramework(
 
   framework.active = false;
 
-  // Do not delete the filters contained in this
-  // framework's `offerFilters` hashset yet, see comments in
-  // HierarchicalAllocatorProcess::reviveOffers and
-  // HierarchicalAllocatorProcess::expire.
   framework.offerFilters.clear();
   framework.inverseOfferFilters.clear();
 
@@ -606,10 +627,7 @@ void HierarchicalAllocatorProcess::removeSlave(
   slaves.erase(slaveId);
   allocationCandidates.erase(slaveId);
 
-  // Note that we DO NOT actually delete any filters associated with
-  // this slave, that will occur when the delayed
-  // HierarchicalAllocatorProcess::expire gets invoked (or the framework
-  // that applied the filters gets removed).
+  removeFilters(slaveId);
 
   LOG(INFO) << "Removed agent " << slaveId;
 }
@@ -730,7 +748,7 @@ void HierarchicalAllocatorProcess::removeFilters(const 
SlaveID& slaveId)
 
     // Need a typedef here, otherwise the preprocessor gets confused
     // by the comma in the template argument list.
-    typedef hashmap<SlaveID, hashset<OfferFilter*>> Filters;
+    typedef hashmap<SlaveID, hashset<shared_ptr<OfferFilter>>> Filters;
     foreachvalue (Filters& filters, framework.offerFilters) {
       filters.erase(slaveId);
     }
@@ -1086,25 +1104,17 @@ void HierarchicalAllocatorProcess::updateInverseOffer(
             << " for " << timeout.get();
 
     // Create a new inverse offer filter and delay its expiration.
-    InverseOfferFilter* inverseOfferFilter =
-      new RefusedInverseOfferFilter(Timeout::in(timeout.get()));
+    shared_ptr<RefusedInverseOfferFilter> inverseOfferFilter =
+      make_shared<RefusedInverseOfferFilter>(timeout.get());
 
     framework.inverseOfferFilters[slaveId].insert(inverseOfferFilter);
 
-    // We need to disambiguate the function call to pick the correct
-    // `expire()` overload.
-    void (Self::*expireInverseOffer)(
-             const FrameworkID&,
-             const SlaveID&,
-             InverseOfferFilter*) = &Self::expire;
-
-    delay(
-        timeout.get(),
-        self(),
-        expireInverseOffer,
-        frameworkId,
-        slaveId,
-        inverseOfferFilter);
+    weak_ptr<InverseOfferFilter> weakPtr = inverseOfferFilter;
+
+    inverseOfferFilter->expired()
+      .onReady(defer(self(), [=](Nothing) {
+        expire(frameworkId, slaveId, weakPtr);
+      }));
   }
 }
 
@@ -1236,15 +1246,6 @@ void HierarchicalAllocatorProcess::recoverResources(
             << " filtered agent " << slaveId
             << " for " << timeout.get();
 
-    // Create a new filter. Note that we unallocate the resources
-    // since filters are applied per-role already.
-    Resources unallocated = resources;
-    unallocated.unallocate();
-
-    OfferFilter* offerFilter = new RefusedOfferFilter(unallocated);
-    frameworks.at(frameworkId)
-      .offerFilters[role][slaveId].insert(offerFilter);
-
     // Expire the filter after both an `allocationInterval` and the
     // `timeout` have elapsed. This ensures that the filter does not
     // expire before we perform the next allocation for this agent,
@@ -1258,21 +1259,23 @@ void HierarchicalAllocatorProcess::recoverResources(
     // (MESOS-3078), we would not need to increase the timeout here.
     timeout = std::max(allocationInterval, timeout.get());
 
-    // We need to disambiguate the function call to pick the correct
-    // `expire()` overload.
-    void (Self::*expireOffer)(
-        const FrameworkID&,
-        const string&,
-        const SlaveID&,
-        OfferFilter*) = &Self::expire;
-
-    delay(timeout.get(),
-          self(),
-          expireOffer,
-          frameworkId,
-          role,
-          slaveId,
-          offerFilter);
+    // Create a new filter. Note that we unallocate the resources
+    // since filters are applied per-role already.
+    Resources unallocated = resources;
+    unallocated.unallocate();
+
+    shared_ptr<RefusedOfferFilter> offerFilter =
+      make_shared<RefusedOfferFilter>(unallocated, timeout.get());
+
+    frameworks.at(frameworkId)
+      .offerFilters[role][slaveId].insert(offerFilter);
+
+    weak_ptr<OfferFilter> weakPtr = offerFilter;
+
+    offerFilter->expired()
+      .onReady(defer(self(), [=](Nothing) {
+        expire(frameworkId, role, slaveId, weakPtr);
+      }));
   }
 }
 
@@ -1327,13 +1330,6 @@ void HierarchicalAllocatorProcess::reviveOffers(
     framework.suppressedRoles.erase(role);
   }
 
-  // We delete each actual `OfferFilter` when
-  // `HierarchicalAllocatorProcess::expire` gets invoked. If we delete the
-  // `OfferFilter` here it's possible that the same `OfferFilter` (i.e., same
-  // address) could get reused and `HierarchicalAllocatorProcess::expire`
-  // would expire that filter too soon. Note that this only works
-  // right now because ALL Filter types "expire".
-
   LOG(INFO) << "Revived offers for roles " << stringify(roles)
             << " of framework " << frameworkId;
 
@@ -2290,36 +2286,40 @@ void HierarchicalAllocatorProcess::_expire(
     const FrameworkID& frameworkId,
     const string& role,
     const SlaveID& slaveId,
-    OfferFilter* offerFilter)
+    const weak_ptr<OfferFilter>& offerFilter)
 {
   // The filter might have already been removed (e.g., if the
-  // framework no longer exists or in `reviveOffers()`) but not
-  // yet deleted (to keep the address from getting reused
-  // possibly causing premature expiration).
-  //
+  // framework no longer exists or in `reviveOffers()`) but
+  // we may land here if the cancelation of the expiry timeout
+  // did not succeed (due to the dispatch already being in the
+  // queue).
+  shared_ptr<OfferFilter> filter = offerFilter.lock();
+
+  if (filter.get() == nullptr) {
+    return;
+  }
+
   // Since this is a performance-sensitive piece of code,
   // we use find to avoid the doing any redundant lookups.
-
   auto frameworkIterator = frameworks.find(frameworkId);
-  if (frameworkIterator != frameworks.end()) {
-    Framework& framework = frameworkIterator->second;
+  CHECK(frameworkIterator != frameworks.end());
 
-    auto roleFilters = framework.offerFilters.find(role);
-    if (roleFilters != framework.offerFilters.end()) {
-      auto agentFilters = roleFilters->second.find(slaveId);
+  Framework& framework = frameworkIterator->second;
 
-      if (agentFilters != roleFilters->second.end()) {
-        // Erase the filter (may be a no-op per the comment above).
-        agentFilters->second.erase(offerFilter);
+  auto roleFilters = framework.offerFilters.find(role);
+  CHECK(roleFilters != framework.offerFilters.end());
 
-        if (agentFilters->second.empty()) {
-          roleFilters->second.erase(slaveId);
-        }
-      }
-    }
-  }
+  auto agentFilters = roleFilters->second.find(slaveId);
+  CHECK(agentFilters != roleFilters->second.end());
 
-  delete offerFilter;
+  // Erase the filter (may be a no-op per the comment above).
+  agentFilters->second.erase(filter);
+  if (agentFilters->second.empty()) {
+    roleFilters->second.erase(slaveId);
+  }
+  if (roleFilters->second.empty()) {
+    framework.offerFilters.erase(role);
+  }
 }
 
 
@@ -2327,7 +2327,7 @@ void HierarchicalAllocatorProcess::expire(
     const FrameworkID& frameworkId,
     const string& role,
     const SlaveID& slaveId,
-    OfferFilter* offerFilter)
+    const weak_ptr<OfferFilter>& offerFilter)
 {
   dispatch(
       self(),
@@ -2342,32 +2342,34 @@ void HierarchicalAllocatorProcess::expire(
 void HierarchicalAllocatorProcess::expire(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,
-    InverseOfferFilter* inverseOfferFilter)
+    const weak_ptr<InverseOfferFilter>& inverseOfferFilter)
 {
   // The filter might have already been removed (e.g., if the
   // framework no longer exists or in
-  // HierarchicalAllocatorProcess::reviveOffers) but not yet deleted (to
-  // keep the address from getting reused possibly causing premature
-  // expiration).
-  //
+  // HierarchicalAllocatorProcess::reviveOffers) but
+  // we may land here if the cancelation of the expiry timeout
+  // did not succeed (due to the dispatch already being in the
+  // queue).
+  shared_ptr<InverseOfferFilter> filter = inverseOfferFilter.lock();
+
+  if (filter.get() == nullptr) {
+    return;
+  }
+
   // Since this is a performance-sensitive piece of code,
   // we use find to avoid the doing any redundant lookups.
-
   auto frameworkIterator = frameworks.find(frameworkId);
-  if (frameworkIterator != frameworks.end()) {
-    Framework& framework = frameworkIterator->second;
+  CHECK(frameworkIterator != frameworks.end());
 
-    auto filters = framework.inverseOfferFilters.find(slaveId);
-    if (filters != framework.inverseOfferFilters.end()) {
-      filters->second.erase(inverseOfferFilter);
+  Framework& framework = frameworkIterator->second;
 
-      if (filters->second.empty()) {
-        framework.inverseOfferFilters.erase(slaveId);
-      }
-    }
-  }
+  auto filters = framework.inverseOfferFilters.find(slaveId);
+  CHECK(filters != framework.inverseOfferFilters.end());
 
-  delete inverseOfferFilter;
+  filters->second.erase(filter);
+  if (filters->second.empty()) {
+    framework.inverseOfferFilters.erase(slaveId);
+  }
 }
 
 
@@ -2433,7 +2435,7 @@ bool HierarchicalAllocatorProcess::isFiltered(
     return false;
   }
 
-  foreach (OfferFilter* offerFilter, agentFilters->second) {
+  foreach (const shared_ptr<OfferFilter>& offerFilter, agentFilters->second) {
     if (offerFilter->filter(resources)) {
       VLOG(1) << "Filtered offer with " << resources
               << " on agent " << slaveId
@@ -2458,7 +2460,7 @@ bool HierarchicalAllocatorProcess::isFiltered(
   const Framework& framework = frameworks.at(frameworkId);
 
   if (framework.inverseOfferFilters.contains(slaveId)) {
-    foreach (InverseOfferFilter* inverseOfferFilter,
+    foreach (const shared_ptr<InverseOfferFilter>& inverseOfferFilter,
              framework.inverseOfferFilters.at(slaveId)) {
       if (inverseOfferFilter->filter()) {
         VLOG(1) << "Filtered unavailability on agent " << slaveId
diff --git a/src/master/allocator/mesos/hierarchical.hpp 
b/src/master/allocator/mesos/hierarchical.hpp
index fd3fa75..fc08f25 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -17,6 +17,7 @@
 #ifndef __MASTER_ALLOCATOR_MESOS_HIERARCHICAL_HPP__
 #define __MASTER_ALLOCATOR_MESOS_HIERARCHICAL_HPP__
 
+#include <memory>
 #include <set>
 #include <string>
 
@@ -74,7 +75,6 @@ namespace internal {
 class OfferFilter;
 class InverseOfferFilter;
 
-
 // Implements the basic allocator algorithm - first pick a role by
 // some criteria, then pick one of their frameworks to allocate to.
 class HierarchicalAllocatorProcess : public MesosAllocatorProcess
@@ -258,19 +258,19 @@ protected:
       const FrameworkID& frameworkId,
       const std::string& role,
       const SlaveID& slaveId,
-      OfferFilter* offerFilter);
+      const std::weak_ptr<OfferFilter>& offerFilter);
 
   void _expire(
       const FrameworkID& frameworkId,
       const std::string& role,
       const SlaveID& slaveId,
-      OfferFilter* offerFilter);
+      const std::weak_ptr<OfferFilter>& offerFilter);
 
   // Remove an inverse offer filter for the specified framework.
   void expire(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
-      InverseOfferFilter* inverseOfferFilter);
+      const std::weak_ptr<InverseOfferFilter>& inverseOfferFilter);
 
   // Checks whether the slave is whitelisted.
   bool isWhitelisted(const SlaveID& slaveId) const;
@@ -325,11 +325,15 @@ protected:
 
     protobuf::framework::Capabilities capabilities;
 
-    // Active offer and inverse offer filters for the framework.
-    // Offer filters are tied to the role the filtered resources
-    // were allocated to.
-    hashmap<std::string, hashmap<SlaveID, hashset<OfferFilter*>>> offerFilters;
-    hashmap<SlaveID, hashset<InverseOfferFilter*>> inverseOfferFilters;
+    // Offer filters are tied to the role the filtered
+    // resources were allocated to.
+    hashmap<std::string,
+            hashmap<SlaveID, hashset<std::shared_ptr<OfferFilter>>>>
+      offerFilters;
+
+    hashmap<SlaveID,
+            hashset<std::shared_ptr<InverseOfferFilter>>>
+      inverseOfferFilters;
 
     bool active;
   };

Reply via email to