Repository: mesos Updated Branches: refs/heads/master 848767b4f -> adf4fa3f2
Added `LocalResourceProviderDaemon` methods to modify configs. The `add()` and `update()` methods add a new config file and update an existing config file of a local resource provider, respectively. Each will then trigger a reload of the resource provider asynchronously. The `remove()` method removes the config file, and triggers the resource provider to terminate asynchronously. Review: https://reviews.apache.org/r/64439/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b9073b83 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b9073b83 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b9073b83 Branch: refs/heads/master Commit: b9073b83d43739b472105a3d9c233dca51cabeea Parents: 848767b Author: Chun-Hung Hsiao <[email protected]> Authored: Mon Dec 11 15:09:35 2017 -0800 Committer: Jie Yu <[email protected]> Committed: Mon Dec 11 15:09:35 2017 -0800 ---------------------------------------------------------------------- src/resource_provider/daemon.cpp | 277 +++++++++++++++++++++++++++++++--- 1 file changed, 252 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/b9073b83/src/resource_provider/daemon.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/daemon.cpp b/src/resource_provider/daemon.cpp index 6159982..7c783e3 100644 --- a/src/resource_provider/daemon.cpp +++ b/src/resource_provider/daemon.cpp @@ -63,6 +63,12 @@ using process::http::authentication::Principal; namespace mesos { namespace internal { +// Directory under the resource provider config directory to store +// temporary files for adding and updating resource provider config +// files atomically (all-or-nothing). 
+constexpr char STAGING_DIR[] = ".staging"; + + class LocalResourceProviderDaemonProcess : public Process<LocalResourceProviderDaemonProcess> { @@ -86,22 +92,41 @@ public: void start(const SlaveID& _slaveId); + Future<bool> add(const ResourceProviderInfo& info); + Future<bool> update(const ResourceProviderInfo& info); + Future<bool> remove(const string& type, const string& name); + protected: void initialize() override; private: struct ProviderData { - ProviderData(const ResourceProviderInfo& _info) - : info(_info) {} + ProviderData(const string& _path, const ResourceProviderInfo& _info) + : path(_path), info(_info), version(UUID::random()) {} - const ResourceProviderInfo info; + const string path; + ResourceProviderInfo info; + + // The `version` is used to check if `provider` holds a resource + // provider instance that is in sync with the current config. + UUID version; Owned<LocalResourceProvider> provider; }; Try<Nothing> load(const string& path); - - Future<Nothing> launch(const string& type, const string& name); + Try<Nothing> save(const string& path, const ResourceProviderInfo& info); + + // NOTE: `launch` should only be called once for each config version. + // It will pick up the latest config to launch the resource provider. + Future<Nothing> launch( + const string& type, + const string& name); + Future<Nothing> _launch( + const string& type, + const string& name, + const UUID& version, + const Option<string>& authToken); Future<Option<string>> generateAuthToken(const ResourceProviderInfo& info); @@ -146,6 +171,121 @@ void LocalResourceProviderDaemonProcess::start(const SlaveID& _slaveId) } +Future<bool> LocalResourceProviderDaemonProcess::add( + const ResourceProviderInfo& info) +{ + if (configDir.isNone()) { + return Failure("`--resource_provider_config_dir` must be specified"); + } + + if (providers[info.type()].contains(info.name())) { + return false; + } + + // Generate a filename for the config. 
+ // NOTE: We use the following template `<type>.<name>.<uuid>.json` + // with a random UUID to generate a new filename to avoid any conflict + // with existing ad-hoc config files. + const string path = path::join( + configDir.get(), + strings::join(".", info.type(), info.name(), UUID::random(), "json")); + + LOG(INFO) << "Creating new config file '" << path << "'"; + + Try<Nothing> _save = save(path, info); + if (_save.isError()) { + return Failure( + "Failed to write config file '" + path + "': " + _save.error()); + } + + providers[info.type()].put(info.name(), {path, info}); + + // Launch the resource provider if the daemon is already started. + if (slaveId.isSome()) { + auto err = [](const ResourceProviderInfo& info, const string& message) { + LOG(ERROR) + << "Failed to launch resource provider with type '" << info.type() + << "' and name '" << info.name() << "': " << message; + }; + + launch(info.type(), info.name()) + .onFailed(std::bind(err, info, lambda::_1)) + .onDiscarded(std::bind(err, info, "future discarded")); + } + + return true; +} + + +Future<bool> LocalResourceProviderDaemonProcess::update( + const ResourceProviderInfo& info) +{ + if (configDir.isNone()) { + return Failure("`--resource_provider_config_dir` must be specified"); + } + + if (!providers[info.type()].contains(info.name())) { + return false; + } + + ProviderData& data = providers[info.type()].at(info.name()); + + Try<Nothing> _save = save(data.path, info); + if (_save.isError()) { + return Failure( + "Failed to write config file '" + data.path + "': " + _save.error()); + } + + data.info = info; + + // Update `version` to indicate that the config has been updated. + data.version = UUID::random(); + + // Launch the resource provider if the daemon is already started. 
+ if (slaveId.isSome()) { + auto err = [](const ResourceProviderInfo& info, const string& message) { + LOG(ERROR) + << "Failed to launch resource provider with type '" << info.type() + << "' and name '" << info.name() << "': " << message; + }; + + launch(info.type(), info.name()) + .onFailed(std::bind(err, info, lambda::_1)) + .onDiscarded(std::bind(err, info, "future discarded")); + } + + return true; +} + + +Future<bool> LocalResourceProviderDaemonProcess::remove( + const string& type, + const string& name) +{ + if (configDir.isNone()) { + return Failure("`--resource_provider_config_dir` must be specified"); + } + + if (!providers[type].contains(name)) { + return false; + } + + const string path = providers[type].at(name).path; + + Try<Nothing> rm = os::rm(path); + if (rm.isError()) { + return Failure( + "Failed to remove config file '" + path + "': " + rm.error()); + } + + // Removing the provider data from `providers` will cause the resource + // provider to be destructed. + providers[type].erase(name); + + return true; +} + + void LocalResourceProviderDaemonProcess::initialize() { if (configDir.isNone()) { @@ -162,6 +302,8 @@ void LocalResourceProviderDaemonProcess::initialize() foreach (const string& entry, entries.get()) { const string path = path::join(configDir.get(), entry); + // Skip subdirectories in the resource provider config directory, + // including the staging directory. if (os::stat::isdir(path)) { continue; } @@ -201,7 +343,54 @@ Try<Nothing> LocalResourceProviderDaemonProcess::load(const string& path) "' and name '" + info->name() + "'"); } - providers[info->type()].put(info->name(), info.get()); + providers[info->type()].put(info->name(), {path, std::move(info.get())}); + + return Nothing(); +} + + +// NOTE: We provide atomic (all-or-nothing) semantics here by always +// writing to a temporary file to the staging directory first then using +// `os::rename` to atomically move it to the desired path. 
+Try<Nothing> LocalResourceProviderDaemonProcess::save( + const string& path, + const ResourceProviderInfo& info) +{ + CHECK_SOME(configDir); + + // NOTE: We create the staging directory in the resource provider + // config directory to make sure that the renaming below does not + // cross devices (MESOS-2319). + // TODO(chhsiao): Consider adding a way to garbage collect the staging + // directory. + const string stagingDir = path::join(configDir.get(), STAGING_DIR); + Try<Nothing> mkdir = os::mkdir(stagingDir); + if (mkdir.isError()) { + return Error( + "Failed to create directory '" + stagingDir + "': " + mkdir.error()); + } + + const string stagingPath = path::join(stagingDir, Path(path).basename()); + + Try<Nothing> write = os::write(stagingPath, stringify(JSON::protobuf(info))); + if (write.isError()) { + // Try to remove the temporary file on error. + os::rm(stagingPath); + + return Error( + "Failed to write temporary file '" + stagingPath + "': " + + write.error()); + } + + Try<Nothing> rename = os::rename(stagingPath, path); + if (rename.isError()) { + // Try to remove the temporary file on error. + os::rm(stagingPath); + + return Error( + "Failed to rename '" + stagingPath + "' to '" + path + "': " + + rename.error()); + } return Nothing(); } @@ -212,27 +401,55 @@ Future<Nothing> LocalResourceProviderDaemonProcess::launch( const string& name) { CHECK_SOME(slaveId); - CHECK(providers[type].contains(name)); - return generateAuthToken(providers[type].at(name).info) - .then(defer(self(), [=]( - const Option<string>& authToken) -> Future<Nothing> { - ProviderData& data = providers[type].at(name); + // If the resource provider config is removed, nothing needs to be done. 
+ if (!providers[type].contains(name)) { + return Nothing(); + } + + ProviderData& data = providers[type].at(name); - Try<Owned<LocalResourceProvider>> provider = - LocalResourceProvider::create( - url, workDir, data.info, slaveId.get(), authToken); + // Destruct the previous resource provider (which will synchronously + // terminate its actor and driver) if there is one. + data.provider.reset(); - if (provider.isError()) { - return Failure( - "Failed to create resource provider with type '" + type + - "' and name '" + name + "': " + provider.error()); - } + return generateAuthToken(data.info) + .then(defer(self(), &Self::_launch, type, name, data.version, lambda::_1)); +} - data.provider = provider.get(); - return Nothing(); - })); +Future<Nothing> LocalResourceProviderDaemonProcess::_launch( + const string& type, + const string& name, + const UUID& version, + const Option<string>& authToken) +{ + // If the resource provider config is removed, abort the launch sequence. + if (!providers[type].contains(name)) { + return Nothing(); + } + + ProviderData& data = providers[type].at(name); + + // If there is a version mismatch, abort the launch sequence since + // `authToken` might be outdated. The callback updating the version + // should have dispatched another launch sequence. 
+ if (version != data.version) { + return Nothing(); + } + + Try<Owned<LocalResourceProvider>> provider = LocalResourceProvider::create( + url, workDir, data.info, slaveId.get(), authToken); + + if (provider.isError()) { + return Failure( + "Failed to create resource provider with type '" + type + + "' and name '" + name + "': " + provider.error()); + } + + data.provider = provider.get(); + + return Nothing(); } @@ -323,14 +540,20 @@ void LocalResourceProviderDaemon::start(const SlaveID& slaveId) Future<bool> LocalResourceProviderDaemon::add(const ResourceProviderInfo& info) { - return Failure("Unimplemented"); + return dispatch( + process.get(), + &LocalResourceProviderDaemonProcess::add, + info); } Future<bool> LocalResourceProviderDaemon::update( const ResourceProviderInfo& info) { - return Failure("Unimplemented"); + return dispatch( + process.get(), + &LocalResourceProviderDaemonProcess::update, + info); } @@ -338,7 +561,11 @@ Future<bool> LocalResourceProviderDaemon::remove( const string& type, const string& name) { - return Failure("Unimplemented"); + return dispatch( + process.get(), + &LocalResourceProviderDaemonProcess::remove, + type, + name); } } // namespace internal {
