This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit fea9905c08f8c30090652a6047e3f3c242b46781 Author: Chun-Hung Hsiao <[email protected]> AuthorDate: Mon Apr 1 23:23:38 2019 -0700 Refactored SLRP with `ServiceManager` to manage container lifecycles. Container management is moved out from SLRP to `ServiceManager`. It is agnostic to CSI versions, so can be used to manage plugin containers for both CSI v0 and v1 plugins. This patch squashes the changes from r/70169. Review: https://reviews.apache.org/r/70168/ --- src/CMakeLists.txt | 1 + src/Makefile.am | 2 + src/csi/service_manager.cpp | 726 +++++++++++++++++++++ src/csi/service_manager.hpp | 85 +++ src/resource_provider/storage/provider.cpp | 630 ++---------------- src/resource_provider/storage/provider_process.hpp | 40 +- 6 files changed, 868 insertions(+), 616 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 53b0e7b..f1c3114 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -244,6 +244,7 @@ set(CSI_SRC csi/metrics.cpp csi/paths.cpp csi/rpc.cpp + csi/service_manager.cpp csi/utils.cpp) set(DOCKER_SRC diff --git a/src/Makefile.am b/src/Makefile.am index 7ef8825..ea8e176 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1560,6 +1560,8 @@ libcsi_la_SOURCES = \ csi/paths.hpp \ csi/rpc.cpp \ csi/rpc.hpp \ + csi/service_manager.cpp \ + csi/service_manager.hpp \ csi/state.hpp \ csi/state.proto \ csi/utils.cpp \ diff --git a/src/csi/service_manager.cpp b/src/csi/service_manager.cpp new file mode 100644 index 0000000..27de0c9 --- /dev/null +++ b/src/csi/service_manager.cpp @@ -0,0 +1,726 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "csi/service_manager.hpp" + +#include <algorithm> +#include <functional> +#include <list> +#include <utility> +#include <vector> + +#include <mesos/http.hpp> + +#include <mesos/agent/agent.hpp> + +#include <process/after.hpp> +#include <process/collect.hpp> +#include <process/defer.hpp> +#include <process/dispatch.hpp> +#include <process/id.hpp> +#include <process/loop.hpp> +#include <process/process.hpp> +#include <process/timeout.hpp> + +#include <stout/check.hpp> +#include <stout/duration.hpp> +#include <stout/foreach.hpp> +#include <stout/hashmap.hpp> +#include <stout/os.hpp> +#include <stout/result.hpp> +#include <stout/stringify.hpp> +#include <stout/strings.hpp> +#include <stout/try.hpp> + +#include <stout/os/realpath.hpp> + +#include "common/http.hpp" + +#include "csi/client.hpp" +#include "csi/paths.hpp" +#include "csi/utils.hpp" + +#include "internal/devolve.hpp" +#include "internal/evolve.hpp" + +#include "slave/container_daemon.hpp" +#include "slave/state.hpp" + +namespace http = process::http; +namespace slave = mesos::internal::slave; + +using std::list; +using std::string; +using std::vector; + +using process::Break; +using process::Continue; +using process::ControlFlow; +using process::Failure; +using process::Future; +using process::Owned; +using process::Process; +using process::ProcessBase; +using process::Promise; +using process::Timeout; + +using process::grpc::StatusError; + +using process::grpc::client::Runtime; + +using slave::ContainerDaemon; + +namespace mesos { +namespace csi { + +// Timeout for a CSI plugin 
component to create its endpoint socket. +// +// TODO(chhsiao): Make the timeout configurable. +constexpr Duration CSI_ENDPOINT_CREATION_TIMEOUT = Minutes(1); + + +// Returns the container ID of the specified CSI plugin container. The container +// ID is of the following format: +// +// <container_prefix><plugin_type>-<plugin_name>--<list_of_services> +// +// where <plugin_type> and <plugin_name> are the type and name of the CSI plugin +// with dots replaced by dashes, and <list_of_services> lists the CSI services +// provided by the component, concatenated with dashes. +static ContainerID getContainerId( + const CSIPluginInfo& info, + const string& containerPrefix, + const CSIPluginContainerInfo& container) +{ + ContainerID containerId; + containerId.set_value( + containerPrefix + + strings::join("-", strings::replace(info.type(), ".", "-"), info.name()) + + "--" + strings::join("-", container.services())); + + return containerId; +} + + +class ServiceManagerProcess : public Process<ServiceManagerProcess> +{ +public: + ServiceManagerProcess( + const http::URL& _agentUrl, + const string& _rootDir, + const CSIPluginInfo& _info, + const hashset<Service>& services, + const string& _containerPrefix, + const Option<string>& _authToken, + const Runtime& _runtime, + Metrics* _metrics); + + Future<Nothing> recover(); + + Future<string> getServiceEndpoint(const Service& service); + +private: + // Returns the container info of the specified container for this CSI plugin. + Option<CSIPluginContainerInfo> getContainerInfo( + const ContainerID& containerId); + + // Returns a map of any existing container of this CSI plugin to its status, + // or `None` if it does not have a status (e.g., being destroyed). + Future<hashmap<ContainerID, Option<ContainerStatus>>> getContainers(); + + // Waits for the specified plugin container to be terminated. + Future<Nothing> waitContainer(const ContainerID& containerId); + + // Kills the specified plugin container. 
+ Future<Nothing> killContainer(const ContainerID& containerId); + + // Waits for the endpoint (URI to a Unix domain socket) to be ready. + Future<Nothing> waitEndpoint(const string& endpoint); + + // Returns the URI of the latest service endpoint for the specified plugin + // container. If the container is not already running, this method will start + // a new container. + Future<string> getEndpoint(const ContainerID& containerId); + + const http::URL agentUrl; + const string rootDir; + const CSIPluginInfo info; + const string containerPrefix; + const Option<string> authToken; + const ContentType contentType; + + Runtime runtime; + Metrics* metrics; + + http::Headers headers; + hashmap<Service, ContainerID> serviceContainers; + + hashmap<ContainerID, Owned<ContainerDaemon>> daemons; + hashmap<ContainerID, Owned<Promise<string>>> endpoints; +}; + + +ServiceManagerProcess::ServiceManagerProcess( + const http::URL& _agentUrl, + const string& _rootDir, + const CSIPluginInfo& _info, + const hashset<Service>& services, + const string& _containerPrefix, + const Option<string>& _authToken, + const Runtime& _runtime, + Metrics* _metrics) + : ProcessBase(process::ID::generate("csi-service-manager")), + agentUrl(_agentUrl), + rootDir(_rootDir), + info(_info), + containerPrefix(_containerPrefix), + authToken(_authToken), + contentType(ContentType::PROTOBUF), + runtime(_runtime), + metrics(_metrics) +{ + headers["Accept"] = stringify(contentType); + if (authToken.isSome()) { + headers["Authorization"] = "Bearer " + authToken.get(); + } + + foreach (const Service& service, services) { + // Each service is served by the first container providing the service. See + // `CSIPluginInfo` in `mesos.proto` for details. 
+ foreach (const CSIPluginContainerInfo& container, info.containers()) { + if (container.services().end() != std::find( + container.services().begin(), + container.services().end(), + service)) { + serviceContainers[service] = + getContainerId(info, containerPrefix, container); + + break; + } + } + + CHECK(serviceContainers.contains(service)) + << service << " not found for CSI plugin type '" << info.type() + << "' and name '" << info.name() << "'"; + } +} + + +Future<Nothing> ServiceManagerProcess::recover() +{ + return getContainers() + .then(process::defer(self(), [=]( + const hashmap<ContainerID, Option<ContainerStatus>>& containers) + -> Future<Nothing> { + Try<list<string>> containerPaths = + paths::getContainerPaths(rootDir, info.type(), info.name()); + + if (containerPaths.isError()) { + return Failure( + "Failed to find service container paths for CSI plugin type '" + + info.type() + "' and name '" + info.name() + + "': " + containerPaths.error()); + } + + vector<Future<Nothing>> futures; + + foreach (const string& path, containerPaths.get()) { + Try<paths::ContainerPath> containerPath = + paths::parseContainerPath(rootDir, path); + + if (containerPath.isError()) { + return Failure( + "Failed to parse service container path '" + path + + "': " + containerPath.error()); + } + + CHECK_EQ(info.type(), containerPath->type); + CHECK_EQ(info.name(), containerPath->name); + + const ContainerID& containerId = containerPath->containerId; + + // NOTE: Since `GET_CONTAINERS` might return containers that are being + // destroyed, to identify if the container is actually running, we check + // if the `executor_pid` field is set as a workaround. + bool isRunningContainer = + containers.contains(containerId) && + containers.at(containerId).isSome() && + containers.at(containerId)->has_executor_pid(); + + // Do not kill the up-to-date running controller or node container. 
+ if (serviceContainers.contains_value(containerId) && + isRunningContainer) { + const string configPath = paths::getContainerInfoPath( + rootDir, info.type(), info.name(), containerId); + + if (os::exists(configPath)) { + Result<CSIPluginContainerInfo> config = + slave::state::read<CSIPluginContainerInfo>(configPath); + + if (config.isError()) { + return Failure( + "Failed to read service container config from '" + + configPath + "': " + config.error()); + } + + if (config.isSome() && + getContainerInfo(containerId) == config.get()) { + continue; + } + } + } + + LOG(INFO) << "Cleaning up plugin container '" << containerId << "'"; + + // Otherwise, kill the container if it is running. We always wait for + // the container to be destroyed before performing the cleanup even if + // it is not killed here. + Future<Nothing> cleanup = Nothing(); + if (containers.contains(containerId)) { + if (isRunningContainer) { + cleanup = killContainer(containerId); + } + cleanup = cleanup + .then(process::defer(self(), &Self::waitContainer, containerId)); + } + + cleanup = cleanup + .then(process::defer(self(), [=]() -> Future<Nothing> { + Result<string> endpointDir = + os::realpath(paths::getEndpointDirSymlinkPath( + rootDir, info.type(), info.name(), containerId)); + + if (endpointDir.isSome()) { + Try<Nothing> rmdir = os::rmdir(endpointDir.get()); + if (rmdir.isError()) { + return Failure( + "Failed to remove endpoint directory '" + + endpointDir.get() + "': " + rmdir.error()); + } + } + + Try<Nothing> rmdir = os::rmdir(path); + if (rmdir.isError()) { + return Failure( + "Failed to remove plugin container directory '" + path + + "': " + rmdir.error()); + } + + return Nothing(); + })); + + futures.push_back(cleanup); + } + + return process::collect(futures).then([] { return Nothing(); }); + })); +} + + +Future<string> ServiceManagerProcess::getServiceEndpoint(const Service& service) +{ + if (!serviceContainers.contains(service)) { + return Failure( + stringify(service) + " not 
found for CSI plugin type '" + info.type() + + "' and name '" + info.name() + "'"); + } + + return getEndpoint(serviceContainers.at(service)); +} + + +Option<CSIPluginContainerInfo> ServiceManagerProcess::getContainerInfo( + const ContainerID& containerId) +{ + foreach (const auto& container, info.containers()) { + if (getContainerId(info, containerPrefix, container) == containerId) { + return container; + } + } + + return None(); +} + + +Future<hashmap<ContainerID, Option<ContainerStatus>>> +ServiceManagerProcess::getContainers() +{ + agent::Call call; + call.set_type(agent::Call::GET_CONTAINERS); + call.mutable_get_containers()->set_show_nested(false); + call.mutable_get_containers()->set_show_standalone(true); + + return http::post( + agentUrl, + headers, + internal::serialize(contentType, internal::evolve(call)), + stringify(contentType)) + .then(process::defer(self(), [this](const http::Response& httpResponse) + -> Future<hashmap<ContainerID, Option<ContainerStatus>>> { + if (httpResponse.status != http::OK().status) { + return Failure( + "Failed to get containers: Unexpected response '" + + httpResponse.status + "' (" + httpResponse.body + ")"); + } + + Try<v1::agent::Response> v1Response = + internal::deserialize<v1::agent::Response>( + contentType, httpResponse.body); + + if (v1Response.isError()) { + return Failure("Failed to get containers: " + v1Response.error()); + } + + hashmap<ContainerID, Option<ContainerStatus>> result; + + agent::Response response = internal::devolve(v1Response.get()); + foreach (const agent::Response::GetContainers::Container& container, + response.get_containers().containers()) { + // Container IDs of this CSI plugin must contain the given prefix. See + // `LocalResourceProvider::principal` for details. + if (!strings::startsWith( + container.container_id().value(), containerPrefix)) { + continue; + } + + result.put( + container.container_id(), + container.has_container_status() ? 
container.container_status() + : Option<ContainerStatus>::none()); + } + + return std::move(result); + })); +} + + +Future<Nothing> ServiceManagerProcess::waitContainer( + const ContainerID& containerId) +{ + agent::Call call; + call.set_type(agent::Call::WAIT_CONTAINER); + call.mutable_wait_container()->mutable_container_id()->CopyFrom(containerId); + + return http::post( + agentUrl, + headers, + internal::serialize(contentType, internal::evolve(call)), + stringify(contentType)) + .then([containerId](const http::Response& response) -> Future<Nothing> { + if (response.status != http::OK().status && + response.status != http::NotFound().status) { + return Failure( + "Failed to wait for container '" + stringify(containerId) + + "': Unexpected response '" + response.status + "' (" + response.body + + ")"); + } + + return Nothing(); + }); +} + + +Future<Nothing> ServiceManagerProcess::killContainer( + const ContainerID& containerId) +{ + agent::Call call; + call.set_type(agent::Call::KILL_CONTAINER); + call.mutable_kill_container()->mutable_container_id()->CopyFrom(containerId); + + return http::post( + agentUrl, + headers, + internal::serialize(contentType, internal::evolve(call)), + stringify(contentType)) + .then([containerId](const http::Response& response) -> Future<Nothing> { + if (response.status != http::OK().status && + response.status != http::NotFound().status) { + return Failure( + "Failed to kill container '" + stringify(containerId) + + "': Unexpected response '" + response.status + "' (" + response.body + + ")"); + } + + return Nothing(); + }); +} + + +Future<Nothing> ServiceManagerProcess::waitEndpoint(const string& endpoint) +{ + CHECK(strings::startsWith(endpoint, "unix://")); + const string endpointPath = + strings::remove(endpoint, "unix://", strings::PREFIX); + + Future<Nothing> created = Nothing(); + if (!os::exists(endpointPath)) { + // Wait for the endpoint socket to appear until the timeout expires. 
+ Timeout timeout = Timeout::in(CSI_ENDPOINT_CREATION_TIMEOUT); + + created = process::loop( + [=]() -> Future<Nothing> { + if (timeout.expired()) { + return Failure("Timed out waiting for endpoint '" + endpoint + "'"); + } + + return process::after(Milliseconds(10)); + }, + [=](const Nothing&) -> ControlFlow<Nothing> { + if (os::exists(endpointPath)) { + return Break(); + } + + return Continue(); + }); + } + + return created + .then(process::defer(self(), [=]() -> Future<Nothing> { + // TODO(chhsiao): Detect which CSI version to use through versioned + // `Probe` calls to support CSI v1 in a backward compatible way. + ++metrics->csi_plugin_rpcs_pending.at(v0::PROBE); + + return v0::Client(endpoint, runtime) + .call<v0::PROBE>(v0::ProbeRequest()) + .then(process::defer(self(), [=]( + const Try<v0::ProbeResponse, StatusError>& result) + -> Future<Nothing> { + if (result.isError()) { + return Failure( + "Failed to probe endpoint '" + endpoint + + "': " + stringify(result.error())); + } + + return Nothing(); + })) + .onAny(process::defer(self(), [this](const Future<Nothing>& future) { + --metrics->csi_plugin_rpcs_pending.at(v0::PROBE); + if (future.isReady()) { + ++metrics->csi_plugin_rpcs_successes.at(v0::PROBE); + } else if (future.isDiscarded()) { + ++metrics->csi_plugin_rpcs_cancelled.at(v0::PROBE); + } else { + ++metrics->csi_plugin_rpcs_errors.at(v0::PROBE); + } + })); + })); +} + + +Future<string> ServiceManagerProcess::getEndpoint( + const ContainerID& containerId) +{ + if (endpoints.contains(containerId)) { + return endpoints.at(containerId)->future(); + } + + CHECK(!daemons.contains(containerId)); + + Option<CSIPluginContainerInfo> config = getContainerInfo(containerId); + CHECK_SOME(config); + + // We checkpoint the config first to keep track of the plugin container even + // if we fail to create its container daemon. 
+ const string configPath = + paths::getContainerInfoPath(rootDir, info.type(), info.name(), containerId); + + Try<Nothing> checkpoint = slave::state::checkpoint(configPath, config.get()); + if (checkpoint.isError()) { + return Failure( + "Failed to checkpoint plugin container config to '" + configPath + + "': " + checkpoint.error()); + } + + CommandInfo commandInfo; + + if (config->has_command()) { + commandInfo = config->command(); + } + + // Set the `CSI_ENDPOINT` environment variable. + Try<string> endpointPath = paths::getEndpointSocketPath( + rootDir, info.type(), info.name(), containerId); + + if (endpointPath.isError()) { + return Failure( + "Failed to resolve endpoint path for plugin container '" + + stringify(containerId) + "': " + endpointPath.error()); + } + + const string endpoint = "unix://" + endpointPath.get(); + Environment::Variable* endpoint_ = + commandInfo.mutable_environment()->add_variables(); + endpoint_->set_name("CSI_ENDPOINT"); + endpoint_->set_value(endpoint); + + ContainerInfo containerInfo; + + if (config->has_container()) { + containerInfo = config->container(); + } else { + containerInfo.set_type(ContainerInfo::MESOS); + } + + // Prepare a volume where the endpoint socket will be placed. + const string endpointDir = Path(endpointPath.get()).dirname(); + Volume* endpointVolume = containerInfo.add_volumes(); + endpointVolume->set_mode(Volume::RW); + endpointVolume->set_container_path(endpointDir); + endpointVolume->set_host_path(endpointDir); + + // Prepare the directory where the mount points will be placed. + const string mountRootDir = + paths::getMountRootDir(rootDir, info.type(), info.name()); + + Try<Nothing> mkdir = os::mkdir(mountRootDir); + if (mkdir.isError()) { + return Failure( + "Failed to create directory '" + mountRootDir + "': " + mkdir.error()); + } + + // Prepare a volume where the mount points will be placed. 
+ Volume* mountVolume = containerInfo.add_volumes(); + mountVolume->set_mode(Volume::RW); + mountVolume->set_container_path(mountRootDir); + mountVolume->mutable_source()->set_type(Volume::Source::HOST_PATH); + mountVolume->mutable_source()->mutable_host_path()->set_path(mountRootDir); + mountVolume->mutable_source()->mutable_host_path() + ->mutable_mount_propagation()->set_mode(MountPropagation::BIDIRECTIONAL); + + CHECK(!endpoints.contains(containerId)); + endpoints[containerId].reset(new Promise<string>()); + + Try<Owned<ContainerDaemon>> daemon = ContainerDaemon::create( + agentUrl, + authToken, + containerId, + std::move(commandInfo), + config->resources(), + std::move(containerInfo), + std::function<Future<Nothing>()>( + process::defer(self(), [=]() -> Future<Nothing> { + LOG(INFO) + << "Connecting to endpoint '" << endpoint + << "' of CSI plugin container " << containerId; + + CHECK(endpoints.at(containerId)->associate( + waitEndpoint(endpoint) + .then([endpoint]() -> string { return endpoint; }))); + + return endpoints.at(containerId)->future().then([] { + return Nothing(); + }); + })), + std::function<Future<Nothing>()>( + process::defer(self(), [=]() -> Future<Nothing> { + ++metrics->csi_plugin_container_terminations; + + endpoints.at(containerId)->discard(); + endpoints.at(containerId).reset(new Promise<string>()); + + LOG(INFO) + << "Disconnected from endpoint '" << endpoint + << "' of CSI plugin container " << containerId; + + const string endpointPath = + strings::remove(endpoint, "unix://", strings::PREFIX); + + if (os::exists(endpointPath)) { + Try<Nothing> rm = os::rm(endpointPath); + if (rm.isError()) { + return Failure( + "Failed to remove endpoint socket '" + endpointPath + + "': " + rm.error()); + } + } + + return Nothing(); + }))); + + if (daemon.isError()) { + return Failure( + "Failed to create container daemon for plugin container '" + + stringify(containerId) + "': " + daemon.error()); + } + + daemon.get()->wait() + 
.recover(process::defer(self(), [this, containerId]( + const Future<Nothing>& future) { + LOG(ERROR) + << "Container daemon for '" << containerId << "' failed: " + << (future.isFailed() ? future.failure() : "future discarded"); + + // Fail or discard the corresponding endpoint promise if it has not been + // associated by the post-start hook above yet. + endpoints.at(containerId)->associate( + future.then([]() -> string { UNREACHABLE(); })); + + return future; + })); + + daemons.put(containerId, std::move(daemon.get())); + + return endpoints.at(containerId)->future(); +} + + +ServiceManager::ServiceManager( + const http::URL& agentUrl, + const string& rootDir, + const CSIPluginInfo& info, + const hashset<Service>& services, + const string& containerPrefix, + const Option<string>& authToken, + const Runtime& runtime, + Metrics* metrics) + : process(new ServiceManagerProcess( + agentUrl, + rootDir, + info, + services, + containerPrefix, + authToken, + runtime, + metrics)) +{ + process::spawn(CHECK_NOTNULL(process.get())); + recovered = process::dispatch(process.get(), &ServiceManagerProcess::recover); +} + + +ServiceManager::~ServiceManager() +{ + recovered.discard(); + process::terminate(process.get()); + process::wait(process.get()); +} + + +Future<Nothing> ServiceManager::recover() +{ + return recovered; +} + + +Future<string> ServiceManager::getServiceEndpoint(const Service& service) +{ + return recovered + .then(process::defer( + process.get(), &ServiceManagerProcess::getServiceEndpoint, service)); +} + +} // namespace csi { +} // namespace mesos { diff --git a/src/csi/service_manager.hpp b/src/csi/service_manager.hpp new file mode 100644 index 0000000..5dd6bc7 --- /dev/null +++ b/src/csi/service_manager.hpp @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef __CSI_SERVICE_MANAGER_HPP__ +#define __CSI_SERVICE_MANAGER_HPP__ + +#include <string> + +#include <mesos/mesos.hpp> + +#include <process/future.hpp> +#include <process/grpc.hpp> +#include <process/http.hpp> +#include <process/owned.hpp> + +#include <stout/hashset.hpp> +#include <stout/nothing.hpp> +#include <stout/option.hpp> + +#include "csi/metrics.hpp" + +namespace mesos { +namespace csi { + +using Service = CSIPluginContainerInfo::Service; + +constexpr Service CONTROLLER_SERVICE = + CSIPluginContainerInfo::CONTROLLER_SERVICE; + +constexpr Service NODE_SERVICE = CSIPluginContainerInfo::NODE_SERVICE; + + +// Forward declarations. +class ServiceManagerProcess; + + +// Manages the service containers of a CSI plugin instance. +class ServiceManager +{ +public: + ServiceManager( + const process::http::URL& agentUrl, + const std::string& rootDir, + const CSIPluginInfo& info, + const hashset<Service>& services, + const std::string& containerPrefix, + const Option<std::string>& authToken, + const process::grpc::client::Runtime& runtime, + Metrics* metrics); + + // Since this class contains `Owned` members which should not but can be + // copied, explicitly make this class non-copyable. + // + // TODO(chhsiao): Remove this once MESOS-5122 is fixed. 
+ ServiceManager(const ServiceManager&) = delete; + ServiceManager& operator=(const ServiceManager&) = delete; + + ~ServiceManager(); + + process::Future<Nothing> recover(); + + process::Future<std::string> getServiceEndpoint(const Service& service); + +private: + process::Owned<ServiceManagerProcess> process; + process::Future<Nothing> recovered; +}; + +} // namespace csi { +} // namespace mesos { + +#endif // __CSI_SERVICE_MANAGER_HPP__ diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index f307f8e..7c14ff1 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -41,7 +41,6 @@ #include <process/loop.hpp> #include <process/process.hpp> #include <process/sequence.hpp> -#include <process/timeout.hpp> #include <process/metrics/counter.hpp> #include <process/metrics/metrics.hpp> @@ -72,7 +71,6 @@ #include <stout/os/realpath.hpp> -#include "common/http.hpp" #include "common/protobuf_utils.hpp" #include "common/resources_utils.hpp" @@ -80,6 +78,7 @@ #include "csi/metrics.hpp" #include "csi/paths.hpp" #include "csi/rpc.hpp" +#include "csi/service_manager.hpp" #include "csi/state.hpp" #include "csi/utils.hpp" @@ -91,7 +90,6 @@ #include "resource_provider/storage/provider_process.hpp" -#include "slave/container_daemon.hpp" #include "slave/paths.hpp" #include "slave/state.hpp" @@ -124,7 +122,6 @@ using process::ProcessBase; using process::Promise; using process::Sequence; using process::spawn; -using process::Timeout; using process::grpc::StatusError; @@ -133,13 +130,13 @@ using process::http::authentication::Principal; using process::metrics::Counter; using process::metrics::PushGauge; +using mesos::csi::ServiceManager; + using mesos::csi::state::VolumeState; using mesos::internal::protobuf::convertLabelsToStringMap; using mesos::internal::protobuf::convertStringMapToLabels; -using mesos::internal::slave::ContainerDaemon; - using mesos::resource_provider::Call; using 
mesos::resource_provider::Event; using mesos::resource_provider::ResourceProviderState; @@ -149,12 +146,6 @@ using mesos::v1::resource_provider::Driver; namespace mesos { namespace internal { -// Timeout for a CSI plugin component to create its endpoint socket. -// -// TODO(chhsiao): Make the timeout configurable. -constexpr Duration CSI_ENDPOINT_CREATION_TIMEOUT = Minutes(1); - - // Returns true if the string is a valid Java identifier. static bool isValidName(const string& s) { @@ -189,52 +180,6 @@ static bool isValidType(const string& s) } -// Returns the container ID of the standalone container to run a CSI plugin -// component. The container ID is of the following format: -// <cid_prefix><csi_type>-<csi_name>--<list_of_services> -// where <cid_prefix> comes from the principal of the resource provider, -// <csi_type> and <csi_name> are the type and name of the CSI plugin, with dots -// replaced by dashes. <list_of_services> lists the CSI services provided by the -// component, concatenated with dashes. 
-static inline ContainerID getContainerId( - const ResourceProviderInfo& info, - const CSIPluginContainerInfo& container) -{ - const Principal principal = LocalResourceProvider::principal(info); - CHECK(principal.claims.contains("cid_prefix")); - - string value = principal.claims.at("cid_prefix") + strings::join( - "-", - strings::replace(info.storage().plugin().type(), ".", "-"), - info.storage().plugin().name(), - ""); - - for (int i = 0; i < container.services_size(); i++) { - value += "-" + stringify(container.services(i)); - } - - ContainerID containerId; - containerId.set_value(value); - - return containerId; -} - - -static Option<CSIPluginContainerInfo> getCSIPluginContainerInfo( - const ResourceProviderInfo& info, - const ContainerID& containerId) -{ - foreach (const CSIPluginContainerInfo& container, - info.storage().plugin().containers()) { - if (getContainerId(info, container) == containerId) { - return container; - } - } - - return None(); -} - - // Returns the parent endpoint as a URL. // TODO(jieyu): Consider using a more reliable way to get the agent v1 // operator API endpoint URL. @@ -248,20 +193,6 @@ static inline http::URL extractParentEndpoint(const http::URL& url) } -// Returns the 'Bearer' credential as a header for calling the V1 agent -// API if the `authToken` is presented, or empty otherwise. 
-static inline http::Headers getAuthHeader(const Option<string>& authToken) -{ - http::Headers headers; - - if (authToken.isSome()) { - headers["Authorization"] = "Bearer " + authToken.get(); - } - - return headers; -} - - static inline Resource createRawDiskResource( const ResourceProviderInfo& info, const Bytes& capacity, @@ -405,17 +336,17 @@ template < csi::v0::RPC rpc, typename std::enable_if<rpc != csi::v0::PROBE, int>::type> Future<csi::v0::Response<rpc>> StorageLocalResourceProviderProcess::call( - const ContainerID& containerId, + const csi::Service& service, const csi::v0::Request<rpc>& request, - const bool retry) + const bool retry) // Make immutable in the following mutable lambda. { Duration maxBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR; return loop( self(), [=] { - // Perform the call with the latest service future. - return getService(containerId) + // Make the call to the latest service endpoint. + return serviceManager->getServiceEndpoint(service) .then(defer( self(), &StorageLocalResourceProviderProcess::_call<rpc>, @@ -443,11 +374,11 @@ Future<csi::v0::Response<rpc>> StorageLocalResourceProviderProcess::call( template <csi::v0::RPC rpc> Future<Try<csi::v0::Response<rpc>, StatusError>> StorageLocalResourceProviderProcess::_call( - csi::v0::Client client, const csi::v0::Request<rpc>& request) + const string& endpoint, const csi::v0::Request<rpc>& request) { ++metrics.csi_plugin_rpcs_pending.at(rpc); - return client.call<rpc>(request) + return csi::v0::Client(endpoint, runtime).call<rpc>(request) .onAny(defer(self(), [=]( const Future<Try<csi::v0::Response<rpc>, StatusError>>& future) { --metrics.csi_plugin_rpcs_pending.at(rpc); @@ -526,29 +457,19 @@ void StorageLocalResourceProviderProcess::initialize() bootId = _bootId.get(); - foreach (const CSIPluginContainerInfo& container, - info.storage().plugin().containers()) { - if (container.services().end() != find( - container.services().begin(), - container.services().end(), - 
CSIPluginContainerInfo::NODE_SERVICE)) { - nodeContainerId = getContainerId(info, container); - break; - } - } - - CHECK_SOME(nodeContainerId); + const Principal principal = LocalResourceProvider::principal(info); + CHECK(principal.claims.contains("cid_prefix")); + const string& containerPrefix = principal.claims.at("cid_prefix"); - foreach (const CSIPluginContainerInfo& container, - info.storage().plugin().containers()) { - if (container.services().end() != find( - container.services().begin(), - container.services().end(), - CSIPluginContainerInfo::CONTROLLER_SERVICE)) { - controllerContainerId = getContainerId(info, container); - break; - } - } + serviceManager.reset(new ServiceManager( + extractParentEndpoint(url), + slave::paths::getCsiRootDir(workDir), + info.storage().plugin(), + {csi::CONTROLLER_SERVICE, csi::NODE_SERVICE}, + containerPrefix, + authToken, + runtime, + &metrics)); auto die = [=](const string& message) { LOG(ERROR) @@ -579,7 +500,15 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover() { CHECK_EQ(RECOVERING, state); - return recoverServices() + // NOTE: The `Controller` service is supported if the plugin has the + // `CONTROLLER_SERVICE` capability, and the `NodeGetId` call is supported if + // the `Controller` service has the `PUBLISH_UNPUBLISH_VOLUME` capability. So + // we first launch the node plugin to get the plugin capabilities, then decide + // if we need to launch the controller plugin and get the node ID. 
+ return serviceManager->recover() + .then(defer(self(), &Self::prepareIdentityService)) + .then(defer(self(), &Self::prepareControllerService)) + .then(defer(self(), &Self::prepareNodeService)) .then(defer(self(), &Self::recoverVolumes)) .then(defer(self(), &Self::recoverResourceProviderState)) .then(defer(self(), [=]() -> Future<Nothing> { @@ -612,135 +541,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover() } -Future<Nothing> StorageLocalResourceProviderProcess::recoverServices() -{ - return getContainers() - .then(defer(self(), [=]( - const hashmap<ContainerID, Option<ContainerStatus>>& runningContainers) - -> Future<Nothing> { - Try<list<string>> containerPaths = csi::paths::getContainerPaths( - slave::paths::getCsiRootDir(workDir), - info.storage().plugin().type(), - info.storage().plugin().name()); - - if (containerPaths.isError()) { - return Failure( - "Failed to find plugin containers for CSI plugin type '" + - info.storage().plugin().type() + "' and name '" + - info.storage().plugin().name() + "': " + - containerPaths.error()); - } - - vector<Future<Nothing>> futures; - - foreach (const string& path, containerPaths.get()) { - Try<csi::paths::ContainerPath> containerPath = - csi::paths::parseContainerPath( - slave::paths::getCsiRootDir(workDir), - path); - - if (containerPath.isError()) { - return Failure( - "Failed to parse container path '" + path + "': " + - containerPath.error()); - } - - CHECK_EQ(info.storage().plugin().type(), containerPath->type); - CHECK_EQ(info.storage().plugin().name(), containerPath->name); - - const ContainerID& containerId = containerPath->containerId; - - // NOTE: Since `getContainers` might return containers that are not - // actually running, to identify if the container is actually running, - // we check if the `executor_pid` field is set as a workaround. 
- bool isRunningContainer = runningContainers.contains(containerId) && - runningContainers.at(containerId).isSome() && - runningContainers.at(containerId)->has_executor_pid(); - - // Do not kill the up-to-date running controller or node container. - if ((nodeContainerId == containerId || - controllerContainerId == containerId) && isRunningContainer) { - const string configPath = csi::paths::getContainerInfoPath( - slave::paths::getCsiRootDir(workDir), - info.storage().plugin().type(), - info.storage().plugin().name(), - containerId); - - if (os::exists(configPath)) { - Result<CSIPluginContainerInfo> config = - slave::state::read<CSIPluginContainerInfo>(configPath); - - if (config.isError()) { - return Failure( - "Failed to read plugin container config from '" + configPath + - "': " + config.error()); - } - - if (config.isSome() && - getCSIPluginContainerInfo(info, containerId) == config.get()) { - continue; - } - } - } - - LOG(INFO) << "Cleaning up plugin container '" << containerId << "'"; - - // Otherwise, kill the container only if it is actually running (i.e., - // not already being destroyed), then wait for the container to be - // destroyed before performing the cleanup despite if we kill it. 
- Future<Nothing> cleanup = Nothing(); - if (runningContainers.contains(containerId)) { - if (isRunningContainer) { - cleanup = killContainer(containerId); - } - cleanup = cleanup - .then(defer(self(), &Self::waitContainer, containerId)); - } - - cleanup = cleanup - .then(defer(self(), [=]() -> Future<Nothing> { - Result<string> endpointDir = - os::realpath(csi::paths::getEndpointDirSymlinkPath( - slave::paths::getCsiRootDir(workDir), - info.storage().plugin().type(), - info.storage().plugin().name(), - containerId)); - - if (endpointDir.isSome()) { - Try<Nothing> rmdir = os::rmdir(endpointDir.get()); - if (rmdir.isError()) { - return Failure( - "Failed to remove endpoint directory '" + - endpointDir.get() + "': " + rmdir.error()); - } - } - - Try<Nothing> rmdir = os::rmdir(path); - if (rmdir.isError()) { - return Failure( - "Failed to remove plugin container directory '" + path + - "': " + rmdir.error()); - } - - return Nothing(); - })); - - futures.push_back(cleanup); - } - - return collect(futures).then([] { return Nothing(); }); - })) - // NOTE: The `Controller` service is supported if the plugin has the - // `CONTROLLER_SERVICE` capability, and the `NodeGetId` call is supported if - // the `Controller` service has the `PUBLISH_UNPUBLISH_VOLUME` capability. - // So we first launch the node plugin to get the plugin capabilities, then - // decide if we need to launch the controller plugin and get the node ID. - .then(defer(self(), &Self::prepareIdentityService)) - .then(defer(self(), &Self::prepareControllerService)) - .then(defer(self(), &Self::prepareNodeService)); -} - - Future<Nothing> StorageLocalResourceProviderProcess::recoverVolumes() { // Recover the states of CSI volumes. 
@@ -1874,311 +1674,11 @@ void StorageLocalResourceProviderProcess::reconcileOperations( } -Future<csi::v0::Client> StorageLocalResourceProviderProcess::waitService( - const string& endpoint) -{ - Future<csi::v0::Client> service; - - if (os::exists(endpoint)) { - service = csi::v0::Client("unix://" + endpoint, runtime); - } else { - // Wait for the endpoint socket to appear until the timeout expires. - Timeout timeout = Timeout::in(CSI_ENDPOINT_CREATION_TIMEOUT); - - service = loop( - self(), - [=]() -> Future<Nothing> { - if (timeout.expired()) { - return Failure("Timed out waiting for endpoint '" + endpoint + "'"); - } - - return after(Milliseconds(10)); - }, - [=](const Nothing&) -> ControlFlow<csi::v0::Client> { - if (os::exists(endpoint)) { - return Break(csi::v0::Client("unix://" + endpoint, runtime)); - } - - return Continue(); - }); - } - - return service - .then(defer(self(), [=](csi::v0::Client client) { - return _call<csi::v0::PROBE>(client, csi::v0::ProbeRequest()) - .then([=](const Try<csi::v0::ProbeResponse, StatusError>& result) - -> Future<csi::v0::Client> { - if (result.isError()) { - return Failure(result.error()); - } - - return client; - }); - })); -} - - -Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService( - const ContainerID& containerId) -{ - if (daemons.contains(containerId)) { - CHECK(services.contains(containerId)); - return services.at(containerId)->future(); - } - - Option<CSIPluginContainerInfo> config = - getCSIPluginContainerInfo(info, containerId); - CHECK_SOME(config); - - // We checkpoint the config first to keep track of the plugin container even - // if we fail to create its container daemon. 
- const string configPath = csi::paths::getContainerInfoPath( - slave::paths::getCsiRootDir(workDir), - info.storage().plugin().type(), - info.storage().plugin().name(), - containerId); - - Try<Nothing> checkpoint = slave::state::checkpoint(configPath, config.get()); - if (checkpoint.isError()) { - return Failure( - "Failed to checkpoint plugin container config to '" + configPath + - "': " + checkpoint.error()); - } - - CommandInfo commandInfo; - if (config->has_command()) { - commandInfo.CopyFrom(config->command()); - } - - // Set the `CSI_ENDPOINT` environment variable. - Try<string> endpoint = csi::paths::getEndpointSocketPath( - slave::paths::getCsiRootDir(workDir), - info.storage().plugin().type(), - info.storage().plugin().name(), - containerId); - - if (endpoint.isError()) { - return Failure( - "Failed to resolve endpoint path for plugin container '" + - stringify(containerId) + "': " + endpoint.error()); - } - - const string& endpointPath = endpoint.get(); - Environment::Variable* endpointVar = - commandInfo.mutable_environment()->add_variables(); - endpointVar->set_name("CSI_ENDPOINT"); - endpointVar->set_value("unix://" + endpointPath); - - ContainerInfo containerInfo; - if (config->has_container()) { - containerInfo.CopyFrom(config->container()); - } else { - containerInfo.set_type(ContainerInfo::MESOS); - } - - // Prepare a volume where the endpoint socket will be placed. - const string endpointDir = Path(endpointPath).dirname(); - Volume* endpointVolume = containerInfo.add_volumes(); - endpointVolume->set_mode(Volume::RW); - endpointVolume->set_container_path(endpointDir); - endpointVolume->set_host_path(endpointDir); - - // Prepare the directory where the mount points will be placed. 
- const string mountRootDir = csi::paths::getMountRootDir( - slave::paths::getCsiRootDir(workDir), - info.storage().plugin().type(), - info.storage().plugin().name()); - - Try<Nothing> mkdir = os::mkdir(mountRootDir); - if (mkdir.isError()) { - return Failure( - "Failed to create directory '" + mountRootDir + "': " + mkdir.error()); - } - - // Prepare a volume where the mount points will be placed. - Volume* mountVolume = containerInfo.add_volumes(); - mountVolume->set_mode(Volume::RW); - mountVolume->set_container_path(mountRootDir); - mountVolume->mutable_source()->set_type(Volume::Source::HOST_PATH); - mountVolume->mutable_source()->mutable_host_path()->set_path(mountRootDir); - mountVolume->mutable_source()->mutable_host_path() - ->mutable_mount_propagation()->set_mode(MountPropagation::BIDIRECTIONAL); - - CHECK(!services.contains(containerId)); - services[containerId].reset(new Promise<csi::v0::Client>()); - - Try<Owned<ContainerDaemon>> daemon = ContainerDaemon::create( - extractParentEndpoint(url), - authToken, - containerId, - commandInfo, - config->resources(), - containerInfo, - std::function<Future<Nothing>()>(defer(self(), [=]() -> Future<Nothing> { - LOG(INFO) - << "CSI plugin container '" << containerId << "' started for plugin" - << " type '" << info.storage().plugin().type() << "' and " - << " name '" << info.storage().plugin().name() << "'"; - - CHECK(services.at(containerId)->associate(waitService(endpointPath))); - return services.at(containerId)->future() - .then([] { return Nothing(); }); - })), - std::function<Future<Nothing>()>(defer(self(), [=]() -> Future<Nothing> { - ++metrics.csi_plugin_container_terminations; - - services.at(containerId)->discard(); - services.at(containerId).reset(new Promise<csi::v0::Client>()); - - LOG(INFO) - << "CSI plugin container '" << containerId << "' stopped for plugin" - << " type '" << info.storage().plugin().type() << "' and " - << " name '" << info.storage().plugin().name() << "'"; - - if 
(os::exists(endpointPath)) { - Try<Nothing> rm = os::rm(endpointPath); - if (rm.isError()) { - return Failure( - "Failed to remove endpoint '" + endpointPath + "': " + - rm.error()); - } - } - - return Nothing(); - }))); - - if (daemon.isError()) { - return Failure( - "Failed to create container daemon for plugin container '" + - stringify(containerId) + "': " + daemon.error()); - } - - auto die = [=](const string& message) { - LOG(ERROR) - << "Container daemon for '" << containerId << "' failed: " << message; - fatal(); - }; - - daemons[containerId] = daemon.get(); - daemon.get()->wait() - .onFailed(defer(self(), std::bind(die, lambda::_1))) - .onDiscarded(defer(self(), std::bind(die, "future discarded"))); - - return services.at(containerId)->future(); -} - - -Future<hashmap<ContainerID, Option<ContainerStatus>>> -StorageLocalResourceProviderProcess::getContainers() -{ - agent::Call call; - call.set_type(agent::Call::GET_CONTAINERS); - call.mutable_get_containers()->set_show_nested(false); - call.mutable_get_containers()->set_show_standalone(true); - - return http::post( - extractParentEndpoint(url), - getAuthHeader(authToken) + - http::Headers{{"Accept", stringify(contentType)}}, - serialize(contentType, evolve(call)), - stringify(contentType)) - .then(defer(self(), [=](const http::Response& httpResponse) - -> Future<hashmap<ContainerID, Option<ContainerStatus>>> { - hashmap<ContainerID, Option<ContainerStatus>> result; - - if (httpResponse.status != http::OK().status) { - return Failure( - "Failed to get containers: Unexpected response '" + - httpResponse.status + "' (" + httpResponse.body + ")"); - } - - Try<v1::agent::Response> v1Response = - deserialize<v1::agent::Response>(contentType, httpResponse.body); - if (v1Response.isError()) { - return Failure("Failed to get containers: " + v1Response.error()); - } - - const Principal principal = LocalResourceProvider::principal(info); - CHECK(principal.claims.contains("cid_prefix")); - - const string& cidPrefix = 
principal.claims.at("cid_prefix"); - - agent::Response response = devolve(v1Response.get()); - foreach (const agent::Response::GetContainers::Container& container, - response.get_containers().containers()) { - if (strings::startsWith(container.container_id().value(), cidPrefix)) { - result.put( - container.container_id(), - container.has_container_status() - ? container.container_status() - : Option<ContainerStatus>::none()); - } - } - - return result; - })); -} - - -Future<Nothing> StorageLocalResourceProviderProcess::waitContainer( - const ContainerID& containerId) -{ - agent::Call call; - call.set_type(agent::Call::WAIT_CONTAINER); - call.mutable_wait_container()->mutable_container_id()->CopyFrom(containerId); - - return http::post( - extractParentEndpoint(url), - getAuthHeader(authToken), - serialize(contentType, evolve(call)), - stringify(contentType)) - .then([containerId](const http::Response& response) -> Future<Nothing> { - if (response.status != http::OK().status && - response.status != http::NotFound().status) { - return Failure( - "Failed to wait for container '" + stringify(containerId) + - "': Unexpected response '" + response.status + "' (" + response.body - + ")"); - } - - return Nothing(); - }); -} - - -Future<Nothing> StorageLocalResourceProviderProcess::killContainer( - const ContainerID& containerId) -{ - agent::Call call; - call.set_type(agent::Call::KILL_CONTAINER); - call.mutable_kill_container()->mutable_container_id()->CopyFrom(containerId); - - return http::post( - extractParentEndpoint(url), - getAuthHeader(authToken), - serialize(contentType, evolve(call)), - stringify(contentType)) - .then([containerId](const http::Response& response) -> Future<Nothing> { - if (response.status != http::OK().status && - response.status != http::NotFound().status) { - return Failure( - "Failed to kill container '" + stringify(containerId) + - "': Unexpected response '" + response.status + "' (" + response.body - + ")"); - } - - return Nothing(); - }); -} 
- - Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService() { - CHECK_SOME(nodeContainerId); - // Get the plugin info. return call<csi::v0::GET_PLUGIN_INFO>( - nodeContainerId.get(), csi::v0::GetPluginInfoRequest()) + csi::NODE_SERVICE, csi::v0::GetPluginInfoRequest()) .then(defer(self(), [=](const csi::v0::GetPluginInfoResponse& response) { pluginInfo = response; @@ -2186,7 +1686,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService() // Get the plugin capabilities. return call<csi::v0::GET_PLUGIN_CAPABILITIES>( - nodeContainerId.get(), csi::v0::GetPluginCapabilitiesRequest()); + csi::NODE_SERVICE, csi::v0::GetPluginCapabilitiesRequest()); })) .then(defer(self(), [=]( const csi::v0::GetPluginCapabilitiesResponse& response) { @@ -2205,14 +1705,9 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService() return Nothing(); } - if (controllerContainerId.isNone()) { - return Failure( - stringify(CSIPluginContainerInfo::CONTROLLER_SERVICE) + " not found"); - } - // Get the controller plugin info and check for consistency. return call<csi::v0::GET_PLUGIN_INFO>( - controllerContainerId.get(), csi::v0::GetPluginInfoRequest()) + csi::CONTROLLER_SERVICE, csi::v0::GetPluginInfoRequest()) .then(defer(self(), [=](const csi::v0::GetPluginInfoResponse& response) { LOG(INFO) << "Controller plugin loaded: " << stringify(response); @@ -2225,8 +1720,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService() // Get the controller capabilities. 
return call<csi::v0::CONTROLLER_GET_CAPABILITIES>( - controllerContainerId.get(), - csi::v0::ControllerGetCapabilitiesRequest()); + csi::CONTROLLER_SERVICE, csi::v0::ControllerGetCapabilitiesRequest()); })) .then(defer(self(), [=]( const csi::v0::ControllerGetCapabilitiesResponse& response) { @@ -2239,11 +1733,9 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService() Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService() { - CHECK_SOME(nodeContainerId); - // Get the node capabilities. return call<csi::v0::NODE_GET_CAPABILITIES>( - nodeContainerId.get(), csi::v0::NodeGetCapabilitiesRequest()) + csi::NODE_SERVICE, csi::v0::NodeGetCapabilitiesRequest()) .then(defer(self(), [=]( const csi::v0::NodeGetCapabilitiesResponse& response) -> Future<Nothing> { @@ -2255,7 +1747,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService() // Get the node ID. return call<csi::v0::NODE_GET_ID>( - nodeContainerId.get(), csi::v0::NodeGetIdRequest()) + csi::NODE_SERVICE, csi::v0::NodeGetIdRequest()) .then(defer(self(), [=](const csi::v0::NodeGetIdResponse& response) { nodeId = response.node_id(); @@ -2296,10 +1788,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish( request.set_readonly(false); *request.mutable_volume_attributes() = volume.state.volume_attributes(); - CHECK_SOME(controllerContainerId); - return call<csi::v0::CONTROLLER_PUBLISH_VOLUME>( - controllerContainerId.get(), std::move(request)) + csi::CONTROLLER_SERVICE, std::move(request)) .then(defer(self(), [this, volumeId]( const csi::v0::ControllerPublishVolumeResponse& response) { VolumeData& volume = volumes.at(volumeId); @@ -2345,10 +1835,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish( request.set_volume_id(volumeId); request.set_node_id(nodeId.get()); - CHECK_SOME(controllerContainerId); - return call<csi::v0::CONTROLLER_UNPUBLISH_VOLUME>( - controllerContainerId.get(), std::move(request)) + 
csi::CONTROLLER_SERVICE, std::move(request)) .then(defer(self(), [this, volumeId] { VolumeData& volume = volumes.at(volumeId); @@ -2406,10 +1894,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeStage( *request.mutable_volume_capability() = volume.state.volume_capability(); *request.mutable_volume_attributes() = volume.state.volume_attributes(); - CHECK_SOME(nodeContainerId); - - return call<csi::v0::NODE_STAGE_VOLUME>( - nodeContainerId.get(), std::move(request)) + return call<csi::v0::NODE_STAGE_VOLUME>(csi::NODE_SERVICE, std::move(request)) .then(defer(self(), [this, volumeId] { VolumeData& volume = volumes.at(volumeId); @@ -2462,10 +1947,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage( request.set_volume_id(volumeId); request.set_staging_target_path(stagingPath); - CHECK_SOME(nodeContainerId); - return call<csi::v0::NODE_UNSTAGE_VOLUME>( - nodeContainerId.get(), std::move(request)) + csi::NODE_SERVICE, std::move(request)) .then(defer(self(), [this, volumeId] { VolumeData& volume = volumes.at(volumeId); @@ -2527,10 +2010,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish( request.set_staging_target_path(stagingPath); } - CHECK_SOME(nodeContainerId); - return call<csi::v0::NODE_PUBLISH_VOLUME>( - nodeContainerId.get(), std::move(request)) + csi::NODE_SERVICE, std::move(request)) .then(defer(self(), [this, volumeId] { VolumeData& volume = volumes.at(volumeId); @@ -2579,10 +2060,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish( request.set_volume_id(volumeId); request.set_target_path(targetPath); - CHECK_SOME(nodeContainerId); - return call<csi::v0::NODE_UNPUBLISH_VOLUME>( - nodeContainerId.get(), std::move(request)) + csi::NODE_SERVICE, std::move(request)) .then(defer(self(), [this, volumeId, targetPath]() -> Future<Nothing> { VolumeData& volume = volumes.at(volumeId); @@ -2611,10 +2090,8 @@ Future<string> StorageLocalResourceProviderProcess::createVolume( 
*request.add_volume_capabilities() = profileInfo.capability; *request.mutable_parameters() = profileInfo.parameters; - CHECK_SOME(controllerContainerId); - return call<csi::v0::CREATE_VOLUME>( - controllerContainerId.get(), std::move(request), true) // Retry. + csi::CONTROLLER_SERVICE, std::move(request), true) // Retry. .then(defer(self(), [=]( const csi::v0::CreateVolumeResponse& response) -> string { const csi::v0::Volume& volume = response.volume(); @@ -2728,10 +2205,8 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume( csi::v0::DeleteVolumeRequest request; request.set_volume_id(volumeId); - CHECK_SOME(controllerContainerId); - return call<csi::v0::DELETE_VOLUME>( - controllerContainerId.get(), std::move(request), true) // Retry. + csi::CONTROLLER_SERVICE, std::move(request), true) // Retry. .then([] { return Nothing(); }); })); } @@ -2804,10 +2279,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::validateVolume( *request.add_volume_capabilities() = profileInfo.capability; *request.mutable_volume_attributes() = volumeAttributes; - CHECK_SOME(controllerContainerId); - return call<csi::v0::VALIDATE_VOLUME_CAPABILITIES>( - controllerContainerId.get(), std::move(request)) + csi::CONTROLLER_SERVICE, std::move(request)) .then(defer(self(), [=]( const csi::v0::ValidateVolumeCapabilitiesResponse& response) -> Future<Nothing> { @@ -2840,12 +2313,10 @@ Future<Resources> StorageLocalResourceProviderProcess::listVolumes() return Resources(); } - CHECK_SOME(controllerContainerId); - // TODO(chhsiao): Set the max entries and use a loop to do // multiple `ListVolumes` calls. 
return call<csi::v0::LIST_VOLUMES>( - controllerContainerId.get(), csi::v0::ListVolumesRequest()) + csi::CONTROLLER_SERVICE, csi::v0::ListVolumesRequest()) .then(defer(self(), [=](const csi::v0::ListVolumesResponse& response) { Resources resources; @@ -2888,8 +2359,6 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities() return Resources(); } - CHECK_SOME(controllerContainerId); - vector<Future<Resources>> futures; foreachpair (const string& profile, @@ -2901,7 +2370,7 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities() futures.push_back( call<csi::v0::GET_CAPACITY>( - controllerContainerId.get(), std::move(request)) + csi::CONTROLLER_SERVICE, std::move(request)) .then(defer(self(), [=]( const csi::v0::GetCapacityResponse& response) -> Resources { if (response.available_capacity() == 0) { @@ -3788,15 +3257,14 @@ Option<Error> StorageLocalResourceProvider::validate( if (container.services().end() != find( container.services().begin(), container.services().end(), - CSIPluginContainerInfo::NODE_SERVICE)) { + csi::NODE_SERVICE)) { hasNodeService = true; break; } } if (!hasNodeService) { - return Error( - stringify(CSIPluginContainerInfo::NODE_SERVICE) + " not found"); + return Error(stringify(csi::NODE_SERVICE) + " not found"); } return None(); diff --git a/src/resource_provider/storage/provider_process.hpp b/src/resource_provider/storage/provider_process.hpp index 8640fbc..01f5fce 100644 --- a/src/resource_provider/storage/provider_process.hpp +++ b/src/resource_provider/storage/provider_process.hpp @@ -17,13 +17,11 @@ #ifndef __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__ #define __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__ -#include <functional> #include <memory> #include <string> #include <type_traits> #include <vector> -#include <mesos/http.hpp> #include <mesos/mesos.hpp> #include <mesos/resources.hpp> @@ -53,14 +51,12 @@ #include <stout/try.hpp> #include <stout/uuid.hpp> -#include "csi/client.hpp" #include 
"csi/metrics.hpp" #include "csi/rpc.hpp" +#include "csi/service_manager.hpp" #include "csi/state.hpp" #include "csi/utils.hpp" -#include "slave/container_daemon.hpp" - #include "status_update_manager/operation.hpp" namespace mesos { @@ -118,13 +114,13 @@ public: csi::v0::RPC rpc, typename std::enable_if<rpc != csi::v0::PROBE, int>::type = 0> process::Future<csi::v0::Response<rpc>> call( - const ContainerID& containerId, + const csi::Service& service, const csi::v0::Request<rpc>& request, - const bool retry = false); // remains const in a mutable lambda. + bool retry = false); template <csi::v0::RPC rpc> process::Future<Try<csi::v0::Response<rpc>, process::grpc::StatusError>> - _call(csi::v0::Client client, const csi::v0::Request<rpc>& request); + _call(const std::string& endpoint, const csi::v0::Request<rpc>& request); template <csi::v0::RPC rpc> process::Future<process::ControlFlow<csi::v0::Response<rpc>>> __call( @@ -150,7 +146,6 @@ private: // The recover functions are responsible to recover the state of the // resource provider and CSI volumes from checkpointed data. process::Future<Nothing> recover(); - process::Future<Nothing> recoverServices(); process::Future<Nothing> recoverVolumes(); process::Future<Nothing> recoverResourceProviderState(); @@ -194,27 +189,6 @@ private: void reconcileOperations( const resource_provider::Event::ReconcileOperations& reconcile); - // Returns a future of a CSI client that waits for the endpoint socket to - // appear if necessary, then connects to the socket and check its readiness. - process::Future<csi::v0::Client> waitService(const std::string& endpoint); - - // Returns a future of the latest CSI client for the specified plugin - // container. If the container is not already running, this method will start - // a new a new container daemon. - process::Future<csi::v0::Client> getService(const ContainerID& containerId); - - // Lists all running plugin containers for this resource provider. 
- // NOTE: This might return containers that are not actually running, e.g., if - // they are being destroyed. - process::Future<hashmap<ContainerID, Option<ContainerStatus>>> - getContainers(); - - // Waits for the specified plugin container to be terminated. - process::Future<Nothing> waitContainer(const ContainerID& containerId); - - // Kills the specified plugin container. - process::Future<Nothing> killContainer(const ContainerID& containerId); - process::Future<Nothing> prepareIdentityService(); // NOTE: This can only be called after `prepareIdentityService`. @@ -367,12 +341,8 @@ private: // The mapping of known profiles fetched from the DiskProfileAdaptor. hashmap<std::string, DiskProfileAdaptor::ProfileInfo> profileInfos; - hashmap<ContainerID, process::Owned<slave::ContainerDaemon>> daemons; - hashmap<ContainerID, process::Owned<process::Promise<csi::v0::Client>>> - services; + process::Owned<csi::ServiceManager> serviceManager; - Option<ContainerID> nodeContainerId; - Option<ContainerID> controllerContainerId; Option<csi::v0::GetPluginInfoResponse> pluginInfo; csi::v0::PluginCapabilities pluginCapabilities; csi::v0::ControllerCapabilities controllerCapabilities;
