This is an automated email from the ASF dual-hosted git repository. grag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 4ff51041df860dbcc2247ef47a0596e5132da190 Author: Greg Mann <[email protected]> AuthorDate: Thu Aug 20 19:27:23 2020 -0700 Initialized plugins lazily in the CSI server. Review: https://reviews.apache.org/r/72779/ --- src/slave/csi_server.cpp | 403 +++++++++++++++++++++++++++++------------------ src/slave/csi_server.hpp | 8 +- 2 files changed, 253 insertions(+), 158 deletions(-) diff --git a/src/slave/csi_server.cpp b/src/slave/csi_server.cpp index 2ba4f22..0ffe020 100644 --- a/src/slave/csi_server.cpp +++ b/src/slave/csi_server.cpp @@ -19,6 +19,7 @@ #include <vector> #include <mesos/mesos.hpp> +#include <mesos/type_utils.hpp> #include <mesos/secret/resolver.hpp> @@ -58,6 +59,7 @@ using mesos::csi::state::VolumeState; using process::Failure; using process::Future; using process::Owned; +using process::Promise; using process::grpc::client::Runtime; @@ -85,17 +87,17 @@ public: CSIServerProcess( const process::http::URL& _agentUrl, const string& _rootDir, + const string& _pluginConfigDir, SecretGenerator* _secretGenerator, - SecretResolver* _secretResolver, - hashmap<string, CSIPluginInfo> _pluginConfigs) + SecretResolver* _secretResolver) : process::ProcessBase(process::ID::generate("csi-server")), agentUrl(_agentUrl), rootDir(_rootDir), + pluginConfigDir(_pluginConfigDir), secretGenerator(_secretGenerator), - secretResolver(_secretResolver), - pluginConfigs(_pluginConfigs) {} + secretResolver(_secretResolver) {} - Future<Nothing> start(); + Future<Nothing> start(const SlaveID& _agentId); Future<string> publishVolume(const Volume::Source::CSIVolume& volume); @@ -106,73 +108,125 @@ public: private: struct CSIPlugin { - CSIPlugin(const string& metricsPrefix) : metrics(metricsPrefix) {} + CSIPlugin( + const CSIPluginInfo& _info, + const string& metricsPrefix) + : info(_info), + metrics(metricsPrefix) {} CSIPluginInfo info; Owned<ServiceManager> serviceManager; Owned<VolumeManager> volumeManager; Runtime runtime; csi::Metrics metrics; + + // CSI plugins are initialized lazily. When a publish/unpublish call is + // received for a plugin which is not yet initialized, this promise is used + // to perform the call after initialization is complete. + Promise<Nothing> initialized; }; + // Attempts to load configuration for a plugin with the specified name and + // then initializes the plugin. If no name is specified, then all + // configurations found in the plugin config directory are loaded. + Try<Nothing> initializePlugin(const Option<string>& name = None()); + // Contains the plugins loaded by the server. The key of this map is the // plugin name. hashmap<string, CSIPlugin> plugins; const process::http::URL agentUrl; + Option<SlaveID> agentId; const string rootDir; + const string pluginConfigDir; SecretGenerator* secretGenerator; SecretResolver* secretResolver; Option<string> authToken; - hashmap<string, CSIPluginInfo> pluginConfigs; - Option<SlaveID> agentId; }; -Future<Nothing> CSIServerProcess::start() +Try<Nothing> CSIServerProcess::initializePlugin(const Option<string>& name) { - Future<Nothing> result = Nothing(); + if (name.isSome()) { + CHECK(!plugins.contains(name.get())); + } - // The contents of this principal are arbitrary. We choose to avoid a - // principal with a 'value' string so that we do not unintentionally collide - // with another real principal with restricted permissions. - Principal principal(Option<string>::none(), {{"key", "csi-server"}}); + Try<list<string>> entries = os::ls(pluginConfigDir); + if (entries.isError()) { + return Error( + "Unable to list the CSI plugin configuration directory '" + + pluginConfigDir + "': " + entries.error()); + } + + // We are either looking for one specific plugin (if `name` is SOME), or we + // are loading all configs we find (if `name` is NONE). First, we populate + // `pluginConfigs` with one or more valid configurations. Then, we will + // initialize the plugin(s) based on the configuration(s) found. + hashmap<string, CSIPluginInfo> pluginConfigs; + + foreach (const string& entry, entries.get()) { + const string path = path::join(pluginConfigDir, entry); + + // Ignore directory entries. + if (os::stat::isdir(path)) { + continue; + } + + Try<string> read = os::read(path); + if (read.isError()) { + // In case of an error we log and skip to the next entry. + LOG(ERROR) << "Failed to read CSI plugin configuration file '" + << path << "': " << read.error(); + + continue; + } + + Try<JSON::Object> json = JSON::parse<JSON::Object>(read.get()); + if (json.isError()) { + return Error("JSON parse of '" + path + "' failed: " + json.error()); + } + + Try<CSIPluginInfo> parse = ::protobuf::parse<CSIPluginInfo>(json.get()); + if (parse.isError()) { + return Error("Protobuf parse of '" + path + "' failed: " + parse.error()); + } + + const CSIPluginInfo& csiPluginConfig = parse.get(); + const string& type = csiPluginConfig.type(); + + if (pluginConfigs.contains(type)) { + LOG(ERROR) << "Multiple configurations for a CSI plugin are not allowed. " + << "Skipping configuration file '" << path << "' since CSI " + << "plugin '" << type << "' already exists"; + continue; + } + + if (name.isNone() || name.get() == type) { + pluginConfigs[type] = csiPluginConfig; - if (secretGenerator) { - result = secretGenerator->generate(principal) - .then([=](const Secret& secret) -> Future<Nothing> { - Option<Error> error = common::validation::validateSecret(secret); - if (error.isSome()) { - return Failure( - "CSI server failed to validate generated secret: " + - error->message); - } - - if (secret.type() != Secret::VALUE) { - return Failure( - "CSI server expecting generated secret to be of VALUE type " - "instead of " + stringify(secret.type()) + " type; " + - "only VALUE type secrets are supported at this time"); - } - - CHECK(secret.has_value()); - - authToken = secret.value().data(); - - return Nothing(); - }); + if (name.isSome()) { + break; + } + } } - // Initialize CSI plugins. - vector<Future<Nothing>> initializations; + if (pluginConfigs.empty()) { + return Error( + "No valid CSI plugin configurations found in '" + + pluginConfigDir + "'"); + } - foreachpair (const string& name, const CSIPluginInfo& info, pluginConfigs) { + foreachpair (const string& _name, const CSIPluginInfo& info, pluginConfigs) { // Default-construct the plugin struct so that we have a valid runtime // to pass into the service manager. - plugins.put(name, CSIPlugin("csi_plugins/" + name + "/")); + plugins.put(_name, CSIPlugin(info, "csi_plugins/" + _name + "/")); + + CSIPlugin& plugin = plugins.at(_name); if (info.containers_size() > 0) { - plugins.at(name).serviceManager.reset(new ServiceManager( + CHECK_SOME(agentId); + + plugin.serviceManager.reset(new ServiceManager( agentId.get(), agentUrl, rootDir, @@ -180,56 +234,121 @@ Future<Nothing> CSIServerProcess::start() extractServices(info), "org-apache-mesos-internal-", authToken, - plugins.at(name).runtime, - &plugins.at(name).metrics)); + plugin.runtime, + &plugin.metrics)); } else { CHECK(info.endpoints_size() > 0); - plugins.at(name).serviceManager.reset(new ServiceManager( + plugin.serviceManager.reset(new ServiceManager( info, extractServices(info), - plugins.at(name).runtime, - &plugins.at(name).metrics)); + plugin.runtime, + &plugin.metrics)); } - initializations.push_back(plugins.at(name).serviceManager->recover() - .then(defer(self(), [=]() { - CHECK(plugins.contains(name)); + plugin.initialized.associate( + plugin.serviceManager->recover() + .then(defer(self(), [=]() { + CHECK(plugins.contains(_name)); + + return plugins.at(_name).serviceManager->getApiVersion(); + })) + .then(defer(self(), [=](const string& apiVersion) -> Future<Nothing> { + CHECK(plugins.contains(_name)); + + Try<Owned<VolumeManager>> volumeManager = VolumeManager::create( + rootDir, + info, + extractServices(info), + apiVersion, + plugins.at(_name).runtime, + plugins.at(_name).serviceManager.get(), + &plugins.at(_name).metrics, + secretResolver); + + if (volumeManager.isError()) { + return Failure( + "CSI server failed to create volume manager for plugin" + " '" + info.name() + "': " + volumeManager.error()); + } + + plugins.at(_name).volumeManager = std::move(volumeManager.get()); + + return plugins.at(_name).volumeManager->recover(); + })) + .onAny(defer(self(), [=](const Future<Nothing>& future) { + if (!future.isReady()) { + plugins.erase(_name); + + LOG(ERROR) + << "CSI server failed to initialize plugin '" << _name << "': " + << (future.isFailed() ? future.failure() : "discarded"); + } else { + LOG(INFO) + << "CSI server successfully initialized plugin '" + << _name << "'"; + } + }))); + } - return plugins.at(name).serviceManager->getApiVersion(); - })) - .then(defer(self(), [=](const string& apiVersion) -> Future<Nothing> { - CHECK(plugins.contains(name)); - - Try<Owned<VolumeManager>> volumeManager = VolumeManager::create( - rootDir, - info, - extractServices(info), - apiVersion, - plugins.at(name).runtime, - plugins.at(name).serviceManager.get(), - &plugins.at(name).metrics, - secretResolver); - - if (volumeManager.isError()) { - return Failure( - "CSI server failed to create volume manager for plugin" - " '" + info.name() + "': " + volumeManager.error()); - } - - plugins.at(name).volumeManager = std::move(volumeManager.get()); - - return plugins.at(name).volumeManager->recover(); - }))); + return Nothing(); +} + + +Future<Nothing> CSIServerProcess::start(const SlaveID& _agentId) +{ + // NOTE: It's possible that the agent receives multiple + // `SlaveRegisteredMessage`s and detects a disconnection in between. + // In that case, `start` will be called multiple times from + // `Slave::registered`. + if (agentId.isSome()) { + CHECK_EQ(agentId.get(), _agentId) + << "Cannot start CSI server with agent ID " << _agentId + << " (expected: " << agentId.get() << ")"; + + return Nothing(); + } + + agentId = _agentId; + + // Load all CSI plugin configurations found. + Try<Nothing> init = initializePlugin(); + if (init.isError()) { + return Failure( + "CSI server failed to initialize CSI plugins: " + init.error()); } - return result - .then([=]() { - return process::collect(initializations); - }) - .then([=]() { + if (!secretGenerator) { + return Nothing(); + } + + // The contents of this principal are arbitrary. We choose to avoid a + // principal with a 'value' string so that we do not unintentionally collide + // with another real principal with restricted permissions. + Principal principal(Option<string>::none(), {{"key", "csi-server"}}); + + return secretGenerator->generate(principal) + .then([=](const Secret& secret) -> Future<Nothing> { + Option<Error> error = common::validation::validateSecret(secret); + if (error.isSome()) { + return Failure( + "CSI server failed to validate generated secret: " + + error->message); + } + + if (secret.type() != Secret::VALUE) { + return Failure( + "CSI server expecting generated secret to be of VALUE type " + "instead of " + stringify(secret.type()) + " type; " + + "only VALUE type secrets are supported at this time"); + } + + CHECK(secret.has_value()); + + authToken = secret.value().data(); + return Nothing(); - }); + }); } @@ -238,15 +357,31 @@ Future<string> CSIServerProcess::publishVolume( { CHECK(volume.has_static_provisioning()); - if (!plugins.contains(volume.plugin_name())) { - return Failure("Invalid CSI plugin '" + volume.plugin_name() + "'"); + const string& name = volume.plugin_name(); + + if (!plugins.contains(name)) { + // This will attempt to load the plugin's configuration, initialize the + // plugin, and insert it into the `plugins` map. + Try<Nothing> pluginInit = initializePlugin(name); + if (pluginInit.isError()) { + return Failure( + "Failed to initialize CSI plugin '" + + name + "': " + pluginInit.error()); + } } - return plugins.at(volume.plugin_name()).volumeManager->publishVolume( - volume.static_provisioning().volume_id(), - createVolumeState(volume.static_provisioning())) - .then([=]() { - CHECK(plugins.contains(volume.plugin_name())); + CHECK(plugins.contains(name)); + + return plugins.at(name).initialized.future() + .then(defer(self(), [=]() { + CHECK(plugins.contains(name)); + + return plugins.at(name).volumeManager->publishVolume( + volume.static_provisioning().volume_id(), + createVolumeState(volume.static_provisioning())); + })) + .then(defer(self(), [=]() { + CHECK(plugins.contains(name)); const CSIPluginInfo& info = plugins.at(volume.plugin_name()).info; @@ -257,7 +392,7 @@ Future<string> CSIServerProcess::publishVolume( return csi::paths::getMountTargetPath( mountRootDir, volume.static_provisioning().volume_id()); - }); + })); } @@ -266,10 +401,22 @@ Future<Nothing> CSIServerProcess::unpublishVolume( const string& volumeId) { if (!plugins.contains(pluginName)) { - return Failure("Invalid CSI plugin '" + pluginName + "'"); + // This will attempt to load the plugin's configuration, initialize the + // plugin, and insert it into the `plugins` map. + Try<Nothing> pluginInit = initializePlugin(pluginName); + if (pluginInit.isError()) { + return Failure( + "Failed to initialize CSI plugin '" + + pluginName + "': " + pluginInit.error()); + } } - return plugins.at(pluginName).volumeManager->unpublishVolume(volumeId); + CHECK(plugins.contains(pluginName)); + + return plugins.at(pluginName).initialized.future() + .then(defer(self(), [=]() { + return plugins.at(pluginName).volumeManager->unpublishVolume(volumeId); + })); } @@ -313,15 +460,15 @@ hashset<CSIPluginContainerInfo::Service> extractServices( CSIServer::CSIServer( const process::http::URL& agentUrl, const string& rootDir, + const string& pluginConfigDir, SecretGenerator* secretGenerator, - SecretResolver* secretResolver, - const hashmap<string, CSIPluginInfo>& pluginConfigs) + SecretResolver* secretResolver) : process(new CSIServerProcess( agentUrl, rootDir, + pluginConfigDir, secretGenerator, - secretResolver, - pluginConfigs)) + secretResolver)) { process::spawn(CHECK_NOTNULL(process.get())); } @@ -355,73 +502,21 @@ Try<Owned<CSIServer>> CSIServer::create( flags.csi_plugin_config_dir.get() + "' does not exist"); } - Try<list<string>> entries = os::ls(flags.csi_plugin_config_dir.get()); - if (entries.isError()) { - return Error( - "Unable to list the CSI plugin configuration directory '" + - flags.csi_plugin_config_dir.get()+ "': " + entries.error()); - } - - hashmap<std::string, CSIPluginInfo> pluginConfigs; - - foreach (const string& entry, entries.get()) { - const string path = path::join(flags.csi_plugin_config_dir.get(), entry); - - // Ignore directory entries. - if (os::stat::isdir(path)) { - continue; - } - - Try<string> read = os::read(path); - if (read.isError()) { - // In case of an error we log and skip to the next entry. - LOG(ERROR) << "Failed to read CSI plugin configuration file '" - << path << "': " << read.error(); - - continue; - } - - Try<JSON::Object> json = JSON::parse<JSON::Object>(read.get()); - if (json.isError()) { - return Error("JSON parse failed: " + json.error()); - } - - Try<CSIPluginInfo> parse = ::protobuf::parse<CSIPluginInfo>(json.get()); - if (parse.isError()) { - return Error("Protobuf parse failed: " + parse.error()); - } - - const CSIPluginInfo& csiPluginConfig = parse.get(); - const string& type = csiPluginConfig.type(); - - if (pluginConfigs.contains(type)) { - LOG(ERROR) << "Multiple configurations for a CSI plugin are not allowed. " - << "Skipping configuration file '" << path << "' since CSI " - << "plugin '" << type << "' already exists"; - continue; - } - - pluginConfigs[type] = csiPluginConfig; - } - - if (pluginConfigs.empty()) { - return Error( - "No valid CSI plugin configurations found in '" + - flags.csi_plugin_config_dir.get() + "'"); - } - return new CSIServer( agentUrl, slave::paths::getCsiRootDir(flags.work_dir), + flags.csi_plugin_config_dir.get(), secretGenerator, - secretResolver, - pluginConfigs); + secretResolver); } -Future<Nothing> CSIServer::start() +Future<Nothing> CSIServer::start(const SlaveID& agentId) { - started.associate(process::dispatch(process.get(), &CSIServerProcess::start)); + started.associate(process::dispatch( + process.get(), + &CSIServerProcess::start, + agentId)); return started.future(); } diff --git a/src/slave/csi_server.hpp b/src/slave/csi_server.hpp index f5ec766..de5c6b6 100644 --- a/src/slave/csi_server.hpp +++ b/src/slave/csi_server.hpp @@ -60,7 +60,7 @@ public: // Starts the CSI server. Any `publishVolume()` or `unpublishVolume()` calls // which were made previously will be executed after this method is called. // Returns a future which is satisfied once initialization is complete. - process::Future<Nothing> start(); + process::Future<Nothing> start(const SlaveID& agentId); // Publish a CSI volume to this agent. If the `start()` method has not yet // been called, then the publishing of this volume will not be completed until @@ -79,10 +79,10 @@ public: private: CSIServer( const process::http::URL& agentUrl, - const std::string& csiRootDir, + const std::string& rootDir, + const std::string& pluginConfigDir, SecretGenerator* secretGenerator, - SecretResolver* secretResolver, - const hashmap<std::string, CSIPluginInfo>& csiPluginConfigs); + SecretResolver* secretResolver); process::Owned<CSIServerProcess> process;
