This is an automated email from the ASF dual-hosted git repository.

mzhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit fbeb49d6e7784b23e2a6657716271f67ffc5a4ee
Author: Meng Zhu <[email protected]>
AuthorDate: Fri Jul 12 14:40:18 2019 -0700

    Added offer rescind logic for limits enforcement.
    
    Outstanding offers of a role (including offers allocated to its
    subroles) are rescinded until the sum of its consumed quota and
    outstanding offered resources are below the requested limits.
    
    Note, the rescind effort here is best-effort. It is complex and
    expensive to rescind accurately, due to (1) the cost of tracking
    the correct resource state (e.g. for limits, tracking of the
    precise amount of consumed plus offered (with no reservation
    overlap), and similarly, for guarantees, aggregation of all roles'
    consumption and outstanding offers); (2) the race between the master
    and the allocator. In addition, rescinding offers for quota is
    mostly about improving a transient state. Once a quota is set,
    hopefully with resource churn, the quota will eventually be
    enforced. Lastly, once Mesos starts to adopt an optimistic offer
    model (MESOS-1607), quota enforcement will happen during admission
    control, rendering offer rescind unnecessary. As a result, we cut
    some corners here to only make best effort rescinding.
    
    Specifically, for limits enforcement, we rescind outstanding offers
    until the role's consumed plus offered resources are below its limits.
    Note, since consumed and offered might overlap (reservations that are
    being offered), this approach might lead to some over-rescinding. Also, due
    to the race between the master and the allocator, we might rescind
    less than we should due to pending offers in the master mailbox.
    
    Also added a test.
    
    Review: https://reviews.apache.org/r/71068
---
 src/master/quota_handler.cpp     |  74 +++++++++++++++++++++-
 src/tests/master_quota_tests.cpp | 131 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 204 insertions(+), 1 deletion(-)

diff --git a/src/master/quota_handler.cpp b/src/master/quota_handler.cpp
index 4388fe6..d1be0ab 100644
--- a/src/master/quota_handler.cpp
+++ b/src/master/quota_handler.cpp
@@ -580,7 +580,79 @@ Future<http::Response> Master::QuotaHandler::_update(
         master->allocator->updateQuota(config.role(), Quota{config});
       }
 
-      // TODO(mzhu): Rescind offers.
+      // Rescind offers to enforce guarantees and limits.
+      //
+      // Note, the rescind effort here is best-effort. It is complex and
+      // expensive to rescind accurately, due to (1) the cost of tracking
+      // the correct resource state (e.g. for limits, tracking of the precise
+      // amount of consumed plus offered (with no reservation overlap), and
+      // similarly, for guarantees, aggregation of all roles' consumption and
+      // outstanding offers); (2) the race between the master and the allocator.
+      // In addition, rescinding offers for quota is mostly about improving a
+      // transient state. Once a quota is set, hopefully with resource churn,
+      // the quota will eventually be enforced. Lastly, once Mesos starts to
+      // adopt an optimistic offer model (MESOS-1607), quota enforcement will
+      // happen during admission control, rendering offer rescind unnecessary.
+      // As a result, we cut some corners here to only make best effort
+      // rescinding (more on this below).
+
+      foreach (const auto& config, configs) {
+        RoleResourceBreakdown resourceBreakdown{master, config.role()};
+
+        // NOTE: Since consumed and offered may overlap (unallocated
+        // reservations may be in both), this may lead to some over-rescinding.
+        ResourceQuantities consumedAndOffered =
+          resourceBreakdown.consumedQuota() + resourceBreakdown.offered();
+        ResourceLimits limits{config.limits()};
+
+        const string& roleName = config.role(); // For cleaner captures.
+        auto allocatedToRoleSubtree = [&roleName](const Offer& offer) {
+          CHECK(offer.has_allocation_info())
+            << " Offer " << offer.id() << " has no allocation_info";
+          return offer.allocation_info().role() == roleName ||
+                 roles::isStrictSubroleOf(
+                     offer.allocation_info().role(), roleName);
+        };
+
+        // We first rescind offers to ensure individual role's limits
+        // are not breached. We rescind outstanding offers until the role's
+        // `consumedAndOffered` is below its limits. Note, since `consumed`
+        // and `offered` might overlap (reservations that are being offered),
+        // this approach might lead to some over-rescinding. Also, due to
+        // the race between the master and the allocator, we might rescind less
+        // than we should due to pending offers in the master mailbox.
+
+        // Loop over all frameworks since `role->frameworks` only tracks
+        // those that are directly subscribed to this role, and we
+        // need to consider all descendant role offers.
+        foreachvalue (Framework* framework, master->frameworks.registered) {
+          if (limits.contains(consumedAndOffered)) {
+            break; // Done rescinding.
+          }
+
+          foreach (Offer* offer, utils::copy(framework->offers)) {
+            if (limits.contains(consumedAndOffered)) {
+              break; // Done rescinding.
+            }
+
+            if (!allocatedToRoleSubtree(*offer)) {
+              continue;
+            }
+
+            consumedAndOffered -=
+              ResourceQuantities::fromResources(offer->resources());
+
+            master->allocator->recoverResources(
+                offer->framework_id(),
+                offer->slave_id(),
+                offer->resources(),
+                None());
+            master->removeOffer(offer, true);
+          }
+        }
+
+        // TODO(mzhu): Rescind offers to satisfy guarantees.
+      }
 
       return OK();
     }));
