Updated discard handling in `Docker::inspect()`. Previously, discards of the `Future` returned by `Docker::inspect()` were only handled at the beginning of each asynchronous continuation in the library function's call chain. This meant that if a Docker CLI command became stuck in between async calls, discarding the `Future` would have no effect.
This patch adds an `onDiscard` callback to the `Future` to ensure that any discards have the desired effect: cleanup of any spawned subprocess, and a transition of the `Future` to the discarded state. Since the Docker library is not a libprocess process, we must implement this with a `shared_ptr` and a mutex, to protect against concurrent access to the `onDiscard` callback, which must be updated when retries are performed. Review: https://reviews.apache.org/r/65683/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b0c6a31c Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b0c6a31c Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b0c6a31c Branch: refs/heads/1.3.x Commit: b0c6a31c82cf4e07103f88a46d386e45b143bcd6 Parents: 9449610 Author: Greg Mann <[email protected]> Authored: Wed Feb 28 15:24:17 2018 -0800 Committer: Gilbert Song <[email protected]> Committed: Mon Mar 5 18:14:59 2018 -0800 ---------------------------------------------------------------------- src/docker/docker.cpp | 53 ++++++++++++++++++++++++++++++++++------------ src/docker/docker.hpp | 14 +++++++++--- 2 files changed, 50 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/b0c6a31c/src/docker/docker.cpp ---------------------------------------------------------------------- diff --git a/src/docker/docker.cpp b/src/docker/docker.cpp index d046b5a..dfcfff8 100755 --- a/src/docker/docker.cpp +++ b/src/docker/docker.cpp @@ -15,6 +15,8 @@ // limitations under the License. #include <map> +#include <mutex> +#include <utility> #include <vector> #include <stout/error.hpp> @@ -56,6 +58,9 @@ using namespace process; using std::list; using std::map; +using std::mutex; +using std::pair; +using std::shared_ptr; using std::string; using std::vector; @@ -1054,20 +1059,29 @@ Future<Docker::Container> Docker::inspect( { Owned<Promise<Docker::Container>> promise(new Promise<Docker::Container>()); - const string cmd = path + " -H " + socket + " inspect " + containerName; - _inspect(cmd, promise, retryInterval); + // Holds a callback used for cleanup in case this call to 'docker inspect' is + // discarded, and a mutex to control access to the callback. + auto callback = std::make_shared<pair<lambda::function<void()>, mutex>>(); - return promise->future(); + const string cmd = path + " -H " + socket + " inspect " + containerName; + _inspect(cmd, promise, retryInterval, callback); + + return promise->future() + .onDiscard([callback]() { + synchronized (callback->second) { + callback->first(); + } + }); } void Docker::_inspect( const string& cmd, const Owned<Promise<Docker::Container>>& promise, - const Option<Duration>& retryInterval) + const Option<Duration>& retryInterval, + shared_ptr<pair<lambda::function<void()>, mutex>> callback) { if (promise->future().hasDiscard()) { - promise->discard(); return; } @@ -1084,13 +1098,25 @@ void Docker::_inspect( return; } + // Set the `onDiscard` callback which will clean up the subprocess if the + // caller discards the `Future` that we returned. + synchronized (callback->second) { + callback->first = [promise, s, cmd]() { + promise->discard(); + CHECK_SOME(s); + commandDiscarded(s.get(), cmd); + }; + } + // Start reading from stdout so writing to the pipe won't block // to handle cases where the output is larger than the pipe // capacity. const Future<string> output = io::read(s.get().out().get()); s.get().status() - .onAny([=]() { __inspect(cmd, promise, retryInterval, output, s.get()); }); + .onAny([=]() { + __inspect(cmd, promise, retryInterval, output, s.get(), callback); + }); } @@ -1099,11 +1125,10 @@ void Docker::__inspect( const Owned<Promise<Docker::Container>>& promise, const Option<Duration>& retryInterval, Future<string> output, - const Subprocess& s) + const Subprocess& s, + shared_ptr<pair<lambda::function<void()>, mutex>> callback) { if (promise->future().hasDiscard()) { - promise->discard(); - output.discard(); return; } @@ -1121,7 +1146,7 @@ void Docker::__inspect( VLOG(1) << "Retrying inspect with non-zero status code. cmd: '" << cmd << "', interval: " << stringify(retryInterval.get()); Clock::timer(retryInterval.get(), - [=]() { _inspect(cmd, promise, retryInterval); } ); + [=]() { _inspect(cmd, promise, retryInterval, callback); }); return; } @@ -1143,7 +1168,7 @@ void Docker::__inspect( CHECK_SOME(s.out()); output .onAny([=](const Future<string>& output) { - ___inspect(cmd, promise, retryInterval, output); + ___inspect(cmd, promise, retryInterval, output, callback); }); } @@ -1152,10 +1177,10 @@ void Docker::___inspect( const string& cmd, const Owned<Promise<Docker::Container>>& promise, const Option<Duration>& retryInterval, - const Future<string>& output) + const Future<string>& output, + shared_ptr<pair<lambda::function<void()>, mutex>> callback) { if (promise->future().hasDiscard()) { - promise->discard(); return; } @@ -1176,7 +1201,7 @@ void Docker::___inspect( VLOG(1) << "Retrying inspect since container not yet started. cmd: '" << cmd << "', interval: " << stringify(retryInterval.get()); Clock::timer(retryInterval.get(), - [=]() { _inspect(cmd, promise, retryInterval); } ); + [=]() { _inspect(cmd, promise, retryInterval, callback); } ); return; } http://git-wip-us.apache.org/repos/asf/mesos/blob/b0c6a31c/src/docker/docker.hpp ---------------------------------------------------------------------- diff --git a/src/docker/docker.hpp b/src/docker/docker.hpp index 5593cb6..7b98b49 100644 --- a/src/docker/docker.hpp +++ b/src/docker/docker.hpp @@ -19,7 +19,9 @@ #include <list> #include <map> +#include <mutex> #include <string> +#include <utility> #include <process/future.hpp> #include <process/owned.hpp> @@ -309,20 +311,26 @@ private: static void _inspect( const std::string& cmd, const process::Owned<process::Promise<Container>>& promise, - const Option<Duration>& retryInterval); + const Option<Duration>& retryInterval, + std::shared_ptr<std::pair<lambda::function<void()>, std::mutex>> + callback); static void __inspect( const std::string& cmd, const process::Owned<process::Promise<Container>>& promise, const Option<Duration>& retryInterval, process::Future<std::string> output, - const process::Subprocess& s); + const process::Subprocess& s, + std::shared_ptr<std::pair<lambda::function<void()>, std::mutex>> + callback); static void ___inspect( const std::string& cmd, const process::Owned<process::Promise<Container>>& promise, const Option<Duration>& retryInterval, - const process::Future<std::string>& output); + const process::Future<std::string>& output, + std::shared_ptr<std::pair<lambda::function<void()>, std::mutex>> + callback); static process::Future<std::list<Container>> _ps( const Docker& docker,
