Updated discard handling in `Docker::inspect()`. Previously, discards of the `Future` returned by `Docker::inspect()` were only handled at the beginning of each asynchronous continuation in the library function's call chain. This meant that if a Docker CLI command became stuck in between async calls, discarding the `Future` would have no effect.
This patch adds an `onDiscard` callback to the `Future` to ensure that any discards have the desired effect: cleanup of any spawned subprocess, and a transition of the `Future` to the discarded state. Since the Docker library is not a libprocess process, we must implement this with a `shared_ptr` and a mutex, to protect against concurrent access to the `onDiscard` callback, which must be updated when retries are performed. Review: https://reviews.apache.org/r/65683/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e72def49 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e72def49 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e72def49 Branch: refs/heads/1.5.x Commit: e72def4926049d45641bbeb761faeb2613494441 Parents: 93fbdbe Author: Greg Mann <g...@mesosphere.io> Authored: Wed Feb 28 15:24:17 2018 -0800 Committer: Gilbert Song <songzihao1...@gmail.com> Committed: Wed Mar 7 01:08:12 2018 -0800 ---------------------------------------------------------------------- src/docker/docker.cpp | 51 ++++++++++++++++++++++++++++++++++------------ src/docker/docker.hpp | 14 ++++++++++--- 2 files changed, 49 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/e72def49/src/docker/docker.cpp ---------------------------------------------------------------------- diff --git a/src/docker/docker.cpp b/src/docker/docker.cpp index 6013707..3e3b8d5 100644 --- a/src/docker/docker.cpp +++ b/src/docker/docker.cpp @@ -15,6 +15,8 @@ // limitations under the License. #include <map> +#include <mutex> +#include <utility> #include <vector> #include <stout/error.hpp> @@ -56,6 +58,9 @@ using namespace process; using std::list; using std::map; +using std::mutex; +using std::pair; +using std::shared_ptr; using std::string; using std::vector; @@ -1246,20 +1251,29 @@ Future<Docker::Container> Docker::inspect( { Owned<Promise<Docker::Container>> promise(new Promise<Docker::Container>()); + // Holds a callback used for cleanup in case this call to 'docker inspect' is + // discarded, and a mutex to control access to the callback. + auto callback = std::make_shared<pair<lambda::function<void()>, mutex>>(); + const string cmd = path + " -H " + socket + " inspect " + containerName; - _inspect(cmd, promise, retryInterval); + _inspect(cmd, promise, retryInterval, callback); - return promise->future(); + return promise->future() + .onDiscard([callback]() { + synchronized (callback->second) { + callback->first(); + } + }); } void Docker::_inspect( const string& cmd, const Owned<Promise<Docker::Container>>& promise, - const Option<Duration>& retryInterval) + const Option<Duration>& retryInterval, + shared_ptr<pair<lambda::function<void()>, mutex>> callback) { if (promise->future().hasDiscard()) { - promise->discard(); return; } @@ -1276,13 +1290,25 @@ void Docker::_inspect( return; } + // Set the `onDiscard` callback which will clean up the subprocess if the + // caller discards the `Future` that we returned. + synchronized (callback->second) { + callback->first = [promise, s, cmd]() { + promise->discard(); + CHECK_SOME(s); + commandDiscarded(s.get(), cmd); + }; + } + // Start reading from stdout so writing to the pipe won't block // to handle cases where the output is larger than the pipe // capacity. const Future<string> output = io::read(s.get().out().get()); s.get().status() - .onAny([=]() { __inspect(cmd, promise, retryInterval, output, s.get()); }); + .onAny([=]() { + __inspect(cmd, promise, retryInterval, output, s.get(), callback); + }); } @@ -1291,11 +1317,10 @@ void Docker::__inspect( const Owned<Promise<Docker::Container>>& promise, const Option<Duration>& retryInterval, Future<string> output, - const Subprocess& s) + const Subprocess& s, + shared_ptr<pair<lambda::function<void()>, mutex>> callback) { if (promise->future().hasDiscard()) { - promise->discard(); - output.discard(); return; } @@ -1313,7 +1338,7 @@ void Docker::__inspect( VLOG(1) << "Retrying inspect with non-zero status code. cmd: '" << cmd << "', interval: " << stringify(retryInterval.get()); Clock::timer(retryInterval.get(), - [=]() { _inspect(cmd, promise, retryInterval); } ); + [=]() { _inspect(cmd, promise, retryInterval, callback); }); return; } @@ -1335,7 +1360,7 @@ void Docker::__inspect( CHECK_SOME(s.out()); output .onAny([=](const Future<string>& output) { - ___inspect(cmd, promise, retryInterval, output); + ___inspect(cmd, promise, retryInterval, output, callback); }); } @@ -1344,10 +1369,10 @@ void Docker::___inspect( const string& cmd, const Owned<Promise<Docker::Container>>& promise, const Option<Duration>& retryInterval, - const Future<string>& output) + const Future<string>& output, + shared_ptr<pair<lambda::function<void()>, mutex>> callback) { if (promise->future().hasDiscard()) { - promise->discard(); return; } @@ -1368,7 +1393,7 @@ void Docker::___inspect( VLOG(1) << "Retrying inspect since container not yet started. cmd: '" << cmd << "', interval: " << stringify(retryInterval.get()); Clock::timer(retryInterval.get(), - [=]() { _inspect(cmd, promise, retryInterval); } ); + [=]() { _inspect(cmd, promise, retryInterval, callback); } ); return; } http://git-wip-us.apache.org/repos/asf/mesos/blob/e72def49/src/docker/docker.hpp ---------------------------------------------------------------------- diff --git a/src/docker/docker.hpp b/src/docker/docker.hpp index d9e71f8..f5e4a70 100644 --- a/src/docker/docker.hpp +++ b/src/docker/docker.hpp @@ -19,7 +19,9 @@ #include <list> #include <map> +#include <mutex> #include <string> +#include <utility> #include <process/future.hpp> #include <process/owned.hpp> @@ -340,20 +342,26 @@ private: static void _inspect( const std::string& cmd, const process::Owned<process::Promise<Container>>& promise, - const Option<Duration>& retryInterval); + const Option<Duration>& retryInterval, + std::shared_ptr<std::pair<lambda::function<void()>, std::mutex>> + callback); static void __inspect( const std::string& cmd, const process::Owned<process::Promise<Container>>& promise, const Option<Duration>& retryInterval, process::Future<std::string> output, - const process::Subprocess& s); + const process::Subprocess& s, + std::shared_ptr<std::pair<lambda::function<void()>, std::mutex>> + callback); static void ___inspect( const std::string& cmd, const process::Owned<process::Promise<Container>>& promise, const Option<Duration>& retryInterval, - const process::Future<std::string>& output); + const process::Future<std::string>& output, + std::shared_ptr<std::pair<lambda::function<void()>, std::mutex>> + callback); static process::Future<std::list<Container>> _ps( const Docker& docker,