Repository: mesos
Updated Branches:
  refs/heads/master ebb8b590b -> 87d8bd08e


Updated allocator to properly handle oversubscribed resources.

Review: https://reviews.apache.org/r/34616


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/87d8bd08
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/87d8bd08
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/87d8bd08

Branch: refs/heads/master
Commit: 87d8bd08e45771b40e238a787a7adee53a244946
Parents: ebb8b59
Author: Vinod Kone <[email protected]>
Authored: Fri May 22 13:15:09 2015 -0700
Committer: Vinod Kone <[email protected]>
Committed: Fri May 22 17:33:41 2015 -0700

----------------------------------------------------------------------
 src/master/allocator/mesos/hierarchical.hpp |  64 +++++++++-
 src/tests/hierarchical_allocator_tests.cpp  | 153 +++++++++++++++++++++++
 2 files changed, 214 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/87d8bd08/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 9c949b4..44fbcca 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -179,6 +179,9 @@ protected:
     std::string role;
     bool checkpoint;  // Whether the framework desires checkpointing.
 
+    // Whether the framework desires revocable resources.
+    bool revocable;
+
     hashset<Filter*> filters; // Active filters for the framework.
   };
 
@@ -186,7 +189,10 @@ protected:
 
   struct Slave
   {
+    // Total amount of regular *and* oversubscribed resources.
     Resources total;
+
+    // Available regular *and* oversubscribed resources.
     Resources available;
 
     bool activated;  // Whether to offer resources.
@@ -210,6 +216,11 @@ protected:
   //   Both reserved resources and unreserved resources are used
   //   in the fairness calculation. This is because reserved
   //   resources can be allocated to any framework in the role.
+  //
+  // Note that the hierarchical allocator treats oversubscribed
+  // resources as regular resources when doing fairness calculations.
+  // TODO(vinod): Consider using a different fairness algorithm for
+  // oversubscribed resources.
   RoleSorter* roleSorter;
   hashmap<std::string, FrameworkSorter*> frameworkSorters;
 };
