Bug fixes and cleanups in Docker abstraction.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fa400fef Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fa400fef Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fa400fef Branch: refs/heads/master Commit: fa400fef337900a97ccc77906dbf80b7d0fbd967 Parents: 86a9769 Author: Benjamin Hindman <[email protected]> Authored: Mon Jun 23 09:32:57 2014 -0700 Committer: Benjamin Hindman <[email protected]> Committed: Mon Aug 4 15:08:15 2014 -0700 ---------------------------------------------------------------------- src/docker/docker.cpp | 107 +++++++++++++++++++++++++++++++++++++++------ src/docker/docker.hpp | 17 +++++-- 2 files changed, 107 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/fa400fef/src/docker/docker.cpp ---------------------------------------------------------------------- diff --git a/src/docker/docker.cpp b/src/docker/docker.cpp index db31ba3..976a660 100644 --- a/src/docker/docker.cpp +++ b/src/docker/docker.cpp @@ -4,9 +4,12 @@ #include <stout/lambda.hpp> #include <stout/strings.hpp> +#include <stout/result.hpp> + +#include <stout/os/read.hpp> + #include <process/check.hpp> #include <process/collect.hpp> -#include <process/io.hpp> #include "docker/docker.hpp" @@ -18,6 +21,16 @@ using std::string; using std::vector; +string Docker::Container::id() const +{ + map<string, JSON::Value>::const_iterator entry = + json.values.find("Id"); + CHECK(entry != json.values.end()); + JSON::Value value = entry->second; + CHECK(value.is<JSON::String>()); + return value.as<JSON::String>().value; +} + string Docker::Container::name() const { map<string, JSON::Value>::const_iterator entry = @@ -29,10 +42,16 @@ string Docker::Container::name() const } -Future<Option<int> > Docker::run(const string& image) const +Future<Option<int> > Docker::run( + const string& image, + const string& command, + const string& name) const { + VLOG(1) << "Running " << path << " run --name=" << name << " " + << image << " " << command; + Try<Subprocess> s = subprocess( - path + " run " + image, + path + " run --name=" + name + " " + image + " " + command, Subprocess::PIPE(), Subprocess::PIPE(), Subprocess::PIPE()); @@ -47,6 +66,8 @@ Future<Option<int> > Docker::run(const string& image) const Future<Option<int> > Docker::kill(const string& container) const { + VLOG(1) << "Running " << path << " kill " << container; + Try<Subprocess> s = subprocess( path + " kill " + container, Subprocess::PIPE(), @@ -63,6 +84,8 @@ Future<Option<int> > Docker::kill(const string& container) const Future<Docker::Container> Docker::inspect(const string& container) const { + VLOG(1) << "Running " << path << " inspect " << container; + Try<Subprocess> s = subprocess( path + " inspect " + container, Subprocess::PIPE(), @@ -70,7 +93,6 @@ Future<Docker::Container> Docker::inspect(const string& container) const Subprocess::PIPE()); if (s.isError()) { - // TODO(benh): Include stdout and stderr in error message. return Failure(s.error()); } @@ -79,24 +101,73 @@ Future<Docker::Container> Docker::inspect(const string& container) const } +namespace os { + +inline Result<std::string> read( + int fd, + Option<size_t> size = None(), + size_t chunk = 16 * 4096) +{ + std::string result; + + while (size.isNone() || result.size() < size.get()) { + char buffer[chunk]; + ssize_t length = ::read(fd, buffer, chunk); + + if (length < 0) { + // TODO(bmahler): Handle a non-blocking fd? (EAGAIN, EWOULDBLOCK) + if (errno == EINTR) { + continue; + } + return ErrnoError(); + } else if (length == 0) { + // Reached EOF before expected! Only return as much data as + // available or None if we haven't read anything yet. + if (result.size() > 0) { + return result; + } + return None(); + } + + result.append(buffer, length); + } + + return result; +} + +} // namespace os { + + Future<Docker::Container> Docker::_inspect(const Subprocess& s) { - // Check the exit status of 'docker ps'. + // Check the exit status of 'docker inspect'. CHECK_READY(s.status()); Option<int> status = s.status().get(); if (status.isSome() && status.get() != 0) { - // TODO(benh): Include stdout and stderr in error message. - return Failure("Failed to do 'docker ps'"); + // TODO(benh): Include stderr in error message. + Result<string> read = os::read(s.err().get()); + return Failure("Failed to do 'docker inspect': " + + (read.isSome() + ? read.get() + : " exited with status " + stringify(status.get()))); } // Read to EOF. // TODO(benh): Read output asynchronously. CHECK_SOME(s.out()); - string output = io::read(s.out().get()).get(); + Result<string> output = os::read(s.out().get()); + + if (output.isError()) { + // TODO(benh): Include stderr in error message. + return Failure("Failed to read output: " + output.error()); + } else if (output.isNone()) { + // TODO(benh): Include stderr in error message. + return Failure("No output available"); + } - Try<JSON::Array> parse = JSON::parse<JSON::Array>(output); + Try<JSON::Array> parse = JSON::parse<JSON::Array>(output.get()); if (parse.isError()) { return Failure("Failed to parse JSON: " + parse.error()); @@ -119,6 +190,8 @@ Future<Docker::Container> Docker::_inspect(const Subprocess& s) Future<list<Docker::Container> > Docker::ps() const { + VLOG(1) << "Running " << path << " ps"; + Try<Subprocess> s = subprocess( path + " ps", Subprocess::PIPE(), @@ -144,16 +217,24 @@ Future<list<Docker::Container> > Docker::_ps( Option<int> status = s.status().get(); if (status.isSome() && status.get() != 0) { - // TODO(benh): Include stdout and stderr in error message. + // TODO(benh): Include stderr in error message. return Failure("Failed to do 'docker ps'"); } // Read to EOF. // TODO(benh): Read output asynchronously. CHECK_SOME(s.out()); - string output = io::read(s.out().get()).get(); + Result<string> output = os::read(s.out().get()); + + if (output.isError()) { + // TODO(benh): Include stderr in error message. + return Failure("Failed to read output: " + output.error()); + } else if (output.isNone()) { + // TODO(benh): Include stderr in error message. + return Failure("No output available"); + } - vector<string> lines = strings::split(output, "\n"); + vector<string> lines = strings::tokenize(output.get(), "\n"); // Skip the header. CHECK_NE(0, lines.size()); @@ -163,7 +244,7 @@ Future<list<Docker::Container> > Docker::_ps( foreach (const string& line, lines) { // Inspect the container. - futures.push_back(docker.inspect(strings::split(line, "\n")[0])); + futures.push_back(docker.inspect(strings::split(line, " ")[0])); } return collect(futures); http://git-wip-us.apache.org/repos/asf/mesos/blob/fa400fef/src/docker/docker.hpp ---------------------------------------------------------------------- diff --git a/src/docker/docker.hpp b/src/docker/docker.hpp index 26d6ec3..3bed71d 100644 --- a/src/docker/docker.hpp +++ b/src/docker/docker.hpp @@ -37,6 +37,9 @@ public: public: Container(const JSON::Object& json) : json(json) {} + // Returns the ID of the container. + std::string id() const; + // Returns the name of the container. std::string name() const; @@ -48,20 +51,26 @@ public: Docker(const std::string& path) : path(path) {} // Performs 'docker run IMAGE'. - process::Future<Option<int> > run(const std::string& image) const; + process::Future<Option<int> > run( + const std::string& image, + const std::string& command, + const std::string& name) const; // Performs 'docker kill CONTAINER'. - process::Future<Option<int> > kill(const std::string& container) const; + process::Future<Option<int> > kill( + const std::string& container) const; // Performs 'docker inspect CONTAINER'. - process::Future<Container> inspect(const std::string& container) const; + process::Future<Container> inspect( + const std::string& container) const; // Performs 'docker ps'. process::Future<std::list<Container> > ps() const; private: // Continuations. - static process::Future<Container> _inspect(const process::Subprocess& s); + static process::Future<Container> _inspect( + const process::Subprocess& s); static process::Future<std::list<Container> > _ps( const Docker& docker, const process::Subprocess& s);
