Updated discard handling in `Docker::inspect()`.

Previously, discards of the `Future` returned by `Docker::inspect()`
were only handled at the beginning of each asynchronous continuation
in the library function's call chain. This meant that if a Docker
CLI command became stuck in between async calls, discarding the
`Future` would have no effect.

This patch adds an `onDiscard` callback to the `Future` to ensure
that any discards have the desired effect: cleanup of any spawned
subprocess, and a transition of the `Future` to the discarded state.

Since the Docker library is not a libprocess process, we must
implement this with a `shared_ptr` and a mutex, to protect against
concurrent access to the `onDiscard` callback, which must be updated
when retries are performed.

Review: https://reviews.apache.org/r/65683/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b0c6a31c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b0c6a31c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b0c6a31c

Branch: refs/heads/1.3.x
Commit: b0c6a31c82cf4e07103f88a46d386e45b143bcd6
Parents: 9449610
Author: Greg Mann <g...@mesosphere.io>
Authored: Wed Feb 28 15:24:17 2018 -0800
Committer: Gilbert Song <songzihao1...@gmail.com>
Committed: Mon Mar 5 18:14:59 2018 -0800

----------------------------------------------------------------------
 src/docker/docker.cpp | 53 ++++++++++++++++++++++++++++++++++------------
 src/docker/docker.hpp | 14 +++++++++---
 2 files changed, 50 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b0c6a31c/src/docker/docker.cpp
----------------------------------------------------------------------
diff --git a/src/docker/docker.cpp b/src/docker/docker.cpp
index d046b5a..dfcfff8 100755
--- a/src/docker/docker.cpp
+++ b/src/docker/docker.cpp
@@ -15,6 +15,8 @@
 // limitations under the License.
 
 #include <map>
+#include <mutex>
+#include <utility>
 #include <vector>
 
 #include <stout/error.hpp>
@@ -56,6 +58,9 @@ using namespace process;
 
 using std::list;
 using std::map;
+using std::mutex;
+using std::pair;
+using std::shared_ptr;
 using std::string;
 using std::vector;
 
@@ -1054,20 +1059,29 @@ Future<Docker::Container> Docker::inspect(
 {
   Owned<Promise<Docker::Container>> promise(new Promise<Docker::Container>());
 
-  const string cmd =  path + " -H " + socket + " inspect " + containerName;
-  _inspect(cmd, promise, retryInterval);
+  // Holds a callback used for cleanup in case this call to 'docker inspect' is
+  // discarded, and a mutex to control access to the callback.
+  auto callback = std::make_shared<pair<lambda::function<void()>, mutex>>();
 
-  return promise->future();
+  const string cmd = path + " -H " + socket + " inspect " + containerName;
+  _inspect(cmd, promise, retryInterval, callback);
+
+  return promise->future()
+    .onDiscard([callback]() {
+      synchronized (callback->second) {
+        callback->first();
+      }
+    });
 }
 
 
 void Docker::_inspect(
     const string& cmd,
     const Owned<Promise<Docker::Container>>& promise,
-    const Option<Duration>& retryInterval)
+    const Option<Duration>& retryInterval,
+    shared_ptr<pair<lambda::function<void()>, mutex>> callback)
 {
   if (promise->future().hasDiscard()) {
-    promise->discard();
     return;
   }
 
@@ -1084,13 +1098,25 @@ void Docker::_inspect(
     return;
   }
 
+  // Set the `onDiscard` callback which will clean up the subprocess if the
+  // caller discards the `Future` that we returned.
+  synchronized (callback->second) {
+    callback->first = [promise, s, cmd]() {
+      promise->discard();
+      CHECK_SOME(s);
+      commandDiscarded(s.get(), cmd);
+    };
+  }
+
   // Start reading from stdout so writing to the pipe won't block
   // to handle cases where the output is larger than the pipe
   // capacity.
   const Future<string> output = io::read(s.get().out().get());
 
   s.get().status()
-    .onAny([=]() { __inspect(cmd, promise, retryInterval, output, s.get()); });
+    .onAny([=]() {
+      __inspect(cmd, promise, retryInterval, output, s.get(), callback);
+    });
 }
 
 