@@ -327,6 +338,15 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::addFramework(
   frameworks[frameworkId].role = frameworkInfo.role();
   frameworks[frameworkId].checkpoint = frameworkInfo.checkpoint();
 
+  // Check if the framework desires revocable resources.
+  frameworks[frameworkId].revocable = false;
+  foreach (const FrameworkInfo::Capability& capability,
+           frameworkInfo.capabilities()) {
+    if (capability.type() == FrameworkInfo::Capability::REVOCABLE_RESOURCES) {
+      frameworks[frameworkId].revocable = true;
+    }
+  }
+
   LOG(INFO) << "Added framework " << frameworkId;
 
   allocate();
@@ -494,10 +514,42 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateSlave(
   CHECK(initialized);
   CHECK(slaves.contains(slaveId));
 
-  LOG(INFO) << "Slave " << slaveId << " updated with oversubscribed resources "
-            << oversubscribed;
+  // Check that all the oversubscribed resources are revocable.
+  CHECK_EQ(oversubscribed, oversubscribed.revocable());
+
+  // Update the total resources.
+
+  // First remove the old oversubscribed resources from the total.
+  slaves[slaveId].total -= slaves[slaveId].total.revocable();
+
+  // Now add the new estimate of oversubscribed resources.
+  slaves[slaveId].total += oversubscribed;
+
+  // Now, update the total resources in the role sorter.
+  roleSorter->update(
+      slaveId,
+      slaves[slaveId].total.unreserved());
+
+  // Calculate the current allocation of oversubscribed resources.
+  Resources allocation;
+  foreachkey (const std::string& role, roles) {
+    allocation += roleSorter->allocation(role)[slaveId].revocable();
+  }
+
+  // Update the available resources.
+
+  // First remove the old oversubscribed resources from available.
+  slaves[slaveId].available -= slaves[slaveId].available.revocable();
 
-  // TODO(vinod): Implement this.
+  // Now add the new estimate of available oversubscribed resources.
+  slaves[slaveId].available += oversubscribed - allocation;
+
+  LOG(INFO) << "Slave " << slaveId << " (" << slaves[slaveId].hostname
+            << ") updated with oversubscribed resources " << oversubscribed
+            << " (total: " << slaves[slaveId].total
+            << ", available: " << slaves[slaveId].available << ")";
+
+  allocate(slaveId);
 }
 
 
@@ -819,6 +871,12 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate(
           slaves[slaveId].available.unreserved() +
           slaves[slaveId].available.reserved(role);
 
+        // Remove revocable resources if the framework has not opted
+        // in to them.
+        if (!frameworks[frameworkId].revocable) {
+          resources -= resources.revocable();
+        }
+
         // If the resources are not allocatable, ignore.
         if (!allocatable(resources)) {
           continue;

http://git-wip-us.apache.org/repos/asf/mesos/blob/87d8bd08/src/tests/hierarchical_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp
index 1a43dc7..85bb29e 100644
--- a/src/tests/hierarchical_allocator_tests.cpp
+++ b/src/tests/hierarchical_allocator_tests.cpp
@@ -128,6 +128,16 @@ protected:
     return frameworkInfo;
   }
 
+  Resources createRevocableResources(
+      const string& name,
+      const string& value,
+      const string& role = "*")
+  {
+    Resource resource = Resources::parse(name, value, role).get();
+    resource.mutable_revocable();  // Marks the resource as revocable.
+    return resource;  // Implicitly wrapped in a Resources.
+  }
+
 private:
   static void put(
       process::Queue<Allocation>* queue,
@@ -749,6 +759,149 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocation)
 }
 
 
+// This test ensures that when oversubscribed resources are updated,
+// subsequent allocations properly account for that.
+TEST_F(HierarchicalAllocatorTest, UpdateSlave)
+{
+  // Pause clock to disable periodic allocation.
+  Clock::pause();
+  initialize(vector<string>{"role1"});
+
+  hashmap<FrameworkID, Resources> EMPTY;
+
+  SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100");
+  allocator->addSlave(slave.id(), slave, slave.resources(), EMPTY);
+
+  // Add a framework that can accept revocable resources.
+  FrameworkInfo framework = createFrameworkInfo("role1");
+  framework.add_capabilities()->set_type(
+      FrameworkInfo::Capability::REVOCABLE_RESOURCES);
+
+  allocator->addFramework(
+      framework.id(), framework, hashmap<SlaveID, Resources>());
+
+  // Initially, all the resources are allocated.
+  Future<Allocation> allocation = queue.get();
+  AWAIT_READY(allocation);
+  EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources));
+
+  // Update the slave with 10 oversubscribed cpus.
+  Resources oversubscribed = createRevocableResources("cpus", "10");
+  allocator->updateSlave(slave.id(), oversubscribed);
+
+  // The next allocation should be for the 10 oversubscribed cpus.
+  allocation = queue.get();
+  AWAIT_READY(allocation);
+  EXPECT_EQ(oversubscribed, Resources::sum(allocation.get().resources));
+
+  // Update the slave again with 12 oversubscribed cpus.
+  Resources oversubscribed2 = createRevocableResources("cpus", "12");
+  allocator->updateSlave(slave.id(), oversubscribed2);
+
+  // The next allocation should be for 2 oversubscribed cpus.
+  allocation = queue.get();
+  AWAIT_READY(allocation);
+  EXPECT_EQ(oversubscribed2 - oversubscribed,
+            Resources::sum(allocation.get().resources));
+
+  // Update the slave again with 5 oversubscribed cpus.
+  Resources oversubscribed3 = createRevocableResources("cpus", "5");
+  allocator->updateSlave(slave.id(), oversubscribed3);
+
+  // The 12 oversubscribed cpus already allocated exceed the new
+  // estimate of 5, so none are available and no allocation is made.
+  Clock::settle();
+  allocation = queue.get();
+  ASSERT_TRUE(allocation.isPending());
+}
+
+
+// This test verifies that a framework that has not opted in to
+// revocable resources does not get allocated oversubscribed resources.
+TEST_F(HierarchicalAllocatorTest, OversubscribedNotAllocated)
+{
+  // Pause clock to disable periodic allocation.
+  Clock::pause();
+  initialize(vector<string>{"role1"});
+
+  hashmap<FrameworkID, Resources> EMPTY;
+
+  SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100");
+  allocator->addSlave(slave.id(), slave, slave.resources(), EMPTY);
+
+  // Add a framework that does *not* accept revocable resources.
+  FrameworkInfo framework = createFrameworkInfo("role1");
+  allocator->addFramework(
+      framework.id(), framework, hashmap<SlaveID, Resources>());
+
+  // Initially, all the resources are allocated.
+  Future<Allocation> allocation = queue.get();
+  AWAIT_READY(allocation);
+  EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources));
+
+  // Update the slave with 10 oversubscribed cpus.
+  Resources oversubscribed = createRevocableResources("cpus", "10");
+  allocator->updateSlave(slave.id(), oversubscribed);
+
+  // No allocation should be made for oversubscribed resources because
+  // the framework has not opted in to them.
+  Clock::settle();
+  allocation = queue.get();
+  ASSERT_TRUE(allocation.isPending());
+}
+
+
+// This test verifies that when oversubscribed resources are partially
+// recovered, subsequent allocations properly account for that.
+TEST_F(HierarchicalAllocatorTest, RecoverOversubscribedResources)
+{
+  // Pause clock to disable periodic allocation.
+  Clock::pause();
+  initialize(vector<string>{"role1"});
+
+  hashmap<FrameworkID, Resources> EMPTY;
+
+  SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100");
+  allocator->addSlave(slave.id(), slave, slave.resources(), EMPTY);
+
+  // Add a framework that can accept revocable resources.
+  FrameworkInfo framework = createFrameworkInfo("role1");
+  framework.add_capabilities()->set_type(
+      FrameworkInfo::Capability::REVOCABLE_RESOURCES);
+
+  allocator->addFramework(
+      framework.id(), framework, hashmap<SlaveID, Resources>());
+
+  // Initially, all the resources are allocated.
+  Future<Allocation> allocation = queue.get();
+  AWAIT_READY(allocation);
+  EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources));
+
+  // Update the slave with 10 oversubscribed cpus.
+  Resources oversubscribed = createRevocableResources("cpus", "10");
+  allocator->updateSlave(slave.id(), oversubscribed);
+
+  // The next allocation should be for 10 oversubscribed cpus.
+  allocation = queue.get();
+  AWAIT_READY(allocation);
+  EXPECT_EQ(oversubscribed, Resources::sum(allocation.get().resources));
+
+  // Recover 6 oversubscribed cpus and 2 regular cpus.
+  Resources recovered = createRevocableResources("cpus", "6");
+  recovered += Resources::parse("cpus:2").get();
+
+  allocator->recoverResources(framework.id(), slave.id(), recovered, None());
+
+  Clock::advance(flags.allocation_interval);  // Trigger periodic allocation.
+
+  // The next allocation should be for 6 oversubscribed and 2 regular
+  // cpus.
+  allocation = queue.get();
+  AWAIT_READY(allocation);
+  EXPECT_EQ(recovered, Resources::sum(allocation.get().resources));
+}
+
+
 // Checks that a slave that is not whitelisted will not have its
 // resources get offered, and that if the whitelist is updated so
 // that it is whitelisted, its resources will then be offered.

Reply via email to