Integrated the status update manager into SLRP.

This patch uses the status update manager to send offer operation
status updates and to recover operations that are checkpointed as
completed in the status update manager but are still in the pending
list in SLRP. It also forwards acknowledgements to the status update
manager and garbage-collects the metadata of offer operations.

Review: https://reviews.apache.org/r/64551/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/065c74ce
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/065c74ce
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/065c74ce

Branch: refs/heads/master
Commit: 065c74ce47c4147d63ed96fb60d773c924d13a4b
Parents: eeb09cb
Author: Chun-Hung Hsiao <[email protected]>
Authored: Wed Dec 13 18:57:20 2017 -0800
Committer: Jie Yu <[email protected]>
Committed: Wed Dec 13 18:57:20 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/daemon.cpp           |  21 ++-
 src/resource_provider/daemon.hpp           |   3 +-
 src/resource_provider/local.cpp            |   6 +-
 src/resource_provider/local.hpp            |   3 +-
 src/resource_provider/storage/provider.cpp | 236 ++++++++++++++++++++----
 src/resource_provider/storage/provider.hpp |   6 +-
 6 files changed, 226 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/065c74ce/src/resource_provider/daemon.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/daemon.cpp b/src/resource_provider/daemon.cpp
index 7c783e3..f160a87 100644
--- a/src/resource_provider/daemon.cpp
+++ b/src/resource_provider/daemon.cpp
@@ -77,12 +77,14 @@ public:
       const http::URL& _url,
       const string& _workDir,
       const Option<string>& _configDir,
-      SecretGenerator* _secretGenerator)
+      SecretGenerator* _secretGenerator,
+      bool _strict)
     : ProcessBase(process::ID::generate("local-resource-provider-daemon")),
       url(_url),
       workDir(_workDir),
       configDir(_configDir),
-      secretGenerator(_secretGenerator) {}
+      secretGenerator(_secretGenerator),
+      strict(_strict) {}
 
   LocalResourceProviderDaemonProcess(
       const LocalResourceProviderDaemonProcess& other) = delete;
@@ -122,6 +124,7 @@ private:
   Future<Nothing> launch(
       const string& type,
       const string& name);
+
   Future<Nothing> _launch(
       const string& type,
       const string& name,
@@ -134,6 +137,7 @@ private:
   const string workDir;
   const Option<string> configDir;
   SecretGenerator* const secretGenerator;
+  const bool strict;
 
   Option<SlaveID> slaveId;
   hashmap<string, hashmap<string, ProviderData>> providers;
@@ -439,7 +443,7 @@ Future<Nothing> LocalResourceProviderDaemonProcess::_launch(
   }
 
   Try<Owned<LocalResourceProvider>> provider = LocalResourceProvider::create(
-      url, workDir, data.info, slaveId.get(), authToken);
+      url, workDir, data.info, slaveId.get(), authToken, strict);
 
   if (provider.isError()) {
     return Failure(
@@ -506,7 +510,8 @@ Try<Owned<LocalResourceProviderDaemon>> LocalResourceProviderDaemon::create(
       url,
       flags.work_dir,
       configDir,
-      secretGenerator);
+      secretGenerator,
+      flags.strict);
 }
 
 
@@ -514,12 +519,10 @@ LocalResourceProviderDaemon::LocalResourceProviderDaemon(
     const http::URL& url,
     const string& workDir,
     const Option<string>& configDir,
-    SecretGenerator* secretGenerator)
+    SecretGenerator* secretGenerator,
+    bool strict)
   : process(new LocalResourceProviderDaemonProcess(
-        url,
-        workDir,
-        configDir,
-        secretGenerator))
+        url, workDir, configDir, secretGenerator, strict))
 {
   spawn(CHECK_NOTNULL(process.get()));
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/065c74ce/src/resource_provider/daemon.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/daemon.hpp b/src/resource_provider/daemon.hpp
index 7c513a2..a6d0013 100644
--- a/src/resource_provider/daemon.hpp
+++ b/src/resource_provider/daemon.hpp
@@ -70,7 +70,8 @@ private:
       const process::http::URL& url,
       const std::string& workDir,
       const Option<std::string>& configDir,
-      SecretGenerator* secretGenerator);
+      SecretGenerator* secretGenerator,
+      bool strict);
 
   process::Owned<LocalResourceProviderDaemonProcess> process;
 };

