Repository: mesos Updated Branches: refs/heads/master 9c5c92724 -> 3baa60965
Recover docker containers that were launched in containers.

During recovery the docker containerizer now lists only the Docker
containers whose names start with DOCKER_NAME_PREFIX followed by the
recovered slave's ID, and returns immediately when there is no
checkpointed slave state (so _recover(), and with it the
docker_kill_orphans path through __recover(), is not reached in that
case). Accordingly, _recover() now takes a SlaveState instead of an
Option<SlaveState>. parse() additionally accepts container names that
split into three parts, and a new launchWaitProcess() helper runs
`docker wait` on a container in a subprocess (stdio redirected to
/dev/null) and returns that subprocess's pid.

Review: https://reviews.apache.org/r/29336


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7fee619c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7fee619c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7fee619c

Branch: refs/heads/master
Commit: 7fee619cf0f3097aa1c473759831c38b8441d3c1
Parents: 597b8a2
Author: Timothy Chen <[email protected]>
Authored: Thu Dec 4 07:59:45 2014 +0000
Committer: Timothy Chen <[email protected]>
Committed: Fri May 22 23:13:50 2015 -0700

----------------------------------------------------------------------
 src/slave/containerizer/docker.cpp | 232 ++++++++++++++++++--------------------
 src/slave/containerizer/docker.hpp |   2 +-
 2 files changed, 131 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7fee619c/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index 0154c4b..50a5b72 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -113,6 +113,11 @@ Option<ContainerID> parse(const Docker::Container& container)
       ContainerID id;
       id.set_value(parts[1]);
       return id;
+    } else if (parts.size() == 3) {
+      // We found an executor or log container.
+      ContainerID id;
+      id.set_value(parts[2]);
+      return id;
     }
   }
 
@@ -120,6 +125,26 @@ Option<ContainerID> parse(const Docker::Container& container)
 }
 
 
+// Launches a 'docker wait' process on the given container name.
+// Returns the pid of the wait process.
+Try<pid_t> launchWaitProcess(const string& docker, const string& name)
+{
+  string command = "exit `" + docker + " wait " + name + "`";
+
+  Try<Subprocess> wait = subprocess(
+      command,
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PATH("/dev/null"));
+
+  if (wait.isError()) {
+    return Error("Unable to launch docker wait on executor: " + wait.error());
+  }
+
+  return wait.get().pid();
+}
+
+
 Try<DockerContainerizer*> DockerContainerizer::create(
     const Flags& flags,
     Fetcher* fetcher)
@@ -441,16 +466,21 @@ Future<Nothing> DockerContainerizerProcess::recover(
     const Option<SlaveState>& state)
 {
   LOG(INFO) << "Recovering Docker containers";
-  // Get the list of all Docker containers (running and exited) in
-  // order to remove any orphans and reconcile checkpointed executors.
-  // TODO(tnachen): Remove this when we expect users to have already
-  // upgraded to 0.23.
-  return docker->ps(true, DOCKER_NAME_PREFIX)
-    .then(defer(self(), &Self::_recover, state, lambda::_1));
+  if (state.isSome()) {
+    // Get the list of all Docker containers (running and exited) in
+    // order to remove any orphans and reconcile checkpointed executors.
+    // TODO(tnachen): Remove this when we expect users to have already
+    // upgraded to 0.23.
+    return docker->ps(true, DOCKER_NAME_PREFIX + state.get().id.value())
+      .then(defer(self(), &Self::_recover, state.get(), lambda::_1));
+  }
+
+  return Nothing();
 }
 
+
 Future<Nothing> DockerContainerizerProcess::_recover(
-    const Option<SlaveState>& state,
+    const SlaveState& state,
     const list<Docker::Container>& _containers)
 {
   // Although the slave checkpoints executor pids, before 0.23
@@ -470,107 +500,105 @@ Future<Nothing> DockerContainerizerProcess::_recover(
     }
   }
 
