Refactored agent to keep track of local resource providers.

Currently, we don't explicitly keep track of local resource providers.
This causes the logic for a few methods to be quite complex because we
need to reconstruct the resource provider information every time.

Review: https://reviews.apache.org/r/64477


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c9861e1a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c9861e1a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c9861e1a

Branch: refs/heads/master
Commit: c9861e1ae5225b4ee2cb160bbb53c3ea9fafd021
Parents: 3f862f3
Author: Jie Yu <[email protected]>
Authored: Fri Dec 8 17:31:24 2017 -0800
Committer: Jie Yu <[email protected]>
Committed: Mon Dec 11 14:02:00 2017 -0800

----------------------------------------------------------------------
 src/master/validation.cpp                     |   3 +
 src/resource_provider/manager.cpp             |  18 +-
 src/resource_provider/message.hpp             |   9 +-
 src/slave/http.cpp                            |  11 +-
 src/slave/slave.cpp                           | 368 +++++++++++----------
 src/slave/slave.hpp                           |  65 +++-
 src/tests/resource_provider_manager_tests.cpp |   2 +-
 7 files changed, 286 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 38d9a3c..585d8bf 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -1929,6 +1929,9 @@ Option<Error> validateInverseOffers(
 
 namespace operation {
 
+// TODO(jieyu): Validate that resources in an operation is not empty.
+
+
 Option<Error> validate(
     const Offer::Operation::Reserve& reserve,
     const Option<Principal>& principal,

http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index f98611c..bfc917f 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -646,18 +646,26 @@ void ResourceProviderManagerProcess::updateState(
 
   // TODO(chhsiao): Report pending operations.
 
-  Try<UUID> resourceVersionUuid =
+  Try<UUID> resourceVersion =
     UUID::fromBytes(update.resource_version_uuid());
 
-  CHECK_SOME(resourceVersionUuid)
+  CHECK_SOME(resourceVersion)
     << "Could not deserialize version of resource provider "
-    << resourceProvider->info.id() << ": " << resourceVersionUuid.error();
+    << resourceProvider->info.id() << ": " << resourceVersion.error();
+
+  hashmap<UUID, OfferOperation> offerOperations;
+  foreach (const OfferOperation &operation, update.operations()) {
+    Try<UUID> uuid = UUID::fromBytes(operation.operation_uuid());
+    CHECK_SOME(uuid);
+
+    offerOperations.put(uuid.get(), operation);
+  }
 
   ResourceProviderMessage::UpdateState updateState{
       resourceProvider->info,
-      resourceVersionUuid.get(),
+      resourceVersion.get(),
       update.resources(),
-      {update.operations().begin(), update.operations().end()}};
+      std::move(offerOperations)};
 
   ResourceProviderMessage message;
   message.type = ResourceProviderMessage::Type::UPDATE_STATE;

http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/resource_provider/message.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/message.hpp b/src/resource_provider/message.hpp
index bbf6bb2..eab90cf 100644
--- a/src/resource_provider/message.hpp
+++ b/src/resource_provider/message.hpp
@@ -24,6 +24,7 @@
 #include <mesos/resources.hpp>
 
 #include <stout/check.hpp>
+#include <stout/hashmap.hpp>
 #include <stout/jsonify.hpp>
 #include <stout/option.hpp>
 #include <stout/protobuf.hpp>
@@ -46,9 +47,9 @@ struct ResourceProviderMessage
   struct UpdateState
   {
     ResourceProviderInfo info;
-    UUID resourceVersionUuid;
-    Resources total;
-    std::vector<OfferOperation> operations;
+    UUID resourceVersion;
+    Resources totalResources;
+    hashmap<UUID, OfferOperation> offerOperations;
   };
 
   struct UpdateOfferOperationStatus
@@ -77,7 +78,7 @@ inline std::ostream& operator<<(
       return stream
           << "UPDATE_STATE: "
           << updateState->info.id() << " "
-          << updateState->total;
+          << updateState->totalResources;
     }
 
     case ResourceProviderMessage::Type::UPDATE_OFFER_OPERATION_STATUS: {

http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 738786f..f71adbc 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -1883,14 +1883,13 @@ Future<Response> Http::getResourceProviders(
   agent::Response::GetResourceProviders* resourceProviders =
     response.mutable_get_resource_providers();
 
-  foreachvalue (
-      const ResourceProviderInfo& resourceProviderInfo,
-      slave->resourceProviderInfos) {
-    agent::Response::GetResourceProviders::ResourceProvider* resourceProvider =
+  foreachvalue (ResourceProvider* resourceProvider,
+                slave->resourceProviders) {
+    agent::Response::GetResourceProviders::ResourceProvider* provider =
       resourceProviders->add_resource_providers();
 
-    resourceProvider->mutable_resource_provider_info()->CopyFrom(
-        resourceProviderInfo);
+    provider->mutable_resource_provider_info()
+      ->CopyFrom(resourceProvider->info);
   }
 
   return OK(serialize(acceptType, evolve(response)), stringify(acceptType));

http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 373e393..5d4cd6d 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -222,7 +222,7 @@ Slave::Slave(const string& id,
     qosController(_qosController),
     secretGenerator(_secretGenerator),
     authorizer(_authorizer),
-    resourceVersions({{Option<ResourceProviderID>::none(), UUID::random()}}) {}
+    resourceVersion(UUID::random()) {}
 
 
 Slave::~Slave()
@@ -1540,8 +1540,15 @@ void Slave::doReliableRegistration(Duration maxBackoff)
     message.mutable_agent_capabilities()->CopyFrom(
         capabilities.toRepeatedPtrField());
 
-    message.mutable_resource_version_uuids()->CopyFrom(
-        protobuf::createResourceVersions(resourceVersions));
+    ResourceVersionUUID* uuid = message.add_resource_version_uuids();
+    uuid->set_uuid(resourceVersion.toBytes());
+
+    foreachvalue (ResourceProvider* provider, resourceProviders) {
+      ResourceVersionUUID* uuid = message.add_resource_version_uuids();
+      CHECK(provider->info.has_id());
+      uuid->mutable_resource_provider_id()->CopyFrom(provider->info.id());
+      uuid->set_uuid(provider->resourceVersion.toBytes());
+    }
 
     // Include checkpointed resources.
     message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources_);
@@ -1555,8 +1562,15 @@ void Slave::doReliableRegistration(Duration maxBackoff)
     message.mutable_agent_capabilities()->CopyFrom(
         capabilities.toRepeatedPtrField());
 
-    message.mutable_resource_version_uuids()->CopyFrom(
-        protobuf::createResourceVersions(resourceVersions));
+    ResourceVersionUUID* uuid = message.add_resource_version_uuids();
+    uuid->set_uuid(resourceVersion.toBytes());
+
+    foreachvalue (ResourceProvider* provider, resourceProviders) {
+      ResourceVersionUUID* uuid = message.add_resource_version_uuids();
+      CHECK(provider->info.has_id());
+      uuid->mutable_resource_provider_id()->CopyFrom(provider->info.id());
+      uuid->set_uuid(provider->resourceVersion.toBytes());
+    }
 
     // Include checkpointed resources.
     message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources_);
@@ -2258,35 +2272,37 @@ void Slave::__run(
   // TODO(bbannier): Also check executor resources.
   bool kill = false;
   if (!resourceVersionUuids.empty()) {
-    hashset<Option<ResourceProviderID>> usedResourceProviders;
+    hashset<Option<ResourceProviderID>> usedResourceProviderIds;
     foreach (const TaskInfo& _task, tasks) {
       foreach (const Resource& resource, _task.resources()) {
-        if (resource.has_provider_id()) {
-          usedResourceProviders.insert(resource.provider_id());
-        } else {
-          usedResourceProviders.insert(None());
-        }
+        usedResourceProviderIds.insert(resource.has_provider_id()
+           ? Option<ResourceProviderID>(resource.provider_id())
+           : None());
       }
     }
 
     const hashmap<Option<ResourceProviderID>, UUID> receivedResourceVersions =
-      protobuf::parseResourceVersions(
-          {resourceVersionUuids.begin(), resourceVersionUuids.end()});
+      protobuf::parseResourceVersions({
+          resourceVersionUuids.begin(),
+          resourceVersionUuids.end()});
 
-    foreach (auto&& resourceProvider, usedResourceProviders) {
-      Option<Error> error = None();
+    foreach (const Option<ResourceProviderID>& resourceProviderId,
+             usedResourceProviderIds) {
+      if (resourceProviderId.isNone()) {
+        CHECK(receivedResourceVersions.contains(None()));
 
-      if (!resourceVersions.contains(resourceProvider)) {
-        // We do not expect the agent to forget about itself.
-        CHECK_SOME(resourceProvider);
-        kill = true;
-      }
-
-      CHECK(receivedResourceVersions.contains(resourceProvider));
+        if (resourceVersion != receivedResourceVersions.at(None())) {
+          kill = true;
+        }
+      } else {
+        ResourceProvider* resourceProvider =
+          getResourceProvider(resourceProviderId.get());
 
-      if (resourceVersions.at(resourceProvider) !=
-          receivedResourceVersions.at(resourceProvider)) {
-        kill = true;
+        if (resourceProvider == nullptr ||
+            resourceProvider->resourceVersion !=
+              receivedResourceVersions.at(resourceProviderId.get())) {
+          kill = true;
+        }
       }
     }
   }
@@ -7010,76 +7026,44 @@ UpdateSlaveMessage Slave::generateOversubscribedUpdate() const
 
 UpdateSlaveMessage Slave::generateResourceProviderUpdate() const
 {
-  UpdateSlaveMessage message;
-
-  message.mutable_slave_id()->CopyFrom(info.id());
-
   // Agent information (total resources, offer operations, resource
   // versions) is not passed as part of some `ResourceProvider`, but
   // globally in `UpdateStateMessage`.
   //
   // TODO(bbannier): Pass agent information as a resource provider.
-
-  // Process total resources.
-  hashmap<ResourceProviderID, UpdateSlaveMessage::ResourceProvider>
-    resourceProviders;
-
-  foreach (const Resource& resource, totalResources) {
-    if (resource.has_provider_id()) {
-      resourceProviders[resource.provider_id()].add_total_resources()->CopyFrom(
-          resource);
-    }
-  }
-
-  // Process offer operations.
-  UpdateSlaveMessage::OfferOperations* operations =
-    message.mutable_offer_operations();
+  UpdateSlaveMessage message;
+  message.mutable_slave_id()->CopyFrom(info.id());
+  message.set_resource_version_uuid(resourceVersion.toBytes());
+  message.mutable_offer_operations();
 
   foreachvalue (const OfferOperation* operation, offerOperations) {
     Result<ResourceProviderID> resourceProviderId =
       getResourceProviderId(operation->info());
 
-    if (resourceProviderId.isSome()) {
-      resourceProviders[resourceProviderId.get()]
-        .mutable_operations()
-        ->add_operations()
-        ->CopyFrom(*operation);
-    } else if (resourceProviderId.isNone()) {
-      operations->add_operations()->CopyFrom(*operation);
+    if (resourceProviderId.isNone()) {
+      message.mutable_offer_operations()
+        ->add_operations()->CopyFrom(*operation);
     }
   }
 
-  // Make sure 'offer_operations' is always set for resource providers.
-  foreachkey (
-      const ResourceProviderID& resourceProviderId,
-      resourceProviderInfos) {
-    resourceProviders[resourceProviderId].mutable_operations();
-  }
-
-  // Process resource versions.
-  CHECK(resourceVersions.contains(None()));
-  message.set_resource_version_uuid(resourceVersions.at(None()).toBytes());
-
-  foreachpair (
-      const ResourceProviderID& providerId,
-      UpdateSlaveMessage::ResourceProvider& provider,
-      resourceProviders) {
-    CHECK(resourceVersions.contains(providerId));
-    provider.set_resource_version_uuid(
-        resourceVersions.at(providerId).toBytes());
+  foreachvalue (ResourceProvider* resourceProvider, resourceProviders) {
+    UpdateSlaveMessage::ResourceProvider* provider =
+      message.mutable_resource_providers()->add_providers();
 
-    CHECK(resourceProviderInfos.contains(providerId));
-    provider.mutable_info()->CopyFrom(resourceProviderInfos.at(providerId));
-  }
+    provider->mutable_info()->CopyFrom(
+        resourceProvider->info);
+    provider->mutable_total_resources()->CopyFrom(
+        resourceProvider->totalResources);
+    provider->set_resource_version_uuid(
+        resourceProvider->resourceVersion.toBytes());
 
-  // We only actually surface resource-provider related information if
-  // this agent is resource provider-capable.
-  if (capabilities.resourceProvider) {
-    list<UpdateSlaveMessage::ResourceProvider> resourceProviders_ =
-      resourceProviders.values();
+    provider->mutable_operations();
 
-    message.mutable_resource_providers()->mutable_providers()->CopyFrom(
-        {resourceProviders_.begin(), resourceProviders_.end()});
+    foreachvalue (const OfferOperation* operation,
+                  resourceProvider->offerOperations) {
+      provider->mutable_operations()
+        ->add_operations()->CopyFrom(*operation);
+    }
   }
 
   return message;
@@ -7120,75 +7104,61 @@ void Slave::handleResourceProviderMessage(
     case ResourceProviderMessage::Type::UPDATE_STATE: {
       CHECK_SOME(message->updateState);
 
-      const Resources& newTotal = message->updateState->total;
+      const ResourceProviderMessage::UpdateState& updateState =
+        message->updateState.get();
 
-      CHECK(message->updateState->info.has_id());
+      CHECK(updateState.info.has_id());
+      const ResourceProviderID& resourceProviderId = updateState.info.id();
 
-      const ResourceProviderID& resourceProviderId =
-        message->updateState->info.id();
+      ResourceProvider* resourceProvider =
+        getResourceProvider(resourceProviderId);
 
-      if (resourceProviderInfos.contains(resourceProviderId)) {
-        resourceProviderInfos[resourceProviderId] = message->updateState->info;
-      } else {
-        resourceProviderInfos.put(
-            resourceProviderId,
-            message->updateState->info);
-      }
+      if (resourceProvider == nullptr) {
+        resourceProvider = new ResourceProvider(
+            updateState.info,
+            updateState.totalResources,
+            updateState.resourceVersion);
 
-      const Resources oldTotal =
-        totalResources.filter([&resourceProviderId](const Resource& resource) {
-          return resource.provider_id() == resourceProviderId;
-        });
+        addResourceProvider(resourceProvider);
 
-      bool updated = false;
-
-      if (oldTotal != newTotal) {
-        totalResources -= oldTotal;
-        totalResources += newTotal;
-
-        updated = true;
-      }
-
-      // Update offer operation state.
-      //
-      // We only update offer operations which are not contained in both the
-      // known and just received sets. All other offer operations will be
-      // updated via relayed offer operation status updates.
-      auto isForResourceProvider = [resourceProviderId](
-                                      const OfferOperation& operation) {
-        Result<ResourceProviderID> id = getResourceProviderId(operation.info());
-        return id.isSome() && resourceProviderId == id.get();
-      };
-
-      hashmap<UUID, OfferOperation*> knownOfferOperations;
-      foreachpair(auto&& uuid, auto&& operation, offerOperations) {
-        if (isForResourceProvider(*operation)) {
-          knownOfferOperations.put(uuid, operation);
+        foreachvalue (const OfferOperation& operation,
+                      updateState.offerOperations) {
+          addOfferOperation(new OfferOperation(operation));
         }
-      }
 
-      hashmap<UUID, OfferOperation> receivedOfferOperations;
-      foreach (
-          const OfferOperation& operation,
-          message->updateState->operations) {
-        CHECK(isForResourceProvider(operation))
-          << "Received operation on unexpected resource provider "
-          << "from resource provider " << resourceProviderId;
-
-        Try<UUID> operationUuid = UUID::fromBytes(operation.operation_uuid());
-        CHECK_SOME(operationUuid);
+        // Update the 'total' in the Slave.
+        totalResources += updateState.totalResources;
+      } else {
+        // Always update the resource provider info.
+        resourceProvider->info = updateState.info;
 
-        receivedOfferOperations.put(operationUuid.get(), operation);
-      }
+        if (resourceProvider->totalResources != updateState.totalResources) {
+          // Update the 'total' in the Slave.
+          CHECK(totalResources.contains(resourceProvider->totalResources));
+          totalResources -= resourceProvider->totalResources;
+          totalResources += updateState.totalResources;
 
-      const hashset<UUID> knownUuids = knownOfferOperations.keys();
-      const hashset<UUID> receivedUuids = receivedOfferOperations.keys();
+          // Update the 'total' in the resource provider.
+          resourceProvider->totalResources = updateState.totalResources;
+        }
 
-      if (knownUuids != receivedUuids) {
-        // Handle offer operations known to the agent but not reported by the
-        // resource provider. These could be operations where the agent has
-        // started tracking an offer operation, but the resource provider failed
-        // over before it could bookkeep the operation.
+        // Update offer operation state.
+        //
+        // We only update offer operations which are not contained in
+        // both the known and just received sets. All other offer
+        // operations will be updated via relayed offer operation
+        // status updates.
+        const hashset<UUID> knownUuids =
+          resourceProvider->offerOperations.keys();
+
+        const hashset<UUID> receivedUuids =
+          updateState.offerOperations.keys();
+
+        // Handle offer operations known to the agent but not reported
+        // by the resource provider. These could be operations where
+        // the agent has started tracking an offer operation, but the
+        // resource provider failed over before it could bookkeep the
+        // operation.
         //
         // NOTE: We do not mutate offer operations statuses here; this
         // would be the responsibility of a offer operation status
@@ -7203,13 +7173,13 @@ void Slave::handleResourceProviderMessage(
                 disappearedOperations, disappearedOperations.begin()));
 
         foreach (const UUID& uuid, disappearedOperations) {
-          // TODO(bbannier): Instead of simply dropping an operation with
-          // `removeOfferOperation` here we should instead send a `Reconcile`
-          // message with a failed state to the resource provider so its status
-          // update manager can reliably deliver the operation status to the
-          // framework.
-          CHECK(offerOperations.contains(uuid));
-          removeOfferOperation(offerOperations.at(uuid));
+          // TODO(bbannier): Instead of simply dropping an operation
+          // with `removeOfferOperation` here we should instead send a
+          // `Reconcile` message with a failed state to the resource
+          // provider so its status update manager can reliably
+          // deliver the operation status to the framework.
+          CHECK(resourceProvider->offerOperations.contains(uuid));
+          removeOfferOperation(resourceProvider->offerOperations.at(uuid));
         }
 
         // Handle offer operations known to the resource provider but
@@ -7228,27 +7198,13 @@ void Slave::handleResourceProviderMessage(
           //
           // NOTE: We do not need to update total resources here as its
           // state was sync explicitly with the received total above.
-          CHECK(receivedOfferOperations.contains(uuid));
+          CHECK(updateState.offerOperations.contains(uuid));
           addOfferOperation(
-              new OfferOperation(receivedOfferOperations.at(uuid)));
-        }
-
-        updated = true;
-      }
-
-      // Update resource version of this resource provider.
-      const UUID& resourceVersionUuid =
-        message->updateState->resourceVersionUuid;
-
-      if (!resourceVersions.contains(resourceProviderId) ||
-          resourceVersions.at(resourceProviderId) != resourceVersionUuid) {
-        if (resourceVersions.contains(resourceProviderId)) {
-          resourceVersions.at(resourceProviderId) = resourceVersionUuid;
-        } else {
-          resourceVersions.insert({resourceProviderId, resourceVersionUuid});
+              new OfferOperation(updateState.offerOperations.at(uuid)));
         }
 
-        updated = true;
+        // Update resource version of this resource provider.
+        resourceProvider->resourceVersion = updateState.resourceVersion;
       }
 
       // Send the updated resources to the master if the agent is running. Note
@@ -7263,14 +7219,12 @@ void Slave::handleResourceProviderMessage(
           break;
         }
         case RUNNING: {
-          if (updated) {
-            LOG(INFO) << "Forwarding new total resources " << totalResources;
+          LOG(INFO) << "Forwarding new total resources " << totalResources;
 
-            // Inform the master about the update from the resource provider.
-            send(master.get(), generateResourceProviderUpdate());
+          // Inform the master about the update from the resource provider.
+          send(master.get(), generateResourceProviderUpdate());
 
-            break;
-          }
+          break;
         }
       }
       break;
@@ -7338,6 +7292,22 @@ void Slave::addOfferOperation(OfferOperation* operation)
   CHECK_SOME(uuid);
 
   offerOperations.put(uuid.get(), operation);
+
+  Result<ResourceProviderID> resourceProviderId =
+    getResourceProviderId(operation->info());
+
+  CHECK(!resourceProviderId.isError())
+    << "Failed to get resource provider ID: "
+    << resourceProviderId.error();
+
+  if (resourceProviderId.isSome()) {
+    ResourceProvider* resourceProvider =
+      getResourceProvider(resourceProviderId.get());
+
+    CHECK_NOTNULL(resourceProvider);
+
+    resourceProvider->addOfferOperation(operation);
+  }
 }
 
 
@@ -7441,6 +7411,22 @@ void Slave::removeOfferOperation(OfferOperation* operation)
   Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid());
   CHECK_SOME(uuid);
 
+  Result<ResourceProviderID> resourceProviderId =
+    getResourceProviderId(operation->info());
+
+  CHECK(!resourceProviderId.isError())
+    << "Failed to get resource provider ID: "
+    << resourceProviderId.error();
+
+  if (resourceProviderId.isSome()) {
+    ResourceProvider* resourceProvider =
+      getResourceProvider(resourceProviderId.get());
+
+    CHECK_NOTNULL(resourceProvider);
+
+    resourceProvider->removeOfferOperation(operation);
+  }
+
   CHECK(offerOperations.contains(uuid.get()))
     << "Unknown offer operation (uuid: " << uuid->toString() << ")";
 
@@ -7458,6 +7444,26 @@ OfferOperation* Slave::getOfferOperation(const UUID& uuid) const
 }
 
 
+void Slave::addResourceProvider(ResourceProvider* resourceProvider)
+{
+  CHECK(resourceProvider->info.has_id());
+  CHECK(!resourceProviders.contains(resourceProvider->info.id()));
+
+  resourceProviders.put(
+      resourceProvider->info.id(),
+      resourceProvider);
+}
+
+
+ResourceProvider* Slave::getResourceProvider(const ResourceProviderID& id) const
+{
+  if (resourceProviders.contains(id)) {
+    return resourceProviders.at(id);
+  }
+  return nullptr;
+}
+
+
 void Slave::apply(const vector<ResourceConversion>& conversions)
 {
   Try<Resources> resources = totalResources.apply(conversions);
@@ -9076,6 +9082,30 @@ Resources Executor::allocatedResources() const
 }
 
 
+void ResourceProvider::addOfferOperation(OfferOperation* operation)
+{
+  Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid());
+  CHECK_SOME(uuid);
+
+  CHECK(!offerOperations.contains(uuid.get()))
+    << "Offer operation (uuid: " << uuid->toString() << ") already exists";
+
+  offerOperations.put(uuid.get(), operation);
+}
+
+
+void ResourceProvider::removeOfferOperation(OfferOperation* operation)
+{
+  Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid());
+  CHECK_SOME(uuid);
+
+  CHECK(offerOperations.contains(uuid.get()))
+    << "Unknown offer operation (uuid: " << uuid->toString() << ")";
+
+  offerOperations.erase(uuid.get());
+}
+
+
 map<string, string> executorEnvironment(
     const Flags& flags,
     const ExecutorInfo& executorInfo,

http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index b3a1e70..7c40fc7 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -108,6 +108,7 @@ class Executor;
 class Framework;
 
 struct HttpConnection;
+struct ResourceProvider;
 
 
 class Slave : public ProtobufProcess<Slave>
@@ -577,6 +578,9 @@ private:
 
   OfferOperation* getOfferOperation(const UUID& uuid) const;
 
+  void addResourceProvider(ResourceProvider* resourceProvider);
+  ResourceProvider* getResourceProvider(const ResourceProviderID& id) const;
+
   void apply(const std::vector<ResourceConversion>& conversions);
 
   // Publish all resources that are needed to run the current set of
@@ -730,12 +734,28 @@ private:
 
   ResourceProviderManager resourceProviderManager;
   process::Owned<LocalResourceProviderDaemon> localResourceProviderDaemon;
-  hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
-
-  hashmap<ResourceProviderID, ResourceProviderInfo> resourceProviderInfos;
 
-  // Pending operations or terminal operations that have
-  // unacknowledged status updates.
+  // Local resource providers known by the agent.
+  hashmap<ResourceProviderID, ResourceProvider*> resourceProviders;
+
+  // Used to establish the relationship between the operation and the
+  // resources that the operation is operating on. Each resource
+  // provider will keep a resource version UUID, and change it when it
+  // believes that the resources from this resource provider are out
+  // of sync from the master's view.  The master will keep track of
+  // the last known resource version UUID for each resource provider,
+  // and attach the resource version UUID in each operation it sends
+  // out. The resource provider should reject operations that have a
+  // different resource version UUID than that it maintains, because
+  // this means the operation is operating on resources that might
+  // have already been invalidated.
+  UUID resourceVersion;
+
+  // Keeps track of the following:
+  // (1) Pending operations for resources from the agent.
+  // (2) Pending operations or terminal operations that have
+  //     unacknowledged status updates for resource provider
+  //     provided resources.
   hashmap<UUID, OfferOperation*> offerOperations;
 };
 
@@ -1034,6 +1054,41 @@ private:
 };
 
 
+struct ResourceProvider
+{
+  ResourceProvider(
+      const ResourceProviderInfo& _info,
+      const Resources& _totalResources,
+      const UUID& _resourceVersion)
+    : info(_info),
+      totalResources(_totalResources),
+      resourceVersion(_resourceVersion) {}
+
+  void addOfferOperation(OfferOperation* operation);
+  void removeOfferOperation(OfferOperation* operation);
+
+  ResourceProviderInfo info;
+  Resources totalResources;
+
+  // Used to establish the relationship between the operation and the
+  // resources that the operation is operating on. Each resource
+  // provider will keep a resource version UUID, and change it when it
+  // believes that the resources from this resource provider are out
+  // of sync from the master's view.  The master will keep track of
+  // the last known resource version UUID for each resource provider,
+  // and attach the resource version UUID in each operation it sends
+  // out. The resource provider should reject operations that have a
+  // different resource version UUID than that it maintains, because
+  // this means the operation is operating on resources that might
+  // have already been invalidated.
+  UUID resourceVersion;
+
+  // Pending operations or terminal operations that have
+  // unacknowledged status updates.
+  hashmap<UUID, OfferOperation*> offerOperations;
+};
+
+
 /**
  * Returns a map of environment variables necessary in order to launch
  * an executor.

http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index a6eb4c9..e37a53a 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -343,7 +343,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, UpdateState)
     EXPECT_EQ(
         devolve(resourceProviderId.get()),
         message->updateState->info.id());
-    EXPECT_EQ(devolve(resources), message->updateState->total);
+    EXPECT_EQ(devolve(resources), message->updateState->totalResources);
   }
 }
 

Reply via email to