http://git-wip-us.apache.org/repos/asf/mesos/blob/065c74ce/src/resource_provider/local.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/local.cpp b/src/resource_provider/local.cpp
index d1d6835..ae23c20 100644
--- a/src/resource_provider/local.cpp
+++ b/src/resource_provider/local.cpp
@@ -37,7 +37,8 @@ Try<Owned<LocalResourceProvider>> LocalResourceProvider::create(
     const string& workDir,
     const ResourceProviderInfo& info,
     const SlaveID& slaveId,
-    const Option<string>& authToken)
+    const Option<string>& authToken,
+    bool strict)
 {
   // TODO(jieyu): Document the built-in local resource providers.
   const hashmap<string, lambda::function<decltype(create)>> creators = {
@@ -47,7 +48,8 @@ Try<Owned<LocalResourceProvider>> LocalResourceProvider::create(
   };
 
   if (creators.contains(info.type())) {
-    return creators.at(info.type())(url, workDir, info, slaveId, authToken);
+    return creators.at(info.type())(
+        url, workDir, info, slaveId, authToken, strict);
   }
 
   return Error("Unknown local resource provider type '" + info.type() + "'");

http://git-wip-us.apache.org/repos/asf/mesos/blob/065c74ce/src/resource_provider/local.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/local.hpp b/src/resource_provider/local.hpp
index 46111d9..20bcc78 100644
--- a/src/resource_provider/local.hpp
+++ b/src/resource_provider/local.hpp
@@ -36,7 +36,8 @@ public:
       const std::string& workDir,
       const ResourceProviderInfo& info,
       const SlaveID& slaveId,
-      const Option<std::string>& authToken);
+      const Option<std::string>& authToken,
+      bool strict);
 
   static Try<process::http::authentication::Principal> principal(
       const ResourceProviderInfo& info);

http://git-wip-us.apache.org/repos/asf/mesos/blob/065c74ce/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 80bcea0..03a12c7 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -283,7 +283,8 @@ public:
       const string& _workDir,
       const ResourceProviderInfo& _info,
       const SlaveID& _slaveId,
-      const Option<string>& _authToken)
+      const Option<string>& _authToken,
+      bool _strict)
     : ProcessBase(process::ID::generate("storage-local-resource-provider")),
       state(RECOVERING),
       url(_url),
@@ -293,6 +294,7 @@ public:
       info(_info),
       slaveId(_slaveId),
       authToken(_authToken),
+      strict(_strict),
       resourceVersion(UUID::random()) {}
 
   StorageLocalResourceProviderProcess(
@@ -374,7 +376,9 @@ private:
   Future<vector<ResourceConversion>> applyDestroyVolumeOrBlock(
       const Resource& resource);
 
-  // Synchronously update `totalResources` and the offer operation status.
+  // Synchronously updates `totalResources` and the offer operation
+  // status and then asks the status update manager to send status
+  // updates.
   Try<Nothing> updateOfferOperationStatus(
       const UUID& operationUuid,
       const Try<vector<ResourceConversion>>& conversions);
@@ -405,6 +409,7 @@ private:
   ResourceProviderInfo info;
   const SlaveID slaveId;
   const Option<string> authToken;
+  const bool strict;
 
   csi::Version csiVersion;
   string bootId;
@@ -616,27 +621,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
         }
       }
 
