This is an automated email from the ASF dual-hosted git repository.

mzhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 2ec34ca5951a5a8da3d1ab93839cce68e815c1d5
Author: Meng Zhu <[email protected]>
AuthorDate: Tue Sep 3 13:31:36 2019 -0700

    Added tracking of framework allocations in the allocator Slave class.
    
    This would simplify the tracking logic regarding
    resource allocations in the allocator. See MESOS-9182.
    
    Negligible performance impact:
    
    Master:
    
    BENCHMARK_HierarchicalAllocator_WithQuotaParam.LargeAndSmallQuota/2
    Added 3000 agents in 77.999483ms
    Added 3000 frameworks in 16.736076171secs
    Benchmark setup: 3000 agents, 3000 roles, 3000 frameworks,
    with drf sorter
    Made 3500 allocations in 15.342376944secs
    Made 0 allocation in 13.96720191secs
    
    Master + this patch:
    
    BENCHMARK_HierarchicalAllocator_WithQuotaParam.LargeAndSmallQuota/2
    Added 3000 agents in 83.597048ms
    Added 3000 frameworks in 16.757011745secs
    Benchmark setup: 3000 agents, 3000 roles, 3000 frameworks,
    with drf sorter
    Made 3500 allocations in 15.566366241secs
    Made 0 allocation in 14.022591871secs
    
    Review: https://reviews.apache.org/r/68508
---
 src/master/allocator/mesos/hierarchical.cpp | 31 ++++++------
 src/master/allocator/mesos/hierarchical.hpp | 74 ++++++++++++++++++-----------
 2 files changed, 63 insertions(+), 42 deletions(-)

diff --git a/src/master/allocator/mesos/hierarchical.cpp 
b/src/master/allocator/mesos/hierarchical.cpp
index 187de17..a477e50 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -791,7 +791,7 @@ void HierarchicalAllocatorProcess::addSlave(
                      protobuf::slave::Capabilities(capabilities),
                      true,
                      total,
-                     Resources::sum(used))});
+                     used)});
 
   Slave& slave = *CHECK_NOTNONE(getSlave(slaveId));
 
@@ -854,7 +854,7 @@ void HierarchicalAllocatorProcess::addSlave(
   LOG(INFO)
     << "Added agent " << slaveId << " (" << slave.info.hostname() << ")"
     << " with " << slave.getTotal()
-    << " (offered or allocated: " << slave.getOfferedOrAllocated() << ")";
+    << " (offered or allocated: " << slave.getTotalOfferedOrAllocated() << ")";
 
   generateOffers(slaveId);
 }
