Address comments for Docker containerizer changes.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3baa6096 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3baa6096 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3baa6096 Branch: refs/heads/master Commit: 3baa60965407bf0c3eb9c3da1b2ba7c0a4fee968 Parents: 0b5f67e Author: Timothy Chen <[email protected]> Authored: Thu Apr 30 00:43:14 2015 -0700 Committer: Timothy Chen <[email protected]> Committed: Sun May 24 22:27:40 2015 -0700 ---------------------------------------------------------------------- docs/configuration.md | 18 +- src/Makefile.am | 1 + src/docker/docker.cpp | 293 +++++++++++---------- src/docker/docker.hpp | 86 ++++--- src/docker/executor.cpp | 187 +++++++------- src/docker/executor.hpp | 69 +++++ src/slave/constants.cpp | 3 + src/slave/constants.hpp | 8 + src/slave/containerizer/docker.cpp | 342 +++++++++++++----------- src/slave/containerizer/docker.hpp | 165 ++++++------ src/slave/flags.cpp | 20 +- src/tests/docker_containerizer_tests.cpp | 357 +++++++++++++------------- src/tests/docker_tests.cpp | 30 +-- 13 files changed, 872 insertions(+), 707 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/3baa6096/docs/configuration.md ---------------------------------------------------------------------- diff --git a/docs/configuration.md b/docs/configuration.md index 0a7629a..5a41477 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -862,7 +862,12 @@ file:///path/to/file (where file contains one of the above)</code></pre> --[no-]docker_kill_orphans </td> <td> - Enable docker containerizer to kill orphaned containers + Enable docker containerizer to kill orphaned containers. + You should consider setting this to false when you launch multiple + slaves in the same OS, to avoid one of the DockerContainerizer removing + docker tasks launched by other slaves. 
However, you should also make sure + you enable checkpointing for the slave so the same slave id can be reused, + otherwise docker tasks will not be cleaned up when the slave restarts. (default: true) </td> </tr> @@ -871,9 +876,9 @@ file:///path/to/file (where file contains one of the above)</code></pre> --docker_sock=VALUE </td> <td> - The docker UNIX socket path that the CLI uses to communicate to the - daemon. We need this to launch docker containers that can run docker - CLI. + The UNIX socket path to be mounted into the docker executor container to + provide docker CLI access to the docker daemon. This must be the path used + by the slave's docker image. (default: /var/run/docker.sock) </td> </tr> @@ -883,9 +888,10 @@ file:///path/to/file (where file contains one of the above)</code></pre> </td> <td> The docker image used to launch this mesos slave instance. - If a image is specified, the docker containerizer assumes the slave + If an image is specified, the docker containerizer assumes the slave is running in a docker container, and launches executors with - docker containers in order to recover them when the slave recovers. + docker containers in order to recover them when the slave restarts and + recovers.
</td> </tr> <tr> http://git-wip-us.apache.org/repos/asf/mesos/blob/3baa6096/src/Makefile.am ---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index 1e56ae5..814468e 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -323,6 +323,7 @@ libmesos_no_3rdparty_la_SOURCES = \ common/values.cpp \ docker/docker.hpp \ docker/docker.cpp \ + docker/executor.hpp \ exec/exec.cpp \ files/files.cpp \ hook/manager.cpp \ http://git-wip-us.apache.org/repos/asf/mesos/blob/3baa6096/src/docker/docker.cpp ---------------------------------------------------------------------- diff --git a/src/docker/docker.cpp b/src/docker/docker.cpp index 08f2023..ee74da5 100644 --- a/src/docker/docker.cpp +++ b/src/docker/docker.cpp @@ -41,6 +41,8 @@ #include "slave/containerizer/isolators/cgroups/cpushare.hpp" #include "slave/containerizer/isolators/cgroups/mem.hpp" +#include "slave/constants.hpp" + using namespace mesos; using namespace mesos::internal::slave; @@ -110,16 +112,16 @@ Try<Docker*> Docker::create(const string& path, bool validate) delete docker; return Error("Failed to find a mounted cgroups hierarchy " "for the 'cpu' subsystem; you probably need " - "to mount cgroups manually!"); + "to mount cgroups manually"); } #endif // __linux__ // Validate the version (and that we can use Docker at all). Future<Version> version = docker->version(); - if(!version.await(Seconds(5))) { + if (!version.await(DOCKER_VERSION_WAIT_TIMEOUT)) { delete docker; - return Error("Timed out getting docker version."); + return Error("Timed out getting docker version"); } if (version.isFailed()) { @@ -129,7 +131,7 @@ Try<Docker*> Docker::create(const string& path, bool validate) if (version.get() < Version(1, 0, 0)) { delete docker; - return Error("Insufficient version of Docker! Please upgrade to >= 1.0.0"); + return Error("Insufficient version of Docker. 
Please upgrade to >= 1.0.0"); } return docker; @@ -165,7 +167,7 @@ Future<Version> Docker::version() const Future<Version> Docker::_version(const string& cmd, const Subprocess& s) { const Option<int>& status = s.status().get(); - if (!status.isSome() || status.get() != 0) { + if (status.isNone() || status.get() != 0) { string msg = "Failed to execute '" + cmd + "': "; if (status.isSome()) { msg += WSTRINGIFY(status.get()); @@ -198,66 +200,62 @@ Future<Version> Docker::__version(const Future<string>& output) } } - return Failure("Unable to find docker version in output."); + return Failure("Unable to find docker version in output"); } Try<Docker::Container> Docker::Container::create(const JSON::Object& json) { - map<string, JSON::Value>::const_iterator entry = - json.values.find("Id"); - if (entry == json.values.end()) { + Result<JSON::String> idValue = json.find<JSON::String>("Id"); + if (idValue.isNone()) { return Error("Unable to find Id in container"); + } else if (idValue.isError()) { + return Error("Error finding Id in container: " + idValue.error()); } - JSON::Value idValue = entry->second; - if (!idValue.is<JSON::String>()) { - return Error("Id in container is not a string type"); - } - - string id = idValue.as<JSON::String>().value; + string id = idValue.get().value; - entry = json.values.find("Name"); - if (entry == json.values.end()) { + Result<JSON::String> nameValue = json.find<JSON::String>("Name"); + if (nameValue.isNone()) { return Error("Unable to find Name in container"); + } else if (nameValue.isError()) { + return Error("Error finding Name in container: " + nameValue.error()); } - JSON::Value nameValue = entry->second; - if (!nameValue.is<JSON::String>()) { - return Error("Name in container is not string type"); - } - - string name = nameValue.as<JSON::String>().value; + string name = nameValue.get().value; - entry = json.values.find("State"); - if (entry == json.values.end()) { + Result<JSON::Object> stateValue = 
json.find<JSON::Object>("State"); + if (stateValue.isNone()) { return Error("Unable to find State in container"); + } else if (stateValue.isError()) { + return Error("Error finding State in container: " + stateValue.error()); } - JSON::Value stateValue = entry->second; - if (!stateValue.is<JSON::Object>()) { - return Error("State in container is not object type"); - } - - entry = stateValue.as<JSON::Object>().values.find("Pid"); - if (entry == json.values.end()) { + Result<JSON::Number> pidValue = stateValue.get().find<JSON::Number>("Pid"); + if (pidValue.isNone()) { return Error("Unable to find Pid in State"); + } else if (pidValue.isError()) { + return Error("Error finding Pid in State: " + pidValue.error()); } - // TODO(yifan): Reload operator '=' to reuse the value variable above. - JSON::Value pidValue = entry->second; - if (!pidValue.is<JSON::Number>()) { - return Error("Pid in State is not number type"); - } - - pid_t pid = pid_t(pidValue.as<JSON::Number>().value); + pid_t pid = pid_t(pidValue.get().value); Option<pid_t> optionalPid; if (pid != 0) { optionalPid = pid; } - return Docker::Container(id, name, optionalPid); + Result<JSON::String> startedAtValue = + stateValue.get().find<JSON::String>("StartedAt"); + if (startedAtValue.isNone()) { + return Error("Unable to find StartedAt in State"); + } else if (startedAtValue.isError()) { + return Error("Error finding StartedAt in State: " + startedAtValue.error()); + } + + bool started = startedAtValue.get().value != "0001-01-01T00:00:00Z"; + + return Docker::Container(id, name, optionalPid, started); } @@ -307,8 +305,9 @@ Future<Nothing> Docker::run( const string& sandboxDirectory, const string& mappedDirectory, const Option<Resources>& resources, - const Option<map<string, string> >& env, - bool detached) const + const Option<map<string, string>>& env, + const Option<string>& stdoutPath, + const Option<string>& stderrPath) const { if (!containerInfo.has_docker()) { return Failure("No docker info found in 
container info"); @@ -320,10 +319,6 @@ Future<Nothing> Docker::run( argv.push_back(path); argv.push_back("run"); - if (detached) { - argv.push_back("-d"); - } - if (dockerInfo.privileged()) { argv.push_back("--privileged"); } @@ -381,7 +376,7 @@ Future<Nothing> Docker::run( argv.push_back(volumeConfig); } - // Mapping sandbox directory into the contianer mapped directory. + // Mapping sandbox directory into the container mapped directory. argv.push_back("-v"); argv.push_back(sandboxDirectory + ":" + mappedDirectory); @@ -501,16 +496,22 @@ Future<Nothing> Docker::run( // URI downloads. environment["HOME"] = sandboxDirectory; + Subprocess::IO stdoutIo = Subprocess::PIPE(); + Subprocess::IO stderrIo = Subprocess::PIPE(); + if (stdoutPath.isSome()) { + stdoutIo = Subprocess::PATH(stdoutPath.get()); + } + + if (stderrPath.isSome()) { + stderrIo = Subprocess::PATH(stderrPath.get()); + } + Try<Subprocess> s = subprocess( path, argv, Subprocess::PATH("/dev/null"), - (detached - ? Subprocess::PIPE() - : Subprocess::PATH(path::join(sandboxDirectory, "stdout"))), - (detached - ? Subprocess::PIPE() - : Subprocess::PATH(path::join(sandboxDirectory, "stderr"))), + stdoutIo, + stderrIo, None(), environment); @@ -518,10 +519,9 @@ Future<Nothing> Docker::run( return Failure(s.error()); } - if (detached) { - return checkError(cmd, s.get()); - } - + // We don't call checkError here to avoid printing the stderr + // of the docker container task as docker run with attach forwards + // the container's stderr to the client's stderr. 
return s.get().status() .then(lambda::bind( &Docker::_run, @@ -533,7 +533,7 @@ Future<Nothing> Docker::run( Future<Nothing> Docker::_run(const Option<int>& status) { if (status.isNone()) { - return Failure("Failed to get exit status."); + return Failure("Failed to get exit status"); } else if (status.get() != 0) { return Failure("Container exited on error: " + WSTRINGIFY(status.get())); } @@ -543,7 +543,7 @@ Future<Nothing> Docker::_run(const Option<int>& status) Future<Nothing> Docker::stop( - const string& container, + const string& containerName, const Duration& timeout, bool remove) const { @@ -553,7 +553,8 @@ Future<Nothing> Docker::stop( stringify(timeoutSecs)); } - string cmd = path + " stop -t " + stringify(timeoutSecs) + " " + container; + string cmd = path + " stop -t " + stringify(timeoutSecs) + + " " + containerName; VLOG(1) << "Running " << cmd; @@ -571,7 +572,7 @@ Future<Nothing> Docker::stop( .then(lambda::bind( &Docker::_stop, *this, - container, + containerName, cmd, s.get(), remove)); @@ -579,7 +580,7 @@ Future<Nothing> Docker::stop( Future<Nothing> Docker::_stop( const Docker& docker, - const string& container, + const string& containerName, const string& cmd, const Subprocess& s, bool remove) @@ -588,7 +589,7 @@ Future<Nothing> Docker::_stop( if (remove) { bool force = !status.isSome() || status.get() != 0; - return docker.rm(container, force); + return docker.rm(containerName, force); } return checkError(cmd, s); @@ -596,10 +597,10 @@ Future<Nothing> Docker::_stop( Future<Nothing> Docker::rm( - const string& container, + const string& containerName, bool force) const { - const string cmd = path + (force ? " rm -f " : " rm ") + container; + const string cmd = path + (force ? 
" rm -f " : " rm ") + containerName; VLOG(1) << "Running " << cmd; @@ -617,9 +618,28 @@ Future<Nothing> Docker::rm( } -Future<Docker::Container> Docker::inspect(const string& container) const +Future<Docker::Container> Docker::inspect( + const string& containerName, + const Option<Duration>& retryInterval) const { - const string cmd = path + " inspect " + container; + Owned<Promise<Docker::Container>> promise(new Promise<Docker::Container>()); + + const string cmd = path + " inspect " + containerName; + _inspect(cmd, promise, retryInterval); + + return promise->future(); +} + + +void Docker::_inspect( + const string& cmd, + const Owned<Promise<Docker::Container>>& promise, + const Option<Duration>& retryInterval) +{ + if (promise->future().hasDiscard()) { + promise->discard(); + return; + } VLOG(1) << "Running " << cmd; @@ -630,48 +650,86 @@ Future<Docker::Container> Docker::inspect(const string& container) const Subprocess::PIPE()); if (s.isError()) { - return Failure(s.error()); + promise->fail(s.error()); + return; } - return s.get().status() - .then(lambda::bind(&Docker::_inspect, cmd, s.get())); + s.get().status() + .onAny([=]() { __inspect(cmd, promise, retryInterval, s.get()); }); } -Future<Docker::Container> Docker::_inspect( +void Docker::__inspect( const string& cmd, + const Owned<Promise<Docker::Container>>& promise, + const Option<Duration>& retryInterval, const Subprocess& s) { + if (promise->future().hasDiscard()) { + promise->discard(); + return; + } + // Check the exit status of 'docker inspect'. CHECK_READY(s.status()); Option<int> status = s.status().get(); if (!status.isSome()) { - return Failure("No status found from '" + cmd + "'"); + promise->fail("No status found from '" + cmd + "'"); } else if (status.get() != 0) { + if (retryInterval.isSome()) { + VLOG(1) << "Retrying inspect with non-zero status code. 
cmd: '" + << cmd << "', interval: " << stringify(retryInterval.get()); + Clock::timer(retryInterval.get(), + [=]() { _inspect(cmd, promise, retryInterval); } ); + return; + } + CHECK_SOME(s.err()); - return io::read(s.err().get()) + io::read(s.err().get()) .then(lambda::bind( - failure<Docker::Container>, + failure<Nothing>, cmd, status.get(), - lambda::_1)); + lambda::_1)) + .onAny([=](const Future<Nothing>& future) { + CHECK_FAILED(future); + promise->fail(future.failure()); + }); + return; } // Read to EOF. CHECK_SOME(s.out()); - return io::read(s.out().get()) - .then(lambda::bind(&Docker::__inspect, lambda::_1)); + io::read(s.out().get()) + .onAny([=](const Future<string>& output) { + ___inspect(cmd, promise, retryInterval, output); + }); } -Future<Docker::Container> Docker::__inspect(const string& output) +void Docker::___inspect( + const string& cmd, + const Owned<Promise<Docker::Container>>& promise, + const Option<Duration>& retryInterval, + const Future<string>& output) { - Try<JSON::Array> parse = JSON::parse<JSON::Array>(output); + if (promise->future().hasDiscard()) { + promise->discard(); + return; + } + + if (!output.isReady()) { + promise->fail(output.isFailed() ? output.failure() : "future discarded"); + return; + } + + Try<JSON::Array> parse = JSON::parse<JSON::Array>(output.get()); if (parse.isError()) { - return Failure("Failed to parse JSON: " + parse.error()); + promise->fail("Failed to parse JSON: " + parse.error()); + return; } JSON::Array array = parse.get(); @@ -682,69 +740,30 @@ Future<Docker::Container> Docker::__inspect(const string& output) Docker::Container::create(array.values.front().as<JSON::Object>()); if (container.isError()) { - return Failure("Unable to create container: " + container.error()); + promise->fail("Unable to create container: " + container.error()); + return; } - return container.get(); + if (retryInterval.isSome() && !container.get().started) { + VLOG(1) << "Retrying inspect since container not yet started. 
cmd: '" + << cmd << "', interval: " << stringify(retryInterval.get()); + Clock::timer(retryInterval.get(), + [=]() { _inspect(cmd, promise, retryInterval); } ); + return; + } + + promise->set(container.get()); + return; } // TODO(benh): Handle the case where the short container ID was // not sufficiently unique and 'array.values.size() > 1'. - return Failure("Failed to find container"); -} - - -Future<Nothing> Docker::logs( - const std::string& container, - const std::string& directory) const -{ - // Redirect the logs into stdout/stderr. - // - // TODO(benh): This is an intermediate solution for now until we can - // reliably stream the logs either from the CLI or from the REST - // interface directly. The problem is that it's possible that the - // 'docker logs --follow' command will be started AFTER the - // container has already terminated, and thus it will continue - // running forever because the container has stopped. Unfortunately, - // when we later remove the container that still doesn't cause the - // 'logs' process to exit. Thus, we wait some period of time until - // after the container has terminated in order to let any log data - // get flushed, then we kill the 'logs' process ourselves. A better - // solution would be to first "create" the container, then start - // following the logs, then finally "start" the container so that - // when the container terminates Docker will properly close the log - // stream and 'docker logs' will exit. For more information, please - // see: https://github.com/docker/docker/issues/7020 - - string logs = - "logs() {\n" - " " + path + " logs --follow $1 &\n" - " pid=$!\n" - " " + path + " wait $1 >/dev/null 2>&1\n" - " sleep 10\n" // Sleep 10 seconds to make sure the logs are flushed. 
- " kill -TERM $pid >/dev/null 2>&1 &\n" - "}\n" - "logs " + container; - - VLOG(1) << "Running " << logs; - - Try<Subprocess> s = subprocess( - logs, - Subprocess::PATH("/dev/null"), - Subprocess::PATH(path::join(directory, "stdout")), - Subprocess::PATH(path::join(directory, "stderr"))); - - if (s.isError()) { - return Failure("Unable to launch docker logs: " + s.error()); - } - - return s.get().status() - .then(lambda::bind(&_nothing)); + promise->fail("Failed to find container"); } -Future<list<Docker::Container> > Docker::ps( +Future<list<Docker::Container>> Docker::ps( bool all, const Option<string>& prefix) const { @@ -772,7 +791,7 @@ Future<list<Docker::Container> > Docker::ps( } -Future<list<Docker::Container> > Docker::_ps( +Future<list<Docker::Container>> Docker::_ps( const Docker& docker, const string& cmd, const Subprocess& s, @@ -789,7 +808,7 @@ Future<list<Docker::Container> > Docker::_ps( CHECK_SOME(s.err()); return io::read(s.err().get()) .then(lambda::bind( - failure<list<Docker::Container> >, + failure<list<Docker::Container>>, cmd, status.get(), lambda::_1)); @@ -800,7 +819,7 @@ Future<list<Docker::Container> > Docker::_ps( } -Future<list<Docker::Container> > Docker::__ps( +Future<list<Docker::Container>> Docker::__ps( const Docker& docker, const Option<string>& prefix, const string& output) @@ -811,7 +830,7 @@ Future<list<Docker::Container> > Docker::__ps( CHECK(!lines.empty()); lines.erase(lines.begin()); - list<Future<Docker::Container> > futures; + list<Future<Docker::Container>> futures; foreach (const string& line, lines) { // Inspect the containers that we are interested in depending on http://git-wip-us.apache.org/repos/asf/mesos/blob/3baa6096/src/docker/docker.hpp ---------------------------------------------------------------------- diff --git a/src/docker/docker.hpp b/src/docker/docker.hpp index 3b6ea64..d06c73a 100644 --- a/src/docker/docker.hpp +++ b/src/docker/docker.hpp @@ -52,21 +52,26 @@ public: static Try<Container> 
create(const JSON::Object& json); // Returns the ID of the container. - std::string id; + const std::string id; // Returns the name of the container. - std::string name; + const std::string name; // Returns the pid of the container, or None if the container is // not running. - Option<pid_t> pid; + const Option<pid_t> pid; + + // Returns if the container has already started. This field is + // needed since pid is empty when the container terminates. + const bool started; private: Container( - const std::string& _id, - const std::string& _name, - const Option<pid_t>& _pid) - : id(_id), name(_name), pid(_pid) {} + const std::string& id, + const std::string& name, + const Option<pid_t>& pid, + bool started) + : id(id), name(name), pid(pid), started(started) {} }; class Image @@ -74,26 +79,27 @@ public: public: static Try<Image> create(const JSON::Object& json); - Option<std::vector<std::string> > entrypoint; + Option<std::vector<std::string>> entrypoint; private: - Image(const Option<std::vector<std::string> >& _entrypoint) + Image(const Option<std::vector<std::string>>& _entrypoint) : entrypoint(_entrypoint) {} }; - // Returns the current docker version. - virtual process::Future<Version> version() const; - // Performs 'docker run IMAGE'. virtual process::Future<Nothing> run( const mesos::ContainerInfo& containerInfo, const mesos::CommandInfo& commandInfo, - const std::string& name, + const std::string& containerName, const std::string& sandboxDirectory, const std::string& mappedDirectory, const Option<mesos::Resources>& resources = None(), - const Option<std::map<std::string, std::string> >& env = None(), - bool detached = true) const; + const Option<std::map<std::string, std::string>>& env = None(), + const Option<std::string>& stdoutPath = None(), + const Option<std::string>& stderrPath = None()) const; + + // Returns the current docker version. + virtual process::Future<Version> version() const; // Performs 'docker stop -t TIMEOUT CONTAINER'. 
If remove is true then a rm -f // will be called when stop failed, otherwise a failure is returned. The @@ -102,33 +108,27 @@ public: // A value of zero (the default value) is the same as issuing a // 'docker kill CONTAINER'. virtual process::Future<Nothing> stop( - const std::string& container, + const std::string& containerName, const Duration& timeout = Seconds(0), bool remove = false) const; // Performs 'docker rm (-f) CONTAINER'. virtual process::Future<Nothing> rm( - const std::string& container, + const std::string& containerName, bool force = false) const; - // Performs 'docker inspect CONTAINER'. + // Performs 'docker inspect CONTAINER'. If retryInterval is set, + // we will keep retrying inspect until the container is started or + // the future is discarded. virtual process::Future<Container> inspect( - const std::string& container) const; + const std::string& containerName, + const Option<Duration>& retryInterval = None()) const; // Performs 'docker ps (-a)'. - virtual process::Future<std::list<Container> > ps( + virtual process::Future<std::list<Container>> ps( bool all = false, const Option<std::string>& prefix = None()) const; - // Performs a 'docker logs --follow' and sends the output into a - // 'stderr' and 'stdout' file in the specfied directory. - // - // TODO(benh): Return the file descriptors, or some struct around - // them so others can do what they want with stdout/stderr. 
- virtual process::Future<Nothing> logs( - const std::string& container, - const std::string& directory) const; - virtual process::Future<Image> pull( const std::string& directory, const std::string& image, @@ -139,6 +139,9 @@ protected: Docker(const std::string& _path) : path(_path) {}; private: + static process::Future<Nothing> _run( + const Option<int>& status); + static process::Future<Version> _version( const std::string& cmd, const process::Subprocess& s); @@ -148,29 +151,36 @@ private: static process::Future<Nothing> _stop( const Docker& docker, - const std::string& container, + const std::string& containerName, const std::string& cmd, const process::Subprocess& s, bool remove); - static process::Future<Container> _inspect( + static void _inspect( const std::string& cmd, - const process::Subprocess& s); + const process::Owned<process::Promise<Container>>& promise, + const Option<Duration>& retryInterval); - static process::Future<Container> __inspect( - const std::string& output); + static void __inspect( + const std::string& cmd, + const process::Owned<process::Promise<Container>>& promise, + const Option<Duration>& retryInterval, + const process::Subprocess& s); - static process::Future<Nothing> _run( - const Option<int>& status); + static void ___inspect( + const std::string& cmd, + const process::Owned<process::Promise<Container>>& promise, + const Option<Duration>& retryInterval, + const process::Future<std::string>& output); - static process::Future<std::list<Container> > _ps( + static process::Future<std::list<Container>> _ps( const Docker& docker, const std::string& cmd, const process::Subprocess& s, const Option<std::string>& prefix, process::Future<std::string> output); - static process::Future<std::list<Container> > __ps( + static process::Future<std::list<Container>> __ps( const Docker& docker, const Option<std::string>& prefix, const std::string& output); http://git-wip-us.apache.org/repos/asf/mesos/blob/3baa6096/src/docker/executor.cpp 
---------------------------------------------------------------------- diff --git a/src/docker/executor.cpp b/src/docker/executor.cpp index 847997b..075c6b5 100644 --- a/src/docker/executor.cpp +++ b/src/docker/executor.cpp @@ -35,6 +35,7 @@ #include "common/status_utils.hpp" #include "docker/docker.hpp" +#include "docker/executor.hpp" #include "logging/flags.hpp" #include "logging/logging.hpp" @@ -46,6 +47,7 @@ using std::string; namespace mesos { namespace internal { +namespace docker { using namespace mesos; using namespace process; @@ -55,9 +57,9 @@ using namespace process; // redirect log output to configured stdout and stderr files. // Similar to the CommandExecutor, it is only responsible to launch // one container and exits afterwards. -// The executor also assumes it is launched from the -// DockerContainerizer, which already calls setsid before launching -// the executor. +// The executor assumes that it is launched from the +// DockerContainerizer, which sets up the executor at launch so +// that it keeps running if the slave exits.
class DockerExecutorProcess : public ProtobufProcess<DockerExecutorProcess> { public: @@ -65,13 +67,15 @@ public: const Owned<Docker>& docker, const string& container, const string& sandboxDirectory, - const string& mappedDirectory) - : launched(false), - killed(false), + const string& mappedDirectory, + const Duration& stopTimeout) + : killed(false), docker(docker), container(container), sandboxDirectory(sandboxDirectory), - mappedDirectory(mappedDirectory) {} + mappedDirectory(mappedDirectory), + stopTimeout(stopTimeout), + stop(Nothing()) {} virtual ~DockerExecutorProcess() {} @@ -92,13 +96,16 @@ public: cout << "Re-registered docker executor on " << slaveInfo.hostname() << endl; } - void disconnected(ExecutorDriver* driver) {} + void disconnected(ExecutorDriver* driver) + { + cout << "Disconnected from the slave" << endl; + } void launchTask(ExecutorDriver* driver, const TaskInfo& task) { - if (launched) { + if (run.isSome()) { TaskStatus status; - status.mutable_task_id()->MergeFrom(task.task_id()); + status.mutable_task_id()->CopyFrom(task.task_id()); status.set_state(TASK_FAILED); status.set_message( "Attempted to run multiple tasks using a \"docker\" executor"); @@ -109,46 +116,43 @@ public: cout << "Starting task " << task.task_id().value() << endl; - // We assume the Docker executor is launched from the - // DockerContainerizer, which already calls setsid before - // launching the executor. - CHECK(task.has_container()); CHECK(task.has_command()); - ContainerInfo containerInfo = task.container(); - - CHECK(containerInfo.type() == ContainerInfo::DOCKER); + CHECK(task.container().type() == ContainerInfo::DOCKER); - Future<Nothing> run = docker->run( - containerInfo, + // We're adding task and executor resources to launch docker since + // the DockerContainerizer updates the container cgroup limits + // directly and it expects it to be the sum of both task and + // executor resources. 
This does lead to a bit of unaccounted + // resources for running this executor, but we are assuming + // this is just a very small amount of overcommit. + run = docker->run( + task.container(), task.command(), container, sandboxDirectory, mappedDirectory, task.resources() + task.executor().resources(), None(), - false); - - run.onAny(defer( + path::join(sandboxDirectory, "stdout"), + path::join(sandboxDirectory, "stderr")) + .onAny(defer( self(), &Self::reaped, driver, task.task_id(), lambda::_1)); - dockerRun = run; - TaskStatus status; - status.mutable_task_id()->MergeFrom(task.task_id()); + status.mutable_task_id()->CopyFrom(task.task_id()); status.set_state(TASK_RUNNING); driver->sendStatusUpdate(status); - - launched = true; } void killTask(ExecutorDriver* driver, const TaskID& taskId) { + cout << "Killing docker task" << endl; shutdown(driver); } @@ -158,14 +162,19 @@ public: { cout << "Shutting down" << endl; - if (dockerRun.isSome() && !killed) { - Future<Nothing> dockerRun_ = dockerRun.get(); - dockerRun_.discard(); + if (run.isSome() && !killed) { + // The docker daemon might still be in the process of starting + // the container, therefore we kill both the docker run process + // and also ask the daemon to stop the container. + + // Make a mutable copy of the future so we can call discard.
+ Future<Nothing>(run.get()).discard(); + stop = docker->stop(container, stopTimeout); killed = true; } } - virtual void error(ExecutorDriver* driver, const string& message) {} + void error(ExecutorDriver* driver, const string& message) {} private: void reaped( @@ -173,9 +182,22 @@ private: const TaskID& taskId, const Future<Nothing>& run) { + stop.onAny(defer(self(), &Self::_reaped, driver, taskId, run, lambda::_1)); + } + + void _reaped( + ExecutorDriver* driver, + const TaskID& taskId, + const Future<Nothing>& run, + const Future<Nothing>& stop) + { TaskState state; string message; - if (killed) { + if (!stop.isReady()) { + state = TASK_FAILED; + message = "Unable to stop docker container, error: " + + (stop.isFailed() ? stop.failure() : "future discarded"); + } else if (killed) { state = TASK_KILLED; } else if (!run.isReady()) { state = TASK_FAILED; @@ -186,7 +208,7 @@ private: } TaskStatus taskStatus; - taskStatus.mutable_task_id()->MergeFrom(taskId); + taskStatus.mutable_task_id()->CopyFrom(taskId); taskStatus.set_state(state); taskStatus.set_message(message); @@ -194,18 +216,21 @@ private: // A hack for now ... but we need to wait until the status update // is sent to the slave before we shut ourselves down. + // TODO(tnachen): Remove this hack and also the same hack in the + // command executor when we have the new HTTP APIs to wait until + // an ack. 
os::sleep(Seconds(1)); driver->stop(); } - - bool launched; bool killed; Owned<Docker> docker; string container; string sandboxDirectory; string mappedDirectory; - Option<Future<Nothing>> dockerRun; + Duration stopTimeout; + Option<Future<Nothing>> run; + Future<Nothing> stop; Option<ExecutorDriver*> driver; }; @@ -217,22 +242,23 @@ public: const Owned<Docker>& docker, const string& container, const string& sandboxDirectory, - const string& mappedDirectory) + const string& mappedDirectory, + const Duration& stopTimeout) { - process = new DockerExecutorProcess( + process = Owned<DockerExecutorProcess>(new DockerExecutorProcess( docker, container, sandboxDirectory, - mappedDirectory); + mappedDirectory, + stopTimeout)); - spawn(process); + spawn(process.get()); } virtual ~DockerExecutor() { - terminate(process); - wait(process); - delete process; + terminate(process.get()); + wait(process.get()); } virtual void registered( @@ -241,7 +267,7 @@ public: const FrameworkInfo& frameworkInfo, const SlaveInfo& slaveInfo) { - dispatch(process, + dispatch(process.get(), &DockerExecutorProcess::registered, driver, executorInfo, @@ -253,7 +279,7 @@ public: ExecutorDriver* driver, const SlaveInfo& slaveInfo) { - dispatch(process, + dispatch(process.get(), &DockerExecutorProcess::reregistered, driver, slaveInfo); @@ -261,38 +287,43 @@ public: virtual void disconnected(ExecutorDriver* driver) { - dispatch(process, &DockerExecutorProcess::disconnected, driver); + dispatch(process.get(), &DockerExecutorProcess::disconnected, driver); } virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task) { - dispatch(process, &DockerExecutorProcess::launchTask, driver, task); + dispatch(process.get(), &DockerExecutorProcess::launchTask, driver, task); } virtual void killTask(ExecutorDriver* driver, const TaskID& taskId) { - dispatch(process, &DockerExecutorProcess::killTask, driver, taskId); + dispatch(process.get(), &DockerExecutorProcess::killTask, driver, taskId); } virtual void 
frameworkMessage(ExecutorDriver* driver, const string& data) { - dispatch(process, &DockerExecutorProcess::frameworkMessage, driver, data); + dispatch(process.get(), + &DockerExecutorProcess::frameworkMessage, + driver, + data); } virtual void shutdown(ExecutorDriver* driver) { - dispatch(process, &DockerExecutorProcess::shutdown, driver); + dispatch(process.get(), &DockerExecutorProcess::shutdown, driver); } virtual void error(ExecutorDriver* driver, const string& data) { - dispatch(process, &DockerExecutorProcess::error, driver, data); + dispatch(process.get(), &DockerExecutorProcess::error, driver, data); } private: - DockerExecutorProcess* process; + Owned<DockerExecutorProcess> process; }; + +} // namespace docker { } // namespace internal { } // namespace mesos { @@ -306,41 +337,11 @@ void usage(const char* argv0, const flags::FlagsBase& flags) } -class Flags : public mesos::internal::logging::Flags -{ -public: - Flags() - { - add(&Flags::container, - "container", - "The name of the docker container to run.\n"); - - add(&Flags::docker, - "docker", - "The path to the docker cli executable.\n"); - - add(&Flags::sandbox_directory, - "sandbox_directory", - "The path to the container sandbox that stores stdout and stderr\n" - "files that is being redirected with docker container logs.\n"); - - add(&Flags::mapped_directory, - "mapped_directory", - "The sandbox directory path that is mapped in the docker container.\n"); - } - - Option<string> container; - Option<string> docker; - Option<string> sandbox_directory; - Option<string> mapped_directory; -}; - - int main(int argc, char** argv) { GOOGLE_PROTOBUF_VERIFY_VERSION; - Flags flags; + mesos::internal::docker::Flags flags; bool help; flags.add(&help, @@ -357,6 +358,8 @@ int main(int argc, char** argv) return -1; } + std::cout << stringify(flags) << std::endl; + mesos::internal::logging::initialize(argv[0], flags, true); // Catch signals. 
if (help) { @@ -364,6 +367,8 @@ int main(int argc, char** argv) return -1; } + std::cout << stringify(flags) << std::endl; + if (flags.docker.isNone()) { LOG(WARNING) << "Expected docker executable path"; usage(argv[0], flags); @@ -388,19 +393,27 @@ int main(int argc, char** argv) return 0; } - // We skip validation when creating docker abstraction as we - // don't want to be checking for cgroups in the executor. + if (flags.stop_timeout.isNone()) { + LOG(WARNING) << "Expected stop timeout"; + usage(argv[0], flags); + return 0; + } + + // The 2nd argument for docker create is set to false so we skip + // validation when creating a docker abstraction, as the slave + // should have already validated docker. Try<Docker*> docker = Docker::create(flags.docker.get(), false); if (docker.isError()) { LOG(WARNING) << "Unable to create docker abstraction: " << docker.error(); return -1; } - mesos::internal::DockerExecutor executor( + mesos::internal::docker::DockerExecutor executor( process::Owned<Docker>(docker.get()), flags.container.get(), flags.sandbox_directory.get(), - flags.mapped_directory.get()); + flags.mapped_directory.get(), + flags.stop_timeout.get()); mesos::MesosExecutorDriver driver(&executor); return driver.run() == mesos::DRIVER_STOPPED ? 0 : 1; http://git-wip-us.apache.org/repos/asf/mesos/blob/3baa6096/src/docker/executor.hpp ---------------------------------------------------------------------- diff --git a/src/docker/executor.hpp b/src/docker/executor.hpp new file mode 100644 index 0000000..fa13b6e --- /dev/null +++ b/src/docker/executor.hpp @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __DOCKER_EXECUTOR_HPP__ +#define __DOCKER_EXECUTOR_HPP__ + +#include <stdio.h> + +#include <process/process.hpp> + +#include "logging/flags.hpp" + +namespace mesos { +namespace internal { +namespace docker { + +struct Flags : public mesos::internal::logging::Flags +{ + Flags() { + add(&container, + "container", + "The name of the docker container to run.\n"); + + add(&docker, + "docker", + "The path to the docker executable.\n"); + + add(&sandbox_directory, + "sandbox_directory", + "The path to the container sandbox holding stdout and stderr files\n" + "into which docker container logs will be redirected."); + + add(&mapped_directory, + "mapped_directory", + "The sandbox directory path that is mapped in the docker container.\n"); + + add(&stop_timeout, + "stop_timeout", + "The duration for docker to wait after stopping a running container\n" + "before it kills that container."); + } + + Option<std::string> container; + Option<std::string> docker; + Option<std::string> sandbox_directory; + Option<std::string> mapped_directory; + Option<Duration> stop_timeout; +}; + +} // namespace docker { +} // namespace internal { +} // namespace mesos { + +#endif // __DOCKER_EXECUTOR_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/3baa6096/src/slave/constants.cpp ---------------------------------------------------------------------- diff --git a/src/slave/constants.cpp b/src/slave/constants.cpp index 2a99b11..d8d2f98 100644 --- a/src/slave/constants.cpp +++ b/src/slave/constants.cpp @@ -50,6 +50,9 @@ const std::string DEFAULT_PORTS = 
"[31000-32000]"; const uint16_t DEFAULT_EPHEMERAL_PORTS_PER_CONTAINER = 1024; #endif const Duration DOCKER_REMOVE_DELAY = Hours(6); +const Duration DOCKER_INSPECT_DELAY = Seconds(1); +// TODO(tnachen): Make this a flag. +const Duration DOCKER_VERSION_WAIT_TIMEOUT = Seconds(5); const std::string DEFAULT_AUTHENTICATEE = "crammd5"; Duration MASTER_PING_TIMEOUT() http://git-wip-us.apache.org/repos/asf/mesos/blob/3baa6096/src/slave/constants.hpp ---------------------------------------------------------------------- diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp index fd1c1ab..206d439 100644 --- a/src/slave/constants.hpp +++ b/src/slave/constants.hpp @@ -97,6 +97,14 @@ extern const uint16_t DEFAULT_EPHEMERAL_PORTS_PER_CONTAINER; // Default duration that docker containers will be removed after exit. extern const Duration DOCKER_REMOVE_DELAY; +// Default duration to wait before retry inspecting a docker +// container. +extern const Duration DOCKER_INSPECT_DELAY; + +// Default duration that docker containerizer will wait to check +// docker version. +extern const Duration DOCKER_VERSION_WAIT_TIMEOUT; + // Name of the default, CRAM-MD5 authenticatee. 
extern const std::string DEFAULT_AUTHENTICATEE; http://git-wip-us.apache.org/repos/asf/mesos/blob/3baa6096/src/slave/containerizer/docker.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp index 952d3eb..bd58f94 100644 --- a/src/slave/containerizer/docker.cpp +++ b/src/slave/containerizer/docker.cpp @@ -35,8 +35,6 @@ #include "common/status_utils.hpp" -#include "docker/docker.hpp" - #ifdef __linux__ #include "linux/cgroups.hpp" #endif // __linux__ @@ -48,7 +46,6 @@ #include "slave/containerizer/docker.hpp" #include "slave/containerizer/fetcher.hpp" - #include "slave/containerizer/isolators/cgroups/constants.hpp" #include "usage/usage.hpp" @@ -99,8 +96,8 @@ Option<ContainerID> parse(const Docker::Container& container) // was DOCKER_NAME_PREFIX + containerId, and starting with 0.23.0 // it is changed to DOCKER_NAME_PREFIX + slaveId + // DOCKER_NAME_SEPERATOR + containerId. - // To be backward compatible during upgrade, we still to support - // the previous format. + // To be backward compatible during upgrade, we still have to + // support the previous format. // TODO(tnachen): Remove this check after deprecation cycle. 
if (!strings::contains(name.get(), DOCKER_NAME_SEPERATOR)) { ContainerID id; @@ -124,16 +121,16 @@ Try<DockerContainerizer*> DockerContainerizer::create( const Flags& flags, Fetcher* fetcher) { - Try<Docker*> docker = Docker::create(flags.docker); - if (docker.isError()) { - return Error(docker.error()); + Try<Docker*> create = Docker::create(flags.docker); + if (create.isError()) { + return Error("Failed to create docker: " + create.error()); } - Shared<Docker> docker_(docker.get()); + Shared<Docker> docker(create.get()); if (flags.docker_mesos_image.isSome()) { - Future<Version> version = docker_->version(); - if (!version.await(Seconds(5))) { + Future<Version> version = docker->version(); + if (!version.await(DOCKER_VERSION_WAIT_TIMEOUT)) { return Error("Timed out waiting for docker version"); } @@ -148,7 +145,7 @@ Try<DockerContainerizer*> DockerContainerizer::create( } } - return new DockerContainerizer(flags, fetcher, docker_); + return new DockerContainerizer(flags, fetcher, docker); } @@ -177,17 +174,18 @@ DockerContainerizer::~DockerContainerizer() } -string dockerExecutorCommand( - const Flags& flags, - const string directory, - const string container) +docker::Flags dockerFlags( + const Flags& flags, + const string& name, + const string& directory) { - string command = "mesos-docker-executor --docker=" + flags.docker + - " --container=" + container + - " --sandbox_directory=" + directory + - " --mapped_directory=" + flags.docker_sandbox_directory; - - return path::join(flags.launcher_dir, command); + docker::Flags dockerFlags; + dockerFlags.container = name; + dockerFlags.docker = flags.docker; + dockerFlags.sandbox_directory = directory; + dockerFlags.mapped_directory = flags.docker_sandbox_directory; + dockerFlags.stop_timeout = flags.docker_stop_timeout; + return dockerFlags; } @@ -254,34 +252,26 @@ DockerContainerizerProcess::Container::create( symlinked = true; } - Container* container = new Container( - id, - taskInfo, - executorInfo, - 
containerWorkdir, - user, - slaveId, - slavePid, - checkpoint, - symlinked, - flags); - + Option<ContainerInfo> containerInfo = None(); + Option<CommandInfo> commandInfo = None(); + Option<std::map<string, string>> environment = None(); + bool launchesExecutorContainer = false; if (taskInfo.isSome() && flags.docker_mesos_image.isSome()) { // Override the container and command to launch an executor // in a docker container. - ContainerInfo containerInfo; + ContainerInfo newContainerInfo; // Mounting in the docker socket so the executor can communicate to // the host docker daemon. We are assuming the current instance is // launching docker containers to the host daemon as well. - Volume* dockerSockVolume = containerInfo.add_volumes(); + Volume* dockerSockVolume = newContainerInfo.add_volumes(); dockerSockVolume->set_host_path(flags.docker_socket); dockerSockVolume->set_container_path(flags.docker_socket); dockerSockVolume->set_mode(Volume::RO); // Mounting in sandbox so the logs from the executor can be // persisted over container failures. - Volume* sandboxVolume = containerInfo.add_volumes(); + Volume* sandboxVolume = newContainerInfo.add_volumes(); sandboxVolume->set_host_path(containerWorkdir); sandboxVolume->set_container_path(containerWorkdir); sandboxVolume->set_mode(Volume::RW); @@ -289,27 +279,48 @@ DockerContainerizerProcess::Container::create( ContainerInfo::DockerInfo dockerInfo; dockerInfo.set_image(flags.docker_mesos_image.get()); - containerInfo.mutable_docker()->CopyFrom(dockerInfo); + newContainerInfo.mutable_docker()->CopyFrom(dockerInfo); + + docker::Flags dockerExecutorFlags = dockerFlags( + flags, + Container::name(slaveId, stringify(id)), + containerWorkdir); - const string& command = dockerExecutorCommand( - flags, containerWorkdir, container->name()); + CommandInfo newCommandInfo; + // TODO(tnachen): Pass flags directly into docker run. 
+ newCommandInfo.set_value( + path::join(flags.launcher_dir, "mesos-docker-executor") + + " " + stringify(dockerExecutorFlags)); - CommandInfo commandInfo; - commandInfo.set_value(command); - commandInfo.set_shell(true); + newCommandInfo.set_shell(true); - container->overrideContainer = containerInfo; - container->overrideCommand = commandInfo; - container->overrideEnvironment = executorEnvironment( + containerInfo = newContainerInfo; + commandInfo = newCommandInfo; + environment = executorEnvironment( executorInfo, containerWorkdir, slaveId, slavePid, checkpoint, flags.recovery_timeout); + launchesExecutorContainer = true; } - return container; + return new Container( + id, + taskInfo, + executorInfo, + containerWorkdir, + user, + slaveId, + slavePid, + checkpoint, + symlinked, + flags, + commandInfo, + containerInfo, + environment, + launchesExecutorContainer); } @@ -321,7 +332,7 @@ Future<Nothing> DockerContainerizerProcess::fetch( return fetcher->fetch( containerId, - container->command(), + container->command, container->directory, None(), flags); @@ -329,21 +340,28 @@ Future<Nothing> DockerContainerizerProcess::fetch( Future<Nothing> DockerContainerizerProcess::pull( - const ContainerID& containerId, - const string& directory, - const string& image, - bool forcePullImage) + const ContainerID& containerId) { - Future<Docker::Image> future = docker->pull(directory, image, forcePullImage); - containers_[containerId]->pull = future; - return future.then(defer(self(), &Self::_pull, image)); -} + if (!containers_.contains(containerId)) { + return Failure("Container is already destroyed"); + } + Container* container = containers_[containerId]; + container->state = Container::PULLING; -Future<Nothing> DockerContainerizerProcess::_pull(const string& image) -{ - VLOG(1) << "Docker pull " << image << " completed"; - return Nothing(); + string image = container->image(); + + Future<Docker::Image> future = docker->pull( + container->directory, + image, + 
container->forcePullImage()); + + containers_[containerId]->pull = future; + + return future.then(defer(self(), [=]() { + VLOG(1) << "Docker pull " << image << " completed"; + return Nothing(); + })); } @@ -545,10 +563,16 @@ Future<Nothing> DockerContainerizerProcess::_recover( // checkpointed container id. // TODO(tnachen): Remove this explicit reconciliation 0.24. hashset<ContainerID> existingContainers; + // Tracks all the task containers that launched an executor in + // a docker container. + hashset<ContainerID> executorContainers; foreach (const Docker::Container& container, _containers) { Option<ContainerID> id = parse(container); if (id.isSome()) { existingContainers.insert(id.get()); + if (strings::contains(container.name, ".executor")) { + executorContainers.insert(id.get()); + } } } @@ -623,6 +647,8 @@ Future<Nothing> DockerContainerizerProcess::_recover( containers_[containerId] = container; container->slaveId = state.id; container->state = Container::RUNNING; + container->launchesExecutorContainer = + executorContainers.contains(containerId); pid_t pid = run.get().forkedPid.get(); @@ -747,18 +773,19 @@ Future<bool> DockerContainerizerProcess::launch( if (taskInfo.isSome() && flags.docker_mesos_image.isNone()) { // Launching task by forking a subprocess to run docker executor. 
return container.get()->launch = fetch(containerId) - .then(defer(self(), &Self::_launch, containerId)) - .then(defer(self(), &Self::___launch, containerId)) - .then(defer(self(), &Self::______launch, containerId, lambda::_1)) - .onFailed(defer(self(), &Self::destroy, containerId, true)); + .then(defer(self(), [=]() { return pull(containerId); })) + .then(defer(self(), [=]() { return launchExecutorProcess(containerId); })) + .then(defer(self(), [=](pid_t pid) { + return reapExecutor(containerId, pid); + })); } string containerName = container.get()->name(); - if (flags.docker_mesos_image.isSome()) { + if (container.get()->executorName().isSome()) { // Launch the container with the executor name as we expect the // executor will launch the docker container. - containerName = container.get()->executorName(); + containerName = container.get()->executorName().get(); } // Launching task or executor by launching a seperate docker @@ -768,67 +795,76 @@ Future<bool> DockerContainerizerProcess::launch( // we want the executor to keep running when the slave container // dies. 
return container.get()->launch = fetch(containerId) - .then(defer(self(), &Self::_launch, containerId)) - .then(defer(self(), &Self::__launch, containerId, containerName)) - .then(defer(self(), &Self::____launch, containerId, containerName)) - .then(defer(self(), &Self::_____launch, containerId, lambda::_1)) - .then(defer(self(), &Self::______launch, containerId, lambda::_1)); + .then(defer(self(), [=]() { return pull(containerId); })) + .then(defer(self(), [=]() { + return launchExecutorContainer(containerId, containerName); + })) + .then(defer(self(), [=](const Docker::Container& dockerContainer) { + return checkpointExecutor(containerId, dockerContainer); + })) + .then(defer(self(), [=](pid_t pid) { + return reapExecutor(containerId, pid); + })); } -Future<Nothing> DockerContainerizerProcess::_launch( - const ContainerID& containerId) +Future<Docker::Container> DockerContainerizerProcess::launchExecutorContainer( + const ContainerID& containerId, + const string& containerName) { - // Doing the fetch might have succeded but we were actually asked to - // destroy the container, which we did, so don't continue. if (!containers_.contains(containerId)) { - return Failure("Container was destroyed while launching"); + return Failure("Container is already destroyed"); } Container* container = containers_[containerId]; - - container->state = Container::PULLING; - - return pull( - containerId, - container->directory, - container->image(), - container->forcePullImage()); + container->state = Container::RUNNING; + + // Start the executor in a Docker container. + // This executor could either be a custom executor specified by an + // ExecutorInfo, or the docker executor. 
+ Future<Nothing> run = docker->run( + container->container, + container->command, + containerName, + container->directory, + flags.docker_sandbox_directory, + container->resources, + container->environment, + path::join(container->directory, "stdout"), + path::join(container->directory, "stderr")); + + Owned<Promise<Docker::Container>> promise(new Promise<Docker::Container>()); + // We want to propagate the run failure when run fails so the slave can + // send this failure back to the scheduler. Otherwise we return + // inspect's result or its failure, which should not fail when + // the container isn't launched. + Future<Docker::Container> inspect = + docker->inspect(containerName, slave::DOCKER_INSPECT_DELAY) + .onAny([=](Future<Docker::Container> f) { + // We cannot associate the promise outside of the callback + // because we want to propagate run's failure when + // available. + promise->associate(f); + }); + + run.onFailed([=](const string& failure) mutable { + inspect.discard(); + promise->fail(failure); + }); + + return promise->future(); } -Future<Nothing> DockerContainerizerProcess::__launch( - const ContainerID& containerId, - const string& container) +Future<pid_t> DockerContainerizerProcess::launchExecutorProcess( + const ContainerID& containerId) { if (!containers_.contains(containerId)) { - return Failure("Container was destroyed while pulling image"); + return Failure("Container is already destroyed"); } - Container* container_ = containers_[containerId]; - - container_->state = Container::RUNNING; - - // Try and start the Docker container. 
- return docker->run( - container_->container(), - container_->command(), - container, - container_->directory, - flags.docker_sandbox_directory, - container_->resources, - container_->environment()); -} - - -Future<pid_t> DockerContainerizerProcess::___launch( - const ContainerID& containerId) -{ - // After we do Docker::run we shouldn't remove a container until - // after we set Container::status.
- CHECK(containers_.contains(containerId)); - Container* container = containers_[containerId]; + container->state = Container::RUNNING; // Prepare environment variables for the executor. map<string, string> environment = executorEnvironment( @@ -850,19 +886,19 @@ Future<pid_t> DockerContainerizerProcess::___launch( environment["GLOG_v"] = os::getenv("GLOG_v"); } - const string& command = dockerExecutorCommand( - flags, container->directory, container->name()); - - VLOG(1) << "Launching docker executor with command: " << command; + vector<string> argv; + argv.push_back("mesos-docker-executor"); // Construct the mesos-docker-executor using the "name" we gave the // container (to distinguish it from Docker containers not created // by Mesos). Try<Subprocess> s = subprocess( - command, + path::join(flags.launcher_dir, "mesos-docker-executor"), + argv, Subprocess::PIPE(), Subprocess::PATH(path::join(container->directory, "stdout")), Subprocess::PATH(path::join(container->directory, "stderr")), + dockerFlags(flags, container->name(), container->directory), environment, lambda::bind(&setup, container->directory)); @@ -893,37 +929,23 @@ Future<pid_t> DockerContainerizerProcess::___launch( if (length != sizeof(c)) { string error = string(strerror(errno)); os::close(s.get().in().get()); - Failure failure("Failed to synchronize with child process: " + error); - - container->run = failure; - return failure; + return Failure("Failed to synchronize with child process: " + error); } return s.get().pid(); } -Future<Docker::Container> DockerContainerizerProcess::____launch( - const ContainerID& containerId, - const std::string& container) -{ - CHECK(containers_.contains(containerId)); - - return docker->inspect(container); -} - -Future<pid_t> DockerContainerizerProcess::_____launch( +Future<pid_t> DockerContainerizerProcess::checkpointExecutor( const ContainerID& containerId, - const Docker::Container& container) + const Docker::Container& dockerContainer) { // After we do 
Docker::run we shouldn't remove a container until // after we set Container::status. CHECK(containers_.contains(containerId)); - Container* container_ = containers_[containerId]; - - Option<int> pid = container.pid; + Option<int> pid = dockerContainer.pid; if (!pid.isSome()) { return Failure("Unable to get executor pid after launch"); @@ -936,16 +958,11 @@ Future<pid_t> DockerContainerizerProcess::_____launch( "Failed to checkpoint executor's pid: " + checkpointed.error()); } - // TODO(tnachen): We need to handle the slave in container scenario. - // Instead of forking a log process, launch a log container for - // executor. - docker->logs(container_->name(), container_->directory); - return pid.get(); } -Future<bool> DockerContainerizerProcess::______launch( +Future<bool> DockerContainerizerProcess::reapExecutor( const ContainerID& containerId, pid_t pid) { @@ -1355,6 +1372,25 @@ void DockerContainerizerProcess::destroy( container->state = Container::DESTROYING; + if (killed && container->executorPid.isSome()) { + LOG(INFO) << "Sending SIGTERM to executor with pid: " + << container->executorPid.get(); + // We need to clean up the executor as the executor might not have + // received run task due to a failed containerizer update. + // We also kill the executor first since container->status below + // is waiting for the executor to finish. + Try<std::list<os::ProcessTree>> kill = + os::killtree(container->executorPid.get(), SIGTERM); + + if (kill.isError()) { + // Ignoring the error from killing executor as it can already + // have exited. 
+ VLOG(1) << "Ignoring error when killing executor pid " + << container->executorPid.get() << " in destroy, error: " + << kill.error(); + } + } + // Otherwise, wait for Docker::run to succeed, in which case we'll // continue in _destroy (calling Docker::kill) or for Docker::run to // fail, in which case we'll re-execute this function and cleanup @@ -1374,17 +1410,19 @@ void DockerContainerizerProcess::_destroy( CHECK(container->state == Container::DESTROYING); - // Do a 'docker rm -f' which we'll then find out about in '_destroy' + // Do a 'docker stop' which we'll then find out about in '_destroy' // after we've reaped either the container's root process (in the // event that we had just launched a container for an executor) or // the mesos-docker-executor (in the case we launched a container // for a task). - LOG(INFO) << "Running docker stop on container '" << containerId << "'"; - docker->stop(container->name(), - flags.docker_stop_timeout) - .onAny(defer(self(), &Self::__destroy, containerId, killed, lambda::_1)); + if (killed) { + docker->stop(container->name(), flags.docker_stop_timeout) + .onAny(defer(self(), &Self::__destroy, containerId, killed, lambda::_1)); + } else { + __destroy(containerId, killed, Nothing()); + } } @@ -1397,7 +1435,7 @@ void DockerContainerizerProcess::__destroy( Container* container = containers_[containerId]; - if (!kill.isReady()) { + if (!kill.isReady() && !container->status.future().isReady()) { // TODO(benh): This means we've failed to do a Docker::kill, which // means it's possible that the container is still going to be // running after we return! We either need to have a periodic @@ -1422,7 +1460,7 @@ void DockerContainerizerProcess::__destroy( } // Status must be ready since we did a Docker::kill. 
- CHECK_READY(containers_[containerId]->status.future()); + CHECK_READY(container->status.future()); container->status.future().get() .onAny(defer(self(), &Self::___destroy, containerId, killed, lambda::_1)); @@ -1483,11 +1521,13 @@ void DockerContainerizerProcess::reaped(const ContainerID& containerId) void DockerContainerizerProcess::remove( - const string& container, - const string& executor) + const string& containerName, + const Option<string>& executor) { - docker->rm(container, true); - docker->rm(executor, true); + docker->rm(containerName, true); + if (executor.isSome()) { + docker->rm(executor.get(), true); + } } http://git-wip-us.apache.org/repos/asf/mesos/blob/3baa6096/src/slave/containerizer/docker.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp index 2961048..d2cca4b 100644 --- a/src/slave/containerizer/docker.hpp +++ b/src/slave/containerizer/docker.hpp @@ -21,9 +21,11 @@ #include <process/shared.hpp> +#include <stout/flags.hpp> #include <stout/hashset.hpp> #include "docker/docker.hpp" +#include "docker/executor.hpp" #include "slave/containerizer/containerizer.hpp" @@ -36,7 +38,7 @@ namespace slave { extern const std::string DOCKER_NAME_PREFIX; // Seperator used to compose docker container name, which is made up -// of slave id and container id. +// of slave ID and container ID. 
extern const std::string DOCKER_NAME_SEPERATOR; // Directory that stores all the symlinked sandboxes that is mapped @@ -152,11 +154,7 @@ public: virtual process::Future<Nothing> fetch(const ContainerID& containerId); - virtual process::Future<Nothing> pull( - const ContainerID& containerId, - const std::string& directory, - const std::string& image, - bool forcePullImage); + virtual process::Future<Nothing> pull(const ContainerID& containerId); virtual process::Future<hashset<ContainerID>> containers(); @@ -166,8 +164,6 @@ private: const ContainerID& containerId, const Option<int>& status); - process::Future<Nothing> _pull(const std::string& image); - Try<Nothing> checkpoint( const ContainerID& containerId, pid_t pid); @@ -179,32 +175,21 @@ private: process::Future<Nothing> __recover( const std::list<Docker::Container>& containers); - process::Future<Nothing> _launch( - const ContainerID& containerId); - - process::Future<Nothing> __launch( + // Starts the executor in a Docker container. + process::Future<Docker::Container> launchExecutorContainer( const ContainerID& containerId, - const std::string& container); + const std::string& containerName); - // NOTE: This continuation is only applicable when launching a - // container for a task. - process::Future<pid_t> ___launch( + // Starts the docker executor with a subprocess. + process::Future<pid_t> launchExecutorProcess( const ContainerID& containerId); - - // NOTE: This continuation is only applicable when launching a - // container for an executor. - process::Future<Docker::Container> ____launch( + process::Future<pid_t> checkpointExecutor( const ContainerID& containerId, - const std::string& container); + const Docker::Container& dockerContainer); - // NOTE: This continuation is only applicable when launching a - // container for an executor. 
- process::Future<pid_t> _____launch( - const ContainerID& containerId, - const Docker::Container& container); - - process::Future<bool> ______launch( + // Reaps on the executor pid. + process::Future<bool> reapExecutor( const ContainerID& containerId, pid_t pid); @@ -245,7 +230,9 @@ private: void reaped(const ContainerID& containerId); // Removes the docker container. - void remove(const std::string& container, const std::string& executor); + void remove( + const std::string& containerName, + const Option<std::string>& executor); const Flags flags; @@ -266,6 +253,12 @@ private: bool checkpoint, const Flags& flags); + static std::string name(const SlaveID& slaveId, const std::string& id) + { + return DOCKER_NAME_PREFIX + slaveId.value() + DOCKER_NAME_SEPERATOR + + stringify(id); + } + Container(const ContainerID& id) : state(FETCHING), id(id) {} @@ -278,7 +271,11 @@ private: const process::PID<Slave>& slavePid, bool checkpoint, bool symlinked, - const Flags& flags) + const Flags& flags, + const Option<CommandInfo>& _command, + const Option<ContainerInfo>& _container, + const Option<std::map<std::string, std::string>>& _environment, + bool launchesExecutorContainer) : state(FETCHING), id(id), task(taskInfo), @@ -289,7 +286,8 @@ private: slavePid(slavePid), checkpoint(checkpoint), symlinked(symlinked), - flags(flags) + flags(flags), + launchesExecutorContainer(launchesExecutorContainer) { // NOTE: The task's resources are included in the executor's // resources in order to make sure when launching the executor @@ -307,13 +305,33 @@ private: CHECK(resources.contains(task.get().resources())); } - overrideEnvironment = executorEnvironment( - executor, - directory, - slaveId, - slavePid, - checkpoint, - flags.recovery_timeout); + if (_command.isSome()) { + command = _command.get(); + } else if (task.isSome()) { + command = task.get().command(); + } else { + command = executor.command(); + } + + if (_container.isSome()) { + container = _container.get(); + } else if 
(task.isSome()) { + container = task.get().container(); + } else { + container = executor.container(); + } + + if (_environment.isSome()) { + environment = _environment.get(); + } else { + environment = executorEnvironment( + executor, + directory, + slaveId, + slavePid, + checkpoint, + flags.recovery_timeout); + } } ~Container() @@ -327,13 +345,16 @@ private: std::string name() { - return DOCKER_NAME_PREFIX + slaveId.value() + DOCKER_NAME_SEPERATOR + - stringify(id); + return name(slaveId, stringify(id)); } - std::string executorName() + Option<std::string> executorName() { - return name() + DOCKER_NAME_SEPERATOR + "executor"; + if (launchesExecutorContainer) { + return name() + DOCKER_NAME_SEPERATOR + "executor"; + } else { + return None(); + } } std::string image() const @@ -354,43 +375,6 @@ private: return executor.container().docker().force_pull_image(); } - ContainerInfo container() const - { - if (overrideContainer.isSome()) { - return overrideContainer.get(); - } - - if (task.isSome()) { - return task.get().container(); - } - - return executor.container(); - } - - CommandInfo command() const - { - if (overrideCommand.isSome()) { - return overrideCommand.get(); - } - - if (task.isSome()) { - return task.get().command(); - } - - return executor.command(); - } - - // Returns any extra environment varaibles to set when launching - // the Docker container (beyond the those found in CommandInfo). - std::map<std::string, std::string> environment() const - { - if (overrideEnvironment.isSome()) { - return overrideEnvironment.get(); - } - - return std::map<std::string, std::string>(); - } - // The DockerContainerier needs to be able to properly clean up // Docker containers, regardless of when they are destroyed. 
For // example, if a container gets destroyed while we are fetching, @@ -421,23 +405,22 @@ private: } state; const ContainerID id; - Option<TaskInfo> task; - ExecutorInfo executor; + const Option<TaskInfo> task; + const ExecutorInfo executor; + ContainerInfo container; + CommandInfo command; + std::map<std::string, std::string> environment; // The sandbox directory for the container. This holds the // symlinked path if symlinked boolean is true. - std::string directory; + const std::string directory; - Option<std::string> user; + const Option<std::string> user; SlaveID slaveId; - process::PID<Slave> slavePid; + const process::PID<Slave> slavePid; bool checkpoint; bool symlinked; - Flags flags; - - Option<ContainerInfo> overrideContainer; - Option<CommandInfo> overrideCommand; - Option<std::map<std::string, std::string>> overrideEnvironment; + const Flags flags; // Promise for future returned from wait(). Promise<containerizer::Termination> termination; @@ -469,6 +452,10 @@ private: // container. This is stored so we can clean up the executor // on destroy. Option<pid_t> executorPid; + + // Marks whether this container launches an executor in a docker + // container. + bool launchesExecutorContainer; }; hashmap<ContainerID, Container*> containers_; http://git-wip-us.apache.org/repos/asf/mesos/blob/3baa6096/src/slave/flags.cpp ---------------------------------------------------------------------- diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp index acf30d1..a8c7c49 100644 --- a/src/slave/flags.cpp +++ b/src/slave/flags.cpp @@ -294,22 +294,28 @@ mesos::internal::slave::Flags::Flags() add(&Flags::docker_kill_orphans, "docker_kill_orphans", - "Enable docker containerizer to kill orphaned containers", + "Enable docker containerizer to kill orphaned containers.\n" + "You should consider setting this to false when you launch multiple\n" + "slaves in the same OS, to avoid one of the DockerContainerizer \n" + "removing docker tasks launched by other slaves.
However you should\n" + "also make sure to enable checkpoint for the slave so the same slave id\n" + "can be reused, otherwise docker tasks on slave restart will not be\n" + "cleaned up.\n", true); add(&Flags::docker_mesos_image, "docker_mesos_image", "The docker image used to launch this mesos slave instance.\n" "If an image is specified, the docker containerizer assumes the slave\n" - "is running in a docker container. This enables the docker\n" - "containerizer to launch executors in docker containers, so they keep\n" - "running when the slave container exits."); + "is running in a docker container, and launches executors with\n" + "docker containers in order to recover them when the slave restarts and\n" + "recovers.\n"); add(&Flags::docker_socket, "docker_socket", - "The docker UNIX socket path that the docker CLI uses to communicate\n" - "to the docker daemon. Mesos needs this to launch docker containers\n" - "that can run the docker CLI.\n", + "The UNIX socket path to be mounted into the docker executor container\n" + "to provide docker CLI access to the docker daemon. This must be the\n" + "path used by the slave's docker image.\n", "/var/run/docker.sock"); add(&Flags::default_container_info,
