Added plumbing for master to reconcile offer operations with agent.

This patch adds the RECONCILE_OFFER_OPERATIONS event to the resource
provider API, along with the internal message
'ReconcileOfferOperationsMessage', which the master uses to explicitly
reconcile offer operations with an agent. Handlers for both are also
added to the agent, the resource provider manager, and the storage
local resource provider.

This explicit reconciliation is useful when an agent's
'UpdateSlaveMessage' races with an incoming task launch, so that the
master's view of the agent's state is inconsistent with the agent's
actual state at the time the 'UpdateSlaveMessage' was sent.

Review: https://reviews.apache.org/r/63804/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9cb85e95
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9cb85e95
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9cb85e95

Branch: refs/heads/master
Commit: 9cb85e9551254c359f9d3989701ac5bd9e9adf8f
Parents: 761f47f
Author: Greg Mann <g...@mesosphere.io>
Authored: Thu Dec 7 11:36:22 2017 -0800
Committer: Greg Mann <gregorywm...@gmail.com>
Committed: Thu Dec 7 11:39:01 2017 -0800

----------------------------------------------------------------------
 .../resource_provider/resource_provider.proto   | 10 +++
 .../resource_provider/resource_provider.proto   | 10 +++
 src/messages/messages.proto                     | 16 +++++
 src/resource_provider/manager.cpp               | 71 ++++++++++++++++++++
 src/resource_provider/manager.hpp               |  5 ++
 src/resource_provider/storage/provider.cpp      | 14 ++++
 src/slave/slave.cpp                             | 25 +++++++
 src/slave/slave.hpp                             |  6 ++
 src/tests/mesos.hpp                             |  6 ++
 9 files changed, 163 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9cb85e95/include/mesos/resource_provider/resource_provider.proto
----------------------------------------------------------------------
diff --git a/include/mesos/resource_provider/resource_provider.proto b/include/mesos/resource_provider/resource_provider.proto
index 1784df2..d2b9c79 100644
--- a/include/mesos/resource_provider/resource_provider.proto
+++ b/include/mesos/resource_provider/resource_provider.proto
@@ -36,6 +36,7 @@ message Event {
     OPERATION = 2;                   // See 'Operation' below.
     PUBLISH = 3;                     // See 'Publish' below.
     ACKNOWLEDGE_OFFER_OPERATION = 4; // See 'AcknowledgeOfferOperation' below.
+    RECONCILE_OFFER_OPERATIONS = 5;  // See 'ReconcileOfferOperations' below.
   }
 
   // First event received by the resource provider when it subscribes
@@ -87,11 +88,20 @@ message Event {
     required bytes operation_uuid = 2;
   }
 
+  // Received when the resource provider manager wants to reconcile its view of
+  // the resource provider's offer operation state. The resource provider should
+  // generate offer operation status updates for any operation UUIDs in this
+  // message which are unknown to the resource provider.
+  message ReconcileOfferOperations {
+    repeated bytes operation_uuids = 1;
+  }
+
   optional Type type = 1;
   optional Subscribed subscribed = 2;
   optional Operation operation = 3;
   optional Publish publish = 4;
   optional AcknowledgeOfferOperation acknowledge_offer_operation = 5;
+  optional ReconcileOfferOperations reconcile_offer_operations = 6;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9cb85e95/include/mesos/v1/resource_provider/resource_provider.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/resource_provider/resource_provider.proto b/include/mesos/v1/resource_provider/resource_provider.proto
index a13eed3..0c23e91 100644
--- a/include/mesos/v1/resource_provider/resource_provider.proto
+++ b/include/mesos/v1/resource_provider/resource_provider.proto
@@ -36,6 +36,7 @@ message Event {
     OPERATION = 2;                   // See 'Operation' below.
     PUBLISH = 3;                     // See 'Publish' below.
     ACKNOWLEDGE_OFFER_OPERATION = 4; // See 'AcknowledgeOfferOperation' below.
+    RECONCILE_OFFER_OPERATIONS = 5;  // See 'ReconcileOfferOperations' below.
   }
 
   // First event received by the resource provider when it subscribes
@@ -87,11 +88,20 @@ message Event {
     required bytes operation_uuid = 2;
   }
 
+  // Received when the resource provider manager wants to reconcile its view of
+  // the resource provider's offer operation state. The resource provider should
+  // generate offer operation status updates for any operation UUIDs in this
+  // message which are unknown to the resource provider.
+  message ReconcileOfferOperations {
+    repeated bytes operation_uuids = 1;
+  }
+
   optional Type type = 1;
   optional Subscribed subscribed = 2;
   optional Operation operation = 3;
   optional Publish publish = 4;
   optional AcknowledgeOfferOperation acknowledge_offer_operation = 5;
