Updated discard handling in `Docker::inspect()`.

Previously, a discard of the `Future` returned by `Docker::inspect()`
was only checked for at the beginning of each asynchronous
continuation in the library function's call chain. This meant that
if a Docker CLI command became stuck between two continuations, the
discard was never noticed and discarding the `Future` had no effect.

This patch adds an `onDiscard` callback to the `Future` to ensure
that any discards have the desired effect: cleanup of any spawned
subprocess, and a transition of the `Future` to the discarded state.

Since the Docker library is not a libprocess process, we must
implement this with a `shared_ptr` and a mutex to protect against
concurrent access to the `onDiscard` callback. The callback captures
the currently running subprocess, so it must be replaced each time a
retry spawns a new one.

Review: https://reviews.apache.org/r/65683/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/155b3afe
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/155b3afe
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/155b3afe

Branch: refs/heads/1.4.x
Commit: 155b3afec7144baca498036a39ec954b38dbbfbb
Parents: 57cd3c3
Author: Greg Mann <g...@mesosphere.io>
Authored: Wed Feb 28 15:24:17 2018 -0800
Committer: Gilbert Song <songzihao1...@gmail.com>
Committed: Mon Mar 5 18:11:12 2018 -0800

----------------------------------------------------------------------
 src/docker/docker.cpp | 51 ++++++++++++++++++++++++++++++++++------------
 src/docker/docker.hpp | 14 ++++++++++---
 2 files changed, 49 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/155b3afe/src/docker/docker.cpp
----------------------------------------------------------------------
diff --git a/src/docker/docker.cpp b/src/docker/docker.cpp
index 9ab716f..e9e600e 100755
--- a/src/docker/docker.cpp
+++ b/src/docker/docker.cpp
@@ -15,6 +15,8 @@
 // limitations under the License.
 
 #include <map>
+#include <mutex>
+#include <utility>
 #include <vector>
 
 #include <stout/error.hpp>
@@ -56,6 +58,9 @@ using namespace process;
 
 using std::list;
 using std::map;
+using std::mutex;
+using std::pair;
+using std::shared_ptr;
 using std::string;
 using std::vector;
 
@@ -1226,20 +1231,29 @@ Future<Docker::Container> Docker::inspect(
 {
   Owned<Promise<Docker::Container>> promise(new Promise<Docker::Container>());
 
+  // Holds a callback used for cleanup in case this call to 'docker inspect' is
+  // discarded, and a mutex to control access to the callback.
+  auto callback = std::make_shared<pair<lambda::function<void()>, mutex>>();
+
   const string cmd = path + " -H " + socket + " inspect " + containerName;
-  _inspect(cmd, promise, retryInterval);
+  _inspect(cmd, promise, retryInterval, callback);
 
-  return promise->future();
+  return promise->future()
+    .onDiscard([callback]() {
+      synchronized (callback->second) {
+        callback->first();
+      }
+    });
 }
 
 
 void Docker::_inspect(
     const string& cmd,
     const Owned<Promise<Docker::Container>>& promise,
-    const Option<Duration>& retryInterval)
+    const Option<Duration>& retryInterval,
+    shared_ptr<pair<lambda::function<void()>, mutex>> callback)
 {
   if (promise->future().hasDiscard()) {
-    promise->discard();
     return;
   }
 
@@ -1256,13 +1270,25 @@ void Docker::_inspect(
     return;
   }
 
+  // Set the `onDiscard` callback which will clean up the subprocess if the
+  // caller discards the `Future` that we returned.
+  synchronized (callback->second) {
+    callback->first = [promise, s, cmd]() {
+      promise->discard();
+      CHECK_SOME(s);
+      commandDiscarded(s.get(), cmd);
+    };
+  }
+
   // Start reading from stdout so writing to the pipe won't block
   // to handle cases where the output is larger than the pipe
   // capacity.
   const Future<string> output = io::read(s.get().out().get());
 
   s.get().status()
-    .onAny([=]() { __inspect(cmd, promise, retryInterval, output, s.get()); });
+    .onAny([=]() {
+      __inspect(cmd, promise, retryInterval, output, s.get(), callback);
+    });
 }
 
 
