Repository: mesos
Updated Branches:
  refs/heads/master 848767b4f -> adf4fa3f2


Added `LocalResourceProviderDaemon` methods to modify configs.

The `add()` and `update()` methods add a new config file and update an
existing config file of a local resource provider, respectively. They
then trigger a reload of the resource provider asynchronously.

The `remove()` method removes the config file and triggers the resource
provider to terminate asynchronously.

Review: https://reviews.apache.org/r/64439/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b9073b83
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b9073b83
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b9073b83

Branch: refs/heads/master
Commit: b9073b83d43739b472105a3d9c233dca51cabeea
Parents: 848767b
Author: Chun-Hung Hsiao <[email protected]>
Authored: Mon Dec 11 15:09:35 2017 -0800
Committer: Jie Yu <[email protected]>
Committed: Mon Dec 11 15:09:35 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/daemon.cpp | 277 +++++++++++++++++++++++++++++++---
 1 file changed, 252 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b9073b83/src/resource_provider/daemon.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/daemon.cpp b/src/resource_provider/daemon.cpp
index 6159982..7c783e3 100644
--- a/src/resource_provider/daemon.cpp
+++ b/src/resource_provider/daemon.cpp
@@ -63,6 +63,12 @@ using process::http::authentication::Principal;
 namespace mesos {
 namespace internal {
 
+// Directory under the resource provider config directory to store
+// temporary files for adding and updating resource provider config
+// files atomically (all-or-nothing).
+constexpr char STAGING_DIR[] = ".staging";
+
+
 class LocalResourceProviderDaemonProcess
   : public Process<LocalResourceProviderDaemonProcess>
 {
@@ -86,22 +92,41 @@ public:
 
   void start(const SlaveID& _slaveId);
 
+  Future<bool> add(const ResourceProviderInfo& info);
+  Future<bool> update(const ResourceProviderInfo& info);
+  Future<bool> remove(const string& type, const string& name);
+
 protected:
   void initialize() override;
 
 private:
   struct ProviderData
   {
-    ProviderData(const ResourceProviderInfo& _info)
-      : info(_info) {}
+    ProviderData(const string& _path, const ResourceProviderInfo& _info)
+      : path(_path), info(_info), version(UUID::random()) {}
 
-    const ResourceProviderInfo info;
+    const string path;
+    ResourceProviderInfo info;
+
+    // The `version` is used to check if `provider` holds a resource
+    // provider instance that is in sync with the current config.
+    UUID version;
     Owned<LocalResourceProvider> provider;
   };
 
   Try<Nothing> load(const string& path);
-
-  Future<Nothing> launch(const string& type, const string& name);
+  Try<Nothing> save(const string& path, const ResourceProviderInfo& info);
+
+  // NOTE: `launch` should only be called once for each config version.
+  // It will pick up the latest config to launch the resource provider.
+  Future<Nothing> launch(
+      const string& type,
+      const string& name);
+  Future<Nothing> _launch(
+      const string& type,
+      const string& name,
+      const UUID& version,
+      const Option<string>& authToken);
 
   Future<Option<string>> generateAuthToken(const ResourceProviderInfo& info);
 
@@ -146,6 +171,121 @@ void LocalResourceProviderDaemonProcess::start(const 
SlaveID& _slaveId)
 }
 
 
+Future<bool> LocalResourceProviderDaemonProcess::add(
+    const ResourceProviderInfo& info)
+{
+  if (configDir.isNone()) {
+    return Failure("`--resource_provider_config_dir` must be specified");
+  }
+
+  if (providers[info.type()].contains(info.name())) {
+    return false;
+  }
+
+  // Generate a filename for the config.
+  // NOTE: We use the following template `<type>.<name>.<uuid>.json`
+  // with a random UUID to generate a new filename to avoid any conflict
+  // with existing ad-hoc config files.
+  const string path = path::join(
+      configDir.get(),
+      strings::join(".", info.type(), info.name(), UUID::random(), "json"));
+
+  LOG(INFO) << "Creating new config file '" << path << "'";
+
+  Try<Nothing> _save = save(path, info);
+  if (_save.isError()) {
+    return Failure(
+        "Failed to write config file '" + path + "': " + _save.error());
+  }
+
+  providers[info.type()].put(info.name(), {path, info});
+
+  // Launch the resource provider if the daemon is already started.
+  if (slaveId.isSome()) {
+    auto err = [](const ResourceProviderInfo& info, const string& message) {
+      LOG(ERROR)
+        << "Failed to launch resource provider with type '" << info.type()
+        << "' and name '" << info.name() << "': " << message;
+    };
+
+    launch(info.type(), info.name())
+      .onFailed(std::bind(err, info, lambda::_1))
+      .onDiscarded(std::bind(err, info, "future discarded"));
+  }
+
+  return true;
+}
+
+
+Future<bool> LocalResourceProviderDaemonProcess::update(
+    const ResourceProviderInfo& info)
+{
+  if (configDir.isNone()) {
+    return Failure("`--resource_provider_config_dir` must be specified");
+  }
+
+  if (!providers[info.type()].contains(info.name())) {
+    return false;
+  }
+
+  ProviderData& data = providers[info.type()].at(info.name());
+
+  Try<Nothing> _save = save(data.path, info);
+  if (_save.isError()) {
+    return Failure(
+        "Failed to write config file '" + data.path + "': " + _save.error());
+  }
+
+  data.info = info;
+
+  // Update `version` to indicate that the config has been updated.
+  data.version = UUID::random();
+
+  // Launch the resource provider if the daemon is already started.
+  if (slaveId.isSome()) {
+    auto err = [](const ResourceProviderInfo& info, const string& message) {
+      LOG(ERROR)
+        << "Failed to launch resource provider with type '" << info.type()
+        << "' and name '" << info.name() << "': " << message;
+    };
+
+    launch(info.type(), info.name())
+      .onFailed(std::bind(err, info, lambda::_1))
+      .onDiscarded(std::bind(err, info, "future discarded"));
+  }
+
+  return true;
+}
+
+
+Future<bool> LocalResourceProviderDaemonProcess::remove(
+    const string& type,
+    const string& name)
+{
+  if (configDir.isNone()) {
+    return Failure("`--resource_provider_config_dir` must be specified");
+  }
+
+  if (!providers[type].contains(name)) {
+    return false;
+  }
+
+  const string path = providers[type].at(name).path;
+
+  Try<Nothing> rm = os::rm(path);
+  if (rm.isError()) {
+    return Failure(
+        "Failed to remove config file '" + path + "': " + rm.error());
+  }
+
+  // Removing the provider data from `providers` will cause the resource
+  // provider to be destructed.
+  providers[type].erase(name);
+
+  return true;
+}
+
+
 void LocalResourceProviderDaemonProcess::initialize()
 {
   if (configDir.isNone()) {
@@ -162,6 +302,8 @@ void LocalResourceProviderDaemonProcess::initialize()
   foreach (const string& entry, entries.get()) {
     const string path = path::join(configDir.get(), entry);
 
+    // Skip subdirectories in the resource provider config directory,
+    // including the staging directory.
     if (os::stat::isdir(path)) {
       continue;
     }
@@ -201,7 +343,54 @@ Try<Nothing> 
LocalResourceProviderDaemonProcess::load(const string& path)
         "' and name '" + info->name() + "'");
   }
 
-  providers[info->type()].put(info->name(), info.get());
+  providers[info->type()].put(info->name(), {path, std::move(info.get())});
+
+  return Nothing();
+}
+
+
+// NOTE: We provide atomic (all-or-nothing) semantics here by always
+// writing to a temporary file in the staging directory first and then using
+// `os::rename` to atomically move it to the desired path.
+Try<Nothing> LocalResourceProviderDaemonProcess::save(
+    const string& path,
+    const ResourceProviderInfo& info)
+{
+  CHECK_SOME(configDir);
+
+  // NOTE: We create the staging directory in the resource provider
+  // config directory to make sure that the renaming below does not
+  // cross devices (MESOS-2319).
+  // TODO(chhsiao): Consider adding a way to garbage collect the staging
+  // directory.
+  const string stagingDir = path::join(configDir.get(), STAGING_DIR);
+  Try<Nothing> mkdir = os::mkdir(stagingDir);
+  if (mkdir.isError()) {
+    return Error(
+        "Failed to create directory '" + stagingDir + "': " + mkdir.error());
+  }
+
+  const string stagingPath = path::join(stagingDir, Path(path).basename());
+
+  Try<Nothing> write = os::write(stagingPath, stringify(JSON::protobuf(info)));
+  if (write.isError()) {
+    // Try to remove the temporary file on error.
+    os::rm(stagingPath);
+
+    return Error(
+        "Failed to write temporary file '" + stagingPath + "': " +
+        write.error());
+  }
+
+  Try<Nothing> rename = os::rename(stagingPath, path);
+  if (rename.isError()) {
+    // Try to remove the temporary file on error.
+    os::rm(stagingPath);
+
+    return Error(
+        "Failed to rename '" + stagingPath + "' to '" + path + "': " +
+        rename.error());
+  }

   return Nothing();
 }
@@ -212,27 +401,55 @@ Future<Nothing> 
LocalResourceProviderDaemonProcess::launch(
     const string& name)
 {
   CHECK_SOME(slaveId);
-  CHECK(providers[type].contains(name));
 
-  return generateAuthToken(providers[type].at(name).info)
-    .then(defer(self(), [=](
-        const Option<string>& authToken) -> Future<Nothing> {
-      ProviderData& data = providers[type].at(name);
+  // If the resource provider config is removed, nothing needs to be done.
+  if (!providers[type].contains(name)) {
+    return Nothing();
+  }
+
+  ProviderData& data = providers[type].at(name);
 
-      Try<Owned<LocalResourceProvider>> provider =
-        LocalResourceProvider::create(
-            url, workDir, data.info, slaveId.get(), authToken);
+  // Destruct the previous resource provider (which will synchronously
+  // terminate its actor and driver) if there is one.
+  data.provider.reset();
 
-      if (provider.isError()) {
-        return Failure(
-            "Failed to create resource provider with type '" + type +
-            "' and name '" + name + "': " + provider.error());
-      }
+  return generateAuthToken(data.info)
+    .then(defer(self(), &Self::_launch, type, name, data.version, lambda::_1));
+}
 
-      data.provider = provider.get();
 
-      return Nothing();
-    }));
+Future<Nothing> LocalResourceProviderDaemonProcess::_launch(
+    const string& type,
+    const string& name,
+    const UUID& version,
+    const Option<string>& authToken)
+{
+  // If the resource provider config is removed, abort the launch sequence.
+  if (!providers[type].contains(name)) {
+    return Nothing();
+  }
+
+  ProviderData& data = providers[type].at(name);
+
+  // If there is a version mismatch, abort the launch sequence since
+  // `authToken` might be outdated. The callback updating the version
+  // should have dispatched another launch sequence.
+  if (version != data.version) {
+    return Nothing();
+  }
+
+  Try<Owned<LocalResourceProvider>> provider = LocalResourceProvider::create(
+      url, workDir, data.info, slaveId.get(), authToken);
+
+  if (provider.isError()) {
+    return Failure(
+        "Failed to create resource provider with type '" + type +
+        "' and name '" + name + "': " + provider.error());
+  }
+
+  data.provider = provider.get();
+
+  return Nothing();
 }
 
 
@@ -323,14 +540,20 @@ void LocalResourceProviderDaemon::start(const SlaveID& 
slaveId)
 
 Future<bool> LocalResourceProviderDaemon::add(const ResourceProviderInfo& info)
 {
-  return Failure("Unimplemented");
+  return dispatch(
+      process.get(),
+      &LocalResourceProviderDaemonProcess::add,
+      info);
 }
 
 
 Future<bool> LocalResourceProviderDaemon::update(
     const ResourceProviderInfo& info)
 {
-  return Failure("Unimplemented");
+  return dispatch(
+      process.get(),
+      &LocalResourceProviderDaemonProcess::update,
+      info);
 }
 
 
@@ -338,7 +561,11 @@ Future<bool> LocalResourceProviderDaemon::remove(
     const string& type,
     const string& name)
 {
-  return Failure("Unimplemented");
+  return dispatch(
+      process.get(),
+      &LocalResourceProviderDaemonProcess::remove,
+      type,
+      name);
 }
 
 } // namespace internal {

Reply via email to