Repository: mesos Updated Branches: refs/heads/master 97fec08a5 -> 37a393cb5
Fixed a bug related to offer operation application in the agent. The bug was introduced in this patch: https://reviews.apache.org/r/64477/ Given that we also keep track of each resource provider's total resources in addition to the total resources of the agent, we need to make sure we update those totals after applying an operation. The bug may manifest as a CHECK failure in the agent that checks if `totalResources` of the agent is a superset of all the resource providers' resources. Review: https://reviews.apache.org/r/64589/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/37a393cb Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/37a393cb Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/37a393cb Branch: refs/heads/master Commit: 37a393cb53e552fda6c6af04aa95836d3539c305 Parents: 97fec08 Author: Jie Yu <[email protected]> Authored: Thu Dec 14 10:15:35 2017 -0800 Committer: Jie Yu <[email protected]> Committed: Thu Dec 14 10:26:07 2017 -0800 ---------------------------------------------------------------------- src/slave/slave.cpp | 84 ++++++++++++++++++++++++++++++++++-------------- src/slave/slave.hpp | 2 +- 2 files changed, 60 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/37a393cb/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 9d0e9de..e69d42a 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -3845,15 +3845,7 @@ void Slave::applyOfferOperation(const ApplyOfferOperationMessage& message) addOfferOperation(offerOperation); if (protobuf::isSpeculativeOperation(message.operation_info())) { - Offer::Operation strippedOperation = message.operation_info(); - protobuf::stripAllocationInfo(&strippedOperation); - - Try<vector<ResourceConversion>> conversions = 
getResourceConversions(strippedOperation); - - CHECK_SOME(conversions); - - apply(conversions.get()); + apply(offerOperation); } if (resourceProviderId.isSome()) { @@ -7449,24 +7441,10 @@ void Slave::updateOfferOperation( return; } - Try<Resources> consumed = protobuf::getConsumedResources(operation->info()); - CHECK_SOME(consumed); - switch (update.latest_status().state()) { // Terminal state, and the conversion is successful. case OFFER_OPERATION_FINISHED: { - // 'totalResources' don't have allocations set, we need - // to remove them from the consumed and converted resources. - consumed->unallocate(); - - Resources converted = - update.latest_status().converted_resources(); - converted.unallocate(); - - ResourceConversion conversion(consumed.get(), converted); - - apply({conversion}); - + apply(operation); break; } @@ -7545,12 +7523,68 @@ ResourceProvider* Slave::getResourceProvider(const ResourceProviderID& id) const } -void Slave::apply(const vector<ResourceConversion>& conversions) +void Slave::apply(OfferOperation* operation) { + vector<ResourceConversion> conversions; + + // NOTE: 'totalResources' don't have allocations set, we need to + // remove them from the conversions. + + if (protobuf::isSpeculativeOperation(operation->info())) { + Offer::Operation strippedOperation = operation->info(); + protobuf::stripAllocationInfo(&strippedOperation); + + Try<vector<ResourceConversion>> _conversions = + getResourceConversions(strippedOperation); + + CHECK_SOME(_conversions); + + conversions = _conversions.get(); + } else { + // For non-speculative operations, we only apply the conversion + // once it becomes terminal. Before that, we don't know the + // converted resources of the conversion. 
+ CHECK_EQ(OFFER_OPERATION_FINISHED, operation->latest_status().state()); + + Try<Resources> consumed = protobuf::getConsumedResources(operation->info()); + CHECK_SOME(consumed); + + Resources converted = operation->latest_status().converted_resources(); + + consumed->unallocate(); + converted.unallocate(); + + conversions.emplace_back(consumed.get(), converted); + } + + // Now, actually apply the operation. Try<Resources> resources = totalResources.apply(conversions); CHECK_SOME(resources); totalResources = resources.get(); + + Result<ResourceProviderID> resourceProviderId = + getResourceProviderId(operation->info()); + + CHECK(!resourceProviderId.isError()) + << "Failed to get resource provider ID: " + << resourceProviderId.error(); + + // Besides updating the agent's `totalResources`, we also need to + // update the resource provider's `totalResources`. + if (resourceProviderId.isSome()) { + ResourceProvider* resourceProvider = + getResourceProvider(resourceProviderId.get()); + + CHECK_NOTNULL(resourceProvider); + + Try<Resources> resources = + resourceProvider->totalResources.apply(conversions); + + CHECK_SOME(resources); + + resourceProvider->totalResources = resources.get(); + } } http://git-wip-us.apache.org/repos/asf/mesos/blob/37a393cb/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index b69c533..75cc583 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -581,7 +581,7 @@ private: void addResourceProvider(ResourceProvider* resourceProvider); ResourceProvider* getResourceProvider(const ResourceProviderID& id) const; - void apply(const std::vector<ResourceConversion>& conversions); + void apply(OfferOperation* operation); // Publish all resources that are needed to run the current set of // tasks and executors on the agent.
