Repository: mesos Updated Branches: refs/heads/master ac068c130 -> 9ab950d38
Used helper functions instead of switches for resource extraction. Depeding on the type of an offer operation, different resource need to be extracted from an operation. Instead of using a switch, the helper functions 'isSpeculativeOperation' and 'getConsumedResources' are used instead. Review: https://reviews.apache.org/r/64158/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1c920586 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1c920586 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1c920586 Branch: refs/heads/master Commit: 1c9205860b2458eed2e733348a1e0d15cc18b518 Parents: ac068c1 Author: Jan Schlicht <[email protected]> Authored: Thu Nov 30 18:44:17 2017 +0100 Committer: Benjamin Bannier <[email protected]> Committed: Fri Dec 1 11:50:04 2017 +0100 ---------------------------------------------------------------------- src/master/master.cpp | 122 ++++++++++++--------------------------------- src/master/master.hpp | 112 ++++++++++++++--------------------------- src/slave/slave.cpp | 39 +++------------ 3 files changed, 75 insertions(+), 198 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/1c920586/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index b435fbc..dfe60ef 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -10031,37 +10031,13 @@ void Master::updateOfferOperation( // don't need to update the resource accounting for those types of // offer operations in the master and in the allocator states upon // receiving a terminal status update. - Resource consumed; - - switch (operation->info().type()) { - case Offer::Operation::LAUNCH: - LOG(FATAL) << "Unexpected LAUNCH operation"; - break; - case Offer::Operation::LAUNCH_GROUP: - LOG(FATAL) << "Unexpected LAUNCH_GROUP operation"; - break; - case Offer::Operation::RESERVE: - case Offer::Operation::UNRESERVE: - case Offer::Operation::CREATE: - case Offer::Operation::DESTROY: - return; - case Offer::Operation::CREATE_VOLUME: - consumed = operation->info().create_volume().source(); - break; - case Offer::Operation::DESTROY_VOLUME: - consumed = operation->info().destroy_volume().volume(); - break; - case Offer::Operation::CREATE_BLOCK: - consumed = operation->info().create_block().source(); - break; - case Offer::Operation::DESTROY_BLOCK: - consumed = operation->info().destroy_block().block(); - break; - case Offer::Operation::UNKNOWN: - LOG(ERROR) << "Unknown offer operation"; - return; + if (protobuf::isSpeculativeOperation(operation->info())) { + return; } + Try<Resources> consumed = protobuf::getConsumedResources(operation->info()); + CHECK_SOME(consumed); + CHECK(operation->has_slave_id()) << "External resource provider is not supported yet"; @@ -10080,8 +10056,8 @@ void Master::updateOfferOperation( allocator->updateAllocation( operation->framework_id(), operation->slave_id(), - consumed, - {ResourceConversion(consumed, converted)}); + consumed.get(), + {ResourceConversion(consumed.get(), converted)}); allocator->recoverResources( operation->framework_id(), @@ -10089,7 +10065,7 @@ void Master::updateOfferOperation( converted, None()); - Resources consumedUnallocated = consumed; + Resources consumedUnallocated = consumed.get(); consumedUnallocated.unallocate(); Resources convertedUnallocated = converted; @@ -10107,7 +10083,7 @@ void Master::updateOfferOperation( allocator->recoverResources( operation->framework_id(), operation->slave_id(), - consumed, + consumed.get(), None()); break; @@ -10256,37 +10232,22 @@ void Master::_apply( send(slave->pid, message); } else { - switch (operation.type()) { - case Offer::Operation::RESERVE: - case Offer::Operation::UNRESERVE: - case Offer::Operation::CREATE: - case Offer::Operation::DESTROY: { - // We need to strip the allocation info from the operation's - // resources in order to apply the operation successfully - // since the agent's total is stored as unallocated resources. - Offer::Operation strippedOperation = operation; - protobuf::stripAllocationInfo(&strippedOperation); + if (!protobuf::isSpeculativeOperation(operation)) { + LOG(FATAL) << "Unexpected offer operation to apply on agent " << *slave; + } - Try<vector<ResourceConversion>> conversions = - getResourceConversions(strippedOperation); + // We need to strip the allocation info from the operation's + // resources in order to apply the operation successfully + // since the agent's total is stored as unallocated resources. + Offer::Operation strippedOperation = operation; + protobuf::stripAllocationInfo(&strippedOperation); - CHECK_SOME(conversions); + Try<vector<ResourceConversion>> conversions = + getResourceConversions(strippedOperation); - slave->apply(conversions.get()); - break; - } - case Offer::Operation::LAUNCH: - case Offer::Operation::LAUNCH_GROUP: - case Offer::Operation::CREATE_VOLUME: - case Offer::Operation::DESTROY_VOLUME: - case Offer::Operation::CREATE_BLOCK: - case Offer::Operation::DESTROY_BLOCK: - LOG(FATAL) << "Unexpected offer operation to apply on agent " << *slave; - return; - case Offer::Operation::UNKNOWN: - LOG(ERROR) << "Unknown offer operation"; - return; - } + CHECK_SOME(conversions); + + slave->apply(conversions.get()); CheckpointResourcesMessage message; @@ -11206,39 +11167,18 @@ void Slave::recoverResources(OfferOperation* operation) const FrameworkID& frameworkId = operation->framework_id(); - Resource consumed; - switch (operation->info().type()) { - case Offer::Operation::LAUNCH: - case Offer::Operation::LAUNCH_GROUP: - case Offer::Operation::RESERVE: - case Offer::Operation::UNRESERVE: - case Offer::Operation::CREATE: - case Offer::Operation::DESTROY: - // These operations are speculatively applied and not - // tracked as used resources. - return; - case Offer::Operation::CREATE_VOLUME: - consumed = operation->info().create_volume().source(); - break; - case Offer::Operation::DESTROY_VOLUME: - consumed = operation->info().destroy_volume().volume(); - break; - case Offer::Operation::CREATE_BLOCK: - consumed = operation->info().create_block().source(); - break; - case Offer::Operation::DESTROY_BLOCK: - consumed = operation->info().destroy_block().block(); - break; - case Offer::Operation::UNKNOWN: { - LOG(ERROR) << "Unknown offer operation"; - return; - } + if (protobuf::isSpeculativeOperation(operation->info())) { + return; } - CHECK(usedResources[frameworkId].contains(consumed)) - << "Unknown resources " << consumed << " of framework " << frameworkId; + Try<Resources> consumed = protobuf::getConsumedResources(operation->info()); + CHECK_SOME(consumed); + + CHECK(usedResources[frameworkId].contains(consumed.get())) + << "Unknown resources " << consumed.get() + << " of framework " << frameworkId; - usedResources[frameworkId] -= consumed; + usedResources[frameworkId] -= consumed.get(); if (usedResources[frameworkId].empty()) { usedResources.erase(frameworkId); } http://git-wip-us.apache.org/repos/asf/mesos/blob/1c920586/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 1425080..5d2ae65 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -2499,51 +2499,34 @@ struct Framework offerOperationUUIDs.put(operation->info().id(), uuid.get()); } - if (!protobuf::isTerminalState(operation->latest_status().state())) { - Resource consumed; - switch (operation->info().type()) { - case Offer::Operation::LAUNCH: - case Offer::Operation::LAUNCH_GROUP: - case Offer::Operation::RESERVE: - case Offer::Operation::UNRESERVE: - case Offer::Operation::CREATE: - case Offer::Operation::DESTROY: - // These operations are speculatively applied and not - // tracked as used resources. - return; - case Offer::Operation::CREATE_VOLUME: - consumed = operation->info().create_volume().source(); - break; - case Offer::Operation::DESTROY_VOLUME: - consumed = operation->info().destroy_volume().volume(); - break; - case Offer::Operation::CREATE_BLOCK: - consumed = operation->info().create_block().source(); - break; - case Offer::Operation::DESTROY_BLOCK: - consumed = operation->info().destroy_block().block(); - break; - case Offer::Operation::UNKNOWN: - LOG(ERROR) << "Unknown offer operation"; - return; - } + if (!protobuf::isSpeculativeOperation(operation->info()) && + !protobuf::isTerminalState(operation->latest_status().state())) { + Try<Resources> consumed = + protobuf::getConsumedResources(operation->info()); + CHECK_SOME(consumed); CHECK(operation->has_slave_id()) << "External resource provider is not supported yet"; const SlaveID& slaveId = operation->slave_id(); - totalUsedResources += consumed; - usedResources[slaveId] += consumed; + totalUsedResources += consumed.get(); + usedResources[slaveId] += consumed.get(); // It's possible that we're not tracking the role from the // resources in the offer operation for this framework if the // role is absent from the framework's set of roles. In this // case, we track the role's allocation for this framework. - const std::string& role = consumed.allocation_info().role(); - - if (!isTrackedUnderRole(role)) { - trackUnderRole(role); + foreachkey (const std::string& role, consumed->allocations()) { + auto allocatedToRole = [&role](const Resource& resource) { + return resource.allocation_info().role() == role; + }; + + if (roles.count(role) == 0 && + totalUsedResources.filter(allocatedToRole).empty()) { + CHECK(totalOfferedResources.filter(allocatedToRole).empty()); + untrackUnderRole(role); + } } } } @@ -2555,44 +2538,23 @@ struct Framework const SlaveID& slaveId = operation->slave_id(); - Resource consumed; - switch (operation->info().type()) { - case Offer::Operation::LAUNCH: - case Offer::Operation::LAUNCH_GROUP: - case Offer::Operation::RESERVE: - case Offer::Operation::UNRESERVE: - case Offer::Operation::CREATE: - case Offer::Operation::DESTROY: - // These operations are speculatively applied and not - // tracked as used resources. - return; - case Offer::Operation::CREATE_VOLUME: - consumed = operation->info().create_volume().source(); - break; - case Offer::Operation::DESTROY_VOLUME: - consumed = operation->info().destroy_volume().volume(); - break; - case Offer::Operation::CREATE_BLOCK: - consumed = operation->info().create_block().source(); - break; - case Offer::Operation::DESTROY_BLOCK: - consumed = operation->info().destroy_block().block(); - break; - case Offer::Operation::UNKNOWN: - LOG(WARNING) << "Ignoring unknown offer operation"; - return; + if (protobuf::isSpeculativeOperation(operation->info())) { + return; } - CHECK(totalUsedResources.contains(consumed)) - << "Tried to recover resources " << consumed + Try<Resources> consumed = protobuf::getConsumedResources(operation->info()); + CHECK_SOME(consumed); + + CHECK(totalUsedResources.contains(consumed.get())) + << "Tried to recover resources " << consumed.get() << " which do not seem used"; - CHECK(usedResources[slaveId].contains(consumed)) - << "Tried to recover resources " << consumed << " of agent " + CHECK(usedResources[slaveId].contains(consumed.get())) + << "Tried to recover resources " << consumed.get() << " of agent " << slaveId << " which do not seem used"; - totalUsedResources -= consumed; - usedResources[slaveId] -= consumed; + totalUsedResources -= consumed.get(); + usedResources[slaveId] -= consumed.get(); if (usedResources[slaveId].empty()) { usedResources.erase(slaveId); } @@ -2601,16 +2563,16 @@ struct Framework // resources are being returned to, and we have no more resources // allocated to us for that role, stop tracking the framework // under the role. - const std::string& role = consumed.allocation_info().role(); - - auto allocatedToRole = [&role](const Resource& resource) { - return resource.allocation_info().role() == role; - }; + foreachkey (const std::string& role, consumed->allocations()) { + auto allocatedToRole = [&role](const Resource& resource) { + return resource.allocation_info().role() == role; + }; - if (roles.count(role) == 0 && - totalUsedResources.filter(allocatedToRole).empty()) { - CHECK(totalOfferedResources.filter(allocatedToRole).empty()); - untrackUnderRole(role); + if (roles.count(role) == 0 && + totalUsedResources.filter(allocatedToRole).empty()) { + CHECK(totalOfferedResources.filter(allocatedToRole).empty()); + untrackUnderRole(role); + } } } http://git-wip-us.apache.org/repos/asf/mesos/blob/1c920586/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index a9aa987..3896e43 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -7089,50 +7089,25 @@ void Slave::updateOfferOperation( return; } - Resource consumed; - switch (operation->info().type()) { - case Offer::Operation::LAUNCH: - LOG(FATAL) << "Unexpected LAUNCH operation"; - break; - case Offer::Operation::LAUNCH_GROUP: - LOG(FATAL) << "Unexpected LAUNCH_GROUP operation"; - break; - case Offer::Operation::RESERVE: - case Offer::Operation::UNRESERVE: - case Offer::Operation::CREATE: - case Offer::Operation::DESTROY: - return; - case Offer::Operation::CREATE_VOLUME: - consumed = operation->info().create_volume().source(); - break; - case Offer::Operation::DESTROY_VOLUME: - consumed = operation->info().destroy_volume().volume(); - break; - case Offer::Operation::CREATE_BLOCK: - consumed = operation->info().create_block().source(); - break; - case Offer::Operation::DESTROY_BLOCK: - consumed = operation->info().destroy_block().block(); - break; - case Offer::Operation::UNKNOWN: - LOG(WARNING) << "Unknown offer operation"; - return; + if (protobuf::isSpeculativeOperation(operation->info())) { + return; } + Try<Resources> consumed = protobuf::getConsumedResources(operation->info()); + CHECK_SOME(consumed); + switch (update.latest_status().state()) { // Terminal state, and the conversion is successful. case OFFER_OPERATION_FINISHED: { // 'totalResources' don't have allocations set, we need // to remove them from the consumed and converted resources. - if (consumed.has_allocation_info()) { - consumed.clear_allocation_info(); - } + consumed->unallocate(); Resources converted = update.latest_status().converted_resources(); converted.unallocate(); - ResourceConversion conversion(consumed, converted); + ResourceConversion conversion(consumed.get(), converted); apply({conversion});
