Add option to launch docker containers with helper containers. Review: https://reviews.apache.org/r/29334
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/280e4659 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/280e4659 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/280e4659 Branch: refs/heads/master Commit: 280e4659b922d84c5c405a176b8d5db2f9203928 Parents: 168ba44 Author: Timothy Chen <[email protected]> Authored: Tue Dec 2 18:23:17 2014 -0800 Committer: Timothy Chen <[email protected]> Committed: Fri May 22 23:13:50 2015 -0700 ---------------------------------------------------------------------- src/slave/containerizer/docker.cpp | 212 +++++++++++++++++++++++++++++--- src/slave/containerizer/docker.hpp | 32 ++++- 2 files changed, 226 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/280e4659/src/slave/containerizer/docker.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp index c4e636c..c996afe 100644 --- a/src/slave/containerizer/docker.cpp +++ b/src/slave/containerizer/docker.cpp @@ -649,11 +649,29 @@ Future<bool> DockerContainerizerProcess::launch( << "' (and executor '" << executorInfo.executor_id() << "') of framework '" << executorInfo.framework_id() << "'"; - return container.get()->launch = fetch(containerId) + Future<Nothing> future = fetch(containerId) .then(defer(self(), &Self::_launch, containerId)) - .then(defer(self(), &Self::__launch, containerId)) - .then(defer(self(), &Self::___launch, containerId)) - .then(defer(self(), &Self::______launch, containerId, lambda::_1)); + .then(defer(self(), &Self::__launch, containerId)); + + if (flags.docker_mesos_image.isNone()) { + // Launch executor and logs with subprocess. 
+ return container.get()->launch = future + .then(defer(self(), &Self::___launch, containerId)) + .then(defer(self(), &Self::______launch, containerId, lambda::_1)) + .then(defer(self(), &Self::_______launch, containerId, lambda::_1)) + .onFailed(defer(self(), &Self::destroy, containerId, true)); + } + + // Launch executor and logs with docker containers. + return container.get()->launch = future + .then(defer(self(), &Self::___launchInContainer, containerId)) + .then(defer( + self(), + &Self::____launchInContainer, + containerId)) + .then(defer(self(), &Self::______launch, containerId, lambda::_1)) + .then(defer(self(), &Self::_______launch, containerId, lambda::_1)) + .onFailed(defer(self(), &Self::destroy, containerId, true)); } @@ -777,6 +795,79 @@ Future<pid_t> DockerContainerizerProcess::___launch( } +Future<Nothing> DockerContainerizerProcess::___launchInContainer( + const ContainerID& containerId) +{ + // After we do Docker::run we shouldn't remove a container until + // after we set Container::status. + CHECK(containers_.contains(containerId)); + CHECK(flags.docker_mesos_image.isSome()); + + Container* container = containers_[containerId]; + + // Prepare environment variables for the executor. + map<string, string> environment = executorEnvironment( + container->executor, + container->directory, + container->slaveId, + container->slavePid, + container->checkpoint, + flags.recovery_timeout); + + // Include any environment variables from ExecutorInfo. 
+ foreach (const Environment::Variable& variable, + container->executor.command().environment().variables()) { + environment[variable.name()] = variable.value(); + } + + ContainerInfo containerInfo; + ContainerInfo::DockerInfo dockerInfo; + + dockerInfo.set_image(flags.docker_mesos_image.get()); + containerInfo.mutable_docker()->CopyFrom(dockerInfo); + + string command = "mesos-docker-executor --docker=" + flags.docker + + " --container=" + container->name(); + + command = path::join(flags.launcher_dir, command); + + CommandInfo commandInfo; + commandInfo.set_value(command); + commandInfo.set_shell(true); + + VLOG(2) << "Launching docker executor in container with command: " << command; + + return docker->run( + containerInfo, + commandInfo, + container->executorName(), + container->directory, + flags.docker_sandbox_directory, + None(), + environment); +} + + +// Launches a docker wait process on given container name. +// Returns the wait process pid. +Try<pid_t> launchWaitProcess(const string& docker, const string& name) +{ + string command = "exit `" + docker + " wait " + name + "`"; + + Try<Subprocess> wait = subprocess( + command, + Subprocess::PATH("/dev/null"), + Subprocess::PATH("/dev/null"), + Subprocess::PATH("/dev/null")); + + if (wait.isError()) { + return Error("Unable to launch docker wait on executor: " + wait.error()); + } + + return wait.get().pid(); +} + + Future<bool> DockerContainerizerProcess::launch( const ContainerID& containerId, const ExecutorInfo& executorInfo, @@ -828,20 +919,17 @@ Future<bool> DockerContainerizerProcess::launch( .then(defer(self(), &Self::__launch, containerId)) .then(defer(self(), &Self::____launch, containerId)) .then(defer(self(), &Self::_____launch, containerId, lambda::_1)) - .then(defer(self(), &Self::______launch, containerId, lambda::_1)); + .then(defer(self(), &Self::______launch, containerId, lambda::_1)) + .then(defer(self(), &Self::_______launch, containerId, lambda::_1)) } Future<Docker::Container> 
DockerContainerizerProcess::____launch( const ContainerID& containerId) { - // After we do Docker::run we shouldn't remove a container until - // after we set Container::status. CHECK(containers_.contains(containerId)); - Container* container = containers_[containerId]; - - return docker->inspect(container->name()); + return docker->inspect(containers_[containerId]->name()); } @@ -859,12 +947,6 @@ Future<pid_t> DockerContainerizerProcess::_____launch( return Failure("Unable to get executor pid after launch"); } - // TODO(tnachen): We might not be able to checkpoint if the slave - // dies before it can checkpoint while the executor is still - // running. Optinally we can consider recording the slave id and - // executor id as part of the docker container name so we can - // recover from this. - Try<Nothing> checkpointed = checkpoint(containerId, pid.get()); if (checkpointed.isError()) { @@ -876,7 +958,96 @@ Future<pid_t> DockerContainerizerProcess::_____launch( } -Future<bool> DockerContainerizerProcess::______launch( +Future<pid_t> DockerContainerizerProcess::______launch( + const ContainerID& containerId, + pid_t pid) +{ + CHECK(containers_.contains(containerId)); + + Container* container = containers_[containerId]; + + docker->logs(container->name(), container->directory); + + return pid; +} + + +Future<pid_t> DockerContainerizerProcess::______launchInContainer( + const ContainerID& containerId, + pid_t pid) +{ + CHECK(containers_.contains(containerId)); + CHECK(flags.docker_mesos_image.isSome()); + + Container* container = containers_[containerId]; + + // We are launching a docker container to read the logs from + // a given launched container into the sandbox stdout and stderr + // files. This requires the docker container to run docker logs, + // which requires us mounting in the docker socket and docker + // CLI binary. 
+ ContainerInfo containerInfo; + Volume* volume = containerInfo.add_volumes(); + volume->set_host_path(flags.docker_socket); + volume->set_container_path(flags.docker_socket); + volume->set_mode(Volume::RO); + + volume = containerInfo.add_volumes(); + volume->set_host_path(flags.docker); + volume->set_container_path(flags.docker); + volume->set_mode(Volume::RO); + + ContainerInfo::DockerInfo dockerInfo; + dockerInfo.set_image(flags.docker_mesos_image.get()); + containerInfo.mutable_docker()->CopyFrom(dockerInfo); + + string command = flags.docker + " logs --follow " + container->name() + + " 2>> " + + path::join(flags.docker_sandbox_directory, "stderr") + + " 1>> " + + path::join(flags.docker_sandbox_directory, "stdout"); + + VLOG(1) << "Running docker logs in container with command: " << command; + + CommandInfo commandInfo; + commandInfo.set_value(command); + commandInfo.set_shell(true); + + docker->run( + containerInfo, + commandInfo, + container->logName(), + container->directory, + flags.docker_sandbox_directory); + + return pid; +} + + +Future<pid_t> DockerContainerizerProcess::____launchInContainer( + const ContainerID& containerId) +{ + CHECK(containers_.contains(containerId)); + + Try<pid_t> waitPid = + launchWaitProcess(flags.docker, containers_[containerId]->executorName()); + + if (waitPid.isError()) { + return Failure(waitPid.error()); + } + + Try<Nothing> checkpointed = checkpoint(containerId, waitPid.get()); + + if (checkpointed.isError()) { + return Failure( + "Failed to checkpoint executor's pid: " + checkpointed.error()); + } + + return waitPid.get(); +} + + +Future<bool> DockerContainerizerProcess::_______launch( const ContainerID& containerId, pid_t pid) { @@ -1287,6 +1458,13 @@ void DockerContainerizerProcess::destroy( CHECK(container->state == Container::RUNNING); + // Remove the executor and log docker containers. 
They might not + // have been configured to launch, but we might have recovered containers + // from a previous slave run that was configured to launch the executor in + // docker. + docker->rm(container->logName(), true); + docker->rm(container->executorName(), true); + container->state = Container::DESTROYING; if (container->executorPid.isSome()) { http://git-wip-us.apache.org/repos/asf/mesos/blob/280e4659/src/slave/containerizer/docker.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp index f08520a..a54e0d4 100644 --- a/src/slave/containerizer/docker.hpp +++ b/src/slave/containerizer/docker.hpp @@ -196,6 +196,16 @@ private: const ContainerID& containerId); // NOTE: This continuation is only applicable when launching a + // container for a task. + process::Future<Nothing> ___launchInContainer( + const ContainerID& containerId); + + // NOTE: This continuation is only applicable when launching a + // container for a task. + process::Future<pid_t> ____launchInContainer( + const ContainerID& containerId); + + // NOTE: This continuation is only applicable when launching a // container for an executor. process::Future<Docker::Container> ____launch( const ContainerID& containerId); @@ -206,7 +216,17 @@ private: const ContainerID& containerId, const Docker::Container& container); - process::Future<bool> ______launch( + process::Future<pid_t> ______launch( + const ContainerID& containerId, + pid_t pid); + + // NOTE: This continuation is only applicable when launching a + // container for a task. 
+ process::Future<pid_t> ______launchInContainer( + const ContainerID& containerId, + pid_t pid); + + process::Future<bool> _______launch( const ContainerID& containerId, pid_t pid); @@ -324,6 +344,16 @@ private: return DOCKER_NAME_PREFIX + slaveId.value() + "/" + stringify(id); } + std::string logName() + { + return name() + "/log"; + } + + std::string executorName() + { + return name() + "/executor"; + } + std::string image() const { if (task.isSome()) {
