This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 8e60bc7a921a2d4b37e5bec32ad3a10ddec96e78 Author: Chun-Hung Hsiao <[email protected]> AuthorDate: Thu Apr 4 18:48:24 2019 -0700 Refactored the test CSI plugin to make it easy to support CSI v1. Review: https://reviews.apache.org/r/70403 --- src/examples/test_csi_plugin.cpp | 951 +++++++++++++++++++++++++-------------- 1 file changed, 606 insertions(+), 345 deletions(-) diff --git a/src/examples/test_csi_plugin.cpp b/src/examples/test_csi_plugin.cpp index 4321f8f..54753d9 100644 --- a/src/examples/test_csi_plugin.cpp +++ b/src/examples/test_csi_plugin.cpp @@ -14,9 +14,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include <algorithm> +#include <limits> #include <memory> #include <thread> #include <utility> +#include <vector> + +#include <google/protobuf/map.h> +#include <google/protobuf/message.h> #include <grpcpp/grpcpp.h> @@ -25,16 +31,24 @@ #include <mesos/type_utils.hpp> +#include <mesos/csi/types.hpp> #include <mesos/csi/v0.hpp> +#include <process/grpc.hpp> +#include <process/http.hpp> + #include <stout/bytes.hpp> #include <stout/flags.hpp> #include <stout/foreach.hpp> #include <stout/hashmap.hpp> +#include <stout/none.hpp> #include <stout/option.hpp> #include <stout/path.hpp> +#include <stout/some.hpp> #include <stout/stringify.hpp> #include <stout/strings.hpp> +#include <stout/try.hpp> +#include <stout/unreachable.hpp> #include <stout/os/exists.hpp> #include <stout/os/ls.hpp> @@ -47,6 +61,7 @@ #include "logging/logging.hpp" +namespace http = process::http; namespace fs = mesos::internal::fs; using std::cerr; @@ -59,6 +74,10 @@ using std::string; using std::unique_ptr; using std::vector; +using google::protobuf::Map; +using google::protobuf::MapPair; +using google::protobuf::RepeatedPtrField; + using grpc::AsyncGenericService; using grpc::ByteBuffer; using grpc::ClientContext; @@ -73,6 +92,10 @@ using grpc::ServerContext; using grpc::Status; using grpc::WriteOptions; +using mesos::csi::types::VolumeCapability; + +using process::grpc::StatusError; + constexpr char PLUGIN_NAME[] = "org.apache.mesos.csi.test"; constexpr char NODE_ID[] = "localhost"; constexpr Bytes DEFAULT_VOLUME_CAPACITY = Megabytes(64); @@ -144,7 +167,7 @@ public: // Construct the default mount volume capability. defaultVolumeCapability.mutable_mount(); defaultVolumeCapability.mutable_access_mode() - ->set_mode(csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER); + ->set_mode(VolumeCapability::AccessMode::SINGLE_NODE_WRITER); // Scan for preprovisioned volumes. // @@ -280,12 +303,69 @@ private: string getVolumePath(const VolumeInfo& volumeInfo); Try<VolumeInfo> parseVolumePath(const string& path); + Try<VolumeInfo, StatusError> createVolume( + const string& name, + const Bytes& requiredBytes, + const Bytes& limitBytes, + const RepeatedPtrField<VolumeCapability>& capabilities, + const Map<string, string> parameters); + + Try<Nothing, StatusError> deleteVolume(const string& volumeId); + + Try<Nothing, StatusError> controllerPublishVolume( + const string& volumeId, + const string& nodeId, + const VolumeCapability& capability, + bool readonly, + const Map<string, string>& volumeContext); + + Try<Nothing, StatusError> controllerUnpublishVolume( + const string& volumeId, const string& nodeId); + + // Returns `StatusError` if the volume does not exist; returns `Option<Error>` + // with an error set if the volume is not compatible with the given arguments. + Try<Option<Error>, StatusError> validateVolumeCapabilities( + const string& volumeId, + const Map<string, string>& volumeContext, + const RepeatedPtrField<VolumeCapability>& capabilities, + const Option<Map<string, string>>& parameters = None()); + + Try<vector<VolumeInfo>, StatusError> listVolumes( + const Option<int32_t>& maxEntries, + const Option<string>& startingToken); + + Try<Bytes, StatusError> getCapacity( + const RepeatedPtrField<VolumeCapability>& capabilities, + const Map<string, string>& parameters); + + Try<Nothing, StatusError> nodeStageVolume( + const string& volumeId, + const Map<string, string>& publishContext, + const string& stagingPath, + const VolumeCapability& capability, + const Map<string, string>& volumeContext); + + Try<Nothing, StatusError> nodeUnstageVolume( + const string& volumeId, const string& stagingPath); + + Try<Nothing, StatusError> nodePublishVolume( + const string& volumeId, + const Map<string, string>& publishContext, + const string& stagingPath, + const string& targetPath, + const VolumeCapability& capability, + bool readonly, + const Map<string, string>& volumeContext); + + Try<Nothing, StatusError> nodeUnpublishVolume( + const string& volumeId, const string& targetPath); + const string workDir; const string endpoint; Bytes availableCapacity; - csi::v0::VolumeCapability defaultVolumeCapability; - google::protobuf::Map<string, string> createParameters; + VolumeCapability defaultVolumeCapability; + Map<string, string> createParameters; hashmap<string, VolumeInfo> volumes; }; @@ -349,92 +429,26 @@ Status TestCSIPlugin::CreateVolume( { LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'"; - // TODO(chhsiao): Validate required fields. + // TODO(chhsiao): Validate the request. - if (request->name().empty()) { - return Status(grpc::INVALID_ARGUMENT, "Volume name cannot be empty"); - } - - if (strings::contains(request->name(), stringify(os::PATH_SEPARATOR))) { - return Status( - grpc::INVALID_ARGUMENT, - "Volume name cannot contain '" + stringify(os::PATH_SEPARATOR) + "'"); - } + Try<VolumeInfo, StatusError> result = createVolume( + request->name(), + request->capacity_range().required_bytes() + ? request->capacity_range().required_bytes() : 1, + request->capacity_range().limit_bytes() + ? request->capacity_range().limit_bytes() + : std::numeric_limits<int64_t>::max(), + mesos::csi::v0::devolve(request->volume_capabilities()), + request->parameters()); - foreach (const csi::v0::VolumeCapability& capability, - request->volume_capabilities()) { - if (capability != defaultVolumeCapability) { - return Status(grpc::INVALID_ARGUMENT, "Unsupported volume capabilities"); - } + if (result.isError()) { + return result.error().status; } - if (request->parameters() != createParameters) { - return Status(grpc::INVALID_ARGUMENT, "Unsupported create parameters"); - } - - // The volume ID is determined by `name`, so we check whether the volume - // corresponding to `name` is compatible to the request if it exists. - if (volumes.contains(request->name())) { - const VolumeInfo volumeInfo = volumes.at(request->name()); - - if (request->has_capacity_range()) { - const csi::v0::CapacityRange& range = request->capacity_range(); - - if (range.limit_bytes() != 0 && - volumeInfo.size > Bytes(range.limit_bytes())) { - return Status(grpc::ALREADY_EXISTS, "Cannot satisfy 'limit_bytes'"); - } else if (range.required_bytes() != 0 && - volumeInfo.size < Bytes(range.required_bytes())) { - return Status(grpc::ALREADY_EXISTS, "Cannot satisfy 'required_bytes'"); - } - } - } else { - if (availableCapacity == Bytes(0)) { - return Status(grpc::OUT_OF_RANGE, "Insufficient capacity"); - } - - VolumeInfo volumeInfo; - volumeInfo.id = request->name(); - volumeInfo.size = min(DEFAULT_VOLUME_CAPACITY, availableCapacity); - - if (request->has_capacity_range()) { - const csi::v0::CapacityRange& range = request->capacity_range(); - - // The highest we can pick. - Bytes limit = range.limit_bytes() != 0 - ? min(availableCapacity, Bytes(range.limit_bytes())) - : availableCapacity; - - if (range.required_bytes() != 0 && - limit < Bytes(range.required_bytes())) { - return Status(grpc::OUT_OF_RANGE, "Cannot satisfy 'required_bytes'"); - } - - volumeInfo.size = min( - limit, - max(DEFAULT_VOLUME_CAPACITY, Bytes(range.required_bytes()))); - } - - const string path = getVolumePath(volumeInfo); - - Try<Nothing> mkdir = os::mkdir(path); - if (mkdir.isError()) { - return Status( - grpc::INTERNAL, - "Failed to create volume '" + volumeInfo.id + "': " + mkdir.error()); - } - - CHECK_GE(availableCapacity, volumeInfo.size); - availableCapacity -= volumeInfo.size; - volumes.put(volumeInfo.id, std::move(volumeInfo)); - } - - const VolumeInfo& volumeInfo = volumes.at(request->name()); - - response->mutable_volume()->set_id(volumeInfo.id); - response->mutable_volume()->set_capacity_bytes(volumeInfo.size.bytes()); + response->mutable_volume()->set_id(result->id); + response->mutable_volume()->set_capacity_bytes(result->size.bytes()); (*response->mutable_volume()->mutable_attributes())["path"] = - getVolumePath(volumeInfo); + getVolumePath(result.get()); return Status::OK; } @@ -447,26 +461,14 @@ Status TestCSIPlugin::DeleteVolume( { LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'"; - // TODO(chhsiao): Validate required fields. + // TODO(chhsiao): Validate the request. - if (!volumes.contains(request->volume_id())) { - return Status::OK; - } + Try<Nothing, StatusError> result = deleteVolume(request->volume_id()); - const VolumeInfo& volumeInfo = volumes.at(request->volume_id()); - const string path = getVolumePath(volumeInfo); - - Try<Nothing> rmdir = os::rmdir(path); - if (rmdir.isError()) { - return Status( - grpc::INTERNAL, - "Failed to delete volume '" + request->volume_id() + "': " + - rmdir.error()); + if (result.isError()) { + return result.error().status; } - availableCapacity += volumeInfo.size; - volumes.erase(volumeInfo.id); - return Status::OK; } @@ -478,29 +480,19 @@ Status TestCSIPlugin::ControllerPublishVolume( { LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'"; - // TODO(chhsiao): Validate required fields. + // TODO(chhsiao): Validate the request. - if (!volumes.contains(request->volume_id())) { - return Status( - grpc::NOT_FOUND, - "Volume '" + request->volume_id() + "' is not found"); - } + Try<Nothing, StatusError> result = controllerPublishVolume( + request->volume_id(), + request->node_id(), + mesos::csi::v0::devolve(request->volume_capability()), + request->readonly(), + request->volume_attributes()); - const VolumeInfo& volumeInfo = volumes.at(request->volume_id()); - const string path = getVolumePath(volumeInfo); - - auto it = request->volume_attributes().find("path"); - if (it == request->volume_attributes().end() || it->second != path) { - return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes"); + if (result.isError()) { + return result.error().status; } - if (request->node_id() != NODE_ID) { - return Status( - grpc::NOT_FOUND, - "Node '" + request->node_id() + "' is not found"); - } - - // Do nothing. return Status::OK; } @@ -512,21 +504,15 @@ Status TestCSIPlugin::ControllerUnpublishVolume( { LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'"; - // TODO(chhsiao): Validate required fields. + // TODO(chhsiao): Validate the request. - if (!volumes.contains(request->volume_id())) { - return Status( - grpc::NOT_FOUND, - "Volume '" + request->volume_id() + "' is not found"); - } + Try<Nothing, StatusError> result = + controllerUnpublishVolume(request->volume_id(), request->node_id()); - if (request->node_id() != NODE_ID) { - return Status( - grpc::NOT_FOUND, - "Node '" + request->node_id() + "' is not found"); + if (result.isError()) { + return result.error().status; } - // Do nothing. return Status::OK; } @@ -538,36 +524,24 @@ Status TestCSIPlugin::ValidateVolumeCapabilities( { LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'"; - // TODO(chhsiao): Validate required fields. - - if (!volumes.contains(request->volume_id())) { - return Status( - grpc::NOT_FOUND, - "Volume '" + request->volume_id() + "' is not found"); - } + // TODO(chhsiao): Validate the request. - const VolumeInfo& volumeInfo = volumes.at(request->volume_id()); - const string path = getVolumePath(volumeInfo); + Try<Option<Error>, StatusError> result = validateVolumeCapabilities( + request->volume_id(), + request->volume_attributes(), + mesos::csi::v0::devolve(request->volume_capabilities())); - auto it = request->volume_attributes().find("path"); - if (it == request->volume_attributes().end() || it->second != path) { - return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes"); + if (result.isError()) { + return result.error().status; } - foreach (const csi::v0::VolumeCapability& capability, - request->volume_capabilities()) { - if (capability != defaultVolumeCapability) { - response->set_supported(false); - response->set_message("Unsupported volume capabilities"); - - return Status::OK; - } + if (result->isSome()) { + response->set_supported(false); + response->set_message(result->get().message); + } else { + response->set_supported(true); } - // TODO(chhsiao): Validate the parameters once we get CSI v1. - - response->set_supported(true); - return Status::OK; } @@ -579,17 +553,17 @@ Status TestCSIPlugin::ListVolumes( { LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'"; - // TODO(chhsiao): Support the `max_entries` field. - if (request->max_entries() > 0) { - return Status(grpc::ABORTED, "Field 'max_entries' is not supported"); - } + Try<vector<VolumeInfo>, StatusError> result = listVolumes( + request->max_entries() + ? request->max_entries() : Option<int32_t>::none(), + !request->starting_token().empty() + ? request->starting_token() : Option<string>::none()); - // TODO(chhsiao): Support the `starting_token` fields. - if (!request->starting_token().empty()) { - return Status(grpc::ABORTED, "Field 'starting_token' is not supported"); + if (result.isError()) { + return result.error().status; } - foreachvalue (const VolumeInfo& volumeInfo, volumes) { + foreach (const VolumeInfo& volumeInfo, result.get()) { csi::v0::Volume* volume = response->add_entries()->mutable_volume(); volume->set_id(volumeInfo.id); volume->set_capacity_bytes(volumeInfo.size.bytes()); @@ -607,25 +581,15 @@ Status TestCSIPlugin::GetCapacity( { LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'"; - foreach (const csi::v0::VolumeCapability& capability, - request->volume_capabilities()) { - // We report zero capacity for any capability other than the - // default-constructed mount volume capability since this plugin - // does not support any filesystem types and mount flags. - if (capability != defaultVolumeCapability) { - response->set_available_capacity(0); - - return Status::OK; - } - } - - if (request->parameters() != createParameters) { - response->set_available_capacity(0); + Try<Bytes, StatusError> result = getCapacity( + mesos::csi::v0::devolve(request->volume_capabilities()), + request->parameters()); - return Status::OK; + if (result.isError()) { + return result.error().status; } - response->set_available_capacity(availableCapacity.bytes()); + response->set_available_capacity(result->bytes()); return Status::OK; } @@ -658,53 +622,17 @@ Status TestCSIPlugin::NodeStageVolume( { LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'"; - // TODO(chhsiao): Validate required fields. - - if (!volumes.contains(request->volume_id())) { - return Status( - grpc::NOT_FOUND, - "Volume '" + request->volume_id() + "' is not found"); - } - - const VolumeInfo& volumeInfo = volumes.at(request->volume_id()); - const string path = getVolumePath(volumeInfo); - - auto it = request->volume_attributes().find("path"); - if (it == request->volume_attributes().end() || it->second != path) { - return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes"); - } - - if (!os::exists(request->staging_target_path())) { - return Status( - grpc::INVALID_ARGUMENT, - "Target path '" + request->staging_target_path() + "' is not found"); - } + // TODO(chhsiao): Validate the request. - Try<fs::MountInfoTable> table = fs::MountInfoTable::read(); - if (table.isError()) { - return Status( - grpc::INTERNAL, - "Failed to get mount table: " + table.error()); - } - - foreach (const fs::MountInfoTable::Entry& entry, table->entries) { - if (entry.target == request->staging_target_path()) { - return Status::OK; - } - } - - Try<Nothing> mount = fs::mount( - path, + Try<Nothing, StatusError> result = nodeStageVolume( + request->volume_id(), + request->publish_info(), request->staging_target_path(), - None(), - MS_BIND, - None()); + mesos::csi::v0::devolve(request->volume_capability()), + request->volume_attributes()); - if (mount.isError()) { - return Status( - grpc::INTERNAL, - "Failed to mount from '" + path + "' to '" + - request->staging_target_path() + "': " + mount.error()); + if (result.isError()) { + return result.error().status; } return Status::OK; @@ -718,39 +646,13 @@ Status TestCSIPlugin::NodeUnstageVolume( { LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'"; - // TODO(chhsiao): Validate required fields. - - if (!volumes.contains(request->volume_id())) { - return Status( - grpc::NOT_FOUND, - "Volume '" + request->volume_id() + "' is not found"); - } - - Try<fs::MountInfoTable> table = fs::MountInfoTable::read(); - if (table.isError()) { - return Status( - grpc::INTERNAL, - "Failed to get mount table: " + table.error()); - } - - bool found = false; - foreach (const fs::MountInfoTable::Entry& entry, table->entries) { - if (entry.target == request->staging_target_path()) { - found = true; - break; - } - } + // TODO(chhsiao): Validate the request. - if (!found) { - return Status::OK; - } + Try<Nothing, StatusError> result = + nodeUnstageVolume(request->volume_id(), request->staging_target_path()); - Try<Nothing> unmount = fs::unmount(request->staging_target_path()); - if (unmount.isError()) { - return Status( - grpc::INTERNAL, - "Failed to unmount '" + request->staging_target_path() + - "': " + unmount.error()); + if (result.isError()) { + return result.error().status; } return Status::OK; @@ -764,73 +666,19 @@ Status TestCSIPlugin::NodePublishVolume( { LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'"; - // TODO(chhsiao): Validate required fields. - - if (!volumes.contains(request->volume_id())) { - return Status( - grpc::NOT_FOUND, - "Volume '" + request->volume_id() + "' is not found"); - } - - const VolumeInfo& volumeInfo = volumes.at(request->volume_id()); - const string path = getVolumePath(volumeInfo); - - auto it = request->volume_attributes().find("path"); - if (it == request->volume_attributes().end() || it->second != path) { - return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes"); - } - - if (!os::exists(request->target_path())) { - return Status( - grpc::INVALID_ARGUMENT, - "Target path '" + request->target_path() + "' is not found"); - } - - if (request->staging_target_path().empty()) { - return Status( - grpc::FAILED_PRECONDITION, - "Expecting 'staging_target_path' to be set"); - } - - Try<fs::MountInfoTable> table = fs::MountInfoTable::read(); - if (table.isError()) { - return Status( - grpc::INTERNAL, - "Failed to get mount table: " + table.error()); - } - - bool found = false; - foreach (const fs::MountInfoTable::Entry& entry, table->entries) { - if (entry.target == request->staging_target_path()) { - found = true; - break; - } - } - - if (!found) { - return Status( - grpc::FAILED_PRECONDITION, - "Volume '" + request->volume_id() + "' has not been staged yet"); - } - - foreach (const fs::MountInfoTable::Entry& entry, table->entries) { - if (entry.target == request->target_path()) { - return Status::OK; - } - } + // TODO(chhsiao): Validate the request. - Try<Nothing> mount = fs::mount( + Try<Nothing, StatusError> result = nodePublishVolume( + request->volume_id(), + request->publish_info(), request->staging_target_path(), request->target_path(), - None(), - MS_BIND | (request->readonly() ? MS_RDONLY : 0), - None()); + mesos::csi::v0::devolve(request->volume_capability()), + request->readonly(), + request->volume_attributes()); - if (mount.isError()) { - return Status( - grpc::INTERNAL, - "Failed to mount from '" + path + "' to '" + - request->target_path() + "': " + mount.error()); + if (result.isError()) { + return result.error().status; } return Status::OK; @@ -844,37 +692,13 @@ Status TestCSIPlugin::NodeUnpublishVolume( { LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'"; - if (!volumes.contains(request->volume_id())) { - return Status( - grpc::NOT_FOUND, - "Volume '" + request->volume_id() + "' is not found"); - } - - Try<fs::MountInfoTable> table = fs::MountInfoTable::read(); - if (table.isError()) { - return Status( - grpc::INTERNAL, - "Failed to get mount table: " + table.error()); - } - - bool found = false; - foreach (const fs::MountInfoTable::Entry& entry, table->entries) { - if (entry.target == request->target_path()) { - found = true; - break; - } - } + // TODO(chhsiao): Validate the request. - if (!found) { - return Status::OK; - } + Try<Nothing, StatusError> result = + nodeUnpublishVolume(request->volume_id(), request->target_path()); - Try<Nothing> unmount = fs::unmount(request->target_path()); - if (unmount.isError()) { - return Status( - grpc::INTERNAL, - "Failed to unmount '" + request->target_path() + - "': " + unmount.error()); + if (result.isError()) { + return result.error().status; } return Status::OK; @@ -940,6 +764,443 @@ Try<TestCSIPlugin::VolumeInfo> TestCSIPlugin::parseVolumePath( } +Try<TestCSIPlugin::VolumeInfo, StatusError> TestCSIPlugin::createVolume( + const string& name, + const Bytes& requiredBytes, + const Bytes& limitBytes, + const RepeatedPtrField<VolumeCapability>& capabilities, + const Map<string, string> parameters) +{ + // The volume ID is determined by `name`, with reserved characters escaped. + const string volumeId = http::encode(name); + + foreach (const VolumeCapability& capability, capabilities) { + if (capability != defaultVolumeCapability) { + return StatusError(Status( + grpc::INVALID_ARGUMENT, "Unsupported volume capabilities")); + } + } + + if (parameters != createParameters) { + return StatusError(Status( + grpc::INVALID_ARGUMENT, "Unsupported create parameters")); + } + + if (volumes.contains(volumeId)) { + const VolumeInfo& volumeInfo = volumes.at(volumeId); + + if (volumeInfo.size > limitBytes) { + return StatusError(Status( + grpc::ALREADY_EXISTS, "Cannot satisfy limit bytes")); + } + + if (volumeInfo.size < requiredBytes) { + return StatusError(Status( + grpc::ALREADY_EXISTS, "Cannot satisfy required bytes")); + } + + return volumeInfo; + } else { + if (availableCapacity < requiredBytes) { + return StatusError(Status(grpc::OUT_OF_RANGE, "Insufficient capacity")); + } + + VolumeInfo volumeInfo; + volumeInfo.id = volumeId; + + // We assume that `requiredBytes <= limitBytes` has been verified. + const Bytes defaultSize = min(availableCapacity, DEFAULT_VOLUME_CAPACITY); + volumeInfo.size = min(max(defaultSize, requiredBytes), limitBytes); + + const string path = getVolumePath(volumeInfo); + + Try<Nothing> mkdir = os::mkdir(path); + if (mkdir.isError()) { + return StatusError(Status( + grpc::INTERNAL, + "Failed to create volume '" + volumeInfo.id + "': " + mkdir.error())); + } + + CHECK_GE(availableCapacity, volumeInfo.size); + availableCapacity -= volumeInfo.size; + volumes.put(volumeInfo.id, volumeInfo); + + return volumeInfo; + } + + UNREACHABLE(); +} + + +Try<Nothing, StatusError> TestCSIPlugin::deleteVolume(const string& volumeId) +{ + if (!volumes.contains(volumeId)) { + // Return a success for idempotency. + return Nothing(); + } + + const VolumeInfo& volumeInfo = volumes.at(volumeId); + const string path = getVolumePath(volumeInfo); + + Try<Nothing> rmdir = os::rmdir(path); + if (rmdir.isError()) { + return StatusError(Status( + grpc::INTERNAL, + "Failed to delete volume '" + volumeId + "': " + rmdir.error())); + } + + availableCapacity += volumeInfo.size; + volumes.erase(volumeInfo.id); + + return Nothing(); +} + + +Try<Nothing, StatusError> TestCSIPlugin::controllerPublishVolume( + const string& volumeId, + const string& nodeId, + const VolumeCapability& capability, + bool readonly, + const Map<string, string>& volumeContext) +{ + if (!volumes.contains(volumeId)) { + return StatusError(Status( + grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist")); + } + + if (nodeId != NODE_ID) { + return StatusError(Status( + grpc::NOT_FOUND, "Node '" + nodeId + "' does not exist")); + } + + if (capability != defaultVolumeCapability) { + return StatusError(Status( + grpc::INVALID_ARGUMENT, "Unsupported volume capability")); + } + + if (readonly) { + return StatusError(Status( + grpc::INVALID_ARGUMENT, "Unsupported read-only mode")); + } + + const VolumeInfo& volumeInfo = volumes.at(volumeId); + const string path = getVolumePath(volumeInfo); + + if (!volumeContext.count("path") || volumeContext.at("path") != path) { + return StatusError(Status( + grpc::INVALID_ARGUMENT, "Invalid volume context")); + } + + // Do nothing. + return Nothing(); +} + + +Try<Nothing, StatusError> TestCSIPlugin::controllerUnpublishVolume( + const string& volumeId, const string& nodeId) +{ + if (!volumes.contains(volumeId)) { + return StatusError(Status( + grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist")); + } + + if (nodeId != NODE_ID) { + return StatusError(Status( + grpc::NOT_FOUND, "Node '" + nodeId + "' does not exist")); + } + + // Do nothing. + return Nothing(); +} + + +Try<Option<Error>, StatusError> TestCSIPlugin::validateVolumeCapabilities( + const string& volumeId, + const Map<string, string>& volumeContext, + const RepeatedPtrField<VolumeCapability>& capabilities, + const Option<Map<string, string>>& parameters) +{ + if (!volumes.contains(volumeId)) { + return StatusError(Status( + grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist")); + } + + const VolumeInfo& volumeInfo = volumes.at(volumeId); + const string path = getVolumePath(volumeInfo); + + if (!volumeContext.count("path") || volumeContext.at("path") != path) { + return StatusError(Status( + grpc::INVALID_ARGUMENT, "Invalid volume context")); + } + + foreach (const VolumeCapability& capability, capabilities) { + if (capability != defaultVolumeCapability) { + return Some(Error("Unsupported volume capabilities")); + } + } + + if (parameters.isSome() && parameters.get() != createParameters) { + return Some(Error("Mismatched parameters")); + } + + return None(); +} + + +Try<vector<TestCSIPlugin::VolumeInfo>, StatusError> TestCSIPlugin::listVolumes( + const Option<int32_t>& maxEntries, + const Option<string>& startingToken) +{ + // TODO(chhsiao): Support max entries. + if (maxEntries.isSome()) { + return StatusError(Status( + grpc::ABORTED, "Specifying max entries is not supported")); + } + + // TODO(chhsiao): Support starting token. + if (startingToken.isSome()) { + return StatusError(Status( + grpc::ABORTED, "Specifying starting token is not supported")); + } + + return volumes.values(); +} + + +Try<Bytes, StatusError> TestCSIPlugin::getCapacity( + const RepeatedPtrField<VolumeCapability>& capabilities, + const Map<string, string>& parameters) +{ + // We report zero capacity if any capability other than the default mount + // volume capability is given. If no capacity is given, the total available + // capacity will be returned. + foreach (const VolumeCapability& capability, capabilities) { + if (capability != defaultVolumeCapability) { + return Bytes(0); + } + } + + if (parameters != createParameters) { + return Bytes(0); + } + + return availableCapacity; +} + + +Try<Nothing, StatusError> TestCSIPlugin::nodeStageVolume( + const string& volumeId, + const Map<string, string>& publishContext, + const string& stagingPath, + const VolumeCapability& capability, + const Map<string, string>& volumeContext) +{ + if (!volumes.contains(volumeId)) { + return StatusError(Status( + grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist")); + } + + if (!publishContext.empty()) { + return StatusError(Status( + grpc::INVALID_ARGUMENT, "Invalid publish context")); + } + + if (!os::exists(stagingPath)) { + return StatusError(Status( + grpc::INVALID_ARGUMENT, + "Staging path '" + stagingPath + "' does not exist")); + } + + if (capability != defaultVolumeCapability) { + return StatusError(Status( + grpc::INVALID_ARGUMENT, "Unsupported volume capability")); + } + + const VolumeInfo& volumeInfo = volumes.at(volumeId); + const string path = getVolumePath(volumeInfo); + + if (!volumeContext.count("path") || volumeContext.at("path") != path) { + return StatusError(Status( + grpc::INVALID_ARGUMENT, "Invalid volume context")); + } + + Try<fs::MountInfoTable> table = fs::MountInfoTable::read(); + if (table.isError()) { + return StatusError(Status( + grpc::INTERNAL, "Failed to get mount table: " + table.error())); + } + + if (std::any_of( + table->entries.begin(), + table->entries.end(), + [&](const fs::MountInfoTable::Entry& entry) { + return entry.target == stagingPath; + })) { + return Nothing(); + } + + Try<Nothing> mount = fs::mount(path, stagingPath, None(), MS_BIND, None()); + if (mount.isError()) { + return StatusError(Status( + grpc::INTERNAL, + "Failed to mount from '" + path + "' to '" + stagingPath + + "': " + mount.error())); + } + + return Nothing(); +} + + +Try<Nothing, StatusError> TestCSIPlugin::nodeUnstageVolume( + const string& volumeId, const string& stagingPath) +{ + if (!volumes.contains(volumeId)) { + return StatusError(Status( + grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist")); + } + + Try<fs::MountInfoTable> table = fs::MountInfoTable::read(); + if (table.isError()) { + return StatusError(Status( + grpc::INTERNAL, "Failed to get mount table: " + table.error())); + } + + if (std::none_of( + table->entries.begin(), + table->entries.end(), + [&](const fs::MountInfoTable::Entry& entry) { + return entry.target == stagingPath; + })) { + return Nothing(); + } + + Try<Nothing> unmount = fs::unmount(stagingPath); + if (unmount.isError()) { + return StatusError(Status( + grpc::INTERNAL, + "Failed to unmount '" + stagingPath + "': " + unmount.error())); + } + + return Nothing(); +} + + +Try<Nothing, StatusError> TestCSIPlugin::nodePublishVolume( + const string& volumeId, + const Map<string, string>& publishContext, + const string& stagingPath, + const string& targetPath, + const VolumeCapability& capability, + bool readonly, + const Map<string, string>& volumeContext) +{ + if (!volumes.contains(volumeId)) { + return StatusError(Status( + grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist")); + } + + if (!publishContext.empty()) { + return StatusError(Status( + grpc::INVALID_ARGUMENT, "Invalid publish context")); + } + + if (!os::exists(targetPath)) { + return StatusError(Status( + grpc::INVALID_ARGUMENT, + "Target path '" + targetPath + "' does not exist")); + } + + if (capability != defaultVolumeCapability) { + return StatusError(Status( + grpc::INVALID_ARGUMENT, "Unsupported volume capability")); + } + + const VolumeInfo& volumeInfo = volumes.at(volumeId); + const string path = getVolumePath(volumeInfo); + + if (!volumeContext.count("path") || volumeContext.at("path") != path) { + return StatusError(Status( + grpc::INVALID_ARGUMENT, "Invalid volume context")); + } + + Try<fs::MountInfoTable> table = fs::MountInfoTable::read(); + if (table.isError()) { + return StatusError(Status( + grpc::INTERNAL, "Failed to get mount table: " + table.error())); + } + + if (std::none_of( + table->entries.begin(), + table->entries.end(), + [&](const fs::MountInfoTable::Entry& entry) { + return entry.target == stagingPath; + })) { + return StatusError(Status( + grpc::FAILED_PRECONDITION, + "Volume '" + volumeId + "' has not been staged yet")); + } + + if (std::any_of( + table->entries.begin(), + table->entries.end(), + [&](const fs::MountInfoTable::Entry& entry) { + return entry.target == targetPath; + })) { + return Nothing(); + } + + Try<Nothing> mount = fs::mount( + stagingPath, + targetPath, + None(), + MS_BIND | (readonly ? MS_RDONLY : 0), + None()); + + if (mount.isError()) { + return StatusError(Status( + grpc::INTERNAL, + "Failed to mount from '" + stagingPath + "' to '" + targetPath + + "': " + mount.error())); + } + + return Nothing(); +} + + +Try<Nothing, StatusError> TestCSIPlugin::nodeUnpublishVolume( + const string& volumeId, const string& targetPath) +{ + if (!volumes.contains(volumeId)) { + return StatusError(Status( + grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist")); + } + + Try<fs::MountInfoTable> table = fs::MountInfoTable::read(); + if (table.isError()) { + return StatusError(Status( + grpc::INTERNAL, "Failed to get mount table: " + table.error())); + } + + if (std::none_of( + table->entries.begin(), + table->entries.end(), + [&](const fs::MountInfoTable::Entry& entry) { + return entry.target == targetPath; + })) { + return Nothing(); + } + + Try<Nothing> unmount = fs::unmount(targetPath); + if (unmount.isError()) { + return StatusError(Status( + grpc::INTERNAL, + "Failed to unmount '" + targetPath + "': " + unmount.error())); + } + + return Nothing(); +} + + // Serves CSI calls from the given endpoint through forwarding the calls to // another CSI endpoint and returning back the results. class CSIProxy
