This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch 1.7.x in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 8cbde799632ad6790019db4262a57fae93bef7c5 Author: Chun-Hung Hsiao <[email protected]> AuthorDate: Tue Mar 26 21:02:04 2019 -0700 Supported destroying preprovisioned CSI volumes in SLRP. SLRP now accepts `DESTROY_DISK` on `RAW` disk resources with source IDs. If the backing CSI plugin does not have the `CREATE_DELETE_VOLUME` controller capability, this operation will be a no-op; otherwise the underlying CSI volume will be deprovisioned. Review: https://reviews.apache.org/r/70314/ --- src/resource_provider/storage/provider.cpp | 215 +++++++++++++++++------------ 1 file changed, 123 insertions(+), 92 deletions(-) diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index 687976d..36b6fc0 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -2637,107 +2637,122 @@ Future<string> StorageLocalResourceProviderProcess::createVolume( Future<bool> StorageLocalResourceProviderProcess::deleteVolume( const string& volumeId) { - const string volumePath = csi::paths::getVolumePath( - slave::paths::getCsiRootDir(workDir), - info.storage().plugin().type(), - info.storage().plugin().name(), - volumeId); + Future<Nothing> deleted = Nothing(); - if (!volumes.contains(volumeId)) { - // The resource provider failed over after the last `deleteVolume` call, but - // before the operation status was checkpointed. - CHECK(!os::exists(volumePath)); + // If the volume has a checkpointed state, we transition it to `CREATED` state + // before deleting. Otherwise, we have to delete it directly because we have + // no idea what state this volume is in.
+ if (volumes.contains(volumeId)) { + VolumeData& volume = volumes.at(volumeId); - return controllerCapabilities.createDeleteVolume; - } + if (volume.state.node_publish_required()) { + CHECK_EQ(VolumeState::PUBLISHED, volume.state.state()); - VolumeData& volume = volumes.at(volumeId); + const string targetPath = csi::paths::getMountTargetPath( + csi::paths::getMountRootDir( + slave::paths::getCsiRootDir(workDir), + info.storage().plugin().type(), + info.storage().plugin().name()), + volumeId); + + // NOTE: Normally the volume should have been cleaned up before + // `deleteVolume` is called. However this may not be true for + // preprovisioned volumes (e.g., leftover from a previous resource + // provider instance). To prevent data leakage in such cases, we clean up + // the data (but not the target path) here. + Try<Nothing> rmdir = os::rmdir(targetPath, true, false); + if (rmdir.isError()) { + return Failure( + "Failed to clean up volume '" + volumeId + "': " + rmdir.error()); + } - // NOTE: The volume must have been cleaned up before the `deleteVolume` call - // is made, so it is no longer required to publish the volume. 
- volume.state.set_node_publish_required(false); - checkpointVolumeState(volumeId); + volume.state.set_node_publish_required(false); + checkpointVolumeState(volumeId); + } - Future<Nothing> deleted = Nothing(); + CHECK(VolumeState::State_IsValid(volume.state.state())); - CHECK(VolumeState::State_IsValid(volume.state.state())); + switch (volume.state.state()) { + case VolumeState::PUBLISHED: + case VolumeState::NODE_PUBLISH: + case VolumeState::NODE_UNPUBLISH: { + deleted = deleted.then(defer(self(), &Self::nodeUnpublish, volumeId)); - switch (volume.state.state()) { - case VolumeState::PUBLISHED: - case VolumeState::NODE_PUBLISH: - case VolumeState::NODE_UNPUBLISH: { - deleted = deleted - .then(defer(self(), &Self::nodeUnpublish, volumeId)); + // NOTE: We continue to the next case to delete the volume in + // `VOL_READY` state once the above is done. + } + case VolumeState::VOL_READY: + case VolumeState::NODE_STAGE: + case VolumeState::NODE_UNSTAGE: { + deleted = deleted.then(defer(self(), &Self::nodeUnstage, volumeId)); - // NOTE: We continue to the next case to delete the volume in `VOL_READY` - // state once the above is done. - } - case VolumeState::VOL_READY: - case VolumeState::NODE_STAGE: - case VolumeState::NODE_UNSTAGE: { - deleted = deleted - .then(defer(self(), &Self::nodeUnstage, volumeId)); - - // NOTE: We continue to the next case to delete the volume in `NODE_READY` - // state once the above is done. - } - case VolumeState::NODE_READY: - case VolumeState::CONTROLLER_PUBLISH: - case VolumeState::CONTROLLER_UNPUBLISH: { - deleted = deleted - .then(defer(self(), &Self::controllerUnpublish, volumeId)); - - // NOTE: We continue to the next case to delete the volume in `CREATED` - // state once the above is done. - } - case VolumeState::CREATED: { - // We only delete the volume if the `CREATE_DELETE_VOLUME` capability is - // supported. Otherwise, we simply leave it as a preprovisioned volume. 
- if (controllerCapabilities.createDeleteVolume) { - deleted = deleted - .then(defer(self(), [this, volumeId] { - csi::v0::DeleteVolumeRequest request; - request.set_volume_id(volumeId); - - CHECK_SOME(controllerContainerId); - - return call<csi::v0::DELETE_VOLUME>( - controllerContainerId.get(), std::move(request), true) // Retry. - .then([] { return Nothing(); }); - })); + // NOTE: We continue to the next case to delete the volume in + // `NODE_READY` state once the above is done. + } + case VolumeState::NODE_READY: + case VolumeState::CONTROLLER_PUBLISH: + case VolumeState::CONTROLLER_UNPUBLISH: { + deleted = + deleted.then(defer(self(), &Self::controllerUnpublish, volumeId)); + + // NOTE: We continue to the next case to delete the volume in `CREATED` + // state once the above is done. + } + case VolumeState::CREATED: { + break; + } + case VolumeState::UNKNOWN: { + UNREACHABLE(); } - break; - } - case VolumeState::UNKNOWN: { - UNREACHABLE(); + // NOTE: We avoid using a default clause for the following values in + // proto3's open enum to enable the compiler to detect missing enum cases + // for us. See: + // https://github.com/google/protobuf/issues/3917 + case google::protobuf::kint32min: + case google::protobuf::kint32max: { + UNREACHABLE(); + } } + } - // NOTE: We avoid using a default clause for the following values in - // proto3's open enum to enable the compiler to detect missing enum cases - // for us. See: - // https://github.com/google/protobuf/issues/3917 - case google::protobuf::kint32min: - case google::protobuf::kint32max: { - UNREACHABLE(); - } + // We only delete the volume if the `CREATE_DELETE_VOLUME` capability is + // supported. Otherwise, we simply leave it as a preprovisioned volume. 
+ if (controllerCapabilities.createDeleteVolume) { + deleted = deleted.then(defer(self(), [this, volumeId] { + csi::v0::DeleteVolumeRequest request; + request.set_volume_id(volumeId); + + CHECK_SOME(controllerContainerId); + + return call<csi::v0::DELETE_VOLUME>( + controllerContainerId.get(), std::move(request), true) // Retry. + .then([] { return Nothing(); }); + })); } // NOTE: The last asynchronous continuation of `deleteVolume`, which is - // supposed to be run in the volume's sequence, would cause the sequence to be - // destructed, which would in turn discard the returned future. However, since - // the continuation would have already been run, the returned future will - // become ready, making the future returned by the sequence ready as well. + // supposed to be run in the volume's sequence if it exists, would cause the + // sequence to be destructed, which would in turn discard the returned future. + // However, since the continuation would have already been run, the returned + // future will become ready. 
return deleted - .then(defer(self(), [this, volumeId, volumePath] { - volumes.erase(volumeId); + .then(defer(self(), [this, volumeId] { + if (volumes.contains(volumeId)) { + volumes.erase(volumeId); - Try<Nothing> rmdir = os::rmdir(volumePath); - CHECK_SOME(rmdir) - << "Failed to remove checkpointed volume state at '" << volumePath - << "': " << rmdir.error(); + const string volumePath = csi::paths::getVolumePath( + slave::paths::getCsiRootDir(workDir), + info.storage().plugin().type(), + info.storage().plugin().name(), + volumeId); - garbageCollectMountPath(volumeId); + Try<Nothing> rmdir = os::rmdir(volumePath); + CHECK_SOME(rmdir) << "Failed to remove checkpointed volume state at '" + << volumePath << "': " << rmdir.error(); + + garbageCollectMountPath(volumeId); + } return controllerCapabilities.createDeleteVolume; })); @@ -3195,21 +3210,37 @@ StorageLocalResourceProviderProcess::applyDestroyDisk( const Resource& resource) { CHECK(!Resources::isPersistentVolume(resource)); - CHECK(resource.disk().source().type() == Resource::DiskInfo::Source::MOUNT || - resource.disk().source().type() == Resource::DiskInfo::Source::BLOCK); CHECK(resource.disk().source().has_id()); const string& volumeId = resource.disk().source().id(); - CHECK(volumes.contains(volumeId)); + + Future<bool> deleted = + volumes.contains(volumeId) + ? volumes.at(volumeId).sequence->add(std::function<Future<bool>()>( + defer(self(), &Self::deleteVolume, volumeId))) + : deleteVolume(volumeId); // Sequentialize the deletion with other operation on the same volume. 
- return volumes.at(volumeId).sequence->add(std::function<Future<bool>()>( - defer(self(), &Self::deleteVolume, volumeId))) + return deleted .then(defer(self(), [=](bool deprovisioned) { Resource converted = resource; converted.mutable_disk()->mutable_source()->set_type( Resource::DiskInfo::Source::RAW); - converted.mutable_disk()->mutable_source()->clear_mount(); + + switch (resource.disk().source().type()) { + case Resource::DiskInfo::Source::MOUNT: { + converted.mutable_disk()->mutable_source()->clear_mount(); + break; + } + case Resource::DiskInfo::Source::BLOCK: + case Resource::DiskInfo::Source::RAW: { + break; + } + case Resource::DiskInfo::Source::UNKNOWN: + case Resource::DiskInfo::Source::PATH: { + UNREACHABLE(); // Should have been validated by the master. + } + } // We clear the volume ID and metadata if the volume has been // deprovisioned. Otherwise, we clear the profile. @@ -3217,7 +3248,8 @@ StorageLocalResourceProviderProcess::applyDestroyDisk( converted.mutable_disk()->mutable_source()->clear_id(); converted.mutable_disk()->mutable_source()->clear_metadata(); - if (!profileInfos.contains(resource.disk().source().profile())) { + if (!resource.disk().source().has_profile() || + !profileInfos.contains(resource.disk().source().profile())) { // The destroyed volume is converted into an empty resource to prevent // the freed disk from being sent out with a disappeared profile. converted.mutable_scalar()->set_value(0); @@ -3232,8 +3264,7 @@ StorageLocalResourceProviderProcess::applyDestroyDisk( LOG(INFO) << "Reconciling storage pools for resource provider " << info.id() - << " after the disk with profile '" - << resource.disk().source().profile() << "' has been freed"; + << " after resource '" << resource << "' has been freed"; // Reconcile the storage pools in `sequence` to wait for any other // pending operation that disallow reconciliation to finish, and set
