This is an automated email from the ASF dual-hosted git repository. mzhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 927b012e96abeebbb02c293698be1ef43867e15f Author: Andrei Sekretenko <[email protected]> AuthorDate: Fri Sep 6 14:15:54 2019 -0700 Replaced removeOffer + recoverResources pairs with specialized helpers. This patch adds helper methods `Master::rescindOffer()` / `Master::discardOffer()` that recover offer's resources in the allocator and remove the offer, and replaces paired calls of `removeOffer()` + `allocator->recoverResources()` with these helpers. Review: https://reviews.apache.org/r/71436/ --- src/master/http.cpp | 8 +- src/master/master.cpp | 242 ++++++++++++++++------------------------- src/master/master.hpp | 19 +++- src/master/quota_handler.cpp | 20 +--- src/master/weights_handler.cpp | 8 +- 5 files changed, 115 insertions(+), 182 deletions(-) diff --git a/src/master/http.cpp b/src/master/http.cpp index 0987d93..60765c9 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -4336,13 +4336,7 @@ Future<Response> Master::Http::_operation( // NOTE: However it's entirely possible that these resources are // offered to other frameworks in the next 'allocate' and the filter // cannot prevent it. - master->allocator->recoverResources( - offer->framework_id(), - offer->slave_id(), - offer->resources(), - Filters()); - - master->removeOffer(offer, true); // Rescind! + master->rescindOffer(offer, Filters()); // If we've rescinded enough offers to cover 'operation', we're done. Try<Resources> updatedRecovered = totalRecovered.apply(operation); diff --git a/src/master/master.cpp b/src/master/master.cpp index 60eb3aa..a2c289a 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -1164,9 +1164,8 @@ void Master::finalize() } } - // Remove offers. foreach (Offer* offer, utils::copy(slave->offers)) { - removeOffer(offer); + discardOffer(offer); } // Remove inverse offers. 
@@ -3127,17 +3126,12 @@ void Master::_subscribe( LOG(INFO) << "Allowing framework " << *framework << " to subscribe with an already used id"; - // Remove any offers sent to this framework. + // Rescind any offers sent to this framework. // NOTE: We need to do this because the scheduler might have // replied to the offers but the driver might have dropped // those messages since it wasn't connected to the master. foreach (Offer* offer, utils::copy(framework->offers)) { - allocator->recoverResources( - offer->framework_id(), - offer->slave_id(), - offer->resources(), - None()); - removeOffer(offer, true); // Rescind. + rescindOffer(offer); } // Also remove inverse offers. @@ -3368,13 +3362,11 @@ void Master::deactivate(Framework* framework, bool rescind) // Remove the framework's offers. foreach (Offer* offer, utils::copy(framework->offers)) { - allocator->recoverResources( - offer->framework_id(), - offer->slave_id(), - offer->resources(), - None()); - - removeOffer(offer, rescind); + if (rescind) { + rescindOffer(offer); + } else { + discardOffer(offer); + } } // Remove the framework's inverse offers. @@ -3421,15 +3413,8 @@ void Master::deactivate(Slave* slave) allocator->deactivateSlave(slave->id); - // Remove and rescind offers. foreach (Offer* offer, utils::copy(slave->offers)) { - allocator->recoverResources( - offer->framework_id(), - slave->id, - offer->resources(), - None()); - - removeOffer(offer, true); // Rescind! + rescindOffer(offer); } // Remove and rescind inverse offers. @@ -4350,24 +4335,17 @@ void Master::accept( // 'drop' overload can handle both resource recovery and lost task // notifications. - // Remove existing offers and recover their resources. + // Discard existing offers. foreach (const OfferID& offerId, accept.offer_ids()) { Offer* offer = getOffer(offerId); - if (offer == nullptr) { + if (offer != nullptr) { + discardOffer(offer); + } else { // If the offer was not in our offer set, then this offer is no // longer valid. 
LOG(WARNING) << "Ignoring accept of offer " << offerId << " since it is no longer valid"; - continue; } - - allocator->recoverResources( - offer->framework_id(), - offer->slave_id(), - offer->resources(), - None()); - - removeOffer(offer); } LOG(WARNING) << "ACCEPT call used invalid offers '" << accept.offer_ids() @@ -4913,21 +4891,14 @@ void Master::_accept( scheduler::Call::Accept&& accept, const Future<vector<Future<bool>>>& _authorizations) { - Resources offeredResources; - size_t offersAccepted = 0; - - foreach (const OfferID& offerId, accept.offer_ids()) { - Offer* offer = getOffer(offerId); - if (offer == nullptr) { - LOG(WARNING) << "Ignoring accept of offer " << offerId - << " since it is no longer valid"; - continue; + auto discardOffers = [this](const RepeatedPtrField<OfferID>& ids) { + for (const OfferID& offerId : ids) { + Offer* offer = getOffer(offerId); + if (offer != nullptr) { + discardOffer(offer); + } } - offeredResources += offer->resources(); - ++offersAccepted; - - removeOffer(offer); - } + }; Framework* framework = getFramework(frameworkId); @@ -4938,18 +4909,12 @@ void Master::_accept( << "Ignoring ACCEPT call for framework " << frameworkId << " because the framework cannot be found"; - // Tell the allocator about the recovered resources. - allocator->recoverResources( - frameworkId, - slaveId, - offeredResources, - None()); - + // TODO(asekretenko): consider replacing this with a CHECK that there + // never are any offers for a non-active (inactive/completed/...) framework. + discardOffers(accept.offer_ids()); return; } - framework->metrics.offers_accepted += offersAccepted; - Slave* slave = slaves.registered.get(slaveId); if (slave == nullptr || !slave->connected) { @@ -5011,16 +4976,30 @@ void Master::_accept( } } - // Tell the allocator about the recovered resources. 
- allocator->recoverResources( - frameworkId, - slaveId, - offeredResources, - None()); - + // TODO(asekretenko): consider replacing this with a CHECK that there + // never are any offers for a removed/disconnected slave. + discardOffers(accept.offer_ids()); return; } + Resources offeredResources; + size_t offersAccepted = 0; + + foreach (const OfferID& offerId, accept.offer_ids()) { + Offer* offer = getOffer(offerId); + if (offer == nullptr) { + LOG(WARNING) << "Ignoring accept of offer " << offerId + << " since it is no longer valid"; + continue; + } + offeredResources += offer->resources(); + ++offersAccepted; + + _removeOffer(framework, offer); + } + + framework->metrics.offers_accepted += offersAccepted; + // We maintain the "running remaining" resources here to support pipelining of // speculative operations (e.g., RESERVE), which would modify the remaining // resources. Resources consumed by non-speculative operations (e.g., LAUNCH) @@ -5328,13 +5307,7 @@ void Master::_accept( foreach (const Resource& volume, operation.destroy().volumes()) { if (offered.contains(volume)) { - allocator->recoverResources( - offer->framework_id(), - offer->slave_id(), - offered, - None()); - - removeOffer(offer, true); + rescindOffer(offer); // This offer may contain other volumes that are being destroyed. // However, we have already rescinded it, so we should move on @@ -6245,18 +6218,10 @@ void Master::decline( size_t offersDeclined = 0; - // Return resources to the allocator. foreach (const OfferID& offerId, decline.offer_ids()) { Offer* offer = getOffer(offerId); if (offer != nullptr) { - allocator->recoverResources( - offer->framework_id(), - offer->slave_id(), - offer->resources(), - decline.filters()); - - removeOffer(offer); - + discardOffer(offer, decline.filters()); offersDeclined++; continue; } @@ -8320,20 +8285,12 @@ void Master::updateFramework( // the frameworks from the added/removed roles, respectively. 
allocator->updateFramework(framework->id(), frameworkInfo, suppressedRoles); - // First, remove the offers allocated to roles being removed. + // Rescind offers allocated to the roles that were removed. + const set<string> newRoles = protobuf::framework::getRoles(frameworkInfo); foreach (Offer* offer, utils::copy(framework->offers)) { - set<string> newRoles = protobuf::framework::getRoles(frameworkInfo); - if (newRoles.count(offer->allocation_info().role()) > 0) { - continue; + if (newRoles.count(offer->allocation_info().role()) == 0) { + rescindOffer(offer); } - - allocator->recoverResources( - offer->framework_id(), - offer->slave_id(), - offer->resources(), - None()); - - removeOffer(offer, true); // Rescind! } framework->update(frameworkInfo); @@ -8829,18 +8786,17 @@ void Master::updateSlave(UpdateSlaveMessage&& message) // Then rescind outstanding offers affected by the update. // NOTE: Need a copy of offers because the offers are removed inside the loop. foreach (Offer* offer, utils::copy(slave->offers)) { - bool rescind = false; - const Resources& offered = offer->resources(); // Since updates of the agent's oversubscribed resources are sent at regular // intervals, we only rescind offers containing revocable resources to // reduce churn. 
if (hasOversubscribed && !offered.revocable().empty()) { - LOG(INFO) << "Removing offer " << offer->id() + LOG(INFO) << "Rescinding offer " << offer->id() << " with revocable resources " << offered << " on agent " << *slave; - rescind = true; + rescindOffer(offer); + continue; } // Updates on resource providers can change the agent total @@ -8853,23 +8809,11 @@ void Master::updateSlave(UpdateSlaveMessage&& message) if (message.has_resource_providers() && !offeredResourceProviderResources.empty()) { LOG(INFO) - << "Removing offer " << offer->id() + << "Rescinding offer " << offer->id() << " with resources " << offered << " on agent " << *slave; - rescind = true; + rescindOffer(offer); } - - if (!rescind) { - continue; - } - - allocator->recoverResources( - offer->framework_id(), - offer->slave_id(), - offered, - None()); - - removeOffer(offer, true); // Rescind. } // NOTE: We don't need to rescind inverse offers here as they are unrelated to @@ -8914,13 +8858,10 @@ void Master::updateUnavailability( LOG(INFO) << "Removing unavailability of agent " << *slave; } - // Remove and rescind offers since we want to inform frameworks of the + // Rescind offers since we want to inform frameworks of the // unavailability change as soon as possible. foreach (Offer* offer, utils::copy(slave->offers)) { - allocator->recoverResources( - offer->framework_id(), slave->id, offer->resources(), None()); - - removeOffer(offer, true); // Rescind! + rescindOffer(offer); } // Remove and rescind inverse offers since the allocator will send new @@ -11180,12 +11121,9 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid) void Master::_failoverFramework(Framework* framework) { - // Remove the framework's offers (if they weren't removed before). + // Discard the framework's offers, if they weren't removed before. 
foreach (Offer* offer, utils::copy(framework->offers)) { - allocator->recoverResources( - offer->framework_id(), offer->slave_id(), offer->resources(), None()); - - removeOffer(offer); + discardOffer(offer); } // Also remove the inverse offers. @@ -11750,13 +11688,7 @@ void Master::_removeSlave( } foreach (Offer* offer, utils::copy(slave->offers)) { - // TODO(vinod): We don't need to call 'Allocator::recoverResources' - // once MESOS-621 is fixed. - allocator->recoverResources( - offer->framework_id(), slave->id, offer->resources(), None()); - - // Remove and rescind offers. - removeOffer(offer, true); // Rescind! + rescindOffer(offer); } // Remove inverse offers because sending them for a slave that is @@ -11914,13 +11846,7 @@ void Master::__removeSlave( } foreach (Offer* offer, utils::copy(slave->offers)) { - // TODO(vinod): We don't need to call 'Allocator::recoverResources' - // once MESOS-621 is fixed. - allocator->recoverResources( - offer->framework_id(), slave->id, offer->resources(), None()); - - // Remove and rescind offers. - removeOffer(offer, true); // Rescind! + rescindOffer(offer); } // Remove inverse offers because sending them for a slave that is @@ -12694,23 +12620,48 @@ void Master::offerTimeout(const OfferID& offerId) { Offer* offer = getOffer(offerId); if (offer != nullptr) { - allocator->recoverResources( - offer->framework_id(), offer->slave_id(), offer->resources(), None()); - removeOffer(offer, true); + rescindOffer(offer); } } -// TODO(vinod): Instead of 'removeOffer()', consider implementing -// 'useOffer()', 'discardOffer()' and 'rescindOffer()' for clarity. 
-void Master::removeOffer(Offer* offer, bool rescind) +void Master::rescindOffer(Offer* offer, const Option<Filters>& filters) +{ + Framework* framework = getFramework(offer->framework_id()); + CHECK(framework != nullptr) + << "Unknown framework " << offer->framework_id() + << " in the offer " << offer->id(); + + RescindResourceOfferMessage message; + message.mutable_offer_id()->MergeFrom(offer->id()); + + framework->metrics.offers_rescinded++; + framework->send(message); + + allocator->recoverResources( + offer->framework_id(), offer->slave_id(), offer->resources(), filters); + + _removeOffer(framework, offer); +} + + +void Master::discardOffer(Offer* offer, const Option<Filters>& filters) { - // Remove from framework. Framework* framework = getFramework(offer->framework_id()); CHECK(framework != nullptr) << "Unknown framework " << offer->framework_id() << " in the offer " << offer->id(); + allocator->recoverResources( + offer->framework_id(), offer->slave_id(), offer->resources(), filters); + + _removeOffer(framework, offer); +} + + +void Master::_removeOffer(Framework* framework, Offer* offer) +{ + CHECK_EQ(framework->id(), offer->framework_id()); framework->removeOffer(offer); // Remove from slave. @@ -12722,13 +12673,6 @@ void Master::removeOffer(Offer* offer, bool rescind) slave->removeOffer(offer); - if (rescind) { - RescindResourceOfferMessage message; - message.mutable_offer_id()->MergeFrom(offer->id()); - framework->metrics.offers_rescinded++; - framework->send(message); - } - // Remove and cancel offer removal timers. Canceling the Timers is // only done to avoid having too many active Timers in libprocess. 
if (offerTimers.contains(offer->id())) { diff --git a/src/master/master.hpp b/src/master/master.hpp index 3f35b25..23eb2a6 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -986,8 +986,23 @@ protected: // Remove an offer after specified timeout void offerTimeout(const OfferID& offerId); - // Remove an offer and optionally rescind the offer as well. - void removeOffer(Offer* offer, bool rescind = false); + // Methods for removing an offer and handling associated resources. + // Both recover the resources in the allocator (optionally setting offer + // filters) and remove the offer in the master. `rescindOffer` further + // notifies the framework about the rescind. + // + // NOTE: the `filters` parameter of `rescindOffer` is needed only as + // a workaround for the race between the master and the allocator + // which happens when the master tries to free up resources to satisfy + // operator initiated operations. + void rescindOffer(Offer* offer, const Option<Filters>& filters = None()); + void discardOffer(Offer* offer, const Option<Filters>& filters = None()); + + // Helper for rescindOffer() / discardOffer() / _accept(). + // Do not use directly. + // + // The offer must belong to the framework. + void _removeOffer(Framework* framework, Offer* offer); // Remove an inverse offer after specified timeout void inverseOfferTimeout(const OfferID& inverseOfferId); diff --git a/src/master/quota_handler.cpp b/src/master/quota_handler.cpp index f28eb27..083ee30 100644 --- a/src/master/quota_handler.cpp +++ b/src/master/quota_handler.cpp @@ -303,9 +303,6 @@ void Master::QuotaHandler::rescindOffers(const QuotaInfo& request) const // Rescind all outstanding offers from the given agent. 
bool agentVisited = false; foreach (Offer* offer, utils::copy(slave->offers)) { - master->allocator->recoverResources( - offer->framework_id(), offer->slave_id(), offer->resources(), None()); - auto unallocated = [](const Resources& resources) { Resources result = resources; result.unallocate(); @@ -313,7 +310,7 @@ void Master::QuotaHandler::rescindOffers(const QuotaInfo& request) const }; rescinded += unallocated(offer->resources()); - master->removeOffer(offer, true); + master->rescindOffer(offer); agentVisited = true; } @@ -644,12 +641,7 @@ Future<http::Response> Master::QuotaHandler::_update( consumedAndOffered -= ResourceQuantities::fromResources(offer->resources()); - master->allocator->recoverResources( - offer->framework_id(), - offer->slave_id(), - offer->resources(), - None()); - master->removeOffer(offer, true); + master->rescindOffer(offer); } } @@ -685,13 +677,7 @@ Future<http::Response> Master::QuotaHandler::_update( } rescinded += ResourceQuantities::fromResources(offer->resources()); - - master->allocator->recoverResources( - offer->framework_id(), - offer->slave_id(), - offer->resources(), - None()); - master->removeOffer(offer, true); + master->rescindOffer(offer); } } } diff --git a/src/master/weights_handler.cpp b/src/master/weights_handler.cpp index dfb6f06..4ebeb34 100644 --- a/src/master/weights_handler.cpp +++ b/src/master/weights_handler.cpp @@ -303,13 +303,7 @@ void Master::WeightsHandler::rescindOffers( if (rescind) { foreachvalue (const Slave* slave, master->slaves.registered) { foreach (Offer* offer, utils::copy(slave->offers)) { - master->allocator->recoverResources( - offer->framework_id(), - offer->slave_id(), - offer->resources(), - None()); - - master->removeOffer(offer, true); + master->rescindOffer(offer); } } }
