Repository: mesos Updated Branches: refs/heads/master 20a877747 -> 1f06e8446
Added publish/unpublish in storage local resource provider. Storage local resource provider can now handle PUBLISH events. Although we don't do UNPUBLISH for now, the unpublish function is still required for deleting volumes. Review: https://reviews.apache.org/r/63387/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1f06e844 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1f06e844 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1f06e844 Branch: refs/heads/master Commit: 1f06e8446264e4d8ff7418fecfe3b9df7765ec4a Parents: 20a8777 Author: Chun-Hung Hsiao <[email protected]> Authored: Wed Dec 6 16:22:54 2017 -0800 Committer: Jie Yu <[email protected]> Committed: Wed Dec 6 16:22:54 2017 -0800 ---------------------------------------------------------------------- src/Makefile.am | 4 +- src/csi/state.hpp | 24 + src/csi/state.proto | 61 +++ src/csi/utils.cpp | 14 + src/csi/utils.hpp | 7 + src/resource_provider/storage/provider.cpp | 563 +++++++++++++++++++++++- 6 files changed, 669 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/1f06e844/src/Makefile.am ---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index 05e8b95..be105f4 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -380,7 +380,9 @@ CXX_CSI_PROTOS = \ csi/csi.grpc.pb.cc \ csi/csi.grpc.pb.h \ csi/csi.pb.cc \ - csi/csi.pb.h + csi/csi.pb.h \ + csi/state.pb.cc \ + csi/state.pb.h endif CXX_LOG_PROTOS = \ http://git-wip-us.apache.org/repos/asf/mesos/blob/1f06e844/src/csi/state.hpp ---------------------------------------------------------------------- diff --git a/src/csi/state.hpp b/src/csi/state.hpp new file mode 100644 index 0000000..dcbc7ab --- /dev/null +++ b/src/csi/state.hpp @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// 
or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +#ifndef __CSI_STATE_HPP__ +#define __CSI_STATE_HPP__ + +// ONLY USEFUL AFTER RUNNING PROTOC. +#include "csi/state.pb.h" + +#endif // __CSI_STATE_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/1f06e844/src/csi/state.proto ---------------------------------------------------------------------- diff --git a/src/csi/state.proto b/src/csi/state.proto new file mode 100644 index 0000000..0373c8a --- /dev/null +++ b/src/csi/state.proto @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +import "csi.proto"; + +package mesos.csi.state; + + +// Represents the state of a provisioned volume with respect to a node. +message VolumeState { + enum State { + UNKNOWN = 0; + CREATED = 1; // The volume is provisioned but not published. + NODE_READY = 2; // The volume is attached to the node. + PUBLISHED = 3; // The volume is mounted on the node. + CONTROLLER_PUBLISH = 4; // `ControllerPublishVolume` is being called. + CONTROLLER_UNPUBLISH = 5; // `ControllerUnpublishVolume` is being called. + NODE_PUBLISH = 6; // `NodePublishVolume` is being called. + NODE_UNPUBLISH = 7; // `NodeUnpublishVolume` is being called. + } + + // The state of the volume. This is a REQUIRED field. + State state = 1; + + // The capability used to publish the volume. This is a + // REQUIRED field. + .csi.VolumeCapability volume_capability = 2; + + // Attributes of the volume to be used on the node. This field MUST + // match the attributes of the `VolumeInfo` returned by + // `CreateVolume`. This is an OPTIONAL field. + map<string, string> volume_attributes = 3; + + // If the plugin has the `PUBLISH_UNPUBLISH_VOLUME` controller + // capability, this field MUST be set to the value returned by + // `ControllerPublishVolume`. Otherwise, this field MUST remain unset. + // This is an OPTIONAL field. + map<string, string> publish_volume_info = 4; + + // This field is used to check if the node has been rebooted since the + // last time the volume was mounted. If yes, `NodePublishVolume` needs + // to be called to mount the volume again. It MUST be set to the boot + // ID of the node if the volume is mounted, and SHOULD remain unset + // otherwise. This is an OPTIONAL field. 
+ string boot_id = 5; +} http://git-wip-us.apache.org/repos/asf/mesos/blob/1f06e844/src/csi/utils.cpp ---------------------------------------------------------------------- diff --git a/src/csi/utils.cpp b/src/csi/utils.cpp index 590e5f4..bae4654 100644 --- a/src/csi/utils.cpp +++ b/src/csi/utils.cpp @@ -76,3 +76,17 @@ ostream& operator<<(ostream& stream, const Version& version) } } // namespace csi { + + +namespace mesos { +namespace csi { +namespace state { + +ostream& operator<<(ostream& stream, const VolumeState::State& state) +{ + return stream << VolumeState::State_Name(state); +} + +} // namespace state { +} // namespace csi { +} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/1f06e844/src/csi/utils.hpp ---------------------------------------------------------------------- diff --git a/src/csi/utils.hpp b/src/csi/utils.hpp index 54cf34b..b49fa59 100644 --- a/src/csi/utils.hpp +++ b/src/csi/utils.hpp @@ -31,6 +31,7 @@ #include <stout/unreachable.hpp> #include "csi/spec.hpp" +#include "csi/state.hpp" namespace csi { @@ -114,6 +115,12 @@ struct ControllerCapabilities bool getCapacity = false; }; + +namespace state { + +std::ostream& operator<<(std::ostream& stream, const VolumeState::State& state); + +} // namespace state { } // namespace csi { } // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/1f06e844/src/resource_provider/storage/provider.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index 256b214..d798742 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -29,6 +29,7 @@ #include <process/id.hpp> #include <process/loop.hpp> #include <process/process.hpp> +#include <process/sequence.hpp> #include <process/timeout.hpp> #include <mesos/resources.hpp> @@ -50,6 +51,7 @@ #include "csi/client.hpp" #include "csi/paths.hpp" +#include 
"csi/state.hpp" #include "csi/utils.hpp" #include "internal/devolve.hpp" @@ -78,6 +80,7 @@ using process::Future; using process::Owned; using process::Process; using process::Promise; +using process::Sequence; using process::Timeout; using process::after; @@ -298,11 +301,24 @@ private: google::protobuf::Map<string, string> parameters; }; + struct VolumeData + { + VolumeData(const csi::state::VolumeState& _state) + : state(_state), sequence(new Sequence("volume-sequence")) {} + + csi::state::VolumeState state; + + // We run all CSI operations for the same volume on a sequence to + // ensure that they are processed in a sequential order. + Owned<Sequence> sequence; + }; + void initialize() override; void fatal(const string& messsage, const string& failure); Future<Nothing> recover(); Future<Nothing> recoverServices(); + Future<Nothing> recoverVolumes(); void doReliableRegistration(); Future<Nothing> reconcile(); @@ -318,9 +334,14 @@ private: Future<Nothing> prepareControllerService(); Future<Nothing> prepareNodeService(); Future<Resources> importResources(); + Future<Nothing> controllerPublish(const string& volumeId); + Future<Nothing> controllerUnpublish(const string& volumeId); + Future<Nothing> nodePublish(const string& volumeId); + Future<Nothing> nodeUnpublish(const string& volumeId); void checkpointResourceProviderState(); void sendResourceProviderStateUpdate(); + void checkpointVolumeState(const string& volumeId); enum State { @@ -340,6 +361,7 @@ private: const Option<string> authToken; csi::Version csiVersion; + string bootId; hashmap<string, ProfileData> profiles; process::grpc::client::Runtime runtime; Owned<v1::resource_provider::Driver> driver; @@ -357,6 +379,7 @@ private: list<Event::Operation> pendingOperations; Resources totalResources; Option<UUID> resourceVersion; + hashmap<string, VolumeData> volumes; }; @@ -415,6 +438,13 @@ void StorageLocalResourceProviderProcess::initialize() csiVersion.set_minor(1); csiVersion.set_patch(0); + Try<string> _bootId 
= os::bootId(); + if (_bootId.isError()) { + return fatal("Failed to get boot ID", _bootId.error()); + } + + bootId = _bootId.get(); + foreach (const CSIPluginContainerInfo& container, info.storage().plugin().containers()) { auto it = find( @@ -478,6 +508,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover() CHECK_EQ(RECOVERING, state); return recoverServices() + .then(defer(self(), &Self::recoverVolumes)) .then(defer(self(), [=]() -> Future<Nothing> { // Recover the resource provider ID and state from the latest // symlink. If the symlink cannot be resolved, this is a new @@ -628,6 +659,121 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverServices() } +Future<Nothing> StorageLocalResourceProviderProcess::recoverVolumes() +{ + // Recover the states of CSI volumes. + Try<list<string>> volumePaths = csi::paths::getVolumePaths( + slave::paths::getCsiRootDir(workDir), + info.storage().plugin().type(), + info.storage().plugin().name()); + + if (volumePaths.isError()) { + return Failure( + "Failed to find volumes for CSI plugin type '" + + info.storage().plugin().type() + "' and name '" + + info.storage().plugin().name() + "': " + volumePaths.error()); + } + + list<Future<Nothing>> futures; + + foreach (const string& path, volumePaths.get()) { + Try<csi::paths::VolumePath> volumePath = + csi::paths::parseVolumePath(slave::paths::getCsiRootDir(workDir), path); + + if (volumePath.isError()) { + return Failure( + "Failed to parse volume path '" + path + "': " + volumePath.error()); + } + + CHECK_EQ(info.storage().plugin().type(), volumePath->type); + CHECK_EQ(info.storage().plugin().name(), volumePath->name); + + const string& volumeId = volumePath->volumeId; + const string statePath = csi::paths::getVolumeStatePath( + slave::paths::getCsiRootDir(workDir), + info.storage().plugin().type(), + info.storage().plugin().name(), + volumeId); + + Result<csi::state::VolumeState> volumeState = + ::protobuf::read<csi::state::VolumeState>(statePath); + + if 
(volumeState.isError()) { + return Failure( + "Failed to read volume state from '" + statePath + "': " + + volumeState.error()); + } + + if (volumeState.isSome()) { + volumes.put(volumeId, std::move(volumeState.get())); + + Future<Nothing> recovered = Nothing(); + + switch (volumes.at(volumeId).state.state()) { + case csi::state::VolumeState::CREATED: + case csi::state::VolumeState::NODE_READY: { + break; + } + case csi::state::VolumeState::PUBLISHED: { + if (volumes.at(volumeId).state.boot_id() != bootId) { + // The node has been restarted since the volume was mounted, + // so it is no longer in the `PUBLISHED` state. + volumes.at(volumeId).state.set_state( + csi::state::VolumeState::NODE_READY); + volumes.at(volumeId).state.clear_boot_id(); + checkpointVolumeState(volumeId); + } + break; + } + case csi::state::VolumeState::CONTROLLER_PUBLISH: { + recovered = + volumes.at(volumeId).sequence->add(std::function<Future<Nothing>()>( + defer(self(), &Self::controllerPublish, volumeId))); + break; + } + case csi::state::VolumeState::CONTROLLER_UNPUBLISH: { + recovered = + volumes.at(volumeId).sequence->add(std::function<Future<Nothing>()>( + defer(self(), &Self::controllerUnpublish, volumeId))); + break; + } + case csi::state::VolumeState::NODE_PUBLISH: { + recovered = + volumes.at(volumeId).sequence->add(std::function<Future<Nothing>()>( + defer(self(), &Self::nodePublish, volumeId))); + break; + } + case csi::state::VolumeState::NODE_UNPUBLISH: { + recovered = + volumes.at(volumeId).sequence->add(std::function<Future<Nothing>()>( + defer(self(), &Self::nodeUnpublish, volumeId))); + break; + } + case csi::state::VolumeState::UNKNOWN: { + recovered = Failure( + "Volume '" + volumeId + "' is in " + + stringify(volumes.at(volumeId).state.state()) + " state"); + break; + } + + // NOTE: We avoid using a default clause for the following + // values in proto3's open enum to enable the compiler to detect + // missing enum cases for us.
 See: + // https://github.com/google/protobuf/issues/3917 + case google::protobuf::kint32min: + case google::protobuf::kint32max: { + UNREACHABLE(); + } + } + + futures.push_back(recovered); + } + } + + return collect(futures).then([] { return Nothing(); }); +} + + void StorageLocalResourceProviderProcess::doReliableRegistration() { if (state == DISCONNECTED || state == SUBSCRIBED || state == READY) { @@ -684,6 +830,25 @@ Future<Nothing> StorageLocalResourceProviderProcess::reconcile() Resources result = totalResources; foreach (Resource resource, imported - stripped) { + if (resource.disk().source().has_id() && + !volumes.contains(resource.disk().source().id())) { + csi::state::VolumeState volumeState; + volumeState.set_state(csi::state::VolumeState::CREATED); + + // The default profile is used if `profile` is unset. + volumeState.mutable_volume_capability()->CopyFrom( + profiles.at(resource.disk().source().profile()).capability); + + if (resource.disk().source().has_metadata()) { + volumeState.mutable_volume_attributes()->swap( + convertLabelsToStringMap( + resource.disk().source().metadata()).get()); + } + + volumes.put(resource.disk().source().id(), std::move(volumeState)); + checkpointVolumeState(resource.disk().source().id()); + } + resource.mutable_reservations()->CopyFrom(info.default_reservations()); result += resource; @@ -753,12 +918,134 @@ void StorageLocalResourceProviderProcess::operation( void StorageLocalResourceProviderProcess::publish(const Event::Publish& publish) { + Option<Error> error; + hashset<string> volumeIds; + if (state == SUBSCRIBED) { - // TODO(chhsiao): Reject this publish request.
 - return; + error = Error("Cannot publish resources in SUBSCRIBED state"); + } else { + CHECK_EQ(READY, state); + + Resources resources = publish.resources(); + resources.unallocate(); + foreach (const Resource& resource, resources) { + if (!totalResources.contains(resource)) { + error = Error( + "Cannot publish unknown resource '" + stringify(resource) + "'"); + break; + } + + switch (resource.disk().source().type()) { + case Resource::DiskInfo::Source::PATH: + case Resource::DiskInfo::Source::MOUNT: + case Resource::DiskInfo::Source::BLOCK: { + CHECK(resource.disk().source().has_id()); + CHECK(volumes.contains(resource.disk().source().id())); + volumeIds.insert(resource.disk().source().id()); + break; + } + case Resource::DiskInfo::Source::UNKNOWN: + case Resource::DiskInfo::Source::RAW: { + error = Error( + "Cannot publish volume of " + + stringify(resource.disk().source().type()) + " type"); + break; + } + } + } } - CHECK_EQ(READY, state); + Future<list<Nothing>> allPublished; + + if (error.isSome()) { + allPublished = Failure(error.get()); + } else { + list<Future<Nothing>> futures; + + foreach (const string& volumeId, volumeIds) { + // We check the state of the volume along with the CSI calls + // atomically with respect to other publish or deletion requests + // for the same volume through dispatching the whole lambda on the + // volume's sequence. + std::function<Future<Nothing>()> controllerAndNodePublish = + defer(self(), [=] { + CHECK(volumes.contains(volumeId)); + + Future<Nothing> published = Nothing(); + + // NOTE: We don't break for `CREATED` and `NODE_READY` as + // publishing the volume in these states needs all operations + // beneath it.
 + switch (volumes.at(volumeId).state.state()) { + case csi::state::VolumeState::CREATED: { + published = published + .then(defer(self(), &Self::controllerPublish, volumeId)); + } + case csi::state::VolumeState::NODE_READY: { + published = published + .then(defer(self(), &Self::nodePublish, volumeId)); + } + case csi::state::VolumeState::PUBLISHED: { + break; + } + case csi::state::VolumeState::UNKNOWN: + case csi::state::VolumeState::CONTROLLER_PUBLISH: + case csi::state::VolumeState::CONTROLLER_UNPUBLISH: + case csi::state::VolumeState::NODE_PUBLISH: + case csi::state::VolumeState::NODE_UNPUBLISH: { + UNREACHABLE(); + } + + // NOTE: We avoid using a default clause for the following + // values in proto3's open enum to enable the compiler to detect + // missing enum cases for us. See: + // https://github.com/google/protobuf/issues/3917 + case google::protobuf::kint32min: + case google::protobuf::kint32max: { + UNREACHABLE(); + } + } + + return published; + }); + + futures.push_back( + volumes.at(volumeId).sequence->add(controllerAndNodePublish)); + } + + allPublished = collect(futures); + } + + allPublished + .onAny(defer(self(), [=](const Future<list<Nothing>>& future) { + // TODO(chhsiao): Currently there is no way to reply to the + // resource provider manager with a failure message, so we log the + // failure here. + if (!future.isReady()) { + LOG(ERROR) + << "Failed to publish resources '" << publish.resources() << "': " + << (future.isFailed() ? future.failure() : "future discarded"); + } + + Call call; + call.mutable_resource_provider_id()->CopyFrom(info.id()); + call.set_type(Call::UPDATE_PUBLISH_STATUS); + + Call::UpdatePublishStatus* update = call.mutable_update_publish_status(); + update->set_uuid(publish.uuid()); + update->set_status(future.isReady() + ? 
 Call::UpdatePublishStatus::OK : Call::UpdatePublishStatus::FAILED); + + auto err = [](const string& uuid, const string& message) { + LOG(ERROR) + << "Failed to send status update for publish " + << UUID::fromBytes(uuid).get() << ": " << message; + }; + + driver->send(evolve(call)) + .onFailed(std::bind(err, publish.uuid(), lambda::_1)) + .onDiscarded(std::bind(err, publish.uuid(), "future discarded")); + })); } @@ -1254,6 +1541,262 @@ Future<Resources> StorageLocalResourceProviderProcess::importResources() } +Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish( + const string& volumeId) +{ + // NOTE: This can only be called after `prepareControllerService` and + // `prepareNodeService`. + CHECK_SOME(controllerCapabilities); + CHECK_SOME(nodeId); + + CHECK(volumes.contains(volumeId)); + if (volumes.at(volumeId).state.state() == + csi::state::VolumeState::CONTROLLER_PUBLISH) { + // The resource provider failed over during the last + // `ControllerPublishVolume` call. + CHECK_EQ(RECOVERING, state); + } else { + CHECK_EQ(csi::state::VolumeState::CREATED, + volumes.at(volumeId).state.state()); + + volumes.at(volumeId).state.set_state( + csi::state::VolumeState::CONTROLLER_PUBLISH); + checkpointVolumeState(volumeId); + } + + Future<Nothing> controllerPublished; + + if (controllerCapabilities->publishUnpublishVolume) { + controllerPublished = getService(controllerContainerId) + .then(defer(self(), [=](csi::Client client) { + csi::ControllerPublishVolumeRequest request; + request.mutable_version()->CopyFrom(csiVersion); + request.set_volume_id(volumeId); + request.set_node_id(nodeId.get()); + request.mutable_volume_capability() + ->CopyFrom(volumes.at(volumeId).state.volume_capability()); + request.set_readonly(false); + *request.mutable_volume_attributes() = + volumes.at(volumeId).state.volume_attributes(); + + return client.ControllerPublishVolume(request) + .then(defer(self(), [=]( + const csi::ControllerPublishVolumeResponse& response) { + 
*volumes.at(volumeId).state.mutable_publish_volume_info() = + response.publish_volume_info(); + + return Nothing(); + })); + })); + } else { + controllerPublished = Nothing(); + } + + return controllerPublished + .then(defer(self(), [=] { + volumes.at(volumeId).state.set_state(csi::state::VolumeState::NODE_READY); + checkpointVolumeState(volumeId); + + return Nothing(); + })) + .repair(defer(self(), [=](const Future<Nothing>& future) { + volumes.at(volumeId).state.set_state(csi::state::VolumeState::CREATED); + checkpointVolumeState(volumeId); + + return future; + })); +} + + +Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish( + const string& volumeId) +{ + // NOTE: This can only be called after `prepareControllerService` and + // `prepareNodeService`. + CHECK_SOME(controllerCapabilities); + CHECK_SOME(nodeId); + + CHECK(volumes.contains(volumeId)); + if (volumes.at(volumeId).state.state() == + csi::state::VolumeState::CONTROLLER_UNPUBLISH) { + // The resource provider failed over during the last + // `ControllerUnpublishVolume` call.
 + CHECK_EQ(RECOVERING, state); + } else { + CHECK_EQ(csi::state::VolumeState::NODE_READY, + volumes.at(volumeId).state.state()); + + volumes.at(volumeId).state.set_state( + csi::state::VolumeState::CONTROLLER_UNPUBLISH); + checkpointVolumeState(volumeId); + } + + Future<Nothing> controllerUnpublished; + + if (controllerCapabilities->publishUnpublishVolume) { + controllerUnpublished = getService(controllerContainerId) + .then(defer(self(), [=](csi::Client client) { + csi::ControllerUnpublishVolumeRequest request; + request.mutable_version()->CopyFrom(csiVersion); + request.set_volume_id(volumeId); + request.set_node_id(nodeId.get()); + + return client.ControllerUnpublishVolume(request) + .then([] { return Nothing(); }); + })); + } else { + controllerUnpublished = Nothing(); + } + + return controllerUnpublished + .then(defer(self(), [=] { + volumes.at(volumeId).state.set_state(csi::state::VolumeState::CREATED); + volumes.at(volumeId).state.mutable_publish_volume_info()->clear(); + checkpointVolumeState(volumeId); + + return Nothing(); + })) + .repair(defer(self(), [=](const Future<Nothing>& future) { + volumes.at(volumeId).state.set_state(csi::state::VolumeState::NODE_READY); + checkpointVolumeState(volumeId); + + return future; + })); +} + + +Future<Nothing> StorageLocalResourceProviderProcess::nodePublish( + const string& volumeId) +{ + CHECK(volumes.contains(volumeId)); + const string mountPath = csi::paths::getMountPath( + slave::paths::getCsiRootDir(workDir), + info.storage().plugin().type(), + info.storage().plugin().name(), + volumeId); + + // Create the mount point before transitioning into `NODE_PUBLISH` so + // that a failure here cannot leave the volume in a transient state. + Try<Nothing> mkdir = os::mkdir(mountPath); + if (mkdir.isError()) { + return Failure( + "Failed to create mount point '" + mountPath + "': " + mkdir.error()); + } + + if (volumes.at(volumeId).state.state() == + csi::state::VolumeState::NODE_PUBLISH) { + // The resource provider failed over during the last + // `NodePublishVolume` call.
 + CHECK_EQ(RECOVERING, state); + } else { + CHECK_EQ(csi::state::VolumeState::NODE_READY, + volumes.at(volumeId).state.state()); + + volumes.at(volumeId).state.set_state(csi::state::VolumeState::NODE_PUBLISH); + checkpointVolumeState(volumeId); + } + + return getService(nodeContainerId) + .then(defer(self(), [=](csi::Client client) { + csi::NodePublishVolumeRequest request; + request.mutable_version()->CopyFrom(csiVersion); + request.set_volume_id(volumeId); + *request.mutable_publish_volume_info() = + volumes.at(volumeId).state.publish_volume_info(); + request.set_target_path(mountPath); + request.mutable_volume_capability() + ->CopyFrom(volumes.at(volumeId).state.volume_capability()); + request.set_readonly(false); + *request.mutable_volume_attributes() = + volumes.at(volumeId).state.volume_attributes(); + + return client.NodePublishVolume(request); + })) + .then(defer(self(), [=] { + volumes.at(volumeId).state.set_state(csi::state::VolumeState::PUBLISHED); + volumes.at(volumeId).state.set_boot_id(bootId); + checkpointVolumeState(volumeId); + + return Nothing(); + })) + .repair(defer(self(), [=](const Future<Nothing>& future) { + volumes.at(volumeId).state.set_state(csi::state::VolumeState::NODE_READY); + checkpointVolumeState(volumeId); + + return future; + })); +} + + +Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish( + const string& volumeId) +{ + CHECK(volumes.contains(volumeId)); + if (volumes.at(volumeId).state.state() == + csi::state::VolumeState::NODE_UNPUBLISH) { + // The resource provider failed over during the last + // `NodeUnpublishVolume` call.
 + CHECK_EQ(RECOVERING, state); + } else { + CHECK_EQ(csi::state::VolumeState::PUBLISHED, + volumes.at(volumeId).state.state()); + + volumes.at(volumeId).state.set_state( + csi::state::VolumeState::NODE_UNPUBLISH); + checkpointVolumeState(volumeId); + } + + const string mountPath = csi::paths::getMountPath( + slave::paths::getCsiRootDir(workDir), + info.storage().plugin().type(), + info.storage().plugin().name(), + volumeId); + + Future<Nothing> nodeUnpublished; + + if (os::exists(mountPath)) { + nodeUnpublished = getService(nodeContainerId) + .then(defer(self(), [=](csi::Client client) { + csi::NodeUnpublishVolumeRequest request; + request.mutable_version()->CopyFrom(csiVersion); + request.set_volume_id(volumeId); + request.set_target_path(mountPath); + + return client.NodeUnpublishVolume(request) + .then([] { return Nothing(); }); + })); + } else { + // The volume has been actually unpublished before failover. + CHECK_EQ(RECOVERING, state); + + nodeUnpublished = Nothing(); + } + + return nodeUnpublished + .then(defer(self(), [=]() -> Future<Nothing> { + volumes.at(volumeId).state.set_state(csi::state::VolumeState::NODE_READY); + volumes.at(volumeId).state.clear_boot_id(); + + Try<Nothing> rmdir = os::rmdir(mountPath); + if (rmdir.isError()) { + return Failure( + "Failed to remove mount point '" + mountPath + "': " + + rmdir.error()); + } + + checkpointVolumeState(volumeId); + + return Nothing(); + })) + .repair(defer(self(), [=](const Future<Nothing>& future) { + volumes.at(volumeId).state.set_state(csi::state::VolumeState::PUBLISHED); + checkpointVolumeState(volumeId); + + return future; + })); +} + + void StorageLocalResourceProviderProcess::checkpointResourceProviderState() { ResourceProviderState state; @@ -1320,6 +1863,20 @@ void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate() } +void StorageLocalResourceProviderProcess::checkpointVolumeState( + const string& volumeId) +{ + const string statePath = csi::paths::getVolumeStatePath( + 
slave::paths::getCsiRootDir(workDir), + info.storage().plugin().type(), + info.storage().plugin().name(), + volumeId); + + CHECK_SOME(slave::state::checkpoint(statePath, volumes.at(volumeId).state)) + << "Failed to checkpoint volume state to '" << statePath << "'"; +} + + Try<Owned<LocalResourceProvider>> StorageLocalResourceProvider::create( const http::URL& url, const string& workDir,
