Reconciled offer operations between agent and master. This patch adds master reconciliation logic to interpret the agent's offer operation state from 'UpdateSlaveMessage'. The approach we take is to unpack the agent update into per-resource-provider updates and then update the master's view of each resource provider individually.
Review: https://reviews.apache.org/r/63732/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8ccbdb9b Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8ccbdb9b Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8ccbdb9b Branch: refs/heads/master Commit: 8ccbdb9babe960f86bba5ae573e5939ee22981da Parents: b220abc Author: Benjamin Bannier <[email protected]> Authored: Thu Nov 30 17:03:57 2017 +0100 Committer: Benjamin Bannier <[email protected]> Committed: Thu Nov 30 18:33:58 2017 +0100 ---------------------------------------------------------------------- src/master/master.cpp | 334 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 330 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/8ccbdb9b/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 16978c0..eadc008 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -25,6 +25,7 @@ #include <set> #include <sstream> #include <tuple> +#include <utility> #include <mesos/module.hpp> #include <mesos/roles.hpp> @@ -7027,6 +7028,8 @@ void Master::updateSlave(const UpdateSlaveMessage& message) message_.mutable_total_resources(), POST_RESERVATION_REFINEMENT); + // Agents will send a total if a resource provider subscribed or went away. + // Process resources and operations grouped by resource provider. 
if (hasTotal) { const Resources& totalResources = message_.total_resources(); @@ -7053,22 +7056,345 @@ void Master::updateSlave(const UpdateSlaveMessage& message) newTotal.getOrElse(slave->totalResources.nonRevocable()) + newOversubscribed.getOrElse(slave->totalResources.revocable()); + bool updated = slave->totalResources != newSlaveResources; + // Agents which can support resource providers always update the // master on their resource versions uuids via `UpdateSlaveMessage`. if (slave->capabilities.resourceProvider) { - slave->resourceVersions = + hashmap<Option<ResourceProviderID>, UUID> resourceVersions = protobuf::parseResourceVersions(message.resource_version_uuids()); + + updated = updated || slave->resourceVersions != resourceVersions; + slave->resourceVersions = resourceVersions; } - if (newSlaveResources == slave->totalResources) { + // Check if the known offer operations for this agent changed. + updated = + updated || + (slave->offerOperations.empty() && message.has_offer_operations()) || + (!slave->offerOperations.empty() && !message.has_offer_operations()); + if (!updated) { + const hashset<UUID> knownOfferOperations = slave->offerOperations.keys(); + hashset<UUID> receivedOfferOperations; + + foreach ( + const OfferOperation& operation, + message.offer_operations().operations()) { + Try<UUID> operationUuid = UUID::fromBytes(operation.operation_uuid()); + CHECK_SOME(operationUuid); + receivedOfferOperations.insert(operationUuid.get()); + } + + updated = updated || knownOfferOperations != receivedOfferOperations; + } + + if (!updated) { LOG(INFO) << "Ignoring update on agent " << *slave << " as it reports no changes"; return; } - slave->totalResources = newSlaveResources; + struct ResourceProvider + { + Option<Resources> oldTotal; + Option<Resources> newTotal; + Option<hashmap<UUID, OfferOperation>> oldOfferOperations; + Option<hashmap<UUID, OfferOperation>> newOfferOperations; + }; + + // We store information on the different `ResourceProvider`s on 
this agent in + // a map, indexed by an optional provider id. Since the provider ID field for + // resources is only set for resources from true resource providers and is not + // set for agent default resources, the value for the key `None` points to + // information about the agent itself, not its resource providers. + hashmap<Option<ResourceProviderID>, ResourceProvider> resourceProviders; + + // Group the resources and operation updates by resource provider. + { + auto groupResourcesByProviderId = [](const Resources& resources) { + hashmap<Option<ResourceProviderID>, Resources> result; + + foreach (const Resource& resource, resources) { + Option<ResourceProviderID> providerId = + Resources::hasResourceProvider(resource) + ? resource.provider_id() + : Option<ResourceProviderID>::none(); + + result[std::move(providerId)] += resource; + } + + return result; + }; + + foreachpair ( + const Option<ResourceProviderID>& providerId, + const Resources& resources, + groupResourcesByProviderId(slave->totalResources)) { + resourceProviders[providerId].oldTotal = resources; + } + + foreachpair ( + const Option<ResourceProviderID>& providerId, + const Resources& resources, + groupResourcesByProviderId(newSlaveResources)) { + // Implicitly create a new record if none exists. + resourceProviders[providerId].newTotal = resources; + } + + foreachpair ( + const UUID& uuid, + OfferOperation* operation, + slave->offerOperations) { + Result<ResourceProviderID> providerId_ = + getResourceProviderId(operation->info()); + + CHECK(!providerId_.isError()) + << "Failed to extract resource provider id from known operation: " + << providerId_.error(); + + Option<ResourceProviderID> providerId = + providerId_.isSome() + ? providerId_.get() + : Option<ResourceProviderID>::none(); + + // Set up an init empty list of existing operations. We might + // create a record for this resource provider if needed. 
+ if (resourceProviders[providerId].oldOfferOperations.isNone()) { + resourceProviders.at(providerId).oldOfferOperations = + hashmap<UUID, OfferOperation>(); + } + + resourceProviders.at(providerId) + .oldOfferOperations->emplace(uuid, *operation); + } + + // Process received offer operations. + foreach ( + const OfferOperation& operation, + message.offer_operations().operations()) { + Result<ResourceProviderID> providerId_ = + getResourceProviderId(operation.info()); + + CHECK(!providerId_.isError()) + << "Failed to extract resource provider id from known operation: " + << providerId_.error(); + + Option<ResourceProviderID> providerId = + providerId_.isSome() + ? providerId_.get() + : Option<ResourceProviderID>::none(); + + // Set up an init empty list of new operations. We might + // create a record for this resource provider if needed. + if (resourceProviders[providerId].newOfferOperations.isNone()) { + resourceProviders.at(providerId).newOfferOperations = + hashmap<UUID, OfferOperation>(); + } + + Try<UUID> uuid = UUID::fromBytes(operation.operation_uuid()); + CHECK_SOME(uuid) << "Could not deserialize operation id when reconciling " + "offer operations"; + + resourceProviders.at(providerId) + .newOfferOperations->emplace(uuid.get(), operation); + } + } + + // Check invariants of the received update. + { + foreachpair ( + const Option<ResourceProviderID>& providerId, + const ResourceProvider& provider, + resourceProviders) { + const bool isNewResourceProvider = + provider.oldTotal.isNone() && provider.oldOfferOperations.isNone(); + + if (!isNewResourceProvider) { + // For known resource providers the master should always know at least + // as many non-terminal offer operations as the agent. While an + // operation might get lost on the way to the agent or resource + // provider, or become terminal inside the agent, the master would never + // make an offer operation known to the agent terminal with the agent + // doing that first. 
+ // + // NOTE: We only consider non-terminal operations here as there is an + // edge case where the master removes a terminal offer operation from + // its own state when it passes on an acknowledgement from a framework + // to the agent, but the agent fails over before it can process the + // acknowledgement, or the agent initiates an unrelated + // `UpdateSlaveMessage`. + auto extractPendingOperations = + [](const hashmap<UUID, OfferOperation>& source, + hashset<UUID>* target) { + foreachpair ( + const UUID& uuid, const OfferOperation& operation, source) { + if (!protobuf::isTerminalState( + operation.latest_status().state())) { + target->insert(uuid); + } + } + }; + + hashset<UUID> oldPendingOperations; + hashset<UUID> newPendingOperations; + + if (provider.oldOfferOperations.isSome()) { + extractPendingOperations( + provider.oldOfferOperations.get(), &oldPendingOperations); + } + + if (provider.newOfferOperations.isSome()) { + extractPendingOperations( + provider.newOfferOperations.get(), &newPendingOperations); + } + + foreach (const UUID& uuid, newPendingOperations) { + CHECK(oldPendingOperations.contains(uuid)) + << "Agent tried to reconcile unknown non-terminal offer " + "operation " + << uuid.toString(); + } + } + + if (providerId.isNone()) { + // We do not permit changes to agent (i.e., non-resource + // provider) non-revocable resources. + CHECK_SOME(provider.oldTotal); + CHECK_SOME(provider.newTotal); + + Resources oldNonRevocable = + provider.oldTotal->nonRevocable().createStrippedScalarQuantity(); + Resources newNonRevocable = + provider.newTotal->nonRevocable().createStrippedScalarQuantity(); + CHECK_EQ( + provider.oldTotal->nonRevocable(), + provider.newTotal->nonRevocable()); + + // For agents only speculative operations can be reconciled. 
+ // + // TODO(bbannier): Reconcile agent operations in + // `ReregisterSlaveMessage` in which case we expect agents to + // send the already known offer operations again here + // (possibly with changed status). + if (provider.newOfferOperations.isSome()) { + foreachvalue ( + const OfferOperation& operation, + provider.newOfferOperations.get()) { + CHECK(protobuf::isSpeculativeOperation(operation.info())); + } + } + } + } + } + + // Update master and allocator state. + foreachpair ( + const Option<ResourceProviderID>& providerId, + const ResourceProvider& provider, + resourceProviders) { + const bool isNewResourceProvider = + provider.oldTotal.isNone() && provider.oldOfferOperations.isNone(); + + // Below we only add offer operations to our state from resource providers + // which are unknown, or possibly remove them for known resource providers. + // This works since the master should always known more offer operations of + // known resource provider than any resource provider itself. + // + // NOTE: We do not mutate offer operations statuses here; this + // would be the responsibility of a offer operation status + // update handler. + // + // There still exists a edge case where the master might remove a + // terminal offer operation from its state when passing an + // acknowledgement from a framework on to the agent with the agent + // failing over before the acknowledgement can be processed. In + // that case the agent would track an operation unknown to the + // master. + // + // TODO(bbannier): We might want to consider to also learn about + // new (terminal) operations when observing messages from status + // update managers to frameworks. + + if (isNewResourceProvider) { + // If this is a not previously seen resource provider with + // operations we had a master failover. Add the resources and + // operations to our state. 
+ CHECK_SOME(providerId); + CHECK_SOME(provider.newTotal); + CHECK(!slave->totalResources.contains(provider.newTotal.get())); + + slave->totalResources += provider.newTotal.get(); + + hashmap<FrameworkID, Resources> usedByOperations; + + if (provider.newOfferOperations.isSome()) { + foreachpair ( + const UUID& uuid, + const OfferOperation& operation, + provider.newOfferOperations.get()) { + // Update to bookkeeping of operations. + CHECK(!slave->offerOperations.contains(uuid)) + << "New operation " << uuid.toString() << " is already known"; + + Framework* framework = nullptr; + if (operation.has_framework_id()) { + framework = getFramework(operation.framework_id()); + } + + addOfferOperation(framework, slave, new OfferOperation(operation)); + } + } + + allocator->addResourceProvider( + slaveId, + provider.newTotal.get(), + usedByOperations); + } else { + // If this is a known resource provider or agent its total capacity cannot + // have changed, and it would not know about any non-terminal offer + // operations not already known to the master. It might however have not + // received an offer operations since the resource provider or agent fell + // over before the message could be received. We need to remove these + // operations from our state. + + // Reconcile offer operations. This includes recovering + // resources in used by operations which did not reach the + // agent or resource provider. + if (provider.oldOfferOperations.isSome()) { + foreachkey (const UUID& uuid, provider.oldOfferOperations.get()) { + if (provider.newOfferOperations.isNone() || + !provider.newOfferOperations->contains(uuid)) { + // TODO(bbannier): Instead of simply dropping an operation with + // `removeOfferOperation` here we should instead send a `Reconcile` + // message with a failed state to the agent so its status update + // manager can reliably deliver the operation status to the + // framework. 
+ LOG(WARNING) << "Dropping known offer operation " << uuid.toString() + << " since it was not present in reconciliation " + "message from agent"; + + CHECK(slave->offerOperations.contains(uuid)); + removeOfferOperation(slave->offerOperations.at(uuid)); + } + } + } + + // Reconcile the total resources. This includes undoing + // speculated operations which are only visible in the total, + // but never in the used resources. We explicitly allow for + // resource providers to change from or to zero capacity. + if (provider.oldTotal.isSome()) { + CHECK(slave->totalResources.contains(provider.oldTotal.get())); + slave->totalResources -= provider.oldTotal.get(); + } + + if (provider.newTotal.isSome()) { + slave->totalResources += provider.newTotal.get(); + } + } + } - // Now update the agent's resources in the allocator. + // Now update the agent's total resources in the allocator. allocator->updateSlave(slaveId, slave->totalResources); // Then rescind outstanding offers affected by the update.
