Added example VolumeProfile module.

This example module shows how a VolumeProfile module might be
implemented (and is a viable module in its own right).  The module
can be configured to fetch a map of profiles from a URI (`file://` or
`http(s)://`) and possibly cache this item for some time.

Review: https://reviews.apache.org/r/64353


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/343776d5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/343776d5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/343776d5

Branch: refs/heads/master
Commit: 343776d5ea1977fa3fed7fe7c2996c37b5ba5168
Parents: 86548e1
Author: Joseph Wu <[email protected]>
Authored: Tue Nov 28 13:40:07 2017 -0800
Committer: Joseph Wu <[email protected]>
Committed: Mon Dec 18 19:06:20 2017 -0800

----------------------------------------------------------------------
 src/Makefile.am                              |  14 +
 src/csi/uri_volume_profile.proto             |  43 ++
 src/resource_provider/uri_volume_profile.cpp | 454 ++++++++++++++++++++++
 src/resource_provider/uri_volume_profile.hpp | 266 +++++++++++++
 4 files changed, 777 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/343776d5/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 9c8daf5..1e1fe0c 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -2332,6 +2332,18 @@ libload_qos_controller_la_SOURCES += slave/qos_controllers/load.cpp
 libload_qos_controller_la_CPPFLAGS = $(MESOS_CPPFLAGS)
 libload_qos_controller_la_LDFLAGS = $(MESOS_MODULE_LDFLAGS)
 
+# Library containing the URI volume profile module.
+if ENABLE_GRPC
+pkgmodule_LTLIBRARIES += liburi_volume_profile.la
+liburi_volume_profile_la_SOURCES =                             \
+  csi/uri_volume_profile.pb.cc                                 \
+  csi/uri_volume_profile.pb.h                                  \
+  resource_provider/uri_volume_profile.cpp                     \
+  resource_provider/uri_volume_profile.hpp
+liburi_volume_profile_la_CPPFLAGS = $(MESOS_CPPFLAGS)
+liburi_volume_profile_la_LDFLAGS = $(MESOS_MODULE_LDFLAGS)
+endif
+
 MESOS_TEST_MODULE_LDFLAGS = $(MESOS_MODULE_LDFLAGS)
 
 # Even if we are not installing the test suite, we still need to build
@@ -2435,6 +2447,8 @@ noinst_LTLIBRARIES += $(MESOS_TEST_MODULES)
 endif
 
 mesos_tests_SOURCES =                                          \
+  csi/uri_volume_profile.pb.cc                                 \
+  resource_provider/uri_volume_profile.cpp                     \
   slave/qos_controllers/load.cpp                               \
   tests/active_user_test_helper.cpp                            \
   tests/agent_container_api_tests.cpp                          \