@@ -964,6 +964,9 @@ void HierarchicalAllocatorProcess::addResourceProvider(
 {
   CHECK(initialized);
 
+  Slave& slave = *CHECK_NOTNONE(getSlave(slaveId));
+  updateSlaveTotal(slaveId, slave.getTotal() + total);
+
   foreachpair (const FrameworkID& frameworkId,
                const Resources& allocation,
                used) {
@@ -982,13 +985,10 @@ void HierarchicalAllocatorProcess::addResourceProvider(
       continue;
     }
 
+    slave.decreaseAvailable(frameworkId, allocation);
     trackAllocatedResources(slaveId, frameworkId, allocation);
   }
 
-  Slave& slave = *CHECK_NOTNONE(getSlave(slaveId));
-  updateSlaveTotal(slaveId, slave.getTotal() + total);
-  slave.decreaseAvailable(Resources::sum(used));
-
   VLOG(1)
     << "Grew agent " << slaveId << " by "
     << total << " (total), "
@@ -1114,8 +1114,8 @@ void HierarchicalAllocatorProcess::updateAllocation(
   const Resources& updatedOfferedResources = _updatedOfferedResources.get();
 
   // Update the per-slave allocation.
-  slave.increaseAvailable(offeredResources);
-  slave.decreaseAvailable(updatedOfferedResources);
+  slave.increaseAvailable(frameworkId, offeredResources);
+  slave.decreaseAvailable(frameworkId, updatedOfferedResources);
 
   // Update the allocation in the framework sorter.
   frameworkSorter->update(
@@ -1442,16 +1442,17 @@ void HierarchicalAllocatorProcess::recoverResources(
   Option<Slave*> slave = getSlave(slaveId);
 
   if (slave.isSome()) {
-    CHECK((*slave)->getOfferedOrAllocated().contains(resources))
+    CHECK((*slave)->getTotalOfferedOrAllocated().contains(resources))
       << "agent " << slaveId << " resources "
-      << (*slave)->getOfferedOrAllocated() << " do not contain " << resources;
+      << (*slave)->getTotalOfferedOrAllocated() << " do not contain "
+      << resources;
 
-    (*slave)->increaseAvailable(resources);
+    (*slave)->increaseAvailable(frameworkId, resources);
 
     VLOG(1) << "Recovered " << resources
             << " (total: " << (*slave)->getTotal()
             << ", offered or allocated: "
-            << (*slave)->getOfferedOrAllocated() << ")"
+            << (*slave)->getTotalOfferedOrAllocated() << ")"
             << " on agent " << slaveId
             << " from framework " << frameworkId;
   }
@@ -2168,7 +2169,7 @@ void HierarchicalAllocatorProcess::__generateOffers()
           ResourceQuantities::fromScalarResources(guaranteesOffering);
         availableHeadroom -= increasedQuotaConsumption;
 
-        slave.decreaseAvailable(toOffer);
+        slave.decreaseAvailable(frameworkId, toOffer);
 
         trackAllocatedResources(slaveId, frameworkId, toOffer);
       }
@@ -2315,7 +2316,7 @@ void HierarchicalAllocatorProcess::__generateOffers()
 
         availableHeadroom -= increasedQuotaConsumption;
 
-        slave.decreaseAvailable(toOffer);
+        slave.decreaseAvailable(frameworkId, toOffer);
 
         trackAllocatedResources(slaveId, frameworkId, toOffer);
       }
@@ -2654,7 +2655,7 @@ double 
HierarchicalAllocatorProcess::_resources_offered_or_allocated(
 
   foreachvalue (const Slave& slave, slaves) {
     Option<Value::Scalar> value =
-      slave.getOfferedOrAllocated().get<Value::Scalar>(resource);
+      slave.getTotalOfferedOrAllocated().get<Value::Scalar>(resource);
 
     if (value.isSome()) {
       offered_or_allocated += value->value();
diff --git a/src/master/allocator/mesos/hierarchical.hpp 
b/src/master/allocator/mesos/hierarchical.hpp
index 48ba399..9e6570a 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -253,12 +253,13 @@ public:
       const protobuf::slave::Capabilities& _capabilities,
       bool _activated,
       const Resources& _total,
-      const Resources& _offeredOrAllocated)
+      const hashmap<FrameworkID, Resources>& _offeredOrAllocated)
     : info(_info),
       capabilities(_capabilities),
       activated(_activated),
       total(_total),
       offeredOrAllocated(_offeredOrAllocated),
+      totalOfferedOrAllocated(Resources::sum(_offeredOrAllocated)),
       shared(_total.shared()),
       hasGpu_(_total.gpus().getOrElse(0) > 0)
   {
@@ -267,7 +268,15 @@ public:
 
   const Resources& getTotal() const { return total; }
 
-  const Resources& getOfferedOrAllocated() const { return offeredOrAllocated; }
+  const hashmap<FrameworkID, Resources>& getOfferedOrAllocated() const
+  {
+    return offeredOrAllocated;
+  }
+
+  const Resources& getTotalOfferedOrAllocated() const
+  {
+    return totalOfferedOrAllocated;
+  }
 
   const Resources& getAvailable() const { return available; }
 
@@ -281,16 +290,30 @@ public:
     updateAvailable();
   }
 
-  void decreaseAvailable(const Resources& offeredOrAllocated_)
+  void increaseAvailable(
+      const FrameworkID& frameworkId, const Resources& offeredOrAllocated_)
   {
-    offeredOrAllocated += offeredOrAllocated_;
+    // Increasing available is to subtract offered or allocated.
+
+    Resources& resources = offeredOrAllocated.at(frameworkId);
+    resources -= offeredOrAllocated_;
+    if (resources.empty()) {
+      offeredOrAllocated.erase(frameworkId);
+    }
+
+    totalOfferedOrAllocated -= offeredOrAllocated_;
 
     updateAvailable();
   }
 
-  void increaseAvailable(const Resources& offeredOrAllocated_)
+  void decreaseAvailable(
+      const FrameworkID& frameworkId, const Resources& offeredOrAllocated_)
   {
-    offeredOrAllocated -= offeredOrAllocated_;
+    // Decreasing available is to add offered or allocated.
+
+    offeredOrAllocated[frameworkId] += offeredOrAllocated_;
+
+    totalOfferedOrAllocated += offeredOrAllocated_;
 
     updateAvailable();
   }
@@ -342,45 +365,42 @@ public:
   Option<Maintenance> maintenance;
 
 private:
-  void updateAvailable() {
+  void updateAvailable()
+  {
     // In order to subtract from the total,
     // we strip the allocation information.
-    Resources offeredOrAllocated_ = offeredOrAllocated;
-    offeredOrAllocated_.unallocate();
+    Resources totalOfferedOrAllocated_ = totalOfferedOrAllocated;
+    totalOfferedOrAllocated_.unallocate();
 
-    // Calling `nonShared()` currently copies the underlying resources
-    // and is therefore rather expensive. We avoid it in the common
-    // case that there are no shared resources.
-    //
-    // TODO(mzhu): Ideally there would be a single logical path here.
-    // One solution is to have `Resources` be copy-on-write such that
-    // `nonShared()` performs no copying and instead points to a
-    // subset of the original `Resource` objects.
+    // This is hot path. We avoid the unnecessary resource traversals
+    // in the common case where there are no shared resources.
     if (shared.empty()) {
-      available = total - offeredOrAllocated_;
+      available = total - totalOfferedOrAllocated_;
     } else {
       // Since shared resources are offerable even when they are in use, we
       // always include them as part of available resources.
       available =
-        (total.nonShared() - offeredOrAllocated_.nonShared()) + shared;
+        (total.nonShared() - totalOfferedOrAllocated_.nonShared()) + shared;
     }
   }
 
   // Total amount of regular *and* oversubscribed resources.
   Resources total;
 
-  // Regular *and* oversubscribed resources that are offered or allocated.
-  //
-  // NOTE: We maintain multiple copies of each shared resource allocated
-  // to a slave, where the number of copies represents the number of times
-  // this shared resource has been allocated to (and has not been recovered
-  // from) a specific framework.
-  //
   // NOTE: We keep track of the slave's allocated resources despite
   // having that information in sorters. This is because the
   // information in sorters is not accurate if some framework
   // hasn't reregistered. See MESOS-2919 for details.
-  Resources offeredOrAllocated;
+  //
+  // This includes both regular *and* oversubscribed resources.
+  //
+  // An entry is erased if a framework no longer has any
+  // offered or allocated on the agent.
+  hashmap<FrameworkID, Resources> offeredOrAllocated;
+
+  // Sum of all offered or allocated resources on the agent. This should equal
+  // to sum of `offeredOrAllocated` (including all the meta-data).
+  Resources totalOfferedOrAllocated;
 
   // We track the total and allocated resources on the slave to
   // avoid calculating it in place every time.

Reply via email to