Supported `STAGE_UNSTAGE_VOLUME` CSI node capability in SLRP. The storage local resource provider now properly calls `NodeStageVolume` or `NodeUnstageVolume` when publishing or deleting volumes for CSI plugins that support the `STAGE_UNSTAGE_VOLUME` capability.
Review: https://reviews.apache.org/r/66575/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a9aa3ad8 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a9aa3ad8 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a9aa3ad8 Branch: refs/heads/master Commit: a9aa3ad8599cdafbd8d36620887c7f9002ff56d2 Parents: 02f9cb1 Author: Chun-Hung Hsiao <[email protected]> Authored: Thu Apr 12 19:42:26 2018 -0700 Committer: Chun-Hung Hsiao <[email protected]> Committed: Thu Apr 12 19:42:26 2018 -0700 ---------------------------------------------------------------------- src/csi/state.proto | 33 +-- src/resource_provider/storage/provider.cpp | 350 +++++++++++++++++++----- 2 files changed, 293 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/a9aa3ad8/src/csi/state.proto ---------------------------------------------------------------------- diff --git a/src/csi/state.proto b/src/csi/state.proto index a252cb2..8445399 100644 --- a/src/csi/state.proto +++ b/src/csi/state.proto @@ -26,10 +26,13 @@ message VolumeState { enum State { UNKNOWN = 0; CREATED = 1; // The volume is provisioned but not published. - NODE_READY = 2; // The volume is attached to the node. - PUBLISHED = 3; // The volume is mounted on the node. + NODE_READY = 2; // The volume is made available on the node. + VOL_READY = 8; // The volume is made publishable on the node. + PUBLISHED = 3; // The volume is published on the node. CONTROLLER_PUBLISH = 4; // `ControllerPublishVolume` is being called. CONTROLLER_UNPUBLISH = 5; // `ControllerUnpublishVolume` is being called. + NODE_STAGE = 9; // `NodeStageVolume` is being called. + NODE_UNSTAGE = 10; // `NodeUnstageVolume` is being called. NODE_PUBLISH = 6; // `NodePublishVolume` is being called. NODE_UNPUBLISH = 7; // `NodeUnpublishVolume` is being called. 
} @@ -37,25 +40,23 @@ message VolumeState { // The state of the volume. This is a REQUIRED field. State state = 1; - // The capability used to publish the volume. This is a - // REQUIRED field. + // The capability used to publish the volume. This is a REQUIRED field. .csi.v0.VolumeCapability volume_capability = 2; - // Attributes of the volume to be used on the node. This field MUST - // match the attributes of the `Volume` returned by `CreateVolume`. - // This is an OPTIONAL field. + // Attributes of the volume to be used on the node. This field MUST match the + // attributes of the `Volume` returned by `CreateVolume`. This is an OPTIONAL + // field. map<string, string> volume_attributes = 3; - // If the plugin has the `PUBLISH_UNPUBLISH_VOLUME` controller - // capability, this field MUST be set to the value returned by - // `ControllerPublishVolume`. Otherwise, this field MUST remain unset. - // This is an OPTIONAL field. + // If the plugin has the `PUBLISH_UNPUBLISH_VOLUME` controller capability, + // this field MUST be set to the value returned by `ControllerPublishVolume`. + // Otherwise, this field MUST remain unset. This is an OPTIONAL field. map<string, string> publish_info = 4; - // This field is used to check if the node has been rebooted since the - // last time the volume is mounted. If yes, `NodePublishVolume` needs - // to be called to mount the volume again. It MUST be set to the boot - // ID of the node if the volume is mounted, and SHOULD remain unset - // otherwise. This is an OPTIONAL field. + // This field is used to check if the node has been rebooted since the volume + // was transitioned to `VOL_READY` state. If yes, `NodeStageVolume` needs to + // be called to make the volume publishable again. It MUST be set to the boot + // ID of the node if the volume is in `VOL_READY` state, and SHOULD remain + // unset otherwise. This is an OPTIONAL field. 
string boot_id = 5; } http://git-wip-us.apache.org/repos/asf/mesos/blob/a9aa3ad8/src/resource_provider/storage/provider.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index ce94393..8ca2d3a 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -326,7 +326,7 @@ public: private: struct VolumeData { - VolumeData(const VolumeState& _state) + VolumeData(VolumeState&& _state) : state(_state), sequence(new Sequence("volume-sequence")) {} VolumeState state; @@ -376,6 +376,8 @@ private: Future<Nothing> prepareNodeService(); Future<Nothing> controllerPublish(const string& volumeId); Future<Nothing> controllerUnpublish(const string& volumeId); + Future<Nothing> nodeStage(const string& volumeId); + Future<Nothing> nodeUnstage(const string& volumeId); Future<Nothing> nodePublish(const string& volumeId); Future<Nothing> nodeUnpublish(const string& volumeId); Future<string> createVolume( @@ -640,7 +642,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover() auto err = [](const string& message) { LOG(ERROR) - << "Failed to watch for VolumeprofileAdaptor: " << message; + << "Failed to watch for DiskProfileAdaptor: " << message; }; // Start watching the DiskProfileAdaptor. @@ -843,59 +845,94 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverVolumes() Future<Nothing> recovered = Nothing(); - switch (volume.state.state()) { - case VolumeState::CREATED: - case VolumeState::NODE_READY: { - break; - } - case VolumeState::PUBLISHED: { - if (volume.state.boot_id() != bootId) { - // The node has been restarted since the volume is mounted, - // so it is no longer in the `PUBLISHED` state. 
- volume.state.set_state(VolumeState::NODE_READY); - volume.state.clear_boot_id(); - checkpointVolumeState(volumeId); + if (VolumeState::State_IsValid(volume.state.state())) { + switch (volume.state.state()) { + case VolumeState::CREATED: + case VolumeState::NODE_READY: { + break; } - break; - } - case VolumeState::CONTROLLER_PUBLISH: { - recovered = - volume.sequence->add(std::function<Future<Nothing>()>( + case VolumeState::VOL_READY: + case VolumeState::PUBLISHED: { + if (volume.state.boot_id() != bootId) { + // The node has been restarted since the volume is made + // publishable, so it is reset to `NODE_READY` state. + volume.state.set_state(VolumeState::NODE_READY); + volume.state.clear_boot_id(); + checkpointVolumeState(volumeId); + } + + break; + } + case VolumeState::CONTROLLER_PUBLISH: { + recovered = volume.sequence->add(std::function<Future<Nothing>()>( defer(self(), &Self::controllerPublish, volumeId))); - break; - } - case VolumeState::CONTROLLER_UNPUBLISH: { - recovered = - volume.sequence->add(std::function<Future<Nothing>()>( + + break; + } + case VolumeState::CONTROLLER_UNPUBLISH: { + recovered = volume.sequence->add(std::function<Future<Nothing>()>( defer(self(), &Self::controllerUnpublish, volumeId))); - break; - } - case VolumeState::NODE_PUBLISH: { - recovered = - volume.sequence->add(std::function<Future<Nothing>()>( - defer(self(), &Self::nodePublish, volumeId))); - break; - } - case VolumeState::NODE_UNPUBLISH: { - recovered = - volume.sequence->add(std::function<Future<Nothing>()>( - defer(self(), &Self::nodeUnpublish, volumeId))); - break; - } - case VolumeState::UNKNOWN: { - recovered = Failure( - "Volume '" + volumeId + "' is in " + - stringify(volume.state.state()) + " state"); - } - // NOTE: We avoid using a default clause for the following - // values in proto3's open enum to enable the compiler to detect - // missing enum cases for us. 
See: - // https://github.com/google/protobuf/issues/3917 - case google::protobuf::kint32min: - case google::protobuf::kint32max: { - UNREACHABLE(); + break; + } + case VolumeState::NODE_STAGE: { + recovered = volume.sequence->add(std::function<Future<Nothing>()>( + defer(self(), &Self::nodeStage, volumeId))); + + break; + } + case VolumeState::NODE_UNSTAGE: { + recovered = volume.sequence->add(std::function<Future<Nothing>()>( + defer(self(), &Self::nodeUnstage, volumeId))); + + break; + } + case VolumeState::NODE_PUBLISH: { + if (volume.state.boot_id() != bootId) { + // The node has been restarted since `NodePublishVolume` was + // called, so it is reset to `NODE_READY` state. + volume.state.set_state(VolumeState::NODE_READY); + volume.state.clear_boot_id(); + checkpointVolumeState(volumeId); + } else { + recovered = volume.sequence->add(std::function<Future<Nothing>()>( + defer(self(), &Self::nodePublish, volumeId))); + } + + break; + } + case VolumeState::NODE_UNPUBLISH: { + if (volume.state.boot_id() != bootId) { + // The node has been restarted since `NodeUnpublishVolume` was + // called, so it is reset to `NODE_READY` state. + volume.state.set_state(VolumeState::NODE_READY); + volume.state.clear_boot_id(); + checkpointVolumeState(volumeId); + } else { + recovered = volume.sequence->add(std::function<Future<Nothing>()>( + defer(self(), &Self::nodeUnpublish, volumeId))); + } + + break; + } + case VolumeState::UNKNOWN: { + recovered = Failure( + "Volume '" + volumeId + "' is in " + + stringify(volume.state.state()) + " state"); + + break; + } + + // NOTE: We avoid using a default clause for the following values in + // proto3's open enum to enable the compiler to detect missing enum + // cases for us. 
See: https://github.com/google/protobuf/issues/3917 + case google::protobuf::kint32min: + case google::protobuf::kint32max: { + UNREACHABLE(); + } } + } else { + recovered = Failure("Volume '" + volumeId + "' is in UNDEFINED state"); } futures.push_back(recovered); @@ -1511,10 +1548,13 @@ void StorageLocalResourceProviderProcess::publishResources( std::function<Future<Nothing>()> controllerAndNodePublish = defer(self(), [=] { CHECK(volumes.contains(volumeId)); + const VolumeData& volume = volumes.at(volumeId); Future<Nothing> published = Nothing(); - switch (volumes.at(volumeId).state.state()) { + CHECK(VolumeState::State_IsValid(volume.state.state())); + + switch (volume.state.state()) { case VolumeState::CONTROLLER_UNPUBLISH: { published = published .then(defer(self(), &Self::controllerUnpublish, volumeId)); @@ -1526,18 +1566,34 @@ void StorageLocalResourceProviderProcess::publishResources( case VolumeState::CONTROLLER_PUBLISH: { published = published .then(defer(self(), &Self::controllerPublish, volumeId)) + .then(defer(self(), &Self::nodeStage, volumeId)) .then(defer(self(), &Self::nodePublish, volumeId)); break; } - case VolumeState::NODE_UNPUBLISH: { + case VolumeState::NODE_UNSTAGE: { published = published - .then(defer(self(), &Self::nodeUnpublish, volumeId)); + .then(defer(self(), &Self::nodeUnstage, volumeId)); // NOTE: We continue to the next case to publish the volume in // `NODE_READY` state once the above is done. } case VolumeState::NODE_READY: + case VolumeState::NODE_STAGE: { + published = published + .then(defer(self(), &Self::nodeStage, volumeId)) + .then(defer(self(), &Self::nodePublish, volumeId)); + + break; + } + case VolumeState::NODE_UNPUBLISH: { + published = published + .then(defer(self(), &Self::nodeUnpublish, volumeId)); + + // NOTE: We continue to the next case to publish the volume in + // `VOL_READY` state once the above is done. 
+ } + case VolumeState::VOL_READY: case VolumeState::NODE_PUBLISH: { published = published .then(defer(self(), &Self::nodePublish, volumeId)); @@ -2038,12 +2094,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService() -> Future<csi::v0::Client> { nodeCapabilities = response.capabilities(); - // TODO(chhsiao): Implement `STAGE_UNSTAGE_VOLUME` support. - if (nodeCapabilities.stageUnstageVolume) { - return Failure( - "Node capability 'STAGE_UNSTAGE_VOLUME' is not supported"); - } - // Get the latest service future before proceeding to the next step. return getService(nodeContainerId.get()); })) @@ -2177,14 +2227,144 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish( } -// Transition the state of the specified volume from `NODE_READY` or +// Transitions the state of the specified volume from `NODE_READY` or +// `NODE_STAGE` to `VOL_READY`. +// NOTE: This can only be called after `prepareNodeService`. +Future<Nothing> StorageLocalResourceProviderProcess::nodeStage( + const string& volumeId) +{ + CHECK(volumes.contains(volumeId)); + VolumeData& volume = volumes.at(volumeId); + + if (!nodeCapabilities.stageUnstageVolume) { + CHECK_EQ(VolumeState::NODE_READY, volume.state.state()); + + volume.state.set_state(VolumeState::VOL_READY); + volume.state.set_boot_id(bootId); + checkpointVolumeState(volumeId); + + return Nothing(); + } + + CHECK_SOME(nodeContainerId); + + return getService(nodeContainerId.get()) + .then(defer(self(), [this, volumeId]( + csi::v0::Client client) -> Future<Nothing> { + VolumeData& volume = volumes.at(volumeId); + + const string stagingPath = csi::paths::getMountStagingPath( + csi::paths::getMountRootDir( + slave::paths::getCsiRootDir(workDir), + info.storage().plugin().type(), + info.storage().plugin().name()), + volumeId); + + Try<Nothing> mkdir = os::mkdir(stagingPath); + if (mkdir.isError()) { + return Failure( + "Failed to create mount staging path '" + stagingPath + "': " + + mkdir.error()); + } + 
+ if (volume.state.state() == VolumeState::NODE_READY) { + volume.state.set_state(VolumeState::NODE_STAGE); + checkpointVolumeState(volumeId); + } + + CHECK_EQ(VolumeState::NODE_STAGE, volume.state.state()); + + csi::v0::NodeStageVolumeRequest request; + request.set_volume_id(volumeId); + *request.mutable_publish_info() = volume.state.publish_info(); + request.set_staging_target_path(stagingPath); + request.mutable_volume_capability() + ->CopyFrom(volume.state.volume_capability()); + *request.mutable_volume_attributes() = volume.state.volume_attributes(); + + return client.NodeStageVolume(request) + .then(defer(self(), [this, volumeId] { + VolumeData& volume = volumes.at(volumeId); + + volume.state.set_state(VolumeState::VOL_READY); + volume.state.set_boot_id(bootId); + checkpointVolumeState(volumeId); + + return Nothing(); + })); + })); +} + + +// Transitions the state of the specified volume from `VOL_READY`, `NODE_STAGE` +// or `NODE_UNSTAGE` to `NODE_READY`. +// NOTE: This can only be called after `prepareNodeService`. 
+Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage( + const string& volumeId) +{ + CHECK(volumes.contains(volumeId)); + VolumeData& volume = volumes.at(volumeId); + + if (!nodeCapabilities.stageUnstageVolume) { + CHECK_EQ(VolumeState::VOL_READY, volume.state.state()); + + volume.state.set_state(VolumeState::NODE_READY); + volume.state.clear_boot_id(); + checkpointVolumeState(volumeId); + + return Nothing(); + } + + CHECK_SOME(nodeContainerId); + + return getService(nodeContainerId.get()) + .then(defer(self(), [this, volumeId](csi::v0::Client client) { + VolumeData& volume = volumes.at(volumeId); + + const string stagingPath = csi::paths::getMountStagingPath( + csi::paths::getMountRootDir( + slave::paths::getCsiRootDir(workDir), + info.storage().plugin().type(), + info.storage().plugin().name()), + volumeId); + + CHECK(os::exists(stagingPath)); + + // A previously failed `NodeStageVolume` call can be recovered through the + // current `NodeUnstageVolume` call. See: + // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodestagevolume // NOLINT + if (volume.state.state() == VolumeState::VOL_READY || + volume.state.state() == VolumeState::NODE_STAGE) { + volume.state.set_state(VolumeState::NODE_UNSTAGE); + checkpointVolumeState(volumeId); + } + + CHECK_EQ(VolumeState::NODE_UNSTAGE, volume.state.state()); + + csi::v0::NodeUnstageVolumeRequest request; + request.set_volume_id(volumeId); + request.set_staging_target_path(stagingPath); + + return client.NodeUnstageVolume(request) + .then(defer(self(), [this, volumeId] { + VolumeData& volume = volumes.at(volumeId); + + volume.state.set_state(VolumeState::NODE_READY); + volume.state.clear_boot_id(); + checkpointVolumeState(volumeId); + + return Nothing(); + })); + })); +} + + +// Transitions the state of the specified volume from `VOL_READY` or // `NODE_PUBLISH` to `PUBLISHED`. // NOTE: This can only be called after `prepareNodeService`. 
Future<Nothing> StorageLocalResourceProviderProcess::nodePublish( const string& volumeId) { - // TODO(chhsiao): Implement `STAGE_UNSTAGE_VOLUME` support. - CHECK(volumes.contains(volumeId)); CHECK_SOME(nodeContainerId); @@ -2207,7 +2387,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish( mkdir.error()); } - if (volume.state.state() == VolumeState::NODE_READY) { + if (volume.state.state() == VolumeState::VOL_READY) { volume.state.set_state(VolumeState::NODE_PUBLISH); checkpointVolumeState(volumeId); } @@ -2223,12 +2403,24 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish( request.set_readonly(false); *request.mutable_volume_attributes() = volume.state.volume_attributes(); + if (nodeCapabilities.stageUnstageVolume) { + const string stagingPath = csi::paths::getMountStagingPath( + csi::paths::getMountRootDir( + slave::paths::getCsiRootDir(workDir), + info.storage().plugin().type(), + info.storage().plugin().name()), + volumeId); + + CHECK(os::exists(stagingPath)); + + request.set_staging_target_path(stagingPath); + } + return client.NodePublishVolume(request) .then(defer(self(), [this, volumeId] { VolumeData& volume = volumes.at(volumeId); volume.state.set_state(VolumeState::PUBLISHED); - volume.state.set_boot_id(bootId); checkpointVolumeState(volumeId); return Nothing(); @@ -2237,14 +2429,12 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish( } -// Transition the state of the specified volume from `PUBLISHED`, -// `NODE_PUBLISH` or `NODE_UNPUBLISH` to `NODE_READY`. +// Transitions the state of the specified volume from `PUBLISHED`, +// `NODE_PUBLISH` or `NODE_UNPUBLISH` to `VOL_READY`. // NOTE: This can only be called after `prepareNodeService`. Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish( const string& volumeId) { - // TODO(chhsiao): Implement `STAGE_UNSTAGE_VOLUME` support. 
- CHECK(volumes.contains(volumeId)); CHECK_SOME(nodeContainerId); @@ -2280,8 +2470,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish( .then(defer(self(), [this, volumeId, targetPath]() -> Future<Nothing> { VolumeData& volume = volumes.at(volumeId); - volume.state.set_state(VolumeState::NODE_READY); - volume.state.clear_boot_id(); + volume.state.set_state(VolumeState::VOL_READY); checkpointVolumeState(volumeId); Try<Nothing> rmdir = os::rmdir(targetPath); @@ -2362,6 +2551,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume( "Controller capability 'CREATE_DELETE_VOLUME' is not supported"); } + CHECK_SOME(controllerContainerId); + const string volumePath = csi::paths::getVolumePath( slave::paths::getCsiRootDir(workDir), info.storage().plugin().type(), @@ -2376,17 +2567,28 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume( return Nothing(); } - CHECK_SOME(controllerContainerId); + const VolumeData& volume = volumes.at(volumeId); Future<Nothing> deleted = Nothing(); - switch (volumes.at(volumeId).state.state()) { + CHECK(VolumeState::State_IsValid(volume.state.state())); + + switch (volume.state.state()) { case VolumeState::PUBLISHED: case VolumeState::NODE_PUBLISH: case VolumeState::NODE_UNPUBLISH: { deleted = deleted .then(defer(self(), &Self::nodeUnpublish, volumeId)); + // NOTE: We continue to the next case to delete the volume in `VOL_READY` + // state once the above is done. + } + case VolumeState::VOL_READY: + case VolumeState::NODE_STAGE: + case VolumeState::NODE_UNSTAGE: { + deleted = deleted + .then(defer(self(), &Self::nodeUnstage, volumeId)); + // NOTE: We continue to the next case to delete the volume in `NODE_READY` // state once the above is done. 
} @@ -2403,7 +2605,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume( if (!preExisting) { deleted = deleted .then(defer(self(), &Self::getService, controllerContainerId.get())) - .then(defer(self(), [=](csi::v0::Client client) { + .then(defer(self(), [volumeId](csi::v0::Client client) { csi::v0::DeleteVolumeRequest request; request.set_volume_id(volumeId); @@ -2434,7 +2636,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume( // the continuation would have already been run, the returned future will // become ready, making the future returned by the sequence ready as well. return deleted - .then(defer(self(), [=] { + .then(defer(self(), [this, volumeId, volumePath] { volumes.erase(volumeId); CHECK_SOME(os::rmdir(volumePath));
