This is an automated email from the ASF dual-hosted git repository. mzhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit a8050cafaa5465bd74a2ced1c37bb6b64c735445 Author: Andrei Sekretenko <[email protected]> AuthorDate: Fri Sep 6 14:15:28 2019 -0700 Separated handling offer validation failure from handling success. This patch refactors the loop through offer IDs in `Master::accept()` into two simpler loops: one loop for the offer validation failure case, another for the case of validation success, thus bringing removal of offers and recovering their resources close together. This is a prerequisite for implementing `rescindOffer()`/ `declineOffer()` in the dependent patch. Review: https://reviews.apache.org/r/71433/ --- src/master/master.cpp | 111 +++++++++++++++++++++++++++++--------------------- 1 file changed, 64 insertions(+), 47 deletions(-) diff --git a/src/master/master.cpp b/src/master/master.cpp index f00906e..89435c4 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -4335,74 +4335,49 @@ void Master::accept( // TODO(jieyu): Add metrics for non launch operations. } - // TODO(bmahler): We currently only support using multiple offers - // for a single slave. - Resources offeredResources; - Option<SlaveID> slaveId = None(); Option<Error> error = None(); - Option<Resource::AllocationInfo> allocationInfo = None(); if (accept.offer_ids().size() == 0) { error = Error("No offers specified"); } else { // Validate the offers. error = validation::offer::validate(accept.offer_ids(), this, framework); + } - size_t offersAccepted = 0; + if (error.isSome()) { + // TODO(jieyu): Consider adding a 'drop' overload for ACCEPT call to + // consistently handle message dropping. It would be ideal if the + // 'drop' overload can handle both resource recovery and lost task + // notifications. - // Compute offered resources and remove the offers. If the - // validation failed, return resources to the allocator. + // Remove existing offers and recover their resources. foreach (const OfferID& offerId, accept.offer_ids()) { Offer* offer = getOffer(offerId); - if (offer != nullptr) { - // Don't bother adding resources to `offeredResources` in case - // validation failed; just recover them. - if (error.isSome()) { - allocator->recoverResources( - offer->framework_id(), - offer->slave_id(), - offer->resources(), - None()); - } else { - slaveId = offer->slave_id(); - allocationInfo = offer->allocation_info(); - offeredResources += offer->resources(); - - offersAccepted++; - } - - removeOffer(offer); + if (offer == nullptr) { + // If the offer was not in our offer set, then this offer is no + // longer valid. + LOG(WARNING) << "Ignoring accept of offer " << offerId + << " since it is no longer valid"; continue; } - // If the offer was not in our offer set, then this offer is no - // longer valid. - LOG(WARNING) << "Ignoring accept of offer " << offerId - << " since it is no longer valid"; - } + allocator->recoverResources( + offer->framework_id(), + offer->slave_id(), + offer->resources(), + None()); - framework->metrics.offers_accepted += offersAccepted; - } + removeOffer(offer); + } - // If invalid, send TASK_DROPPED for the launch attempts. If the - // framework is not partition-aware, send TASK_LOST instead. If - // other operations have their `id` field set, then send - // OPERATION_ERROR updates for them. - // - // TODO(jieyu): Consider adding a 'drop' overload for ACCEPT call to - // consistently handle message dropping. It would be ideal if the - // 'drop' overload can handle both resource recovery and lost task - // notifications. - if (error.isSome()) { LOG(WARNING) << "ACCEPT call used invalid offers '" << accept.offer_ids() << "': " << error->message; - TaskState newTaskState = TASK_DROPPED; - if (!framework->capabilities.partitionAware) { - newTaskState = TASK_LOST; - } + const TaskState newTaskState = + framework->capabilities.partitionAware ? TASK_DROPPED : TASK_LOST; foreach (const Offer::Operation& operation, accept.operations()) { + // Send OPERATION_ERROR for non-LAUNCH operations if (operation.type() != Offer::Operation::LAUNCH && operation.type() != Offer::Operation::LAUNCH_GROUP) { drop(framework, @@ -4411,6 +4386,7 @@ void Master::accept( continue; } + // Send task status updates for launch attempts. const RepeatedPtrField<TaskInfo>& tasks = [&]() { if (operation.type() == Offer::Operation::LAUNCH) { return operation.launch().task_infos(); @@ -4449,6 +4425,47 @@ void Master::accept( return; } + // From now on, we are handling the valid offers case. + + // TODO(bmahler): We currently only support using multiple offers + // for a single slave. + Option<SlaveID> slaveId = None(); + + // TODO(asekretenko): The code below is copying AllocationInfo (and + // injecting it into operations) as a whole, but only the 'role' field is + // subject to offer validation. As for now, this works fine, because + // AllocationInfo has no other fields. However, this is fragile and can + // silently break if more fields are added to AllocationInfo. + Option<Resource::AllocationInfo> allocationInfo = None(); + Resources offeredResources; + + size_t offersAccepted = 0; + + // Compute offered resources and remove the offers. + foreach (const OfferID& offerId, accept.offer_ids()) { + Offer* offer = getOffer(offerId); + if (offer == nullptr) { + LOG(WARNING) << "Ignoring accept of offer " << offerId + << " since it is no longer valid"; + continue; + } + + if (slaveId.isNone()) { + slaveId = offer->slave_id(); + } + + if (allocationInfo.isNone()) { + allocationInfo = offer->allocation_info(); + } + + offeredResources += offer->resources(); + offersAccepted++; + + removeOffer(offer); + } + + framework->metrics.offers_accepted += offersAccepted; + CHECK_SOME(slaveId); Slave* slave = slaves.registered.get(slaveId.get()); CHECK(slave != nullptr) << slaveId.get();
