Refactored agent to keep track of local resource providers. Currently, we don't explicitly keep track of local resources providers. This causes the logic for a few methods to be quite complex because we need to reconstruct the resource provider information everytime.
Review: https://reviews.apache.org/r/64477 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c9861e1a Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c9861e1a Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c9861e1a Branch: refs/heads/master Commit: c9861e1ae5225b4ee2cb160bbb53c3ea9fafd021 Parents: 3f862f3 Author: Jie Yu <[email protected]> Authored: Fri Dec 8 17:31:24 2017 -0800 Committer: Jie Yu <[email protected]> Committed: Mon Dec 11 14:02:00 2017 -0800 ---------------------------------------------------------------------- src/master/validation.cpp | 3 + src/resource_provider/manager.cpp | 18 +- src/resource_provider/message.hpp | 9 +- src/slave/http.cpp | 11 +- src/slave/slave.cpp | 368 +++++++++++---------- src/slave/slave.hpp | 65 +++- src/tests/resource_provider_manager_tests.cpp | 2 +- 7 files changed, 286 insertions(+), 190 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/master/validation.cpp ---------------------------------------------------------------------- diff --git a/src/master/validation.cpp b/src/master/validation.cpp index 38d9a3c..585d8bf 100644 --- a/src/master/validation.cpp +++ b/src/master/validation.cpp @@ -1929,6 +1929,9 @@ Option<Error> validateInverseOffers( namespace operation { +// TODO(jieyu): Validate that resources in an operation is not empty. + + Option<Error> validate( const Offer::Operation::Reserve& reserve, const Option<Principal>& principal, http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/resource_provider/manager.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp index f98611c..bfc917f 100644 --- a/src/resource_provider/manager.cpp +++ b/src/resource_provider/manager.cpp @@ -646,18 +646,26 @@ void ResourceProviderManagerProcess::updateState( // TODO(chhsiao): Report pending operations. - Try<UUID> resourceVersionUuid = + Try<UUID> resourceVersion = UUID::fromBytes(update.resource_version_uuid()); - CHECK_SOME(resourceVersionUuid) + CHECK_SOME(resourceVersion) << "Could not deserialize version of resource provider " - << resourceProvider->info.id() << ": " << resourceVersionUuid.error(); + << resourceProvider->info.id() << ": " << resourceVersion.error(); + + hashmap<UUID, OfferOperation> offerOperations; + foreach (const OfferOperation &operation, update.operations()) { + Try<UUID> uuid = UUID::fromBytes(operation.operation_uuid()); + CHECK_SOME(uuid); + + offerOperations.put(uuid.get(), operation); + } ResourceProviderMessage::UpdateState updateState{ resourceProvider->info, - resourceVersionUuid.get(), + resourceVersion.get(), update.resources(), - {update.operations().begin(), update.operations().end()}}; + std::move(offerOperations)}; ResourceProviderMessage message; message.type = ResourceProviderMessage::Type::UPDATE_STATE; http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/resource_provider/message.hpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/message.hpp b/src/resource_provider/message.hpp index bbf6bb2..eab90cf 100644 --- a/src/resource_provider/message.hpp +++ b/src/resource_provider/message.hpp @@ -24,6 +24,7 @@ #include <mesos/resources.hpp> #include <stout/check.hpp> +#include <stout/hashmap.hpp> #include <stout/jsonify.hpp> #include <stout/option.hpp> #include <stout/protobuf.hpp> @@ -46,9 +47,9 @@ struct ResourceProviderMessage struct UpdateState { ResourceProviderInfo info; - UUID resourceVersionUuid; - Resources total; - std::vector<OfferOperation> operations; + UUID resourceVersion; + Resources totalResources; + hashmap<UUID, OfferOperation> offerOperations; }; struct UpdateOfferOperationStatus @@ -77,7 +78,7 @@ inline std::ostream& operator<<( return stream << "UPDATE_STATE: " << updateState->info.id() << " " - << updateState->total; + << updateState->totalResources; } case ResourceProviderMessage::Type::UPDATE_OFFER_OPERATION_STATUS: { http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/slave/http.cpp ---------------------------------------------------------------------- diff --git a/src/slave/http.cpp b/src/slave/http.cpp index 738786f..f71adbc 100644 --- a/src/slave/http.cpp +++ b/src/slave/http.cpp @@ -1883,14 +1883,13 @@ Future<Response> Http::getResourceProviders( agent::Response::GetResourceProviders* resourceProviders = response.mutable_get_resource_providers(); - foreachvalue ( - const ResourceProviderInfo& resourceProviderInfo, - slave->resourceProviderInfos) { - agent::Response::GetResourceProviders::ResourceProvider* resourceProvider = + foreachvalue (ResourceProvider* resourceProvider, + slave->resourceProviders) { + agent::Response::GetResourceProviders::ResourceProvider* provider = resourceProviders->add_resource_providers(); - resourceProvider->mutable_resource_provider_info()->CopyFrom( - resourceProviderInfo); + provider->mutable_resource_provider_info() + ->CopyFrom(resourceProvider->info); } return OK(serialize(acceptType, evolve(response)), stringify(acceptType)); http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 373e393..5d4cd6d 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -222,7 +222,7 @@ Slave::Slave(const string& id, qosController(_qosController), secretGenerator(_secretGenerator), authorizer(_authorizer), - resourceVersions({{Option<ResourceProviderID>::none(), UUID::random()}}) {} + resourceVersion(UUID::random()) {} Slave::~Slave() @@ -1540,8 +1540,15 @@ void Slave::doReliableRegistration(Duration maxBackoff) message.mutable_agent_capabilities()->CopyFrom( capabilities.toRepeatedPtrField()); - message.mutable_resource_version_uuids()->CopyFrom( - protobuf::createResourceVersions(resourceVersions)); + ResourceVersionUUID* uuid = message.add_resource_version_uuids(); + uuid->set_uuid(resourceVersion.toBytes()); + + foreachvalue (ResourceProvider* provider, resourceProviders) { + ResourceVersionUUID* uuid = message.add_resource_version_uuids(); + CHECK(provider->info.has_id()); + uuid->mutable_resource_provider_id()->CopyFrom(provider->info.id()); + uuid->set_uuid(provider->resourceVersion.toBytes()); + } // Include checkpointed resources. message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources_); @@ -1555,8 +1562,15 @@ void Slave::doReliableRegistration(Duration maxBackoff) message.mutable_agent_capabilities()->CopyFrom( capabilities.toRepeatedPtrField()); - message.mutable_resource_version_uuids()->CopyFrom( - protobuf::createResourceVersions(resourceVersions)); + ResourceVersionUUID* uuid = message.add_resource_version_uuids(); + uuid->set_uuid(resourceVersion.toBytes()); + + foreachvalue (ResourceProvider* provider, resourceProviders) { + ResourceVersionUUID* uuid = message.add_resource_version_uuids(); + CHECK(provider->info.has_id()); + uuid->mutable_resource_provider_id()->CopyFrom(provider->info.id()); + uuid->set_uuid(provider->resourceVersion.toBytes()); + } // Include checkpointed resources. message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources_); @@ -2258,35 +2272,37 @@ void Slave::__run( // TODO(bbannier): Also check executor resources. bool kill = false; if (!resourceVersionUuids.empty()) { - hashset<Option<ResourceProviderID>> usedResourceProviders; + hashset<Option<ResourceProviderID>> usedResourceProviderIds; foreach (const TaskInfo& _task, tasks) { foreach (const Resource& resource, _task.resources()) { - if (resource.has_provider_id()) { - usedResourceProviders.insert(resource.provider_id()); - } else { - usedResourceProviders.insert(None()); - } + usedResourceProviderIds.insert(resource.has_provider_id() + ? Option<ResourceProviderID>(resource.provider_id()) + : None()); } } const hashmap<Option<ResourceProviderID>, UUID> receivedResourceVersions = - protobuf::parseResourceVersions( - {resourceVersionUuids.begin(), resourceVersionUuids.end()}); + protobuf::parseResourceVersions({ + resourceVersionUuids.begin(), + resourceVersionUuids.end()}); - foreach (auto&& resourceProvider, usedResourceProviders) { - Option<Error> error = None(); + foreach (const Option<ResourceProviderID>& resourceProviderId, + usedResourceProviderIds) { + if (resourceProviderId.isNone()) { + CHECK(receivedResourceVersions.contains(None())); - if (!resourceVersions.contains(resourceProvider)) { - // We do not expect the agent to forget about itself. - CHECK_SOME(resourceProvider); - kill = true; - } - - CHECK(receivedResourceVersions.contains(resourceProvider)); + if (resourceVersion != receivedResourceVersions.at(None())) { + kill = true; + } + } else { + ResourceProvider* resourceProvider = + getResourceProvider(resourceProviderId.get()); - if (resourceVersions.at(resourceProvider) != - receivedResourceVersions.at(resourceProvider)) { - kill = true; + if (resourceProvider == nullptr || + resourceProvider->resourceVersion != + receivedResourceVersions.at(resourceProviderId.get())) { + kill = true; + } } } } @@ -7010,76 +7026,44 @@ UpdateSlaveMessage Slave::generateOversubscribedUpdate() const UpdateSlaveMessage Slave::generateResourceProviderUpdate() const { - UpdateSlaveMessage message; - - message.mutable_slave_id()->CopyFrom(info.id()); - // Agent information (total resources, offer operations, resource // versions) is not passed as part of some `ResourceProvider`, but // globally in `UpdateStateMessage`. // // TODO(bbannier): Pass agent information as a resource provider. - - // Process total resources. - hashmap<ResourceProviderID, UpdateSlaveMessage::ResourceProvider> - resourceProviders; - - foreach (const Resource& resource, totalResources) { - if (resource.has_provider_id()) { - resourceProviders[resource.provider_id()].add_total_resources()->CopyFrom( - resource); - } - } - - // Process offer operations. - UpdateSlaveMessage::OfferOperations* operations = - message.mutable_offer_operations(); + UpdateSlaveMessage message; + message.mutable_slave_id()->CopyFrom(info.id()); + message.set_resource_version_uuid(resourceVersion.toBytes()); + message.mutable_offer_operations(); foreachvalue (const OfferOperation* operation, offerOperations) { Result<ResourceProviderID> resourceProviderId = getResourceProviderId(operation->info()); - if (resourceProviderId.isSome()) { - resourceProviders[resourceProviderId.get()] - .mutable_operations() - ->add_operations() - ->CopyFrom(*operation); - } else if (resourceProviderId.isNone()) { - operations->add_operations()->CopyFrom(*operation); + if (resourceProviderId.isNone()) { + message.mutable_offer_operations() + ->add_operations()->CopyFrom(*operation); } } - // Make sure 'offer_operations' is always set for resource providers. - foreachkey ( - const ResourceProviderID& resourceProviderId, - resourceProviderInfos) { - resourceProviders[resourceProviderId].mutable_operations(); - } - - // Process resource versions. - CHECK(resourceVersions.contains(None())); - message.set_resource_version_uuid(resourceVersions.at(None()).toBytes()); - - foreachpair ( - const ResourceProviderID& providerId, - UpdateSlaveMessage::ResourceProvider& provider, - resourceProviders) { - CHECK(resourceVersions.contains(providerId)); - provider.set_resource_version_uuid( - resourceVersions.at(providerId).toBytes()); + foreachvalue (ResourceProvider* resourceProvider, resourceProviders) { + UpdateSlaveMessage::ResourceProvider* provider = + message.mutable_resource_providers()->add_providers(); - CHECK(resourceProviderInfos.contains(providerId)); - provider.mutable_info()->CopyFrom(resourceProviderInfos.at(providerId)); - } + provider->mutable_info()->CopyFrom( + resourceProvider->info); + provider->mutable_total_resources()->CopyFrom( + resourceProvider->totalResources); + provider->set_resource_version_uuid( + resourceProvider->resourceVersion.toBytes()); - // We only actually surface resource-provider related information if - // this agent is resource provider-capable. - if (capabilities.resourceProvider) { - list<UpdateSlaveMessage::ResourceProvider> resourceProviders_ = - resourceProviders.values(); + provider->mutable_operations(); - message.mutable_resource_providers()->mutable_providers()->CopyFrom( - {resourceProviders_.begin(), resourceProviders_.end()}); + foreachvalue (const OfferOperation* operation, + resourceProvider->offerOperations) { + provider->mutable_operations() + ->add_operations()->CopyFrom(*operation); + } } return message; @@ -7120,75 +7104,61 @@ void Slave::handleResourceProviderMessage( case ResourceProviderMessage::Type::UPDATE_STATE: { CHECK_SOME(message->updateState); - const Resources& newTotal = message->updateState->total; + const ResourceProviderMessage::UpdateState& updateState = + message->updateState.get(); - CHECK(message->updateState->info.has_id()); + CHECK(updateState.info.has_id()); + const ResourceProviderID& resourceProviderId = updateState.info.id(); - const ResourceProviderID& resourceProviderId = - message->updateState->info.id(); + ResourceProvider* resourceProvider = + getResourceProvider(resourceProviderId); - if (resourceProviderInfos.contains(resourceProviderId)) { - resourceProviderInfos[resourceProviderId] = message->updateState->info; - } else { - resourceProviderInfos.put( - resourceProviderId, - message->updateState->info); - } + if (resourceProvider == nullptr) { + resourceProvider = new ResourceProvider( + updateState.info, + updateState.totalResources, + updateState.resourceVersion); - const Resources oldTotal = - totalResources.filter([&resourceProviderId](const Resource& resource) { - return resource.provider_id() == resourceProviderId; - }); + addResourceProvider(resourceProvider); - bool updated = false; - - if (oldTotal != newTotal) { - totalResources -= oldTotal; - totalResources += newTotal; - - updated = true; - } - - // Update offer operation state. - // - // We only update offer operations which are not contained in both the - // known and just received sets. All other offer operations will be - // updated via relayed offer operation status updates. - auto isForResourceProvider = [resourceProviderId]( - const OfferOperation& operation) { - Result<ResourceProviderID> id = getResourceProviderId(operation.info()); - return id.isSome() && resourceProviderId == id.get(); - }; - - hashmap<UUID, OfferOperation*> knownOfferOperations; - foreachpair(auto&& uuid, auto&& operation, offerOperations) { - if (isForResourceProvider(*operation)) { - knownOfferOperations.put(uuid, operation); + foreachvalue (const OfferOperation& operation, + updateState.offerOperations) { + addOfferOperation(new OfferOperation(operation)); } - } - hashmap<UUID, OfferOperation> receivedOfferOperations; - foreach ( - const OfferOperation& operation, - message->updateState->operations) { - CHECK(isForResourceProvider(operation)) - << "Received operation on unexpected resource provider " - << "from resource provider " << resourceProviderId; - - Try<UUID> operationUuid = UUID::fromBytes(operation.operation_uuid()); - CHECK_SOME(operationUuid); + // Update the 'total' in the Slave. + totalResources += updateState.totalResources; + } else { + // Always update the resource provider info. + resourceProvider->info = updateState.info; - receivedOfferOperations.put(operationUuid.get(), operation); - } + if (resourceProvider->totalResources != updateState.totalResources) { + // Update the 'total' in the Slave. + CHECK(totalResources.contains(resourceProvider->totalResources)); + totalResources -= resourceProvider->totalResources; + totalResources += updateState.totalResources; - const hashset<UUID> knownUuids = knownOfferOperations.keys(); - const hashset<UUID> receivedUuids = receivedOfferOperations.keys(); + // Update the 'total' in the resource provider. + resourceProvider->totalResources = updateState.totalResources; + } - if (knownUuids != receivedUuids) { - // Handle offer operations known to the agent but not reported by the - // resource provider. These could be operations where the agent has - // started tracking an offer operation, but the resource provider failed - // over before it could bookkeep the operation. + // Update offer operation state. + // + // We only update offer operations which are not contained in + // both the known and just received sets. All other offer + // operations will be updated via relayed offer operation + // status updates. + const hashset<UUID> knownUuids = + resourceProvider->offerOperations.keys(); + + const hashset<UUID> receivedUuids = + updateState.offerOperations.keys(); + + // Handle offer operations known to the agent but not reported + // by the resource provider. These could be operations where + // the agent has started tracking an offer operation, but the + // resource provider failed over before it could bookkeep the + // operation. // // NOTE: We do not mutate offer operations statuses here; this // would be the responsibility of a offer operation status @@ -7203,13 +7173,13 @@ void Slave::handleResourceProviderMessage( disappearedOperations, disappearedOperations.begin())); foreach (const UUID& uuid, disappearedOperations) { - // TODO(bbannier): Instead of simply dropping an operation with - // `removeOfferOperation` here we should instead send a `Reconcile` - // message with a failed state to the resource provider so its status - // update manager can reliably deliver the operation status to the - // framework. - CHECK(offerOperations.contains(uuid)); - removeOfferOperation(offerOperations.at(uuid)); + // TODO(bbannier): Instead of simply dropping an operation + // with `removeOfferOperation` here we should instead send a + // `Reconcile` message with a failed state to the resource + // provider so its status update manager can reliably + // deliver the operation status to the framework. + CHECK(resourceProvider->offerOperations.contains(uuid)); + removeOfferOperation(resourceProvider->offerOperations.at(uuid)); } // Handle offer operations known to the resource provider but @@ -7228,27 +7198,13 @@ void Slave::handleResourceProviderMessage( // // NOTE: We do not need to update total resources here as its // state was sync explicitly with the received total above. - CHECK(receivedOfferOperations.contains(uuid)); + CHECK(updateState.offerOperations.contains(uuid)); addOfferOperation( - new OfferOperation(receivedOfferOperations.at(uuid))); - } - - updated = true; - } - - // Update resource version of this resource provider. - const UUID& resourceVersionUuid = - message->updateState->resourceVersionUuid; - - if (!resourceVersions.contains(resourceProviderId) || - resourceVersions.at(resourceProviderId) != resourceVersionUuid) { - if (resourceVersions.contains(resourceProviderId)) { - resourceVersions.at(resourceProviderId) = resourceVersionUuid; - } else { - resourceVersions.insert({resourceProviderId, resourceVersionUuid}); + new OfferOperation(updateState.offerOperations.at(uuid))); } - updated = true; + // Update resource version of this resource provider. + resourceProvider->resourceVersion = updateState.resourceVersion; } // Send the updated resources to the master if the agent is running. Note @@ -7263,14 +7219,12 @@ void Slave::handleResourceProviderMessage( break; } case RUNNING: { - if (updated) { - LOG(INFO) << "Forwarding new total resources " << totalResources; + LOG(INFO) << "Forwarding new total resources " << totalResources; - // Inform the master about the update from the resource provider. - send(master.get(), generateResourceProviderUpdate()); + // Inform the master about the update from the resource provider. + send(master.get(), generateResourceProviderUpdate()); - break; - } + break; } } break; @@ -7338,6 +7292,22 @@ void Slave::addOfferOperation(OfferOperation* operation) CHECK_SOME(uuid); offerOperations.put(uuid.get(), operation); + + Result<ResourceProviderID> resourceProviderId = + getResourceProviderId(operation->info()); + + CHECK(!resourceProviderId.isError()) + << "Failed to get resource provider ID: " + << resourceProviderId.error(); + + if (resourceProviderId.isSome()) { + ResourceProvider* resourceProvider = + getResourceProvider(resourceProviderId.get()); + + CHECK_NOTNULL(resourceProvider); + + resourceProvider->addOfferOperation(operation); + } } @@ -7441,6 +7411,22 @@ void Slave::removeOfferOperation(OfferOperation* operation) Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid()); CHECK_SOME(uuid); + Result<ResourceProviderID> resourceProviderId = + getResourceProviderId(operation->info()); + + CHECK(!resourceProviderId.isError()) + << "Failed to get resource provider ID: " + << resourceProviderId.error(); + + if (resourceProviderId.isSome()) { + ResourceProvider* resourceProvider = + getResourceProvider(resourceProviderId.get()); + + CHECK_NOTNULL(resourceProvider); + + resourceProvider->removeOfferOperation(operation); + } + CHECK(offerOperations.contains(uuid.get())) << "Unknown offer operation (uuid: " << uuid->toString() << ")"; @@ -7458,6 +7444,26 @@ OfferOperation* Slave::getOfferOperation(const UUID& uuid) const } +void Slave::addResourceProvider(ResourceProvider* resourceProvider) +{ + CHECK(resourceProvider->info.has_id()); + CHECK(!resourceProviders.contains(resourceProvider->info.id())); + + resourceProviders.put( + resourceProvider->info.id(), + resourceProvider); +} + + +ResourceProvider* Slave::getResourceProvider(const ResourceProviderID& id) const +{ + if (resourceProviders.contains(id)) { + return resourceProviders.at(id); + } + return nullptr; +} + + void Slave::apply(const vector<ResourceConversion>& conversions) { Try<Resources> resources = totalResources.apply(conversions); @@ -9076,6 +9082,30 @@ Resources Executor::allocatedResources() const } +void ResourceProvider::addOfferOperation(OfferOperation* operation) +{ + Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid()); + CHECK_SOME(uuid); + + CHECK(!offerOperations.contains(uuid.get())) + << "Offer operation (uuid: " << uuid->toString() << ") already exists"; + + offerOperations.put(uuid.get(), operation); +} + + +void ResourceProvider::removeOfferOperation(OfferOperation* operation) +{ + Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid()); + CHECK_SOME(uuid); + + CHECK(offerOperations.contains(uuid.get())) + << "Unknown offer operation (uuid: " << uuid->toString() << ")"; + + offerOperations.erase(uuid.get()); +} + + map<string, string> executorEnvironment( const Flags& flags, const ExecutorInfo& executorInfo, http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index b3a1e70..7c40fc7 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -108,6 +108,7 @@ class Executor; class Framework; struct HttpConnection; +struct ResourceProvider; class Slave : public ProtobufProcess<Slave> @@ -577,6 +578,9 @@ private: OfferOperation* getOfferOperation(const UUID& uuid) const; + void addResourceProvider(ResourceProvider* resourceProvider); + ResourceProvider* getResourceProvider(const ResourceProviderID& id) const; + void apply(const std::vector<ResourceConversion>& conversions); // Publish all resources that are needed to run the current set of @@ -730,12 +734,28 @@ private: ResourceProviderManager resourceProviderManager; process::Owned<LocalResourceProviderDaemon> localResourceProviderDaemon; - hashmap<Option<ResourceProviderID>, UUID> resourceVersions; - - hashmap<ResourceProviderID, ResourceProviderInfo> resourceProviderInfos; - // Pending operations or terminal operations that have - // unacknowledged status updates. + // Local resource providers known by the agent. + hashmap<ResourceProviderID, ResourceProvider*> resourceProviders; + + // Used to establish the relationship between the operation and the + // resources that the operation is operating on. Each resource + // provider will keep a resource version UUID, and change it when it + // believes that the resources from this resource provider are out + // of sync from the master's view. The master will keep track of + // the last known resource version UUID for each resource provider, + // and attach the resource version UUID in each operation it sends + // out. The resource provider should reject operations that have a + // different resource version UUID than that it maintains, because + // this means the operation is operating on resources that might + // have already been invalidated. + UUID resourceVersion; + + // Keeps track of the following: + // (1) Pending operations for resources from the agent. + // (2) Pending operations or terminal operations that have + // unacknowledged status updates for resource provider + // provided resources. hashmap<UUID, OfferOperation*> offerOperations; }; @@ -1034,6 +1054,41 @@ private: }; +struct ResourceProvider +{ + ResourceProvider( + const ResourceProviderInfo& _info, + const Resources& _totalResources, + const UUID& _resourceVersion) + : info(_info), + totalResources(_totalResources), + resourceVersion(_resourceVersion) {} + + void addOfferOperation(OfferOperation* operation); + void removeOfferOperation(OfferOperation* operation); + + ResourceProviderInfo info; + Resources totalResources; + + // Used to establish the relationship between the operation and the + // resources that the operation is operating on. Each resource + // provider will keep a resource version UUID, and change it when it + // believes that the resources from this resource provider are out + // of sync from the master's view. The master will keep track of + // the last known resource version UUID for each resource provider, + // and attach the resource version UUID in each operation it sends + // out. The resource provider should reject operations that have a + // different resource version UUID than that it maintains, because + // this means the operation is operating on resources that might + // have already been invalidated. + UUID resourceVersion; + + // Pending operations or terminal operations that have + // unacknowledged status updates. + hashmap<UUID, OfferOperation*> offerOperations; +}; + + /** * Returns a map of environment variables necessary in order to launch * an executor. http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/tests/resource_provider_manager_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp index a6eb4c9..e37a53a 100644 --- a/src/tests/resource_provider_manager_tests.cpp +++ b/src/tests/resource_provider_manager_tests.cpp @@ -343,7 +343,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, UpdateState) EXPECT_EQ( devolve(resourceProviderId.get()), message->updateState->info.id()); - EXPECT_EQ(devolve(resources), message->updateState->total); + EXPECT_EQ(devolve(resources), message->updateState->totalResources); } }
