This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 8cbde799632ad6790019db4262a57fae93bef7c5
Author: Chun-Hung Hsiao <[email protected]>
AuthorDate: Tue Mar 26 21:02:04 2019 -0700

    Supported destroying preprovisioned CSI volumes in SLRP.
    
    SLRP now accepts `DESTROY_DISK` on `RAW` disk resources with source IDs.
    If the backed CSI plugin does not have the `CREATE_DELETE_VOLUME` controller
    capability, this operation will be a no-op; otherwise the underlying CSI
    volume will be deprovisioned.
    
    Review: https://reviews.apache.org/r/70314/
---
 src/resource_provider/storage/provider.cpp | 215 +++++++++++++++++------------
 1 file changed, 123 insertions(+), 92 deletions(-)

diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 687976d..36b6fc0 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -2637,107 +2637,122 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
 Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
     const string& volumeId)
 {
-  const string volumePath = csi::paths::getVolumePath(
-      slave::paths::getCsiRootDir(workDir),
-      info.storage().plugin().type(),
-      info.storage().plugin().name(),
-      volumeId);
+  Future<Nothing> deleted = Nothing();
 
-  if (!volumes.contains(volumeId)) {
-    // The resource provider failed over after the last `deleteVolume` call, but
-    // before the operation status was checkpointed.
-    CHECK(!os::exists(volumePath));
+  // If the volume has a checkpointed state, we transition it to `CREATED` state
+  // before deleting. Otherwise, we have to delete it directly because we have
+  // no idea what state this volume is in.
+  if (volumes.contains(volumeId)) {
+    VolumeData& volume = volumes.at(volumeId);
 
-    return controllerCapabilities.createDeleteVolume;
-  }
+    if (volume.state.node_publish_required()) {
+      CHECK_EQ(VolumeState::PUBLISHED, volume.state.state());
 
-  VolumeData& volume = volumes.at(volumeId);
+      const string targetPath = csi::paths::getMountTargetPath(
+          csi::paths::getMountRootDir(
+              slave::paths::getCsiRootDir(workDir),
+              info.storage().plugin().type(),
+              info.storage().plugin().name()),
+          volumeId);
+
+      // NOTE: Normally the volume should have been cleaned up before
+      // `deleteVolume` is called. However this may not be true for
+      // preprovisioned volumes (e.g., leftover from a previous resource
+      // provider instance). To prevent data leakage in such cases, we clean up
+      // the data (but not the target path) here.
+      Try<Nothing> rmdir = os::rmdir(targetPath, true, false);
+      if (rmdir.isError()) {
+        return Failure(
+            "Failed to clean up volume '" + volumeId + "': " + rmdir.error());
+      }
 
-  // NOTE: The volume must have been cleaned up before the `deleteVolume` call
-  // is made, so it is no longer required to publish the volume.
-  volume.state.set_node_publish_required(false);
-  checkpointVolumeState(volumeId);
+      volume.state.set_node_publish_required(false);
+      checkpointVolumeState(volumeId);
+    }
 
-  Future<Nothing> deleted = Nothing();
+    CHECK(VolumeState::State_IsValid(volume.state.state()));
 
-  CHECK(VolumeState::State_IsValid(volume.state.state()));
+    switch (volume.state.state()) {
+      case VolumeState::PUBLISHED:
+      case VolumeState::NODE_PUBLISH:
+      case VolumeState::NODE_UNPUBLISH: {
+        deleted = deleted.then(defer(self(), &Self::nodeUnpublish, volumeId));
 
-  switch (volume.state.state()) {
-    case VolumeState::PUBLISHED:
-    case VolumeState::NODE_PUBLISH:
-    case VolumeState::NODE_UNPUBLISH: {
-      deleted = deleted
-        .then(defer(self(), &Self::nodeUnpublish, volumeId));
+        // NOTE: We continue to the next case to delete the volume in
+        // `VOL_READY` state once the above is done.
+      }
+      case VolumeState::VOL_READY:
+      case VolumeState::NODE_STAGE:
+      case VolumeState::NODE_UNSTAGE: {
+        deleted = deleted.then(defer(self(), &Self::nodeUnstage, volumeId));
 
-      // NOTE: We continue to the next case to delete the volume in `VOL_READY`
-      // state once the above is done.
-    }
-    case VolumeState::VOL_READY:
-    case VolumeState::NODE_STAGE:
-    case VolumeState::NODE_UNSTAGE: {
-      deleted = deleted
-        .then(defer(self(), &Self::nodeUnstage, volumeId));
-
-      // NOTE: We continue to the next case to delete the volume in `NODE_READY`
-      // state once the above is done.
-    }
-    case VolumeState::NODE_READY:
-    case VolumeState::CONTROLLER_PUBLISH:
-    case VolumeState::CONTROLLER_UNPUBLISH: {
-      deleted = deleted
-        .then(defer(self(), &Self::controllerUnpublish, volumeId));
-
-      // NOTE: We continue to the next case to delete the volume in `CREATED`
-      // state once the above is done.
-    }
-    case VolumeState::CREATED: {
-      // We only delete the volume if the `CREATE_DELETE_VOLUME` capability is
-      // supported. Otherwise, we simply leave it as a preprovisioned volume.
-      if (controllerCapabilities.createDeleteVolume) {
-        deleted = deleted
-          .then(defer(self(), [this, volumeId] {
-            csi::v0::DeleteVolumeRequest request;
-            request.set_volume_id(volumeId);
-
-            CHECK_SOME(controllerContainerId);
-
-            return call<csi::v0::DELETE_VOLUME>(
-                controllerContainerId.get(), std::move(request), true) // Retry.
-              .then([] { return Nothing(); });
-          }));
+        // NOTE: We continue to the next case to delete the volume in
+        // `NODE_READY` state once the above is done.
+      }
+      case VolumeState::NODE_READY:
+      case VolumeState::CONTROLLER_PUBLISH:
+      case VolumeState::CONTROLLER_UNPUBLISH: {
+        deleted =
+          deleted.then(defer(self(), &Self::controllerUnpublish, volumeId));
+
+        // NOTE: We continue to the next case to delete the volume in `CREATED`
+        // state once the above is done.
+      }
+      case VolumeState::CREATED: {
+        break;
+      }
+      case VolumeState::UNKNOWN: {
+        UNREACHABLE();
       }
 
-      break;
-    }
-    case VolumeState::UNKNOWN: {
-      UNREACHABLE();
+      // NOTE: We avoid using a default clause for the following values in
+      // proto3's open enum to enable the compiler to detect missing enum cases
+      // for us. See:
+      // https://github.com/google/protobuf/issues/3917
+      case google::protobuf::kint32min:
+      case google::protobuf::kint32max: {
+        UNREACHABLE();
+      }
     }
+  }
 
-    // NOTE: We avoid using a default clause for the following values in
-    // proto3's open enum to enable the compiler to detect missing enum cases
-    // for us. See:
-    // https://github.com/google/protobuf/issues/3917
-    case google::protobuf::kint32min:
-    case google::protobuf::kint32max: {
-      UNREACHABLE();
-    }
+  // We only delete the volume if the `CREATE_DELETE_VOLUME` capability is
+  // supported. Otherwise, we simply leave it as a preprovisioned volume.
+  if (controllerCapabilities.createDeleteVolume) {
+    deleted = deleted.then(defer(self(), [this, volumeId] {
+      csi::v0::DeleteVolumeRequest request;
+      request.set_volume_id(volumeId);
+
+      CHECK_SOME(controllerContainerId);
+
+      return call<csi::v0::DELETE_VOLUME>(
+          controllerContainerId.get(), std::move(request), true) // Retry.
+        .then([] { return Nothing(); });
+    }));
   }
 
   // NOTE: The last asynchronous continuation of `deleteVolume`, which is
-  // supposed to be run in the volume's sequence, would cause the sequence to be
-  // destructed, which would in turn discard the returned future. However, since
-  // the continuation would have already been run, the returned future will
-  // become ready, making the future returned by the sequence ready as well.
+  // supposed to be run in the volume's sequence if it exists, would cause the
+  // sequence to be destructed, which would in turn discard the returned future.
+  // However, since the continuation would have already been run, the returned
+  // future will become ready.
   return deleted
-    .then(defer(self(), [this, volumeId, volumePath] {
-      volumes.erase(volumeId);
+    .then(defer(self(), [this, volumeId] {
+      if (volumes.contains(volumeId)) {
+        volumes.erase(volumeId);
 
-      Try<Nothing> rmdir = os::rmdir(volumePath);
-      CHECK_SOME(rmdir)
-        << "Failed to remove checkpointed volume state at '" << volumePath
-        << "': " << rmdir.error();
+        const string volumePath = csi::paths::getVolumePath(
+            slave::paths::getCsiRootDir(workDir),
+            info.storage().plugin().type(),
+            info.storage().plugin().name(),
+            volumeId);
 
-      garbageCollectMountPath(volumeId);
+        Try<Nothing> rmdir = os::rmdir(volumePath);
+        CHECK_SOME(rmdir) << "Failed to remove checkpointed volume state at '"
+                          << volumePath << "': " << rmdir.error();
+
+        garbageCollectMountPath(volumeId);
+      }
 
       return controllerCapabilities.createDeleteVolume;
     }));
@@ -3195,21 +3210,37 @@ StorageLocalResourceProviderProcess::applyDestroyDisk(
     const Resource& resource)
 {
   CHECK(!Resources::isPersistentVolume(resource));
-  CHECK(resource.disk().source().type() == Resource::DiskInfo::Source::MOUNT ||
-        resource.disk().source().type() == Resource::DiskInfo::Source::BLOCK);
   CHECK(resource.disk().source().has_id());
 
   const string& volumeId = resource.disk().source().id();
-  CHECK(volumes.contains(volumeId));
+
+  Future<bool> deleted =
+    volumes.contains(volumeId)
+      ? volumes.at(volumeId).sequence->add(std::function<Future<bool>()>(
+            defer(self(), &Self::deleteVolume, volumeId)))
+      : deleteVolume(volumeId);
 
   // Sequentialize the deletion with other operation on the same volume.
-  return volumes.at(volumeId).sequence->add(std::function<Future<bool>()>(
-      defer(self(), &Self::deleteVolume, volumeId)))
+  return deleted
     .then(defer(self(), [=](bool deprovisioned) {
       Resource converted = resource;
       converted.mutable_disk()->mutable_source()->set_type(
           Resource::DiskInfo::Source::RAW);
-      converted.mutable_disk()->mutable_source()->clear_mount();
+
+      switch (resource.disk().source().type()) {
+        case Resource::DiskInfo::Source::MOUNT: {
+          converted.mutable_disk()->mutable_source()->clear_mount();
+          break;
+        }
+        case Resource::DiskInfo::Source::BLOCK:
+        case Resource::DiskInfo::Source::RAW: {
+          break;
+        }
+        case Resource::DiskInfo::Source::UNKNOWN:
+        case Resource::DiskInfo::Source::PATH: {
+          UNREACHABLE(); // Should have been validated by the master.
+        }
+      }
 
       // We clear the volume ID and metadata if the volume has been
       // deprovisioned. Otherwise, we clear the profile.
@@ -3217,7 +3248,8 @@ StorageLocalResourceProviderProcess::applyDestroyDisk(
         converted.mutable_disk()->mutable_source()->clear_id();
         converted.mutable_disk()->mutable_source()->clear_metadata();
 
-        if (!profileInfos.contains(resource.disk().source().profile())) {
+        if (!resource.disk().source().has_profile() ||
+            !profileInfos.contains(resource.disk().source().profile())) {
           // The destroyed volume is converted into an empty resource to prevent
           // the freed disk from being sent out with a disappeared profile.
           converted.mutable_scalar()->set_value(0);
@@ -3232,8 +3264,7 @@ StorageLocalResourceProviderProcess::applyDestroyDisk(
 
             LOG(INFO)
               << "Reconciling storage pools for resource provider " << 
info.id()
-              << " after the disk with profile '"
-              << resource.disk().source().profile() << "' has been freed";
+              << " after resource '" << resource << "' has been freed";
 
             // Reconcile the storage pools in `sequence` to wait for any other
             // pending operation that disallow reconciliation to finish, and set

Reply via email to