Repository: mesos
Updated Branches:
  refs/heads/master ac068c130 -> 9ab950d38


Used helper functions instead of switches for resource extraction.

Depending on the type of an offer operation, different resources need
to be extracted from the operation. Instead of using a switch for this,
the helper functions 'isSpeculativeOperation' and 'getConsumedResources'
are now used.

Review: https://reviews.apache.org/r/64158/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1c920586
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1c920586
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1c920586

Branch: refs/heads/master
Commit: 1c9205860b2458eed2e733348a1e0d15cc18b518
Parents: ac068c1
Author: Jan Schlicht <[email protected]>
Authored: Thu Nov 30 18:44:17 2017 +0100
Committer: Benjamin Bannier <[email protected]>
Committed: Fri Dec 1 11:50:04 2017 +0100

----------------------------------------------------------------------
 src/master/master.cpp | 122 ++++++++++++---------------------------------
 src/master/master.hpp | 112 ++++++++++++++---------------------------
 src/slave/slave.cpp   |  39 +++------------
 3 files changed, 75 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/1c920586/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index b435fbc..dfe60ef 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -10031,37 +10031,13 @@ void Master::updateOfferOperation(
   // don't need to update the resource accounting for those types of
   // offer operations in the master and in the allocator states upon
   // receiving a terminal status update.
-  Resource consumed;
-
-  switch (operation->info().type()) {
-    case Offer::Operation::LAUNCH:
-      LOG(FATAL) << "Unexpected LAUNCH operation";
-      break;
-    case Offer::Operation::LAUNCH_GROUP:
-      LOG(FATAL) << "Unexpected LAUNCH_GROUP operation";
-      break;
-    case Offer::Operation::RESERVE:
-    case Offer::Operation::UNRESERVE:
-    case Offer::Operation::CREATE:
-    case Offer::Operation::DESTROY:
-      return;
-    case Offer::Operation::CREATE_VOLUME:
-      consumed = operation->info().create_volume().source();
-      break;
-    case Offer::Operation::DESTROY_VOLUME:
-      consumed = operation->info().destroy_volume().volume();
-      break;
-    case Offer::Operation::CREATE_BLOCK:
-      consumed = operation->info().create_block().source();
-      break;
-    case Offer::Operation::DESTROY_BLOCK:
-      consumed = operation->info().destroy_block().block();
-      break;
-    case Offer::Operation::UNKNOWN:
-      LOG(ERROR) << "Unknown offer operation";
-      return;
+  if (protobuf::isSpeculativeOperation(operation->info())) {
+    return;
   }
 
+  Try<Resources> consumed = protobuf::getConsumedResources(operation->info());
+  CHECK_SOME(consumed);
+
   CHECK(operation->has_slave_id())
     << "External resource provider is not supported yet";
 
@@ -10080,8 +10056,8 @@ void Master::updateOfferOperation(
       allocator->updateAllocation(
           operation->framework_id(),
           operation->slave_id(),
-          consumed,
-          {ResourceConversion(consumed, converted)});
+          consumed.get(),
+          {ResourceConversion(consumed.get(), converted)});
 
       allocator->recoverResources(
           operation->framework_id(),
@@ -10089,7 +10065,7 @@ void Master::updateOfferOperation(
           converted,
           None());
 
-      Resources consumedUnallocated = consumed;
+      Resources consumedUnallocated = consumed.get();
       consumedUnallocated.unallocate();
 
       Resources convertedUnallocated = converted;
@@ -10107,7 +10083,7 @@ void Master::updateOfferOperation(
       allocator->recoverResources(
           operation->framework_id(),
           operation->slave_id(),
-          consumed,
+          consumed.get(),
           None());
 
       break;
@@ -10256,37 +10232,22 @@ void Master::_apply(
 
     send(slave->pid, message);
   } else {
-    switch (operation.type()) {
-      case Offer::Operation::RESERVE:
-      case Offer::Operation::UNRESERVE:
-      case Offer::Operation::CREATE:
-      case Offer::Operation::DESTROY: {
-        // We need to strip the allocation info from the operation's
-        // resources in order to apply the operation successfully
-        // since the agent's total is stored as unallocated resources.
-        Offer::Operation strippedOperation = operation;
-        protobuf::stripAllocationInfo(&strippedOperation);
+    if (!protobuf::isSpeculativeOperation(operation)) {
+      LOG(FATAL) << "Unexpected offer operation to apply on agent " << *slave;
+    }
 
-        Try<vector<ResourceConversion>> conversions =
-          getResourceConversions(strippedOperation);
+    // We need to strip the allocation info from the operation's
+    // resources in order to apply the operation successfully
+    // since the agent's total is stored as unallocated resources.
+    Offer::Operation strippedOperation = operation;
+    protobuf::stripAllocationInfo(&strippedOperation);
 
-        CHECK_SOME(conversions);
+    Try<vector<ResourceConversion>> conversions =
+      getResourceConversions(strippedOperation);
 
-        slave->apply(conversions.get());
-        break;
-      }
-      case Offer::Operation::LAUNCH:
-      case Offer::Operation::LAUNCH_GROUP:
-      case Offer::Operation::CREATE_VOLUME:
-      case Offer::Operation::DESTROY_VOLUME:
-      case Offer::Operation::CREATE_BLOCK:
-      case Offer::Operation::DESTROY_BLOCK:
-        LOG(FATAL) << "Unexpected offer operation to apply on agent " << *slave;
-        return;
-      case Offer::Operation::UNKNOWN:
-        LOG(ERROR) << "Unknown offer operation";
-        return;
-    }
+    CHECK_SOME(conversions);
+
+    slave->apply(conversions.get());
 
     CheckpointResourcesMessage message;
 
@@ -11206,39 +11167,18 @@ void Slave::recoverResources(OfferOperation* operation)
 
   const FrameworkID& frameworkId = operation->framework_id();
 
-  Resource consumed;
-  switch (operation->info().type()) {
-    case Offer::Operation::LAUNCH:
-    case Offer::Operation::LAUNCH_GROUP:
-    case Offer::Operation::RESERVE:
-    case Offer::Operation::UNRESERVE:
-    case Offer::Operation::CREATE:
-    case Offer::Operation::DESTROY:
-      // These operations are speculatively applied and not
-      // tracked as used resources.
-      return;
-    case Offer::Operation::CREATE_VOLUME:
-      consumed = operation->info().create_volume().source();
-      break;
-    case Offer::Operation::DESTROY_VOLUME:
-      consumed = operation->info().destroy_volume().volume();
-      break;
-    case Offer::Operation::CREATE_BLOCK:
-      consumed = operation->info().create_block().source();
-      break;
-    case Offer::Operation::DESTROY_BLOCK:
-      consumed = operation->info().destroy_block().block();
-      break;
-    case Offer::Operation::UNKNOWN: {
-      LOG(ERROR) << "Unknown offer operation";
-      return;
-    }
+  if (protobuf::isSpeculativeOperation(operation->info())) {
+    return;
   }
 
-  CHECK(usedResources[frameworkId].contains(consumed))
-    << "Unknown resources " << consumed << " of framework " << frameworkId;
+  Try<Resources> consumed = protobuf::getConsumedResources(operation->info());
+  CHECK_SOME(consumed);
+
+  CHECK(usedResources[frameworkId].contains(consumed.get()))
+    << "Unknown resources " << consumed.get()
+    << " of framework " << frameworkId;
 
-  usedResources[frameworkId] -= consumed;
+  usedResources[frameworkId] -= consumed.get();
   if (usedResources[frameworkId].empty()) {
     usedResources.erase(frameworkId);
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/1c920586/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 1425080..5d2ae65 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -2499,51 +2499,34 @@ struct Framework
       offerOperationUUIDs.put(operation->info().id(), uuid.get());
     }
 
-    if (!protobuf::isTerminalState(operation->latest_status().state())) {
-      Resource consumed;
-      switch (operation->info().type()) {
-        case Offer::Operation::LAUNCH:
-        case Offer::Operation::LAUNCH_GROUP:
-        case Offer::Operation::RESERVE:
-        case Offer::Operation::UNRESERVE:
-        case Offer::Operation::CREATE:
-        case Offer::Operation::DESTROY:
-          // These operations are speculatively applied and not
-          // tracked as used resources.
-          return;
-        case Offer::Operation::CREATE_VOLUME:
-          consumed = operation->info().create_volume().source();
-          break;
-        case Offer::Operation::DESTROY_VOLUME:
-          consumed = operation->info().destroy_volume().volume();
-          break;
-        case Offer::Operation::CREATE_BLOCK:
-          consumed = operation->info().create_block().source();
-          break;
-        case Offer::Operation::DESTROY_BLOCK:
-          consumed = operation->info().destroy_block().block();
-          break;
-        case Offer::Operation::UNKNOWN:
-          LOG(ERROR) << "Unknown offer operation";
-          return;
-      }
+    if (!protobuf::isSpeculativeOperation(operation->info()) &&
+        !protobuf::isTerminalState(operation->latest_status().state())) {
+      Try<Resources> consumed =
+        protobuf::getConsumedResources(operation->info());
+      CHECK_SOME(consumed);
 
       CHECK(operation->has_slave_id())
         << "External resource provider is not supported yet";
 
       const SlaveID& slaveId = operation->slave_id();
 
-      totalUsedResources += consumed;
-      usedResources[slaveId] += consumed;
+      totalUsedResources += consumed.get();
+      usedResources[slaveId] += consumed.get();
 
       // It's possible that we're not tracking the role from the
       // resources in the offer operation for this framework if the
       // role is absent from the framework's set of roles. In this
       // case, we track the role's allocation for this framework.
-      const std::string& role = consumed.allocation_info().role();
-
-      if (!isTrackedUnderRole(role)) {
-        trackUnderRole(role);
+      foreachkey (const std::string& role, consumed->allocations()) {
+        auto allocatedToRole = [&role](const Resource& resource) {
+          return resource.allocation_info().role() == role;
+        };
+
+        if (roles.count(role) == 0 &&
+            totalUsedResources.filter(allocatedToRole).empty()) {
+          CHECK(totalOfferedResources.filter(allocatedToRole).empty());
+          untrackUnderRole(role);
+        }
       }
     }
   }
@@ -2555,44 +2538,23 @@ struct Framework
 
     const SlaveID& slaveId = operation->slave_id();
 
-    Resource consumed;
-    switch (operation->info().type()) {
-      case Offer::Operation::LAUNCH:
-      case Offer::Operation::LAUNCH_GROUP:
-      case Offer::Operation::RESERVE:
-      case Offer::Operation::UNRESERVE:
-      case Offer::Operation::CREATE:
-      case Offer::Operation::DESTROY:
-        // These operations are speculatively applied and not
-        // tracked as used resources.
-        return;
-      case Offer::Operation::CREATE_VOLUME:
-        consumed = operation->info().create_volume().source();
-        break;
-      case Offer::Operation::DESTROY_VOLUME:
-        consumed = operation->info().destroy_volume().volume();
-        break;
-      case Offer::Operation::CREATE_BLOCK:
-        consumed = operation->info().create_block().source();
-        break;
-      case Offer::Operation::DESTROY_BLOCK:
-        consumed = operation->info().destroy_block().block();
-        break;
-      case Offer::Operation::UNKNOWN:
-        LOG(WARNING) << "Ignoring unknown offer operation";
-        return;
+    if (protobuf::isSpeculativeOperation(operation->info())) {
+      return;
     }
 
-    CHECK(totalUsedResources.contains(consumed))
-      << "Tried to recover resources " << consumed
+    Try<Resources> consumed = protobuf::getConsumedResources(operation->info());
+    CHECK_SOME(consumed);
+
+    CHECK(totalUsedResources.contains(consumed.get()))
+      << "Tried to recover resources " << consumed.get()
       << " which do not seem used";
 
-    CHECK(usedResources[slaveId].contains(consumed))
-      << "Tried to recover resources " << consumed << " of agent "
+    CHECK(usedResources[slaveId].contains(consumed.get()))
+      << "Tried to recover resources " << consumed.get() << " of agent "
       << slaveId << " which do not seem used";
 
-    totalUsedResources -= consumed;
-    usedResources[slaveId] -= consumed;
+    totalUsedResources -= consumed.get();
+    usedResources[slaveId] -= consumed.get();
     if (usedResources[slaveId].empty()) {
       usedResources.erase(slaveId);
     }
@@ -2601,16 +2563,16 @@ struct Framework
     // resources are being returned to, and we have no more resources
     // allocated to us for that role, stop tracking the framework
     // under the role.
-    const std::string& role = consumed.allocation_info().role();
-
-    auto allocatedToRole = [&role](const Resource& resource) {
-      return resource.allocation_info().role() == role;
-    };
+    foreachkey (const std::string& role, consumed->allocations()) {
+      auto allocatedToRole = [&role](const Resource& resource) {
+        return resource.allocation_info().role() == role;
+      };
 
-    if (roles.count(role) == 0 &&
-        totalUsedResources.filter(allocatedToRole).empty()) {
-      CHECK(totalOfferedResources.filter(allocatedToRole).empty());
-      untrackUnderRole(role);
+      if (roles.count(role) == 0 &&
+          totalUsedResources.filter(allocatedToRole).empty()) {
+        CHECK(totalOfferedResources.filter(allocatedToRole).empty());
+        untrackUnderRole(role);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/1c920586/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index a9aa987..3896e43 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -7089,50 +7089,25 @@ void Slave::updateOfferOperation(
     return;
   }
 
-  Resource consumed;
-  switch (operation->info().type()) {
-    case Offer::Operation::LAUNCH:
-      LOG(FATAL) << "Unexpected LAUNCH operation";
-      break;
-    case Offer::Operation::LAUNCH_GROUP:
-      LOG(FATAL) << "Unexpected LAUNCH_GROUP operation";
-      break;
-    case Offer::Operation::RESERVE:
-    case Offer::Operation::UNRESERVE:
-    case Offer::Operation::CREATE:
-    case Offer::Operation::DESTROY:
-      return;
-    case Offer::Operation::CREATE_VOLUME:
-      consumed = operation->info().create_volume().source();
-      break;
-    case Offer::Operation::DESTROY_VOLUME:
-      consumed = operation->info().destroy_volume().volume();
-      break;
-    case Offer::Operation::CREATE_BLOCK:
-      consumed = operation->info().create_block().source();
-      break;
-    case Offer::Operation::DESTROY_BLOCK:
-      consumed = operation->info().destroy_block().block();
-      break;
-    case Offer::Operation::UNKNOWN:
-      LOG(WARNING) << "Unknown offer operation";
-      return;
+  if (protobuf::isSpeculativeOperation(operation->info())) {
+    return;
   }
 
+  Try<Resources> consumed = protobuf::getConsumedResources(operation->info());
+  CHECK_SOME(consumed);
+
   switch (update.latest_status().state()) {
     // Terminal state, and the conversion is successful.
     case OFFER_OPERATION_FINISHED: {
       // 'totalResources' don't have allocations set, we need
       // to remove them from the consumed and converted resources.
-      if (consumed.has_allocation_info()) {
-        consumed.clear_allocation_info();
-      }
+      consumed->unallocate();
 
       Resources converted =
         update.latest_status().converted_resources();
       converted.unallocate();
 
-      ResourceConversion conversion(consumed, converted);
+      ResourceConversion conversion(consumed.get(), converted);
 
       apply({conversion});
 

Reply via email to