Adapted storage local resource provider to use CSI v0.2.

This patch contains necessary changes for the storage local resource
provider to use CSI v0.2. Support for the `STAGE_UNSTAGE_VOLUME` CSI
node service capability is not implemented in this patch yet.

Review: https://reviews.apache.org/r/66410/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/aeffcd7d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/aeffcd7d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/aeffcd7d

Branch: refs/heads/master
Commit: aeffcd7d9a1f9c97e5e347063ceab71c43c00e2d
Parents: 6dfd259
Author: Chun-Hung Hsiao <chhs...@apache.org>
Authored: Thu Apr 12 12:07:19 2018 -0700
Committer: Chun-Hung Hsiao <chhs...@mesosphere.io>
Committed: Thu Apr 12 14:01:52 2018 -0700

----------------------------------------------------------------------
 src/resource_provider/storage/provider.cpp | 512 ++++++++++++------------
 1 file changed, 257 insertions(+), 255 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/aeffcd7d/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp 
b/src/resource_provider/storage/provider.cpp
index a07620d..40544e0 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -366,10 +366,11 @@ private:
   void reconcileOperations(
       const Event::ReconcileOperations& reconcile);
 
-  Future<csi::Client> connect(const string& endpoint);
-  Future<csi::Client> getService(const ContainerID& containerId);
+  Future<csi::v0::Client> connect(const string& endpoint);
+  Future<csi::v0::Client> getService(const ContainerID& containerId);
   Future<Nothing> killService(const ContainerID& containerId);
 
+  Future<Nothing> prepareIdentityService();
   Future<Nothing> prepareControllerService();
   Future<Nothing> prepareNodeService();
   Future<Nothing> controllerPublish(const string& volumeId);
@@ -384,7 +385,7 @@ private:
   Future<string> validateCapability(
       const string& volumeId,
       const Option<Labels>& metadata,
-      const csi::VolumeCapability& capability);
+      const csi::v0::VolumeCapability& capability);
   Future<Resources> listVolumes();
   Future<Resources> getCapacities();
 
@@ -436,9 +437,8 @@ private:
 
   shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
 
-  csi::Version csiVersion;
-  csi::VolumeCapability defaultMountCapability;
-  csi::VolumeCapability defaultBlockCapability;
+  csi::v0::VolumeCapability defaultMountCapability;
+  csi::v0::VolumeCapability defaultBlockCapability;
   string bootId;
   process::grpc::client::Runtime runtime;
   Owned<v1::resource_provider::Driver> driver;
@@ -453,14 +453,15 @@ private:
   // True if a reconciliation of storage pools is happening.
   bool reconciling;
 
-  ContainerID controllerContainerId;
-  ContainerID nodeContainerId;
   hashmap<ContainerID, Owned<ContainerDaemon>> daemons;
-  hashmap<ContainerID, Owned<Promise<csi::Client>>> services;
-
-  Option<csi::GetPluginInfoResponse> controllerInfo;
-  Option<csi::GetPluginInfoResponse> nodeInfo;
-  Option<csi::ControllerCapabilities> controllerCapabilities;
+  hashmap<ContainerID, Owned<Promise<csi::v0::Client>>> services;
+
+  Option<ContainerID> nodeContainerId;
+  Option<ContainerID> controllerContainerId;
+  Option<csi::v0::GetPluginInfoResponse> pluginInfo;
+  csi::v0::PluginCapabilities pluginCapabilities;
+  csi::v0::ControllerCapabilities controllerCapabilities;
+  csi::v0::NodeCapabilities nodeCapabilities;
   Option<string> nodeId;
 
   // We maintain the following invariant: if one operation depends on
