This is an automated email from the ASF dual-hosted git repository. mzhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 7eb21c41ed255184988298e29644bf7f310c3374 Author: Andrei Sekretenko <[email protected]> AuthorDate: Fri Sep 6 14:15:38 2019 -0700 Moved `removeOffers()` from `Master::accept()` into `Master::_accept()`. This patch moves offer removal on accept into the deferred continuation that follows authorization (if offers pass validation in `accept()`). Incrementing the `offers_accepted` metric is also moved to `_accept()`. This is a prerequisite for implementing `rescindOffer()` / `declineOffer()` / in the dependent patch. Review: https://reviews.apache.org/r/71434/ --- src/master/master.cpp | 81 +++++++++++++++++++++++++-------------------------- src/master/master.hpp | 1 - 2 files changed, 40 insertions(+), 42 deletions(-) diff --git a/src/master/master.cpp b/src/master/master.cpp index 89435c4..60eb3aa 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -4427,48 +4427,32 @@ void Master::accept( // From now on, we are handling the valid offers case. + // Get slave id and allocation info from some existing offer + // (as they are valid, they must have the same slave id and allocation info). + const Offer* existingOffer = ([this](const RepeatedPtrField<OfferID>& ids) { + for (const OfferID& id : ids) { + const Offer* offer = getOffer(id); + if (offer != nullptr) { + return offer; + } + } + + LOG(FATAL) << "No validated offer_ids correspond to existing offers"; + })(accept.offer_ids()); + // TODO(bmahler): We currently only support using multiple offers // for a single slave. - Option<SlaveID> slaveId = None(); + SlaveID slaveId = existingOffer->slave_id(); - // TODO(asekretenko): The code below is copying AllocationInfo (and + // TODO(asekretenko): The code below is copying AllocationInfo (and later // injecting it into operations) as a whole, but only the 'role' field is // subject to offer validation. As for now, this works fine, because // AllocationInfo has no other fields. However, this is fragile and can // silently break if more fields are added to AllocationInfo. - Option<Resource::AllocationInfo> allocationInfo = None(); - Resources offeredResources; - - size_t offersAccepted = 0; - - // Compute offered resources and remove the offers. - foreach (const OfferID& offerId, accept.offer_ids()) { - Offer* offer = getOffer(offerId); - if (offer == nullptr) { - LOG(WARNING) << "Ignoring accept of offer " << offerId - << " since it is no longer valid"; - continue; - } - - if (slaveId.isNone()) { - slaveId = offer->slave_id(); - } - - if (allocationInfo.isNone()) { - allocationInfo = offer->allocation_info(); - } - - offeredResources += offer->resources(); - offersAccepted++; - - removeOffer(offer); - } - - framework->metrics.offers_accepted += offersAccepted; + Resource::AllocationInfo allocationInfo = existingOffer->allocation_info(); - CHECK_SOME(slaveId); - Slave* slave = slaves.registered.get(slaveId.get()); - CHECK(slave != nullptr) << slaveId.get(); + Slave* slave = slaves.registered.get(slaveId); + CHECK(slave != nullptr) << slaveId; // Validate and upgrade all of the resources in `accept.operations`: // @@ -4625,7 +4609,7 @@ void Master::accept( drop(framework, operation, "Operation requested feedback, but agent " + - stringify(slaveId.get()) + + stringify(slaveId) + " does not have the required RESOURCE_PROVIDER capability"); break; } @@ -4636,7 +4620,7 @@ void Master::accept( drop(framework, operation, "Operation on agent default resources requested feedback," - " but agent " + stringify(slaveId.get()) + + " but agent " + stringify(slaveId) + " does not have the required AGENT_OPERATION_FEEDBACK and" " RESOURCE_PROVIDER capabilities"); break; @@ -4666,8 +4650,7 @@ void Master::accept( // within an offer now contain an `AllocationInfo`. We therefore // inject the offer's allocation info into the operation's // resources if the scheduler has not done so already. - CHECK_SOME(allocationInfo); - protobuf::injectAllocationInfo(&operation, allocationInfo.get()); + protobuf::injectAllocationInfo(&operation, allocationInfo); switch (operation.type()) { case Offer::Operation::RESERVE: @@ -4918,8 +4901,7 @@ void Master::accept( .onAny(defer(self(), &Master::_accept, framework->id(), - slaveId.get(), - offeredResources, + slaveId, std::move(accept), lambda::_1)); } @@ -4928,10 +4910,25 @@ void Master::accept( void Master::_accept( const FrameworkID& frameworkId, const SlaveID& slaveId, - const Resources& offeredResources, scheduler::Call::Accept&& accept, const Future<vector<Future<bool>>>& _authorizations) { + Resources offeredResources; + size_t offersAccepted = 0; + + foreach (const OfferID& offerId, accept.offer_ids()) { + Offer* offer = getOffer(offerId); + if (offer == nullptr) { + LOG(WARNING) << "Ignoring accept of offer " << offerId + << " since it is no longer valid"; + continue; + } + offeredResources += offer->resources(); + ++offersAccepted; + + removeOffer(offer); + } + Framework* framework = getFramework(frameworkId); // TODO(jieyu): Consider using the 'drop' overload mentioned in @@ -4951,6 +4948,8 @@ void Master::_accept( return; } + framework->metrics.offers_accepted += offersAccepted; + Slave* slave = slaves.registered.get(slaveId); if (slave == nullptr || !slave->connected) { diff --git a/src/master/master.hpp b/src/master/master.hpp index 19c1782..3f35b25 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -1096,7 +1096,6 @@ private: void _accept( const FrameworkID& frameworkId, const SlaveID& slaveId, - const Resources& offeredResources, mesos::scheduler::Call::Accept&& accept, const process::Future< std::vector<process::Future<bool>>>& authorizations);