@@ -1099,11 +1125,10 @@ void Docker::__inspect(
     const Owned<Promise<Docker::Container>>& promise,
     const Option<Duration>& retryInterval,
     Future<string> output,
-    const Subprocess& s)
+    const Subprocess& s,
+    shared_ptr<pair<lambda::function<void()>, mutex>> callback)
 {
   if (promise->future().hasDiscard()) {
-    promise->discard();
-    output.discard();
     return;
   }
 
@@ -1121,7 +1146,7 @@ void Docker::__inspect(
       VLOG(1) << "Retrying inspect with non-zero status code. cmd: '"
               << cmd << "', interval: " << stringify(retryInterval.get());
       Clock::timer(retryInterval.get(),
-                   [=]() { _inspect(cmd, promise, retryInterval); } );
+                   [=]() { _inspect(cmd, promise, retryInterval, callback); });
       return;
     }
 
@@ -1143,7 +1168,7 @@ void Docker::__inspect(
   CHECK_SOME(s.out());
   output
     .onAny([=](const Future<string>& output) {
-      ___inspect(cmd, promise, retryInterval, output);
+      ___inspect(cmd, promise, retryInterval, output, callback);
     });
 }
 
@@ -1152,10 +1177,10 @@ void Docker::___inspect(
     const string& cmd,
     const Owned<Promise<Docker::Container>>& promise,
     const Option<Duration>& retryInterval,
-    const Future<string>& output)
+    const Future<string>& output,
+    shared_ptr<pair<lambda::function<void()>, mutex>> callback)
 {
   if (promise->future().hasDiscard()) {
-    promise->discard();
     return;
   }
 
@@ -1176,7 +1201,7 @@ void Docker::___inspect(
     VLOG(1) << "Retrying inspect since container not yet started. cmd: '"
             << cmd << "', interval: " << stringify(retryInterval.get());
     Clock::timer(retryInterval.get(),
-                 [=]() { _inspect(cmd, promise, retryInterval); } );
+                 [=]() { _inspect(cmd, promise, retryInterval, callback); } );
     return;
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b0c6a31c/src/docker/docker.hpp
----------------------------------------------------------------------
diff --git a/src/docker/docker.hpp b/src/docker/docker.hpp
index 5593cb6..7b98b49 100644
--- a/src/docker/docker.hpp
+++ b/src/docker/docker.hpp
@@ -19,7 +19,9 @@
 
 #include <list>
 #include <map>
+#include <mutex>
 #include <string>
+#include <utility>
 
 #include <process/future.hpp>
 #include <process/owned.hpp>
@@ -309,20 +311,26 @@ private:
   static void _inspect(
       const std::string& cmd,
       const process::Owned<process::Promise<Container>>& promise,
-      const Option<Duration>& retryInterval);
+      const Option<Duration>& retryInterval,
+      std::shared_ptr<std::pair<lambda::function<void()>, std::mutex>>
+        callback);
 
   static void __inspect(
       const std::string& cmd,
       const process::Owned<process::Promise<Container>>& promise,
       const Option<Duration>& retryInterval,
       process::Future<std::string> output,
-      const process::Subprocess& s);
+      const process::Subprocess& s,
+      std::shared_ptr<std::pair<lambda::function<void()>, std::mutex>>
+        callback);
 
   static void ___inspect(
       const std::string& cmd,
       const process::Owned<process::Promise<Container>>& promise,
       const Option<Duration>& retryInterval,
-      const process::Future<std::string>& output);
+      const process::Future<std::string>& output,
+      std::shared_ptr<std::pair<lambda::function<void()>, std::mutex>>
+        callback);
 
   static process::Future<std::list<Container>> _ps(
       const Docker& docker,

Reply via email to