Repository: mesos
Updated Branches:
  refs/heads/master 9c5c92724 -> 3baa60965


Recover docker containers that were launched in containers.

Review: https://reviews.apache.org/r/29336


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7fee619c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7fee619c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7fee619c

Branch: refs/heads/master
Commit: 7fee619cf0f3097aa1c473759831c38b8441d3c1
Parents: 597b8a2
Author: Timothy Chen <[email protected]>
Authored: Thu Dec 4 07:59:45 2014 +0000
Committer: Timothy Chen <[email protected]>
Committed: Fri May 22 23:13:50 2015 -0700

----------------------------------------------------------------------
 src/slave/containerizer/docker.cpp | 232 ++++++++++++++++++--------------
 src/slave/containerizer/docker.hpp |   2 +-
 2 files changed, 131 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7fee619c/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp 
b/src/slave/containerizer/docker.cpp
index 0154c4b..50a5b72 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -113,6 +113,11 @@ Option<ContainerID> parse(const Docker::Container& 
container)
       ContainerID id;
       id.set_value(parts[1]);
       return id;
+    } else if (parts.size() == 3) {
+      // We found an executor or log container.
+      ContainerID id;
+      id.set_value(parts[2]);
+      return id;
     }
   }
 
@@ -120,6 +125,26 @@ Option<ContainerID> parse(const Docker::Container& 
container)
 }
 
 
+// Runs "exit `<docker> wait <name>`" as a subprocess with /dev/null for
+// all three Subprocess paths. Returns its pid, or an Error if launch fails.
+Try<pid_t> launchWaitProcess(const string& docker, const string& name)
+{
+  string command = "exit `" + docker + " wait " + name + "`";
+
+  Try<Subprocess> wait = subprocess(
+      command,
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PATH("/dev/null"));
+
+  if (wait.isError()) {
+    return Error("Unable to launch docker wait on executor: " + wait.error());
+  }
+
+  return wait.get().pid();
+}
+
+
 Try<DockerContainerizer*> DockerContainerizer::create(
     const Flags& flags,
     Fetcher* fetcher)
@@ -441,16 +466,21 @@ Future<Nothing> DockerContainerizerProcess::recover(
     const Option<SlaveState>& state)
 {
   LOG(INFO) << "Recovering Docker containers";
-  // Get the list of all Docker containers (running and exited) in
-  // order to remove any orphans and reconcile checkpointed executors.
-  // TODO(tnachen): Remove this when we expect users to have already
-  // upgraded to 0.23.
-  return docker->ps(true, DOCKER_NAME_PREFIX)
-    .then(defer(self(), &Self::_recover, state, lambda::_1));
+  if (state.isSome()) {
+    // Get the list of all Docker containers (running and exited) in
+    // order to remove any orphans and reconcile checkpointed executors.
+    // TODO(tnachen): Remove this when we expect users to have already
+    // upgraded to 0.23.
+    return docker->ps(true, DOCKER_NAME_PREFIX + state.get().id.value())
+      .then(defer(self(), &Self::_recover, state.get(), lambda::_1));
+  }
+
+return Nothing();
 }
 
+
 Future<Nothing> DockerContainerizerProcess::_recover(
-    const Option<SlaveState>& state,
+    const SlaveState& state,
     const list<Docker::Container>& _containers)
 {
   // Although the slave checkpoints executor pids, before 0.23
@@ -470,107 +500,105 @@ Future<Nothing> DockerContainerizerProcess::_recover(
     }
   }
 
