Repository: mesos
Updated Branches:
  refs/heads/master 6dfb749c9 -> f031e031f


Import and reconcile resources from CSI plugins.

The following lists the steps to reconcile resources:

1. Import resources from the CSI plugin:
  a. Get preprovisioned volumes:
       ListVolumes
       ValidateVolumeCapabilities for each volume
  b. GetCapacity for each profile to get RAW resources
2. For each resource in the checkpointed resources:
  a. Strip away metadata (reservation, persistence, etc)
  b. Additional resources after subtracting the stripped resources
     from the imported resources are new resources.
3. Report the reconciled resources through UPDATE_STATE

Review: https://reviews.apache.org/r/63022/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f031e031
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f031e031
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f031e031

Branch: refs/heads/master
Commit: f031e031f541212e6612a34d08899f16b8a1bc58
Parents: 6dfb749
Author: Chun-Hung Hsiao <[email protected]>
Authored: Wed Dec 6 12:20:59 2017 -0800
Committer: Jie Yu <[email protected]>
Committed: Wed Dec 6 14:03:01 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/storage/provider.cpp | 335 ++++++++++++++++++++++--
 1 file changed, 313 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f031e031/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp 
b/src/resource_provider/storage/provider.cpp
index dc77368..256b214 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -18,6 +18,7 @@
 
 #include <algorithm>
 #include <cctype>
+#include <numeric>
 
 #include <glog/logging.h>
 
@@ -39,15 +40,13 @@
 
 #include <stout/foreach.hpp>
 #include <stout/hashmap.hpp>
+#include <stout/os.hpp>
 #include <stout/path.hpp>
 
-#include <stout/os/exists.hpp>
-#include <stout/os/mkdir.hpp>
 #include <stout/os/realpath.hpp>
-#include <stout/os/rm.hpp>
-#include <stout/os/rmdir.hpp>
 
 #include "common/http.hpp"
+#include "common/protobuf_utils.hpp"
 
 #include "csi/client.hpp"
 #include "csi/paths.hpp"
@@ -65,6 +64,7 @@
 
 namespace http = process::http;
 
+using std::accumulate;
 using std::find;
 using std::list;
 using std::queue;
@@ -88,6 +88,9 @@ using process::spawn;
 
 using process::http::authentication::Principal;
 
+using mesos::internal::protobuf::convertLabelsToStringMap;
+using mesos::internal::protobuf::convertStringMapToLabels;
+
 using mesos::internal::slave::ContainerDaemon;
 
 using mesos::resource_provider::Call;
@@ -226,6 +229,38 @@ static inline http::Headers getAuthHeader(const 
Option<string>& authToken)
 }
 
 
