This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit fe7010099133962fd3d58ffde18d6c8e7472e01a
Author: Chun-Hung Hsiao <[email protected]>
AuthorDate: Thu May 9 17:59:16 2019 -0700

    Made SLRP allow changes in volume context.
    
    To make SLRP more robust against non-conforming CSI plugins that change
    volume contexts, the `getExistingVolumes` method returns a list of two
    resource conversions: one that converts old volume contexts to new volume
    contexts, and one that removes missing volumes and adds new volumes.
    
    To keep the interfaces consistent, `getStoragePools` now also returns a
    list of resource conversions, which contains a single conversion.
    
    Review: https://reviews.apache.org/r/70620
---
 include/mesos/mesos.proto                  |   3 +-
 include/mesos/v1/mesos.proto               |   3 +-
 src/resource_provider/storage/provider.cpp | 177 +++++++++++++++++++----------
 3 files changed, 121 insertions(+), 62 deletions(-)

diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index dc6a87f..2b4f350 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -1510,7 +1510,8 @@ message Resource {
       optional string id = 4; // EXPERIMENTAL.
 
       // Additional metadata for this source. This field maps onto CSI volume
-      // attributes and is not expected to be set by frameworks.
+      // context. Frameworks should neither alter this field, nor expect this
+      // field to remain unchanged.
       optional Labels metadata = 5; // EXPERIMENTAL.
 
       // This field serves as an indirection to a set of storage
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index e8086e0..bafc274 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -1502,7 +1502,8 @@ message Resource {
       optional string id = 4; // EXPERIMENTAL.
 
       // Additional metadata for this source. This field maps onto CSI volume
-      // attributes and is not expected to be set by frameworks.
+      // context. Frameworks should neither alter this field, nor expect this
+      // field to remain unchanged.
       optional Labels metadata = 5; // EXPERIMENTAL.
 
       // This field serves as an indirection to a set of storage
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 999fe95..6d63260 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -57,6 +57,7 @@
 #include <stout/foreach.hpp>
 #include <stout/hashmap.hpp>
 #include <stout/hashset.hpp>
+#include <stout/lambda.hpp>
 #include <stout/linkedhashmap.hpp>
 #include <stout/nothing.hpp>
 #include <stout/os.hpp>
@@ -276,8 +277,14 @@ private:
       const Resources& checkpointed,
       const Resources& discovered);
 
-  Future<Resources> getRawVolumes();
-  Future<Resources> getStoragePools();
+  // Returns a list of resource conversions to update volume contexts for
+  // existing volumes, remove disappeared unconverted volumes, and add newly
+  // appeared ones.
+  Future<vector<ResourceConversion>> getExistingVolumes();
+
+  // Returns a list of resource conversions to remove disappeared unconverted
+  // storage pools and add newly appeared ones.
+  Future<vector<ResourceConversion>> getStoragePools();
 
   // Spawns a loop to watch for changes in the set of known profiles and update
   // the profile mapping and storage pools accordingly.
@@ -711,21 +718,21 @@ StorageLocalResourceProviderProcess::reconcileResourceProviderState()
 {
   return reconcileOperationStatuses()
     .then(defer(self(), [=] {
-      return collect<Resources>({getRawVolumes(), getStoragePools()})
-        .then(defer(self(), [=](const vector<Resources>& discovered) {
-          ResourceConversion conversion = reconcileResources(
-              totalResources,
-              accumulate(discovered.begin(), discovered.end(), Resources()));
-
-          Try<Resources> result = totalResources.apply(conversion);
-          CHECK_SOME(result);
+      return collect<vector<ResourceConversion>>(
+          {getExistingVolumes(), getStoragePools()})
+        .then(defer(self(), [=](
+            const vector<vector<ResourceConversion>>& collected) {
+          Resources result = totalResources;
+          foreach (const vector<ResourceConversion>& conversions, collected) {
+            result = CHECK_NOTERROR(result.apply(conversions));
+          }
 
-          if (result.get() != totalResources) {
+          if (result != totalResources) {
             LOG(INFO)
-              << "Removing '" << conversion.consumed << "' and adding '"
-              << conversion.converted << "' to the total resources";
+              << "Removing '" << (totalResources - result) << "' and adding '"
+              << (result - totalResources) << "' to the total resources";
 
-            totalResources = result.get();
+            totalResources = result;
             checkpointResourceProviderState();
           }
 
@@ -919,21 +926,15 @@ Future<Nothing> StorageLocalResourceProviderProcess::reconcileStoragePools()
   };
 
   return getStoragePools()
-    .then(defer(self(), [=](const Resources& discovered) {
-      ResourceConversion conversion = reconcileResources(
-          totalResources.filter(
-              [](const Resource& r) { return !r.disk().source().has_id(); }),
-          discovered);
-
-      Try<Resources> result = totalResources.apply(conversion);
-      CHECK_SOME(result);
+    .then(defer(self(), [=](const vector<ResourceConversion>& conversions) {
+      Resources result = CHECK_NOTERROR(totalResources.apply(conversions));
 
-      if (result.get() != totalResources) {
+      if (result != totalResources) {
         LOG(INFO)
-          << "Removing '" << conversion.consumed << "' and adding '"
-          << conversion.converted << "' to the total resources";
+          << "Removing '" << (totalResources - result) << "' and adding '"
+          << (result - totalResources) << "' to the total resources";
 
-        totalResources = result.get();
+        totalResources = result;
         checkpointResourceProviderState();
 
         // NOTE: We always update the resource version before sending
@@ -1007,7 +1008,7 @@ ResourceConversion StorageLocalResourceProviderProcess::reconcileResources(
   Resources toAdd = discovered;
 
   foreach (const Resource& resource, checkpointed) {
-    Resource unconverted = createRawDiskResource(
+    Resources unconverted = createRawDiskResource(
         info,
         Bytes(resource.scalar().value() * Bytes::MEGABYTES),
         resource.disk().source().has_profile()
@@ -1024,7 +1025,7 @@ ResourceConversion StorageLocalResourceProviderProcess::reconcileResources(
       // "unconverted" version of a checkpointed resource, this is not a
       // new resource.
       toAdd -= unconverted;
-    } else if (checkpointed.contains(unconverted)) {
+    } else if (unconverted == resource) {
       // If the remaining of the discovered resources does not contain
       // the "unconverted" version of the checkpointed resource, the
       // resource is missing. However, if it remains unconverted in the
@@ -1041,68 +1042,124 @@ ResourceConversion StorageLocalResourceProviderProcess::reconcileResources(
 }
 
 
-Future<Resources> StorageLocalResourceProviderProcess::getRawVolumes()
+Future<vector<ResourceConversion>>
+StorageLocalResourceProviderProcess::getExistingVolumes()
 {
   CHECK(info.has_id());
 
   return volumeManager->listVolumes()
     .then(defer(self(), [=](const vector<VolumeInfo>& volumeInfos) {
-      Resources resources;
-
-      // Recover disk profiles from the checkpointed state.
-      hashmap<string, string> volumesToProfiles;
+      // Since we only support "exclusive" (MOUNT or BLOCK) disks, there should
+      // be only one checkpointed resource for each volume ID.
+      hashmap<string, Resource> checkpointedMap;
       foreach (const Resource& resource, totalResources) {
-        if (resource.disk().source().has_id() &&
-            resource.disk().source().has_profile()) {
-          volumesToProfiles.put(
-              resource.disk().source().id(),
-              resource.disk().source().profile());
+        if (resource.disk().source().has_id()) {
+          CHECK(!checkpointedMap.contains(resource.disk().source().id()));
+          checkpointedMap.put(resource.disk().source().id(), resource);
         }
       }
 
+      // The "discovered" resources consist of RAW disk resources, one for each
+      // volume reported by the CSI plugin.
+      Resources discovered;
+
+      // If any volume context has been changed by a non-conforming CSI plugin,
+      // we need to construct a resources conversion to reflect the
+      // corresponding metadata changes, so we maintain the resources to be
+      // removed and those to be added here.
+      Resources metadataToRemove;
+      Resources metadataToAdd;
+
       foreach (const VolumeInfo& volumeInfo, volumeInfos) {
-        resources += createRawDiskResource(
+        Option<string> profile;
+        Option<Labels> metadata = volumeInfo.context.empty()
+          ? Option<Labels>::none()
+          : convertStringMapToLabels(volumeInfo.context);
+
+        if (checkpointedMap.contains(volumeInfo.id)) {
+          const Resource& resource = checkpointedMap.at(volumeInfo.id);
+
+          if (resource.disk().source().has_profile()) {
+            profile = resource.disk().source().profile();
+          }
+
+          // If the volume context has been changed by a non-conforming CSI
+          // plugin, the changes will be reflected in a resource conversion.
+          if (resource.disk().source().metadata() !=
+              metadata.getOrElse(Labels())) {
+            metadataToRemove += resource;
+
+            Resource changed = resource;
+            if (metadata.isSome()) {
+              *changed.mutable_disk()->mutable_source()->mutable_metadata() =
+                *metadata;
+            } else {
+              changed.mutable_disk()->mutable_source()->clear_metadata();
+            }
+
+            metadataToAdd += changed;
+          }
+        }
+
+        discovered += createRawDiskResource(
             info,
             volumeInfo.capacity,
-            volumesToProfiles.contains(volumeInfo.id)
-              ? volumesToProfiles.at(volumeInfo.id)
-              : Option<string>::none(),
+            profile,
             vendor,
             volumeInfo.id,
-            volumeInfo.context.empty()
-              ? Option<Labels>::none()
-              : convertStringMapToLabels(volumeInfo.context));
+            metadata);
       }
 
-      return resources;
+      ResourceConversion metadataConversion(
+          std::move(metadataToRemove), std::move(metadataToAdd));
+
+      Resources checkpointed = CHECK_NOTERROR(
+          totalResources.filter([](const Resource& resource) {
+            return resource.disk().source().has_id();
+          }).apply(metadataConversion));
+
+      return vector<ResourceConversion>{
+        std::move(metadataConversion),
+        reconcileResources(std::move(checkpointed), std::move(discovered))};
     }));
 }
 
 
-Future<Resources> StorageLocalResourceProviderProcess::getStoragePools()
+Future<vector<ResourceConversion>>
+StorageLocalResourceProviderProcess::getStoragePools()
 {
   CHECK(info.has_id());
 
-  vector<Future<Resources>> futures;
+  vector<Future<Resource>> futures;
 
   foreachpair (const string& profile,
                const DiskProfileAdaptor::ProfileInfo& profileInfo,
                profileInfos) {
-    futures.push_back(volumeManager->getCapacity(
-        profileInfo.capability, profileInfo.parameters)
-      .then(defer(self(), [=](const Bytes& capacity) -> Resources {
-        if (capacity == 0) {
-          return Resources();
-        }
-
-        return createRawDiskResource(info, capacity, profile, vendor);
-      })));
+    futures.push_back(
+        volumeManager->getCapacity(
+            profileInfo.capability, profileInfo.parameters)
+          .then(std::bind(
+              &createRawDiskResource,
+              info,
+              lambda::_1,
+              profile,
+              vendor,
+              None(),
+              None())));
   }
 
   return collect(futures)
-    .then([](const vector<Resources>& resources) {
-      return accumulate(resources.begin(), resources.end(), Resources());
-    });
+    .then(defer(self(), [=](const vector<Resource>& resources) {
+      Resources discovered(resources); // Zero resources will be ignored.
+      Resources checkpointed =
+        totalResources.filter([](const Resource& resource) {
+          return Resources::isDisk(resource, Resource::DiskInfo::Source::RAW) &&
+            !resource.disk().source().has_id();
+        });
+
+      return vector<ResourceConversion>{
+        reconcileResources(std::move(checkpointed), std::move(discovered))};
+    }));
 }
 
 

Reply via email to