This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 05a228f3a5eb49ff4f325884e09bda933d45a715
Author: Chun-Hung Hsiao <[email protected]>
AuthorDate: Mon Apr 8 13:49:27 2019 -0700

    Auto-detect CSI API version to create the proper volume manager.
    
    The `mesos::csi::VolumeManager::create` function now first creates a
    `ServiceManager` to detect the API version, then instantiate the proper
    `VolumeManager` based on it.
    
    NOTE: This patch enables CSI v1 by default for all SLRP-related unit
    tests except for test `RetryRpcWithExponentialBackoff`.
    
    Review: https://reviews.apache.org/r/70428
---
 src/csi/service_manager.cpp                        | 182 +++++++++++++++------
 src/csi/service_manager.hpp                        |   1 +
 src/csi/v0_volume_manager.cpp                      |  28 +---
 src/csi/v0_volume_manager.hpp                      |   4 +-
 src/csi/v0_volume_manager_process.hpp              |  10 +-
 src/csi/v1_volume_manager.cpp                      |  28 +---
 src/csi/v1_volume_manager.hpp                      |   4 +-
 src/csi/v1_volume_manager_process.hpp              |  10 +-
 src/csi/volume_manager.cpp                         |  31 ++--
 src/csi/volume_manager.hpp                         |   7 +-
 src/resource_provider/storage/provider.cpp         |  77 ++++++---
 .../storage_local_resource_provider_tests.cpp      |   8 +
 12 files changed, 243 insertions(+), 147 deletions(-)

diff --git a/src/csi/service_manager.cpp b/src/csi/service_manager.cpp
index d6d0d4a..a87df96 100644
--- a/src/csi/service_manager.cpp
+++ b/src/csi/service_manager.cpp
@@ -52,6 +52,7 @@
 
 #include "csi/paths.hpp"
 #include "csi/v0_client.hpp"
+#include "csi/v1_client.hpp"
 
 #include "internal/devolve.hpp"
 #include "internal/evolve.hpp"
@@ -139,6 +140,7 @@ public:
   Future<Nothing> recover();
 
   Future<string> getServiceEndpoint(const Service& service);
+  Future<string> getApiVersion();
 
 private:
   // Returns the container info of the specified container for this CSI plugin.
@@ -155,9 +157,12 @@ private:
   // Kills the specified plugin container.
   Future<Nothing> killContainer(const ContainerID& containerId);
 
-  // Waits for the endpoint (URI to a Unix domain socket) to be ready.
+  // Waits for the endpoint (URI to a Unix domain socket) to be created.
   Future<Nothing> waitEndpoint(const string& endpoint);
 
+  // Probes the endpoint to detect the API version and check for readiness.
+  Future<Nothing> probeEndpoint(const string& endpoint);
+
   // Returns the URI of the latest service endpoint for the specified plugin
   // container. If the container is not already running, this method will start
   // a new container.
@@ -174,6 +179,7 @@ private:
   Metrics* metrics;
 
   http::Headers headers;
+  Option<string> apiVersion;
   hashmap<Service, ContainerID> serviceContainers;
 
   hashmap<ContainerID, Owned<ContainerDaemon>> daemons;
@@ -350,6 +356,20 @@ Future<string> 
ServiceManagerProcess::getServiceEndpoint(const Service& service)
 }
 
 
+Future<string> ServiceManagerProcess::getApiVersion()
+{
+  if (apiVersion.isSome()) {
+    return apiVersion.get();
+  }
+
+  // Ensure that the plugin has been probed (which does the API version
+  // detection) through `getEndpoint` before returning the API version.
+  CHECK(!serviceContainers.empty());
+  return getEndpoint(serviceContainers.begin()->second)
+    .then(process::defer(self(), [=] { return CHECK_NOTNONE(apiVersion); }));
+}
+
+
 Option<CSIPluginContainerInfo> ServiceManagerProcess::getContainerInfo(
     const ContainerID& containerId)
 {
@@ -384,8 +404,8 @@ ServiceManagerProcess::getContainers()
             httpResponse.status + "' (" + httpResponse.body + ")");
       }
 