+// Builds a scalar "disk" resource of size `capacity` that belongs to the
+// resource provider `resourceProviderId` and whose disk source type is
+// `RAW`. The `profile`, `id` and `metadata` fields of the disk source are
+// set only when the corresponding option holds a value. No reservation
+// is attached to the returned resource.
+static inline Resource createRawDiskResource(
+    const ResourceProviderID& resourceProviderId,
+    double capacity,
+    const Option<string>& profile,
+    const Option<string>& id = None(),
+    const Option<Labels>& metadata = None())
+{
+  Resource resource;
+  resource.set_name("disk");
+  resource.set_type(Value::SCALAR);
+  resource.mutable_scalar()->set_value(capacity);
+
+  // NOTE: This statement used to end with a comma, which chained it to
+  // the following statement through the comma operator.
+  resource.mutable_provider_id()->CopyFrom(resourceProviderId);
+
+  Resource::DiskInfo::Source* source =
+    resource.mutable_disk()->mutable_source();
+
+  source->set_type(Resource::DiskInfo::Source::RAW);
+
+  if (profile.isSome()) {
+    source->set_profile(profile.get());
+  }
+
+  if (id.isSome()) {
+    source->set_id(id.get());
+  }
+
+  if (metadata.isSome()) {
+    source->mutable_metadata()->CopyFrom(metadata.get());
+  }
+
+  return resource;
+}
+
+
+
+
 class StorageLocalResourceProviderProcess
   : public Process<StorageLocalResourceProviderProcess>
 {
@@ -257,12 +292,19 @@ public:
   void received(const Event& event);
 
 private:
+  struct ProfileData  // Per-profile CSI settings; stored in `profiles` by name.
+  {
+    csi::VolumeCapability capability;  // Capability sent in CSI requests.
+    google::protobuf::Map<string, string> parameters;  // `GetCapacity` params.
+  };
+
   void initialize() override;
   void fatal(const string& messsage, const string& failure);
 
   Future<Nothing> recover();
   Future<Nothing> recoverServices();
   void doReliableRegistration();
+  Future<Nothing> reconcile();
 
   // Functions for received events.
   void subscribed(const Event::Subscribed& subscribed);
@@ -275,8 +317,10 @@ private:
 
   Future<Nothing> prepareControllerService();
   Future<Nothing> prepareNodeService();
+  Future<Resources> importResources();
 
   void checkpointResourceProviderState();
+  void sendResourceProviderStateUpdate();
 
   enum State
   {
@@ -296,6 +340,7 @@ private:
   const Option<string> authToken;
 
   csi::Version csiVersion;
+  hashmap<string, ProfileData> profiles;
   process::grpc::client::Runtime runtime;
   Owned<v1::resource_provider::Driver> driver;
 
@@ -311,7 +356,7 @@ private:
 
   list<Event::Operation> pendingOperations;
   Resources totalResources;
-  string resourceVersion;
+  Option<UUID> resourceVersion;
 };
 
 
@@ -394,10 +439,21 @@ void StorageLocalResourceProviderProcess::initialize()
     }
   }
 