diff --git a/src/tests/master_quota_tests.cpp b/src/tests/master_quota_tests.cpp
index 58ecc50..93f989f 100644
--- a/src/tests/master_quota_tests.cpp
+++ b/src/tests/master_quota_tests.cpp
@@ -1685,6 +1685,137 @@ TEST_F(MasterQuotaTest, AvailableResourcesAfterRescinding)
 }
 
 
+// This test verifies the offer rescind logic for quota limits enforcement.
+// If a role's quota consumption plus offered are above the requested limits,
+// outstanding offers of that role will be rescinded.
+TEST_F(MasterQuotaTest, RescindOffersEnforcingLimits)
+{
+  TestAllocator<> allocator;
+  EXPECT_CALL(allocator, initialize(_, _, _));
+
+  Try<Owned<cluster::Master>> master = StartMaster(&allocator);
+  ASSERT_SOME(master);
+
+  // Start an agent.
+  slave::Flags flags1 = CreateSlaveFlags();
+  flags1.resources = "cpus:1;mem:1024";
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave1 = StartSlave(detector.get(), flags1);
+  ASSERT_SOME(slave1);
+
+  // Start a framework under `ROLE1`.
+  FrameworkInfo frameworkInfo1 = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo1.set_roles(0, ROLE1);
+
+  MockScheduler sched1;
+  MesosSchedulerDriver framework1(
+      &sched1, frameworkInfo1, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId1;
+  EXPECT_CALL(sched1, registered(&framework1, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId1));
+
+  Future<vector<Offer>> offers1;
+  EXPECT_CALL(sched1, resourceOffers(&framework1, _))
+    .WillOnce(FutureArg<1>(&offers1))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  Future<Nothing> offerRescinded1;
+  EXPECT_CALL(sched1, offerRescinded(&framework1, _))
+    .WillOnce(FutureSatisfy(&offerRescinded1));
+
+  framework1.start();
+
+  AWAIT_READY(offers1);
+  ASSERT_EQ(1u, offers1->size());
+
+  // Cluster resources: cpus:1;mem:1024
+  // Allocated: `ROLE1` cpus:1;mem:1024
+
+  // Start a second agent with identical resources.
+  slave::Flags flags2 = CreateSlaveFlags();
+  flags2.resources = "cpus:1;mem:1024";
+  Try<Owned<cluster::Slave>> slave2 = StartSlave(detector.get(), flags2);
+  ASSERT_SOME(slave2);
+
+  FrameworkInfo frameworkInfo2 = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo2.set_roles(0, ROLE1 + "/child");
+
+  MockScheduler sched2;
+  MesosSchedulerDriver framework2(
+      &sched2, frameworkInfo2, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId2;
+  EXPECT_CALL(sched2, registered(&framework2, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId2));
+
+  Future<vector<Offer>> offers2;
+  EXPECT_CALL(sched2, resourceOffers(&framework2, _))
+    .WillOnce(FutureArg<1>(&offers2))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  Future<Nothing> offerRescinded2;
+  EXPECT_CALL(sched2, offerRescinded(&framework2, _))
+    .WillOnce(FutureSatisfy(&offerRescinded2));
+
+  framework2.start();
+
+  AWAIT_READY(offers2);
+  ASSERT_EQ(1u, offers2->size());
+
+  // Cluster resources: cpus:2;mem:2048
+  // Allocated: `ROLE1`  cpus:1;mem:1024
+  //            `ROLE1/child` cpus:1;mem:1024
+
+  // Set `ROLE1` resource limits to its current allocation (which includes
+  // the `ROLE1/child` offer). No offer should be rescinded.
+  {
+    process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    headers["Content-Type"] = "application/json";
+
+    Future<Response> response = process::http::post(
+        master.get()->pid,
+        "/api/v1",
+        headers,
+        createUpdateQuotaRequestBody(
+            createQuotaConfig(ROLE1, "", stringify("cpus:2;mem:2048"))));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+    EXPECT_TRUE(offerRescinded1.isPending());
+    EXPECT_TRUE(offerRescinded2.isPending());
+  }
+
+  // Set `ROLE1` resource limits to `cpus:0.5;mem:512`. Either offer alone
+  // exceeds this, so both outstanding offers must be rescinded.
+  {
+    process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    headers["Content-Type"] = "application/json";
+
+    Future<Response> response = process::http::post(
+        master.get()->pid,
+        "/api/v1",
+        headers,
+        createUpdateQuotaRequestBody(
+            createQuotaConfig(ROLE1, "", stringify("cpus:0.5;mem:512"))));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+    AWAIT_READY(offerRescinded1);
+    AWAIT_READY(offerRescinded2);
+  }
+
+  // Tear down frameworks before agents to avoid offers being
+  // rescinded again.
+  framework1.stop();
+  framework1.join();
+
+  framework2.stop();
+  framework2.join();
+}
+
+
 // These tests ensure quota implements declared functionality. Note that the
 // tests here are allocator-agnostic, which means we expect every allocator to
 // implement basic quota guarantees.

Reply via email to