This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 5fe817fab4e8b85a517a0190014b1eb2c17c9f76
Author: Chun-Hung Hsiao <[email protected]>
AuthorDate: Fri Feb 1 15:29:33 2019 -0800

    Made SLRP recover node-published volumes after reboot.
    
    If a CSI volume has been node-published before a reboot, SLRP will now
    try to bring it back to node-published again. This is important to
    perform synchronous persistent volume cleanup for `DESTROY`.
    
    To achieve this, in addition to keeping track of the boot ID when a CSI
    volume is node-staged in `VolumeState.vol_ready_boot_id` (formerly
    `VolumeState.boot_id`), SLRP now also keeps track of the boot ID when
    the volume is node-published. This helps SLRP to better determine if a
    volume has been published before reboot.
    
    Review: https://reviews.apache.org/r/69892
---
 src/csi/state.proto                        |  12 +-
 src/resource_provider/storage/provider.cpp | 248 +++++++++++++++++++----------
 2 files changed, 176 insertions(+), 84 deletions(-)

diff --git a/src/csi/state.proto b/src/csi/state.proto
index 264a565..b5ccf16 100644
--- a/src/csi/state.proto
+++ b/src/csi/state.proto
@@ -60,9 +60,13 @@ message VolumeState {
   map<string, string> publish_info = 4;
 
   // This field is used to check if the node has been rebooted since the volume
-  // was transitioned to `VOL_READY` state. If yes, `NodeStageVolume` needs to
-  // be called to make the volume publishable again. It MUST be set to the boot
-  // ID of the node if the volume is in `VOL_READY` state, and SHOULD remain
-  // unset otherwise. This is an OPTIONAL field.
+  // is made publishable on the node. It MUST be set to the boot ID of the node
+  // when the volume is transitioned to `VOL_READY`, and SHOULD be cleared when
+  // the volume is transitioned to `NODE_READY`. This is an OPTIONAL field.
   string boot_id = 5;
+
+  // This field is used to check if the volume has been used by a container and
+  // hence needs cleanup. If set, the resource provider MUST transition the
+  // volume to `PUBLISHED` state during recovery.
+  bool node_publish_required = 7;
 }
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index c6e054e..a0b42a2 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -787,101 +787,177 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverVolumes()
 
     if (volumeState.isSome()) {
       volumes.put(volumeId, std::move(volumeState.get()));
-      VolumeData& volume = volumes.at(volumeId);
 
-      Future<Nothing> recovered = Nothing();
+      // To avoid any race with, e.g., `deleteVolume` calls, we sequentialize
+      // this lambda with any other operation on the same volume below, so the
+      // volume is guaranteed to exist in the deferred execution.
+      std::function<Future<Nothing>()> recoverVolume = defer(self(), [=]()
+          -> Future<Nothing> {
+        VolumeData& volume = volumes.at(volumeId);
+        Future<Nothing> recovered = Nothing();
 
-      if (VolumeState::State_IsValid(volume.state.state())) {
-        switch (volume.state.state()) {
-          case VolumeState::CREATED:
-          case VolumeState::NODE_READY: {
-            break;
-          }
-          case VolumeState::VOL_READY:
-          case VolumeState::PUBLISHED: {
-            if (volume.state.boot_id() != bootId) {
-              // The node has been restarted since the volume is made
-              // publishable, so it is reset to `NODE_READY` state.
-              volume.state.set_state(VolumeState::NODE_READY);
-              volume.state.clear_boot_id();
-              checkpointVolumeState(volumeId);
+        // First, bring the volume back to a "good" state.
+        if (VolumeState::State_IsValid(volume.state.state())) {
+          switch (volume.state.state()) {
+            case VolumeState::CREATED:
+            case VolumeState::NODE_READY: {
+              break;
             }
+            case VolumeState::VOL_READY:
+            case VolumeState::PUBLISHED: {
+              if (volume.state.boot_id() != bootId) {
+                // The node has been restarted since the volume is made
+                // publishable, so it is reset to `NODE_READY` state.
+                volume.state.set_state(VolumeState::NODE_READY);
+                volume.state.clear_boot_id();
+                checkpointVolumeState(volumeId);
+              }
 
-            break;
-          }
-          case VolumeState::CONTROLLER_PUBLISH: {
-            recovered = volume.sequence->add(std::function<Future<Nothing>()>(
-                defer(self(), &Self::controllerPublish, volumeId)));
+              break;
+            }
+            case VolumeState::CONTROLLER_PUBLISH: {
+              recovered = controllerPublish(volumeId);
 
-            break;
-          }
-          case VolumeState::CONTROLLER_UNPUBLISH: {
-            recovered = volume.sequence->add(std::function<Future<Nothing>()>(
-                defer(self(), &Self::controllerUnpublish, volumeId)));
+              break;
+            }
+            case VolumeState::CONTROLLER_UNPUBLISH: {
+              recovered = controllerUnpublish(volumeId);
 
-            break;
-          }
-          case VolumeState::NODE_STAGE: {
-            recovered = volume.sequence->add(std::function<Future<Nothing>()>(
-                defer(self(), &Self::nodeStage, volumeId)));
+              break;
+            }
+            case VolumeState::NODE_STAGE: {
+              recovered = nodeStage(volumeId);
 
-            break;
-          }
-          case VolumeState::NODE_UNSTAGE: {
-            recovered = volume.sequence->add(std::function<Future<Nothing>()>(
-                defer(self(), &Self::nodeUnstage, volumeId)));
+              break;
+            }
+            case VolumeState::NODE_UNSTAGE: {
+              if (volume.state.boot_id() != bootId) {
+                // The node has been restarted since the volume is made
+                // publishable, so it is reset to `NODE_READY` state.
+                volume.state.set_state(VolumeState::NODE_READY);
+                volume.state.clear_boot_id();
+                checkpointVolumeState(volumeId);
+              } else {
+                recovered = nodeUnstage(volumeId);
+              }
 
-            break;
-          }
-          case VolumeState::NODE_PUBLISH: {
-            if (volume.state.boot_id() != bootId) {
-              // The node has been restarted since `NodePublishVolume` was
-              // called, so it is reset to `NODE_READY` state.
-              volume.state.set_state(VolumeState::NODE_READY);
-              volume.state.clear_boot_id();
-              checkpointVolumeState(volumeId);
-            } else {
-              recovered = volume.sequence->add(std::function<Future<Nothing>()>(
-                  defer(self(), &Self::nodePublish, volumeId)));
+              break;
+            }
+            case VolumeState::NODE_PUBLISH: {
+              if (volume.state.boot_id() != bootId) {
+                // The node has been restarted since the volume is made
+                // publishable, so it is reset to `NODE_READY` state.
+                volume.state.set_state(VolumeState::NODE_READY);
+                volume.state.clear_boot_id();
+                checkpointVolumeState(volumeId);
+              } else {
+                recovered = nodePublish(volumeId);
+              }
+
+              break;
             }
+            case VolumeState::NODE_UNPUBLISH: {
+              if (volume.state.boot_id() != bootId) {
+                // The node has been restarted since the volume is made
+                // publishable, so it is reset to `NODE_READY` state.
+                volume.state.set_state(VolumeState::NODE_READY);
+                volume.state.clear_boot_id();
+                checkpointVolumeState(volumeId);
+              } else {
+                recovered = nodeUnpublish(volumeId);
+              }
 
-            break;
-          }
-          case VolumeState::NODE_UNPUBLISH: {
-            if (volume.state.boot_id() != bootId) {
-              // The node has been restarted since `NodeUnpublishVolume` was
-              // called, so it is reset to `NODE_READY` state.
-              volume.state.set_state(VolumeState::NODE_READY);
-              volume.state.clear_boot_id();
-              checkpointVolumeState(volumeId);
-            } else {
-              recovered = volume.sequence->add(std::function<Future<Nothing>()>(
-                  defer(self(), &Self::nodeUnpublish, volumeId)));
+              break;
+            }
+            case VolumeState::UNKNOWN: {
+              return Failure(
+                  "Volume '" + volumeId + "' is in " +
+                  stringify(volume.state.state()) + " state");
             }
 
-            break;
+            // NOTE: We avoid using a default clause for the following values in
+            // proto3's open enum to enable the compiler to detect missing enum
+            // cases for us. See: https://github.com/google/protobuf/issues/3917
+            case google::protobuf::kint32min:
+            case google::protobuf::kint32max: {
+              UNREACHABLE();
+            }
           }
-          case VolumeState::UNKNOWN: {
-            recovered = Failure(
-                "Volume '" + volumeId + "' is in " +
-                stringify(volume.state.state()) + " state");
+        } else {
+          return Failure("Volume '" + volumeId + "' is in UNDEFINED state");
+        }
 
-            break;
-          }
+        auto err = [](const string& volumeId, const string& message) {
+          LOG(ERROR)
+            << "Failed to recover volume '" << volumeId << "': " << message;
+        };
 
-          // NOTE: We avoid using a default clause for the following values in
-          // proto3's open enum to enable the compiler to detect missing enum
-          // cases for us. See: https://github.com/google/protobuf/issues/3917
-          case google::protobuf::kint32min:
-          case google::protobuf::kint32max: {
-            UNREACHABLE();
-          }
+        // Second, if the volume has been used by a container before recovery,
+        // we have to bring the volume back to `PUBLISHED` so data can be
+        // cleaned up synchronously upon `DESTROY`. Otherwise, we skip the error
+        // and continue recovery.
+        if (volume.state.node_publish_required()) {
+          recovered = recovered
+            .then(defer(self(), [this, volumeId]() -> Future<Nothing> {
+              const VolumeData& volume = volumes.at(volumeId);
+              Future<Nothing> published = Nothing();
+
+              CHECK(VolumeState::State_IsValid(volume.state.state()));
+
+              switch (volume.state.state()) {
+                case VolumeState::NODE_READY: {
+                  published = published
+                    .then(defer(self(), &Self::nodeStage, volumeId));
+
+                  // NOTE: We continue to the next case to recover the volume in
+                  // `VOL_READY` state once the above is done.
+                }
+                case VolumeState::VOL_READY: {
+                  published = published
+                    .then(defer(self(), &Self::nodePublish, volumeId));
+
+                  // NOTE: We continue to the next case to recover the volume in
+                  // `PUBLISHED` state once the above is done.
+                }
+                case VolumeState::PUBLISHED: {
+                  break;
+                }
+                case VolumeState::UNKNOWN:
+                case VolumeState::CREATED:
+                case VolumeState::CONTROLLER_PUBLISH:
+                case VolumeState::CONTROLLER_UNPUBLISH:
+                case VolumeState::NODE_STAGE:
+                case VolumeState::NODE_UNSTAGE:
+                case VolumeState::NODE_PUBLISH:
+                case VolumeState::NODE_UNPUBLISH: {
+                  UNREACHABLE();
+                }
+
+                // NOTE: We avoid using a default clause for the following
+                // values in proto3's open enum to enable the compiler to detect
+                // missing enum cases for us. See:
+                // https://github.com/google/protobuf/issues/3917
+                case google::protobuf::kint32min:
+                case google::protobuf::kint32max: {
+                  UNREACHABLE();
+                }
+              }
+
+              return published;
+            }))
+            .onFailed(std::bind(err, volumeId, lambda::_1))
+            .onDiscarded(std::bind(err, volumeId, "future discarded"));
+        } else {
+          recovered = recovered
+            .onFailed(std::bind(err, volumeId, lambda::_1))
+            .onDiscarded(std::bind(err, volumeId, "future discarded"))
+            .recover([](const Future<Nothing>& future) { return Nothing(); });
         }
-      } else {
-        recovered = Failure("Volume '" + volumeId + "' is in UNDEFINED state");
-      }
 
-      futures.push_back(recovered);
+        return recovered;
+      });
+
+      futures.push_back(volumes.at(volumeId).sequence->add(recoverVolume));
     }
   }
 
@@ -2422,6 +2498,13 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
       VolumeData& volume = volumes.at(volumeId);
 
       volume.state.set_state(VolumeState::PUBLISHED);
+
+      // NOTE: The `node_publish_required` field is always set up by the
+      // successful `nodePublish` call, as it indicates that a container is
+      // going to use the volume. However, it will not be cleared by a
+      // `nodeUnpublish` call, but by a `deleteVolume` call instead.
+      volume.state.set_node_publish_required(true);
+
       checkpointVolumeState(volumeId);
 
       return Nothing();
@@ -2543,12 +2626,17 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
     return controllerCapabilities.createDeleteVolume;
   }
 
-  const VolumeData& volume = volumes.at(volumeId);
+  VolumeData& volume = volumes.at(volumeId);
 
-  CHECK(VolumeState::State_IsValid(volume.state.state()));
+  // NOTE: The volume must have been cleaned up before the `deleteVolume` call
+  // is made, so it is no longer required to publish the volume.
+  volume.state.set_node_publish_required(false);
+  checkpointVolumeState(volumeId);
 
   Future<Nothing> deleted = Nothing();
 
+  CHECK(VolumeState::State_IsValid(volume.state.state()));
+
   switch (volume.state.state()) {
     case VolumeState::PUBLISHED:
     case VolumeState::NODE_PUBLISH:

Reply via email to