-  if (state.isSome()) {
-    // Collection of pids that we've started reaping in order to
-    // detect very unlikely duplicate scenario (see below).
-    hashmap<ContainerID, pid_t> pids;
-
-    foreachvalue (const FrameworkState& framework, state.get().frameworks) {
-      foreachvalue (const ExecutorState& executor, framework.executors) {
-        if (executor.info.isNone()) {
-          LOG(WARNING) << "Skipping recovery of executor '" << executor.id
-                       << "' of framework " << framework.id
-                       << " because its info could not be recovered";
-          continue;
-        }
-
-        if (executor.latest.isNone()) {
-          LOG(WARNING) << "Skipping recovery of executor '" << executor.id
-                       << "' of framework " << framework.id
-                       << " because its latest run could not be recovered";
-          continue;
-        }
-
-        // We are only interested in the latest run of the executor!
-        const ContainerID& containerId = executor.latest.get();
-        Option<RunState> run = executor.runs.get(containerId);
-        CHECK_SOME(run);
-        CHECK_SOME(run.get().id);
-        CHECK_EQ(containerId, run.get().id.get());
-
-        // We need the pid so the reaper can monitor the executor so
-        // skip this executor if it's not present. This is not an
-        // error because the slave will try to wait on the container
-        // which will return a failed Termination and everything will
-        // get cleaned up.
-        if (!run.get().forkedPid.isSome()) {
-          continue;
-        }
-
-        if (run.get().completed) {
-          VLOG(1) << "Skipping recovery of executor '" << executor.id
+  // Collection of pids that we've started reaping in order to
+  // detect very unlikely duplicate scenario (see below).
+  hashmap<ContainerID, pid_t> pids;
+
+  foreachvalue (const FrameworkState& framework, state.frameworks) {
+    foreachvalue (const ExecutorState& executor, framework.executors) {
+      if (executor.info.isNone()) {
+        LOG(WARNING) << "Skipping recovery of executor '" << executor.id
+                     << "' of framework " << framework.id
+                     << " because its info could not be recovered";
+        continue;
+      }
+
+      if (executor.latest.isNone()) {
+        LOG(WARNING) << "Skipping recovery of executor '" << executor.id
+                     << "' of framework " << framework.id
+                     << " because its latest run could not be recovered";
+        continue;
+      }
+
+      // We are only interested in the latest run of the executor!
+      const ContainerID& containerId = executor.latest.get();
+      Option<RunState> run = executor.runs.get(containerId);
+      CHECK_SOME(run);
+      CHECK_SOME(run.get().id);
+      CHECK_EQ(containerId, run.get().id.get());
+
+      // We need the pid so the reaper can monitor the executor so
+      // skip this executor if it's not present. This is not an
+      // error because the slave will try to wait on the container
+      // which will return a failed Termination and everything will
+      // get cleaned up.
+      if (!run.get().forkedPid.isSome()) {
+        continue;
+      }
+
+      if (run.get().completed) {
+        VLOG(1) << "Skipping recovery of executor '" << executor.id
+                << "' of framework " << framework.id
+                << " because its latest run "
+                << containerId << " is completed";
+        continue;
+      }
+
+      const ExecutorInfo executorInfo = executor.info.get();
+      if (executorInfo.has_container() &&
+          executorInfo.container().type() != ContainerInfo::DOCKER) {
+        LOG(INFO) << "Skipping recovery of executor '" << executor.id
                   << "' of framework " << framework.id
-                  << " because its latest run "
-                  << containerId << " is completed";
-          continue;
-        }
-
-        const ExecutorInfo executorInfo = executor.info.get();
-        if (executorInfo.has_container() &&
-            executorInfo.container().type() != ContainerInfo::DOCKER) {
-          LOG(INFO) << "Skipping recovery of executor '" << executor.id
-                    << "' of framework " << framework.id
-                    << " because it was not launched from docker containerizer";
-          continue;
-        }
-
-        if (!executorInfo.has_container() &&
-            !existingContainers.contains(containerId)) {
-          LOG(INFO) << "Skipping recovery of executor '" << executor.id
-                    << "' of framework " << framework.id
-                    << " because its executor is not marked as docker "
-                    << "and the docker container doesn't exist";
-          continue;
-        }
-
-        LOG(INFO) << "Recovering container '" << containerId
-                  << "' for executor '" << executor.id
-                  << "' of framework " << framework.id;
-
-        // Create and store a container.
-        Container* container = new Container(containerId);
-        containers_[containerId] = container;
-        container->slaveId = state.get().id;
-        container->state = Container::RUNNING;
-
-        pid_t pid = run.get().forkedPid.get();
-
-        container->status.set(process::reap(pid));
-
-        container->status.future().get()
-          .onAny(defer(self(), &Self::reaped, containerId));
-
-        if (pids.containsValue(pid)) {
-          // This should (almost) never occur. There is the
-          // possibility that a new executor is launched with the same
-          // pid as one that just exited (highly unlikely) and the
-          // slave dies after the new executor is launched but before
-          // it hears about the termination of the earlier executor
-          // (also unlikely).
-          return Failure(
-              "Detected duplicate pid " + stringify(pid) +
-              " for container " + stringify(containerId));
-        }
-
-        pids.put(containerId, pid);
+                  << " because it was not launched from docker containerizer";
+        continue;
       }
-    }
 
-    if (flags.docker_kill_orphans) {
-      return __recover(_containers);
+      if (!executorInfo.has_container() &&
+          !existingContainers.contains(containerId)) {
+        LOG(INFO) << "Skipping recovery of executor '" << executor.id
+                  << "' of framework " << framework.id
+                  << " because its executor is not marked as docker "
+                  << "and the docker container doesn't exist";
+        continue;
+      }
+
+      LOG(INFO) << "Recovering container '" << containerId
+                << "' for executor '" << executor.id
+                << "' of framework " << framework.id;
+
+      // Create and store a container.
+      Container* container = new Container(containerId);
+      containers_[containerId] = container;
+      container->slaveId = state.id;
+      container->state = Container::RUNNING;
+
+      pid_t pid = run.get().forkedPid.get();
+
+      container->status.set(process::reap(pid));
+
+      container->status.future().get()
+        .onAny(defer(self(), &Self::reaped, containerId));
+
+      if (pids.containsValue(pid)) {
+        // This should (almost) never occur. There is the
+        // possibility that a new executor is launched with the same
+        // pid as one that just exited (highly unlikely) and the
+        // slave dies after the new executor is launched but before
+        // it hears about the termination of the earlier executor
+        // (also unlikely).
+        return Failure(
+            "Detected duplicate pid " + stringify(pid) +
+            " for container " + stringify(containerId));
+      }
+
+      pids.put(containerId, pid);
     }
   }
 
+  if (flags.docker_kill_orphans) {
+    return __recover(_containers);
+  }
+
   return Nothing();
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/7fee619c/src/slave/containerizer/docker.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp
index 16c7775..0eda1c0 100644
--- a/src/slave/containerizer/docker.hpp
+++ b/src/slave/containerizer/docker.hpp
@@ -182,7 +182,7 @@ private:
       pid_t pid);
 
   process::Future<Nothing> _recover(
-      const Option<state::SlaveState>& state,
+      const state::SlaveState& state,
       const std::list<Docker::Container>& containers);
 
   process::Future<Nothing> __recover(