+  // NOTE: The name of the default profile is an empty string, which is
+  // the default value for `Resource.disk.source.profile` when unset.
+  // TODO(chhsiao): Use the volume profile module.
+  ProfileData& defaultProfile = profiles[""];
+  defaultProfile.capability.mutable_mount();
+  defaultProfile.capability.mutable_access_mode()
+    ->set_mode(csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
+
   const string message =
     "Failed to recover resource provider with type '" + info.type() +
     "' and name '" + info.name() + "'";
 
+  // NOTE: Most resource provider events rely on the plugins being
+  // prepared. To avoid race conditions, we connect to the agent after
+  // preparing the plugins.
   recover()
     .onFailed(defer(self(), &Self::fatal, message, lambda::_1))
     .onDiscarded(defer(self(), &Self::fatal, message, "future discarded"));
@@ -423,10 +479,11 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::recover()
 
   return recoverServices()
     .then(defer(self(), [=]() -> Future<Nothing> {
-      // Recover the resource provider ID from the latest symlink. If
-      // the symlink does not exist or it points to a non-exist
-      // directory, treat this as a new resource provider.
-      // TODO(chhsiao): State recovery.
+      // Recover the resource provider ID and state from the latest
+      // symlink. If the symlink cannot be resolved, this is a new
+      // resource provider, and `totalResources` and `resourceVersion`
+      // will be empty, which is fine since they will be set up during
+      // reconciliation.
       Result<string> realpath = os::realpath(
           slave::paths::getLatestResourceProviderPath(
               metaDir, slaveId, info.type(), info.name()));
@@ -438,9 +495,7 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::recover()
             ": " + realpath.error());
       }
 
-      if (realpath.isNone()) {
-        resourceVersion = UUID::random().toBytes();
-      } else {
+      if (realpath.isSome()) {
         info.mutable_id()->set_value(Path(realpath.get()).basename());
 
         const string statePath = slave::paths::getResourceProviderStatePath(
@@ -455,16 +510,19 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::recover()
               "': " + resourceProviderState.error());
         }
 
-        if (resourceProviderState.isNone()) {
-          resourceVersion = UUID::random().toBytes();
-        } else {
+        if (resourceProviderState.isSome()) {
           foreach (const Event::Operation& operation,
                    resourceProviderState->operations()) {
             pendingOperations.push_back(operation);
           }
 
           totalResources = resourceProviderState->resources();
-          resourceVersion = resourceProviderState->resource_version_uuid();
+
+          Try<UUID> uuid =
+            UUID::fromBytes(resourceProviderState->resource_version_uuid());
+          CHECK_SOME(uuid);
+
+          resourceVersion = uuid.get();
         }
       }
 
@@ -578,23 +636,75 @@ void 
StorageLocalResourceProviderProcess::doReliableRegistration()
 
   CHECK_EQ(CONNECTED, state);
 
-  const string message =
-    "Failed to subscribe resource provider with type '" + info.type() +
-    "' and name '" + info.name() + "'";
-
   Call call;
   call.set_type(Call::SUBSCRIBE);
 
   Call::Subscribe* subscribe = call.mutable_subscribe();
   subscribe->mutable_resource_provider_info()->CopyFrom(info);
 
-  driver->send(evolve(call));
+  auto err = [](const ResourceProviderInfo& info, const string& message) {
+    LOG(ERROR)
+      << "Failed to subscribe resource provider with type '" << info.type()
+      << "' and name '" << info.name() << "': " << message;
+  };
+
+  driver->send(evolve(call))
+    .onFailed(std::bind(err, info, lambda::_1))
+    .onDiscarded(std::bind(err, info, "future discarded"));
 
   // TODO(chhsiao): Consider doing an exponential backoff.
   delay(Seconds(1), self(), &Self::doReliableRegistration);
 }
 
 
+// Reconciles the checkpointed `totalResources` against the resources
+// imported from the CSI plugin. Imported resources that are not covered
+// by the checkpointed ones are added with the default reservations. The
+// result is checkpointed if it differs from `totalResources` or if no
+// resource version exists yet, then reported to the resource provider
+// manager, after which the provider state becomes `READY`.
+Future<Nothing> StorageLocalResourceProviderProcess::reconcile()
+{
+  return importResources()
+    .then(defer(self(), [=](Resources imported) {
+      // NOTE: We do not support decreasing the total resources for now.
+      // Any resource in the checkpointed state will be reported to the
+      // resource provider manager, even if it is missing in the
+      // imported resources from the plugin. Additional resources from
+      // the plugin will be reported with the default reservation.
+
+      // Reduce every checkpointed resource to a plain RAW disk resource
+      // so that it can be subtracted from the imported resources.
+      //
+      // NOTE: The volume ID is optional here. Resources imported through
+      // `GetCapacity` are created without an ID and end up in the
+      // checkpointed state, so we must not `CHECK` for its presence.
+      Resources stripped;
+      foreach (const Resource& resource, totalResources) {
+        const Resource::DiskInfo::Source& source = resource.disk().source();
+
+        stripped += createRawDiskResource(
+            resource.provider_id(),
+            resource.scalar().value(),
+            source.has_profile()
+              ? source.profile() : Option<string>::none(),
+            source.has_id()
+              ? source.id() : Option<string>::none(),
+            source.has_metadata()
+              ? source.metadata() : Option<Labels>::none());
+      }
+
+      // Whatever the plugin reports beyond the stripped resources is new.
+      Resources result = totalResources;
+      foreach (Resource resource, imported - stripped) {
+        resource.mutable_reservations()->CopyFrom(info.default_reservations());
+        result += resource;
+
+        LOG(INFO) << "Adding new resource '" << resource << "'";
+      }
+
+      // A fresh resource version is generated only when there is none
+      // yet or when the total resources actually changed.
+      if (resourceVersion.isNone() || result != totalResources) {
+        totalResources = result;
+        resourceVersion = UUID::random();
+        checkpointResourceProviderState();
+      }
+
+      sendResourceProviderStateUpdate();
+
+      state = READY;
+
+      return Nothing();
+    }));
+}
+
+
+
+
 void StorageLocalResourceProviderProcess::subscribed(
     const Event::Subscribed& subscribed)
 {
@@ -614,6 +724,15 @@ void StorageLocalResourceProviderProcess::subscribed(
         info.name(),
         info.id());
   }
+
+  const string message =
+    "Failed to update state for resource provider " + stringify(info.id());
+
+  // Reconcile resources after obtaining the resource provider ID.
+  // TODO(chhsiao): Do the reconciliation early.
+  reconcile()
+    .onFailed(defer(self(), &Self::fatal, message, lambda::_1))
+    .onDiscarded(defer(self(), &Self::fatal, message, "future discarded"));
 }
 
 
@@ -1010,6 +1129,131 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::prepareNodeService()
 }
 
 
