Added example VolumeProfile module. This example module shows how a VolumeProfile module might be implemented (and is a viable module in its own right). The module can be configured to fetch a map of profiles from a URI (`file://` or `http(s)://`) and, optionally, to re-fetch that map at a fixed polling interval; the most recently accepted map is kept in memory between fetches.
Review: https://reviews.apache.org/r/64353 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/343776d5 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/343776d5 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/343776d5 Branch: refs/heads/master Commit: 343776d5ea1977fa3fed7fe7c2996c37b5ba5168 Parents: 86548e1 Author: Joseph Wu <[email protected]> Authored: Tue Nov 28 13:40:07 2017 -0800 Committer: Joseph Wu <[email protected]> Committed: Mon Dec 18 19:06:20 2017 -0800 ---------------------------------------------------------------------- src/Makefile.am | 14 + src/csi/uri_volume_profile.proto | 43 ++ src/resource_provider/uri_volume_profile.cpp | 454 ++++++++++++++++++++++ src/resource_provider/uri_volume_profile.hpp | 266 +++++++++++++ 4 files changed, 777 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/343776d5/src/Makefile.am ---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index 9c8daf5..1e1fe0c 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -2332,6 +2332,18 @@ libload_qos_controller_la_SOURCES += slave/qos_controllers/load.cpp libload_qos_controller_la_CPPFLAGS = $(MESOS_CPPFLAGS) libload_qos_controller_la_LDFLAGS = $(MESOS_MODULE_LDFLAGS) +# Library containing the URI volume profile module. 
+if ENABLE_GRPC +pkgmodule_LTLIBRARIES += liburi_volume_profile.la +liburi_volume_profile_la_SOURCES = \ + csi/uri_volume_profile.pb.cc \ + csi/uri_volume_profile.pb.h \ + resource_provider/uri_volume_profile.cpp \ + resource_provider/uri_volume_profile.hpp +liburi_volume_profile_la_CPPFLAGS = $(MESOS_CPPFLAGS) +liburi_volume_profile_la_LDFLAGS = $(MESOS_MODULE_LDFLAGS) +endif + MESOS_TEST_MODULE_LDFLAGS = $(MESOS_MODULE_LDFLAGS) # Even if we are not installing the test suite, we still need to build @@ -2435,6 +2447,8 @@ noinst_LTLIBRARIES += $(MESOS_TEST_MODULES) endif mesos_tests_SOURCES = \ + csi/uri_volume_profile.pb.cc \ + resource_provider/uri_volume_profile.cpp \ slave/qos_controllers/load.cpp \ tests/active_user_test_helper.cpp \ tests/agent_container_api_tests.cpp \ http://git-wip-us.apache.org/repos/asf/mesos/blob/343776d5/src/csi/uri_volume_profile.proto ---------------------------------------------------------------------- diff --git a/src/csi/uri_volume_profile.proto b/src/csi/uri_volume_profile.proto new file mode 100644 index 0000000..a3dfc5d --- /dev/null +++ b/src/csi/uri_volume_profile.proto @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +syntax = "proto3"; + +import "csi.proto"; + +package mesos.csi; + + +message UriVolumeProfileMapping { + message CSIManifest { + // Capabilities used for creating, publishing, and validating volumes. + // This field is REQUIRED. + // + // NOTE: The name of this field is plural because some CSI requests + // support multiple capabilities. However, Mesos currently does not + // support this. + .csi.VolumeCapability volume_capabilities = 1; + + // Parameters passed to the CSI CreateVolume RPC. + // This field is OPTIONAL. + map<string, string> create_parameters = 2; + } + + // Each map entry associates a profile name (type string) with the CSI + // capabilities and parameters used to make specific CSI requests. + // This field is OPTIONAL. + map<string, CSIManifest> profile_matrix = 1; +} http://git-wip-us.apache.org/repos/asf/mesos/blob/343776d5/src/resource_provider/uri_volume_profile.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/uri_volume_profile.cpp b/src/resource_provider/uri_volume_profile.cpp new file mode 100644 index 0000000..9dc0e6c --- /dev/null +++ b/src/resource_provider/uri_volume_profile.cpp @@ -0,0 +1,454 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#include <map> +#include <string> +#include <tuple> + +#include <google/protobuf/util/json_util.h> + +#include <mesos/mesos.hpp> + +#include <mesos/module/volume_profile.hpp> + +#include <mesos/resource_provider/volume_profile.hpp> + +#include <process/defer.hpp> +#include <process/delay.hpp> +#include <process/dispatch.hpp> +#include <process/future.hpp> +#include <process/owned.hpp> +#include <process/socket.hpp> + +#include <stout/duration.hpp> +#include <stout/error.hpp> +#include <stout/json.hpp> +#include <stout/option.hpp> +#include <stout/protobuf.hpp> +#include <stout/result.hpp> +#include <stout/strings.hpp> + +#include <csi/spec.hpp> + +#include "resource_provider/uri_volume_profile.hpp" + +using namespace mesos; +using namespace process; + +using std::map; +using std::string; +using std::tuple; + +using google::protobuf::Map; + +using mesos::csi::UriVolumeProfileMapping; + + +namespace csi { + +bool operator==(const VolumeCapability& left, const VolumeCapability& right) { + // NOTE: This enumeration is set when `block` or `mount` are set and + // covers the case where neither are set. + if (left.access_type_case() != right.access_type_case()) { + return false; + } + + // NOTE: No need to check `block` for equality as that object is empty. + + if (left.has_mount()) { + if (left.mount().fs_type() != right.mount().fs_type()) { + return false; + } + + if (left.mount().mount_flags_size() != right.mount().mount_flags_size()) { + return false; + } + + // NOTE: Ordering may or may not matter for these flags, but this helper + // only checks for complete equality. 
+ for (int i = 0; i < left.mount().mount_flags_size(); i++) { + if (left.mount().mount_flags(i) != right.mount().mount_flags(i)) { + return false; + } + } + } + + if (left.has_access_mode() != right.has_access_mode()) { + return false; + } + + if (left.has_access_mode()) { + if (left.access_mode().mode() != right.access_mode().mode()) { + return false; + } + } + + return true; +} + +} // namespace csi { + +namespace mesos { +namespace internal { +namespace profile { + +bool operator==( + const Map<string, string>& left, + const Map<string, string>& right) { + if (left.size() != right.size()) { + return false; + } + + typename Map<string, string>::const_iterator iterator = left.begin(); + for (; iterator != left.end(); ++iterator) { + if (right.count(iterator->first) != 1) { + return false; + } + + if (iterator->second != right.at(iterator->first)) { + return false; + } + } + + return true; +} + + +UriVolumeProfileAdaptor::UriVolumeProfileAdaptor(const Flags& _flags) + : flags(_flags), + process(new UriVolumeProfileAdaptorProcess(flags)) +{ + spawn(process.get()); +} + + +UriVolumeProfileAdaptor::~UriVolumeProfileAdaptor() +{ + terminate(process.get()); + wait(process.get()); +} + + +Future<VolumeProfileAdaptor::ProfileInfo> UriVolumeProfileAdaptor::translate( + const string& profile, + const std::string& csiPluginInfoType) +{ + return dispatch( + process.get(), + &UriVolumeProfileAdaptorProcess::translate, + profile, + csiPluginInfoType); +} + + +Future<hashset<string>> UriVolumeProfileAdaptor::watch( + const hashset<string>& knownProfiles, + const std::string& csiPluginInfoType) +{ + return dispatch( + process.get(), + &UriVolumeProfileAdaptorProcess::watch, + knownProfiles, + csiPluginInfoType); +} + + +UriVolumeProfileAdaptorProcess::UriVolumeProfileAdaptorProcess( + const Flags& _flags) + : ProcessBase(ID::generate("uri-volume-profile")), + flags(_flags), + watchPromise(new Promise<hashset<string>>()) {} + + +void UriVolumeProfileAdaptorProcess::initialize() +{ + poll(); 
+} + + +Future<VolumeProfileAdaptor::ProfileInfo> + UriVolumeProfileAdaptorProcess::translate( + const string& profile, + const std::string& csiPluginInfoType) +{ + if (data.count(profile) != 1) { + return Failure("Profile '" + profile + "' not found"); + } + + return data.at(profile); +} + + +Future<hashset<string>> UriVolumeProfileAdaptorProcess::watch( + const hashset<string>& knownProfiles, + const std::string& csiPluginInfoType) +{ + if (profiles != knownProfiles) { + return profiles; + } + + return watchPromise->future(); +} + + +void UriVolumeProfileAdaptorProcess::poll() +{ + // NOTE: The flags do not allow relative paths, so this is guaranteed to + // be either 'http://' or 'https://'. + if (strings::startsWith(flags.uri, "http")) { + // NOTE: We already validated that this URI is parsable in the flags. + Try<http::URL> url = http::URL::parse(flags.uri.string()); + CHECK_SOME(url); + + http::get(url.get()) + .onAny(defer(self(), [=](const Future<http::Response>& future) { + if (future.isReady()) { + // NOTE: We don't check the HTTP status code because we don't know + // what potential codes are considered successful. + _poll(future->body); + } else if (future.isFailed()) { + _poll(Error(future.failure())); + } else { + _poll(Error("Future discarded or abandoned")); + } + })); + } else { + _poll(os::read(flags.uri.string())); + } +} + + +void UriVolumeProfileAdaptorProcess::_poll(const Try<string>& fetched) +{ + if (fetched.isSome()) { + Try<UriVolumeProfileMapping> parsed = parse(fetched.get()); + + if (parsed.isSome()) { + notify(parsed.get()); + } else { + LOG(ERROR) << "Failed to parse result: " << parsed.error(); + } + } else { + LOG(WARNING) << "Failed to poll URI: " << fetched.error(); + } + + // TODO(josephw): Do we want to retry if polling fails and no polling + // interval is set? Or perhaps we should exit in that case? 
+ if (flags.poll_interval.isSome()) { + delay(flags.poll_interval.get(), self(), &Self::poll); + } +} + + +void UriVolumeProfileAdaptorProcess::notify( + const UriVolumeProfileMapping& parsed) +{ + bool hasErrors = false; + + foreachkey (const string& profile, data) { + if (parsed.profile_matrix().count(profile) != 1) { + hasErrors = true; + + LOG(WARNING) + << "Fetched profile mapping does not contain profile '" << profile + << "'. The fetched mapping will be ignored entirely"; + continue; + } + + bool matchingCapability = + data.at(profile).capability == + parsed.profile_matrix().at(profile).volume_capabilities(); + + bool matchingParameters = + data.at(profile).parameters == + parsed.profile_matrix().at(profile).create_parameters(); + + if (!matchingCapability || !matchingParameters) { + hasErrors = true; + + LOG(WARNING) + << "Fetched profile mapping for profile '" << profile << "'" + << " does not match earlier data." + << " The fetched mapping will be ignored entirely"; + } + } + + // When encountering a data conflict, this module assumes there is a + // problem upstream (i.e. in the `--uri`). It is up to the operator + // to notice and resolve this. + if (hasErrors) { + return; + } + + // Profiles can only be added, so if the parsed data is the same size, + // nothing has changed and no notifications need to be sent. + if (parsed.profile_matrix().size() <= data.size()) { + return; + } + + // The fetched mapping satisfies our invariants. + + // Save the protobuf as a map we can expose through the module interface. + // And update the convenience set of profile names. + profiles.clear(); + auto iterator = parsed.profile_matrix().begin(); + while (iterator != parsed.profile_matrix().end()) { + data[iterator->first] = { + iterator->second.volume_capabilities(), + iterator->second.create_parameters() + }; + + profiles.insert(iterator->first); + iterator++; + } + + // Notify any watchers and then prepare a new promise for the next + // iteration of polling. 
+ // + // TODO(josephw): Delay this based on the `--max_random_wait` option. + watchPromise->set(profiles); + watchPromise.reset(new Promise<hashset<string>>()); + + LOG(INFO) + << "Updated volume profile mapping to " << profiles.size() + << " total profiles"; +} + + +Try<UriVolumeProfileMapping> UriVolumeProfileAdaptorProcess::parse( + const string& data) +{ + // Use Google's JSON utility function to parse the JSON string. + UriVolumeProfileMapping output; + google::protobuf::util::JsonParseOptions options; + options.ignore_unknown_fields = true; + + google::protobuf::util::Status status = + google::protobuf::util::JsonStringToMessage(data, &output, options); + + if (!status.ok()) { + return Error( + "Failed to parse UriVolumeProfileMapping message: " + + status.ToString()); + } + + Option<Error> validation = validate(output); + if (validation.isSome()) { + return Error( + "Fetched profile mapping failed validation with: " + validation->message); + } + + return output; +} + + +Option<Error> UriVolumeProfileAdaptorProcess::validate( + const UriVolumeProfileMapping& mapping) +{ + auto iterator = mapping.profile_matrix().begin(); + while (iterator != mapping.profile_matrix().end()) { + if (!iterator->second.has_volume_capabilities()) { + return Error( + "Profile '" + iterator->first + "' is missing the required field " + + "'volume_capabilities"); + } + + Option<Error> capabilities = + validate(iterator->second.volume_capabilities()); + + if (capabilities.isSome()) { + return Error( + "Profile '" + iterator->first + "' VolumeCapabilities are invalid: " + + capabilities->message); + } + + // NOTE: The `create_parameters` field is optional and needs no further + // validation after parsing. + + iterator++; + } + + return None(); +} + + +Option<Error> UriVolumeProfileAdaptorProcess::validate( + const csi::VolumeCapability& capability) +{ + if (capability.has_mount()) { + // The total size of this repeated field may not exceed 4 KB. 
+ // + // TODO(josephw): The specification does not state how this maximum + // size is calculated. So this check is conservative and does not + // include padding or array separators in the size calculation. + size_t size = 0; + foreach (const string& flag, capability.mount().mount_flags()) { + size += flag.size(); + } + + if (Bytes(size) > Kilobytes(4)) { + return Error("Size of 'mount_flags' may not exceed 4 KB"); + } + } + + if (!capability.has_access_mode()) { + return Error("'access_mode' is a required field"); + } + + if (capability.access_mode().mode() == + csi::VolumeCapability::AccessMode::UNKNOWN) { + return Error("'access_mode.mode' is unknown or not set"); + } + + return None(); +} + +} // namespace profile { +} // namespace internal { +} // namespace mesos { + + +mesos::modules::Module<VolumeProfileAdaptor> +org_apache_mesos_UriVolumeProfileAdaptor( + MESOS_MODULE_API_VERSION, + MESOS_VERSION, + "Apache Mesos", + "modules@mesos.apache.org", + "URI Volume Profile Adaptor module.", + nullptr, + [](const Parameters& parameters) -> VolumeProfileAdaptor* { + // Convert `parameters` into a map. + map<string, string> values; + foreach (const Parameter& parameter, parameters.parameter()) { + values[parameter.key()] = parameter.value(); + } + + // Load and validate flags from the map. + mesos::internal::profile::Flags flags; + Try<flags::Warnings> load = flags.load(values); + + if (load.isError()) { + LOG(ERROR) << "Failed to parse parameters: " << load.error(); + return nullptr; + } + + // Log any flag warnings. 
+ foreach (const flags::Warning& warning, load->warnings) { + LOG(WARNING) << warning.message; + } + + return new mesos::internal::profile::UriVolumeProfileAdaptor(flags); + }); http://git-wip-us.apache.org/repos/asf/mesos/blob/343776d5/src/resource_provider/uri_volume_profile.hpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/uri_volume_profile.hpp b/src/resource_provider/uri_volume_profile.hpp new file mode 100644 index 0000000..e6588f1 --- /dev/null +++ b/src/resource_provider/uri_volume_profile.hpp @@ -0,0 +1,266 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef __RESOURCE_PROVIDER_URI_VOLUME_PROFILE_HPP__ +#define __RESOURCE_PROVIDER_URI_VOLUME_PROFILE_HPP__ + +#include <map> +#include <string> +#include <tuple> + +#include <mesos/resource_provider/volume_profile.hpp> + +#include <process/future.hpp> +#include <process/owned.hpp> +#include <process/process.hpp> + +#include <process/ssl/flags.hpp> + +#include <stout/duration.hpp> +#include <stout/error.hpp> +#include <stout/flags.hpp> +#include <stout/option.hpp> +#include <stout/path.hpp> +#include <stout/strings.hpp> + +#include <csi/spec.hpp> + +// ONLY USEFUL AFTER RUNNING PROTOC. 
+#include "csi/uri_volume_profile.pb.h" + +namespace mesos { +namespace internal { +namespace profile { + +// Forward declaration. +class UriVolumeProfileAdaptorProcess; + +struct Flags : public virtual flags::FlagsBase +{ + Flags() + { + add(&Flags::uri, + "uri", + None(), + "URI to a JSON object containing the volume profile mapping.\n" + "This module supports both HTTP(s) and file URIs\n." + "\n" + "The JSON object should consist of some top-level string keys\n" + "corresponding to the volume profile name. Each value should\n" + "contain a `VolumeCapability` under a 'volume_capabilities'\n" + "and a free-form string-string mapping under 'create_parameters'.\n" + "\n" + "The JSON is modeled after a protobuf found in\n" + "`src/csi/uri_volume_profile.proto`.\n" + "\n" + "For example:\n" + "{\n" + " \"profile_matrix\" : {\n" + " \"my-profile\" : {\n" + " \"volume_capabilities\" : {\n" + " \"block\" : {},\n" + " \"access_mode\" : { \"mode\" : \"SINGLE_NODE_WRITER\" }\n" + " },\n" + " \"create_parameters\" : {\n" + " \"mesos-does-not\" : \"interpret-these\",\n" + " \"type\" : \"raid5\",\n" + " \"stripes\" : \"3\",\n" + " \"stripesize\" : \"64\"\n" + " }\n" + " }\n" + " }\n" + "}", + static_cast<const Path*>(nullptr), + [](const Path& value) -> Option<Error> { + // For now, just check if the URI has a supported scheme. + // + // TODO(josephw): Once we have a proper URI class and parser, + // consider validating this URI more thoroughly. + if (strings::startsWith(value.string(), "http://") +#ifdef USE_SSL_SOCKET + || (process::network::openssl::flags().enabled && + strings::startsWith(value.string(), "https://")) +#endif // USE_SSL_SOCKET + ) { + Try<process::http::URL> url = + process::http::URL::parse(value.string()); + + if (url.isError()) { + return Error("Failed to parse URI: " + url.error()); + } + + return None(); + } + + // NOTE: The `Path` class will strip off the 'file://' prefix. 
+ if (strings::contains(value.string(), "://")) { + return Error("--uri must use a supported scheme (file or http(s))"); + } + + // We only allow absolute paths for file paths. + if (!value.absolute()) { + return Error("--uri to a file must be an absolute path"); + } + + return None(); + }); + + add(&Flags::poll_interval, + "poll_interval", + "How long to wait between polling the specified `--uri`.\n" + "The time is checked each time the `translate` method is called.\n" + "If the given time has elapsed, then the URI is re-fetched." + "If not specified, the URI is only fetched once.", + [](const Option<Duration>& value) -> Option<Error> { + if (value.isSome() && value.get() <= Seconds(0)) { + return Error("--poll_interval must be non-negative"); + } + + return None(); + }); + + add(&Flags::max_random_wait, + "max_random_wait", + "How long at most to wait between discovering a new set of profiles\n" + "and notifying the callers of `watch`. The actual wait time is a\n" + "uniform random value between 0 and this value. If the `--uri` points\n" + "to a centralized location, it may be good to scale this number\n" + "according to the number of resource providers in the cluster.", + Seconds(0), + [](const Duration& value) -> Option<Error> { + if (value < Seconds(0)) { + return Error("--max_random_wait must be zero or greater"); + } + + return None(); + }); + } + + // NOTE: We use the `Path` type here so that the stout flags parser + // does not attempt to read a file if given a `file://` prefixed value. + // + // TODO(josephw): Replace with a URI type when stout gets one. + Path uri; + + Option<Duration> poll_interval; + Duration max_random_wait; +}; + + +// The `UriVolumeProfileAdaptor` is an example VolumeProfile module that +// takes a URI as a module parameter and fetches that URI periodically. +// The fetched data is parsed into the required CSI protobufs +// (which also acts as validation). 
+// +// If there is an error during fetching, any previously fetched results +// will be used until fetching is successful. +// +// This module does not filter return results based on `CSIPluginInfo::type` +// and assumes that all fetched profiles are meant for all resource providers. +// +// See `Flags` above for more information. +class UriVolumeProfileAdaptor : public mesos::VolumeProfileAdaptor +{ +public: + UriVolumeProfileAdaptor(const Flags& _flags); + + virtual ~UriVolumeProfileAdaptor(); + + virtual process::Future<mesos::VolumeProfileAdaptor::ProfileInfo> translate( + const std::string& profile, + const std::string& csiPluginInfoType) override; + + virtual process::Future<hashset<std::string>> watch( + const hashset<std::string>& knownProfiles, + const std::string& csiPluginInfoType) override; + +protected: + Flags flags; + process::Owned<UriVolumeProfileAdaptorProcess> process; +}; + + +class UriVolumeProfileAdaptorProcess : + public process::Process<UriVolumeProfileAdaptorProcess> +{ +public: + UriVolumeProfileAdaptorProcess(const Flags& _flags); + + virtual void initialize() override; + + process::Future<mesos::VolumeProfileAdaptor::ProfileInfo> translate( + const std::string& profile, + const std::string& csiPluginInfoType); + + process::Future<hashset<std::string>> watch( + const hashset<std::string>& knownProfiles, + const std::string& csiPluginInfoType); + +private: + // Helpers for fetching the `--uri`. + // If `--poll_interval` is set, this method will dispatch to itself with + // a delay once the fetch is complete. + void poll(); + void _poll(const Try<std::string>& fetched); + + // Helper that is called upon successfully polling and parsing the `--uri`. + // This method will check the following conditions before updating the state + // of the module: + // * All known profiles must be included in the updated set. + // * All properties of known profiles must match those in the updated set. 
+ void notify(const csi::UriVolumeProfileMapping& parsed); + +public: + // Helper for parsing a string as the expected data format. + // See the example string in the `--uri` help text for more details. + // + // NOTE: This method is public for testing purposes only. + static Try<csi::UriVolumeProfileMapping> + parse(const std::string& data); + +private: + // Checks the fields inside a `UriVolumeProfileMapping` according to the + // comments above the protobuf. + static Option<Error> validate( + const csi::UriVolumeProfileMapping& mapping); + + // Checks the fields inside a `VolumeCapability` according to the + // comments above the protobuf. + static Option<Error> validate(const csi::VolumeCapability& capability); + + Flags flags; + + // The last fetched profile mapping. + // This module assumes that profiles can only be added and never removed. + // Once added, profiles cannot be changed either. + // + // TODO(josephw): Consider persisting this mapping across agent restarts. + std::map<std::string, mesos::VolumeProfileAdaptor::ProfileInfo> data; + + // Convenience set of the keys in `data` above. + // This module does not filter based on `CSIPluginInfo::type`, so this + // is valid for all input to `watch(...)`. + hashset<std::string> profiles; + + // Will be satisfied whenever `data` is changed. + process::Owned<process::Promise<hashset<std::string>>> watchPromise; +}; + +} // namespace profile { +} // namespace internal { +} // namespace mesos { + +#endif // __RESOURCE_PROVIDER_URI_VOLUME_PROFILE_HPP__
