This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch 1.8.x in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 20f076b28e3eaf06f7c942c6117a26d4dad8ed32 Author: Chun-Hung Hsiao <[email protected]> AuthorDate: Mon Apr 8 13:49:27 2019 -0700 Auto-detect CSI API version to create the proper volume manager. The `mesos::csi::VolumeManager::create` function now first creates a `ServiceManager` to detect the API version, then instantiates the proper `VolumeManager` based on it. NOTE: This patch enables CSI v1 by default for all SLRP-related unit tests except for test `RetryRpcWithExponentialBackoff`. Review: https://reviews.apache.org/r/70428 --- src/csi/service_manager.cpp | 182 +++++++++++++++------ src/csi/service_manager.hpp | 1 + src/csi/v0_volume_manager.cpp | 28 +--- src/csi/v0_volume_manager.hpp | 4 +- src/csi/v0_volume_manager_process.hpp | 10 +- src/csi/v1_volume_manager.cpp | 28 +--- src/csi/v1_volume_manager.hpp | 4 +- src/csi/v1_volume_manager_process.hpp | 10 +- src/csi/volume_manager.cpp | 31 ++-- src/csi/volume_manager.hpp | 7 +- src/resource_provider/storage/provider.cpp | 77 ++++++--- .../storage_local_resource_provider_tests.cpp | 8 + 12 files changed, 243 insertions(+), 147 deletions(-) diff --git a/src/csi/service_manager.cpp b/src/csi/service_manager.cpp index d6d0d4a..a87df96 100644 --- a/src/csi/service_manager.cpp +++ b/src/csi/service_manager.cpp @@ -52,6 +52,7 @@ #include "csi/paths.hpp" #include "csi/v0_client.hpp" +#include "csi/v1_client.hpp" #include "internal/devolve.hpp" #include "internal/evolve.hpp" @@ -139,6 +140,7 @@ public: Future<Nothing> recover(); Future<string> getServiceEndpoint(const Service& service); + Future<string> getApiVersion(); private: // Returns the container info of the specified container for this CSI plugin. @@ -155,9 +157,12 @@ private: // Kills the specified plugin container. Future<Nothing> killContainer(const ContainerID& containerId); - // Waits for the endpoint (URI to a Unix domain socket) to be ready. + // Waits for the endpoint (URI to a Unix domain socket) to be created. 
Future<Nothing> waitEndpoint(const string& endpoint); + // Probes the endpoint to detect the API version and check for readiness. + Future<Nothing> probeEndpoint(const string& endpoint); + // Returns the URI of the latest service endpoint for the specified plugin // container. If the container is not already running, this method will start // a new container. @@ -174,6 +179,7 @@ private: Metrics* metrics; http::Headers headers; + Option<string> apiVersion; hashmap<Service, ContainerID> serviceContainers; hashmap<ContainerID, Owned<ContainerDaemon>> daemons; @@ -350,6 +356,20 @@ Future<string> ServiceManagerProcess::getServiceEndpoint(const Service& service) } +Future<string> ServiceManagerProcess::getApiVersion() +{ + if (apiVersion.isSome()) { + return apiVersion.get(); + } + + // Ensure that the plugin has been probed (which does the API version + // detection) through `getEndpoint` before returning the API version. + CHECK(!serviceContainers.empty()); + return getEndpoint(serviceContainers.begin()->second) + .then(process::defer(self(), [=] { return CHECK_NOTNONE(apiVersion); })); +} + + Option<CSIPluginContainerInfo> ServiceManagerProcess::getContainerInfo( const ContainerID& containerId) { @@ -384,8 +404,8 @@ ServiceManagerProcess::getContainers() httpResponse.status + "' (" + httpResponse.body + ")"); } - Try<v1::agent::Response> v1Response = - internal::deserialize<v1::agent::Response>( + Try<mesos::v1::agent::Response> v1Response = + internal::deserialize<mesos::v1::agent::Response>( contentType, httpResponse.body); if (v1Response.isError()) { @@ -473,55 +493,117 @@ Future<Nothing> ServiceManagerProcess::waitEndpoint(const string& endpoint) const string endpointPath = strings::remove(endpoint, "unix://", strings::PREFIX); - Future<Nothing> created = Nothing(); - if (!os::exists(endpointPath)) { - // Wait for the endpoint socket to appear until the timeout expires. 
- Timeout timeout = Timeout::in(CSI_ENDPOINT_CREATION_TIMEOUT); + if (os::exists(endpointPath)) { + return Nothing(); + } - created = process::loop( - [=]() -> Future<Nothing> { - if (timeout.expired()) { - return Failure("Timed out waiting for endpoint '" + endpoint + "'"); - } + // Wait for the endpoint socket to appear until the timeout expires. + Timeout timeout = Timeout::in(CSI_ENDPOINT_CREATION_TIMEOUT); - return process::after(Milliseconds(10)); - }, - [=](const Nothing&) -> ControlFlow<Nothing> { - if (os::exists(endpointPath)) { - return Break(); - } + return process::loop( + [=]() -> Future<Nothing> { + if (timeout.expired()) { + return Failure("Timed out waiting for endpoint '" + endpoint + "'"); + } - return Continue(); - }); + return process::after(Milliseconds(10)); + }, + [=](const Nothing&) -> ControlFlow<Nothing> { + if (os::exists(endpointPath)) { + return Break(); + } + + return Continue(); + }); +} + + +Future<Nothing> ServiceManagerProcess::probeEndpoint(const string& endpoint) +{ + // Each probe function returns its API version if the probe is successful, + // an error if the API version is implemented but the probe fails, or a `None` + // if the API version is not implemented. + static const hashmap< + string, + std::function<Future<Result<string>>(const string&, const Runtime&)>> + probers = { + {v0::API_VERSION, + [](const string& endpoint, const Runtime& runtime) { + LOG(INFO) << "Probing endpoint '" << endpoint << "' with CSI v0"; + + return v0::Client(endpoint, runtime) + .probe(v0::ProbeRequest()) + .then([](const v0::RPCResult<v0::ProbeResponse>& result) { + return result.isError() + ? (result.error().status.error_code() == grpc::UNIMPLEMENTED + ? 
Result<string>::none() : result.error()) + : v0::API_VERSION; + }); + }}, + {v1::API_VERSION, + [](const string& endpoint, const Runtime& runtime) { + LOG(INFO) << "Probing endpoint '" << endpoint << "' with CSI v1"; + + return v1::Client(endpoint, runtime).probe(v1::ProbeRequest()) + .then([](const v1::RPCResult<v1::ProbeResponse>& result) { + // TODO(chhsiao): Retry when `result->ready` is false. + return result.isError() + ? (result.error().status.error_code() == grpc::UNIMPLEMENTED + ? Result<string>::none() : result.error()) + : v1::API_VERSION; + }); + }}, + }; + + ++metrics->csi_plugin_rpcs_pending; + + Future<Result<string>> probed; + + if (apiVersion.isSome()) { + CHECK(probers.contains(apiVersion.get())); + probed = probers.at(apiVersion.get())(endpoint, runtime); + } else { + probed = probers.at(v1::API_VERSION)(endpoint, runtime) + .then(process::defer(self(), [=](const Result<string>& result) { + return result.isNone() + ? probers.at(v0::API_VERSION)(endpoint, runtime) : result; + })); } - return created - .then(process::defer(self(), [=]() -> Future<Nothing> { - // TODO(chhsiao): Detect which CSI version to use through versioned - // `Probe` calls to support CSI v1 in a backward compatible way. 
- ++metrics->csi_plugin_rpcs_pending; - - return v0::Client(endpoint, runtime).probe(v0::ProbeRequest()) - .then(process::defer(self(), [=]( - const v0::RPCResult<v0::ProbeResponse>& result) -> Future<Nothing> { - if (result.isError()) { - return Failure( - "Failed to probe endpoint '" + endpoint + - "': " + stringify(result.error())); - } + return probed + .then(process::defer(self(), [=]( + const Result<string>& result) -> Future<Nothing> { + if (result.isError()) { + return Failure( + "Failed to probe endpoint '" + endpoint + "': " + result.error()); + } - return Nothing(); - })) - .onAny(process::defer(self(), [this](const Future<Nothing>& future) { - --metrics->csi_plugin_rpcs_pending; - if (future.isReady()) { - ++metrics->csi_plugin_rpcs_finished; - } else if (future.isDiscarded()) { - ++metrics->csi_plugin_rpcs_cancelled; - } else { - ++metrics->csi_plugin_rpcs_failed; - } - })); + if (result.isNone()) { + return Failure( + "Failed to probe endpoint '" + endpoint + "': Unknown API version"); + } + + if (apiVersion.isNone()) { + apiVersion = result.get(); + } else if (apiVersion != result.get()) { + return Failure( + "Failed to probe endpoint '" + endpoint + + "': Inconsistent API version"); + } + + return Nothing(); + })) + .onAny(process::defer(self(), [this](const Future<Nothing>& future) { + // We only update the metrics after the whole detection loop is done so + // it won't introduce much noise. 
+ --metrics->csi_plugin_rpcs_pending; + if (future.isReady()) { + ++metrics->csi_plugin_rpcs_finished; + } else if (future.isDiscarded()) { + ++metrics->csi_plugin_rpcs_cancelled; + } else { + ++metrics->csi_plugin_rpcs_failed; + } })); } @@ -624,6 +706,7 @@ Future<string> ServiceManagerProcess::getEndpoint( CHECK(endpoints.at(containerId)->associate( waitEndpoint(endpoint) + .then(process::defer(self(), &Self::probeEndpoint, endpoint)) .then([endpoint]() -> string { return endpoint; }))); return endpoints.at(containerId)->future().then([] { @@ -728,5 +811,12 @@ Future<string> ServiceManager::getServiceEndpoint(const Service& service) process.get(), &ServiceManagerProcess::getServiceEndpoint, service)); } + +Future<string> ServiceManager::getApiVersion() +{ + return recovered + .then(process::defer(process.get(), &ServiceManagerProcess::getApiVersion)); +} + } // namespace csi { } // namespace mesos { diff --git a/src/csi/service_manager.hpp b/src/csi/service_manager.hpp index 5dd6bc7..60a0805 100644 --- a/src/csi/service_manager.hpp +++ b/src/csi/service_manager.hpp @@ -73,6 +73,7 @@ public: process::Future<Nothing> recover(); process::Future<std::string> getServiceEndpoint(const Service& service); + process::Future<std::string> getApiVersion(); private: process::Owned<ServiceManagerProcess> process; diff --git a/src/csi/v0_volume_manager.cpp b/src/csi/v0_volume_manager.cpp index 02de17e..e19dc7c 100644 --- a/src/csi/v0_volume_manager.cpp +++ b/src/csi/v0_volume_manager.cpp @@ -63,6 +63,7 @@ using process::Continue; using process::ControlFlow; using process::Failure; using process::Future; +using process::Owned; using process::ProcessBase; using process::grpc::StatusError; @@ -74,29 +75,19 @@ namespace csi { namespace v0 { VolumeManagerProcess::VolumeManagerProcess( - const http::URL& agentUrl, const string& _rootDir, const CSIPluginInfo& _info, const hashset<Service> _services, - const string& containerPrefix, - const Option<string>& authToken, const Runtime& 
_runtime, + ServiceManager* _serviceManager, Metrics* _metrics) : ProcessBase(process::ID::generate("csi-v0-volume-manager")), rootDir(_rootDir), info(_info), services(_services), runtime(_runtime), - metrics(_metrics), - serviceManager(new ServiceManager( - agentUrl, - rootDir, - info, - services, - containerPrefix, - authToken, - runtime, - metrics)) + serviceManager(_serviceManager), + metrics(_metrics) { // This should have been validated in `VolumeManager::create`. CHECK(!services.empty()) @@ -114,8 +105,7 @@ Future<Nothing> VolumeManagerProcess::recover() bootId = bootId_.get(); - return serviceManager->recover() - .then(process::defer(self(), &Self::prepareServices)) + return prepareServices() .then(process::defer(self(), [this]() -> Future<Nothing> { // Recover the states of CSI volumes. Try<list<string>> volumePaths = @@ -1198,22 +1188,18 @@ void VolumeManagerProcess::garbageCollectMountPath(const string& volumeId) VolumeManager::VolumeManager( - const http::URL& agentUrl, const string& rootDir, const CSIPluginInfo& info, const hashset<Service>& services, - const string& containerPrefix, - const Option<string>& authToken, const Runtime& runtime, + ServiceManager* serviceManager, Metrics* metrics) : process(new VolumeManagerProcess( - agentUrl, rootDir, info, services, - containerPrefix, - authToken, runtime, + serviceManager, metrics)) { process::spawn(CHECK_NOTNULL(process.get())); diff --git a/src/csi/v0_volume_manager.hpp b/src/csi/v0_volume_manager.hpp index 6c15f29..9d572e7 100644 --- a/src/csi/v0_volume_manager.hpp +++ b/src/csi/v0_volume_manager.hpp @@ -53,13 +53,11 @@ class VolumeManager : public csi::VolumeManager { public: VolumeManager( - const process::http::URL& agentUrl, const std::string& rootDir, const CSIPluginInfo& info, const hashset<Service>& services, - const std::string& containerPrefix, - const Option<std::string>& authToken, const process::grpc::client::Runtime& runtime, + ServiceManager* serviceManager, Metrics* metrics); // Since 
this class contains `Owned` members which should not but can be diff --git a/src/csi/v0_volume_manager_process.hpp b/src/csi/v0_volume_manager_process.hpp index c3cd6ca..4cfb5b5 100644 --- a/src/csi/v0_volume_manager_process.hpp +++ b/src/csi/v0_volume_manager_process.hpp @@ -70,13 +70,11 @@ class VolumeManagerProcess : public process::Process<VolumeManagerProcess> { public: explicit VolumeManagerProcess( - const process::http::URL& agentUrl, const std::string& _rootDir, const CSIPluginInfo& _info, const hashset<Service> _services, - const std::string& containerPrefix, - const Option<std::string>& authToken, const process::grpc::client::Runtime& _runtime, + ServiceManager* _serviceManager, Metrics* _metrics); process::Future<Nothing> recover(); @@ -146,8 +144,8 @@ private: // // +------------+ // + + + | CREATED | ^ - // _attachVolume | | | +---+----^---+ | - // | | | | | | _detachVolume + // | | | +---+----^---+ | + // _attachVolume | | | | | | _detachVolume // | | | +---v----+---+ | // v + + | NODE_READY | + ^ // | | +---+----^---+ | | @@ -187,8 +185,8 @@ private: const hashset<Service> services; process::grpc::client::Runtime runtime; + ServiceManager* serviceManager; Metrics* metrics; - process::Owned<ServiceManager> serviceManager; Option<std::string> bootId; Option<PluginCapabilities> pluginCapabilities; diff --git a/src/csi/v1_volume_manager.cpp b/src/csi/v1_volume_manager.cpp index bd334f1..bf640f9 100644 --- a/src/csi/v1_volume_manager.cpp +++ b/src/csi/v1_volume_manager.cpp @@ -64,6 +64,7 @@ using process::Continue; using process::ControlFlow; using process::Failure; using process::Future; +using process::Owned; using process::ProcessBase; using process::grpc::StatusError; @@ -75,29 +76,19 @@ namespace csi { namespace v1 { VolumeManagerProcess::VolumeManagerProcess( - const http::URL& agentUrl, const string& _rootDir, const CSIPluginInfo& _info, const hashset<Service> _services, - const string& containerPrefix, - const Option<string>& authToken, const 
Runtime& _runtime, + ServiceManager* _serviceManager, Metrics* _metrics) : ProcessBase(process::ID::generate("csi-v1-volume-manager")), rootDir(_rootDir), info(_info), services(_services), runtime(_runtime), - metrics(_metrics), - serviceManager(new ServiceManager( - agentUrl, - rootDir, - info, - services, - containerPrefix, - authToken, - runtime, - metrics)) + serviceManager(_serviceManager), + metrics(_metrics) { // This should have been validated in `VolumeManager::create`. CHECK(!services.empty()) @@ -115,8 +106,7 @@ Future<Nothing> VolumeManagerProcess::recover() bootId = bootId_.get(); - return serviceManager->recover() - .then(process::defer(self(), &Self::prepareServices)) + return prepareServices() .then(process::defer(self(), [this]() -> Future<Nothing> { // Recover the states of CSI volumes. Try<list<string>> volumePaths = @@ -1224,22 +1214,18 @@ void VolumeManagerProcess::garbageCollectMountPath(const string& volumeId) VolumeManager::VolumeManager( - const http::URL& agentUrl, const string& rootDir, const CSIPluginInfo& info, const hashset<Service>& services, - const string& containerPrefix, - const Option<string>& authToken, const Runtime& runtime, + ServiceManager* serviceManager, Metrics* metrics) : process(new VolumeManagerProcess( - agentUrl, rootDir, info, services, - containerPrefix, - authToken, runtime, + serviceManager, metrics)) { process::spawn(CHECK_NOTNULL(process.get())); diff --git a/src/csi/v1_volume_manager.hpp b/src/csi/v1_volume_manager.hpp index f8e6095..ba984a9 100644 --- a/src/csi/v1_volume_manager.hpp +++ b/src/csi/v1_volume_manager.hpp @@ -53,13 +53,11 @@ class VolumeManager : public csi::VolumeManager { public: VolumeManager( - const process::http::URL& agentUrl, const std::string& rootDir, const CSIPluginInfo& info, const hashset<Service>& services, - const std::string& containerPrefix, - const Option<std::string>& authToken, const process::grpc::client::Runtime& runtime, + ServiceManager* serviceManager, Metrics* metrics); 
// Since this class contains `Owned` members which should not but can be diff --git a/src/csi/v1_volume_manager_process.hpp b/src/csi/v1_volume_manager_process.hpp index 1c80399..30788c3 100644 --- a/src/csi/v1_volume_manager_process.hpp +++ b/src/csi/v1_volume_manager_process.hpp @@ -70,13 +70,11 @@ class VolumeManagerProcess : public process::Process<VolumeManagerProcess> { public: explicit VolumeManagerProcess( - const process::http::URL& agentUrl, const std::string& _rootDir, const CSIPluginInfo& _info, const hashset<Service> _services, - const std::string& containerPrefix, - const Option<std::string>& authToken, const process::grpc::client::Runtime& _runtime, + ServiceManager* _serviceManager, Metrics* _metrics); process::Future<Nothing> recover(); @@ -146,8 +144,8 @@ private: // // +------------+ // + + + | CREATED | ^ - // _attachVolume | | | +---+----^---+ | - // | | | | | | _detachVolume + // | | | +---+----^---+ | + // _attachVolume | | | | | | _detachVolume // | | | +---v----+---+ | // v + + | NODE_READY | + ^ // | | +---+----^---+ | | @@ -187,8 +185,8 @@ private: const hashset<Service> services; process::grpc::client::Runtime runtime; + ServiceManager* serviceManager; Metrics* metrics; - process::Owned<ServiceManager> serviceManager; Option<std::string> bootId; Option<PluginCapabilities> pluginCapabilities; diff --git a/src/csi/volume_manager.cpp b/src/csi/volume_manager.cpp index cbe45cb..c47adfe 100644 --- a/src/csi/volume_manager.cpp +++ b/src/csi/volume_manager.cpp @@ -16,9 +16,14 @@ #include "csi/volume_manager.hpp" -#include <process/grpc.hpp> +#include <memory> +#include <mesos/csi/v0.hpp> +#include <mesos/csi/v1.hpp> + +#include "csi/service_manager.hpp" #include "csi/v0_volume_manager.hpp" +#include "csi/v1_volume_manager.hpp" namespace http = process::http; @@ -32,12 +37,12 @@ namespace mesos { namespace csi { Try<Owned<VolumeManager>> VolumeManager::create( - const http::URL& agentUrl, const string& rootDir, const CSIPluginInfo& info, const 
hashset<Service>& services, - const string& containerPrefix, - const Option<string>& authToken, + const string& apiVersion, + const Runtime& runtime, + ServiceManager* serviceManager, Metrics* metrics) { if (services.empty()) { @@ -46,15 +51,15 @@ Try<Owned<VolumeManager>> VolumeManager::create( info.type() + "' and name '" + info.name() + "'"); } - return new v0::VolumeManager( - agentUrl, - rootDir, - info, - services, - containerPrefix, - authToken, - Runtime(), - metrics); + if (apiVersion == v0::API_VERSION) { + return Try<Owned<VolumeManager>>(new v0::VolumeManager( + rootDir, info, services, runtime, serviceManager, metrics)); + } else if (apiVersion == v1::API_VERSION) { + return Try<Owned<VolumeManager>>(new v1::VolumeManager( + rootDir, info, services, runtime, serviceManager, metrics)); + } + + return Error("Unsupported CSI API version: " + apiVersion); } } // namespace csi { diff --git a/src/csi/volume_manager.hpp b/src/csi/volume_manager.hpp index cc20f46..0aa6337 100644 --- a/src/csi/volume_manager.hpp +++ b/src/csi/volume_manager.hpp @@ -27,6 +27,7 @@ #include <mesos/csi/types.hpp> #include <process/future.hpp> +#include <process/grpc.hpp> #include <process/http.hpp> #include <process/owned.hpp> @@ -56,12 +57,12 @@ class VolumeManager { public: static Try<process::Owned<VolumeManager>> create( - const process::http::URL& agentUrl, const std::string& rootDir, const CSIPluginInfo& info, const hashset<Service>& services, - const std::string& containerPrefix, - const Option<std::string>& authToken, + const std::string& apiVersion, + const process::grpc::client::Runtime& runtime, + ServiceManager* serviceManager, Metrics* metrics); virtual ~VolumeManager() = default; diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index 2dc5c26..b2ca5d0 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -72,6 +72,7 @@ #include "csi/metrics.hpp" #include 
"csi/paths.hpp" +#include "csi/service_manager.hpp" #include "csi/volume_manager.hpp" #include "internal/devolve.hpp" @@ -113,11 +114,14 @@ using process::spawn; using process::grpc::StatusError; +using process::grpc::client::Runtime; + using process::http::authentication::Principal; using process::metrics::Counter; using process::metrics::PushGauge; +using mesos::csi::ServiceManager; using mesos::csi::VolumeInfo; using mesos::csi::VolumeManager; @@ -180,6 +184,14 @@ static inline http::URL extractParentEndpoint(const http::URL& url) } +static string getContainerPrefix(const ResourceProviderInfo& info) +{ + const Principal principal = LocalResourceProvider::principal(info); + CHECK(principal.claims.contains("cid_prefix")); + return principal.claims.at("cid_prefix"); +} + + static inline Resource createRawDiskResource( const ResourceProviderInfo& info, const Bytes& capacity, @@ -363,6 +375,11 @@ private: // The mapping of known profiles fetched from the DiskProfileAdaptor. hashmap<string, DiskProfileAdaptor::ProfileInfo> profileInfos; + Runtime runtime; + + // NOTE: `serviceManager` must be destructed after `volumeManager` since the + // latter holds a pointer of the former. 
+ Owned<ServiceManager> serviceManager; Owned<VolumeManager> volumeManager; // We maintain the following invariant: if one operation depends on @@ -494,30 +511,6 @@ void StorageLocalResourceProviderProcess::received(const Event& event) void StorageLocalResourceProviderProcess::initialize() { - const Principal principal = LocalResourceProvider::principal(info); - CHECK(principal.claims.contains("cid_prefix")); - const string& containerPrefix = principal.claims.at("cid_prefix"); - - Try<Owned<VolumeManager>> volumeManager_ = VolumeManager::create( - extractParentEndpoint(url), - slave::paths::getCsiRootDir(workDir), - info.storage().plugin(), - {csi::CONTROLLER_SERVICE, csi::NODE_SERVICE}, - containerPrefix, - authToken, - &metrics); - - if (volumeManager_.isError()) { - LOG(ERROR) - << "Failed to create CSI volume manager for resource provider with type '" - << info.type() << "' and name '" << info.name() - << "': " << volumeManager_.error(); - - fatal(); - } - - volumeManager = std::move(volumeManager_).get(); - auto die = [=](const string& message) { LOG(ERROR) << "Failed to recover resource provider with type '" << info.type() @@ -547,7 +540,41 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover() { CHECK_EQ(RECOVERING, state); - return volumeManager->recover() + serviceManager.reset(new ServiceManager( + extractParentEndpoint(url), + slave::paths::getCsiRootDir(workDir), + info.storage().plugin(), + {csi::CONTROLLER_SERVICE, csi::NODE_SERVICE}, + getContainerPrefix(info), + authToken, + runtime, + &metrics)); + + return serviceManager->recover() + .then(defer(self(), [=] { + return serviceManager->getApiVersion(); + })) + .then(defer(self(), [=](const string& apiVersion) -> Future<Nothing> { + Try<Owned<VolumeManager>> volumeManager_ = VolumeManager::create( + slave::paths::getCsiRootDir(workDir), + info.storage().plugin(), + {csi::CONTROLLER_SERVICE, csi::NODE_SERVICE}, + apiVersion, + runtime, + serviceManager.get(), + &metrics); + + if 
(volumeManager_.isError()) { + return Failure( + "Failed to create CSI volume manager for resource provider with " + "type '" + info.type() + "' and name '" + info.name() + "': " + + volumeManager_.error()); + } + + volumeManager = std::move(volumeManager_.get()); + + return volumeManager->recover(); + })) .then(defer(self(), [=]() -> Future<Nothing> { // Recover the resource provider ID and state from the latest symlink. If // the symlink does not exist, this is a new resource provider, and the diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp index bd35150..8bf4d23 100644 --- a/src/tests/storage_local_resource_provider_tests.cpp +++ b/src/tests/storage_local_resource_provider_tests.cpp @@ -22,6 +22,9 @@ #include <tuple> #include <vector> +#include <mesos/csi/v0.hpp> +#include <mesos/csi/v1.hpp> + #include <process/clock.hpp> #include <process/collect.hpp> #include <process/future.hpp> @@ -4795,6 +4798,11 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff) MockCSIPlugin plugin; ASSERT_SOME(plugin.startup(mockCsiEndpoint)); + // TODO(chhsiao): Since this test expects CSI v0 protobufs, we disable CSI v1 + // for now. Remove this once the expectations are parameterized. + EXPECT_CALL(plugin, Probe(_, _, A<csi::v1::ProbeResponse*>())) + .WillRepeatedly(Return(grpc::Status(grpc::UNIMPLEMENTED, ""))); + EXPECT_CALL(plugin, GetCapacity(_, _, A<csi::v0::GetCapacityResponse*>())) .WillRepeatedly(Invoke([]( grpc::ServerContext* context,
