This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 6366b5c1e5e60dfda5ca0368d6a22da998f0cfa4
Author:     Chun-Hung Hsiao <[email protected]>
AuthorDate: Thu Sep 20 15:06:51 2018 -0700

    Cleaned up residual containers when removing resource provider configs.

    When processing `REMOVE_RESOURCE_PROVIDER_CONFIG`, the local resource
    provider daemon now performs a best-effort cleanup by killing all
    standalone containers prefixed by the 'cid_prefix' of the resource
    provider. During the cleanup, no resource provider config with the same
    type and name can be added.

    Review: https://reviews.apache.org/r/68763
---
 src/resource_provider/daemon.cpp | 239 +++++++++++++++++++++++++++++++++++----
 1 file changed, 220 insertions(+), 19 deletions(-)
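Note: the daemon-side handling in the diff below is driven by the v1 agent
operator API. As a rough illustration (not part of the patch), the call that
triggers the removal, and now the container cleanup, could be built as
follows; the provider type and name are placeholder values, and the message
layout assumes the v1 agent.proto `RemoveResourceProviderConfig` carries the
config's type and name, matching the daemon's `remove(type, name)` signature:

    #include <mesos/v1/agent/agent.hpp>

    // Sketch: build the v1 operator call that asks the agent to remove a
    // resource provider config. The daemon then kills any standalone
    // containers whose IDs start with the provider's `cid_prefix` claim.
    mesos::v1::agent::Call makeRemoveCall()
    {
      mesos::v1::agent::Call call;
      call.set_type(mesos::v1::agent::Call::REMOVE_RESOURCE_PROVIDER_CONFIG);

      mesos::v1::agent::Call::RemoveResourceProviderConfig* remove =
        call.mutable_remove_resource_provider_config();
      remove->set_type("org.apache.mesos.rp.local.storage");  // Placeholder.
      remove->set_name("test_slrp");                          // Placeholder.

      return call;
    }

The serialized call is POSTed to the agent's /api/v1 endpoint, the same
endpoint the daemon itself reuses below for `GET_CONTAINERS`,
`KILL_CONTAINER`, and `WAIT_CONTAINER`.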
diff --git a/src/resource_provider/daemon.cpp b/src/resource_provider/daemon.cpp
index f1a941c..2fd82ad 100644
--- a/src/resource_provider/daemon.cpp
+++ b/src/resource_provider/daemon.cpp
@@ -22,6 +22,7 @@
 
 #include <mesos/type_utils.hpp>
 
+#include <process/collect.hpp>
 #include <process/defer.hpp>
 #include <process/dispatch.hpp>
 #include <process/id.hpp>
@@ -35,25 +36,31 @@
 #include <stout/os.hpp>
 #include <stout/path.hpp>
 #include <stout/protobuf.hpp>
+#include <stout/strings.hpp>
 #include <stout/try.hpp>
 
+#include "common/http.hpp"
 #include "common/validation.hpp"
 
+#include "internal/devolve.hpp"
+#include "internal/evolve.hpp"
+
 #include "resource_provider/local.hpp"
 
 namespace http = process::http;
 
 using std::list;
 using std::string;
+using std::vector;
 
+using process::await;
+using process::defer;
+using process::dispatch;
 using process::Failure;
 using process::Future;
 using process::Owned;
 using process::Process;
 using process::ProcessBase;
-
-using process::defer;
-using process::dispatch;
 using process::spawn;
 using process::terminate;
 using process::wait;
@@ -109,11 +116,17 @@ private:
     const string path;
 
     ResourceProviderInfo info;
+    Option<string> authToken;
 
     // The `version` is used to check if `provider` holds a resource
     // provider instance that is in sync with the current config.
     id::UUID version;
     Owned<LocalResourceProvider> provider;
+
+    // If set, it means that the resource provider is being removed, and the
+    // future will be completed once the removal is done. Note that this object
+    // will be destructed as soon as the future is ready.
+    Option<Future<Nothing>> removing;
   };
 
   Try<Nothing> load(const string& path);
@@ -133,6 +146,10 @@ private:
 
   Future<Option<string>> generateAuthToken(const ResourceProviderInfo& info);
 
+  Future<Nothing> cleanupContainers(
+      const ResourceProviderInfo& info,
+      const Option<string>& authToken);
+
   const http::URL url;
   const string workDir;
   const Option<string> configDir;
@@ -161,7 +178,13 @@ void LocalResourceProviderDaemonProcess::start(const SlaveID& _slaveId)
   slaveId = _slaveId;
 
   foreachkey (const string& type, providers) {
-    foreachkey (const string& name, providers[type]) {
+    foreachpair (const string& name,
+                 const ProviderData& data,
+                 providers[type]) {
+      if (data.removing.isSome()) {
+        continue;
+      }
+
       auto error = [=](const string& message) {
         LOG(ERROR) << "Failed to launch resource provider with type '" << type
                    << "' and name '" << name << "': " << message;
@@ -186,7 +209,15 @@ Future<bool> LocalResourceProviderDaemonProcess::add(
 
   // Return true if the info has been added for idempotency.
   if (providers[info.type()].contains(info.name())) {
-    return providers[info.type()].at(info.name()).info == info;
+    const ProviderData& data = providers[info.type()].at(info.name());
+
+    if (data.removing.isSome()) {
+      return Failure(
+          "Failed to add resource provider with type '" + info.type() +
+          "' and name '" + info.name() + "' as a removal is still in progress");
+    }
+
+    return data.info == info;
   }
 
   // Generate a filename for the config.
@@ -239,6 +270,12 @@ Future<bool> LocalResourceProviderDaemonProcess::update(
   ProviderData& data = providers[info.type()].at(info.name());
 
+  if (data.removing.isSome()) {
+    return Failure(
+        "Failed to update resource provider with type '" + info.type() +
+        "' and name '" + info.name() + "' as a removal is still in progress");
+  }
+
   // Return true if the info has been updated for idempotency.
   if (data.info == info) {
     return true;
   }
@@ -285,19 +322,39 @@ Future<Nothing> LocalResourceProviderDaemonProcess::remove(
     return Nothing();
   }
 
-  const string path = providers[type].at(name).path;
+  ProviderData& data = providers[type].at(name);
 
-  Try<Nothing> rm = os::rm(path);
-  if (rm.isError()) {
-    return Failure(
-        "Failed to remove config file '" + path + "': " + rm.error());
+  // Return the same future if it is being removed for idempotency.
+  if (data.removing.isSome() && data.removing->isPending()) {
+    return data.removing.get();
   }
 
-  // Removing the provider data from `providers` will cause the resource
-  // provider to be destructed.
-  providers[type].erase(name);
+  // Destruct the resource provider instance to stop its container daemons,
+  // then do a best-effort cleanup of the standalone containers launched by
+  // the removed resource provider.
+  // TODO(chhsiao): This is not ideal since the daemon does not know how to
+  // perform resource-provider-specific cleanups. We should refactor this into
+  // a `LocalResourceProvider::stop` virtual function. However, this also means
+  // that we must ensure there is a resource provider instance with a running
+  // actor on which to invoke `stop`. Given that `launch` is asynchronous, we
+  // need to carefully redesign the state machine, and the semantics of the
+  // `{ADD,UPDATE,REMOVE}_RESOURCE_PROVIDER_CONFIG` API calls might be affected.
+  // In addition, we should consider how to reconcile orphaned containers.
+  data.provider.reset();
+  data.removing = cleanupContainers(data.info, data.authToken)
+    .then(defer(self(), [this, type, name]() -> Future<Nothing> {
+      Try<Nothing> rm = os::rm(providers[type].at(name).path);
+      if (rm.isError()) {
+        return Failure(
+            "Failed to remove config file '" + providers[type].at(name).path +
+            "': " + rm.error());
+      }
 
-  return Nothing();
+      providers[type].erase(name);
+      return Nothing();
+    }));
+
+  return data.removing.get();
 }
@@ -420,13 +477,10 @@ Future<Nothing> LocalResourceProviderDaemonProcess::launch(
     const string& name)
 {
   CHECK_SOME(slaveId);
-
-  // If the resource provider config is removed, nothing needs to be done.
-  if (!providers[type].contains(name)) {
-    return Nothing();
-  }
+  CHECK(providers[type].contains(name));
 
   ProviderData& data = providers[type].at(name);
+  CHECK(data.removing.isNone());
 
   // Destruct the previous resource provider (which will synchronously
   // terminate its actor and driver) if there is one.
@@ -450,6 +504,11 @@ Future<Nothing> LocalResourceProviderDaemonProcess::_launch(
 
   ProviderData& data = providers[type].at(name);
 
+  // If the resource provider config is being removed, nothing needs to be done.
+  if (data.removing.isSome()) {
+    return Nothing();
+  }
+
   // If there is a version mismatch, abort the launch sequence since
   // `authToken` might be outdated. The callback updating the version
   // should have dispatched another launch sequence.
@@ -466,6 +525,7 @@ Future<Nothing> LocalResourceProviderDaemonProcess::_launch(
         "' and name '" + name + "': " + provider.error());
   }
 
+  data.authToken = authToken;
   data.provider = std::move(provider.get());
 
   return Nothing();
 }
@@ -501,6 +561,147 @@ Future<Option<string>> LocalResourceProviderDaemonProcess::generateAuthToken(
 }
 
 
+Future<Nothing> LocalResourceProviderDaemonProcess::cleanupContainers(
+    const ResourceProviderInfo& info,
+    const Option<string>& authToken)
+{
+  const Principal principal = LocalResourceProvider::principal(info);
+  CHECK(principal.claims.contains("cid_prefix"));
+
+  const string& cidPrefix = principal.claims.at("cid_prefix");
+
+  LOG(INFO) << "Cleaning up containers prefixed by '" << cidPrefix << "'";
+
+  // TODO(chhsiao): Consider using a more reliable way to get the v1 endpoint.
+  http::URL agentUrl = url;
+  agentUrl.path = Path(url.path).dirname();
+
+  http::Headers headers{{"Accept", stringify(ContentType::PROTOBUF)}};
+  if (authToken.isSome()) {
+    headers["Authorization"] = "Bearer " + authToken.get();
+  }
+
+  agent::Call call;
+  call.set_type(agent::Call::GET_CONTAINERS);
+  call.mutable_get_containers()->set_show_nested(false);
+  call.mutable_get_containers()->set_show_standalone(true);
+
+  return http::post(
+      agentUrl,
+      headers,
+      serialize(ContentType::PROTOBUF, evolve(call)),
+      stringify(ContentType::PROTOBUF))
+    .then(defer(self(), [=](
+        const http::Response& httpResponse) -> Future<Nothing> {
+      if (httpResponse.status != http::OK().status) {
+        return Failure(
+            "Failed to get containers: Unexpected response '" +
+            httpResponse.status + "' (" + httpResponse.body + ")");
+      }
+
+      Try<v1::agent::Response> v1Response = deserialize<v1::agent::Response>(
+          ContentType::PROTOBUF, httpResponse.body);
+      if (v1Response.isError()) {
+        return Failure("Failed to get containers: " + v1Response.error());
+      }
+
+      vector<Future<Nothing>> futures;
+
+      agent::Response response = devolve(v1Response.get());
+      foreach (const agent::Response::GetContainers::Container& container,
+               response.get_containers().containers()) {
+        const ContainerID& containerId = container.container_id();
+
+        if (!strings::startsWith(containerId.value(), cidPrefix)) {
+          continue;
+        }
+
+        // NOTE: We skip containers that are not actually running by checking
+        // their `executor_pid`s to avoid `ESRCH` errors or killing an arbitrary
+        // process. But we might skip the ones that are being launched as well.
+        if (!container.has_container_status() ||
+            !container.container_status().has_executor_pid()) {
+          LOG(WARNING)
+            << "Skipped killing container '" << containerId
+            << "' because it is not running";
+
+          continue;
+        }
+
+        agent::Call call;
+        call.set_type(agent::Call::KILL_CONTAINER);
+        call.mutable_kill_container()->mutable_container_id()
+          ->CopyFrom(containerId);
+
+        LOG(INFO) << "Killing container '" << containerId << "'";
+
+        futures.push_back(http::post(
+            agentUrl,
+            headers,
+            serialize(ContentType::PROTOBUF, evolve(call)),
+            stringify(ContentType::PROTOBUF))
+          .then(defer(self(), [=](
+              const http::Response& response) -> Future<Nothing> {
+            if (response.status == http::NotFound().status) {
+              LOG(WARNING)
+                << "Skipped waiting for container '" << containerId
+                << "' because it no longer exists";
+
+              return Nothing();
+            }
+
+            if (response.status != http::OK().status) {
+              return Failure(
+                  "Failed to kill container '" + stringify(containerId) +
+                  "': Unexpected response '" + response.status + "' (" +
+                  response.body + ")");
+            }
+
+            LOG(INFO) << "Waiting for container '" << containerId << "'";
+
+            agent::Call call;
+            call.set_type(agent::Call::WAIT_CONTAINER);
+            call.mutable_wait_container()->mutable_container_id()
+              ->CopyFrom(containerId);
+
+            return http::post(
+                agentUrl,
+                headers,
+                serialize(ContentType::PROTOBUF, evolve(call)),
+                stringify(ContentType::PROTOBUF))
+              .then([containerId](
                  const http::Response& response) -> Future<Nothing> {
+                if (response.status != http::OK().status &&
+                    response.status != http::NotFound().status) {
+                  return Failure(
+                      "Failed to wait for container '" +
+                      stringify(containerId) + "': Unexpected response '" +
+                      response.status + "' (" + response.body + ")");
+                }
+
+                return Nothing();
+              });
+          })));
+      }
+
+      // We use await here to do a best-effort cleanup.
+      return await(futures)
+        .then([cidPrefix](
+            const vector<Future<Nothing>>& futures) -> Future<Nothing> {
+          foreach (const Future<Nothing>& future, futures) {
+            if (!future.isReady()) {
+              return Failure(
+                  "Failed to clean up containers prefixed by '" + cidPrefix +
+                  "': " + stringify(futures));
+            }
+          }
+
+          return Nothing();
+        });
+    }));
+}
+
+
 Try<Owned<LocalResourceProviderDaemon>> LocalResourceProviderDaemon::create(
     const http::URL& url,
     const slave::Flags& flags,
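Note: `process::await` (unlike `process::collect`, which fails fast as soon
as any future fails) only transitions once every future is terminal, which is
what makes the cleanup above best-effort: one failed kill does not abort the
kills still in flight. A minimal self-contained sketch of that semantics, not
part of the patch:

    #include <iostream>
    #include <vector>

    #include <process/collect.hpp>
    #include <process/future.hpp>

    #include <stout/duration.hpp>
    #include <stout/nothing.hpp>

    using process::await;
    using process::Future;
    using process::Promise;

    int main()
    {
      Promise<Nothing> kill1, kill2;

      std::vector<Future<Nothing>> futures{kill1.future(), kill2.future()};

      // `await` yields the futures themselves so each outcome can be
      // inspected individually, mirroring the loop in `cleanupContainers`.
      Future<std::vector<Future<Nothing>>> cleanup = await(futures);

      kill1.fail("Simulated kill failure");
      kill2.set(Nothing());

      cleanup.await(Seconds(15)); // Block until both futures are terminal.

      for (const Future<Nothing>& future : cleanup.get()) {
        if (future.isReady()) {
          std::cout << "cleaned up" << std::endl;
        } else {
          // The failure is reported but does not fail the overall cleanup.
          std::cout << "best-effort skip: " << future.failure() << std::endl;
        }
      }

      return 0;
    }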