-      Try<v1::agent::Response> v1Response =
-        internal::deserialize<v1::agent::Response>(
+      Try<mesos::v1::agent::Response> v1Response =
+        internal::deserialize<mesos::v1::agent::Response>(
             contentType, httpResponse.body);
 
       if (v1Response.isError()) {
@@ -473,55 +493,117 @@ Future<Nothing> 
ServiceManagerProcess::waitEndpoint(const string& endpoint)
   const string endpointPath =
     strings::remove(endpoint, "unix://", strings::PREFIX);
 
-  Future<Nothing> created = Nothing();
-  if (!os::exists(endpointPath)) {
-    // Wait for the endpoint socket to appear until the timeout expires.
-    Timeout timeout = Timeout::in(CSI_ENDPOINT_CREATION_TIMEOUT);
+  if (os::exists(endpointPath)) {
+    return Nothing();
+  }
 
-    created = process::loop(
-        [=]() -> Future<Nothing> {
-          if (timeout.expired()) {
-            return Failure("Timed out waiting for endpoint '" + endpoint + 
"'");
-          }
+  // Wait for the endpoint socket to appear until the timeout expires.
+  Timeout timeout = Timeout::in(CSI_ENDPOINT_CREATION_TIMEOUT);
 
-          return process::after(Milliseconds(10));
-        },
-        [=](const Nothing&) -> ControlFlow<Nothing> {
-          if (os::exists(endpointPath)) {
-            return Break();
-          }
+  return process::loop(
+      [=]() -> Future<Nothing> {
+        if (timeout.expired()) {
+          return Failure("Timed out waiting for endpoint '" + endpoint + "'");
+        }
 
-          return Continue();
-        });
+        return process::after(Milliseconds(10));
+      },
+      [=](const Nothing&) -> ControlFlow<Nothing> {
+        if (os::exists(endpointPath)) {
+          return Break();
+        }
+
+        return Continue();
+      });
+}
+
+
+Future<Nothing> ServiceManagerProcess::probeEndpoint(const string& endpoint)
+{
+  // Each probe function returns its API version if the probe is successful,
+  // an error if the API version is implemented but the probe fails, or a 
`None`
+  // if the API version is not implemented.
+  static const hashmap<
+      string,
+      std::function<Future<Result<string>>(const string&, const Runtime&)>>
+    probers = {
+      {v0::API_VERSION,
+       [](const string& endpoint, const Runtime& runtime) {
+         LOG(INFO) << "Probing endpoint '" << endpoint << "' with CSI v0";
+
+         return v0::Client(endpoint, runtime)
+           .probe(v0::ProbeRequest())
+           .then([](const v0::RPCResult<v0::ProbeResponse>& result) {
+             return result.isError()
+               ? (result.error().status.error_code() == grpc::UNIMPLEMENTED
+                    ? Result<string>::none() : result.error())
+               : v0::API_VERSION;
+           });
+       }},
+      {v1::API_VERSION,
+       [](const string& endpoint, const Runtime& runtime) {
+         LOG(INFO) << "Probing endpoint '" << endpoint << "' with CSI v1";
+
+         return v1::Client(endpoint, runtime).probe(v1::ProbeRequest())
+           .then([](const v1::RPCResult<v1::ProbeResponse>& result) {
+             // TODO(chhsiao): Retry when `result->ready` is false.
+             return result.isError()
+               ? (result.error().status.error_code() == grpc::UNIMPLEMENTED
+                    ? Result<string>::none() : result.error())
+               : v1::API_VERSION;
+           });
+       }},
+    };
+
+  ++metrics->csi_plugin_rpcs_pending;
+
+  Future<Result<string>> probed;
+
+  if (apiVersion.isSome()) {
+    CHECK(probers.contains(apiVersion.get()));
+    probed = probers.at(apiVersion.get())(endpoint, runtime);
+  } else {
+    probed = probers.at(v1::API_VERSION)(endpoint, runtime)
+      .then(process::defer(self(), [=](const Result<string>& result) {
+        return result.isNone()
+          ? probers.at(v0::API_VERSION)(endpoint, runtime) : result;
+      }));
   }
 
-  return created
-    .then(process::defer(self(), [=]() -> Future<Nothing> {
-      // TODO(chhsiao): Detect which CSI version to use through versioned
-      // `Probe` calls to support CSI v1 in a backward compatible way.
-      ++metrics->csi_plugin_rpcs_pending;
-
-      return v0::Client(endpoint, runtime).probe(v0::ProbeRequest())
-        .then(process::defer(self(), [=](
-            const v0::RPCResult<v0::ProbeResponse>& result) -> Future<Nothing> 
{
-          if (result.isError()) {
-            return Failure(
-                "Failed to probe endpoint '" + endpoint +
-                "': " + stringify(result.error()));
-          }
+  return probed
+    .then(process::defer(self(), [=](
+        const Result<string>& result) -> Future<Nothing> {
+      if (result.isError()) {
+        return Failure(
+            "Failed to probe endpoint '" + endpoint + "': " + result.error());
+      }
 
-          return Nothing();
-        }))
-        .onAny(process::defer(self(), [this](const Future<Nothing>& future) {
-          --metrics->csi_plugin_rpcs_pending;
-          if (future.isReady()) {
-            ++metrics->csi_plugin_rpcs_finished;
-          } else if (future.isDiscarded()) {
-            ++metrics->csi_plugin_rpcs_cancelled;
-          } else {
-            ++metrics->csi_plugin_rpcs_failed;
-          }
-        }));
+      if (result.isNone()) {
+        return Failure(
+            "Failed to probe endpoint '" + endpoint + "': Unknown API 
version");
+      }
+
+      if (apiVersion.isNone()) {
+        apiVersion = result.get();
+      } else if (apiVersion != result.get()) {
+        return Failure(
+            "Failed to probe endpoint '" + endpoint +
+            "': Inconsistent API version");
+      }
+
+      return Nothing();
+    }))
+    .onAny(process::defer(self(), [this](const Future<Nothing>& future) {
+      // We only update the metrics after the whole detection loop is done so
+      // it won't introduce much noise.
+      --metrics->csi_plugin_rpcs_pending;
+      if (future.isReady()) {
+        ++metrics->csi_plugin_rpcs_finished;
+      } else if (future.isDiscarded()) {
+        ++metrics->csi_plugin_rpcs_cancelled;
+      } else {
+        ++metrics->csi_plugin_rpcs_failed;
+      }
     }));
 }
 
@@ -624,6 +706,7 @@ Future<string> ServiceManagerProcess::getEndpoint(
 
             CHECK(endpoints.at(containerId)->associate(
                 waitEndpoint(endpoint)
+                  .then(process::defer(self(), &Self::probeEndpoint, endpoint))
                   .then([endpoint]() -> string { return endpoint; })));
 
             return endpoints.at(containerId)->future().then([] {
@@ -728,5 +811,12 @@ Future<string> ServiceManager::getServiceEndpoint(const 
Service& service)
         process.get(), &ServiceManagerProcess::getServiceEndpoint, service));
 }
 
+
+Future<string> ServiceManager::getApiVersion()
+{
+  return recovered
+    .then(process::defer(process.get(), 
&ServiceManagerProcess::getApiVersion));
+}
+
 } // namespace csi {
 } // namespace mesos {
diff --git a/src/csi/service_manager.hpp b/src/csi/service_manager.hpp
index 5dd6bc7..60a0805 100644
--- a/src/csi/service_manager.hpp
+++ b/src/csi/service_manager.hpp
@@ -73,6 +73,7 @@ public:
   process::Future<Nothing> recover();
 
   process::Future<std::string> getServiceEndpoint(const Service& service);
+  process::Future<std::string> getApiVersion();
 
 private:
   process::Owned<ServiceManagerProcess> process;
diff --git a/src/csi/v0_volume_manager.cpp b/src/csi/v0_volume_manager.cpp
index 02de17e..e19dc7c 100644
--- a/src/csi/v0_volume_manager.cpp
+++ b/src/csi/v0_volume_manager.cpp
@@ -63,6 +63,7 @@ using process::Continue;
 using process::ControlFlow;
 using process::Failure;
 using process::Future;
+using process::Owned;
 using process::ProcessBase;
 
 using process::grpc::StatusError;
@@ -74,29 +75,19 @@ namespace csi {
 namespace v0 {
 
 VolumeManagerProcess::VolumeManagerProcess(
-    const http::URL& agentUrl,
     const string& _rootDir,
     const CSIPluginInfo& _info,
     const hashset<Service> _services,
-    const string& containerPrefix,
-    const Option<string>& authToken,
     const Runtime& _runtime,
+    ServiceManager* _serviceManager,
     Metrics* _metrics)
   : ProcessBase(process::ID::generate("csi-v0-volume-manager")),
     rootDir(_rootDir),
     info(_info),
     services(_services),
     runtime(_runtime),
-    metrics(_metrics),
-    serviceManager(new ServiceManager(
-        agentUrl,
-        rootDir,
-        info,
-        services,
-        containerPrefix,
-        authToken,
-        runtime,
-        metrics))
+    serviceManager(_serviceManager),
+    metrics(_metrics)
 {
   // This should have been validated in `VolumeManager::create`.
   CHECK(!services.empty())
@@ -114,8 +105,7 @@ Future<Nothing> VolumeManagerProcess::recover()
 
   bootId = bootId_.get();
 
-  return serviceManager->recover()
-    .then(process::defer(self(), &Self::prepareServices))
+  return prepareServices()
     .then(process::defer(self(), [this]() -> Future<Nothing> {
       // Recover the states of CSI volumes.
       Try<list<string>> volumePaths =
@@ -1198,22 +1188,18 @@ void 
VolumeManagerProcess::garbageCollectMountPath(const string& volumeId)
 
 
 VolumeManager::VolumeManager(
-    const http::URL& agentUrl,
     const string& rootDir,
     const CSIPluginInfo& info,
     const hashset<Service>& services,
-    const string& containerPrefix,
-    const Option<string>& authToken,
     const Runtime& runtime,
+    ServiceManager* serviceManager,
     Metrics* metrics)
   : process(new VolumeManagerProcess(
-        agentUrl,
         rootDir,
         info,
         services,
-        containerPrefix,
-        authToken,
         runtime,
+        serviceManager,
         metrics))
 {
   process::spawn(CHECK_NOTNULL(process.get()));
diff --git a/src/csi/v0_volume_manager.hpp b/src/csi/v0_volume_manager.hpp
index 6c15f29..9d572e7 100644
--- a/src/csi/v0_volume_manager.hpp
+++ b/src/csi/v0_volume_manager.hpp
@@ -53,13 +53,11 @@ class VolumeManager : public csi::VolumeManager
 {
 public:
   VolumeManager(
-      const process::http::URL& agentUrl,
       const std::string& rootDir,
       const CSIPluginInfo& info,
       const hashset<Service>& services,
-      const std::string& containerPrefix,
-      const Option<std::string>& authToken,
       const process::grpc::client::Runtime& runtime,
+      ServiceManager* serviceManager,
       Metrics* metrics);
 
   // Since this class contains `Owned` members which should not but can be
diff --git a/src/csi/v0_volume_manager_process.hpp 
b/src/csi/v0_volume_manager_process.hpp
index c3cd6ca..4cfb5b5 100644
--- a/src/csi/v0_volume_manager_process.hpp
+++ b/src/csi/v0_volume_manager_process.hpp
@@ -70,13 +70,11 @@ class VolumeManagerProcess : public 
process::Process<VolumeManagerProcess>
 {
 public:
   explicit VolumeManagerProcess(
-      const process::http::URL& agentUrl,
       const std::string& _rootDir,
       const CSIPluginInfo& _info,
       const hashset<Service> _services,
-      const std::string& containerPrefix,
-      const Option<std::string>& authToken,
       const process::grpc::client::Runtime& _runtime,
+      ServiceManager* _serviceManager,
       Metrics* _metrics);
 
   process::Future<Nothing> recover();
@@ -146,8 +144,8 @@ private:
   //
   //                          +------------+
   //                 +  +  +  |  CREATED   |  ^
-  //   _attachVolume |  |  |  +---+----^---+  |
-  //                 |  |  |      |    |      | _detachVolume
+  //                 |  |  |  +---+----^---+  |
+  //   _attachVolume |  |  |      |    |      | _detachVolume
   //                 |  |  |  +---v----+---+  |
   //                 v  +  +  | NODE_READY |  +  ^
   //                    |  |  +---+----^---+  |  |
@@ -187,8 +185,8 @@ private:
   const hashset<Service> services;
 
   process::grpc::client::Runtime runtime;
+  ServiceManager* serviceManager;
   Metrics* metrics;
-  process::Owned<ServiceManager> serviceManager;
 
   Option<std::string> bootId;
   Option<PluginCapabilities> pluginCapabilities;
diff --git a/src/csi/v1_volume_manager.cpp b/src/csi/v1_volume_manager.cpp
index bd334f1..bf640f9 100644
--- a/src/csi/v1_volume_manager.cpp
+++ b/src/csi/v1_volume_manager.cpp
@@ -64,6 +64,7 @@ using process::Continue;
 using process::ControlFlow;
 using process::Failure;
 using process::Future;
+using process::Owned;
 using process::ProcessBase;
 
 using process::grpc::StatusError;
@@ -75,29 +76,19 @@ namespace csi {
 namespace v1 {
 
 VolumeManagerProcess::VolumeManagerProcess(
-    const http::URL& agentUrl,
     const string& _rootDir,
     const CSIPluginInfo& _info,
     const hashset<Service> _services,
-    const string& containerPrefix,
-    const Option<string>& authToken,
     const Runtime& _runtime,
+    ServiceManager* _serviceManager,
     Metrics* _metrics)
   : ProcessBase(process::ID::generate("csi-v1-volume-manager")),
     rootDir(_rootDir),
     info(_info),
     services(_services),
     runtime(_runtime),
-    metrics(_metrics),
-    serviceManager(new ServiceManager(
-        agentUrl,
-        rootDir,
-        info,
-        services,
-        containerPrefix,
-        authToken,
-        runtime,
-        metrics))
+    serviceManager(_serviceManager),
+    metrics(_metrics)
 {
   // This should have been validated in `VolumeManager::create`.
   CHECK(!services.empty())
@@ -115,8 +106,7 @@ Future<Nothing> VolumeManagerProcess::recover()
 
   bootId = bootId_.get();
 
-  return serviceManager->recover()
-    .then(process::defer(self(), &Self::prepareServices))
+  return prepareServices()
     .then(process::defer(self(), [this]() -> Future<Nothing> {
       // Recover the states of CSI volumes.
       Try<list<string>> volumePaths =
@@ -1224,22 +1214,18 @@ void 
VolumeManagerProcess::garbageCollectMountPath(const string& volumeId)
 
 
 VolumeManager::VolumeManager(
-    const http::URL& agentUrl,
     const string& rootDir,
     const CSIPluginInfo& info,
     const hashset<Service>& services,
-    const string& containerPrefix,
-    const Option<string>& authToken,
     const Runtime& runtime,
+    ServiceManager* serviceManager,
     Metrics* metrics)
   : process(new VolumeManagerProcess(
-        agentUrl,
         rootDir,
         info,
         services,
-        containerPrefix,
-        authToken,
         runtime,
+        serviceManager,
         metrics))
 {
   process::spawn(CHECK_NOTNULL(process.get()));
diff --git a/src/csi/v1_volume_manager.hpp b/src/csi/v1_volume_manager.hpp
index f8e6095..ba984a9 100644
--- a/src/csi/v1_volume_manager.hpp
+++ b/src/csi/v1_volume_manager.hpp
@@ -53,13 +53,11 @@ class VolumeManager : public csi::VolumeManager
 {
 public:
   VolumeManager(
-      const process::http::URL& agentUrl,
       const std::string& rootDir,
       const CSIPluginInfo& info,
       const hashset<Service>& services,
-      const std::string& containerPrefix,
-      const Option<std::string>& authToken,
       const process::grpc::client::Runtime& runtime,
+      ServiceManager* serviceManager,
       Metrics* metrics);
 
   // Since this class contains `Owned` members which should not but can be
diff --git a/src/csi/v1_volume_manager_process.hpp 
b/src/csi/v1_volume_manager_process.hpp
index 1c80399..30788c3 100644
--- a/src/csi/v1_volume_manager_process.hpp
+++ b/src/csi/v1_volume_manager_process.hpp
@@ -70,13 +70,11 @@ class VolumeManagerProcess : public 
process::Process<VolumeManagerProcess>
 {
 public:
   explicit VolumeManagerProcess(
-      const process::http::URL& agentUrl,
       const std::string& _rootDir,
       const CSIPluginInfo& _info,
       const hashset<Service> _services,
-      const std::string& containerPrefix,
-      const Option<std::string>& authToken,
       const process::grpc::client::Runtime& _runtime,
+      ServiceManager* _serviceManager,
       Metrics* _metrics);
 
   process::Future<Nothing> recover();
@@ -146,8 +144,8 @@ private:
   //
   //                          +------------+
   //                 +  +  +  |  CREATED   |  ^
-  //   _attachVolume |  |  |  +---+----^---+  |
-  //                 |  |  |      |    |      | _detachVolume
+  //                 |  |  |  +---+----^---+  |
+  //   _attachVolume |  |  |      |    |      | _detachVolume
   //                 |  |  |  +---v----+---+  |
   //                 v  +  +  | NODE_READY |  +  ^
   //                    |  |  +---+----^---+  |  |
@@ -187,8 +185,8 @@ private:
   const hashset<Service> services;
 
   process::grpc::client::Runtime runtime;
+  ServiceManager* serviceManager;
   Metrics* metrics;
-  process::Owned<ServiceManager> serviceManager;
 
   Option<std::string> bootId;
   Option<PluginCapabilities> pluginCapabilities;
diff --git a/src/csi/volume_manager.cpp b/src/csi/volume_manager.cpp
index cbe45cb..c47adfe 100644
--- a/src/csi/volume_manager.cpp
+++ b/src/csi/volume_manager.cpp
@@ -16,9 +16,14 @@
 
 #include "csi/volume_manager.hpp"
 
-#include <process/grpc.hpp>
+#include <memory>
 
+#include <mesos/csi/v0.hpp>
+#include <mesos/csi/v1.hpp>
+
+#include "csi/service_manager.hpp"
 #include "csi/v0_volume_manager.hpp"
+#include "csi/v1_volume_manager.hpp"
 
 namespace http = process::http;
 
@@ -32,12 +37,12 @@ namespace mesos {
 namespace csi {
 
 Try<Owned<VolumeManager>> VolumeManager::create(
-    const http::URL& agentUrl,
     const string& rootDir,
     const CSIPluginInfo& info,
     const hashset<Service>& services,
-    const string& containerPrefix,
-    const Option<string>& authToken,
+    const string& apiVersion,
+    const Runtime& runtime,
+    ServiceManager* serviceManager,
     Metrics* metrics)
 {
   if (services.empty()) {
@@ -46,15 +51,15 @@ Try<Owned<VolumeManager>> VolumeManager::create(
         info.type() + "' and name '" + info.name() + "'");
   }
 
-  return new v0::VolumeManager(
-      agentUrl,
-      rootDir,
-      info,
-      services,
-      containerPrefix,
-      authToken,
-      Runtime(),
-      metrics);
+  if (apiVersion == v0::API_VERSION) {
+    return Try<Owned<VolumeManager>>(new v0::VolumeManager(
+        rootDir, info, services, runtime, serviceManager, metrics));
+  } else if (apiVersion == v1::API_VERSION) {
+    return Try<Owned<VolumeManager>>(new v1::VolumeManager(
+        rootDir, info, services, runtime, serviceManager, metrics));
+  }
+
+  return Error("Unsupported CSI API version: " + apiVersion);
 }
 
 } // namespace csi {
diff --git a/src/csi/volume_manager.hpp b/src/csi/volume_manager.hpp
index cc20f46..0aa6337 100644
--- a/src/csi/volume_manager.hpp
+++ b/src/csi/volume_manager.hpp
@@ -27,6 +27,7 @@
 #include <mesos/csi/types.hpp>
 
 #include <process/future.hpp>
+#include <process/grpc.hpp>
 #include <process/http.hpp>
 #include <process/owned.hpp>
 
@@ -56,12 +57,12 @@ class VolumeManager
 {
 public:
   static Try<process::Owned<VolumeManager>> create(
-      const process::http::URL& agentUrl,
       const std::string& rootDir,
       const CSIPluginInfo& info,
       const hashset<Service>& services,
-      const std::string& containerPrefix,
-      const Option<std::string>& authToken,
+      const std::string& apiVersion,
+      const process::grpc::client::Runtime& runtime,
+      ServiceManager* serviceManager,
       Metrics* metrics);
 
   virtual ~VolumeManager() = default;
diff --git a/src/resource_provider/storage/provider.cpp 
b/src/resource_provider/storage/provider.cpp
index 2dc5c26..b2ca5d0 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -72,6 +72,7 @@
 
 #include "csi/metrics.hpp"
 #include "csi/paths.hpp"
+#include "csi/service_manager.hpp"
 #include "csi/volume_manager.hpp"
 
 #include "internal/devolve.hpp"
@@ -113,11 +114,14 @@ using process::spawn;
 
 using process::grpc::StatusError;
 
+using process::grpc::client::Runtime;
+
 using process::http::authentication::Principal;
 
 using process::metrics::Counter;
 using process::metrics::PushGauge;
 
+using mesos::csi::ServiceManager;
 using mesos::csi::VolumeInfo;
 using mesos::csi::VolumeManager;
 
@@ -180,6 +184,14 @@ static inline http::URL extractParentEndpoint(const 
http::URL& url)
 }
 
 
+static string getContainerPrefix(const ResourceProviderInfo& info)
+{
+  const Principal principal = LocalResourceProvider::principal(info);
+  CHECK(principal.claims.contains("cid_prefix"));
+  return principal.claims.at("cid_prefix");
+}
+
+
 static inline Resource createRawDiskResource(
     const ResourceProviderInfo& info,
     const Bytes& capacity,
@@ -363,6 +375,11 @@ private:
   // The mapping of known profiles fetched from the DiskProfileAdaptor.
   hashmap<string, DiskProfileAdaptor::ProfileInfo> profileInfos;
 
+  Runtime runtime;
+
+  // NOTE: `serviceManager` must be destructed after `volumeManager` since the
+  // latter holds a pointer of the former.
+  Owned<ServiceManager> serviceManager;
   Owned<VolumeManager> volumeManager;
 
   // We maintain the following invariant: if one operation depends on
@@ -494,30 +511,6 @@ void StorageLocalResourceProviderProcess::received(const 
Event& event)
 
 void StorageLocalResourceProviderProcess::initialize()
 {
-  const Principal principal = LocalResourceProvider::principal(info);
-  CHECK(principal.claims.contains("cid_prefix"));
-  const string& containerPrefix = principal.claims.at("cid_prefix");
-
-  Try<Owned<VolumeManager>> volumeManager_ = VolumeManager::create(
-      extractParentEndpoint(url),
-      slave::paths::getCsiRootDir(workDir),
-      info.storage().plugin(),
-      {csi::CONTROLLER_SERVICE, csi::NODE_SERVICE},
-      containerPrefix,
-      authToken,
-      &metrics);
-
-  if (volumeManager_.isError()) {
-    LOG(ERROR)
-      << "Failed to create CSI volume manager for resource provider with type 
'"
-      << info.type() << "' and name '" << info.name()
-      << "': " << volumeManager_.error();
-
-    fatal();
-  }
-
-  volumeManager = std::move(volumeManager_).get();
-
   auto die = [=](const string& message) {
     LOG(ERROR)
       << "Failed to recover resource provider with type '" << info.type()
@@ -547,7 +540,41 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::recover()
 {
   CHECK_EQ(RECOVERING, state);
 
-  return volumeManager->recover()
+  serviceManager.reset(new ServiceManager(
+      extractParentEndpoint(url),
+      slave::paths::getCsiRootDir(workDir),
+      info.storage().plugin(),
+      {csi::CONTROLLER_SERVICE, csi::NODE_SERVICE},
+      getContainerPrefix(info),
+      authToken,
+      runtime,
+      &metrics));
+
+  return serviceManager->recover()
+    .then(defer(self(), [=] {
+      return serviceManager->getApiVersion();
+    }))
+    .then(defer(self(), [=](const string& apiVersion) -> Future<Nothing> {
+      Try<Owned<VolumeManager>> volumeManager_ = VolumeManager::create(
+          slave::paths::getCsiRootDir(workDir),
+          info.storage().plugin(),
+          {csi::CONTROLLER_SERVICE, csi::NODE_SERVICE},
+          apiVersion,
+          runtime,
+          serviceManager.get(),
+          &metrics);
+
+      if (volumeManager_.isError()) {
+        return Failure(
+            "Failed to create CSI volume manager for resource provider with "
+            "type '" + info.type() + "' and name '" + info.name() + "': " +
+            volumeManager_.error());
+      }
+
+      volumeManager = std::move(volumeManager_.get());
+
+      return volumeManager->recover();
+    }))
     .then(defer(self(), [=]() -> Future<Nothing> {
       // Recover the resource provider ID and state from the latest symlink. If
       // the symlink does not exist, this is a new resource provider, and the
diff --git a/src/tests/storage_local_resource_provider_tests.cpp 
b/src/tests/storage_local_resource_provider_tests.cpp
index bd35150..8bf4d23 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -22,6 +22,9 @@
 #include <tuple>
 #include <vector>
 
+#include <mesos/csi/v0.hpp>
+#include <mesos/csi/v1.hpp>
+
 #include <process/clock.hpp>
 #include <process/collect.hpp>
 #include <process/future.hpp>
@@ -4795,6 +4798,11 @@ TEST_F(StorageLocalResourceProviderTest, 
RetryRpcWithExponentialBackoff)
   MockCSIPlugin plugin;
   ASSERT_SOME(plugin.startup(mockCsiEndpoint));
 
+  // TODO(chhsiao): Since this test expects CSI v0 protobufs, we disable CSI v1
+  // for now. Remove this once the expectations are parameterized.
+  EXPECT_CALL(plugin, Probe(_, _, A<csi::v1::ProbeResponse*>()))
+    .WillRepeatedly(Return(grpc::Status(grpc::UNIMPLEMENTED, "")));
+
   EXPECT_CALL(plugin, GetCapacity(_, _, A<csi::v0::GetCapacityResponse*>()))
     .WillRepeatedly(Invoke([](
         grpc::ServerContext* context,

Reply via email to