This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch 1.8.x in repository https://gitbox.apache.org/repos/asf/mesos.git
commit a3c2632f87d1ccd5275dca073f87287fe09a370b Author: Chun-Hung Hsiao <[email protected]> AuthorDate: Thu Apr 4 22:47:49 2019 -0700 Implemented CSI v1 volume manager. The v1 Volume manager is basically copied from the v0 volume manager, with the following changes: * `validateVolume` will validate volumes against parameters. * `publishVolume` will only create the parent directory of the target path. The creation of the target path itself is the responsibility of the plugin. * The node ID is obtained through the `NodeGetInfo` call. Review: https://reviews.apache.org/r/70402 --- src/CMakeLists.txt | 1 + src/Makefile.am | 3 + src/csi/v1_volume_manager.cpp | 1356 +++++++++++++++++++++++++++++++++ src/csi/v1_volume_manager.hpp | 116 +++ src/csi/v1_volume_manager_process.hpp | 218 ++++++ 5 files changed, 1694 insertions(+) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index d36d0be..1d4f541 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -244,6 +244,7 @@ set(CSI_SRC csi/v1.cpp csi/v1_client.cpp csi/v1_utils.cpp + csi/v1_volume_manager.cpp csi/volume_manager.cpp) set(DOCKER_SRC diff --git a/src/Makefile.am b/src/Makefile.am index 64898df..5f97523 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1600,6 +1600,9 @@ libcsi_la_SOURCES = \ csi/v1_client.hpp \ csi/v1_utils.cpp \ csi/v1_utils.hpp \ + csi/v1_volume_manager.cpp \ + csi/v1_volume_manager.hpp \ + csi/v1_volume_manager_process.hpp \ csi/volume_manager.cpp \ csi/volume_manager.hpp diff --git a/src/csi/v1_volume_manager.cpp b/src/csi/v1_volume_manager.cpp new file mode 100644 index 0000000..bd334f1 --- /dev/null +++ b/src/csi/v1_volume_manager.cpp @@ -0,0 +1,1356 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "csi/v1_volume_manager.hpp" + +#include <algorithm> +#include <cstdlib> +#include <functional> +#include <list> + +#include <process/after.hpp> +#include <process/collect.hpp> +#include <process/defer.hpp> +#include <process/dispatch.hpp> +#include <process/id.hpp> +#include <process/loop.hpp> +#include <process/process.hpp> + +#include <stout/check.hpp> +#include <stout/duration.hpp> +#include <stout/foreach.hpp> +#include <stout/none.hpp> +#include <stout/os.hpp> +#include <stout/path.hpp> +#include <stout/result.hpp> +#include <stout/some.hpp> +#include <stout/stringify.hpp> +#include <stout/try.hpp> +#include <stout/unreachable.hpp> + +#include "csi/paths.hpp" +#include "csi/v1_client.hpp" +#include "csi/v1_utils.hpp" +#include "csi/v1_volume_manager_process.hpp" + +#include "slave/state.hpp" + +namespace http = process::http; +namespace slave = mesos::internal::slave; + +using std::list; +using std::string; +using std::vector; + +using google::protobuf::Map; + +using mesos::csi::state::VolumeState; + +using process::Break; +using process::Continue; +using process::ControlFlow; +using process::Failure; +using process::Future; +using process::ProcessBase; + +using process::grpc::StatusError; + +using process::grpc::client::Runtime; + +namespace mesos{ +namespace csi { +namespace v1 { + +VolumeManagerProcess::VolumeManagerProcess( + const http::URL& agentUrl, + const string& 
_rootDir, + const CSIPluginInfo& _info, + const hashset<Service> _services, + const string& containerPrefix, + const Option<string>& authToken, + const Runtime& _runtime, + Metrics* _metrics) + : ProcessBase(process::ID::generate("csi-v1-volume-manager")), + rootDir(_rootDir), + info(_info), + services(_services), + runtime(_runtime), + metrics(_metrics), + serviceManager(new ServiceManager( + agentUrl, + rootDir, + info, + services, + containerPrefix, + authToken, + runtime, + metrics)) +{ + // This should have been validated in `VolumeManager::create`. + CHECK(!services.empty()) + << "Must specify at least one service for CSI plugin type '" << info.type() + << "' and name '" << info.name() << "'"; +} + + +Future<Nothing> VolumeManagerProcess::recover() +{ + Try<string> bootId_ = os::bootId(); + if (bootId_.isError()) { + return Failure("Failed to get boot ID: " + bootId_.error()); + } + + bootId = bootId_.get(); + + return serviceManager->recover() + .then(process::defer(self(), &Self::prepareServices)) + .then(process::defer(self(), [this]() -> Future<Nothing> { + // Recover the states of CSI volumes. 
+ Try<list<string>> volumePaths = + paths::getVolumePaths(rootDir, info.type(), info.name()); + + if (volumePaths.isError()) { + return Failure( + "Failed to find volumes for CSI plugin type '" + info.type() + + "' and name '" + info.name() + "': " + volumePaths.error()); + } + + vector<Future<Nothing>> futures; + + foreach (const string& path, volumePaths.get()) { + Try<paths::VolumePath> volumePath = + paths::parseVolumePath(rootDir, path); + + if (volumePath.isError()) { + return Failure( + "Failed to parse volume path '" + path + + "': " + volumePath.error()); + } + + CHECK_EQ(info.type(), volumePath->type); + CHECK_EQ(info.name(), volumePath->name); + + const string& volumeId = volumePath->volumeId; + const string statePath = paths::getVolumeStatePath( + rootDir, info.type(), info.name(), volumeId); + + if (!os::exists(statePath)) { + continue; + } + + Result<VolumeState> volumeState = + slave::state::read<VolumeState>(statePath); + + if (volumeState.isError()) { + return Failure( + "Failed to read volume state from '" + statePath + + "': " + volumeState.error()); + } + + if (volumeState.isNone()) { + continue; + } + + volumes.put(volumeId, std::move(volumeState.get())); + VolumeData& volume = volumes.at(volumeId); + + if (!VolumeState::State_IsValid(volume.state.state())) { + return Failure("Volume '" + volumeId + "' is in INVALID state"); + } + + // First, if there is a node reboot after the volume is made + // publishable, it should be reset to `NODE_READY`. + switch (volume.state.state()) { + case VolumeState::CREATED: + case VolumeState::NODE_READY: + case VolumeState::CONTROLLER_PUBLISH: + case VolumeState::CONTROLLER_UNPUBLISH: + case VolumeState::NODE_STAGE: { + break; + } + case VolumeState::VOL_READY: + case VolumeState::PUBLISHED: + case VolumeState::NODE_UNSTAGE: + case VolumeState::NODE_PUBLISH: + case VolumeState::NODE_UNPUBLISH: { + if (bootId != volume.state.boot_id()) { + // Since this is a no-op, no need to checkpoint here. 
+ volume.state.set_state(VolumeState::NODE_READY); + volume.state.clear_boot_id(); + } + + break; + } + case VolumeState::UNKNOWN: { + return Failure("Volume '" + volumeId + "' is in UNKNOWN state"); + } + + // NOTE: We avoid using a default clause for the following values in + // proto3's open enum to enable the compiler to detect missing enum + // cases for us. See: https://github.com/google/protobuf/issues/3917 + case google::protobuf::kint32min: + case google::protobuf::kint32max: { + UNREACHABLE(); + } + } + + // Second, if the volume has been used by a container before recovery, + // we have to bring the volume back to `PUBLISHED` so data can be + // cleaned up synchronously when needed. + if (volume.state.node_publish_required()) { + futures.push_back(publishVolume(volumeId)); + } + } + + // Garbage collect leftover mount paths that were failed to remove before. + const string mountRootDir = + paths::getMountRootDir(rootDir, info.type(), info.name()); + + Try<list<string>> mountPaths = paths::getMountPaths(mountRootDir); + if (mountPaths.isError()) { + // TODO(chhsiao): This could indicate that something is seriously wrong. + // To help debugging the problem, we should surface the error via + // MESOS-8745. 
+ return Failure( + "Failed to find mount paths for CSI plugin type '" + info.type() + + "' and name '" + info.name() + "': " + mountPaths.error()); + } + + foreach (const string& path, mountPaths.get()) { + Try<string> volumeId = paths::parseMountPath(mountRootDir, path); + if (volumeId.isError()) { + return Failure( + "Failed to parse mount path '" + path + "': " + volumeId.error()); + } + + if (!volumes.contains(volumeId.get())) { + garbageCollectMountPath(volumeId.get()); + } + } + + return process::collect(futures).then([] { return Nothing(); }); + })); +} + + +Future<vector<VolumeInfo>> VolumeManagerProcess::listVolumes() +{ + if (!controllerCapabilities->listVolumes) { + return vector<VolumeInfo>(); + } + + // TODO(chhsiao): Set the max entries and use a loop to do multiple + // `ListVolumes` calls. + return call(CONTROLLER_SERVICE, &Client::listVolumes, ListVolumesRequest()) + .then(process::defer(self(), [](const ListVolumesResponse& response) { + vector<VolumeInfo> result; + foreach (const auto& entry, response.entries()) { + result.push_back(VolumeInfo{Bytes(entry.volume().capacity_bytes()), + entry.volume().volume_id(), + entry.volume().volume_context()}); + } + + return result; + })); +} + + +Future<Bytes> VolumeManagerProcess::getCapacity( + const types::VolumeCapability& capability, + const Map<string, string>& parameters) +{ + if (!controllerCapabilities->getCapacity) { + return Bytes(0); + } + + GetCapacityRequest request; + *request.add_volume_capabilities() = evolve(capability); + *request.mutable_parameters() = parameters; + + return call(CONTROLLER_SERVICE, &Client::getCapacity, std::move(request)) + .then([](const GetCapacityResponse& response) { + return Bytes(response.available_capacity()); + }); +} + + +Future<VolumeInfo> VolumeManagerProcess::createVolume( + const string& name, + const Bytes& capacity, + const types::VolumeCapability& capability, + const Map<string, string>& parameters) +{ + if (!controllerCapabilities->createDeleteVolume) 
{ + return Failure( + "CREATE_DELETE_VOLUME controller capability is not supported for CSI " + "plugin type '" + info.type() + "' and name '" + info.name() + "'"); + } + + LOG(INFO) << "Creating volume with name '" << name << "'"; + + CreateVolumeRequest request; + request.set_name(name); + request.mutable_capacity_range()->set_required_bytes(capacity.bytes()); + request.mutable_capacity_range()->set_limit_bytes(capacity.bytes()); + *request.add_volume_capabilities() = evolve(capability); + *request.mutable_parameters() = parameters; + + // We retry the `CreateVolume` call for MESOS-9517. + return call( + CONTROLLER_SERVICE, &Client::createVolume, std::move(request), true) + .then(process::defer(self(), [=]( + const CreateVolumeResponse& response) -> Future<VolumeInfo> { + const string& volumeId = response.volume().volume_id(); + + // NOTE: If the volume is already tracked, there might already be + // operations running in its sequence. Since this continuation runs + // outside the sequence, we fail the call here to avoid any race issue. + // This also means that this call is not idempotent.
+ if (volumes.contains(volumeId)) { + return Failure("Volume with name '" + name + "' already exists"); + } + + VolumeState volumeState; + volumeState.set_state(VolumeState::CREATED); + *volumeState.mutable_volume_capability() = capability; + *volumeState.mutable_parameters() = parameters; + *volumeState.mutable_volume_context() = + response.volume().volume_context(); + + volumes.put(volumeId, std::move(volumeState)); + checkpointVolumeState(volumeId); + + return VolumeInfo{capacity, volumeId, response.volume().volume_context()}; + })); +} + + +Future<Option<Error>> VolumeManagerProcess::validateVolume( + const VolumeInfo& volumeInfo, + const types::VolumeCapability& capability, + const Map<string, string>& parameters) +{ + // If the volume has been checkpointed, the validation succeeds only if the + // capability and parameters of the specified profile are the same as those in + // the checkpoint. + if (volumes.contains(volumeInfo.id)) { + const VolumeState& volumeState = volumes.at(volumeInfo.id).state; + + if (volumeState.volume_capability() != capability) { + return Some(Error( + "Unsupported volume capability for volume '" + volumeInfo.id + "'")); + } + + if (volumeState.parameters() != parameters) { + return Some(Error( + "Mismatched parameters for volume '" + volumeInfo.id + "'")); + } + + return None(); + } + + LOG(INFO) << "Validating volume '" << volumeInfo.id << "'"; + + ValidateVolumeCapabilitiesRequest request; + request.set_volume_id(volumeInfo.id); + *request.add_volume_capabilities() = evolve(capability); + *request.mutable_volume_context() = volumeInfo.context; + *request.mutable_parameters() = parameters; + + return call( + CONTROLLER_SERVICE, + &Client::validateVolumeCapabilities, + std::move(request)) + .then(process::defer(self(), [=]( + const ValidateVolumeCapabilitiesResponse& response) + -> Future<Option<Error>> { + if (!response.has_confirmed()) { + return Error( + "Validation failed for volume '" + volumeInfo.id + "': " + + 
response.message()); + } + + if (response.confirmed().volume_context() != volumeInfo.context) { + return Error( + "Validation failed for volume '" + volumeInfo.id + + "': Mismatched volume context"); + } + + if (std::none_of( + response.confirmed().volume_capabilities().begin(), + response.confirmed().volume_capabilities().end(), + [&](const csi::v1::VolumeCapability& _capability) { + return csi::v1::devolve(_capability) == capability; + })) { + return Error( + "Validation failed for volume '" + volumeInfo.id + + "': Unsupported volume capability"); + } + + if (response.confirmed().parameters() != parameters) { + return Error( + "Validation failed for volume '" + volumeInfo.id + + "': Mismatched parameters"); + } + + // NOTE: If the volume is already tracked, there might already be + // operations running in its sequence. Since this continuation runs + // outside the sequence, we fail the call here to avoid any race issue. + // This also means that this call is not idempotent. + if (volumes.contains(volumeInfo.id)) { + return Failure("Volume '" + volumeInfo.id + "' already validated"); + } + + VolumeState volumeState; + volumeState.set_state(VolumeState::CREATED); + *volumeState.mutable_volume_capability() = capability; + *volumeState.mutable_parameters() = parameters; + *volumeState.mutable_volume_context() = volumeInfo.context; + + volumes.put(volumeInfo.id, std::move(volumeState)); + checkpointVolumeState(volumeInfo.id); + + return None(); + })); +} + + +Future<bool> VolumeManagerProcess::deleteVolume(const string& volumeId) +{ + if (!volumes.contains(volumeId)) { + return __deleteVolume(volumeId); + } + + VolumeData& volume = volumes.at(volumeId); + + LOG(INFO) << "Deleting volume '" << volumeId << "' in " + << volume.state.state() << " state"; + + // Volume deletion is sequentialized with other operations on the same volume + // to avoid races. 
+ return volume.sequence->add(std::function<Future<bool>()>( + process::defer(self(), &Self::_deleteVolume, volumeId))); +} + + +Future<Nothing> VolumeManagerProcess::attachVolume(const string& volumeId) +{ + if (!volumes.contains(volumeId)) { + return Failure("Cannot attach unknown volume '" + volumeId + "'"); + } + + VolumeData& volume = volumes.at(volumeId); + + LOG(INFO) << "Attaching volume '" << volumeId << "' in " + << volume.state.state() << " state"; + + // Volume attaching is serialized with other operations on the same volume to + // avoid races. + return volume.sequence->add(std::function<Future<Nothing>()>( + process::defer(self(), &Self::_attachVolume, volumeId))); +} + + +Future<Nothing> VolumeManagerProcess::detachVolume(const string& volumeId) +{ + if (!volumes.contains(volumeId)) { + return Failure("Cannot detach unknown volume '" + volumeId + "'"); + } + + VolumeData& volume = volumes.at(volumeId); + + LOG(INFO) << "Detaching volume '" << volumeId << "' in " + << volume.state.state() << " state"; + + // Volume detaching is serialized with other operations on the same volume to + // avoid races. + return volume.sequence->add(std::function<Future<Nothing>()>( + process::defer(self(), &Self::_detachVolume, volumeId))); +} + + +Future<Nothing> VolumeManagerProcess::publishVolume(const string& volumeId) +{ + if (!volumes.contains(volumeId)) { + return Failure("Cannot publish unknown volume '" + volumeId + "'"); + } + + VolumeData& volume = volumes.at(volumeId); + + LOG(INFO) << "Publishing volume '" << volumeId << "' in " + << volume.state.state() << " state"; + + // Volume publishing is serialized with other operations on the same volume to + // avoid races. 
+ return volume.sequence->add(std::function<Future<Nothing>()>( + process::defer(self(), &Self::_publishVolume, volumeId))); +} + + +Future<Nothing> VolumeManagerProcess::unpublishVolume(const string& volumeId) +{ + if (!volumes.contains(volumeId)) { + return Failure("Cannot unpublish unknown volume '" + volumeId + "'"); + } + + VolumeData& volume = volumes.at(volumeId); + + LOG(INFO) << "Unpublishing volume '" << volumeId << "' in " + << volume.state.state() << " state"; + + // Volume unpublishing is serialized with other operations on the same volume + // to avoid races. + return volume.sequence->add(std::function<Future<Nothing>()>( + process::defer(self(), &Self::_unpublishVolume, volumeId))); +} + + +template <typename Request, typename Response> +Future<Response> VolumeManagerProcess::call( + const Service& service, + Future<RPCResult<Response>> (Client::*rpc)(Request), + const Request& request, + const bool retry) // Made immutable in the following mutable lambda. +{ + Duration maxBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR; + + return process::loop( + self(), + [=] { + // Make the call to the latest service endpoint. + return serviceManager->getServiceEndpoint(service) + .then(process::defer( + self(), + &VolumeManagerProcess::_call<Request, Response>, + lambda::_1, + rpc, + request)); + }, + [=](const RPCResult<Response>& result) mutable + -> Future<ControlFlow<Response>> { + Option<Duration> backoff = retry + ? maxBackoff * (static_cast<double>(os::random()) / RAND_MAX) + : Option<Duration>::none(); + + maxBackoff = std::min(maxBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX); + + // We dispatch `__call` for testing purpose. 
+ return process::dispatch( + self(), &VolumeManagerProcess::__call<Response>, result, backoff); + }); +} + + +template <typename Request, typename Response> +Future<RPCResult<Response>> VolumeManagerProcess::_call( + const string& endpoint, + Future<RPCResult<Response>> (Client::*rpc)(Request), + const Request& request) +{ + ++metrics->csi_plugin_rpcs_pending; + + return (Client(endpoint, runtime).*rpc)(request).onAny( + process::defer(self(), [=](const Future<RPCResult<Response>>& future) { + --metrics->csi_plugin_rpcs_pending; + if (future.isReady() && future->isSome()) { + ++metrics->csi_plugin_rpcs_finished; + } else if (future.isDiscarded()) { + ++metrics->csi_plugin_rpcs_cancelled; + } else { + ++metrics->csi_plugin_rpcs_failed; + } + })); +} + + +template <typename Response> +Future<ControlFlow<Response>> VolumeManagerProcess::__call( + const RPCResult<Response>& result, const Option<Duration>& backoff) +{ + if (result.isSome()) { + return Break(result.get()); + } + + if (backoff.isNone()) { + return Failure(result.error()); + } + + // See the link below for retryable status codes: + // https://grpc.io/grpc/cpp/namespacegrpc.html#aff1730578c90160528f6a8d67ef5c43b // NOLINT + switch (result.error().status.error_code()) { + case grpc::DEADLINE_EXCEEDED: + case grpc::UNAVAILABLE: { + LOG(ERROR) << "Received '" << result.error() << "' while expecting " + << Response::descriptor()->name() << ". 
Retrying in " + << backoff.get(); + + return process::after(backoff.get()) + .then([]() -> Future<ControlFlow<Response>> { return Continue(); }); + } + case grpc::CANCELLED: + case grpc::UNKNOWN: + case grpc::INVALID_ARGUMENT: + case grpc::NOT_FOUND: + case grpc::ALREADY_EXISTS: + case grpc::PERMISSION_DENIED: + case grpc::UNAUTHENTICATED: + case grpc::RESOURCE_EXHAUSTED: + case grpc::FAILED_PRECONDITION: + case grpc::ABORTED: + case grpc::OUT_OF_RANGE: + case grpc::UNIMPLEMENTED: + case grpc::INTERNAL: + case grpc::DATA_LOSS: { + return Failure(result.error()); + } + case grpc::OK: + case grpc::DO_NOT_USE: { + UNREACHABLE(); + } + } + + UNREACHABLE(); +} + + +Future<Nothing> VolumeManagerProcess::prepareServices() +{ + CHECK(!services.empty()); + + // Get the plugin capabilities. + return call( + *services.begin(), + &Client::getPluginCapabilities, + GetPluginCapabilitiesRequest()) + .then(process::defer(self(), [=]( + const GetPluginCapabilitiesResponse& response) -> Future<Nothing> { + pluginCapabilities = response.capabilities(); + + if (services.contains(CONTROLLER_SERVICE) && + !pluginCapabilities->controllerService) { + return Failure( + "CONTROLLER_SERVICE plugin capability is not supported for CSI " + "plugin type '" + info.type() + "' and name '" + info.name() + "'"); + } + + return Nothing(); + })) + // Check if all services have consistent plugin infos. 
+ .then(process::defer(self(), [this] { + vector<Future<GetPluginInfoResponse>> futures; + foreach (const Service& service, services) { + // Query each service through its own endpoint so that the plugin infos + // compared below are the ones reported by every service. Note that + // `services` does not necessarily contain `CONTROLLER_SERVICE`. + futures.push_back(call( + service, &Client::getPluginInfo, GetPluginInfoRequest()) + .onReady([service](const GetPluginInfoResponse& response) { + LOG(INFO) << service << " loaded: " << stringify(response); + })); + } + + return process::collect(futures) + .then([](const vector<GetPluginInfoResponse>& pluginInfos) { + for (size_t i = 1; i < pluginInfos.size(); ++i) { + if (pluginInfos[i].name() != pluginInfos[0].name() || + pluginInfos[i].vendor_version() != + pluginInfos[0].vendor_version()) { + LOG(WARNING) << "Inconsistent plugin services. Please check with " + "the plugin vendor to ensure compatibility."; + } + } + + return Nothing(); + }); + })) + // Get the controller capabilities. + .then(process::defer(self(), [this]() -> Future<Nothing> { + if (!services.contains(CONTROLLER_SERVICE)) { + controllerCapabilities = ControllerCapabilities(); + return Nothing(); + } + + return call( + CONTROLLER_SERVICE, + &Client::controllerGetCapabilities, + ControllerGetCapabilitiesRequest()) + .then(process::defer(self(), [this]( + const ControllerGetCapabilitiesResponse& response) { + controllerCapabilities = response.capabilities(); + return Nothing(); + })); + })) + // Get the node capabilities and ID.
+ .then(process::defer(self(), [this]() -> Future<Nothing> { + if (!services.contains(NODE_SERVICE)) { + nodeCapabilities = NodeCapabilities(); + return Nothing(); + } + + return call( + NODE_SERVICE, + &Client::nodeGetCapabilities, + NodeGetCapabilitiesRequest()) + .then(process::defer(self(), [this]( + const NodeGetCapabilitiesResponse& response) -> Future<Nothing> { + nodeCapabilities = response.capabilities(); + + if (controllerCapabilities->publishUnpublishVolume) { + return call( + NODE_SERVICE, &Client::nodeGetInfo, NodeGetInfoRequest()) + .then(process::defer(self(), [this]( + const NodeGetInfoResponse& response) { + nodeId = response.node_id(); + return Nothing(); + })); + } + + return Nothing(); + })); + })); +} + + +Future<bool> VolumeManagerProcess::_deleteVolume(const std::string& volumeId) +{ + CHECK(volumes.contains(volumeId)); + VolumeState& volumeState = volumes.at(volumeId).state; + + if (volumeState.node_publish_required()) { + CHECK_EQ(VolumeState::PUBLISHED, volumeState.state()); + + const string targetPath = paths::getMountTargetPath( + paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId); + + // NOTE: Normally the volume should have been cleaned up. However this may + // not be true for preprovisioned volumes (e.g., leftover from a previous + // resource provider instance). To prevent data leakage in such cases, we + // clean up the data (but not the target path) here. + Try<Nothing> rmdir = os::rmdir(targetPath, true, false); + if (rmdir.isError()) { + return Failure( + "Failed to clean up volume '" + volumeId + "': " + rmdir.error()); + } + + volumeState.set_node_publish_required(false); + checkpointVolumeState(volumeId); + } + + if (volumeState.state() != VolumeState::CREATED) { + // Retry after transitioning the volume to `CREATED` state. 
+ return _detachVolume(volumeId) + .then(process::defer(self(), &Self::_deleteVolume, volumeId)); + } + + // NOTE: The last asynchronous continuation, which is supposed to be run in + // the volume's sequence, would cause the sequence to be destructed, which + // would in turn discard the returned future. However, since the continuation + // would have already been run, the returned future will become ready, making + // the future returned by the sequence ready as well. + return __deleteVolume(volumeId) + .then(process::defer(self(), [this, volumeId](bool deleted) { + volumes.erase(volumeId); + + const string volumePath = + paths::getVolumePath(rootDir, info.type(), info.name(), volumeId); + + Try<Nothing> rmdir = os::rmdir(volumePath); + CHECK_SOME(rmdir) << "Failed to remove checkpointed volume state at '" + << volumePath << "': " << rmdir.error(); + + garbageCollectMountPath(volumeId); + + return deleted; + })); +} + + +Future<bool> VolumeManagerProcess::__deleteVolume( + const string& volumeId) +{ + if (!controllerCapabilities->createDeleteVolume) { + return false; + } + + LOG(INFO) << "Calling '/csi.v1.Controller/DeleteVolume' for volume '" + << volumeId << "'"; + + DeleteVolumeRequest request; + request.set_volume_id(volumeId); + + // We retry the `DeleteVolume` call for MESOS-9517. 
+ return call( + CONTROLLER_SERVICE, &Client::deleteVolume, std::move(request), true) + .then([] { return true; }); +} + + +Future<Nothing> VolumeManagerProcess::_attachVolume(const string& volumeId) +{ + CHECK(volumes.contains(volumeId)); + VolumeState& volumeState = volumes.at(volumeId).state; + + if (volumeState.state() == VolumeState::NODE_READY) { + return Nothing(); + } + + if (volumeState.state() != VolumeState::CREATED && + volumeState.state() != VolumeState::CONTROLLER_PUBLISH && + volumeState.state() != VolumeState::CONTROLLER_UNPUBLISH) { + return Failure( + "Cannot attach volume '" + volumeId + "' in " + + stringify(volumeState.state()) + " state"); + } + + if (!controllerCapabilities->publishUnpublishVolume) { + // Since this is a no-op, no need to checkpoint here. + volumeState.set_state(VolumeState::NODE_READY); + return Nothing(); + } + + // A previously failed `ControllerUnpublishVolume` call can be recovered + // through an extra `ControllerUnpublishVolume` call. See: + // https://github.com/container-storage-interface/spec/blob/v1.1.0/spec.md#controllerunpublishvolume // NOLINT + if (volumeState.state() == VolumeState::CONTROLLER_UNPUBLISH) { + // Retry after recovering the volume to `CREATED` state. 
+ return _detachVolume(volumeId) + .then(process::defer(self(), &Self::_attachVolume, volumeId)); + } + + if (volumeState.state() == VolumeState::CREATED) { + volumeState.set_state(VolumeState::CONTROLLER_PUBLISH); + checkpointVolumeState(volumeId); + } + + LOG(INFO) + << "Calling '/csi.v1.Controller/ControllerPublishVolume' for volume '" + << volumeId << "'"; + + ControllerPublishVolumeRequest request; + request.set_volume_id(volumeId); + request.set_node_id(CHECK_NOTNONE(nodeId)); + *request.mutable_volume_capability() = + evolve(volumeState.volume_capability()); + request.set_readonly(false); + *request.mutable_volume_context() = volumeState.volume_context(); + + return call( + CONTROLLER_SERVICE, &Client::controllerPublishVolume, std::move(request)) + .then(process::defer(self(), [this, volumeId]( + const ControllerPublishVolumeResponse& response) { + CHECK(volumes.contains(volumeId)); + VolumeState& volumeState = volumes.at(volumeId).state; + volumeState.set_state(VolumeState::NODE_READY); + *volumeState.mutable_publish_context() = response.publish_context(); + + checkpointVolumeState(volumeId); + + return Nothing(); + })); +} + + +Future<Nothing> VolumeManagerProcess::_detachVolume(const string& volumeId) +{ + CHECK(volumes.contains(volumeId)); + VolumeState& volumeState = volumes.at(volumeId).state; + + if (volumeState.state() == VolumeState::CREATED) { + return Nothing(); + } + + if (volumeState.state() != VolumeState::NODE_READY && + volumeState.state() != VolumeState::CONTROLLER_PUBLISH && + volumeState.state() != VolumeState::CONTROLLER_UNPUBLISH) { + // Retry after transitioning the volume to `CREATED` state. + return _unpublishVolume(volumeId) + .then(process::defer(self(), &Self::_detachVolume, volumeId)); + } + + if (!controllerCapabilities->publishUnpublishVolume) { + // Since this is a no-op, no need to checkpoint here. 
+ volumeState.set_state(VolumeState::CREATED); + return Nothing(); + } + + // A previously failed `ControllerPublishVolume` call can be recovered through + // the current `ControllerUnpublishVolume` call. See: + // https://github.com/container-storage-interface/spec/blob/v1.1.0/spec.md#controllerpublishvolume // NOLINT + if (volumeState.state() == VolumeState::NODE_READY || + volumeState.state() == VolumeState::CONTROLLER_PUBLISH) { + volumeState.set_state(VolumeState::CONTROLLER_UNPUBLISH); + checkpointVolumeState(volumeId); + } + + LOG(INFO) + << "Calling '/csi.v1.Controller/ControllerUnpublishVolume' for volume '" + << volumeId << "'"; + + ControllerUnpublishVolumeRequest request; + request.set_volume_id(volumeId); + request.set_node_id(CHECK_NOTNONE(nodeId)); + + return call( + CONTROLLER_SERVICE, + &Client::controllerUnpublishVolume, + std::move(request)) + .then(process::defer(self(), [this, volumeId] { + CHECK(volumes.contains(volumeId)); + VolumeState& volumeState = volumes.at(volumeId).state; + volumeState.set_state(VolumeState::CREATED); + volumeState.mutable_publish_context()->clear(); + + checkpointVolumeState(volumeId); + + return Nothing(); + })); +} + + +Future<Nothing> VolumeManagerProcess::_publishVolume(const string& volumeId) +{ + CHECK(volumes.contains(volumeId)); + VolumeState& volumeState = volumes.at(volumeId).state; + + if (volumeState.state() == VolumeState::PUBLISHED) { + CHECK(volumeState.node_publish_required()); + return Nothing(); + } + + if (volumeState.state() != VolumeState::VOL_READY && + volumeState.state() != VolumeState::NODE_PUBLISH && + volumeState.state() != VolumeState::NODE_UNPUBLISH) { + // Retry after transitioning the volume to `VOL_READY` state. + return __publishVolume(volumeId) + .then(process::defer(self(), &Self::_publishVolume, volumeId)); + } + + // A previously failed `NodeUnpublishVolume` call can be recovered through an + // extra `NodeUnpublishVolume` call. 
See: + // https://github.com/container-storage-interface/spec/blob/v1.1.0/spec.md#nodeunpublishvolume // NOLINT + if (volumeState.state() == VolumeState::NODE_UNPUBLISH) { + // Retry after recovering the volume to `VOL_READY` state. + return __unpublishVolume(volumeId) + .then(process::defer(self(), &Self::_publishVolume, volumeId)); + } + + const string targetPath = paths::getMountTargetPath( + paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId); + + // Ensure the parent directory of the target path exists. The target path + // itself will be created by the plugin. + // + // NOTE: The target path will be removed by the plugin as well, and The parent + // directory of the target path will be cleaned up during volume removal. + Try<Nothing> mkdir = os::mkdir(Path(targetPath).dirname()); + if (mkdir.isError()) { + return Failure( + "Failed to create parent directory of target path '" + targetPath + + "': " + mkdir.error()); + } + + if (volumeState.state() == VolumeState::VOL_READY) { + volumeState.set_state(VolumeState::NODE_PUBLISH); + checkpointVolumeState(volumeId); + } + + LOG(INFO) << "Calling '/csi.v1.Node/NodePublishVolume' for volume '" + << volumeId << "'"; + + NodePublishVolumeRequest request; + request.set_volume_id(volumeId); + *request.mutable_publish_context() = volumeState.publish_context(); + request.set_target_path(targetPath); + *request.mutable_volume_capability() = + evolve(volumeState.volume_capability()); + request.set_readonly(false); + *request.mutable_volume_context() = volumeState.volume_context(); + + if (nodeCapabilities->stageUnstageVolume) { + const string stagingPath = paths::getMountStagingPath( + paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId); + + CHECK(os::exists(stagingPath)); + request.set_staging_target_path(stagingPath); + } + + return call(NODE_SERVICE, &Client::nodePublishVolume, std::move(request)) + .then(defer(self(), [this, volumeId, targetPath] { + CHECK(volumes.contains(volumeId)); + 
VolumeState& volumeState = volumes.at(volumeId).state; + + volumeState.set_state(VolumeState::PUBLISHED); + + // NOTE: This is the first time a container is going to consume the + // persistent volume, so the `node_publish_required` field is set to + // indicate that this volume must remain published so it can be + // synchronously cleaned up when the persistent volume is destroyed. + volumeState.set_node_publish_required(true); + + checkpointVolumeState(volumeId); + + return Nothing(); + })); +} + + +Future<Nothing> VolumeManagerProcess::__publishVolume(const string& volumeId) +{ + CHECK(volumes.contains(volumeId)); + VolumeState& volumeState = volumes.at(volumeId).state; + + if (volumeState.state() == VolumeState::VOL_READY) { + CHECK(!volumeState.boot_id().empty()); + return Nothing(); + } + + if (volumeState.state() != VolumeState::NODE_READY && + volumeState.state() != VolumeState::NODE_STAGE && + volumeState.state() != VolumeState::NODE_UNSTAGE) { + // Retry after transitioning the volume to `NODE_READY` state. + return _attachVolume(volumeId) + .then(process::defer(self(), &Self::__publishVolume, volumeId)); + } + + if (!nodeCapabilities->stageUnstageVolume) { + // Since this is a no-op, no need to checkpoint here. + volumeState.set_state(VolumeState::VOL_READY); + volumeState.set_boot_id(CHECK_NOTNONE(bootId)); + return Nothing(); + } + + // A previously failed `NodeUnstageVolume` call can be recovered through an + // extra `NodeUnstageVolume` call. See: + // https://github.com/container-storage-interface/spec/blob/v1.1.0/spec.md#nodeunstagevolume // NOLINT + if (volumeState.state() == VolumeState::NODE_UNSTAGE) { + // Retry after recovering the volume to `NODE_READY` state. 
+ return _unpublishVolume(volumeId) + .then(process::defer(self(), &Self::__publishVolume, volumeId)); + } + + const string stagingPath = paths::getMountStagingPath( + paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId); + + // NOTE: The staging path will be cleaned up in during volume removal. + Try<Nothing> mkdir = os::mkdir(stagingPath); + if (mkdir.isError()) { + return Failure( + "Failed to create mount staging path '" + stagingPath + + "': " + mkdir.error()); + } + + if (volumeState.state() == VolumeState::NODE_READY) { + volumeState.set_state(VolumeState::NODE_STAGE); + checkpointVolumeState(volumeId); + } + + LOG(INFO) << "Calling '/csi.v1.Node/NodeStageVolume' for volume '" << volumeId + << "'"; + + NodeStageVolumeRequest request; + request.set_volume_id(volumeId); + *request.mutable_publish_context() = volumeState.publish_context(); + request.set_staging_target_path(stagingPath); + *request.mutable_volume_capability() = + evolve(volumeState.volume_capability()); + *request.mutable_volume_context() = volumeState.volume_context(); + + return call(NODE_SERVICE, &Client::nodeStageVolume, std::move(request)) + .then(process::defer(self(), [this, volumeId] { + CHECK(volumes.contains(volumeId)); + VolumeState& volumeState = volumes.at(volumeId).state; + volumeState.set_state(VolumeState::VOL_READY); + volumeState.set_boot_id(CHECK_NOTNONE(bootId)); + + checkpointVolumeState(volumeId); + + return Nothing(); + })); +} + + +Future<Nothing> VolumeManagerProcess::_unpublishVolume(const string& volumeId) +{ + CHECK(volumes.contains(volumeId)); + VolumeState& volumeState = volumes.at(volumeId).state; + + if (volumeState.state() == VolumeState::NODE_READY) { + CHECK(volumeState.boot_id().empty()); + return Nothing(); + } + + if (volumeState.state() != VolumeState::VOL_READY && + volumeState.state() != VolumeState::NODE_STAGE && + volumeState.state() != VolumeState::NODE_UNSTAGE) { + // Retry after transitioning the volume to `VOL_READY` state. 
+ return __unpublishVolume(volumeId) + .then(process::defer(self(), &Self::_unpublishVolume, volumeId)); + } + + if (!nodeCapabilities->stageUnstageVolume) { + // Since this is a no-op, no need to checkpoint here. + volumeState.set_state(VolumeState::NODE_READY); + volumeState.clear_boot_id(); + return Nothing(); + } + + // A previously failed `NodeStageVolume` call can be recovered through the + // current `NodeUnstageVolume` call. See: + // https://github.com/container-storage-interface/spec/blob/v1.1.0/spec.md#nodestagevolume // NOLINT + if (volumeState.state() == VolumeState::VOL_READY || + volumeState.state() == VolumeState::NODE_STAGE) { + volumeState.set_state(VolumeState::NODE_UNSTAGE); + checkpointVolumeState(volumeId); + } + + const string stagingPath = paths::getMountStagingPath( + paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId); + + CHECK(os::exists(stagingPath)); + + LOG(INFO) << "Calling '/csi.v1.Node/NodeUnstageVolume' for volume '" + << volumeId << "'"; + + NodeUnstageVolumeRequest request; + request.set_volume_id(volumeId); + request.set_staging_target_path(stagingPath); + + return call(NODE_SERVICE, &Client::nodeUnstageVolume, std::move(request)) + .then(process::defer(self(), [this, volumeId] { + CHECK(volumes.contains(volumeId)); + VolumeState& volumeState = volumes.at(volumeId).state; + volumeState.set_state(VolumeState::NODE_READY); + volumeState.clear_boot_id(); + + checkpointVolumeState(volumeId); + + return Nothing(); + })); +} + + +Future<Nothing> VolumeManagerProcess::__unpublishVolume(const string& volumeId) +{ + CHECK(volumes.contains(volumeId)); + VolumeState& volumeState = volumes.at(volumeId).state; + + if (volumeState.state() == VolumeState::VOL_READY) { + return Nothing(); + } + + if (volumeState.state() != VolumeState::PUBLISHED && + volumeState.state() != VolumeState::NODE_PUBLISH && + volumeState.state() != VolumeState::NODE_UNPUBLISH) { + return Failure( + "Cannot unpublish volume '" + volumeId + "' in " + 
+ stringify(volumeState.state()) + "state"); + } + + // A previously failed `NodePublishVolume` call can be recovered through the + // current `NodeUnpublishVolume` call. See: + // https://github.com/container-storage-interface/spec/blob/v1.1.0/spec.md#nodepublishvolume // NOLINT + if (volumeState.state() == VolumeState::PUBLISHED || + volumeState.state() == VolumeState::NODE_PUBLISH) { + volumeState.set_state(VolumeState::NODE_UNPUBLISH); + checkpointVolumeState(volumeId); + } + + const string targetPath = paths::getMountTargetPath( + paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId); + + CHECK(os::exists(targetPath)); + + LOG(INFO) << "Calling '/csi.v1.Node/NodeUnpublishVolume' for volume '" + << volumeId << "'"; + + NodeUnpublishVolumeRequest request; + request.set_volume_id(volumeId); + request.set_target_path(targetPath); + + return call(NODE_SERVICE, &Client::nodeUnpublishVolume, std::move(request)) + .then(process::defer(self(), [this, volumeId] { + CHECK(volumes.contains(volumeId)); + VolumeState& volumeState = volumes.at(volumeId).state; + volumeState.set_state(VolumeState::VOL_READY); + + checkpointVolumeState(volumeId); + + return Nothing(); + })); +} + + +void VolumeManagerProcess::checkpointVolumeState(const string& volumeId) +{ + const string statePath = + paths::getVolumeStatePath(rootDir, info.type(), info.name(), volumeId); + + // NOTE: We ensure the checkpoint is synced to the filesystem to avoid + // resulting in a stale or empty checkpoint when a system crash happens. 
+ Try<Nothing> checkpoint = + slave::state::checkpoint(statePath, volumes.at(volumeId).state, true); + + CHECK_SOME(checkpoint) + << "Failed to checkpoint volume state to '" << statePath << "':" + << checkpoint.error(); +} + + +void VolumeManagerProcess::garbageCollectMountPath(const string& volumeId) +{ + CHECK(!volumes.contains(volumeId)); + + const string path = paths::getMountPath( + paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId); + + if (os::exists(path)) { + Try<Nothing> rmdir = os::rmdir(path); + if (rmdir.isError()) { + LOG(ERROR) << "Failed to remove directory '" << path + << "': " << rmdir.error(); + } + } +} + + +VolumeManager::VolumeManager( + const http::URL& agentUrl, + const string& rootDir, + const CSIPluginInfo& info, + const hashset<Service>& services, + const string& containerPrefix, + const Option<string>& authToken, + const Runtime& runtime, + Metrics* metrics) + : process(new VolumeManagerProcess( + agentUrl, + rootDir, + info, + services, + containerPrefix, + authToken, + runtime, + metrics)) +{ + process::spawn(CHECK_NOTNULL(process.get())); + recovered = process::dispatch(process.get(), &VolumeManagerProcess::recover); +} + + +VolumeManager::~VolumeManager() +{ + process::terminate(process.get()); + process::wait(process.get()); +} + + +Future<Nothing> VolumeManager::recover() +{ + return recovered; +} + + +Future<vector<VolumeInfo>> VolumeManager::listVolumes() +{ + return recovered + .then(process::defer(process.get(), &VolumeManagerProcess::listVolumes)); +} + + +Future<Bytes> VolumeManager::getCapacity( + const types::VolumeCapability& capability, + const Map<string, string>& parameters) +{ + return recovered + .then(process::defer( + process.get(), + &VolumeManagerProcess::getCapacity, + capability, + parameters)); +} + + +Future<VolumeInfo> VolumeManager::createVolume( + const string& name, + const Bytes& capacity, + const types::VolumeCapability& capability, + const Map<string, string>& parameters) +{ + return 
recovered + .then(process::defer( + process.get(), + &VolumeManagerProcess::createVolume, + name, + capacity, + capability, + parameters)); +} + + +Future<Option<Error>> VolumeManager::validateVolume( + const VolumeInfo& volumeInfo, + const types::VolumeCapability& capability, + const Map<string, string>& parameters) +{ + return recovered + .then(process::defer( + process.get(), + &VolumeManagerProcess::validateVolume, + volumeInfo, + capability, + parameters)); +} + + +Future<bool> VolumeManager::deleteVolume(const string& volumeId) +{ + return recovered + .then(process::defer( + process.get(), &VolumeManagerProcess::deleteVolume, volumeId)); +} + + +Future<Nothing> VolumeManager::attachVolume(const string& volumeId) +{ + return recovered + .then(process::defer( + process.get(), &VolumeManagerProcess::attachVolume, volumeId)); +} + + +Future<Nothing> VolumeManager::detachVolume(const string& volumeId) +{ + return recovered + .then(process::defer( + process.get(), &VolumeManagerProcess::detachVolume, volumeId)); +} + + +Future<Nothing> VolumeManager::publishVolume(const string& volumeId) +{ + return recovered + .then(process::defer( + process.get(), &VolumeManagerProcess::publishVolume, volumeId)); +} + + +Future<Nothing> VolumeManager::unpublishVolume(const string& volumeId) +{ + return recovered + .then(process::defer( + process.get(), &VolumeManagerProcess::unpublishVolume, volumeId)); +} + +} // namespace v1 { +} // namespace csi { +} // namespace mesos { diff --git a/src/csi/v1_volume_manager.hpp b/src/csi/v1_volume_manager.hpp new file mode 100644 index 0000000..f8e6095 --- /dev/null +++ b/src/csi/v1_volume_manager.hpp @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef __CSI_V1_VOLUME_MANAGER_HPP__ +#define __CSI_V1_VOLUME_MANAGER_HPP__ + +#include <string> +#include <vector> + +#include <google/protobuf/map.h> + +#include <mesos/mesos.hpp> + +#include <mesos/csi/types.hpp> + +#include <process/future.hpp> +#include <process/grpc.hpp> +#include <process/http.hpp> +#include <process/owned.hpp> + +#include <stout/bytes.hpp> +#include <stout/error.hpp> +#include <stout/hashset.hpp> +#include <stout/nothing.hpp> +#include <stout/option.hpp> + +#include "csi/metrics.hpp" +#include "csi/service_manager.hpp" +#include "csi/volume_manager.hpp" + +namespace mesos { +namespace csi { +namespace v1 { + +// Forward declaration; the class itself is defined in +// `csi/v1_volume_manager_process.hpp`. +class VolumeManagerProcess; + + +// CSI v1 implementation of `csi::VolumeManager`. +// +// The constructor spawns a `VolumeManagerProcess` actor and dispatches its +// `recover()`; `recover()` returns that future, and every other operation is +// deferred to the actor once recovery has succeeded. The destructor terminates +// the actor and waits for it to exit. +class VolumeManager : public csi::VolumeManager +{ +public: + VolumeManager( + const process::http::URL& agentUrl, + const std::string& rootDir, + const CSIPluginInfo& info, + const hashset<Service>& services, + const std::string& containerPrefix, + const Option<std::string>& authToken, + const process::grpc::client::Runtime& runtime, + Metrics* metrics); + + // Since this class contains an `Owned` member (`process`) which should not + // but can be copied, explicitly make this class non-copyable. + // + // TODO(chhsiao): Remove this once MESOS-5122 is fixed.
+ VolumeManager(const VolumeManager&) = delete; + VolumeManager& operator=(const VolumeManager&) = delete; + + ~VolumeManager() override; + + process::Future<Nothing> recover() override; + + process::Future<std::vector<VolumeInfo>> listVolumes() override; + + process::Future<Bytes> getCapacity( + const types::VolumeCapability& capability, + const google::protobuf::Map<std::string, std::string>& parameters) + override; + + process::Future<VolumeInfo> createVolume( + const std::string& name, + const Bytes& capacity, + const types::VolumeCapability& capability, + const google::protobuf::Map<std::string, std::string>& parameters) + override; + + process::Future<Option<Error>> validateVolume( + const VolumeInfo& volumeInfo, + const types::VolumeCapability& capability, + const google::protobuf::Map<std::string, std::string>& parameters) + override; + + process::Future<bool> deleteVolume(const std::string& volumeId) override; + + process::Future<Nothing> attachVolume(const std::string& volumeId) override; + + process::Future<Nothing> detachVolume(const std::string& volumeId) override; + + process::Future<Nothing> publishVolume(const std::string& volumeId) override; + + process::Future<Nothing> unpublishVolume( + const std::string& volumeId) override; + +private: + process::Owned<VolumeManagerProcess> process; + process::Future<Nothing> recovered; +}; + +} // namespace v1 { +} // namespace csi { +} // namespace mesos { + +#endif // __CSI_V1_VOLUME_MANAGER_HPP__ diff --git a/src/csi/v1_volume_manager_process.hpp b/src/csi/v1_volume_manager_process.hpp new file mode 100644 index 0000000..1c80399 --- /dev/null +++ b/src/csi/v1_volume_manager_process.hpp @@ -0,0 +1,218 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef __CSI_V1_VOLUME_MANAGER_PROCESS_HPP__ +#define __CSI_V1_VOLUME_MANAGER_PROCESS_HPP__ + +#include <string> +#include <vector> + +#include <google/protobuf/map.h> + +#include <mesos/mesos.hpp> + +#include <mesos/csi/types.hpp> + +#include <process/future.hpp> +#include <process/grpc.hpp> +#include <process/http.hpp> +#include <process/loop.hpp> +#include <process/owned.hpp> +#include <process/process.hpp> +#include <process/sequence.hpp> + +#include <stout/bytes.hpp> +#include <stout/duration.hpp> +#include <stout/error.hpp> +#include <stout/hashmap.hpp> +#include <stout/hashset.hpp> +#include <stout/nothing.hpp> +#include <stout/option.hpp> +#include <stout/try.hpp> + +#include "csi/metrics.hpp" +#include "csi/service_manager.hpp" +#include "csi/state.hpp" +#include "csi/v1_client.hpp" +#include "csi/v1_utils.hpp" +#include "csi/v1_volume_manager.hpp" +#include "csi/volume_manager.hpp" + +namespace mesos { +namespace csi { +namespace v1 { + +// The CSI volume manager initially picks a random amount of time between +// `[0, b]`, where `b = DEFAULT_CSI_RETRY_BACKOFF_FACTOR`, to retry CSI calls. +// Subsequent retries are exponentially backed off based on this interval (e.g., +// 2nd retry uses a random value between `[0, b * 2^1]`, 3rd retry between +// `[0, b * 2^2]`, etc) up to a maximum of `DEFAULT_CSI_RETRY_INTERVAL_MAX`. 
+// +// TODO(chhsiao): Make the retry parameters configurable. +constexpr Duration DEFAULT_CSI_RETRY_BACKOFF_FACTOR = Seconds(10); +constexpr Duration DEFAULT_CSI_RETRY_INTERVAL_MAX = Minutes(10); + + +class VolumeManagerProcess : public process::Process<VolumeManagerProcess> +{ +public: + explicit VolumeManagerProcess( + const process::http::URL& agentUrl, + const std::string& _rootDir, + const CSIPluginInfo& _info, + const hashset<Service> _services, + const std::string& containerPrefix, + const Option<std::string>& authToken, + const process::grpc::client::Runtime& _runtime, + Metrics* _metrics); + + process::Future<Nothing> recover(); + + process::Future<std::vector<VolumeInfo>> listVolumes(); + + process::Future<Bytes> getCapacity( + const types::VolumeCapability& capability, + const google::protobuf::Map<std::string, std::string>& parameters); + + process::Future<VolumeInfo> createVolume( + const std::string& name, + const Bytes& capacity, + const types::VolumeCapability& capability, + const google::protobuf::Map<std::string, std::string>& parameters); + + process::Future<Option<Error>> validateVolume( + const VolumeInfo& volumeInfo, + const types::VolumeCapability& capability, + const google::protobuf::Map<std::string, std::string>& parameters); + + process::Future<bool> deleteVolume(const std::string& volumeId); + + process::Future<Nothing> attachVolume(const std::string& volumeId); + + process::Future<Nothing> detachVolume(const std::string& volumeId); + + process::Future<Nothing> publishVolume(const std::string& volumeId); + + process::Future<Nothing> unpublishVolume(const std::string& volumeId); + + // Wrapper functions to make CSI calls and update RPC metrics. Made public for + // testing purpose. + // + // The call is made asynchronously and thus no guarantee is provided on the + // order in which calls are sent. Callers need to either ensure to not have + // multiple conflicting calls in flight, or treat results idempotently. 
+ // + // NOTE: We currently ensure this by 1) resource locking to forbid concurrent + // calls on the same volume, and 2) no profile update while there are ongoing + // `CREATE_DISK` or `DESTROY_DISK` operations. + template <typename Request, typename Response> + process::Future<Response> call( + const Service& service, + process::Future<RPCResult<Response>> (Client::*rpc)(Request), + const Request& request, + bool retry = false); + + template <typename Request, typename Response> + process::Future<RPCResult<Response>> _call( + const std::string& endpoint, + process::Future<RPCResult<Response>> (Client::*rpc)(Request), + const Request& request); + + template <typename Response> + process::Future<process::ControlFlow<Response>> __call( + const RPCResult<Response>& result, const Option<Duration>& backoff); + +private: + process::Future<Nothing> prepareServices(); + + process::Future<bool> _deleteVolume(const std::string& volumeId); + process::Future<bool> __deleteVolume(const std::string& volumeId); + + // The following methods are used to manage volume lifecycles. Transient + // states are omitted. + // + // +------------+ + // + + + | CREATED | ^ + // _attachVolume | | | +---+----^---+ | + // | | | | | | _detachVolume + // | | | +---v----+---+ | + // v + + | NODE_READY | + ^ + // | | +---+----^---+ | | + // __publishVolume | | | | | | _unpublishVolume + // | | +---v----+---+ | | + // v + | VOL_READY | + + ^ + // | +---+----^---+ | | | + // _publishVolume | | | | | | __unpublishVolume + // | +---v----+---+ | | | + // V | PUBLISHED | + + + + // +------------+ + + // Transition a volume to `NODE_READY` state from any state above. + process::Future<Nothing> _attachVolume(const std::string& volumeId); + + // Transition a volume to `CREATED` state from any state below. + process::Future<Nothing> _detachVolume(const std::string& volumeId); + + // Transition a volume to `PUBLISHED` state from any state above. 
+ process::Future<Nothing> _publishVolume(const std::string& volumeId); + + // Transition a volume to `VOL_READY` state from any state above. + process::Future<Nothing> __publishVolume(const std::string& volumeId); + + // Transition a volume to `NODE_READY` state from any state below. + process::Future<Nothing> _unpublishVolume(const std::string& volumeId); + + // Transition a volume to `VOL_READY` state from any state below. + process::Future<Nothing> __unpublishVolume(const std::string& volumeId); + + void checkpointVolumeState(const std::string& volumeId); + + void garbageCollectMountPath(const std::string& volumeId); + + const std::string rootDir; + const CSIPluginInfo info; + const hashset<Service> services; + + process::grpc::client::Runtime runtime; + Metrics* metrics; + process::Owned<ServiceManager> serviceManager; + + Option<std::string> bootId; + Option<PluginCapabilities> pluginCapabilities; + Option<ControllerCapabilities> controllerCapabilities; + Option<NodeCapabilities> nodeCapabilities; + Option<std::string> nodeId; + + struct VolumeData + { + VolumeData(state::VolumeState&& _state) + : state(_state), sequence(new process::Sequence("csi-volume-sequence")) {} + + state::VolumeState state; + + // We call all CSI operations on the same volume in a sequence to ensure + // that they are processed in a sequential order. + process::Owned<process::Sequence> sequence; + }; + + hashmap<std::string, VolumeData> volumes; +}; + +} // namespace v1 { +} // namespace csi { +} // namespace mesos { + +#endif // __CSI_V1_VOLUME_MANAGER_PROCESS_HPP__