-      // We replay all pending operations here, so that if a volume is
-      // actually created or deleted before the last failover, it will
-      // be reflected in the total resources before reconciliation.
-      foreachpair (const UUID& uuid,
-                   const OfferOperation& operation,
-                   offerOperations) {
-        if (protobuf::isTerminalState(operation.latest_status().state())) {
-          continue;
-        }
-
-        auto err = [](const UUID& uuid, const string& message) {
-          LOG(ERROR)
-            << "Falied to apply offer operation with UUID " << uuid << ": "
-            << message;
-        };
-
-        _applyOfferOperation(uuid)
-          .onFailed(std::bind(err, uuid, lambda::_1))
-          .onDiscarded(std::bind(err, uuid, "future discarded"));
-      }
-
       state = DISCONNECTED;
 
       driver.reset(new Driver(
@@ -869,8 +853,116 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverStatusUpdates()
 
   statusUpdateManager.pause();
 
-  // TODO(chhsiao): Recover status updates.
-  return Nothing();
+  Try<list<string>> operationPaths = slave::paths::getOfferOperationPaths(
+      slave::paths::getResourceProviderPath(
+          metaDir, slaveId, info.type(), info.name(), info.id()));
+
+  if (operationPaths.isError()) {
+    return Failure(
+        "Failed to find offer operations for resource provider " +
+        stringify(info.id()) + ": " + operationPaths.error());
+  }
+
+  list<UUID> operationUuids;
+  foreach (const string& path, operationPaths.get()) {
+    Try<UUID> uuid =
+      slave::paths::parseOfferOperationPath(resourceProviderDir, path);
+
+    if (uuid.isError()) {
+      return Failure(
+          "Failed to parse offer operation path '" + path + "': " +
+          uuid.error());
+    }
+
+    CHECK(offerOperations.contains(uuid.get()));
+    operationUuids.emplace_back(std::move(uuid.get()));
+  }
+
+  return statusUpdateManager.recover(operationUuids, strict)
+    .then(defer(self(), [=](
+        const OfferOperationStatusManagerState& statusUpdateManagerState)
+        -> Future<Nothing> {
+      using StreamState =
+        typename OfferOperationStatusManagerState::StreamState;
+
+      // Clean up the operations that are terminated.
+      foreachpair (const UUID& uuid,
+                   const Option<StreamState>& stream,
+                   statusUpdateManagerState.streams) {
+        if (stream.isSome() && stream->terminated) {
+          offerOperations.erase(uuid);
+
+          // Garbage collect the offer operation metadata.
+          const string path = slave::paths::getOfferOperationPath(
+              slave::paths::getResourceProviderPath(
+                  metaDir, slaveId, info.type(), info.name(), info.id()),
+              uuid);
+
+          Try<Nothing> rmdir = os::rmdir(path);
+          if (rmdir.isError()) {
+            return Failure(
+                "Failed to remove directory '" + path + "': " + rmdir.error());
+          }
+        }
+      }
+
+      // Send updates for all missing statuses.
+      foreachpair (const UUID& uuid,
+                   const OfferOperation& operation,
+                   offerOperations) {
+        if (operation.latest_status().state() == OFFER_OPERATION_PENDING) {
+          continue;
+        }
+
+        const int numStatuses =
+          statusUpdateManagerState.streams.contains(uuid) &&
+          statusUpdateManagerState.streams.at(uuid).isSome()
+            ? statusUpdateManagerState.streams.at(uuid)->updates.size() : 0;
+
+        for (int i = numStatuses; i < operation.statuses().size(); i++) {
+          OfferOperationStatusUpdate update =
+            protobuf::createOfferOperationStatusUpdate(
+                uuid,
+                operation.statuses(i),
+                None(),
+                operation.has_framework_id()
+                  ? operation.framework_id() : Option<FrameworkID>::none(),
+                slaveId);
+
+          const string message =
+            "Failed to update status of offer operation with UUID " +
+            stringify(uuid);
+
+          statusUpdateManager.update(std::move(update))
+            .onFailed(defer(self(), &Self::fatal, message, lambda::_1))
+            .onDiscarded(
+                defer(self(), &Self::fatal, message, "future discarded"));
+        }
+      }
+
+      // We replay all pending operations here, so that if a volume is
+      // created or deleted before the last failover, the result will be
+      // reflected in the total resources before reconciliation.
+      foreachpair (const UUID& uuid,
+                   const OfferOperation& operation,
+                   offerOperations) {
+        if (protobuf::isTerminalState(operation.latest_status().state())) {
+          continue;
+        }
+
+        auto err = [](const UUID& uuid, const string& message) {
+          LOG(ERROR)
+            << "Falied to apply offer operation with UUID " << uuid << ": "
+            << message;
+        };
+
+        _applyOfferOperation(uuid)
+          .onFailed(std::bind(err, uuid, lambda::_1))
+          .onDiscarded(std::bind(err, uuid, "future discarded"));
+      }
+
+      return Nothing();
+    }));
 }
 
 
@@ -1214,6 +1306,45 @@ void StorageLocalResourceProviderProcess::acknowledgeOfferOperation(
     const Event::AcknowledgeOfferOperation& acknowledge)
 {
   CHECK_EQ(READY, state);
+
+  Try<UUID> operationUuid = UUID::fromBytes(acknowledge.operation_uuid());
+  CHECK_SOME(operationUuid);
+
+  Try<UUID> statusUuid = UUID::fromBytes(acknowledge.status_uuid());
+  CHECK_SOME(statusUuid);
+
+  auto err = [](const UUID& uuid, const string& message) {
+    LOG(ERROR)
+      << "Failed to acknowledge status update for offer operation with UUID "
+      << uuid << ": " << message;
+  };
+
+  // NOTE: It is possible that an incoming acknowledgement races with an
+  // outgoing retry of status update, and then a duplicated
+  // acknowledgement will be received. In this case, the following call
+  // will fail, so we just leave an error log.
+  statusUpdateManager.acknowledgement(operationUuid.get(), statusUuid.get())
+    .then(defer(self(), [=](bool continuation) -> Future<Nothing> {
+      if (!continuation) {
+        offerOperations.erase(operationUuid.get());
+
+        // Garbage collect the offer operation metadata.
+        const string path = slave::paths::getOfferOperationPath(
+            slave::paths::getResourceProviderPath(
+                metaDir, slaveId, info.type(), info.name(), info.id()),
+            operationUuid.get());
+
+        Try<Nothing> rmdir = os::rmdir(path);
+        if (rmdir.isError()) {
+          return Failure(
+              "Failed to remove directory '" + path + "': " + rmdir.error());
+        }
+      }
+
+      return Nothing();
+    }))
+    .onFailed(std::bind(err, operationUuid.get(), lambda::_1))
+    .onDiscarded(std::bind(err, operationUuid.get(), "future discarded"));
 }
 
 
@@ -1221,6 +1352,36 @@ void StorageLocalResourceProviderProcess::reconcileOfferOperations(
     const Event::ReconcileOfferOperations& reconcile)
 {
   CHECK_EQ(READY, state);
+
+  foreach (const string& operationUuid, reconcile.operation_uuids()) {
+    Try<UUID> uuid = UUID::fromBytes(operationUuid);
+    CHECK_SOME(uuid);
+
+    if (offerOperations.contains(uuid.get())) {
+      // When the agent asks for reconciliation for a known operation,
+      // that means the `APPLY_OFFER_OPERATION` event races with the
+      // last `UPDATE_STATE` call and arrives after the call. Since the
+      // event is received, nothing needs to be done here.
+      continue;
+    }
+
+    OfferOperationStatusUpdate update =
+      protobuf::createOfferOperationStatusUpdate(
+          uuid.get(),
+          protobuf::createOfferOperationStatus(
+              OFFER_OPERATION_DROPPED, None(), None(), None(), UUID::random()),
+          None(),
+          None(),
+          slaveId);
+
+    const string message =
+      "Failed to update status of offer operation with UUID " +
+      stringify(uuid.get());
+
+    statusUpdateManager.update(std::move(update))
+      .onFailed(defer(self(), &Self::fatal, message, lambda::_1))
+      .onDiscarded(defer(self(), &Self::fatal, message, "future discarded"));
+  }
 }
 
 
@@ -2358,7 +2519,7 @@ Try<Nothing> StorageLocalResourceProviderProcess::updateOfferOperationStatus(
       convertedResources += conversion.converted;
       conversion.consumed.unallocate();
       conversion.converted.unallocate();
-      _conversions.push_back(std::move(conversion));
+      _conversions.emplace_back(std::move(conversion));
     }
 
     Try<Resources> result = totalResources.apply(_conversions);
@@ -2399,17 +2560,22 @@ Try<Nothing> StorageLocalResourceProviderProcess::updateOfferOperationStatus(
   checkpointResourceProviderState();
 
   // Send out the status update for the offer operation.
-  // TODO(chhsiao): Use the status update manager.
   OfferOperationStatusUpdate update =
     protobuf::createOfferOperationStatusUpdate(
         operationUuid,
         operation.latest_status(),
-        operation.latest_status(),
+        None(),
         operation.has_framework_id()
           ? operation.framework_id() : Option<FrameworkID>::none(),
         slaveId);
 
-  sendOfferOperationStatusUpdate(update);
+  const string message =
+    "Failed to update status of offer operation with UUID " +
+    stringify(operationUuid);
+
+  statusUpdateManager.update(std::move(update))
+    .onFailed(defer(self(), &Self::fatal, message, lambda::_1))
+    .onDiscarded(defer(self(), &Self::fatal, message, "future discarded"));
 
   return error.isNone() ? Nothing() : Try<Nothing>::error(error.get());
 }
@@ -2519,7 +2685,8 @@ Try<Owned<LocalResourceProvider>> StorageLocalResourceProvider::create(
     const string& workDir,
     const ResourceProviderInfo& info,
     const SlaveID& slaveId,
-    const Option<string>& authToken)
+    const Option<string>& authToken,
+    bool strict)
 {
   // Verify that the name follows Java package naming convention.
   // TODO(chhsiao): We should move this check to a validation function
@@ -2571,8 +2738,8 @@ Try<Owned<LocalResourceProvider>> StorageLocalResourceProvider::create(
         stringify(CSIPluginContainerInfo::NODE_SERVICE) + " not found");
   }
 
