Repository: mesos Updated Branches: refs/heads/master cc9fd8124 -> a47398bec
Add Docker pull to docker abstraction. Review: https://reviews.apache.org/r/25523 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a47398be Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a47398be Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a47398be Branch: refs/heads/master Commit: a47398bec4d1e24226785571eee8ee8114cd445e Parents: cc9fd81 Author: Timothy Chen <[email protected]> Authored: Wed Sep 17 09:09:14 2014 -0700 Committer: Benjamin Hindman <[email protected]> Committed: Wed Sep 17 09:45:23 2014 -0700 ---------------------------------------------------------------------- src/docker/docker.cpp | 195 ++++++++++++++++++++++++++++++++ src/docker/docker.hpp | 34 ++++++ src/slave/containerizer/docker.cpp | 110 ++++-------------- src/tests/docker_tests.cpp | 36 +++++- 4 files changed, 283 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/a47398be/src/docker/docker.cpp ---------------------------------------------------------------------- diff --git a/src/docker/docker.cpp b/src/docker/docker.cpp index 1dd3dd1..6063114 100644 --- a/src/docker/docker.cpp +++ b/src/docker/docker.cpp @@ -232,6 +232,45 @@ Try<Docker::Container> Docker::Container::create(const JSON::Object& json) } +Try<Docker::Image> Docker::Image::create(const JSON::Object& json) +{ + Result<JSON::Value> entrypoint = + json.find<JSON::Value>("ContainerConfig.Entrypoint"); + + if (entrypoint.isError()) { + return Error("Failed to find 'ContainerConfig.Entrypoint': " + + entrypoint.error()); + + } else if (entrypoint.isNone()) { + return Error("Unable to find 'ContainerConfig.Entrypoint'"); + } + + if (entrypoint.get().is<JSON::Null>()) { + return Docker::Image(None()); + } + + if (!entrypoint.get().is<JSON::Array>()) { + return Error("Unexpected type found for 'ContainerConfig.Entrypoint'"); + } + + const list<JSON::Value>& values = entrypoint.get().as<JSON::Array>().values; + if (values.size() == 0) { + return Docker::Image(None()); + } + + vector<string> result; + + foreach (const JSON::Value& value, values) { + if (!value.is<JSON::String>()) { + return Error("Expecting 'ContainerConfig.EntryPoint' array of strings"); + } + result.push_back(value.as<JSON::String>().value); + } + + return Docker::Image(result); +} + + Future<Nothing> Docker::run( const ContainerInfo& containerInfo, const CommandInfo& commandInfo, @@ -698,3 +737,159 @@ Future<list<Docker::Container> > Docker::__ps( return collect(futures); } + + +Future<Docker::Image> Docker::pull( + const string& directory, + const string& image) +{ + vector<string> argv; + + string dockerImage = image; + + // Check if the specified image has a tag. Also split on "/" in case + // the user specified a registry server (ie: localhost:5000/image) + // to get the actual image name. If no tag was given we add a + // 'latest' tag to avoid pulling down the repository. + + vector<string> parts = strings::split(image, "/"); + + if (!strings::contains(parts.back(), ":")) { + dockerImage += ":latest"; + } + + argv.push_back(path); + argv.push_back("inspect"); + argv.push_back(dockerImage); + + string cmd = strings::join(" ", argv); + + VLOG(1) << "Running " << cmd; + + Try<Subprocess> s = subprocess( + path, + argv, + Subprocess::PATH("/dev/null"), + Subprocess::PIPE(), + Subprocess::PIPE(), + None()); + + if (s.isError()) { + return Failure("Failed to execute '" + cmd + "': " + s.error()); + } + + // We assume docker inspect to exit quickly and do not need to be + // discarded. + return s.get().status() + .then(lambda::bind( + &Docker::_pull, + s.get(), + directory, + dockerImage, + path)); +} + + +Future<Docker::Image> Docker::_pull( + const Subprocess& s, + const string& directory, + const string& image, + const string& path) +{ + Option<int> status = s.status().get(); + if (status.isSome() && status.get() == 0) { + return io::read(s.out().get()) + .then(lambda::bind(&Docker::___pull, lambda::_1)); + } + + vector<string> argv; + argv.push_back(path); + argv.push_back("pull"); + argv.push_back(image); + + string cmd = strings::join(" ", argv); + + VLOG(1) << "Running " << cmd; + + // Set HOME variable to pick up .dockercfg. + map<string, string> environment; + + environment["HOME"] = directory; + + Try<Subprocess> s_ = subprocess( + path, + argv, + Subprocess::PATH("/dev/null"), + Subprocess::PIPE(), + Subprocess::PIPE(), + None(), + environment); + + if (s_.isError()) { + return Failure("Failed to execute '" + cmd + "': " + s_.error()); + } + + // Docker pull can run for a long time due to large images, so + // we allow the future to be discarded and it will kill the pull + // process. + return s_.get().status() + .then(lambda::bind(&Docker::__pull, s_.get(), cmd)) + .onDiscard(lambda::bind(&Docker::pullDiscarded, s_.get(), cmd)); +} + + +void Docker::pullDiscarded(const Subprocess& s, const string& cmd) +{ + VLOG(1) << "'" << cmd << "' is being discarded"; + os::killtree(s.pid(), SIGKILL); +} + + +Future<Docker::Image> Docker::__pull( + const Subprocess& s, + const string& cmd) +{ + Option<int> status = s.status().get(); + + if (!status.isSome()) { + return Failure("No status found from '" + cmd + "'"); + } else if (status.get() != 0) { + return io::read(s.err().get()) + .then(lambda::bind(&failure<Image>, cmd, status.get(), lambda::_1)); + } + + return io::read(s.out().get()) + .then(lambda::bind(&Docker::___pull, lambda::_1)); +} + + +Future<Docker::Image> Docker::___pull( + const string& output) +{ + Try<JSON::Array> parse = JSON::parse<JSON::Array>(output); + + if (parse.isError()) { + return Failure("Failed to parse JSON: " + parse.error()); + } + + JSON::Array array = parse.get(); + + // Only return if only one image identified with name. + if (array.values.size() == 1) { + CHECK(array.values.front().is<JSON::Object>()); + + Try<Docker::Image> image = + Docker::Image::create(array.values.front().as<JSON::Object>()); + + if (image.isError()) { + return Failure("Unable to create image: " + image.error()); + } + + return image.get(); + } + + // TODO(tnachen): Handle the case where the short image ID was + // not sufficiently unique and 'array.values.size() > 1'. + + return Failure("Failed to find image"); +} http://git-wip-us.apache.org/repos/asf/mesos/blob/a47398be/src/docker/docker.hpp ---------------------------------------------------------------------- diff --git a/src/docker/docker.hpp b/src/docker/docker.hpp index e7adedb..443db49 100644 --- a/src/docker/docker.hpp +++ b/src/docker/docker.hpp @@ -65,6 +65,19 @@ public: : id(_id), name(_name), pid(_pid) {} }; + class Image + { + public: + static Try<Image> create(const JSON::Object& json); + + Option<std::vector<std::string> > entrypoint; + + private: + Image(const Option<std::vector<std::string> >& _entrypoint) + : entrypoint(_entrypoint) {} + }; + + // Performs 'docker run IMAGE'. process::Future<Nothing> run( const mesos::ContainerInfo& containerInfo, @@ -104,6 +117,10 @@ public: const std::string& container, const std::string& directory); + process::Future<Image> pull( + const std::string& directory, + const std::string& image); + private: // Uses the specified path to the Docker CLI tool. Docker(const std::string& _path) : path(_path) {}; @@ -133,6 +150,23 @@ private: const Option<std::string>& prefix, const std::string& output); + static process::Future<Image> _pull( + const process::Subprocess& s, + const std::string& directory, + const std::string& image, + const std::string& path); + + static process::Future<Image> __pull( + const process::Subprocess& s, + const std::string& cmd); + + static process::Future<Image> ___pull( + const std::string& output); + + static void pullDiscarded( + const process::Subprocess& s, + const std::string& cmd); + const std::string path; }; http://git-wip-us.apache.org/repos/asf/mesos/blob/a47398be/src/slave/containerizer/docker.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp index 0febbac..9a29489 100644 --- a/src/slave/containerizer/docker.cpp +++ b/src/slave/containerizer/docker.cpp @@ -135,14 +135,9 @@ private: process::Future<Nothing> pull( const ContainerID& containerId, const std::string& directory, - const ContainerInfo::DockerInfo& dockerInfo); + const std::string& image); - process::Future<Nothing> _pull( - const Subprocess& s); - - process::Future<Nothing> __pull( - const Subprocess& s, - const string& output); + process::Future<Nothing> _pull(const std::string& image); process::Future<Nothing> _recover( const std::list<Docker::Container>& containers); @@ -273,10 +268,10 @@ private: // state anymore, although it doesn't hurt since it gives us // better error messages. enum State { - FETCHING, - PULLING, - RUNNING, - DESTROYING + FETCHING = 1, + PULLING = 2, + RUNNING = 3, + DESTROYING = 4 } state; ContainerID id; @@ -302,9 +297,9 @@ private: // are fetching. Option<Subprocess> fetcher; - // The docker pull subprocess is stored so we can killtree the - // pid when destroy is called while docker is pulling the image. - Option<Subprocess> pull; + // The docker pull future is stored so we can discard when + // destroy is called while docker is pulling the image. + Future<Docker::Image> pull; // Once the container is running, this saves the pid of the // running container. @@ -430,86 +425,21 @@ Future<Nothing> DockerContainerizerProcess::_fetch( } -// TODO(benh): Move this into Docker::pull after we've correctly made -// the futures returned from Docker::* functions be discardable. Future<Nothing> DockerContainerizerProcess::pull( const ContainerID& containerId, const string& directory, - const ContainerInfo::DockerInfo& dockerInfo) -{ - vector<string> argv; - argv.push_back(flags.docker); - argv.push_back("pull"); - - // Check if the specified image has a tag. Also split on "/" in case - // the user specified a registry server (ie: localhost:5000/image) - // to get the actual image name. If no tag was given we add a - // 'latest' tag to avoid pulling down the repository. - vector<string> parts = strings::split(dockerInfo.image(), "/"); - if (strings::contains(parts.back(), ":")) { - argv.push_back(dockerInfo.image()); - } else { - argv.push_back(dockerInfo.image() + ":latest"); - } - - VLOG(1) << "Running " << strings::join(" ", argv); - - map<string, string> environment; - environment["HOME"] = directory; - - Try<Subprocess> s = subprocess( - flags.docker, - argv, - Subprocess::PATH("/dev/null"), - Subprocess::PATH("/dev/null"), - Subprocess::PIPE(), - None(), - environment); - - if (s.isError()) { - return Failure("Failed to execute 'docker pull': " + s.error()); - } - - containers_[containerId]->pull = s.get(); - - return s.get().status() - .then(defer(self(), &Self::_pull, s.get())); -} - - -Future<Nothing> DockerContainerizerProcess::_pull( - const Subprocess& s) + const string& image) { - CHECK_READY(s.status()); - - Option<int> status = s.status().get(); - - if (status.isSome() && status.get() == 0) { - return Nothing(); - } - - CHECK_SOME(s.err()); - return io::read(s.err().get()) - .then(defer(self(), &Self::__pull, s, lambda::_1)); + Future<Docker::Image> future = docker.pull(directory, image); + containers_[containerId]->pull = future; + return future.then(defer(self(), &Self::_pull, image)); } -Future<Nothing> DockerContainerizerProcess::__pull( - const Subprocess& s, - const string& output) +Future<Nothing> DockerContainerizerProcess::_pull(const string& image) { - CHECK_READY(s.status()); - - Option<int> status = s.status().get(); - - if (status.isNone()) { - return Failure("No exit status available from 'docker pull': \n" + output); - } - - CHECK_NE(0, status.get()); - - return Failure("Failed to execute 'docker pull', exited with status (" + - WSTRINGIFY(status.get()) + "): \n" + output); + VLOG(1) << "Docker pull " << image << " completed"; + return Nothing(); } @@ -849,7 +779,7 @@ Future<bool> DockerContainerizerProcess::_launch( containers_[containerId]->state = Container::PULLING; - return pull(containerId, directory, taskInfo.container().docker()) + return pull(containerId, directory, taskInfo.container().docker().image()) .then(defer(self(), &Self::__launch, containerId, @@ -1087,7 +1017,7 @@ Future<bool> DockerContainerizerProcess::_launch( containers_[containerId]->state = Container::PULLING; - return pull(containerId, directory, executorInfo.container().docker()) + return pull(containerId, directory, executorInfo.container().docker().image()) .then(defer(self(), &Self::__launch, containerId, @@ -1587,9 +1517,7 @@ void DockerContainerizerProcess::destroy( LOG(INFO) << "Destroying Container '" << containerId << "' in PULLING state"; - if (container->pull.isSome()) { - os::killtree(container->pull.get().pid(), SIGKILL); - } + container->pull.discard(); containerizer::Termination termination; termination.set_killed(killed); http://git-wip-us.apache.org/repos/asf/mesos/blob/a47398be/src/tests/docker_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/docker_tests.cpp b/src/tests/docker_tests.cpp index e6c228a..04139af 100644 --- a/src/tests/docker_tests.cpp +++ b/src/tests/docker_tests.cpp @@ -20,6 +20,8 @@ #include <process/future.hpp> #include <process/gtest.hpp> +#include <process/owned.hpp> +#include <process/subprocess.hpp> #include <stout/option.hpp> #include <stout/gtest.hpp> @@ -260,7 +262,7 @@ TEST(DockerTest, ROOT_DOCKER_CheckPortResource) resources); // Port should be out side of the provided ranges. - AWAIT_EXPECTED_FAILED(run); + AWAIT_EXPECT_FAILED(run); resources = Resources::parse("ports:[9998-9999];ports:[10000-11000]").get(); @@ -280,3 +282,35 @@ TEST(DockerTest, ROOT_DOCKER_CheckPortResource) Future<Nothing> status = docker.rm(containerName, true); AWAIT_READY(status); } + + +TEST(DockerTest, ROOT_DOCKER_CancelPull) +{ + // Delete the test image if it exists. + + Try<Subprocess> s = process::subprocess( + tests::flags.docker + " rmi lingmann/1gb", + Subprocess::PATH("/dev/null"), + Subprocess::PATH("/dev/null"), + Subprocess::PATH("/dev/null")); + + ASSERT_SOME(s); + + AWAIT_READY_FOR(s.get().status(), Seconds(30)); + + Docker docker = Docker::create(tests::flags.docker, false).get(); + + Try<string> directory = environment->mkdtemp(); + + CHECK_SOME(directory) << "Failed to create temporary directory"; + + // Assume that pulling the very large image 'lingmann/1gb' will take + // sufficiently long that we can start it and discard (i.e., cancel + // it) right away and the future will indeed get discarded. + Future<Docker::Image> future = + docker.pull(directory.get(), "lingmann/1gb"); + + future.discard(); + + AWAIT_DISCARDED(future); +}