http://git-wip-us.apache.org/repos/asf/mesos/blob/343776d5/src/csi/uri_volume_profile.proto
----------------------------------------------------------------------
diff --git a/src/csi/uri_volume_profile.proto b/src/csi/uri_volume_profile.proto
new file mode 100644
index 0000000..a3dfc5d
--- /dev/null
+++ b/src/csi/uri_volume_profile.proto
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+import "csi.proto";
+
+package mesos.csi;
+
+
+// Schema of the document served at the URI volume profile module's `--uri`.
+// The module parses the fetched JSON text directly into this message.
+message UriVolumeProfileMapping {
+  // Describes how a single profile maps onto CSI requests.
+  message CSIManifest {
+    // Capabilities used for creating, publishing, and validating volumes.
+    // This field is REQUIRED.
+    //
+    // NOTE: The name of this field is plural because some CSI requests
+    // support multiple capabilities. However, Mesos currently does not
+    // support this.
+    .csi.VolumeCapability volume_capabilities = 1;
+
+    // Parameters passed to the CSI CreateVolume RPC.
+    // This field is OPTIONAL.
+    map<string, string> create_parameters = 2;
+  }
+
+  // Each map entry associates a profile name (type string) with the CSI
+  // capabilities and parameters used to make specific CSI requests.
+  // This field is OPTIONAL.
+  map<string, CSIManifest> profile_matrix = 1;
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/343776d5/src/resource_provider/uri_volume_profile.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/uri_volume_profile.cpp b/src/resource_provider/uri_volume_profile.cpp
new file mode 100644
index 0000000..9dc0e6c
--- /dev/null
+++ b/src/resource_provider/uri_volume_profile.cpp
@@ -0,0 +1,454 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <map>
+#include <string>
+#include <tuple>
+
+#include <google/protobuf/util/json_util.h>
+
+#include <mesos/mesos.hpp>
+
+#include <mesos/module/volume_profile.hpp>
+
+#include <mesos/resource_provider/volume_profile.hpp>
+
+#include <process/defer.hpp>
+#include <process/delay.hpp>
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/owned.hpp>
+#include <process/socket.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/error.hpp>
+#include <stout/json.hpp>
+#include <stout/option.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/result.hpp>
+#include <stout/strings.hpp>
+
+#include <csi/spec.hpp>
+
+#include "resource_provider/uri_volume_profile.hpp"
+
+using namespace mesos;
+using namespace process;
+
+using std::map;
+using std::string;
+using std::tuple;
+
+using google::protobuf::Map;
+
+using mesos::csi::UriVolumeProfileMapping;
+
+
+namespace csi {
+
+// Field-by-field equality for two `VolumeCapability` messages: the access
+// type, the mount settings (when mounted), and the access mode must agree.
+bool operator==(const VolumeCapability& left, const VolumeCapability& right) {
+  // NOTE: The access type enumeration distinguishes `block`, `mount`, and
+  // "neither set", so a mismatch here is already conclusive.
+  if (left.access_type_case() != right.access_type_case()) {
+    return false;
+  }
+
+  // NOTE: `block` is an empty message, so only `mount` carries data that
+  // needs to be compared.
+  if (left.has_mount()) {
+    const auto& lhs = left.mount();
+    const auto& rhs = right.mount();
+
+    if (lhs.fs_type() != rhs.fs_type() ||
+        lhs.mount_flags_size() != rhs.mount_flags_size()) {
+      return false;
+    }
+
+    // NOTE: Ordering may or may not matter for these flags, but this helper
+    // only checks for complete (positional) equality.
+    for (int index = 0; index < lhs.mount_flags_size(); ++index) {
+      if (lhs.mount_flags(index) != rhs.mount_flags(index)) {
+        return false;
+      }
+    }
+  }
+
+  if (left.has_access_mode() != right.has_access_mode()) {
+    return false;
+  }
+
+  // Either both lack an access mode, or both have one that must match.
+  return !left.has_access_mode() ||
+    left.access_mode().mode() == right.access_mode().mode();
+}
+
+} // namespace csi {
+
+namespace mesos {
+namespace internal {
+namespace profile {
+
+// Compares two protobuf string-to-string maps for equality: both must hold
+// exactly the same keys, and every key must map to the same value.
+bool operator==(
+    const Map<string, string>& left,
+    const Map<string, string>& right) {
+  if (left.size() != right.size()) {
+    return false;
+  }
+
+  // NOTE: The sizes match, so it is sufficient to check that every entry
+  // of `left` has an identical counterpart in `right`. The iterator is
+  // advanced by the loop header so that the scan always terminates.
+  for (auto entry = left.begin(); entry != left.end(); ++entry) {
+    auto match = right.find(entry->first);
+
+    if (match == right.end() || match->second != entry->second) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+
+// Keeps a copy of the flags, creates the backing
+// `UriVolumeProfileAdaptorProcess` from that copy, and spawns it.
+UriVolumeProfileAdaptor::UriVolumeProfileAdaptor(const Flags& _flags)
+  : flags(_flags),
+    process(new UriVolumeProfileAdaptorProcess(flags))
+{
+  spawn(process.get());
+}
+
+
+// Terminates the backing process and waits for it before the owning
+// pointer is destroyed.
+UriVolumeProfileAdaptor::~UriVolumeProfileAdaptor()
+{
+  terminate(process.get());
+  wait(process.get());
+}
+
+
+// Dispatches the lookup of `profile` to the backing
+// `UriVolumeProfileAdaptorProcess` and returns the resulting future.
+Future<VolumeProfileAdaptor::ProfileInfo> UriVolumeProfileAdaptor::translate(
+    const string& profile,
+    const std::string& csiPluginInfoType)
+{
+  return dispatch(
+      process.get(),
+      &UriVolumeProfileAdaptorProcess::translate,
+      profile,
+      csiPluginInfoType);
+}
+
+
+// Dispatches the watch request to the backing
+// `UriVolumeProfileAdaptorProcess` and returns the resulting future.
+Future<hashset<string>> UriVolumeProfileAdaptor::watch(
+    const hashset<string>& knownProfiles,
+    const std::string& csiPluginInfoType)
+{
+  return dispatch(
+      process.get(),
+      &UriVolumeProfileAdaptorProcess::watch,
+      knownProfiles,
+      csiPluginInfoType);
+}
+
+
+// Names the process via `ID::generate("uri-volume-profile")`, keeps a copy
+// of the flags, and starts out with a fresh promise for `watch` callers.
+UriVolumeProfileAdaptorProcess::UriVolumeProfileAdaptorProcess(
+    const Flags& _flags)
+  : ProcessBase(ID::generate("uri-volume-profile")),
+    flags(_flags),
+    watchPromise(new Promise<hashset<string>>()) {}
+
+
+// Starts the first fetch of the `--uri`.
+// NOTE(review): presumably invoked by libprocess once the process has been
+// spawned -- confirm against the `Process` base class.
+void UriVolumeProfileAdaptorProcess::initialize()
+{
+  poll();
+}
+
+
+// Looks up `profile` in the last accepted mapping. The returned future is
+// failed when the profile is unknown. `csiPluginInfoType` is not consulted.
+Future<VolumeProfileAdaptor::ProfileInfo>
+  UriVolumeProfileAdaptorProcess::translate(
+      const string& profile,
+      const std::string& csiPluginInfoType)
+{
+  auto known = data.find(profile);
+
+  if (known == data.end()) {
+    return Failure("Profile '" + profile + "' not found");
+  }
+
+  return known->second;
+}
+
+
+// Answers immediately with the current profile names when they differ from
+// what the caller already knows; otherwise hands out the pending promise's
+// future. `csiPluginInfoType` is not consulted.
+Future<hashset<string>> UriVolumeProfileAdaptorProcess::watch(
+    const hashset<string>& knownProfiles,
+    const std::string& csiPluginInfoType)
+{
+  const bool callerIsStale = profiles != knownProfiles;
+
+  if (!callerIsStale) {
+    return watchPromise->future();
+  }
+
+  return profiles;
+}
+
+
+// Fetches the `--uri` (over HTTP(S) or from the local filesystem) and hands
+// the outcome to `_poll`.
+void UriVolumeProfileAdaptorProcess::poll()
+{
+  const string& location = flags.uri.string();
+
+  // NOTE: The flags only admit 'http://', 'https://', or an absolute file
+  // path, so anything that does not start with "http" is read from disk.
+  if (!strings::startsWith(location, "http")) {
+    _poll(os::read(location));
+    return;
+  }
+
+  // NOTE: We already validated that this URI is parsable in the flags.
+  Try<http::URL> url = http::URL::parse(location);
+  CHECK_SOME(url);
+
+  http::get(url.get())
+    .onAny(defer(self(), [=](const Future<http::Response>& response) {
+      if (response.isFailed()) {
+        _poll(Error(response.failure()));
+      } else if (response.isReady()) {
+        // NOTE: We don't check the HTTP status code because we don't know
+        // what potential codes are considered successful.
+        _poll(response->body);
+      } else {
+        _poll(Error("Future discarded or abandoned"));
+      }
+    }));
+}
+
+
+// Continuation of `poll`: parses whatever was fetched, applies it via
+// `notify` on success, logs on failure, and schedules the next poll when
+// `--poll_interval` is set.
+void UriVolumeProfileAdaptorProcess::_poll(const Try<string>& fetched)
+{
+  if (fetched.isError()) {
+    LOG(WARNING) << "Failed to poll URI: " << fetched.error();
+  } else {
+    Try<UriVolumeProfileMapping> mapping = parse(fetched.get());
+
+    if (mapping.isError()) {
+      LOG(ERROR) << "Failed to parse result: " << mapping.error();
+    } else {
+      notify(mapping.get());
+    }
+  }
+
+  // TODO(josephw): Do we want to retry if polling fails and no polling
+  // interval is set? Or perhaps we should exit in that case?
+  if (flags.poll_interval.isSome()) {
+    delay(flags.poll_interval.get(), self(), &Self::poll);
+  }
+}
+
+
+// Applies a freshly parsed mapping to the module state, provided that it
+// only adds profiles: every known profile must still be present and
+// unchanged. On success, pending `watch` callers are notified.
+void UriVolumeProfileAdaptorProcess::notify(
+    const UriVolumeProfileMapping& parsed)
+{
+  const auto& fetched = parsed.profile_matrix();
+
+  // Check every profile we already know about against the fetched data.
+  // All conflicts are logged before giving up, not just the first one.
+  bool conflict = false;
+
+  foreachkey (const string& profile, data) {
+    if (fetched.count(profile) != 1) {
+      conflict = true;
+
+      LOG(WARNING)
+        << "Fetched profile mapping does not contain profile '" << profile
+        << "'. The fetched mapping will be ignored entirely";
+      continue;
+    }
+
+    const auto& known = data.at(profile);
+    const auto& update = fetched.at(profile);
+
+    const bool sameCapability =
+      known.capability == update.volume_capabilities();
+
+    const bool sameParameters =
+      known.parameters == update.create_parameters();
+
+    if (!(sameCapability && sameParameters)) {
+      conflict = true;
+
+      LOG(WARNING)
+        << "Fetched profile mapping for profile '" << profile << "'"
+        << " does not match earlier data."
+        << " The fetched mapping will be ignored entirely";
+    }
+  }
+
+  // When encountering a data conflict, this module assumes there is a
+  // problem upstream (i.e. in the `--uri`). It is up to the operator
+  // to notice and resolve this.
+  if (conflict) {
+    return;
+  }
+
+  // Profiles can only be added, so if the fetched mapping is not larger
+  // than what we hold, nothing has changed and nobody needs to be notified.
+  if (fetched.size() <= data.size()) {
+    return;
+  }
+
+  // The fetched mapping satisfies our invariants.
+
+  // Store each entry in the form exposed through the module interface and
+  // rebuild the convenience set of profile names alongside.
+  profiles.clear();
+  for (auto entry = fetched.begin(); entry != fetched.end(); ++entry) {
+    data[entry->first] = {
+      entry->second.volume_capabilities(),
+      entry->second.create_parameters()
+    };
+
+    profiles.insert(entry->first);
+  }
+
+  // Notify any watchers and then prepare a new promise for the next
+  // iteration of polling.
+  //
+  // TODO(josephw): Delay this based on the `--max_random_wait` option.
+  watchPromise->set(profiles);
+  watchPromise.reset(new Promise<hashset<string>>());
+
+  LOG(INFO)
+    << "Updated volume profile mapping to " << profiles.size()
+    << " total profiles";
+}
+
+
+// Parses `data` as the JSON form of a `UriVolumeProfileMapping` and then
+// validates the result. Unknown JSON fields are ignored. Returns an error
+// if either the JSON conversion or the validation fails.
+Try<UriVolumeProfileMapping> UriVolumeProfileAdaptorProcess::parse(
+    const string& data)
+{
+  // Use Google's JSON utility function to parse the JSON string.
+  UriVolumeProfileMapping output;
+  google::protobuf::util::JsonParseOptions options;
+  options.ignore_unknown_fields = true;
+
+  google::protobuf::util::Status status =
+    google::protobuf::util::JsonStringToMessage(data, &output, options);
+
+  if (!status.ok()) {
+    return Error(
+        "Failed to parse UriVolumeProfileMapping message: "
+        + status.ToString());
+  }
+
+  Option<Error> validation = validate(output);
+  if (validation.isSome()) {
+    return Error(
+        "Fetched profile mapping failed validation with: "
+        + validation->message);
+  }
+
+  return output;
+}
+
+
+// Validates every entry of the profile matrix: each profile must carry
+// `volume_capabilities`, and those capabilities must themselves be valid.
+// Returns the first problem found, or `None()` when the mapping is valid.
+Option<Error> UriVolumeProfileAdaptorProcess::validate(
+    const UriVolumeProfileMapping& mapping)
+{
+  auto iterator = mapping.profile_matrix().begin();
+  while (iterator != mapping.profile_matrix().end()) {
+    if (!iterator->second.has_volume_capabilities()) {
+      return Error(
+          "Profile '" + iterator->first + "' is missing the required field "
+          + "'volume_capabilities'");
+    }
+
+    Option<Error> capabilities =
+      validate(iterator->second.volume_capabilities());
+
+    if (capabilities.isSome()) {
+      return Error(
+          "Profile '" + iterator->first + "' VolumeCapabilities are invalid: "
+          + capabilities->message);
+    }
+
+    // NOTE: The `create_parameters` field is optional and needs no further
+    // validation after parsing.
+
+    ++iterator;
+  }
+
+  return None();
+}
+
+
+// Validates a single `VolumeCapability`: mount flags must stay within the
+// size limit, and an access mode other than `UNKNOWN` must be present.
+Option<Error> UriVolumeProfileAdaptorProcess::validate(
+    const csi::VolumeCapability& capability)
+{
+  if (capability.has_mount()) {
+    // The total size of this repeated field may not exceed 4 KB.
+    //
+    // TODO(josephw): The specification does not state how this maximum
+    // size is calculated. So this check is conservative and does not
+    // include padding or array separators in the size calculation.
+    size_t totalFlagBytes = 0;
+    foreach (const string& mountFlag, capability.mount().mount_flags()) {
+      totalFlagBytes += mountFlag.size();
+    }
+
+    if (Bytes(totalFlagBytes) > Kilobytes(4)) {
+      return Error("Size of 'mount_flags' may not exceed 4 KB");
+    }
+  }
+
+  if (!capability.has_access_mode()) {
+    return Error("'access_mode' is a required field");
+  }
+
+  const bool modeIsUnknown =
+    capability.access_mode().mode() ==
+      csi::VolumeCapability::AccessMode::UNKNOWN;
+
+  if (modeIsUnknown) {
+    return Error("'access_mode.mode' is unknown or not set");
+  }
+
+  return None();
+}
+
+} // namespace profile {
+} // namespace internal {
+} // namespace mesos {
+
+
+// Module descriptor for the URI volume profile adaptor. The factory lambda
+// turns the module parameters into `Flags` and returns a new adaptor, or
+// `nullptr` when the parameters cannot be loaded.
+mesos::modules::Module<VolumeProfileAdaptor>
+org_apache_mesos_UriVolumeProfileAdaptor(
+    MESOS_MODULE_API_VERSION,
+    MESOS_VERSION,
+    "Apache Mesos",
+    "[email protected]",
+    "URI Volume Profile Adaptor module.",
+    nullptr,
+    [](const Parameters& parameters) -> VolumeProfileAdaptor* {
+      // Flatten the parameters into key/value pairs; a later duplicate key
+      // overwrites an earlier one.
+      map<string, string> settings;
+      foreach (const Parameter& parameter, parameters.parameter()) {
+        settings[parameter.key()] = parameter.value();
+      }
+
+      // Load the flags from the pairs, which also runs their validation.
+      mesos::internal::profile::Flags moduleFlags;
+      Try<flags::Warnings> loaded = moduleFlags.load(settings);
+
+      if (loaded.isError()) {
+        LOG(ERROR) << "Failed to parse parameters: " << loaded.error();
+        return nullptr;
+      }
+
+      // Surface any flag warnings in the log.
+      foreach (const flags::Warning& warning, loaded->warnings) {
+        LOG(WARNING) << warning.message;
+      }
+
+      return new mesos::internal::profile::UriVolumeProfileAdaptor(
+          moduleFlags);
+    });

http://git-wip-us.apache.org/repos/asf/mesos/blob/343776d5/src/resource_provider/uri_volume_profile.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/uri_volume_profile.hpp b/src/resource_provider/uri_volume_profile.hpp
new file mode 100644
index 0000000..e6588f1
--- /dev/null
+++ b/src/resource_provider/uri_volume_profile.hpp
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __RESOURCE_PROVIDER_URI_VOLUME_PROFILE_HPP__
+#define __RESOURCE_PROVIDER_URI_VOLUME_PROFILE_HPP__
+
+#include <map>
+#include <string>
+#include <tuple>
+
+#include <mesos/resource_provider/volume_profile.hpp>
+
+#include <process/future.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+
+#include <process/ssl/flags.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/error.hpp>
+#include <stout/flags.hpp>
+#include <stout/option.hpp>
+#include <stout/path.hpp>
+#include <stout/strings.hpp>
+
+#include <csi/spec.hpp>
+
+// ONLY USEFUL AFTER RUNNING PROTOC.
+#include "csi/uri_volume_profile.pb.h"
+
+namespace mesos {
+namespace internal {
+namespace profile {
+
+// Forward declaration.
+class UriVolumeProfileAdaptorProcess;
+
+// Module parameters of the URI volume profile adaptor:
+//   * `uri`: where to fetch the profile mapping from (required).
+//   * `poll_interval`: optional delay between successive fetches.
+//   * `max_random_wait`: upper bound for delaying `watch` notifications.
+struct Flags : public virtual flags::FlagsBase
+{
+  Flags()
+  {
+    add(&Flags::uri,
+        "uri",
+        None(),
+        "URI to a JSON object containing the volume profile mapping.\n"
+        "This module supports both HTTP(s) and file URIs.\n"
+        "\n"
+        "The JSON object should consist of some top-level string keys\n"
+        "corresponding to the volume profile name. Each value should\n"
+        "contain a `VolumeCapability` under a 'volume_capabilities'\n"
+        "and a free-form string-string mapping under 'create_parameters'.\n"
+        "\n"
+        "The JSON is modeled after a protobuf found in\n"
+        "`src/csi/uri_volume_profile.proto`.\n"
+        "\n"
+        "For example:\n"
+        "{\n"
+        "  \"profile_matrix\" : {\n"
+        "    \"my-profile\" : {\n"
+        "      \"volume_capabilities\" : {\n"
+        "        \"block\" : {},\n"
+        "        \"access_mode\" : { \"mode\" : \"SINGLE_NODE_WRITER\" }\n"
+        "      },\n"
+        "      \"create_parameters\" : {\n"
+        "        \"mesos-does-not\" : \"interpret-these\",\n"
+        "        \"type\" : \"raid5\",\n"
+        "        \"stripes\" : \"3\",\n"
+        "        \"stripesize\" : \"64\"\n"
+        "      }\n"
+        "    }\n"
+        "  }\n"
+        "}",
+        static_cast<const Path*>(nullptr),
+        [](const Path& value) -> Option<Error> {
+          // For now, just check if the URI has a supported scheme.
+          //
+          // TODO(josephw): Once we have a proper URI class and parser,
+          // consider validating this URI more thoroughly.
+          //
+          // NOTE: 'https://' is only accepted when libprocess was built
+          // with SSL support and SSL is enabled at runtime.
+          if (strings::startsWith(value.string(), "http://")
+#ifdef USE_SSL_SOCKET
+              || (process::network::openssl::flags().enabled &&
+                  strings::startsWith(value.string(), "https://"))
+#endif // USE_SSL_SOCKET
+          ) {
+            Try<process::http::URL> url =
+              process::http::URL::parse(value.string());
+
+            if (url.isError()) {
+              return Error("Failed to parse URI: " + url.error());
+            }
+
+            return None();
+          }
+
+          // NOTE: The `Path` class will strip off the 'file://' prefix, so
+          // any scheme separator still present belongs to an unsupported
+          // (or disabled) scheme.
+          if (strings::contains(value.string(), "://")) {
+            return Error(
+                "--uri must use a supported scheme (file or http(s))");
+          }
+
+          // We only allow absolute paths for file paths.
+          if (!value.absolute()) {
+            return Error("--uri to a file must be an absolute path");
+          }
+
+          return None();
+        });
+
+    add(&Flags::poll_interval,
+        "poll_interval",
+        "How long to wait between polling the specified `--uri`.\n"
+        "After each fetch completes, the module waits this long before\n"
+        "fetching the URI again.\n"
+        "If not specified, the URI is only fetched once.",
+        [](const Option<Duration>& value) -> Option<Error> {
+          // NOTE: Zero is rejected as well as negative values.
+          if (value.isSome() && value.get() <= Seconds(0)) {
+            return Error("--poll_interval must be positive");
+          }
+
+          return None();
+        });
+
+    add(&Flags::max_random_wait,
+        "max_random_wait",
+        "How long at most to wait between discovering a new set of profiles\n"
+        "and notifying the callers of `watch`. The actual wait time is a\n"
+        "uniform random value between 0 and this value. If the `--uri` "
+        "points\n"
+        "to a centralized location, it may be good to scale this number\n"
+        "according to the number of resource providers in the cluster.",
+        Seconds(0),
+        [](const Duration& value) -> Option<Error> {
+          if (value < Seconds(0)) {
+            return Error("--max_random_wait must be zero or greater");
+          }
+
+          return None();
+        });
+  }
+
+  // NOTE: We use the `Path` type here so that the stout flags parser
+  // does not attempt to read a file if given a `file://` prefixed value.
+  //
+  // TODO(josephw): Replace with a URI type when stout gets one.
+  Path uri;
+
+  Option<Duration> poll_interval;
+  Duration max_random_wait;
+};
+
+
+// The `UriVolumeProfileAdaptor` is an example VolumeProfile module that
+// takes a URI as a module parameter and fetches that URI periodically.
+// The fetched data is parsed into the required CSI protobufs
+// (which also acts as validation).
+//
+// If there is an error during fetching, any previously fetched results
+// will be used until fetching is successful.
+//
+// This module does not filter return results based on `CSIPluginInfo::type`
+// and assumes that all fetched profiles are meant for all resource providers.
+//
+// See `Flags` above for more information.
+class UriVolumeProfileAdaptor : public mesos::VolumeProfileAdaptor
+{
+public:
+  // Creates and spawns the backing `UriVolumeProfileAdaptorProcess`.
+  UriVolumeProfileAdaptor(const Flags& _flags);
+
+  // Terminates the backing process and waits for it.
+  virtual ~UriVolumeProfileAdaptor();
+
+  // Dispatches to `UriVolumeProfileAdaptorProcess::translate`.
+  virtual process::Future<mesos::VolumeProfileAdaptor::ProfileInfo> translate(
+      const std::string& profile,
+      const std::string& csiPluginInfoType) override;
+
+  // Dispatches to `UriVolumeProfileAdaptorProcess::watch`.
+  virtual process::Future<hashset<std::string>> watch(
+      const hashset<std::string>& knownProfiles,
+      const std::string& csiPluginInfoType) override;
+
+protected:
+  // Copy of the flags this module was created with.
+  Flags flags;
+
+  // The process that performs the fetching and holds the profile data.
+  process::Owned<UriVolumeProfileAdaptorProcess> process;
+};
+
+
+// Process behind `UriVolumeProfileAdaptor`: fetches the `--uri`, validates
+// the result, and holds the accepted profile mapping.
+class UriVolumeProfileAdaptorProcess :
+  public process::Process<UriVolumeProfileAdaptorProcess>
+{
+public:
+  UriVolumeProfileAdaptorProcess(const Flags& _flags);
+
+  // Starts the first fetch of the `--uri`.
+  virtual void initialize() override;
+
+  // Returns the stored `ProfileInfo` for `profile`, or a failed future if
+  // the profile is not known. `csiPluginInfoType` is not consulted.
+  process::Future<mesos::VolumeProfileAdaptor::ProfileInfo> translate(
+      const std::string& profile,
+      const std::string& csiPluginInfoType);
+
+  // Returns the current set of profile names right away if it differs from
+  // `knownProfiles`; otherwise returns a future that is satisfied the next
+  // time the set changes. `csiPluginInfoType` is not consulted.
+  process::Future<hashset<std::string>> watch(
+      const hashset<std::string>& knownProfiles,
+      const std::string& csiPluginInfoType);
+
+private:
+  // Helpers for fetching the `--uri`.
+  // If `--poll_interval` is set, this method will dispatch to itself with
+  // a delay once the fetch is complete.
+  void poll();
+  void _poll(const Try<std::string>& fetched);
+
+  // Helper that is called upon successfully polling and parsing the `--uri`.
+  // This method will check the following conditions before updating the state
+  // of the module:
+  //   * All known profiles must be included in the updated set.
+  //   * All properties of known profiles must match those in the updated set.
+  void notify(const csi::UriVolumeProfileMapping& parsed);
+
+public:
+  // Helper for parsing a string as the expected data format.
+  // See the example string in the `--uri` help text for more details.
+  //
+  // NOTE: This method is public for testing purposes only.
+  static Try<csi::UriVolumeProfileMapping>
+    parse(const std::string& data);
+
+private:
+  // Checks the fields inside a `UriVolumeProfileMapping` according to the
+  // comments above the protobuf.
+  static Option<Error> validate(
+      const csi::UriVolumeProfileMapping& mapping);
+
+  // Checks the fields inside a `VolumeCapability` according to the
+  // comments above the protobuf.
+  static Option<Error> validate(const csi::VolumeCapability& capability);
+
+  // Copy of the flags this process was created with.
+  Flags flags;
+
+  // The last fetched profile mapping.
+  // This module assumes that profiles can only be added and never removed.
+  // Once added, profiles cannot be changed either.
+  //
+  // TODO(josephw): Consider persisting this mapping across agent restarts.
+  std::map<std::string, mesos::VolumeProfileAdaptor::ProfileInfo> data;
+
+  // Convenience set of the keys in `data` above.
+  // This module does not filter based on `CSIPluginInfo::type`, so this
+  // is valid for all input to `watch(...)`.
+  hashset<std::string> profiles;
+
+  // Will be satisfied whenever `data` is changed.
+  // A new promise is installed after each notification.
+  process::Owned<process::Promise<hashset<std::string>>> watchPromise;
+};
+
+} // namespace profile {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __RESOURCE_PROVIDER_URI_VOLUME_PROFILE_HPP__

Reply via email to