+// Returns the unreserved raw disk resources (no persistent volume) reported by
+// the CSI plugin: validated `ListVolumes` volumes plus `GetCapacity` per profile.
+Future<Resources> StorageLocalResourceProviderProcess::importResources()
+{
+  // NOTE: This can only be called after `prepareControllerService` and
+  // the resource provider ID has been obtained; both are checked below.
+  CHECK_SOME(controllerCapabilities);
+  CHECK(info.has_id());
+
+  Future<Resources> preprovisioned;  // Volumes returned by `ListVolumes`.
+
+  if (controllerCapabilities->listVolumes) {
+    preprovisioned = getService(controllerContainerId)
+      .then(defer(self(), [=](csi::Client client) {
+        // TODO(chhsiao): Set the max entries and use a loop to do
+        // multiple `ListVolumes` calls.
+        csi::ListVolumesRequest request;
+        request.mutable_version()->CopyFrom(csiVersion);
+
+        return client.ListVolumes(request)
+          .then(defer(self(), [=](const csi::ListVolumesResponse& response) {
+            Resources resources;
+
+            // Recover volume ID -> profile from the checkpointed state.
+            hashmap<string, string> volumesToProfiles;
+            foreach (const Resource& resource, totalResources) {
+              if (resource.disk().source().has_id() &&
+                  resource.disk().source().has_profile()) {
+                volumesToProfiles[resource.disk().source().id()] =
+                  resource.disk().source().profile();
+              }
+            }
+
+            foreach (const auto& entry, response.entries()) {
+              resources += createRawDiskResource(
+                  info.id(),
+                  entry.volume_info().capacity_bytes(),  // NOTE(review): bytes; confirm unit.
+                  volumesToProfiles.contains(entry.volume_info().id())
+                    ? volumesToProfiles.at(entry.volume_info().id())
+                    : Option<string>::none(),
+                  entry.volume_info().id(),
+                  entry.volume_info().attributes().empty()
+                    ? Option<Labels>::none()
+                    : convertStringMapToLabels(
+                          entry.volume_info().attributes()));
+            }
+
+            return resources;
+          }));
+      }));
+  } else {
+    preprovisioned = Resources();  // No `listVolumes` capability: nothing to list.
+  }
+
+  return preprovisioned
+    .then(defer(self(), [=](const Resources& preprovisioned) {
+      list<Future<Resources>> futures;  // One per validation / capacity call.
+
+      foreach (const Resource& resource, preprovisioned) {
+        futures.push_back(getService(controllerContainerId)
+          .then(defer(self(), [=](csi::Client client) {
+            csi::ValidateVolumeCapabilitiesRequest request;
+            request.mutable_version()->CopyFrom(csiVersion);
+            request.set_volume_id(resource.disk().source().id());
+
+            // The default profile (named "") is used if `profile` is unset.
+            request.add_volume_capabilities()->CopyFrom(
+                profiles.at(resource.disk().source().profile()).capability);
+
+            if (resource.disk().source().has_metadata()) {
+              request.mutable_volume_attributes()->swap(
+                  convertLabelsToStringMap(
+                      resource.disk().source().metadata()).get());
+            }
+
+            return client.ValidateVolumeCapabilities(request)
+              .then(defer(self(), [=](
+                  const csi::ValidateVolumeCapabilitiesResponse& response)
+                  -> Future<Resources> {
+                if (!response.supported()) {
+                  return Failure(
+                      "Unsupported volume capability for resource " +
+                      stringify(resource) + ": " + response.message());
+                }
+
+                return resource;  // Supported: keep the volume as is.
+              }));
+          })));
+      }
+
+      if (controllerCapabilities->getCapacity) {  // One query per known profile.
+        foreachkey (const string& profile, profiles) {
+          futures.push_back(getService(controllerContainerId)
+            .then(defer(self(), [=](csi::Client client) {
+              csi::GetCapacityRequest request;
+              request.mutable_version()->CopyFrom(csiVersion);
+              request.add_volume_capabilities()
+                ->CopyFrom(profiles.at(profile).capability);
+              *request.mutable_parameters() = profiles.at(profile).parameters;
+
+              return client.GetCapacity(request)
+                .then(defer(self(), [=](
+                    const csi::GetCapacityResponse& response)
+                    -> Future<Resources> {
+                  if (response.available_capacity() == 0) {
+                    return Resources();  // Nothing available for this profile.
+                  }
+
+                  return createRawDiskResource(
+                      info.id(),
+                      response.available_capacity(),
+                      profile.empty() ? Option<string>::none() : profile);
+              }));
+            })));
+        }
+      }
+
+      return collect(futures)
+        .then(defer(self(), [=](const list<Resources>& resources) {
+          return accumulate(resources.begin(), resources.end(), Resources());  // Sum all.
+        }));
+    }));
+}
+
+
+
+
 void StorageLocalResourceProviderProcess::checkpointResourceProviderState()
 {
   ResourceProviderState state;
@@ -1019,7 +1263,9 @@ void 
StorageLocalResourceProviderProcess::checkpointResourceProviderState()
   }
 
   state.mutable_resources()->CopyFrom(totalResources);
