Repository: mesos Updated Branches: refs/heads/master 8a01dd8d2 -> f6f5d85a8
Capped number of parallel inspect instances on a docker ps call. Review: https://reviews.apache.org/r/35330 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f6f5d85a Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f6f5d85a Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f6f5d85a Branch: refs/heads/master Commit: f6f5d85a8b62ec50888bc39ce1a4bb1b4a1742fa Parents: 8a01dd8 Author: Lily Chen <[email protected]> Authored: Thu Aug 27 16:56:37 2015 -0700 Committer: Timothy Chen <[email protected]> Committed: Thu Aug 27 16:56:37 2015 -0700 ---------------------------------------------------------------------- src/docker/docker.cpp | 75 ++++++++++++++++++++++++++++++++++++++------ src/docker/docker.hpp | 12 +++++++ src/slave/constants.hpp | 4 +++ 3 files changed, 81 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/f6f5d85a/src/docker/docker.cpp ---------------------------------------------------------------------- diff --git a/src/docker/docker.cpp b/src/docker/docker.cpp old mode 100644 new mode 100755 index 1367de8..12dc050 --- a/src/docker/docker.cpp +++ b/src/docker/docker.cpp @@ -857,28 +857,83 @@ Future<list<Docker::Container>> Docker::__ps( const Option<string>& prefix, const string& output) { - vector<string> lines = strings::tokenize(output, "\n"); + Owned<vector<string>> lines(new vector<string>()); + *lines = strings::tokenize(output, "\n"); // Skip the header. - CHECK(!lines.empty()); - lines.erase(lines.begin()); + CHECK(!lines->empty()); + lines->erase(lines->begin()); - list<Future<Docker::Container>> futures; + Owned<list<Docker::Container>> containers(new list<Docker::Container>()); + + Owned<Promise<list<Docker::Container>>> promise( + new Promise<list<Docker::Container>>()); + + // Limit number of parallel calls to docker inspect at once to prevent + // reaching system's open file descriptor limit. + inspectBatches(containers, lines, promise, docker, prefix); + + return promise->future(); +} + +// TODO(chenlily): Generalize functionality into a concurrency limiter +// within libprocess. +void Docker::inspectBatches( + Owned<list<Docker::Container>> containers, + Owned<vector<string>> lines, + Owned<Promise<list<Docker::Container>>> promise, + const Docker& docker, + const Option<string>& prefix) +{ + list<Future<Docker::Container>> batch = + createInspectBatch(lines, docker, prefix); + + collect(batch).onAny([=](const Future<list<Docker::Container>>& c) { + if (c.isReady()) { + foreach (const Docker::Container& container, c.get()) { + containers->push_back(container); + } + if (lines->empty()) { + promise->set(*containers); + } + else { + inspectBatches(containers, lines, promise, docker, prefix); + } + } else { + if (c.isFailed()) { + promise->fail("Docker ps batch failed " + c.failure()); + } + else { + promise->fail("Docker ps batch discarded"); + } + } + }); +} + + +list<Future<Docker::Container>> Docker::createInspectBatch( + Owned<vector<string>> lines, + const Docker& docker, + const Option<string>& prefix) +{ + list<Future<Docker::Container>> batch; + + while (!lines->empty() && batch.size() < DOCKER_PS_MAX_INSPECT_CALLS) { + string line = lines->back(); + lines->pop_back(); - foreach (const string& line, lines) { // Inspect the containers that we are interested in depending on // whether or not a 'prefix' was specified. vector<string> columns = strings::split(strings::trim(line), " "); + // We expect the name column to be the last column from ps. string name = columns[columns.size() - 1]; - if (prefix.isNone()) { - futures.push_back(docker.inspect(name)); - } else if (strings::startsWith(name, prefix.get())) { - futures.push_back(docker.inspect(name)); + if (prefix.isNone() || strings::startsWith(name, prefix.get())) { + batch.push_back(docker.inspect(name)); } } - return collect(futures); + return batch; } http://git-wip-us.apache.org/repos/asf/mesos/blob/f6f5d85a/src/docker/docker.hpp ---------------------------------------------------------------------- diff --git a/src/docker/docker.hpp b/src/docker/docker.hpp index 38e5299..6086710 100644 --- a/src/docker/docker.hpp +++ b/src/docker/docker.hpp @@ -201,6 +201,18 @@ private: const Option<std::string>& prefix, const std::string& output); + static void inspectBatches( + process::Owned<std::list<Docker::Container>> containers, + process::Owned<std::vector<std::string>> lines, + process::Owned<process::Promise<std::list<Docker::Container>>> promise, + const Docker& docker, + const Option<std::string>& prefix); + + static std::list<process::Future<Docker::Container>> createInspectBatch( + process::Owned<std::vector<std::string>> lines, + const Docker& docker, + const Option<std::string>& prefix); + static process::Future<Image> _pull( const Docker& docker, const process::Subprocess& s, http://git-wip-us.apache.org/repos/asf/mesos/blob/f6f5d85a/src/slave/constants.hpp ---------------------------------------------------------------------- diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp index ccfe89c..df18676 100644 --- a/src/slave/constants.hpp +++ b/src/slave/constants.hpp @@ -111,6 +111,10 @@ extern const std::string DEFAULT_AUTHENTICATEE; // Default maximum storage space to be used by the fetcher cache. const Bytes DEFAULT_FETCHER_CACHE_SIZE = Gigabytes(2); +// Default maximum number of docker inspect calls docker ps will invoke +// in parallel to prevent hitting system's open file descriptor limit. +const int DOCKER_PS_MAX_INSPECT_CALLS = 100; + // If no pings received within this timeout, then the slave will // trigger a re-detection of the master to cause a re-registration. Duration DEFAULT_MASTER_PING_TIMEOUT();