@@ -554,18 +555,13 @@ void StorageLocalResourceProviderProcess::received(const 
Event& event)
 
 void StorageLocalResourceProviderProcess::initialize()
 {
-  // Set CSI version to 0.1.0.
-  csiVersion.set_major(0);
-  csiVersion.set_minor(1);
-  csiVersion.set_patch(0);
-
   // Default mount and block capabilities for pre-existing volumes.
   defaultMountCapability.mutable_mount();
   defaultMountCapability.mutable_access_mode()
-    ->set_mode(csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
+    ->set_mode(csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
   defaultBlockCapability.mutable_block();
   defaultBlockCapability.mutable_access_mode()
-    ->set_mode(csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
+    ->set_mode(csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
 
   Try<string> _bootId = os::bootId();
   if (_bootId.isError()) {
@@ -577,24 +573,24 @@ void StorageLocalResourceProviderProcess::initialize()
 
   foreach (const CSIPluginContainerInfo& container,
            info.storage().plugin().containers()) {
-    auto it = find(
-        container.services().begin(),
-        container.services().end(),
-        CSIPluginContainerInfo::CONTROLLER_SERVICE);
-    if (it != container.services().end()) {
-      controllerContainerId = getContainerId(info, container);
+    if (container.services().end() != find(
+            container.services().begin(),
+            container.services().end(),
+            CSIPluginContainerInfo::NODE_SERVICE)) {
+      nodeContainerId = getContainerId(info, container);
       break;
     }
   }
 
+  CHECK_SOME(nodeContainerId);
+
   foreach (const CSIPluginContainerInfo& container,
            info.storage().plugin().containers()) {
-    auto it = find(
-        container.services().begin(),
-        container.services().end(),
-        CSIPluginContainerInfo::NODE_SERVICE);
-    if (it != container.services().end()) {
-      nodeContainerId = getContainerId(info, container);
+    if (container.services().end() != find(
+            container.services().begin(),
+            container.services().end(),
+            CSIPluginContainerInfo::CONTROLLER_SERVICE)) {
+      controllerContainerId = getContainerId(info, container);
       break;
     }
   }
@@ -720,10 +716,12 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::recoverServices()
 
     const ContainerID& containerId = containerPath->containerId;
 
+    CHECK_SOME(nodeContainerId);
+
     // Do not kill the up-to-date controller or node container.
     // Otherwise, kill them and perform cleanups.
-    if (containerId == controllerContainerId ||
-        containerId == nodeContainerId) {
+    if (nodeContainerId == containerId ||
+        controllerContainerId == containerId) {
       const string configPath = csi::paths::getContainerInfoPath(
           slave::paths::getCsiRootDir(workDir),
           info.storage().plugin().type(),
@@ -776,11 +774,14 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::recoverServices()
       })));
   }
 
-  // NOTE: The `GetNodeID` CSI call is only supported if the plugin has
-  // the `PUBLISH_UNPUBLISH_VOLUME` controller capability. So to decide
-  // if `GetNodeID` should be called in `prepareNodeService`, we need to
-  // run `prepareControllerService` first.
+  // NOTE: The `Controller` service is supported if the plugin has the
+  // `CONTROLLER_SERVICE` capability, and the `NodeGetId` call is
+  // supported if the `Controller` service has the
+  // `PUBLISH_UNPUBLISH_VOLUME` capability. Therefore, we first launch
+  // the node plugin to get the plugin capabilities, then decide if we
+  // need to launch the controller plugin and get the node ID.
   return collect(futures)
+    .then(defer(self(), &Self::prepareIdentityService))
     .then(defer(self(), &Self::prepareControllerService))
     .then(defer(self(), &Self::prepareNodeService));
 }
@@ -1672,19 +1673,19 @@ void 
StorageLocalResourceProviderProcess::reconcileOperations(
 
 // Returns a future of a CSI client that waits for the endpoint socket
 // to appear if necessary, then connects to the socket and check its
-// supported version.
-Future<csi::Client> StorageLocalResourceProviderProcess::connect(
+// readiness.
+Future<csi::v0::Client> StorageLocalResourceProviderProcess::connect(
     const string& endpoint)
 {
-  Future<csi::Client> client;
+  Future<csi::v0::Client> future;
 
   if (os::exists(endpoint)) {
-    client = csi::Client("unix://" + endpoint, runtime);
+    future = csi::v0::Client("unix://" + endpoint, runtime);
   } else {
     // Wait for the endpoint socket to appear until the timeout expires.
     Timeout timeout = Timeout::in(CSI_ENDPOINT_CREATION_TIMEOUT);
 
-    client = loop(
+    future = loop(
         self(),
         [=]() -> Future<Nothing> {
           if (timeout.expired()) {
@@ -1693,30 +1694,19 @@ Future<csi::Client> 
StorageLocalResourceProviderProcess::connect(
 
           return after(Milliseconds(10));
         },
-        [=](const Nothing&) -> ControlFlow<csi::Client> {
+        [=](const Nothing&) -> ControlFlow<csi::v0::Client> {
           if (os::exists(endpoint)) {
-            return Break(csi::Client("unix://" + endpoint, runtime));
+            return Break(csi::v0::Client("unix://" + endpoint, runtime));
           }
 
           return Continue();
         });
   }
 
-  return client
-    .then(defer(self(), [=](csi::Client client) {
-      return client.GetSupportedVersions(csi::GetSupportedVersionsRequest())
-        .then(defer(self(), [=](
-            const csi::GetSupportedVersionsResponse& response)
-            -> Future<csi::Client> {
-          auto it = find(
-              response.supported_versions().begin(),
-              response.supported_versions().end(),
-              csiVersion);
-          if (it == response.supported_versions().end()) {
-            return Failure(
-                "CSI version " + stringify(csiVersion) + " is not supported");
-          }
-
+  return future
+    .then(defer(self(), [=](csi::v0::Client client) {
+      return client.Probe(csi::v0::ProbeRequest())
+        .then(defer(self(), [=](const csi::v0::ProbeResponse& response) {
           return client;
         }));
     }));
@@ -1726,7 +1716,7 @@ Future<csi::Client> 
StorageLocalResourceProviderProcess::connect(
 // Returns a future of the latest CSI client for the specified plugin
 // container. If the container is not already running, this method will
 // start a new a new container daemon.
-Future<csi::Client> StorageLocalResourceProviderProcess::getService(
+Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
     const ContainerID& containerId)
 {
   if (daemons.contains(containerId)) {
@@ -1802,7 +1792,7 @@ Future<csi::Client> 
StorageLocalResourceProviderProcess::getService(
     ->mutable_mount_propagation()->set_mode(MountPropagation::BIDIRECTIONAL);
 
   CHECK(!services.contains(containerId));
-  services[containerId].reset(new Promise<csi::Client>());
+  services[containerId].reset(new Promise<csi::v0::Client>());
 
   Try<Owned<ContainerDaemon>> daemon = ContainerDaemon::create(
       extractParentEndpoint(url),
@@ -1815,7 +1805,7 @@ Future<csi::Client> 
StorageLocalResourceProviderProcess::getService(
         CHECK(services.at(containerId)->future().isPending());
 
         return connect(endpointPath)
-          .then(defer(self(), [=](const csi::Client& client) {
+          .then(defer(self(), [=](const csi::v0::Client& client) {
             services.at(containerId)->set(client);
             return Nothing();
           }))
@@ -1827,16 +1817,16 @@ Future<csi::Client> 
StorageLocalResourceProviderProcess::getService(
           }));
       })),
       std::function<Future<Nothing>()>(defer(self(), [=]() -> Future<Nothing> {
-        if (containerId == controllerContainerId) {
+        if (containerId == controllerContainerId.get()) {
           metrics.csi_controller_plugin_terminations++;
         }
 
-        if (containerId == nodeContainerId) {
+        if (containerId == nodeContainerId.get()) {
           metrics.csi_node_plugin_terminations++;
         }
 
         services.at(containerId)->discard();
-        services.at(containerId).reset(new Promise<csi::Client>());
+        services.at(containerId).reset(new Promise<csi::v0::Client>());
 
         if (os::exists(endpointPath)) {
           Try<Nothing> rm = os::rm(endpointPath);
@@ -1940,54 +1930,31 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::killService(
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
+Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService()
 {
-  return getService(controllerContainerId)
-    .then(defer(self(), [=](csi::Client client) {
-      // Get the plugin info and check for consistency.
-      csi::GetPluginInfoRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
-
-      return client.GetPluginInfo(request)
-        .then(defer(self(), [=](const csi::GetPluginInfoResponse& response) {
-          controllerInfo = response;
+  CHECK_SOME(nodeContainerId);
 
-          LOG(INFO)
-            << "Controller plugin loaded: " << stringify(controllerInfo.get());
+  return getService(nodeContainerId.get())
+    .then(defer(self(), [=](csi::v0::Client client) {
+      // Get the plugin info.
+      return client.GetPluginInfo(csi::v0::GetPluginInfoRequest())
+        .then(defer(self(), [=](
+            const csi::v0::GetPluginInfoResponse& response) {
+          pluginInfo = response;
 
-          if (nodeInfo.isSome() &&
-              (controllerInfo->name() != nodeInfo->name() ||
-               controllerInfo->vendor_version() !=
-                 nodeInfo->vendor_version())) {
-            LOG(WARNING)
-              << "Inconsistent controller and node plugin components. Please "
-                 "check with the plugin vendor to ensure compatibility.";
-          }
+          LOG(INFO) << "Node plugin loaded: " << stringify(pluginInfo.get());
 
-          // NOTE: We always get the latest service future before
-          // proceeding to the next step.
-          return getService(controllerContainerId);
+          // Get the latest service future before proceeding to the next step.
+          return getService(nodeContainerId.get());
         }));
     }))
-    .then(defer(self(), [=](csi::Client client) {
-      // Probe the plugin to validate the runtime environment.
-      csi::ControllerProbeRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
-
-      return client.ControllerProbe(request)
-        .then(defer(self(), [=](const csi::ControllerProbeResponse& response) {
-          return getService(controllerContainerId);
-        }));
-    }))
-    .then(defer(self(), [=](csi::Client client) {
-      // Get the controller capabilities.
-      csi::ControllerGetCapabilitiesRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
-
-      return client.ControllerGetCapabilities(request)
+    .then(defer(self(), [=](csi::v0::Client client) {
+      // Get the plugin capabilities.
+      return client.GetPluginCapabilities(
+          csi::v0::GetPluginCapabilitiesRequest())
         .then(defer(self(), [=](
-            const csi::ControllerGetCapabilitiesResponse& response) {
-          controllerCapabilities = response.capabilities();
+            const csi::v0::GetPluginCapabilitiesResponse& response) {
+          pluginCapabilities = response.capabilities();
 
           return Nothing();
         }));
@@ -1995,73 +1962,101 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::prepareControllerService()
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
+// NOTE: This can only be called after `prepareIdentityService`.
+Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
 {
-  // NOTE: This can only be called after `prepareControllerService`.
-  CHECK_SOME(controllerCapabilities);
+  CHECK_SOME(pluginInfo);
 
-  return getService(nodeContainerId)
-    .then(defer(self(), [=](csi::Client client) {
-      // Get the plugin info and check for consistency.
-      csi::GetPluginInfoRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
+  if (!pluginCapabilities.controllerService) {
+    return Nothing();
+  }
 
-      return client.GetPluginInfo(request)
-        .then(defer(self(), [=](const csi::GetPluginInfoResponse& response) {
-          nodeInfo = response;
+  if (controllerContainerId.isNone()) {
+    return Failure(
+        stringify(CSIPluginContainerInfo::CONTROLLER_SERVICE) + " not found");
+  }
 
-          LOG(INFO) << "Node plugin loaded: " << stringify(nodeInfo.get());
+  return getService(controllerContainerId.get())
+    .then(defer(self(), [=](csi::v0::Client client) {
+      // Get the controller plugin info and check for consistency.
+      return client.GetPluginInfo(csi::v0::GetPluginInfoRequest())
+        .then(defer(self(), [=](
+            const csi::v0::GetPluginInfoResponse& response) {
+          LOG(INFO) << "Controller plugin loaded: " << stringify(response);
 
-          if (controllerInfo.isSome() &&
-              (controllerInfo->name() != nodeInfo->name() ||
-               controllerInfo->vendor_version() !=
-                 nodeInfo->vendor_version())) {
+          if (pluginInfo->name() != response.name() ||
+              pluginInfo->vendor_version() != response.vendor_version()) {
             LOG(WARNING)
               << "Inconsistent controller and node plugin components. Please "
                  "check with the plugin vendor to ensure compatibility.";
           }
 
-          // NOTE: We always get the latest service future before
-          // proceeding to the next step.
-          return getService(nodeContainerId);
+          // Get the latest service future before proceeding to the next step.
+          return getService(controllerContainerId.get());
         }));
     }))
-    .then(defer(self(), [=](csi::Client client) {
-      // Probe the plugin to validate the runtime environment.
-      csi::NodeProbeRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
-
-      return client.NodeProbe(request)
-        .then(defer(self(), [=](const csi::NodeProbeResponse& response) {
-          return getService(nodeContainerId);
+    .then(defer(self(), [=](csi::v0::Client client) {
+      // Get the controller capabilities.
+      return client.ControllerGetCapabilities(
+          csi::v0::ControllerGetCapabilitiesRequest())
+        .then(defer(self(), [=](
+            const csi::v0::ControllerGetCapabilitiesResponse& response) {
+          controllerCapabilities = response.capabilities();
+
+          return Nothing();
         }));
-    }))
-    .then(defer(self(), [=](csi::Client client) -> Future<Nothing> {
-      if (!controllerCapabilities->publishUnpublishVolume) {
-        return Nothing();
-      }
+    }));
+}
 
-      // Get the node ID.
-      csi::GetNodeIDRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
 
-      return client.GetNodeID(request)
-        .then(defer(self(), [=](const csi::GetNodeIDResponse& response) {
-          nodeId = response.node_id();
+// NOTE: This can only be called after `prepareIdentityService` and
+// `prepareControllerService`.
+Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
+{
+  CHECK_SOME(nodeContainerId);
 
-          return Nothing();
+  return getService(nodeContainerId.get())
+    .then(defer(self(), [=](csi::v0::Client client) {
+      // Get the node capabilities.
+      return client.NodeGetCapabilities(csi::v0::NodeGetCapabilitiesRequest())
+        .then(defer(self(), [=](
+            const csi::v0::NodeGetCapabilitiesResponse& response)
+            -> Future<csi::v0::Client> {
+          nodeCapabilities = response.capabilities();
+
+          // TODO(chhsiao): Implement `STAGE_UNSTAGE_VOLUME` support.
+          if (nodeCapabilities.stageUnstageVolume) {
+            return Failure(
+                "Node capability 'STAGE_UNSTAGE_VOLUME' is not supported");
+          }
+
+          // Get the latest service future before proceeding to the next step.
+          return getService(nodeContainerId.get());
+        }))
+        .then(defer(self(), [=](csi::v0::Client client) -> Future<Nothing> {
+          if (!controllerCapabilities.publishUnpublishVolume) {
+            return Nothing();
+          }
+
+          // Get the node ID.
+          return client.NodeGetId(csi::v0::NodeGetIdRequest())
+            .then(defer(self(), [=](
+                const csi::v0::NodeGetIdResponse& response) {
+              nodeId = response.node_id();
+
+              return Nothing();
+            }));
         }));
     }));
 }
 
 
+// NOTE: This can only be called after `prepareControllerService` and
+// `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
     const string& volumeId)
 {
-  // NOTE: This can only be called after `prepareControllerService` and
-  // `prepareNodeService`.
-  CHECK_SOME(controllerCapabilities);
-  CHECK(!controllerCapabilities->publishUnpublishVolume || nodeId.isSome());
+  CHECK(!controllerCapabilities.publishUnpublishVolume || nodeId.isSome());
 
   CHECK(volumes.contains(volumeId));
   if (volumes.at(volumeId).state.state() ==
@@ -2080,11 +2075,12 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::controllerPublish(
 
   Future<Nothing> controllerPublished;
 
-  if (controllerCapabilities->publishUnpublishVolume) {
-    controllerPublished = getService(controllerContainerId)
-      .then(defer(self(), [=](csi::Client client) {
-        csi::ControllerPublishVolumeRequest request;
-        request.mutable_version()->CopyFrom(csiVersion);
+  if (controllerCapabilities.publishUnpublishVolume) {
+    CHECK_SOME(controllerContainerId);
+
+    controllerPublished = getService(controllerContainerId.get())
+      .then(defer(self(), [=](csi::v0::Client client) {
+        csi::v0::ControllerPublishVolumeRequest request;
         request.set_volume_id(volumeId);
         request.set_node_id(nodeId.get());
         request.mutable_volume_capability()
@@ -2095,9 +2091,9 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::controllerPublish(
 
         return client.ControllerPublishVolume(request)
           .then(defer(self(), [=](
-              const csi::ControllerPublishVolumeResponse& response) {
-            *volumes.at(volumeId).state.mutable_publish_volume_info() =
-              response.publish_volume_info();
+              const csi::v0::ControllerPublishVolumeResponse& response) {
+            *volumes.at(volumeId).state.mutable_publish_info() =
+              response.publish_info();
 
             return Nothing();
           }));
@@ -2122,13 +2118,13 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::controllerPublish(
 }
 
 
+// NOTE: This can only be called after `prepareControllerService` and
+// `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
     const string& volumeId)
 {
-  // NOTE: This can only be called after `prepareControllerService` and
-  // `prepareNodeService`.
-  CHECK_SOME(controllerCapabilities);
-  CHECK(!controllerCapabilities->publishUnpublishVolume || nodeId.isSome());
+  CHECK_SOME(controllerContainerId);
+  CHECK(!controllerCapabilities.publishUnpublishVolume || nodeId.isSome());
 
   CHECK(volumes.contains(volumeId));
   if (volumes.at(volumeId).state.state() ==
@@ -2147,11 +2143,10 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::controllerUnpublish(
 
   Future<Nothing> controllerUnpublished;
 
-  if (controllerCapabilities->publishUnpublishVolume) {
-    controllerUnpublished = getService(controllerContainerId)
-      .then(defer(self(), [=](csi::Client client) {
-        csi::ControllerUnpublishVolumeRequest request;
-        request.mutable_version()->CopyFrom(csiVersion);
+  if (controllerCapabilities.publishUnpublishVolume) {
+    controllerUnpublished = getService(controllerContainerId.get())
+      .then(defer(self(), [=](csi::v0::Client client) {
+        csi::v0::ControllerUnpublishVolumeRequest request;
         request.set_volume_id(volumeId);
         request.set_node_id(nodeId.get());
 
@@ -2165,7 +2160,7 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::controllerUnpublish(
   return controllerUnpublished
     .then(defer(self(), [=] {
       volumes.at(volumeId).state.set_state(csi::state::VolumeState::CREATED);
-      volumes.at(volumeId).state.mutable_publish_volume_info()->clear();
+      volumes.at(volumeId).state.mutable_publish_info()->clear();
       checkpointVolumeState(volumeId);
 
       return Nothing();
@@ -2182,6 +2177,10 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::controllerUnpublish(
 Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
     const string& volumeId)
 {
+  // TODO(chhsiao): Implement `STAGE_UNSTAGE_VOLUME` support.
+
+  CHECK_SOME(nodeContainerId);
+
   CHECK(volumes.contains(volumeId));
   if (volumes.at(volumeId).state.state() ==
         csi::state::VolumeState::NODE_PUBLISH) {
@@ -2208,13 +2207,12 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::nodePublish(
         "Failed to create mount point '" + mountPath + "': " + mkdir.error());
   }
 
-  return getService(nodeContainerId)
-    .then(defer(self(), [=](csi::Client client) {
-      csi::NodePublishVolumeRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
+  return getService(nodeContainerId.get())
+    .then(defer(self(), [=](csi::v0::Client client) {
+      csi::v0::NodePublishVolumeRequest request;
       request.set_volume_id(volumeId);
-      *request.mutable_publish_volume_info() =
-        volumes.at(volumeId).state.publish_volume_info();
+      *request.mutable_publish_info() =
+        volumes.at(volumeId).state.publish_info();
       request.set_target_path(mountPath);
       request.mutable_volume_capability()
         ->CopyFrom(volumes.at(volumeId).state.volume_capability());
@@ -2243,6 +2241,10 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::nodePublish(
 Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
     const string& volumeId)
 {
+  // TODO(chhsiao): Implement `STAGE_UNSTAGE_VOLUME` support.
+
+  CHECK_SOME(nodeContainerId);
+
   CHECK(volumes.contains(volumeId));
   if (volumes.at(volumeId).state.state() ==
         csi::state::VolumeState::NODE_UNPUBLISH) {
@@ -2267,10 +2269,9 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::nodeUnpublish(
   Future<Nothing> nodeUnpublished;
 
   if (os::exists(mountPath)) {
-    nodeUnpublished = getService(nodeContainerId)
-      .then(defer(self(), [=](csi::Client client) {
-        csi::NodeUnpublishVolumeRequest request;
-        request.mutable_version()->CopyFrom(csiVersion);
+    nodeUnpublished = getService(nodeContainerId.get())
+      .then(defer(self(), [=](csi::v0::Client client) {
+        csi::v0::NodeUnpublishVolumeRequest request;
         request.set_volume_id(volumeId);
         request.set_target_path(mountPath);
 
@@ -2310,22 +2311,22 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::nodeUnpublish(
 
 
 // Returns a CSI volume ID.
+// NOTE: This can only be called after `prepareControllerService`.
 Future<string> StorageLocalResourceProviderProcess::createVolume(
     const string& name,
     const Bytes& capacity,
     const DiskProfileAdaptor::ProfileInfo& profileInfo)
 {
-  // NOTE: This can only be called after `prepareControllerService`.
-  CHECK_SOME(controllerCapabilities);
-
-  if (!controllerCapabilities->createDeleteVolume) {
-    return Failure("Capability 'CREATE_DELETE_VOLUME' is not supported");
+  if (!controllerCapabilities.createDeleteVolume) {
+    return Failure(
+        "Controller capability 'CREATE_DELETE_VOLUME' is not supported");
   }
 
-  return getService(controllerContainerId)
-    .then(defer(self(), [=](csi::Client client) {
-      csi::CreateVolumeRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
+  CHECK_SOME(controllerContainerId);
+
+  return getService(controllerContainerId.get())
+    .then(defer(self(), [=](csi::v0::Client client) {
+      csi::v0::CreateVolumeRequest request;
       request.set_name(name);
       request.mutable_capacity_range()
         ->set_required_bytes(capacity.bytes());
@@ -2335,47 +2336,49 @@ Future<string> 
StorageLocalResourceProviderProcess::createVolume(
       *request.mutable_parameters() = profileInfo.parameters;
 
       return client.CreateVolume(request)
-        .then(defer(self(), [=](const csi::CreateVolumeResponse& response) {
-          const csi::VolumeInfo& volumeInfo = response.volume_info();
+        .then(defer(self(), [=](const csi::v0::CreateVolumeResponse& response) 
{
+          const csi::v0::Volume& volume = response.volume();
 
-          if (volumes.contains(volumeInfo.id())) {
+          if (volumes.contains(volume.id())) {
             // The resource provider failed over after the last
             // `CreateVolume` call, but before the operation status was
             // checkpointed.
             CHECK_EQ(csi::state::VolumeState::CREATED,
-                     volumes.at(volumeInfo.id()).state.state());
+                     volumes.at(volume.id()).state.state());
           } else {
             csi::state::VolumeState volumeState;
             volumeState.set_state(csi::state::VolumeState::CREATED);
             volumeState.mutable_volume_capability()
               ->CopyFrom(profileInfo.capability);
-            *volumeState.mutable_volume_attributes() = volumeInfo.attributes();
+            *volumeState.mutable_volume_attributes() = volume.attributes();
 
-            volumes.put(volumeInfo.id(), std::move(volumeState));
-            checkpointVolumeState(volumeInfo.id());
+            volumes.put(volume.id(), std::move(volumeState));
+            checkpointVolumeState(volume.id());
           }
 
-          return volumeInfo.id();
+          return volume.id();
         }));
     }));
 }
 
 
+// NOTE: This can only be called after `prepareControllerService` and
+// `prepareNodeService` (since it may require `NodeUnpublishVolume`).
 Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
     const string& volumeId,
     bool preExisting)
 {
-  // NOTE: This can only be called after `prepareControllerService` and
-  // `prepareNodeService` (since it may require `NodeUnpublishVolume`).
-  CHECK_SOME(controllerCapabilities);
-  CHECK(!controllerCapabilities->publishUnpublishVolume || nodeId.isSome());
+  CHECK(!controllerCapabilities.publishUnpublishVolume || nodeId.isSome());
 
   // We do not need the capability for pre-existing volumes since no
   // actual `DeleteVolume` call will be made.
-  if (!preExisting && !controllerCapabilities->createDeleteVolume) {
-    return Failure("Capability 'CREATE_DELETE_VOLUME' is not supported");
+  if (!preExisting && !controllerCapabilities.createDeleteVolume) {
+    return Failure(
+        "Controller capability 'CREATE_DELETE_VOLUME' is not supported");
   }
 
+  CHECK_SOME(controllerContainerId);
+
   const string volumePath = csi::paths::getVolumePath(
       slave::paths::getCsiRootDir(workDir),
       info.storage().plugin().type(),
@@ -2399,10 +2402,9 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::deleteVolume(
       case csi::state::VolumeState::CREATED: {
         if (!preExisting) {
           deleted = deleted
-            .then(defer(self(), &Self::getService, controllerContainerId))
-            .then(defer(self(), [=](csi::Client client) {
-              csi::DeleteVolumeRequest request;
-              request.mutable_version()->CopyFrom(csiVersion);
+            .then(defer(self(), &Self::getService, 
controllerContainerId.get()))
+            .then(defer(self(), [=](csi::v0::Client client) {
+              csi::v0::DeleteVolumeRequest request;
               request.set_volume_id(volumeId);
 
               return client.DeleteVolume(request)
@@ -2443,34 +2445,41 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::deleteVolume(
 }
 
 
-// Validates if a volume has the specified capability. This is called
-// when applying `CREATE_VOLUME` or `CREATE_BLOCK` on a pre-existing
-// volume, so we make it returns a volume ID, similar to `createVolume`.
+// Validates if a volume has the specified capability. This is called when
+// applying `CREATE_VOLUME` or `CREATE_BLOCK` on a pre-existing volume, so we
+// make it returns a volume ID, similar to `createVolume`.
+// NOTE: This can only be called after `prepareIdentityService` and only for
+// newly discovered volumes.
 Future<string> StorageLocalResourceProviderProcess::validateCapability(
     const string& volumeId,
     const Option<Labels>& metadata,
-    const csi::VolumeCapability& capability)
+    const csi::v0::VolumeCapability& capability)
 {
-  // NOTE: This can only be called for newly discovered volumes.
   CHECK(!volumes.contains(volumeId));
 
-  return getService(controllerContainerId)
-    .then(defer(self(), [=](csi::Client client) {
+  if (!pluginCapabilities.controllerService) {
+    return Failure(
+        "Plugin capability 'CONTROLLER_SERVICE' is not supported");
+  }
+
+  CHECK_SOME(controllerContainerId);
+
+  return getService(controllerContainerId.get())
+    .then(defer(self(), [=](csi::v0::Client client) {
       google::protobuf::Map<string, string> volumeAttributes;
 
       if (metadata.isSome()) {
         volumeAttributes = convertLabelsToStringMap(metadata.get()).get();
       }
 
-      csi::ValidateVolumeCapabilitiesRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
+      csi::v0::ValidateVolumeCapabilitiesRequest request;
       request.set_volume_id(volumeId);
       request.add_volume_capabilities()->CopyFrom(capability);
       *request.mutable_volume_attributes() = volumeAttributes;
 
       return client.ValidateVolumeCapabilities(request)
         .then(defer(self(), [=](
-            const csi::ValidateVolumeCapabilitiesResponse& response)
+            const csi::v0::ValidateVolumeCapabilitiesResponse& response)
             -> Future<string> {
           if (!response.supported()) {
             return Failure(
@@ -2492,27 +2501,25 @@ Future<string> 
StorageLocalResourceProviderProcess::validateCapability(
 }
 
 
+// NOTE: This can only be called after `prepareControllerService` and
+// the resource provider ID has been obtained.
 Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
 {
-  // NOTE: This can only be called after `prepareControllerService` and
-  // the resource provider ID has been obtained.
-  CHECK_SOME(controllerCapabilities);
   CHECK(info.has_id());
 
   // This is only used for reconciliation so no failure is returned.
-  if (!controllerCapabilities->listVolumes) {
+  if (!controllerCapabilities.listVolumes) {
     return Resources();
   }
 
-  return getService(controllerContainerId)
-    .then(defer(self(), [=](csi::Client client) {
+  CHECK_SOME(controllerContainerId);
+
+  return getService(controllerContainerId.get())
+    .then(defer(self(), [=](csi::v0::Client client) {
       // TODO(chhsiao): Set the max entries and use a loop to do
       // mutliple `ListVolumes` calls.
-      csi::ListVolumesRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
-
-      return client.ListVolumes(request)
-        .then(defer(self(), [=](const csi::ListVolumesResponse& response) {
+      return client.ListVolumes(csi::v0::ListVolumesRequest())
+        .then(defer(self(), [=](const csi::v0::ListVolumesResponse& response) {
           Resources resources;
 
           // Recover disk profiles from the checkpointed state.
@@ -2529,15 +2536,14 @@ Future<Resources> 
StorageLocalResourceProviderProcess::listVolumes()
           foreach (const auto& entry, response.entries()) {
             resources += createRawDiskResource(
                 info,
-                Bytes(entry.volume_info().capacity_bytes()),
-                volumesToProfiles.contains(entry.volume_info().id())
-                  ? volumesToProfiles.at(entry.volume_info().id())
+                Bytes(entry.volume().capacity_bytes()),
+                volumesToProfiles.contains(entry.volume().id())
+                  ? volumesToProfiles.at(entry.volume().id())
                   : Option<string>::none(),
-                entry.volume_info().id(),
-                entry.volume_info().attributes().empty()
+                entry.volume().id(),
+                entry.volume().attributes().empty()
                   ? Option<Labels>::none()
-                  : convertStringMapToLabels(
-                        entry.volume_info().attributes()));
+                  : convertStringMapToLabels(entry.volume().attributes()));
           }
 
           return resources;
@@ -2546,20 +2552,21 @@ Future<Resources> 
StorageLocalResourceProviderProcess::listVolumes()
 }
 
 
+// NOTE: This can only be called after `prepareControllerService` and
+// the resource provider ID has been obtained.
 Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
 {
-  // NOTE: This can only be called after `prepareControllerService` and
-  // the resource provider ID has been obtained.
-  CHECK_SOME(controllerCapabilities);
   CHECK(info.has_id());
 
   // This is only used for reconciliation so no failure is returned.
-  if (!controllerCapabilities->getCapacity) {
+  if (!controllerCapabilities.getCapacity) {
     return Resources();
   }
 
-  return getService(controllerContainerId)
-    .then(defer(self(), [=](csi::Client client) {
+  CHECK_SOME(controllerContainerId);
+
+  return getService(controllerContainerId.get())
+    .then(defer(self(), [=](csi::v0::Client client) {
       list<Future<Resources>> futures;
 
       foreach (const string& profile, knownProfiles) {
@@ -2570,14 +2577,13 @@ Future<Resources> 
StorageLocalResourceProviderProcess::getCapacities()
         const DiskProfileAdaptor::ProfileInfo& profileInfo =
           profileInfos.at(profile);
 
-        csi::GetCapacityRequest request;
-        request.mutable_version()->CopyFrom(csiVersion);
+        csi::v0::GetCapacityRequest request;
         request.add_volume_capabilities()->CopyFrom(profileInfo.capability);
         *request.mutable_parameters() = profileInfo.parameters;
 
         futures.push_back(client.GetCapacity(request)
           .then(defer(self(), [=](
-              const csi::GetCapacityResponse& response) -> Resources {
+              const csi::v0::GetCapacityResponse& response) -> Resources {
             if (response.available_capacity() == 0) {
               return Resources();
             }
@@ -3218,26 +3224,22 @@ Try<Owned<LocalResourceProvider>> 
StorageLocalResourceProvider::create(
         "' does not follow Java package naming convention");
   }
 
-  bool hasControllerService = false;
+  // Verify that the plugin provides the CSI node service.
+  // TODO(chhsiao): We should move this check to a validation function
+  // for `CSIPluginInfo`.
   bool hasNodeService = false;
 
   foreach (const CSIPluginContainerInfo& container,
            info.storage().plugin().containers()) {
-    for (int i = 0; i < container.services_size(); i++) {
-      const CSIPluginContainerInfo::Service service = container.services(i);
-      if (service == CSIPluginContainerInfo::CONTROLLER_SERVICE) {
-        hasControllerService = true;
-      } else if (service == CSIPluginContainerInfo::NODE_SERVICE) {
-        hasNodeService = true;
-      }
+    if (container.services().end() != find(
+            container.services().begin(),
+            container.services().end(),
+            CSIPluginContainerInfo::NODE_SERVICE)) {
+      hasNodeService = true;
+      break;
     }
   }
 
-  if (!hasControllerService) {
-    return Error(
-        stringify(CSIPluginContainerInfo::CONTROLLER_SERVICE) + " not found");
-  }
-
   if (!hasNodeService) {
     return Error(
         stringify(CSIPluginContainerInfo::NODE_SERVICE) + " not found");

Reply via email to