-  state.set_resource_version_uuid(resourceVersion);
+
+  CHECK_SOME(resourceVersion);
+  state.set_resource_version_uuid(resourceVersion->toBytes());
 
   const string statePath = slave::paths::getResourceProviderStatePath(
       metaDir, slaveId, info.type(), info.name(), info.id());
@@ -1029,6 +1275,51 @@ void 
StorageLocalResourceProviderProcess::checkpointResourceProviderState()
 }
 
 
+// Sends an `UPDATE_STATE` call through the driver. The call carries one
+// `OFFER_OPERATION_PENDING` entry per element of `pendingOperations`,
+// the current `totalResources` and the current resource version. A
+// failed or discarded send is only logged.
+void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
+{
+  Call call;
+  call.set_type(Call::UPDATE_STATE);
+  call.mutable_resource_provider_id()->CopyFrom(info.id());
+
+  Call::UpdateState* updateState = call.mutable_update_state();
+
+  // TODO(chhsiao): Maintain a list of terminated but unacknowledged
+  // offer operations in memory and reconstruct that during recovery
+  // by querying status update manager.
+  foreach (const Event::Operation& pending, pendingOperations) {
+    Try<UUID> uuid = UUID::fromBytes(pending.operation_uuid());
+    CHECK_SOME(uuid);
+
+    // The operation ID is forwarded only if the operation has one.
+    Option<OfferOperationID> operationId = None();
+    if (pending.info().has_id()) {
+      operationId = pending.info().id();
+    }
+
+    updateState->add_operations()->CopyFrom(
+        protobuf::createOfferOperation(
+            pending.info(),
+            protobuf::createOfferOperationStatus(
+                OFFER_OPERATION_PENDING,
+                operationId),
+            pending.framework_id(),
+            slaveId,
+            uuid.get()));
+  }
+
+  updateState->mutable_resources()->CopyFrom(totalResources);
+
+  // A resource version must exist before any state can be reported.
+  CHECK_SOME(resourceVersion);
+  updateState->set_resource_version_uuid(resourceVersion->toBytes());
+
+  auto logSendFailure = [](
+      const ResourceProviderID& providerId,
+      const string& reason) {
+    LOG(ERROR)
+      << "Failed to update state for resource provider " << providerId << ": "
+      << reason;
+  };
+
+  driver->send(evolve(call))
+    .onFailed(std::bind(logSendFailure, info.id(), lambda::_1))
+    .onDiscarded(std::bind(logSendFailure, info.id(), "future discarded"));
+}
+
+
+
+
 Try<Owned<LocalResourceProvider>> StorageLocalResourceProvider::create(
     const http::URL& url,
     const string& workDir,

Reply via email to