This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit f11de38a492669f38b8df006b25f504ae2bea4d3 Author: Chun-Hung Hsiao <chhs...@mesosphere.io> AuthorDate: Thu Nov 15 12:25:18 2018 -0800 Implemented the new `CREATE_DISK`/`DESTROY_DISK` semantics in SLRP. The default mount/block volume capabilities is removed from SLRP. Instead, `CREATE_DISK` will convert a preprovisioned RAW disk to a profile disk, and `DESTROY_DISK` will always deprovision a profile disk as long as the CSI plugin is capable of deprovisioning volumes. Review: https://reviews.apache.org/r/69361 --- src/csi/utils.hpp | 8 + src/resource_provider/storage/provider.cpp | 235 ++++++++++++----------------- 2 files changed, 103 insertions(+), 140 deletions(-) diff --git a/src/csi/utils.hpp b/src/csi/utils.hpp index 5ce318e..9145c67 100644 --- a/src/csi/utils.hpp +++ b/src/csi/utils.hpp @@ -45,6 +45,14 @@ bool operator==( bool operator==(const VolumeCapability& left, const VolumeCapability& right); +inline bool operator!=( + const VolumeCapability& left, + const VolumeCapability& right) +{ + return !(left == right); +} + + std::ostream& operator<<( std::ostream& stream, const ControllerServiceCapability::RPC::Type& type); diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index c137fa4..ebd5a88 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -402,7 +402,7 @@ private: const string& name, const Bytes& capacity, const DiskProfileAdaptor::ProfileInfo& profileInfo); - Future<Nothing> deleteVolume(const string& volumeId, bool preExisting); + Future<bool> deleteVolume(const string& volumeId); Future<string> validateCapability( const string& volumeId, const Option<Labels>& metadata, @@ -420,7 +420,8 @@ private: Future<vector<ResourceConversion>> applyCreateDisk( const Resource& resource, const id::UUID& operationUuid, - const Resource::DiskInfo::Source::Type& type); + const Resource::DiskInfo::Source::Type& targetType, + const Option<string>& targetProfile); Future<vector<ResourceConversion>> applyDestroyDisk( const Resource& resource); @@ -460,8 +461,6 @@ private: shared_ptr<DiskProfileAdaptor> diskProfileAdaptor; - csi::v0::VolumeCapability defaultMountCapability; - csi::v0::VolumeCapability defaultBlockCapability; string bootId; process::grpc::client::Runtime runtime; Owned<v1::resource_provider::Driver> driver; @@ -590,14 +589,6 @@ void StorageLocalResourceProviderProcess::received(const Event& event) void StorageLocalResourceProviderProcess::initialize() { - // Default mount and block capabilities for pre-existing volumes. - defaultMountCapability.mutable_mount(); - defaultMountCapability.mutable_access_mode() - ->set_mode(csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER); - defaultBlockCapability.mutable_block(); - defaultBlockCapability.mutable_access_mode() - ->set_mode(csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER); - Try<string> _bootId = os::bootId(); if (_bootId.isError()) { LOG(ERROR) << "Failed to get boot ID: " << _bootId.error(); @@ -2650,6 +2641,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish( // Returns a CSI volume ID. +// // NOTE: This can only be called after `prepareControllerService`. Future<string> StorageLocalResourceProviderProcess::createVolume( const string& name, @@ -2680,9 +2672,8 @@ Future<string> StorageLocalResourceProviderProcess::createVolume( const csi::v0::Volume& volume = response.volume(); if (volumes.contains(volume.id())) { - // The resource provider failed over after the last - // `CreateVolume` call, but before the operation status was - // checkpointed. + // The resource provider failed over after the last `createVolume` + // call, but before the operation status was checkpointed. CHECK_EQ(VolumeState::CREATED, volumes.at(volume.id()).state.state()); } else { @@ -2702,19 +2693,13 @@ Future<string> StorageLocalResourceProviderProcess::createVolume( } +// Returns true if the volume has been deprovisioned. +// // NOTE: This can only be called after `prepareControllerService` and // `prepareNodeService` (since it may require `NodeUnpublishVolume`). -Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume( - const string& volumeId, - bool preExisting) +Future<bool> StorageLocalResourceProviderProcess::deleteVolume( + const string& volumeId) { - // We do not need the capability for pre-existing volumes since no - // actual `DeleteVolume` call will be made. - if (!preExisting && !controllerCapabilities.createDeleteVolume) { - return Failure( - "Controller capability 'CREATE_DELETE_VOLUME' is not supported"); - } - CHECK_SOME(controllerContainerId); const string volumePath = csi::paths::getVolumePath( @@ -2724,11 +2709,11 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume( volumeId); if (!volumes.contains(volumeId)) { - // The resource provider failed over after the last `DeleteVolume` - // call, but before the operation status was checkpointed. + // The resource provider failed over after the last `deleteVolume` call, but + // before the operation status was checkpointed. CHECK(!os::exists(volumePath)); - return Nothing(); + return controllerCapabilities.createDeleteVolume; } const VolumeData& volume = volumes.at(volumeId); @@ -2766,7 +2751,9 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume( // state once the above is done. } case VolumeState::CREATED: { - if (!preExisting) { + // We only delete the volume if the `CREATE_DELETE_VOLUME` capability is + // supported. Otherwise, we simply leave it as a preprovisioned volume. + if (controllerCapabilities.createDeleteVolume) { deleted = deleted .then(defer(self(), &Self::getService, controllerContainerId.get())) .then(defer(self(), [this, volumeId](csi::v0::Client client) { @@ -2804,7 +2791,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume( volumes.erase(volumeId); CHECK_SOME(os::rmdir(volumePath)); - return Nothing(); + return controllerCapabilities.createDeleteVolume; })); } @@ -2994,7 +2981,10 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation( conversions = applyCreateDisk( operation.info().create_disk().source(), operationUuid, - operation.info().create_disk().target_type()); + operation.info().create_disk().target_type(), + operation.info().create_disk().has_target_profile() + ? operation.info().create_disk().target_profile() + : Option<string>::none()); break; } @@ -3098,120 +3088,50 @@ Future<vector<ResourceConversion>> StorageLocalResourceProviderProcess::applyCreateDisk( const Resource& resource, const id::UUID& operationUuid, - const Resource::DiskInfo::Source::Type& type) + const Resource::DiskInfo::Source::Type& targetType, + const Option<string>& targetProfile) { CHECK_EQ(Resource::DiskInfo::Source::RAW, resource.disk().source().type()); - // NOTE: Currently we only support two type of RAW disk resources: + // NOTE: Currently we only support two types of RAW disk resources: // 1. RAW disk from `GetCapacity` with a profile but no volume ID. - // 2. RAW disk from `ListVolumes` for a pre-existing volume, which - // has a volume ID but no profile. + // 2. RAW disk from `ListVolumes` for a preprovisioned volume, which has a + // volume ID but no profile. // // For 1, we check if its profile is mount or block capable, then - // call `CreateVolume` with the operation UUID as the name (so that + // call `createVolume` with the operation UUID as the name (so that // the same volume will be returned when recovering from a failover). // - // For 2, there are two scenarios: + // For 2, the target profile will be specified, so we first check if the + // profile is mount or block capable. Then, there are two scenarios: // a. If the volume has a checkpointed state (because it was created - // by a previous resource provider), we simply check if its - // checkpointed capability supports the conversion. + // by a previous resource provider), we simply check if its checkpointed + // capability and parameters match the profile. // b. If the volume is newly discovered, `ValidateVolumeCapabilities` - // is called with a default mount or block capability. + // is called with the capability of the profile. CHECK_NE(resource.disk().source().has_profile(), - resource.disk().source().has_id()); + resource.disk().source().has_id() && targetProfile.isSome()); - Future<string> created; - - switch (type) { - case Resource::DiskInfo::Source::MOUNT: { - if (resource.disk().source().has_profile()) { - // The profile exists since any operation with a stale profile must have - // been dropped for a mismatched resource version or a reconciliation. - CHECK(profileInfos.contains(resource.disk().source().profile())) - << "Profile '" << resource.disk().source().profile() << "' not found"; + const string profile = + targetProfile.getOrElse(resource.disk().source().profile()); - const DiskProfileAdaptor::ProfileInfo& profileInfo = - profileInfos.at(resource.disk().source().profile()); - - if (!profileInfo.capability.has_mount()) { - return Failure( - "Profile '" + resource.disk().source().profile() + - "' cannot be used to create a MOUNT disk"); - } - - // TODO(chhsiao): Call `CreateVolume` sequentially with other - // create or delete operations, and send an `UPDATE_STATE` for - // RAW profiled resources afterward. - created = createVolume( - operationUuid.toString(), - Bytes(resource.scalar().value() * Bytes::MEGABYTES), - profileInfo); - } else { - const string& volumeId = resource.disk().source().id(); - - if (volumes.contains(volumeId)) { - if (!volumes.at(volumeId).state.volume_capability().has_mount()) { - return Failure( - "Volume '" + volumeId + - "' cannot be converted to a MOUNT disk"); - } + if (!profileInfos.contains(profile)) { + return Failure("Profile '" + profile + "' not found"); + } - created = volumeId; - } else { - // No need to call `ValidateVolumeCapabilities` sequentially - // since the volume is not used and thus not in `volumes` yet. - created = validateCapability( - volumeId, - resource.disk().source().has_metadata() - ? resource.disk().source().metadata() : Option<Labels>::none(), - defaultMountCapability); - } + const DiskProfileAdaptor::ProfileInfo& profileInfo = profileInfos.at(profile); + switch (targetType) { + case Resource::DiskInfo::Source::MOUNT: { + if (!profileInfo.capability.has_mount()) { + return Failure( + "Profile '" + profile + "' cannot be used to create a MOUNT disk"); } break; } case Resource::DiskInfo::Source::BLOCK: { - if (resource.disk().source().has_profile()) { - // The profile exists since any operation with a stale profile must have - // been dropped for a mismatched resource version or a reconciliation. - CHECK(profileInfos.contains(resource.disk().source().profile())) - << "Profile '" << resource.disk().source().profile() << "' not found"; - - const DiskProfileAdaptor::ProfileInfo& profileInfo = - profileInfos.at(resource.disk().source().profile()); - - if (!profileInfo.capability.has_block()) { - return Failure( - "Profile '" + resource.disk().source().profile() + - "' cannot be used to create a BLOCK disk"); - } - - // TODO(chhsiao): Call `CreateVolume` sequentially with other - // create or delete operations, and send an `UPDATE_STATE` for - // RAW profiled resources afterward. - created = createVolume( - operationUuid.toString(), - Bytes(resource.scalar().value() * Bytes::MEGABYTES), - profileInfo); - } else { - const string& volumeId = resource.disk().source().id(); - - if (volumes.contains(volumeId)) { - if (!volumes.at(volumeId).state.volume_capability().has_block()) { - return Failure( - "Volume '" + volumeId + - "' cannot be converted to a BLOCK disk"); - } - - created = volumeId; - } else { - // No need to call `ValidateVolumeCapabilities` sequentially - // since the volume is not used and thus not in `volumes` yet. - created = validateCapability( - volumeId, - resource.disk().source().has_metadata() - ? resource.disk().source().metadata() : Option<Labels>::none(), - defaultBlockCapability); - } + if (!profileInfo.capability.has_block()) { + return Failure( + "Profile '" + profile + "' cannot be used to create a BLOCK disk"); } break; } @@ -3222,6 +3142,40 @@ StorageLocalResourceProviderProcess::applyCreateDisk( } } + Future<string> created; + if (resource.disk().source().has_id()) { + const string& volumeId = resource.disk().source().id(); + + if (volumes.contains(volumeId)) { + const VolumeState& volumeState = volumes.at(volumeId).state; + + // TODO(chhsiao): Validate the volume against the parameters of the + // profile once they are checkpointed. + if (volumeState.volume_capability() != profileInfo.capability) { + return Failure( + "Profile '" + profile + "' cannot be applied to volume '" + + volumeId + "'"); + } + + created = volumeId; + } else { + created = validateCapability( + volumeId, + resource.disk().source().has_metadata() + ? resource.disk().source().metadata() + : Option<Labels>::none(), + profileInfo.capability); + } + } else { + // TODO(chhsiao): Consider calling `CreateVolume` sequentially with other + // create or delete operations, and send an `UPDATE_STATE` for storage pools + // afterward. See MESOS-9254. + created = createVolume( + operationUuid.toString(), + Bytes(resource.scalar().value() * Bytes::MEGABYTES), + profileInfo); + } + return created .then(defer(self(), [=](const string& volumeId) { CHECK(volumes.contains(volumeId)); @@ -3229,7 +3183,8 @@ StorageLocalResourceProviderProcess::applyCreateDisk( Resource converted = resource; converted.mutable_disk()->mutable_source()->set_id(volumeId); - converted.mutable_disk()->mutable_source()->set_type(type); + converted.mutable_disk()->mutable_source()->set_type(targetType); + converted.mutable_disk()->mutable_source()->set_profile(profile); if (!volumeState.volume_attributes().empty()) { converted.mutable_disk()->mutable_source()->mutable_metadata() @@ -3241,7 +3196,7 @@ StorageLocalResourceProviderProcess::applyCreateDisk( info.storage().plugin().type(), info.storage().plugin().name()); - switch (type) { + switch (targetType) { case Resource::DiskInfo::Source::MOUNT: { // Set the root path relative to agent work dir. converted.mutable_disk()->mutable_source()->mutable_mount() @@ -3274,24 +3229,22 @@ StorageLocalResourceProviderProcess::applyDestroyDisk( CHECK(resource.disk().source().type() == Resource::DiskInfo::Source::MOUNT || resource.disk().source().type() == Resource::DiskInfo::Source::BLOCK); CHECK(resource.disk().source().has_id()); - CHECK(volumes.contains(resource.disk().source().id())); + + const string& volumeId = resource.disk().source().id(); + CHECK(volumes.contains(volumeId)); // Sequentialize the deletion with other operation on the same volume. - return volumes.at(resource.disk().source().id()).sequence->add( - std::function<Future<Nothing>()>(defer( - self(), - &Self::deleteVolume, - resource.disk().source().id(), - !resource.disk().source().has_profile()))) - .then(defer(self(), [=]() { + return volumes.at(volumeId).sequence->add(std::function<Future<bool>()>( + defer(self(), &Self::deleteVolume, volumeId))) + .then(defer(self(), [=](bool deprovisioned) { Resource converted = resource; converted.mutable_disk()->mutable_source()->set_type( Resource::DiskInfo::Source::RAW); converted.mutable_disk()->mutable_source()->clear_mount(); - // We only clear the volume ID and metadata if the destroyed volume is not - // a pre-existing volume. - if (resource.disk().source().has_profile()) { + // We clear the volume ID and metadata if the volume has been + // deprovisioned. Otherwise, we clear the profile. + if (deprovisioned) { converted.mutable_disk()->mutable_source()->clear_id(); converted.mutable_disk()->mutable_source()->clear_metadata(); @@ -3321,6 +3274,8 @@ StorageLocalResourceProviderProcess::applyDestroyDisk( defer(self(), &Self::reconcileStoragePools))); } } + } else { + converted.mutable_disk()->mutable_source()->clear_profile(); } vector<ResourceConversion> conversions;