@@ -1271,11 +1297,10 @@ void Docker::__inspect(
     const Owned<Promise<Docker::Container>>& promise,
     const Option<Duration>& retryInterval,
     Future<string> output,
-    const Subprocess& s)
+    const Subprocess& s,
+    shared_ptr<pair<lambda::function<void()>, mutex>> callback)
 {
   if (promise->future().hasDiscard()) {
-    promise->discard();
-    output.discard();
     return;
   }
 
@@ -1293,7 +1318,7 @@ void Docker::__inspect(
       VLOG(1) << "Retrying inspect with non-zero status code. cmd: '"
               << cmd << "', interval: " << stringify(retryInterval.get());
       Clock::timer(retryInterval.get(),
-                   [=]() { _inspect(cmd, promise, retryInterval); } );
+                   [=]() { _inspect(cmd, promise, retryInterval, callback); });
       return;
     }
 
@@ -1315,7 +1340,7 @@ void Docker::__inspect(
   CHECK_SOME(s.out());
   output
     .onAny([=](const Future<string>& output) {
-      ___inspect(cmd, promise, retryInterval, output);
+      ___inspect(cmd, promise, retryInterval, output, callback);
     });
 }
 
@@ -1324,10 +1349,10 @@ void Docker::___inspect(
     const string& cmd,
     const Owned<Promise<Docker::Container>>& promise,
     const Option<Duration>& retryInterval,
-    const Future<string>& output)
+    const Future<string>& output,
+    shared_ptr<pair<lambda::function<void()>, mutex>> callback)
 {
   if (promise->future().hasDiscard()) {
-    promise->discard();
     return;
   }
 
@@ -1348,7 +1373,7 @@ void Docker::___inspect(
     VLOG(1) << "Retrying inspect since container not yet started. cmd: '"
             << cmd << "', interval: " << stringify(retryInterval.get());
     Clock::timer(retryInterval.get(),
-                 [=]() { _inspect(cmd, promise, retryInterval); } );
+                 [=]() { _inspect(cmd, promise, retryInterval, callback); } );
     return;
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/155b3afe/src/docker/docker.hpp
----------------------------------------------------------------------
diff --git a/src/docker/docker.hpp b/src/docker/docker.hpp
index 95e60a7..81c1154 100644
--- a/src/docker/docker.hpp
+++ b/src/docker/docker.hpp
@@ -19,7 +19,9 @@
 
 #include <list>
 #include <map>
+#include <mutex>
 #include <string>
+#include <utility>
 
 #include <process/future.hpp>
 #include <process/owned.hpp>
@@ -334,20 +336,26 @@ private:
   static void _inspect(
       const std::string& cmd,
       const process::Owned<process::Promise<Container>>& promise,
-      const Option<Duration>& retryInterval);
+      const Option<Duration>& retryInterval,
+      std::shared_ptr<std::pair<lambda::function<void()>, std::mutex>>
+        callback);
 
   static void __inspect(
       const std::string& cmd,
       const process::Owned<process::Promise<Container>>& promise,
       const Option<Duration>& retryInterval,
       process::Future<std::string> output,
-      const process::Subprocess& s);
+      const process::Subprocess& s,
+      std::shared_ptr<std::pair<lambda::function<void()>, std::mutex>>
+        callback);
 
   static void ___inspect(
       const std::string& cmd,
       const process::Owned<process::Promise<Container>>& promise,
       const Option<Duration>& retryInterval,
-      const process::Future<std::string>& output);
+      const process::Future<std::string>& output,
+      std::shared_ptr<std::pair<lambda::function<void()>, std::mutex>>
+        callback);
 
   static process::Future<std::list<Container>> _ps(
       const Docker& docker,

Reply via email to