Initialized offer operation status update manager in SLRP.

This patch adds an agent filesystem layout for checkpointing offer
operation status updates for resource providers, and initializes
a status update manager in the storage local resource provider.

Review: https://reviews.apache.org/r/64475/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3100e9aa
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3100e9aa
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3100e9aa

Branch: refs/heads/master
Commit: 3100e9aa0ac9b6bcc92643b145e2730fc862ea39
Parents: c728f8e
Author: Chun-Hung Hsiao <[email protected]>
Authored: Wed Dec 13 16:02:33 2017 -0800
Committer: Greg Mann <[email protected]>
Committed: Wed Dec 13 17:05:57 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/storage/provider.cpp | 130 +++++++++++++++++++-----
 src/slave/paths.cpp                        |  54 ++++++++++
 src/slave/paths.hpp                        |  23 +++++
 3 files changed, 179 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3100e9aa/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index e806f44..2fd4a3b 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -66,6 +66,8 @@
 #include "slave/paths.hpp"
 #include "slave/state.hpp"
 
+#include "status_update_manager/offer_operation.hpp"
+
 namespace http = process::http;
 
 using std::accumulate;
@@ -327,6 +329,7 @@ private:
   Future<Nothing> recover();
   Future<Nothing> recoverServices();
   Future<Nothing> recoverVolumes();
+  Future<Nothing> recoverStatusUpdates();
   void doReliableRegistration();
   Future<Nothing> reconcile();
 
@@ -359,8 +362,8 @@ private:
   Future<Nothing> deleteVolume(const string& volumeId);
 
   // Applies the offer operation. Conventional operations will be
-  // synchoronusly applied.
-  Future<Nothing> applyOfferOperation(const UUID& operationUuid);
+  // synchronously applied.
+  Future<Nothing> _applyOfferOperation(const UUID& operationUuid);
 
   Future<vector<ResourceConversion>> applyCreateVolumeOrBlock(
       const Resource& resource,
@@ -375,9 +378,15 @@ private:
       const Try<vector<ResourceConversion>>& conversions);
 
   void checkpointResourceProviderState();
-  void sendResourceProviderStateUpdate();
   void checkpointVolumeState(const string& volumeId);
 
