Supporting pre-existing volumes with no profile in SLRP. The storage local resource provider will now report two categories of RAW resources: resources that represent storage pools with profiles, and resources that represent pre-existing volumes with IDs but no profile. When applying `CREATE_VOLUME` or `CREATE_BLOCK` on pre-existing volumes, we issue a `ValidateVolumeCapabilities` CSI call with the default Mount or Block capabilities and convert the RAW resources into MOUNT, PATH or BLOCK resources. When applying `DESTROY_VOLUME` or `DESTROY_BLOCK` on these resources, they will be converted back to RAW resources with IDs, but the pre-existing volumes won't be deleted.
Review: https://reviews.apache.org/r/64583/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/673b421d Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/673b421d Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/673b421d Branch: refs/heads/master Commit: 673b421d414f2c8855ffb7ca47e0dc23e52af3ce Parents: dbd4596 Author: Chun-Hung Hsiao <[email protected]> Authored: Wed Dec 13 20:46:56 2017 -0800 Committer: Jie Yu <[email protected]> Committed: Wed Dec 13 20:46:56 2017 -0800 ---------------------------------------------------------------------- src/resource_provider/storage/provider.cpp | 431 ++++++++++++++---------- 1 file changed, 257 insertions(+), 174 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/673b421d/src/resource_provider/storage/provider.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index 03a12c7..e239317 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -351,22 +351,22 @@ private: Future<Nothing> prepareControllerService(); Future<Nothing> prepareNodeService(); - Future<Resources> importResources(); + Future<Resources> discoverResources(); Future<Nothing> controllerPublish(const string& volumeId); Future<Nothing> controllerUnpublish(const string& volumeId); Future<Nothing> nodePublish(const string& volumeId); Future<Nothing> nodeUnpublish(const string& volumeId); - - // Returns a CSI volume ID. 
Future<string> createVolume( const string& name, const Bytes& capacity, const ProfileData& profile); - Future<Nothing> deleteVolume(const string& volumeId); + Future<Nothing> deleteVolume(const string& volumeId, bool preExisting); + Future<string> validateCapability( + const string& volumeId, + const Option<Labels>& metadata, + const csi::VolumeCapability& capability); + Future<Resources> getCapacities(const hashmap<string, ProfileData>& profiles); - // Applies the offer operation. Conventional operations will be - // synchronously applied. Do nothing if the operation is already in a - // terminal state. Future<Nothing> _applyOfferOperation(const UUID& operationUuid); Future<vector<ResourceConversion>> applyCreateVolumeOrBlock( @@ -376,9 +376,6 @@ private: Future<vector<ResourceConversion>> applyDestroyVolumeOrBlock( const Resource& resource); - // Synchronously updates `totalResources` and the offer operation - // status and then asks the status update manager to send status - // updates. Try<Nothing> updateOfferOperationStatus( const UUID& operationUuid, const Try<vector<ResourceConversion>>& conversions); @@ -412,6 +409,8 @@ private: const bool strict; csi::Version csiVersion; + csi::VolumeCapability defaultMountCapability; + csi::VolumeCapability defaultBlockCapability; string bootId; hashmap<string, ProfileData> profiles; process::grpc::client::Runtime runtime; @@ -437,6 +436,9 @@ private: LinkedHashMap<UUID, OfferOperation> offerOperations; Resources totalResources; UUID resourceVersion; + + // We maintain the state of a CSI volume if and only if its + // corresponding resource is not RAW. hashmap<string, VolumeData> volumes; }; @@ -506,6 +508,14 @@ void StorageLocalResourceProviderProcess::initialize() csiVersion.set_minor(1); csiVersion.set_patch(0); + // Default mount and block capabilities for pre-existing volumes. 
+ defaultMountCapability.mutable_mount(); + defaultMountCapability.mutable_access_mode() + ->set_mode(csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER); + defaultBlockCapability.mutable_block(); + defaultBlockCapability.mutable_access_mode() + ->set_mode(csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER); + Try<string> _bootId = os::bootId(); if (_bootId.isError()) { return fatal("Failed to get boot ID", _bootId.error()); @@ -537,10 +547,8 @@ void StorageLocalResourceProviderProcess::initialize() } } - // NOTE: The name of the default profile is an empty string, which is - // the default value for `Resource.disk.source.profile` when unset. // TODO(chhsiao): Use the volume profile module. - ProfileData& defaultProfile = profiles[""]; + ProfileData& defaultProfile = profiles["default"]; defaultProfile.capability.mutable_mount(); defaultProfile.capability.mutable_access_mode() ->set_mode(csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER); @@ -999,17 +1007,17 @@ Future<Nothing> StorageLocalResourceProviderProcess::reconcileResourceProviderState() { return recoverStatusUpdates() - .then(defer(self(), &Self::importResources)) - .then(defer(self(), [=](Resources importedResources) { + .then(defer(self(), &Self::discoverResources)) + .then(defer(self(), [=](Resources discoveredResources) { // NODE: If a resource in the checkpointed total resources is - // missing in the imported resources, we will still keep it if it - // is converted by an offer operation before (i.e., has extra info - // other than the default reservations). The reason is that we - // want to maintain a consistent view with frameworks, and do not - // want to lose any data on persistent volumes due to some + // missing in the discovered resources, we will still keep it if + // it is converted by an offer operation before (i.e., has extra + // info other than the default reservations). 
The reason is that + // we want to maintain a consistent view with frameworks, and do + // not want to lose any data on persistent volumes due to some // temporarily CSI plugin faults. Other missing resources that are // "unconverted" by any framework will be removed from the total - // resources. Then, any new imported resource will be reported + // resources. Then, any newly discovered resource will be reported // under the default reservations. Resources result; @@ -1025,8 +1033,8 @@ StorageLocalResourceProviderProcess::reconcileResourceProviderState() ? resource.disk().source().id() : Option<string>::none(), resource.disk().source().has_metadata() ? resource.disk().source().metadata() : Option<Labels>::none()); - if (importedResources.contains(unconverted)) { - // The checkponited resource appears in the imported resources. + if (discoveredResources.contains(unconverted)) { + // The checkponited resource appears in the discovered resources. result += resource; unconvertedTotal += unconverted; } else if (!totalResources.contains(unconverted)) { @@ -1040,30 +1048,15 @@ StorageLocalResourceProviderProcess::reconcileResourceProviderState() } } - foreach (Resource resource, importedResources - unconvertedTotal) { - if (resource.disk().source().has_id() && - !volumes.contains(resource.disk().source().id())) { - csi::state::VolumeState volumeState; - volumeState.set_state(csi::state::VolumeState::CREATED); - - // The default profile is used if `profile` is unset. - volumeState.mutable_volume_capability()->CopyFrom( - profiles.at(resource.disk().source().profile()).capability); + // NOTE: The states of newly discovered pre-existing volumes will + // be added to `volumes` when `CREATE_VOLUME` or `CREATE_BLOCK` + // operations are applied. 
+ const Resources newResources = discoveredResources - unconvertedTotal; + result += newResources; - if (resource.disk().source().has_metadata()) { - volumeState.mutable_volume_attributes()->swap( - convertLabelsToStringMap( - resource.disk().source().metadata()).get()); - } + LOG(INFO) << "Adding new resources '" << newResources << "'"; - volumes.put(resource.disk().source().id(), std::move(volumeState)); - checkpointVolumeState(resource.disk().source().id()); - } - - result += resource; - - LOG(INFO) << "Adding new resource '" << resource << "'"; - } + // TODO(chhsiao): Check that all profiles exist. if (result != totalResources) { totalResources = result; @@ -1754,17 +1747,18 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService() // Returns resources reported by the CSI plugin, which are unreserved // raw disk resources without any persistent volume. -Future<Resources> StorageLocalResourceProviderProcess::importResources() +Future<Resources> StorageLocalResourceProviderProcess::discoverResources() { // NOTE: This can only be called after `prepareControllerService` and // the resource provider ID has been obtained. CHECK_SOME(controllerCapabilities); CHECK(info.has_id()); - Future<Resources> preprovisioned; + list<Future<Resources>> futures; + futures.push_back(getCapacities(profiles)); if (controllerCapabilities->listVolumes) { - preprovisioned = getService(controllerContainerId) + futures.push_back(getService(controllerContainerId) .then(defer(self(), [=](csi::Client client) { // TODO(chhsiao): Set the max entries and use a loop to do // mutliple `ListVolumes` calls. 
@@ -1780,8 +1774,9 @@ Future<Resources> StorageLocalResourceProviderProcess::importResources() foreach (const Resource& resource, totalResources) { if (resource.disk().source().has_id() && resource.disk().source().has_profile()) { - volumesToProfiles[resource.disk().source().id()] = - resource.disk().source().profile(); + volumesToProfiles.put( + resource.disk().source().id(), + resource.disk().source().profile()); } } @@ -1801,78 +1796,12 @@ Future<Resources> StorageLocalResourceProviderProcess::importResources() return resources; })); - })); - } else { - preprovisioned = Resources(); + }))); } - return preprovisioned - .then(defer(self(), [=](const Resources& preprovisioned) { - list<Future<Resources>> futures; - - foreach (const Resource& resource, preprovisioned) { - futures.push_back(getService(controllerContainerId) - .then(defer(self(), [=](csi::Client client) { - csi::ValidateVolumeCapabilitiesRequest request; - request.mutable_version()->CopyFrom(csiVersion); - request.set_volume_id(resource.disk().source().id()); - - // The default profile is used if `profile` is unset. 
- request.add_volume_capabilities()->CopyFrom( - profiles.at(resource.disk().source().profile()).capability); - - if (resource.disk().source().has_metadata()) { - request.mutable_volume_attributes()->swap( - convertLabelsToStringMap( - resource.disk().source().metadata()).get()); - } - - return client.ValidateVolumeCapabilities(request) - .then(defer(self(), [=]( - const csi::ValidateVolumeCapabilitiesResponse& response) - -> Future<Resources> { - if (!response.supported()) { - return Failure( - "Unsupported volume capability for resource " + - stringify(resource) + ": " + response.message()); - } - - return resource; - })); - }))); - } - - if (controllerCapabilities->getCapacity) { - foreachkey (const string& profile, profiles) { - futures.push_back(getService(controllerContainerId) - .then(defer(self(), [=](csi::Client client) { - csi::GetCapacityRequest request; - request.mutable_version()->CopyFrom(csiVersion); - request.add_volume_capabilities() - ->CopyFrom(profiles.at(profile).capability); - *request.mutable_parameters() = profiles.at(profile).parameters; - - return client.GetCapacity(request) - .then(defer(self(), [=]( - const csi::GetCapacityResponse& response) - -> Future<Resources> { - if (response.available_capacity() == 0) { - return Resources(); - } - - return createRawDiskResource( - info, - response.available_capacity(), - profile.empty() ? Option<string>::none() : profile); - })); - }))); - } - } - - return collect(futures) - .then(defer(self(), [=](const list<Resources>& resources) { - return accumulate(resources.begin(), resources.end(), Resources()); - })); + return collect(futures) + .then(defer(self(), [=](const list<Resources>& resources) { + return accumulate(resources.begin(), resources.end(), Resources()); })); } @@ -2131,6 +2060,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish( } +// Returns a CSI volume ID. 
Future<string> StorageLocalResourceProviderProcess::createVolume( const string& name, const Bytes& capacity, @@ -2170,8 +2100,7 @@ Future<string> StorageLocalResourceProviderProcess::createVolume( volumeState.set_state(csi::state::VolumeState::CREATED); volumeState.mutable_volume_capability() ->CopyFrom(profile.capability); - *volumeState.mutable_volume_attributes() = - volumeInfo.attributes(); + *volumeState.mutable_volume_attributes() = volumeInfo.attributes(); volumes.put(volumeInfo.id(), std::move(volumeState)); checkpointVolumeState(volumeInfo.id()); @@ -2184,14 +2113,17 @@ Future<string> StorageLocalResourceProviderProcess::createVolume( Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume( - const string& volumeId) + const string& volumeId, + bool preExisting) { // NOTE: This can only be called after `prepareControllerService` and // `prepareNodeService` (since it may require `NodeUnpublishVolume`). CHECK_SOME(controllerCapabilities); CHECK_SOME(nodeId); - if (!controllerCapabilities->createDeleteVolume) { + // We do not need the capability for pre-existing volumes since no + // actual `DeleteVolume` call will be made. 
+ if (!preExisting && !controllerCapabilities->createDeleteVolume) { return Failure("Capability 'CREATE_DELETE_VOLUME' is not supported"); } @@ -2216,21 +2148,26 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume( .then(defer(self(), &Self::controllerUnpublish, volumeId)); } case csi::state::VolumeState::CREATED: { + if (!preExisting) { + deleted = deleted + .then(defer(self(), &Self::getService, controllerContainerId)) + .then(defer(self(), [=](csi::Client client) { + csi::DeleteVolumeRequest request; + request.mutable_version()->CopyFrom(csiVersion); + request.set_volume_id(volumeId); + + return client.DeleteVolume(request) + .then([] { return Nothing(); }); + })); + } + deleted = deleted - .then(defer(self(), &Self::getService, controllerContainerId)) - .then(defer(self(), [=](csi::Client client) { - csi::DeleteVolumeRequest request; - request.mutable_version()->CopyFrom(csiVersion); - request.set_volume_id(volumeId); - - return client.DeleteVolume(request) - .then(defer(self(), [=] { - // NOTE: This will destruct the volume's sequence! - volumes.erase(volumeId); - CHECK_SOME(os::rmdir(volumePath)); - - return Nothing(); - })); + .then(defer(self(), [=] { + // NOTE: This will destruct the volume's sequence! + volumes.erase(volumeId); + CHECK_SOME(os::rmdir(volumePath)); + + return Nothing(); })); break; } @@ -2257,6 +2194,110 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume( } +// Validates if a volume has the specified capability. This is called +// when applying `CREATE_VOLUME` or `CREATE_BLOCK` on a pre-existing +// volume, so we make it returns a volume ID, similar to `createVolume`. 
+Future<string> StorageLocalResourceProviderProcess::validateCapability( + const string& volumeId, + const Option<Labels>& metadata, + const csi::VolumeCapability& capability) +{ + return getService(controllerContainerId) + .then(defer(self(), [=](csi::Client client) { + google::protobuf::Map<string, string> volumeAttributes; + + if (metadata.isSome()) { + volumeAttributes.swap(convertLabelsToStringMap(metadata.get()).get()); + } + + csi::ValidateVolumeCapabilitiesRequest request; + request.mutable_version()->CopyFrom(csiVersion); + request.set_volume_id(volumeId); + request.add_volume_capabilities()->CopyFrom(capability); + *request.mutable_volume_attributes() = volumeAttributes; + + return client.ValidateVolumeCapabilities(request) + .then(defer(self(), [=]( + const csi::ValidateVolumeCapabilitiesResponse& response) + -> Future<string> { + if (!response.supported()) { + return Failure( + "Unsupported volume capability for volume '" + volumeId + + "': " + response.message()); + } + + if (volumes.contains(volumeId)) { + // The resource provider failed over after the last + // `ValidateVolumeCapability` call, but before the offer + // operation status was checkpointed. + CHECK_EQ(csi::state::VolumeState::CREATED, + volumes.at(volumeId).state.state()); + } else { + csi::state::VolumeState volumeState; + volumeState.set_state(csi::state::VolumeState::CREATED); + volumeState.mutable_volume_capability()->CopyFrom(capability); + *volumeState.mutable_volume_attributes() = volumeAttributes; + + volumes.put(volumeId, std::move(volumeState)); + checkpointVolumeState(volumeId); + } + + return volumeId; + })); + })); +} + + +// Returns RAW disk resources for specified profiles. +Future<Resources> StorageLocalResourceProviderProcess::getCapacities( + const hashmap<string, ProfileData>& profiles) +{ + // NOTE: This can only be called after `prepareControllerService` and + // the resource provider ID has been obtained. 
+ CHECK_SOME(controllerCapabilities); + CHECK(info.has_id()); + + // We do not return a failure because this is always called when a + // profile is added or a `CreateVolume` CSI call is made. + if (!controllerCapabilities->getCapacity) { + return Resources(); + } + + return getService(controllerContainerId) + .then(defer(self(), [=](csi::Client client) { + list<Future<Resources>> futures; + + foreachpair (const string& profile, const ProfileData& data, profiles) { + csi::GetCapacityRequest request; + request.mutable_version()->CopyFrom(csiVersion); + request.add_volume_capabilities()->CopyFrom(data.capability); + *request.mutable_parameters() = data.parameters; + + futures.push_back(client.GetCapacity(request) + .then(defer(self(), [=]( + const csi::GetCapacityResponse& response) -> Resources { + if (response.available_capacity() == 0) { + return Resources(); + } + + return createRawDiskResource( + info, + response.available_capacity(), + profile); + }))); + } + + return collect(futures) + .then([](const list<Resources>& resources) { + return accumulate(resources.begin(), resources.end(), Resources()); + }); + })); +} + + +// Applies the offer operation. Conventional operations will be +// synchronously applied. Do nothing if the operation is already in a +// terminal state. Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation( const UUID& operationUuid) { @@ -2358,27 +2399,78 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock( { if (resource.disk().source().type() != Resource::DiskInfo::Source::RAW) { return Failure( - "Cannot create volume from source of " + + "Cannot create volume or block from source of " + stringify(resource.disk().source().type()) + " type"); } + // NOTE: Currently we only support two type of RAW disk resources: + // 1. RAW disk from `GetCapacity` with a profile but no volume ID. + // 2. RAW disk from `ListVolumes` for a pre-existing volume, which + // has a volume ID but no profile. 
+ // + // For 1, we check if its profile is mount or block capable, then + // call `CreateVolume` with the operation UUID as the name (so that + // the same volume will be returned when recovering from a failover). + // For 2, we call `ValidateVolumeCapabilities` with a default mount or + // block capability. + CHECK_NE(resource.disk().source().has_profile(), + resource.disk().source().has_id()); + + Future<string> created; + switch (type) { case Resource::DiskInfo::Source::PATH: case Resource::DiskInfo::Source::MOUNT: { - if (!profiles.at(resource.disk().source().profile()) - .capability.has_mount()) { - return Failure( - "Profile '" + resource.disk().source().profile() + - "' cannot be used for CREATE_VOLUME operation"); + if (resource.disk().source().has_profile()) { + if (!profiles.at(resource.disk().source().profile()) + .capability.has_mount()) { + return Failure( + "Profile '" + resource.disk().source().profile() + + "' cannot be used for CREATE_VOLUME operation"); + } + + // TODO(chhsiao): Call `CreateVolume` sequentially with other + // create or delete operations, and send an `UPDATE_STATE` for + // RAW profiled resources afterward. + created = createVolume( + operationUuid.toString(), + resource.scalar().value(), + profiles.at(resource.disk().source().profile())); + } else { + // No need to call `ValidateVolumeCapabilities` sequentially + // since the volume is not used and thus not in `volumes` yet. + created = validateCapability( + resource.disk().source().id(), + resource.disk().source().has_metadata() + ? 
resource.disk().source().metadata() : Option<Labels>::none(), + defaultMountCapability); } break; } case Resource::DiskInfo::Source::BLOCK: { - if (!profiles.at(resource.disk().source().profile()) - .capability.has_block()) { - return Failure( - "Profile '" + resource.disk().source().profile() + - "' cannot be used for CREATE_BLOCK operation"); + if (resource.disk().source().has_profile()) { + if (!profiles.at(resource.disk().source().profile()) + .capability.has_block()) { + return Failure( + "Profile '" + resource.disk().source().profile() + + "' cannot be used for CREATE_BLOCK operation"); + } + + // TODO(chhsiao): Call `CreateVolume` sequentially with other + // create or delete operations, and send an `UPDATE_STATE` for + // RAW profiled resources afterward. + created = createVolume( + operationUuid.toString(), + resource.scalar().value(), + profiles.at(resource.disk().source().profile())); + } else { + // No need to call `ValidateVolumeCapabilities` sequentially + // since the volume is not used and thus not in `volumes` yet. + created = validateCapability( + resource.disk().source().id(), + resource.disk().source().has_metadata() + ? resource.disk().source().metadata() : Option<Labels>::none(), + defaultBlockCapability); } break; } @@ -2388,26 +2480,6 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock( } } - Future<string> created; - - if (resource.disk().source().has_id()) { - // Preprovisioned volumes with RAW type. - // TODO(chhsiao): Call `ValidateVolumeCapabilities` sequentially - // once we use the profile module and make profile optional. - CHECK(volumes.contains(resource.disk().source().id())); - created = resource.disk().source().id(); - } else { - // We use the operation UUID as the name of the volume, so the same - // offer operation will create the same volume after recovery. - // TODO(chhsiao): Call `CreateVolume` sequentially with other create - // or delete operations. - // TODO(chhsiao): Send `UPDATE_STATE` for RAW resources. 
- created = createVolume( - operationUuid.toString(), - resource.scalar().value(), - profiles.at(resource.disk().source().profile())); - } - return created .then(defer(self(), [=](const string& volumeId) { CHECK(volumes.contains(volumeId)); @@ -2471,7 +2543,7 @@ StorageLocalResourceProviderProcess::applyDestroyVolumeOrBlock( case Resource::DiskInfo::Source::UNKNOWN: case Resource::DiskInfo::Source::RAW: { return Failure( - "Cannot delete volume of " + + "Cannot destroy volume or block of " + stringify(resource.disk().source().type()) + " type"); break; } @@ -2481,18 +2553,27 @@ StorageLocalResourceProviderProcess::applyDestroyVolumeOrBlock( CHECK(volumes.contains(resource.disk().source().id())); // Sequentialize the deletion with other operation on the same volume. + // NOTE: A resource has no profile iff it is a pre-existing volume. return volumes.at(resource.disk().source().id()).sequence->add( - std::function<Future<Nothing>()>( - defer(self(), &Self::deleteVolume, resource.disk().source().id()))) + std::function<Future<Nothing>()>(defer( + self(), + &Self::deleteVolume, + resource.disk().source().id(), + !resource.disk().source().has_profile()))) .then(defer(self(), [=]() { Resource converted = resource; - converted.mutable_disk()->mutable_source()->clear_id(); - converted.mutable_disk()->mutable_source()->clear_metadata(); converted.mutable_disk()->mutable_source()->set_type( Resource::DiskInfo::Source::RAW); converted.mutable_disk()->mutable_source()->clear_path(); converted.mutable_disk()->mutable_source()->clear_mount(); + // NOTE: We keep the source ID and metadata if it is a + // pre-existing volume, which has no profile. 
+ if (resource.disk().source().has_profile()) { + converted.mutable_disk()->mutable_source()->clear_id(); + converted.mutable_disk()->mutable_source()->clear_metadata(); + } + vector<ResourceConversion> conversions; conversions.emplace_back(resource, std::move(converted)); @@ -2501,6 +2582,8 @@ StorageLocalResourceProviderProcess::applyDestroyVolumeOrBlock( } +// Synchronously updates `totalResources` and the offer operation status +// and then asks the status update manager to send status updates. Try<Nothing> StorageLocalResourceProviderProcess::updateOfferOperationStatus( const UUID& operationUuid, const Try<vector<ResourceConversion>>& conversions)