-  if (state.isSome()) {
-    // Collection of pids that we've started reaping in order to
-    // detect very unlikely duplicate scenario (see below).
-    hashmap<ContainerID, pid_t> pids;
-
-    foreachvalue (const FrameworkState& framework, state.get().frameworks) {
-      foreachvalue (const ExecutorState& executor, framework.executors) {
-        if (executor.info.isNone()) {
-          LOG(WARNING) << "Skipping recovery of executor '" << executor.id
-                       << "' of framework " << framework.id
-                       << " because its info could not be recovered";
-          continue;
-        }
-
-        if (executor.latest.isNone()) {
-          LOG(WARNING) << "Skipping recovery of executor '" << executor.id
-                       << "' of framework " << framework.id
-                       << " because its latest run could not be recovered";
-          continue;
-        }
-
-        // We are only interested in the latest run of the executor!
-        const ContainerID& containerId = executor.latest.get();
-        Option<RunState> run = executor.runs.get(containerId);
-        CHECK_SOME(run);
-        CHECK_SOME(run.get().id);
-        CHECK_EQ(containerId, run.get().id.get());
-
-        // We need the pid so the reaper can monitor the executor so
-        // skip this executor if it's not present. This is not an
-        // error because the slave will try to wait on the container
-        // which will return a failed Termination and everything will
-        // get cleaned up.
-        if (!run.get().forkedPid.isSome()) {
-          continue;
-        }
-
-        if (run.get().completed) {
-          VLOG(1) << "Skipping recovery of executor '" << executor.id
+  // Collection of pids that we've started reaping in order to
+  // detect very unlikely duplicate scenario (see below).
+  hashmap<ContainerID, pid_t> pids;
+
+  foreachvalue (const FrameworkState& framework, state.get().frameworks) {
+    foreachvalue (const ExecutorState& executor, framework.executors) {
+      if (executor.info.isNone()) {
+        LOG(WARNING) << "Skipping recovery of executor '" << executor.id
+                     << "' of framework " << framework.id
+                     << " because its info could not be recovered";
+        continue;
+      }
+
+      if (executor.latest.isNone()) {
+        LOG(WARNING) << "Skipping recovery of executor '" << executor.id
+                     << "' of framework " << framework.id
+                     << " because its latest run could not be recovered";
+        continue;
+      }
+
+      // We are only interested in the latest run of the executor!
+      const ContainerID& containerId = executor.latest.get();
+      Option<RunState> run = executor.runs.get(containerId);
+      CHECK_SOME(run);
+      CHECK_SOME(run.get().id);
+      CHECK_EQ(containerId, run.get().id.get());
+
+      // We need the pid so the reaper can monitor the executor so
+      // skip this executor if it's not present. This is not an
+      // error because the slave will try to wait on the container
+      // which will return a failed Termination and everything will
+      // get cleaned up.
+      if (!run.get().forkedPid.isSome()) {
+        continue;
+      }
+
+      if (run.get().completed) {
+        VLOG(1) << "Skipping recovery of executor '" << executor.id
+                << "' of framework " << framework.id
+                << " because its latest run "
+                << containerId << " is completed";
+        continue;
+      }
+
+      const ExecutorInfo executorInfo = executor.info.get();
+      if (executorInfo.has_container() &&
+          executorInfo.container().type() != ContainerInfo::DOCKER) {
+        LOG(INFO) << "Skipping recovery of executor '" << executor.id
                   << "' of framework " << framework.id
-                  << " because its latest run "
-                  << containerId << " is completed";
-          continue;
-        }
-
-        const ExecutorInfo executorInfo = executor.info.get();
-        if (executorInfo.has_container() &&
-            executorInfo.container().type() != ContainerInfo::DOCKER) {
-          LOG(INFO) << "Skipping recovery of executor '" << executor.id
-                    << "' of framework " << framework.id
-                    << " because it was not launched from docker 
containerizer";
-          continue;
-        }
-
-        if (!executorInfo.has_container() &&
-            !existingContainers.contains(containerId)) {
-          LOG(INFO) << "Skipping recovery of executor '" << executor.id
-                    << "' of framework " << framework.id
-                    << " because its executor is not marked as docker "
-                    << "and the docker container doesn't exist";
-          continue;
-        }
-
-        LOG(INFO) << "Recovering container '" << containerId
-                  << "' for executor '" << executor.id
-                  << "' of framework " << framework.id;
-
-        // Create and store a container.
-        Container* container = new Container(containerId);
-        containers_[containerId] = container;
-        container->slaveId = state.get().id;
-        container->state = Container::RUNNING;
-
-        pid_t pid = run.get().forkedPid.get();
-
-        container->status.set(process::reap(pid));
-
-        container->status.future().get()
-          .onAny(defer(self(), &Self::reaped, containerId));
-
-        if (pids.containsValue(pid)) {
-          // This should (almost) never occur. There is the
-          // possibility that a new executor is launched with the same
-          // pid as one that just exited (highly unlikely) and the
-          // slave dies after the new executor is launched but before
-          // it hears about the termination of the earlier executor
-          // (also unlikely).
-          return Failure(
-              "Detected duplicate pid " + stringify(pid) +
-              " for container " + stringify(containerId));
-        }
-
-        pids.put(containerId, pid);
+                  << " because it was not launched from docker containerizer";
+        continue;
       }
-    }
 
-    if (flags.docker_kill_orphans) {
-      return __recover(_containers);
+      if (!executorInfo.has_container() &&
+          !existingContainers.contains(containerId)) {
+        LOG(INFO) << "Skipping recovery of executor '" << executor.id
+                  << "' of framework " << framework.id
+                  << " because its executor is not marked as docker "
+                  << "and the docker container doesn't exist";
+        continue;
+      }
+
+      LOG(INFO) << "Recovering container '" << containerId
+                << "' for executor '" << executor.id
+                << "' of framework " << framework.id;
+
+      // Create and store a container.
+      Container* container = new Container(containerId);
+      containers_[containerId] = container;
+      container->slaveId = state.get().id;
+      container->state = Container::RUNNING;
+
+      pid_t pid = run.get().forkedPid.get();
+
+      container->status.set(process::reap(pid));
+
+      container->status.future().get()
+        .onAny(defer(self(), &Self::reaped, containerId));
+
+      if (pids.containsValue(pid)) {
+        // This should (almost) never occur. There is the
+        // possibility that a new executor is launched with the same
+        // pid as one that just exited (highly unlikely) and the
+        // slave dies after the new executor is launched but before
+        // it hears about the termination of the earlier executor
+        // (also unlikely).
+        return Failure(
+            "Detected duplicate pid " + stringify(pid) +
+            " for container " + stringify(containerId));
+      }
+
+      pids.put(containerId, pid);
     }
   }
 
+  if (flags.docker_kill_orphans) {
+    return __recover(_containers);
+  }
+
   return Nothing();
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/7fee619c/src/slave/containerizer/docker.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.hpp 
b/src/slave/containerizer/docker.hpp
index 16c7775..0eda1c0 100644
--- a/src/slave/containerizer/docker.hpp
+++ b/src/slave/containerizer/docker.hpp
@@ -182,7 +182,7 @@ private:
       pid_t pid);
 
   process::Future<Nothing> _recover(
-      const Option<state::SlaveState>& state,
+      const state::SlaveState& state,
       const std::list<Docker::Container>& containers);
 
   process::Future<Nothing> __recover(

Reply via email to