+  void sendResourceProviderStateUpdate();
+
+  // NOTE: This is a callback for the status update manager and should
+  // not be called directly.
+  void sendOfferOperationStatusUpdate(
+      const OfferOperationStatusUpdate& statusUpdate);
+
   enum State
   {
     RECOVERING,
@@ -400,6 +409,7 @@ private:
   hashmap<string, ProfileData> profiles;
   process::grpc::client::Runtime runtime;
   Owned<v1::resource_provider::Driver> driver;
+  OfferOperationStatusUpdateManager statusUpdateManager;
 
   ContainerID controllerContainerId;
   ContainerID nodeContainerId;
@@ -611,15 +621,15 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
       // We replay all pending operations here, so that if a volume is
       // actually created before the last failover, it will be reflected
       // in the updated total resources before we do the reconciliation.
-      // NOTE: `applyOfferOperation` will remove the applied operation
+      // NOTE: `_applyOfferOperation` will remove the applied operation
       // from the list of pending operations, so we make a copy of keys
       // here.
       foreach (const UUID& uuid, pendingOperations.keys()) {
-        applyOfferOperation(uuid)
+        _applyOfferOperation(uuid)
           .onAny(defer(self(), [=](const Future<Nothing>& future) {
             if (!future.isReady()) {
               LOG(ERROR)
-                << "Failed to apply operation " << uuid << ": "
+                << "Failed to apply offer operation with UUID " << uuid << ": "
                 << (future.isFailed() ? future.failure() : "future discarded");
             }
           }));
@@ -841,6 +851,37 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverVolumes()
 }
 
 
+// Initializes the offer operation status update manager and leaves it
+// paused. Status updates are forwarded through
+// `sendOfferOperationStatusUpdate` (deferred onto this process), and
+// each operation's updates are checkpointed at the path returned by
+// `slave::paths::getOfferOperationUpdatesPath` under this resource
+// provider's directory. The manager is resumed by `reconcile()` after
+// the resource provider state update has been sent.
+Future<Nothing> StorageLocalResourceProviderProcess::recoverStatusUpdates()
+{
+  CHECK(info.has_id());
+
+  const string resourceProviderDir = slave::paths::getResourceProviderPath(
+      metaDir, slaveId, info.type(), info.name(), info.id());
+
+  // NOTE(review): This is reached through `reconcile()`; if `reconcile()`
+  // can run more than once (e.g., after a re-subscription), `initialize()`
+  // would be invoked again. Confirm the status update manager tolerates that.
+  statusUpdateManager.initialize(
+      defer(self(), &Self::sendOfferOperationStatusUpdate, lambda::_1),
+      std::bind(
+          &slave::paths::getOfferOperationUpdatesPath,
+          resourceProviderDir,
+          lambda::_1));
+
+  statusUpdateManager.pause();
+
+  // TODO(chhsiao): Recover status updates.
+  return Nothing();
+}
+
+
 void StorageLocalResourceProviderProcess::doReliableRegistration()
 {
   if (state == DISCONNECTED || state == SUBSCRIBED || state == READY) {
@@ -872,7 +903,8 @@ void StorageLocalResourceProviderProcess::doReliableRegistration()
 
 Future<Nothing> StorageLocalResourceProviderProcess::reconcile()
 {
-  return importResources()
+  return recoverStatusUpdates()
+    .then(defer(self(), &Self::importResources))
     .then(defer(self(), [=](Resources importedResources) {
       // NODE: If a resource in the checkpointed total resources is
       // missing in the imported resources, we will still keep it if it
@@ -945,6 +977,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::reconcile()
       }
 
       sendResourceProviderStateUpdate();
+      statusUpdateManager.resume();
 
       state = READY;
 
@@ -1005,7 +1038,7 @@ void StorageLocalResourceProviderProcess::applyOfferOperation(
   pendingOperations[uuid.get()] = operation;
   checkpointResourceProviderState();
 
-  applyOfferOperation(uuid.get())
+  _applyOfferOperation(uuid.get())
     .onAny(defer(self(), [=](const Future<Nothing>& future) {
       if (!future.isReady()) {
         LOG(ERROR)
@@ -1942,8 +1975,8 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
 
           if (volumes.contains(volumeInfo.id())) {
             // The resource provider failed over after the last
-            // `CreateVolume` call, but before the operation status
-            // was checkpointed.
+            // `CreateVolume` call, but before the offer operation
+            // status was checkpointed.
             CHECK_EQ(csi::state::VolumeState::CREATED,
                      volumes.at(volumeInfo.id()).state.state());
           } else {
@@ -2027,7 +2060,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
     }
   } else {
     // The resource provider failed over after the last `DeleteVolume`
-    // call, but before the operation status was checkpointed.
+    // call, but before the offer operation status was checkpointed.
     CHECK(!os::exists(volumePath));
   }
 
@@ -2038,7 +2071,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::applyOfferOperation(
+Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
     const UUID& operationUuid)
 {
   Future<vector<ResourceConversion>> conversions;
@@ -2205,7 +2238,7 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
     created = resource.disk().source().id();
   } else {
     // We use the operation UUID as the name of the volume, so the same
-    // operation will create the same volume after recovery.
+    // offer operation will create the same volume after recovery.
     // TODO(chhsiao): Call `CreateVolume` sequentially with other create
     // or delete operations.
     // TODO(chhsiao): Send `UPDATE_STATE` for RAW resources.
@@ -2389,8 +2422,8 @@ Try<Nothing> StorageLocalResourceProviderProcess::applyResourceConversions(
 
   auto err = [](const UUID& operationUuid, const string& message) {
     LOG(ERROR)
-      << "Failed to send status update for offer operation " << operationUuid
-      << ": " << message;
+      << "Failed to send status update for offer operation with UUID "
+      << operationUuid << ": " << message;
   };
 
   driver->send(evolve(call))
@@ -2426,8 +2459,31 @@ void StorageLocalResourceProviderProcess::checkpointResourceProviderState()
   const string statePath = slave::paths::getResourceProviderStatePath(
       metaDir, slaveId, info.type(), info.name(), info.id());
 
-  CHECK_SOME(slave::state::checkpoint(statePath, state))
-    << "Failed to checkpoint resource provider state to '" << statePath << "'";
+  Try<Nothing> checkpoint = slave::state::checkpoint(statePath, state);
+  CHECK_SOME(checkpoint)
+    << "Failed to checkpoint resource provider state to '" << statePath << "': "
+    << checkpoint.error();
+}
+
+
+// Checkpoints `volumes.at(volumeId).state` to the volume's state path,
+// derived from `slave::paths::getCsiRootDir(workDir)` and this resource
+// provider's plugin type and name. Fails a `CHECK` if checkpointing fails.
+void StorageLocalResourceProviderProcess::checkpointVolumeState(
+    const string& volumeId)
+{
+  const string statePath = csi::paths::getVolumeStatePath(
+      slave::paths::getCsiRootDir(workDir),
+      info.storage().plugin().type(),
+      info.storage().plugin().name(),
+      volumeId);
+
+  Try<Nothing> checkpoint =
+    slave::state::checkpoint(statePath, volumes.at(volumeId).state);
+
+  CHECK_SOME(checkpoint)
+    << "Failed to checkpoint volume state to '" << statePath << "': "
+    << checkpoint.error();
 }
 
 
@@ -2443,8 +2496,8 @@ void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
                const Event::ApplyOfferOperation& operation,
                pendingOperations) {
     // TODO(chhsiao): Maintain a list of terminated but unacknowledged
-    // offer operations in memory and reconstruc that during recovery
-    // by querying status update manager.
+    // offer operations in memory and reconstruct it during recovery
+    // by querying the status update manager.
     update->add_operations()->CopyFrom(
         protobuf::createOfferOperation(
             operation.info(),
@@ -2475,17 +2528,41 @@ void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
 }
 
 
-void StorageLocalResourceProviderProcess::checkpointVolumeState(
-    const string& volumeId)
+// Sends the given offer operation status update through the resource
+// provider driver as an `UPDATE_OFFER_OPERATION_STATUS` call. Failures
+// to send are logged and otherwise ignored here.
+void StorageLocalResourceProviderProcess::sendOfferOperationStatusUpdate(
+    const OfferOperationStatusUpdate& statusUpdate)
 {
-  const string statePath = csi::paths::getVolumeStatePath(
-      slave::paths::getCsiRootDir(workDir),
-      info.storage().plugin().type(),
-      info.storage().plugin().name(),
-      volumeId);
+  Call call;
+  call.set_type(Call::UPDATE_OFFER_OPERATION_STATUS);
+  call.mutable_resource_provider_id()->CopyFrom(info.id());
 
-  CHECK_SOME(slave::state::checkpoint(statePath, volumes.at(volumeId).state))
-    << "Failed to checkpoint volume state to '" << statePath << "'";
+  Call::UpdateOfferOperationStatus* update =
+    call.mutable_update_offer_operation_status();
+  update->set_operation_uuid(statusUpdate.operation_uuid());
+  update->mutable_status()->CopyFrom(statusUpdate.status());
+
+  if (statusUpdate.has_framework_id()) {
+    update->mutable_framework_id()->CopyFrom(statusUpdate.framework_id());
+  }
+
+  // The latest status should have been set by the status update manager.
+  CHECK(statusUpdate.has_latest_status());
+  update->mutable_latest_status()->CopyFrom(statusUpdate.latest_status());
+
+  auto err = [](const UUID& uuid, const string& message) {
+    LOG(ERROR)
+      << "Failed to send status update for offer operation with UUID " << uuid
+      << ": " << message;
+  };
+
+  Try<UUID> uuid = UUID::fromBytes(statusUpdate.operation_uuid());
+  CHECK_SOME(uuid);
+
+  driver->send(evolve(call))
+    .onFailed(std::bind(err, uuid.get(), lambda::_1))
+    .onDiscarded(std::bind(err, uuid.get(), "future discarded"));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3100e9aa/src/slave/paths.cpp
----------------------------------------------------------------------
diff --git a/src/slave/paths.cpp b/src/slave/paths.cpp
index b8004e7..f9f0c78 100644
--- a/src/slave/paths.cpp
+++ b/src/slave/paths.cpp
@@ -65,6 +65,7 @@ const char TASK_UPDATES_FILE[] = "task.updates";
 const char RESOURCES_INFO_FILE[] = "resources.info";
 const char RESOURCES_TARGET_FILE[] = "resources.target";
 const char RESOURCE_PROVIDER_STATE_FILE[] = "resource_provider.state";
+const char OFFER_OPERATION_UPDATES_FILE[] = "operation.updates";
 
 
 const char CONTAINERS_DIR[] = "containers";
@@ -75,6 +76,7 @@ const char EXECUTORS_DIR[] = "executors";
 const char EXECUTOR_RUNS_DIR[] = "runs";
 const char RESOURCE_PROVIDER_REGISTRY[] = "resource_provider_registry";
 const char RESOURCE_PROVIDERS_DIR[] = "resource_providers";
+const char OFFER_OPERATIONS_DIR[] = "operations";
 
 
 Try<ExecutorRunPath> parseExecutorRunPath(
@@ -545,6 +547,58 @@ string getLatestResourceProviderPath(
 }
 
 
+Try<list<string>> getOfferOperationPaths(
+    const string& rootDir)
+{
+  return fs::list(path::join(rootDir, OFFER_OPERATIONS_DIR, "*"));
+}
+
+
+string getOfferOperationPath(
+    const string& rootDir,
+    const UUID& operationUuid)
+{
+  return path::join(rootDir, OFFER_OPERATIONS_DIR, operationUuid.toString());
+}
+
+
+Try<UUID> parseOfferOperationPath(
+    const string& rootDir,
+    const string& dir)
+{
+  // TODO(chhsiao): Consider using `<regex>`, which requires GCC 4.9+.
+
+  // Make sure there's a separator at the end of the prefix so that we
+  // don't accidentally slice off part of a directory.
+  const string prefix = path::join(rootDir, OFFER_OPERATIONS_DIR, "");
+
+  if (!strings::startsWith(dir, prefix)) {
+    return Error(
+        "Directory '" + dir + "' does not fall under operations directory '" +
+        prefix + "'");
+  }
+
+  Try<UUID> operationUuid = UUID::fromString(Path(dir).basename());
+  if (operationUuid.isError()) {
+    return Error(
+        "Could not decode offer operation UUID from string '" +
+        Path(dir).basename() + "': " + operationUuid.error());
+  }
+
+  return operationUuid.get();
+}
+
+
+string getOfferOperationUpdatesPath(
+    const string& rootDir,
+    const UUID& operationUuid)
+{
+  return path::join(
+      getOfferOperationPath(rootDir, operationUuid),
+      OFFER_OPERATION_UPDATES_FILE);
+}
+
+
 string getResourcesInfoPath(
     const string& rootDir)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/3100e9aa/src/slave/paths.hpp
----------------------------------------------------------------------
diff --git a/src/slave/paths.hpp b/src/slave/paths.hpp
index d645d87..bae68d0 100644
--- a/src/slave/paths.hpp
+++ b/src/slave/paths.hpp
@@ -23,6 +23,7 @@
 #include <mesos/mesos.hpp>
 
 #include <stout/try.hpp>
+#include <stout/uuid.hpp>
 
 namespace mesos {
 namespace internal {
@@ -79,6 +80,9 @@ namespace paths {
 //   |           |           |-- latest (symlink)
 //   |           |           |-- <resource_provider_id>
 //   |           |               |-- resource_provider.state
+//   |           |               |-- operations
+//   |           |                   |-- <operation_uuid>
+//   |           |                       |-- operation.updates
 //   |           |-- frameworks
 //   |               |-- <framework_id>
 //   |                   |-- framework.info
@@ -349,6 +353,34 @@ std::string getLatestResourceProviderPath(
     const std::string& resourceProviderName);
 
 
+// Lists the entries matching `<rootDir>/operations/*` (one directory per
+// offer operation), where `rootDir` is a resource provider directory.
+Try<std::list<std::string>> getOfferOperationPaths(
+    const std::string& rootDir);
+
+
+// Returns `<rootDir>/operations/<operationUuid>`, where `rootDir` is a
+// resource provider directory.
+std::string getOfferOperationPath(
+    const std::string& rootDir,
+    const UUID& operationUuid);
+
+
+// Extracts the operation UUID from an offer operation directory `dir`.
+// Returns an error if `dir` is not under `<rootDir>/operations/` or if
+// its basename cannot be parsed as a UUID string.
+Try<UUID> parseOfferOperationPath(
+    const std::string& rootDir,
+    const std::string& dir);
+
+
+// Returns the file in which status updates of the given offer operation
+// are checkpointed: `<rootDir>/operations/<operationUuid>/operation.updates`.
+std::string getOfferOperationUpdatesPath(
+    const std::string& rootDir,
+    const UUID& operationUuid);
+
+
 std::string getResourcesInfoPath(
     const std::string& rootDir);
 

Reply via email to