-  return Owned<LocalResourceProvider>(
-      new StorageLocalResourceProvider(url, workDir, info, slaveId, authToken));
+  return Owned<LocalResourceProvider>(new StorageLocalResourceProvider(
+      url, workDir, info, slaveId, authToken, strict));
 }
 
 
@@ -2590,9 +2757,10 @@ StorageLocalResourceProvider::StorageLocalResourceProvider(
     const string& workDir,
     const ResourceProviderInfo& info,
     const SlaveID& slaveId,
-    const Option<string>& authToken)
+    const Option<string>& authToken,
+    bool strict)
   : process(new StorageLocalResourceProviderProcess(
-        url, workDir, info, slaveId, authToken))
+        url, workDir, info, slaveId, authToken, strict))
 {
   spawn(CHECK_NOTNULL(process.get()));
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/065c74ce/src/resource_provider/storage/provider.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.hpp b/src/resource_provider/storage/provider.hpp
index 374f837..5a371b1 100644
--- a/src/resource_provider/storage/provider.hpp
+++ b/src/resource_provider/storage/provider.hpp
@@ -41,7 +41,8 @@ public:
       const std::string& workDir,
       const mesos::ResourceProviderInfo& info,
       const SlaveID& slaveId,
-      const Option<std::string>& authToken);
+      const Option<std::string>& authToken,
+      bool strict);
 
   static Try<process::http::authentication::Principal> principal(
       const mesos::ResourceProviderInfo& info);
@@ -60,7 +61,8 @@ private:
       const std::string& workDir,
       const mesos::ResourceProviderInfo& info,
       const SlaveID& slaveId,
-      const Option<std::string>& authToken);
+      const Option<std::string>& authToken,
+      bool strict);
 
   process::Owned<StorageLocalResourceProviderProcess> process;
 };

Reply via email to