Repository: mesos
Updated Branches:
  refs/heads/master 8a01dd8d2 -> f6f5d85a8


Capped number of parallel inspect instances on a docker ps call.

Review: https://reviews.apache.org/r/35330


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f6f5d85a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f6f5d85a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f6f5d85a

Branch: refs/heads/master
Commit: f6f5d85a8b62ec50888bc39ce1a4bb1b4a1742fa
Parents: 8a01dd8
Author: Lily Chen <[email protected]>
Authored: Thu Aug 27 16:56:37 2015 -0700
Committer: Timothy Chen <[email protected]>
Committed: Thu Aug 27 16:56:37 2015 -0700

----------------------------------------------------------------------
 src/docker/docker.cpp   | 75 ++++++++++++++++++++++++++++++++++++++------
 src/docker/docker.hpp   | 12 +++++++
 src/slave/constants.hpp |  4 +++
 3 files changed, 81 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f6f5d85a/src/docker/docker.cpp
----------------------------------------------------------------------
diff --git a/src/docker/docker.cpp b/src/docker/docker.cpp
old mode 100644
new mode 100755
index 1367de8..12dc050
--- a/src/docker/docker.cpp
+++ b/src/docker/docker.cpp
@@ -857,28 +857,83 @@ Future<list<Docker::Container>> Docker::__ps(
     const Option<string>& prefix,
     const string& output)
 {
-  vector<string> lines = strings::tokenize(output, "\n");
+  Owned<vector<string>> lines(new vector<string>());
+  *lines = strings::tokenize(output, "\n");
 
   // Skip the header.
-  CHECK(!lines.empty());
-  lines.erase(lines.begin());
+  CHECK(!lines->empty());
+  lines->erase(lines->begin());
 
-  list<Future<Docker::Container>> futures;
+  Owned<list<Docker::Container>> containers(new list<Docker::Container>());
+
+  Owned<Promise<list<Docker::Container>>> promise(
+    new Promise<list<Docker::Container>>());
+
+  // Limit number of parallel calls to docker inspect at once to prevent
+  // reaching system's open file descriptor limit.
+  inspectBatches(containers, lines, promise, docker, prefix);
+
+  return promise->future();
+}
+
+// TODO(chenlily): Generalize functionality into a concurrency limiter
+// within libprocess.
+void Docker::inspectBatches(
+    Owned<list<Docker::Container>> containers,
+    Owned<vector<string>> lines,
+    Owned<Promise<list<Docker::Container>>> promise,
+    const Docker& docker,
+    const Option<string>& prefix)
+{
+  list<Future<Docker::Container>> batch =
+    createInspectBatch(lines, docker, prefix);
+
+  collect(batch).onAny([=](const Future<list<Docker::Container>>& c) {
+    if (c.isReady()) {
+      foreach (const Docker::Container& container, c.get()) {
+        containers->push_back(container);
+      }
+      if (lines->empty()) {
+        promise->set(*containers);
+      }
+      else {
+        inspectBatches(containers, lines, promise, docker, prefix);
+      }
+    } else {
+      if (c.isFailed()) {
+        promise->fail("Docker ps batch failed " + c.failure());
+      }
+      else {
+        promise->fail("Docker ps batch discarded");
+      }
+    }
+  });
+}
+
+
+list<Future<Docker::Container>> Docker::createInspectBatch(
+    Owned<vector<string>> lines,
+    const Docker& docker,
+    const Option<string>& prefix)
+{
+  list<Future<Docker::Container>> batch;
+
+  while (!lines->empty() && batch.size() < DOCKER_PS_MAX_INSPECT_CALLS) {
+    string line = lines->back();
+    lines->pop_back();
 
-  foreach (const string& line, lines) {
     // Inspect the containers that we are interested in depending on
     // whether or not a 'prefix' was specified.
     vector<string> columns = strings::split(strings::trim(line), " ");
+
     // We expect the name column to be the last column from ps.
     string name = columns[columns.size() - 1];
-    if (prefix.isNone()) {
-      futures.push_back(docker.inspect(name));
-    } else if (strings::startsWith(name, prefix.get())) {
-      futures.push_back(docker.inspect(name));
+    if (prefix.isNone() || strings::startsWith(name, prefix.get())) {
+      batch.push_back(docker.inspect(name));
     }
   }
 
-  return collect(futures);
+  return batch;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f6f5d85a/src/docker/docker.hpp
----------------------------------------------------------------------
diff --git a/src/docker/docker.hpp b/src/docker/docker.hpp
index 38e5299..6086710 100644
--- a/src/docker/docker.hpp
+++ b/src/docker/docker.hpp
@@ -201,6 +201,18 @@ private:
       const Option<std::string>& prefix,
       const std::string& output);
 
+  static void inspectBatches(
+      process::Owned<std::list<Docker::Container>> containers,
+      process::Owned<std::vector<std::string>> lines,
+      process::Owned<process::Promise<std::list<Docker::Container>>> promise,
+      const Docker& docker,
+      const Option<std::string>& prefix);
+
+  static std::list<process::Future<Docker::Container>> createInspectBatch(
+      process::Owned<std::vector<std::string>> lines,
+      const Docker& docker,
+      const Option<std::string>& prefix);
+
   static process::Future<Image> _pull(
       const Docker& docker,
       const process::Subprocess& s,

http://git-wip-us.apache.org/repos/asf/mesos/blob/f6f5d85a/src/slave/constants.hpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp
index ccfe89c..df18676 100644
--- a/src/slave/constants.hpp
+++ b/src/slave/constants.hpp
@@ -111,6 +111,10 @@ extern const std::string DEFAULT_AUTHENTICATEE;
 // Default maximum storage space to be used by the fetcher cache.
 const Bytes DEFAULT_FETCHER_CACHE_SIZE = Gigabytes(2);
 
+// Default maximum number of docker inspect calls docker ps will invoke
+// in parallel to prevent hitting system's open file descriptor limit.
+const int DOCKER_PS_MAX_INSPECT_CALLS = 100;
+
 // If no pings received within this timeout, then the slave will
 // trigger a re-detection of the master to cause a re-registration.
 Duration DEFAULT_MASTER_PING_TIMEOUT();

Reply via email to