+  optional ReconcileOfferOperations reconcile_offer_operations = 6;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9cb85e95/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 2ab0fe8..7ab07d7 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -433,6 +433,25 @@ message ReconcileTasksMessage {
 
 
 /**
+ * The master uses this message to query an agent about the state of
+ * one or more offer operations. This is useful to resolve
+ * discrepancies between the master's and the agent's views after
+ * agent reregistration.
+ */
+message ReconcileOfferOperationsMessage {
+  message Operation {
+    required bytes operation_uuid = 1;
+
+    // The resource provider the operation belongs to. Only operations
+    // that carry this field are forwarded to resource providers.
+    optional ResourceProviderID resource_provider_id = 2;
+  }
+
+  repeated Operation operations = 1;
+}
+
+
+/**
  * Notifies the framework about errors during registration.
  *
  * See scheduler::Event::Error.

http://git-wip-us.apache.org/repos/asf/mesos/blob/9cb85e95/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 75eb6c1..2aee46e 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -161,6 +161,8 @@ public:
   void acknowledgeOfferOperationUpdate(
       const OfferOperationUpdateAcknowledgementMessage& message);
 
+  void reconcileOfferOperations(const ReconcileOfferOperationsMessage& message);
+
   Future<Nothing> publish(const Resources& resources);
 
   Queue<ResourceProviderMessage> messages;
@@ -435,6 +437,61 @@ void ResourceProviderManagerProcess::acknowledgeOfferOperationUpdate(
 }
 
 
+void ResourceProviderManagerProcess::reconcileOfferOperations(
+    const ReconcileOfferOperationsMessage& message)
+{
+  // Batch the operation UUIDs per resource provider so that each subscribed
+  // resource provider receives at most one RECONCILE_OFFER_OPERATIONS event.
+  hashmap<ResourceProviderID, Event> events;
+
+  foreach (
+      const ReconcileOfferOperationsMessage::Operation& operation,
+      message.operations()) {
+    // Operations without a resource provider ID are not handled here.
+    if (!operation.has_resource_provider_id()) {
+      continue;
+    }
+
+    const ResourceProviderID& resourceProviderId =
+      operation.resource_provider_id();
+
+    if (!resourceProviders.subscribed.contains(resourceProviderId)) {
+      // NOTE: `operation_uuid` is a raw `bytes` field and is logged verbatim.
+      LOG(WARNING) << "Dropping offer operation reconciliation message with"
+                   << " operation_uuid " << operation.operation_uuid()
+                   << " because resource provider " << resourceProviderId
+                   << " is not subscribed";
+      continue;
+    }
+
+    if (!events.contains(resourceProviderId)) {
+      Event event;
+      event.set_type(Event::RECONCILE_OFFER_OPERATIONS);
+      events[resourceProviderId] = event;
+    }
+
+    events.at(resourceProviderId).mutable_reconcile_offer_operations()
+      ->add_operation_uuids(operation.operation_uuid());
+  }
+
+  // Every key in `events` was verified above to be a subscribed provider.
+  foreachpair (
+      const ResourceProviderID& resourceProviderId,
+      const Event& event,
+      events) {
+    CHECK(resourceProviders.subscribed.contains(resourceProviderId));
+    ResourceProvider& resourceProvider =
+      *resourceProviders.subscribed.at(resourceProviderId);
+
+    if (!resourceProvider.http.send(event)) {
+      LOG(WARNING) << "Failed to send offer operation reconciliation event"
+                   << " to resource provider " << resourceProviderId
+                   << ": connection closed";
+    }
+  }
+}
+
+
 Future<Nothing> ResourceProviderManagerProcess::publish(
     const Resources& resources)
 {
@@ -685,6 +742,16 @@ void ResourceProviderManager::acknowledgeOfferOperationUpdate(
 }
 
 
+void ResourceProviderManager::reconcileOfferOperations(
+    const ReconcileOfferOperationsMessage& message) const
+{
+  return dispatch(
+      process.get(),
+      &ResourceProviderManagerProcess::reconcileOfferOperations,
+      message);
+}
+
+
 Future<Nothing> ResourceProviderManager::publish(const Resources& resources)
 {
   return dispatch(

http://git-wip-us.apache.org/repos/asf/mesos/blob/9cb85e95/src/resource_provider/manager.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.hpp b/src/resource_provider/manager.hpp
index 9f9b201..e94b836 100644
--- a/src/resource_provider/manager.hpp
+++ b/src/resource_provider/manager.hpp
@@ -58,6 +58,12 @@ public:
   void acknowledgeOfferOperationUpdate(
       const OfferOperationUpdateAcknowledgementMessage& message) const;
 
+  // Forwards offer operation reconciliation requests from the master to the
+  // relevant resource providers. Operations whose resource provider is not
+  // currently subscribed are dropped.
+  void reconcileOfferOperations(
+      const ReconcileOfferOperationsMessage& message) const;
+
   // Ensure that the resources are ready for use.
   process::Future<Nothing> publish(const Resources& resources);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9cb85e95/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index e771af6..a2794ac 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -328,6 +328,8 @@ private:
   void publish(const Event::Publish& publish);
   void acknowledgeOfferOperation(
       const Event::AcknowledgeOfferOperation& acknowledge);
+  void reconcileOfferOperations(
+      const Event::ReconcileOfferOperations& reconcile);
 
   Future<csi::Client> connect(const string& endpoint);
   Future<csi::Client> getService(const ContainerID& containerId);
@@ -430,6 +432,11 @@ void StorageLocalResourceProviderProcess::received(const Event& event)
       acknowledgeOfferOperation(event.acknowledge_offer_operation());
       break;
     }
+    case Event::RECONCILE_OFFER_OPERATIONS: {
+      CHECK(event.has_reconcile_offer_operations());
+      reconcileOfferOperations(event.reconcile_offer_operations());
+      break;
+    }
     case Event::UNKNOWN: {
       LOG(WARNING) << "Received an UNKNOWN event and ignored";
       break;
@@ -1062,6 +1069,17 @@ void StorageLocalResourceProviderProcess::acknowledgeOfferOperation(
 }
 
 
+void StorageLocalResourceProviderProcess::reconcileOfferOperations(
+    const Event::ReconcileOfferOperations& reconcile)
+{
+  CHECK_EQ(SUBSCRIBED, state);
+
+  // TODO: Per the RECONCILE_OFFER_OPERATIONS contract, send offer operation
+  // status updates for every UUID in `reconcile.operation_uuids()` that is
+  // unknown to this resource provider. Until then the event is a no-op.
+}
+
+
 // Returns a future of a CSI client that waits for the endpoint socket
 // to appear if necessary, then connects to the socket and check its
 // supported version.

http://git-wip-us.apache.org/repos/asf/mesos/blob/9cb85e95/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 133c0d5..fb077b7 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -691,6 +691,9 @@ void Slave::initialize()
   install<ApplyOfferOperationMessage>(
       &Slave::applyOfferOperation);
 
+  install<ReconcileOfferOperationsMessage>(
+      &Slave::reconcileOfferOperations);
+
   install<StatusUpdateAcknowledgementMessage>(
       &Slave::statusUpdateAcknowledgement,
       &StatusUpdateAcknowledgementMessage::slave_id,
@@ -3848,6 +3851,30 @@ void Slave::applyOfferOperation(const ApplyOfferOperationMessage& message)
 }
 
 
+void Slave::reconcileOfferOperations(
+    const ReconcileOfferOperationsMessage& message)
+{
+  bool containsResourceProviderOperations = false;
+
+  // TODO(greggomann): Implement reconciliation for offer
+  // operations on the agent's default resources.
+  foreach (
+      const ReconcileOfferOperationsMessage::Operation& operation,
+      message.operations()) {
+    if (operation.has_resource_provider_id()) {
+      containsResourceProviderOperations = true;
+      break;
+    }
+  }
+
+  // The whole message is forwarded; the resource provider manager skips
+  // operations that do not carry a resource provider ID.
+  if (containsResourceProviderOperations) {
+    resourceProviderManager.reconcileOfferOperations(message);
+  }
+}
+
+
 void Slave::statusUpdateAcknowledgement(
     const UPID& from,
     const SlaveID& slaveId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/9cb85e95/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index fc762fb..bbf5b79 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -205,6 +205,12 @@ public:
 
   void applyOfferOperation(const ApplyOfferOperationMessage& message);
 
+  // Reconciles pending offer operations with the master. This is necessary to
+  // handle cases in which operations were dropped in transit, or in which an
+  // agent's `UpdateSlaveMessage` was sent at the same time as an operation was
+  // en route from the master to the agent.
+  void reconcileOfferOperations(const ReconcileOfferOperationsMessage& message);
+
   void subscribe(
       HttpConnection http,
       const executor::Call::Subscribe& subscribe,

http://git-wip-us.apache.org/repos/asf/mesos/blob/9cb85e95/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 9bfc184..53890d8 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -2858,6 +2858,9 @@ public:
   MOCK_METHOD1_T(
       acknowledgeOfferOperation,
       void(const typename Event::AcknowledgeOfferOperation&));
+  MOCK_METHOD1_T(
+      reconcileOfferOperations,
+      void(const typename Event::ReconcileOfferOperations&));
 
   void events(std::queue<Event> events)
   {
@@ -2878,6 +2881,9 @@ public:
         case Event::ACKNOWLEDGE_OFFER_OPERATION:
           acknowledgeOfferOperation(event.acknowledge_offer_operation());
           break;
+        case Event::RECONCILE_OFFER_OPERATIONS:
+          reconcileOfferOperations(event.reconcile_offer_operations());
+          break;
         case Event::UNKNOWN:
           LOG(FATAL) << "Received unexpected UNKNOWN event";
           break;

Reply via email to