This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 75cae10dbca0e3d4146ed1a75d939b1e29b194d7 Author: Chun-Hung Hsiao <[email protected]> AuthorDate: Mon Apr 1 23:23:51 2019 -0700 Cleanup volume creation, validation and deletion for SLRP. This patch introduces methods for volume creation, validation and deletion that conform to `VolumeManager`'s public interface in SLRP, and cleans up SLRP based on these functions. They will be moved out from SLRP to v0 `VolumeManager` later. Specifically, volume deletion now supports deleting untracked volumes. Review: https://reviews.apache.org/r/70217/ --- src/resource_provider/storage/provider.cpp | 350 +++++++++++---------- src/resource_provider/storage/provider_process.hpp | 22 +- 2 files changed, 200 insertions(+), 172 deletions(-) diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index 84e5557..5a63e7b 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -81,6 +81,7 @@ #include "csi/service_manager.hpp" #include "csi/state.hpp" #include "csi/utils.hpp" +#include "csi/volume_manager.hpp" #include "internal/devolve.hpp" #include "internal/evolve.hpp" @@ -109,6 +110,8 @@ using std::shared_ptr; using std::string; using std::vector; +using google::protobuf::Map; + using process::after; using process::await; using process::Break; @@ -1965,45 +1968,51 @@ Future<Nothing> StorageLocalResourceProviderProcess::__unpublishVolume( } -Future<string> StorageLocalResourceProviderProcess::createVolume( +Future<VolumeInfo> StorageLocalResourceProviderProcess::createVolume( const string& name, const Bytes& capacity, - const DiskProfileAdaptor::ProfileInfo& profileInfo) + const types::VolumeCapability& capability, + const Map<string, string>& parameters) { if (!controllerCapabilities->createDeleteVolume) { return Failure( - "Controller capability 'CREATE_DELETE_VOLUME' is not supported"); + "CREATE_DELETE_VOLUME controller capability is not supported for CSI " + "plugin type '" + info.type() + "' and name '" + info.name()); } - csi::v0::CreateVolumeRequest request; + LOG(INFO) << "Creating volume with name '" << name << "'"; + + CreateVolumeRequest request; request.set_name(name); request.mutable_capacity_range()->set_required_bytes(capacity.bytes()); request.mutable_capacity_range()->set_limit_bytes(capacity.bytes()); - *request.add_volume_capabilities() = csi::v0::evolve(profileInfo.capability); - *request.mutable_parameters() = profileInfo.parameters; + *request.add_volume_capabilities() = csi::v0::evolve(capability); + *request.mutable_parameters() = parameters; - return call<csi::v0::CREATE_VOLUME>( - csi::CONTROLLER_SERVICE, std::move(request), true) // Retry. - .then(defer(self(), [=]( - const csi::v0::CreateVolumeResponse& response) -> string { - const csi::v0::Volume& volume = response.volume(); + // We retry the `CreateVolume` call for MESOS-9517. + return call<CREATE_VOLUME>(CONTROLLER_SERVICE, std::move(request), true) + .then(process::defer(self(), [=]( + const CreateVolumeResponse& response) -> Future<VolumeInfo> { + const string& volumeId = response.volume().id(); - if (volumes.contains(volume.id())) { - // The resource provider failed over after the last `createVolume` call, - // but before the operation status was checkpointed. - CHECK_EQ(VolumeState::CREATED, volumes.at(volume.id()).state.state()); - } else { - VolumeState volumeState; - volumeState.set_state(VolumeState::CREATED); - *volumeState.mutable_volume_capability() = profileInfo.capability; - *volumeState.mutable_parameters() = profileInfo.parameters; - *volumeState.mutable_volume_attributes() = volume.attributes(); - - volumes.put(volume.id(), std::move(volumeState)); - checkpointVolumeState(volume.id()); + // NOTE: If the volume is already tracked, there might already be + // operations running in its sequence. Since this continuation runs + // outside the sequence, we fail the call here to avoid any race issue. + // This also means that this call is not idempotent. + if (volumes.contains(volumeId)) { + return Failure("Volume with name '" + name + "' already exists"); } - return volume.id(); + VolumeState volumeState; + volumeState.set_state(VolumeState::CREATED); + *volumeState.mutable_volume_capability() = capability; + *volumeState.mutable_parameters() = parameters; + *volumeState.mutable_volume_attributes() = response.volume().attributes(); + + volumes.put(volumeId, std::move(volumeState)); + checkpointVolumeState(volumeId); + + return VolumeInfo{capacity, volumeId, response.volume().attributes()}; })); } @@ -2011,145 +2020,162 @@ Future<string> StorageLocalResourceProviderProcess::createVolume( Future<bool> StorageLocalResourceProviderProcess::deleteVolume( const string& volumeId) { - Future<Nothing> deleted = Nothing(); - - // If the volume has a checkpointed state, we transition it to `CREATED` state - // before deleting. Otherwise, we have to delete it directly because we have - // no idea what state this volume is in. - if (volumes.contains(volumeId)) { - VolumeData& volume = volumes.at(volumeId); - - if (volume.state.node_publish_required()) { - CHECK_EQ(VolumeState::PUBLISHED, volume.state.state()); - - const string targetPath = csi::paths::getMountTargetPath( - csi::paths::getMountRootDir( - slave::paths::getCsiRootDir(workDir), - info.storage().plugin().type(), - info.storage().plugin().name()), - volumeId); - - // NOTE: Normally the volume should have been cleaned up before - // `deleteVolume` is called. However this may not be true for - // preprovisioned volumes (e.g., leftover from a previous resource - // provider instance). To prevent data leakage in such cases, we clean up - // the data (but not the target path) here. - Try<Nothing> rmdir = os::rmdir(targetPath, true, false); - if (rmdir.isError()) { - return Failure( - "Failed to clean up volume '" + volumeId + "': " + rmdir.error()); - } + if (!volumes.contains(volumeId)) { + return __deleteVolume(volumeId); + } - volume.state.set_node_publish_required(false); - checkpointVolumeState(volumeId); + VolumeData& volume = volumes.at(volumeId); + + LOG(INFO) << "Deleting volume '" << volumeId << "' in " + << volume.state.state() << " state"; + + // Volume deletion is sequentialized with other operations on the same volume + // to avoid races. + return volume.sequence->add(std::function<Future<bool>()>( + process::defer(self(), &Self::_deleteVolume, volumeId))); +} + + +Future<bool> StorageLocalResourceProviderProcess::_deleteVolume( + const std::string& volumeId) +{ + CHECK(volumes.contains(volumeId)); + VolumeState& volumeState = volumes.at(volumeId).state; + + if (volumeState.node_publish_required()) { + CHECK_EQ(VolumeState::PUBLISHED, volumeState.state()); + + const string targetPath = paths::getMountTargetPath( + paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()), + volumeId); + + // NOTE: Normally the volume should have been cleaned up. However this may + // not be true for preprovisioned volumes (e.g., leftover from a previous + // resource provider instance). To prevent data leakage in such cases, we + // clean up the data (but not the target path) here. + Try<Nothing> rmdir = os::rmdir(targetPath, true, false); + if (rmdir.isError()) { + return Failure( + "Failed to clean up volume '" + volumeId + "': " + rmdir.error()); } - deleted = _detachVolume(volumeId); + volumeState.set_node_publish_required(false); + checkpointVolumeState(volumeId); } - // We only delete the volume if the `CREATE_DELETE_VOLUME` capability is - // supported. Otherwise, we simply leave it as a preprovisioned volume. - if (controllerCapabilities->createDeleteVolume) { - deleted = deleted - .then(defer(self(), [this, volumeId] { - csi::v0::DeleteVolumeRequest request; - request.set_volume_id(volumeId); - - return call<csi::v0::DELETE_VOLUME>( - csi::CONTROLLER_SERVICE, std::move(request), true) // Retry. - .then([] { return Nothing(); }); - })); + if (volumeState.state() != VolumeState::CREATED) { + // Retry after transitioning the volume to `CREATED` state. + return _detachVolume(volumeId) + .then(process::defer(self(), &Self::_deleteVolume, volumeId)); } - // NOTE: The last asynchronous continuation of `deleteVolume`, which is - // supposed to be run in the volume's sequence if it exists, would cause the - // sequence to be destructed, which would in turn discard the returned future. - // However, since the continuation would have already been run, the returned - // future will become ready. - return deleted - .then(defer(self(), [this, volumeId] { - if (volumes.contains(volumeId)) { - volumes.erase(volumeId); + // NOTE: The last asynchronous continuation, which is supposed to be run in + // the volume's sequence, would cause the sequence to be destructed, which + // would in turn discard the returned future. However, since the continuation + // would have already been run, the returned future will become ready, making + // the future returned by the sequence ready as well. + return __deleteVolume(volumeId) + .then(process::defer(self(), [this, volumeId](bool deleted) { + volumes.erase(volumeId); - const string volumePath = csi::paths::getVolumePath( - slave::paths::getCsiRootDir(workDir), - info.storage().plugin().type(), - info.storage().plugin().name(), - volumeId); + const string volumePath = paths::getVolumePath( + rootDir, pluginInfo.type(), pluginInfo.name(), volumeId); - Try<Nothing> rmdir = os::rmdir(volumePath); - CHECK_SOME(rmdir) << "Failed to remove checkpointed volume state at '" - << volumePath << "': " << rmdir.error(); + Try<Nothing> rmdir = os::rmdir(volumePath); + CHECK_SOME(rmdir) << "Failed to remove checkpointed volume state at '" + << volumePath << "': " << rmdir.error(); - garbageCollectMountPath(volumeId); - } + garbageCollectMountPath(volumeId); - return controllerCapabilities->createDeleteVolume; + return deleted; })); } -Future<Nothing> StorageLocalResourceProviderProcess::validateVolume( - const string& volumeId, - const Option<Labels>& metadata, - const DiskProfileAdaptor::ProfileInfo& profileInfo) +Future<bool> StorageLocalResourceProviderProcess::__deleteVolume( + const string& volumeId) +{ + if (!controllerCapabilities->createDeleteVolume) { + return false; + } + + LOG(INFO) << "Calling '/csi.v0.Controller/DeleteVolume' for volume '" + << volumeId << "'"; + + DeleteVolumeRequest request; + request.set_volume_id(volumeId); + + // We retry the `DeleteVolume` call for MESOS-9517. + return call<DELETE_VOLUME>(CONTROLLER_SERVICE, std::move(request), true) + .then([] { return true; }); +} + + +Future<Option<Error>> StorageLocalResourceProviderProcess::validateVolume( + const VolumeInfo& volumeInfo, + const types::VolumeCapability& capability, + const Map<string, string>& parameters) { - // If the volume has a checkpointed state, the validation succeeds only if the + // If the volume has been checkpointed, the validation succeeds only if the // capability and parameters of the specified profile are the same as those in // the checkpoint. - if (volumes.contains(volumeId)) { - const VolumeState& volumeState = volumes.at(volumeId).state; + if (volumes.contains(volumeInfo.id)) { + const VolumeState& volumeState = volumes.at(volumeInfo.id).state; - if (volumeState.volume_capability() != profileInfo.capability) { - return Failure("Invalid volume capability for volume '" + volumeId + "'"); + if (volumeState.volume_capability() != capability) { + return Some( + Error("Mismatched capability for volume '" + volumeInfo.id + "'")); } - if (volumeState.parameters() != profileInfo.parameters) { - return Failure("Invalid parameters for volume '" + volumeId + "'"); + if (volumeState.parameters() != parameters) { + return Some( + Error("Mismatched parameters for volume '" + volumeInfo.id + "'")); } - return Nothing(); + return None(); } - if (!pluginCapabilities->controllerService) { - return Failure( - "Plugin capability 'CONTROLLER_SERVICE' is not supported"); + if (!parameters.empty()) { + LOG(WARNING) + << "Validating volumes against parameters is not supported in CSI v0"; } - google::protobuf::Map<string, string> volumeAttributes; + LOG(INFO) << "Validating volume '" << volumeInfo.id << "'"; - if (metadata.isSome()) { - volumeAttributes = CHECK_NOTERROR(convertLabelsToStringMap(metadata.get())); - } + ValidateVolumeCapabilitiesRequest request; + request.set_volume_id(volumeInfo.id); + *request.add_volume_capabilities() = csi::v0::evolve(capability); + *request.mutable_volume_attributes() = volumeInfo.context; - // TODO(chhsiao): Validate the volume against the parameters of the profile - // once we get CSI v1. - csi::v0::ValidateVolumeCapabilitiesRequest request; - request.set_volume_id(volumeId); - *request.add_volume_capabilities() = csi::v0::evolve(profileInfo.capability); - *request.mutable_volume_attributes() = volumeAttributes; - - return call<csi::v0::VALIDATE_VOLUME_CAPABILITIES>( - csi::CONTROLLER_SERVICE, std::move(request)) - .then(defer(self(), [=]( - const csi::v0::ValidateVolumeCapabilitiesResponse& response) - -> Future<Nothing> { + return call<VALIDATE_VOLUME_CAPABILITIES>( + CONTROLLER_SERVICE, std::move(request)) + .then(process::defer(self(), [=]( + const ValidateVolumeCapabilitiesResponse& response) + -> Future<Option<Error>> { if (!response.supported()) { - return Failure( - "Unsupported volume capability for volume '" + volumeId + "': " + - response.message()); + return Error( + "Unsupported volume capability for volume '" + volumeInfo.id + + "': " + response.message()); + } + + // NOTE: If the volume is already tracked, there might already be + // operations running in its sequence. Since this continuation runs + // outside the sequence, we fail the call here to avoid any race issue. + // This also means that this call is not idempotent. + if (volumes.contains(volumeInfo.id)) { + return Failure("Volume '" + volumeInfo.id + "' already validated"); } VolumeState volumeState; volumeState.set_state(VolumeState::CREATED); - *volumeState.mutable_volume_capability() = profileInfo.capability; - *volumeState.mutable_parameters() = profileInfo.parameters; - *volumeState.mutable_volume_attributes() = volumeAttributes; + *volumeState.mutable_volume_capability() = capability; + *volumeState.mutable_parameters() = parameters; + *volumeState.mutable_volume_attributes() = volumeInfo.context; - volumes.put(volumeId, std::move(volumeState)); - checkpointVolumeState(volumeId); + volumes.put(volumeInfo.id, std::move(volumeState)); + checkpointVolumeState(volumeInfo.id); - return Nothing(); + return None(); })); } @@ -2471,32 +2497,45 @@ StorageLocalResourceProviderProcess::applyCreateDisk( // TODO(chhsiao): Consider calling `createVolume` sequentially with other // create or delete operations, and send an `UPDATE_STATE` for storage pools // afterward. See MESOS-9254. - Future<string> created = resource.disk().source().has_profile() - ? createVolume( - operationUuid.toString(), - Bytes(resource.scalar().value() * Bytes::MEGABYTES), - profileInfo) - : validateVolume( - resource.disk().source().id(), - resource.disk().source().has_metadata() - ? resource.disk().source().metadata() - : Option<Labels>::none(), - profileInfo) - .then([=]() -> string { return resource.disk().source().id(); }); + Future<VolumeInfo> created; + if (resource.disk().source().has_profile()) { + created = createVolume( + operationUuid.toString(), + resource.scalar().value() * Bytes::MEGABYTES, + profileInfo.capability, + profileInfo.parameters); + } else { + VolumeInfo volumeInfo = { + resource.scalar().value() * Bytes::MEGABYTES, + resource.disk().source().id(), + CHECK_NOTERROR(convertLabelsToStringMap( + resource.disk().source().metadata())) + }; - return created - .then(defer(self(), [=](const string& volumeId) { - CHECK(volumes.contains(volumeId)); - const VolumeState& volumeState = volumes.at(volumeId).state; + created = validateVolume( + volumeInfo, profileInfo.capability, profileInfo.parameters) + .then([resource, profile, volumeInfo]( + const Option<Error>& error) -> Future<VolumeInfo> { + if (error.isSome()) { + return Failure( + "Cannot apply profile '" + profile + "' to resource '" + + stringify(resource) + "': " + error->message); + } + + return volumeInfo; + }); + } + return created + .then(defer(self(), [=](const VolumeInfo& volumeInfo) { Resource converted = resource; - converted.mutable_disk()->mutable_source()->set_id(volumeId); + converted.mutable_disk()->mutable_source()->set_id(volumeInfo.id); converted.mutable_disk()->mutable_source()->set_type(targetType); converted.mutable_disk()->mutable_source()->set_profile(profile); - if (!volumeState.volume_attributes().empty()) { - converted.mutable_disk()->mutable_source()->mutable_metadata() - ->CopyFrom(convertStringMapToLabels(volumeState.volume_attributes())); + if (!volumeInfo.context.empty()) { + *converted.mutable_disk()->mutable_source()->mutable_metadata() = + convertStringMapToLabels(volumeInfo.context); } const string mountRootDir = csi::paths::getMountRootDir( @@ -2507,8 +2546,8 @@ StorageLocalResourceProviderProcess::applyCreateDisk( switch (targetType) { case Resource::DiskInfo::Source::MOUNT: { // Set the root path relative to agent work dir. - converted.mutable_disk()->mutable_source()->mutable_mount() - ->set_root(mountRootDir); + converted.mutable_disk()->mutable_source()->mutable_mount()->set_root( + mountRootDir); break; } @@ -2537,16 +2576,7 @@ StorageLocalResourceProviderProcess::applyDestroyDisk( CHECK(!Resources::isPersistentVolume(resource)); CHECK(resource.disk().source().has_id()); - const string& volumeId = resource.disk().source().id(); - - Future<bool> deleted = - volumes.contains(volumeId) - ? volumes.at(volumeId).sequence->add(std::function<Future<bool>()>( - defer(self(), &Self::deleteVolume, volumeId))) - : deleteVolume(volumeId); - - // Sequentialize the deletion with other operation on the same volume. - return deleted + return deleteVolume(resource.disk().source().id()) .then(defer(self(), [=](bool deprovisioned) { Resource converted = resource; converted.mutable_disk()->mutable_source()->set_type( diff --git a/src/resource_provider/storage/provider_process.hpp b/src/resource_provider/storage/provider_process.hpp index 0b73531..b298a8e 100644 --- a/src/resource_provider/storage/provider_process.hpp +++ b/src/resource_provider/storage/provider_process.hpp @@ -56,6 +56,7 @@ #include "csi/service_manager.hpp" #include "csi/state.hpp" #include "csi/utils.hpp" +#include "csi/volume_manager.hpp" #include "status_update_manager/operation.hpp" @@ -229,26 +230,23 @@ private: // Transition a volume to `VOL_READY` state from any state below. process::Future<Nothing> __unpublishVolume(const std::string& volumeId); - // Returns a CSI volume ID. - // // NOTE: This can only be called after `prepareServices`. - process::Future<std::string> createVolume( + process::Future<csi::VolumeInfo> createVolume( const std::string& name, const Bytes& capacity, - const DiskProfileAdaptor::ProfileInfo& profileInfo); + const csi::types::VolumeCapability& capability, + const google::protobuf::Map<std::string, std::string>& parameters); - // Returns true if the volume has been deprovisioned. - // // NOTE: This can only be called after `prepareServices`. process::Future<bool> deleteVolume(const std::string& volumeId); + process::Future<bool> _deleteVolume(const std::string& volumeId); + process::Future<bool> __deleteVolume(const std::string& volumeId); - // Validates if a volume supports the capability of the specified profile. - // // NOTE: This can only be called after `prepareServices`. - process::Future<Nothing> validateVolume( - const std::string& volumeId, - const Option<Labels>& metadata, - const DiskProfileAdaptor::ProfileInfo& profileInfo); + process::Future<Option<Error>> validateVolume( + const csi::VolumeInfo& volumeInfo, + const csi::types::VolumeCapability& capability, + const google::protobuf::Map<std::string, std::string>& parameters); // NOTE: This can only be called after `prepareServices`. process::Future<Resources> listVolumes();
