Address comments for Docker containerizer changes.

Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3baa6096
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3baa6096
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3baa6096

Branch: refs/heads/master
Commit: 3baa60965407bf0c3eb9c3da1b2ba7c0a4fee968
Parents: 0b5f67e
Author: Timothy Chen <[email protected]>
Authored: Thu Apr 30 00:43:14 2015 -0700
Committer: Timothy Chen <[email protected]>
Committed: Sun May 24 22:27:40 2015 -0700

----------------------------------------------------------------------
 docs/configuration.md                    |  18 +-
 src/Makefile.am                          |   1 +
 src/docker/docker.cpp                    | 293 +++++++++++----------
 src/docker/docker.hpp                    |  86 ++++---
 src/docker/executor.cpp                  | 187 +++++++-------
 src/docker/executor.hpp                  |  69 +++++
 src/slave/constants.cpp                  |   3 +
 src/slave/constants.hpp                  |   8 +
 src/slave/containerizer/docker.cpp       | 342 +++++++++++++-----------
 src/slave/containerizer/docker.hpp       | 165 ++++++------
 src/slave/flags.cpp                      |  20 +-
 src/tests/docker_containerizer_tests.cpp | 357 +++++++++++++-------------
 src/tests/docker_tests.cpp               |  30 +--
 13 files changed, 872 insertions(+), 707 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3baa6096/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 0a7629a..5a41477 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -862,7 +862,12 @@ file:///path/to/file (where file contains one of the above)</code></pre>
       --[no-]docker_kill_orphans
     </td>
     <td>
-      Enable docker containerizer to kill orphaned containers
+      Enable docker containerizer to kill orphaned containers.
+      You should consider setting this to false when you launch multiple
+      slaves in the same OS, to avoid one of the DockerContainerizer removing
+      docker tasks launched by other slaves. However you should also make sure
+      you enable checkpoint for the slave so the same slave id can be reused,
+      otherwise docker tasks on slave restart will not be cleaned up.
       (default: true)
     </td>
   </tr>
@@ -871,9 +876,9 @@ file:///path/to/file (where file contains one of the above)</code></pre>
       --docker_sock=VALUE
     </td>
     <td>
-      The docker UNIX socket path that the CLI uses to communicate to the
-      daemon. We need this to launch docker containers that can run docker
-      CLI.
+      The UNIX socket path to be mounted into the docker executor container to
+      provide docker CLI access to the docker daemon. This must be the path used
+      by the slave's docker image.
       (default: /var/run/docker.sock)
     </td>
   </tr>
@@ -883,9 +888,10 @@ file:///path/to/file (where file contains one of the above)</code></pre>
     </td>
     <td>
       The docker image used to launch this mesos slave instance.
-      If a image is specified, the docker containerizer assumes the slave
+      If an image is specified, the docker containerizer assumes the slave
       is running in a docker container, and launches executors with
-      docker containers in order to recover them when the slave recovers.
+      docker containers in order to recover them when the slave restarts and
+      recovers.
     </td>
   </tr>
   <tr>

http://git-wip-us.apache.org/repos/asf/mesos/blob/3baa6096/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 1e56ae5..814468e 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -323,6 +323,7 @@ libmesos_no_3rdparty_la_SOURCES =                                   \
        common/values.cpp                                               \
        docker/docker.hpp                                               \
        docker/docker.cpp                                               \
+        docker/executor.hpp                                             \
        exec/exec.cpp                                                   \
        files/files.cpp                                                 \
        hook/manager.cpp                                                \

http://git-wip-us.apache.org/repos/asf/mesos/blob/3baa6096/src/docker/docker.cpp
----------------------------------------------------------------------
diff --git a/src/docker/docker.cpp b/src/docker/docker.cpp
index 08f2023..ee74da5 100644
--- a/src/docker/docker.cpp
+++ b/src/docker/docker.cpp
@@ -41,6 +41,8 @@
 #include "slave/containerizer/isolators/cgroups/cpushare.hpp"
 #include "slave/containerizer/isolators/cgroups/mem.hpp"
 
+#include "slave/constants.hpp"
+
 using namespace mesos;
 
 using namespace mesos::internal::slave;
@@ -110,16 +112,16 @@ Try<Docker*> Docker::create(const string& path, bool validate)
     delete docker;
     return Error("Failed to find a mounted cgroups hierarchy "
                  "for the 'cpu' subsystem; you probably need "
-                 "to mount cgroups manually!");
+                 "to mount cgroups manually");
   }
 #endif // __linux__
 
   // Validate the version (and that we can use Docker at all).
   Future<Version> version = docker->version();
 
-  if(!version.await(Seconds(5))) {
+  if (!version.await(DOCKER_VERSION_WAIT_TIMEOUT)) {
     delete docker;
-    return Error("Timed out getting docker version.");
+    return Error("Timed out getting docker version");
   }
 
   if (version.isFailed()) {
@@ -129,7 +131,7 @@ Try<Docker*> Docker::create(const string& path, bool validate)
 
   if (version.get() < Version(1, 0, 0)) {
     delete docker;
-    return Error("Insufficient version of Docker! Please upgrade to >= 1.0.0");
+    return Error("Insufficient version of Docker. Please upgrade to >= 1.0.0");
   }
 
   return docker;
@@ -165,7 +167,7 @@ Future<Version> Docker::version() const
 Future<Version> Docker::_version(const string& cmd, const Subprocess& s)
 {
   const Option<int>& status = s.status().get();
-  if (!status.isSome() || status.get() != 0) {
+  if (status.isNone() || status.get() != 0) {
     string msg = "Failed to execute '" + cmd + "': ";
     if (status.isSome()) {
       msg += WSTRINGIFY(status.get());
@@ -198,66 +200,62 @@ Future<Version> Docker::__version(const Future<string>& output)
     }
   }
 
-  return Failure("Unable to find docker version in output.");
+  return Failure("Unable to find docker version in output");
 }
 
 
 Try<Docker::Container> Docker::Container::create(const JSON::Object& json)
 {
-  map<string, JSON::Value>::const_iterator entry =
-    json.values.find("Id");
-  if (entry == json.values.end()) {
+  Result<JSON::String> idValue = json.find<JSON::String>("Id");
+  if (idValue.isNone()) {
     return Error("Unable to find Id in container");
+  } else if (idValue.isError()) {
+    return Error("Error finding Id in container: " + idValue.error());
   }
 
-  JSON::Value idValue = entry->second;
-  if (!idValue.is<JSON::String>()) {
-    return Error("Id in container is not a string type");
-  }
-
-  string id = idValue.as<JSON::String>().value;
+  string id = idValue.get().value;
 
-  entry = json.values.find("Name");
-  if (entry == json.values.end()) {
+  Result<JSON::String> nameValue = json.find<JSON::String>("Name");
+  if (nameValue.isNone()) {
     return Error("Unable to find Name in container");
+  } else if (nameValue.isError()) {
+    return Error("Error finding Name in container: " + nameValue.error());
   }
 
-  JSON::Value nameValue = entry->second;
-  if (!nameValue.is<JSON::String>()) {
-    return Error("Name in container is not string type");
-  }
-
-  string name = nameValue.as<JSON::String>().value;
+  string name = nameValue.get().value;
 
-  entry = json.values.find("State");
-  if (entry == json.values.end()) {
+  Result<JSON::Object> stateValue = json.find<JSON::Object>("State");
+  if (stateValue.isNone()) {
     return Error("Unable to find State in container");
+  } else if (stateValue.isError()) {
+    return Error("Error finding State in container: " + stateValue.error());
   }
 
-  JSON::Value stateValue = entry->second;
-  if (!stateValue.is<JSON::Object>()) {
-    return Error("State in container is not object type");
-  }
-
-  entry = stateValue.as<JSON::Object>().values.find("Pid");
-  if (entry == json.values.end()) {
+  Result<JSON::Number> pidValue = stateValue.get().find<JSON::Number>("Pid");
+  if (pidValue.isNone()) {
     return Error("Unable to find Pid in State");
+  } else if (pidValue.isError()) {
+    return Error("Error finding Pid in State: " + pidValue.error());
   }
 
-  // TODO(yifan): Reload operator '=' to reuse the value variable above.
-  JSON::Value pidValue = entry->second;
-  if (!pidValue.is<JSON::Number>()) {
-    return Error("Pid in State is not number type");
-  }
-
-  pid_t pid = pid_t(pidValue.as<JSON::Number>().value);
+  pid_t pid = pid_t(pidValue.get().value);
 
   Option<pid_t> optionalPid;
   if (pid != 0) {
     optionalPid = pid;
   }
 
-  return Docker::Container(id, name, optionalPid);
+  Result<JSON::String> startedAtValue =
+    stateValue.get().find<JSON::String>("StartedAt");
+  if (startedAtValue.isNone()) {
+    return Error("Unable to find StartedAt in State");
+  } else if (startedAtValue.isError()) {
+    return Error("Error finding StartedAt in State: " + startedAtValue.error());
+  }
+
+  bool started = startedAtValue.get().value != "0001-01-01T00:00:00Z";
+
+  return Docker::Container(id, name, optionalPid, started);
 }
 
 
@@ -307,8 +305,9 @@ Future<Nothing> Docker::run(
     const string& sandboxDirectory,
     const string& mappedDirectory,
     const Option<Resources>& resources,
-    const Option<map<string, string> >& env,
-    bool detached) const
+    const Option<map<string, string>>& env,
+    const Option<string>& stdoutPath,
+    const Option<string>& stderrPath) const
 {
   if (!containerInfo.has_docker()) {
     return Failure("No docker info found in container info");
@@ -320,10 +319,6 @@ Future<Nothing> Docker::run(
   argv.push_back(path);
   argv.push_back("run");
 
-  if (detached) {
-    argv.push_back("-d");
-  }
-
   if (dockerInfo.privileged()) {
     argv.push_back("--privileged");
   }
@@ -381,7 +376,7 @@ Future<Nothing> Docker::run(
     argv.push_back(volumeConfig);
   }
 
-  // Mapping sandbox directory into the contianer mapped directory.
+  // Mapping sandbox directory into the container mapped directory.
   argv.push_back("-v");
   argv.push_back(sandboxDirectory + ":" + mappedDirectory);
 
@@ -501,16 +496,22 @@ Future<Nothing> Docker::run(
   // URI downloads.
   environment["HOME"] = sandboxDirectory;
 
+  Subprocess::IO stdoutIo = Subprocess::PIPE();
+  Subprocess::IO stderrIo = Subprocess::PIPE();
+  if (stdoutPath.isSome()) {
+    stdoutIo = Subprocess::PATH(stdoutPath.get());
+  }
+
+  if (stderrPath.isSome()) {
+    stderrIo = Subprocess::PATH(stderrPath.get());
+  }
+
   Try<Subprocess> s = subprocess(
       path,
       argv,
       Subprocess::PATH("/dev/null"),
-      (detached
-       ? Subprocess::PIPE()
-       : Subprocess::PATH(path::join(sandboxDirectory, "stdout"))),
-      (detached
-       ? Subprocess::PIPE()
-       : Subprocess::PATH(path::join(sandboxDirectory, "stderr"))),
+      stdoutIo,
+      stderrIo,
       None(),
       environment);
 
@@ -518,10 +519,9 @@ Future<Nothing> Docker::run(
     return Failure(s.error());
   }
 
-  if (detached) {
-    return checkError(cmd, s.get());
-  }
-
+  // We don't call checkError here to avoid printing the stderr
+  // of the docker container task as docker run with attach forwards
+  // the container's stderr to the client's stderr.
   return s.get().status()
     .then(lambda::bind(
         &Docker::_run,
@@ -533,7 +533,7 @@ Future<Nothing> Docker::run(
 Future<Nothing> Docker::_run(const Option<int>& status)
 {
   if (status.isNone()) {
-    return Failure("Failed to get exit status.");
+    return Failure("Failed to get exit status");
   } else if (status.get() != 0) {
     return Failure("Container exited on error: " + WSTRINGIFY(status.get()));
   }
@@ -543,7 +543,7 @@ Future<Nothing> Docker::_run(const Option<int>& status)
 
 
 Future<Nothing> Docker::stop(
-    const string& container,
+    const string& containerName,
     const Duration& timeout,
     bool remove) const
 {
@@ -553,7 +553,8 @@ Future<Nothing> Docker::stop(
                    stringify(timeoutSecs));
   }
 
-  string cmd = path + " stop -t " + stringify(timeoutSecs) + " " + container;
+  string cmd = path + " stop -t " + stringify(timeoutSecs) +
+               " " + containerName;
 
   VLOG(1) << "Running " << cmd;
 
@@ -571,7 +572,7 @@ Future<Nothing> Docker::stop(
     .then(lambda::bind(
         &Docker::_stop,
         *this,
-        container,
+        containerName,
         cmd,
         s.get(),
         remove));
@@ -579,7 +580,7 @@ Future<Nothing> Docker::stop(
 
 Future<Nothing> Docker::_stop(
     const Docker& docker,
-    const string& container,
+    const string& containerName,
     const string& cmd,
     const Subprocess& s,
     bool remove)
@@ -588,7 +589,7 @@ Future<Nothing> Docker::_stop(
 
   if (remove) {
     bool force = !status.isSome() || status.get() != 0;
-    return docker.rm(container, force);
+    return docker.rm(containerName, force);
   }
 
   return checkError(cmd, s);
@@ -596,10 +597,10 @@ Future<Nothing> Docker::_stop(
 
 
 Future<Nothing> Docker::rm(
-    const string& container,
+    const string& containerName,
     bool force) const
 {
-  const string cmd = path + (force ? " rm -f " : " rm ") + container;
+  const string cmd = path + (force ? " rm -f " : " rm ") + containerName;
 
   VLOG(1) << "Running " << cmd;
 
@@ -617,9 +618,28 @@ Future<Nothing> Docker::rm(
 }
 
 
-Future<Docker::Container> Docker::inspect(const string& container) const
+Future<Docker::Container> Docker::inspect(
+    const string& containerName,
+    const Option<Duration>& retryInterval) const
 {
-  const string cmd =  path + " inspect " + container;
+  Owned<Promise<Docker::Container>> promise(new Promise<Docker::Container>());
+
+  const string cmd =  path + " inspect " + containerName;
+  _inspect(cmd, promise, retryInterval);
+
+  return promise->future();
+}
+
+
+void Docker::_inspect(
+    const string& cmd,
+    const Owned<Promise<Docker::Container>>& promise,
+    const Option<Duration>& retryInterval)
+{
+  if (promise->future().hasDiscard()) {
+    promise->discard();
+    return;
+  }
 
   VLOG(1) << "Running " << cmd;
 
@@ -630,48 +650,86 @@ Future<Docker::Container> Docker::inspect(const string& container) const
       Subprocess::PIPE());
 
   if (s.isError()) {
-    return Failure(s.error());
+    promise->fail(s.error());
+    return;
   }
 
-  return s.get().status()
-    .then(lambda::bind(&Docker::_inspect, cmd, s.get()));
+  s.get().status()
+    .onAny([=]() { __inspect(cmd, promise, retryInterval, s.get()); });
 }
 
 
-Future<Docker::Container> Docker::_inspect(
+void Docker::__inspect(
     const string& cmd,
+    const Owned<Promise<Docker::Container>>& promise,
+    const Option<Duration>& retryInterval,
     const Subprocess& s)
 {
+  if (promise->future().hasDiscard()) {
+    promise->discard();
+    return;
+  }
+
   // Check the exit status of 'docker inspect'.
   CHECK_READY(s.status());
 
   Option<int> status = s.status().get();
 
   if (!status.isSome()) {
-    return Failure("No status found from '" + cmd + "'");
+    promise->fail("No status found from '" + cmd + "'");
   } else if (status.get() != 0) {
+    if (retryInterval.isSome()) {
+      VLOG(1) << "Retrying inspect with non-zero status code. cmd: '"
+              << cmd << "', interval: " << stringify(retryInterval.get());
+      Clock::timer(retryInterval.get(),
+                   [=]() { _inspect(cmd, promise, retryInterval); } );
+      return;
+    }
+
     CHECK_SOME(s.err());
-    return io::read(s.err().get())
+    io::read(s.err().get())
       .then(lambda::bind(
-                failure<Docker::Container>,
+                failure<Nothing>,
                 cmd,
                 status.get(),
-                lambda::_1));
+                lambda::_1))
+      .onAny([=](const Future<Nothing>& future) {
+          CHECK_FAILED(future);
+          promise->fail(future.failure());
+      });
+    return;
   }
 
   // Read to EOF.
   CHECK_SOME(s.out());
-  return io::read(s.out().get())
-    .then(lambda::bind(&Docker::__inspect, lambda::_1));
+  io::read(s.out().get())
+    .onAny([=](const Future<string>& output) {
+      ___inspect(cmd, promise, retryInterval, output);
+    });
 }
 
 
-Future<Docker::Container> Docker::__inspect(const string& output)
+void Docker::___inspect(
+    const string& cmd,
+    const Owned<Promise<Docker::Container>>& promise,
+    const Option<Duration>& retryInterval,
+    const Future<string>& output)
 {
-  Try<JSON::Array> parse = JSON::parse<JSON::Array>(output);
+  if (promise->future().hasDiscard()) {
+    promise->discard();
+    return;
+  }
+
+  if (!output.isReady()) {
+    promise->fail(output.isFailed() ? output.failure() : "future discarded");
+    return;
+  }
+
+  Try<JSON::Array> parse = JSON::parse<JSON::Array>(output.get());
 
   if (parse.isError()) {
-    return Failure("Failed to parse JSON: " + parse.error());
+    promise->fail("Failed to parse JSON: " + parse.error());
+    return;
   }
 
   JSON::Array array = parse.get();
@@ -682,69 +740,30 @@ Future<Docker::Container> Docker::__inspect(const string& output)
       Docker::Container::create(array.values.front().as<JSON::Object>());
 
     if (container.isError()) {
-      return Failure("Unable to create container: " + container.error());
+      promise->fail("Unable to create container: " + container.error());
+      return;
     }
 
-    return container.get();
+    if (retryInterval.isSome() && !container.get().started) {
+      VLOG(1) << "Retrying inspect since container not yet started. cmd: '"
+              << cmd << "', interval: " << stringify(retryInterval.get());
+      Clock::timer(retryInterval.get(),
+                   [=]() { _inspect(cmd, promise, retryInterval); } );
+      return;
+    }
+
+    promise->set(container.get());
+    return;
   }
 
   // TODO(benh): Handle the case where the short container ID was
   // not sufficiently unique and 'array.values.size() > 1'.
 
-  return Failure("Failed to find container");
-}
-
-
-Future<Nothing> Docker::logs(
-    const std::string& container,
-    const std::string& directory) const
-{
-  // Redirect the logs into stdout/stderr.
-  //
-  // TODO(benh): This is an intermediate solution for now until we can
-  // reliably stream the logs either from the CLI or from the REST
-  // interface directly. The problem is that it's possible that the
-  // 'docker logs --follow' command will be started AFTER the
-  // container has already terminated, and thus it will continue
-  // running forever because the container has stopped. Unfortunately,
-  // when we later remove the container that still doesn't cause the
-  // 'logs' process to exit. Thus, we wait some period of time until
-  // after the container has terminated in order to let any log data
-  // get flushed, then we kill the 'logs' process ourselves.  A better
-  // solution would be to first "create" the container, then start
-  // following the logs, then finally "start" the container so that
-  // when the container terminates Docker will properly close the log
-  // stream and 'docker logs' will exit. For more information, please
-  // see: https://github.com/docker/docker/issues/7020
-
-  string logs =
-    "logs() {\n"
-    "  " + path + " logs --follow $1 &\n"
-    "  pid=$!\n"
-    "  " + path + " wait $1 >/dev/null 2>&1\n"
-    "  sleep 10\n" // Sleep 10 seconds to make sure the logs are flushed.
-    "  kill -TERM $pid >/dev/null 2>&1 &\n"
-    "}\n"
-    "logs " + container;
-
-  VLOG(1) << "Running " << logs;
-
-  Try<Subprocess> s = subprocess(
-      logs,
-      Subprocess::PATH("/dev/null"),
-      Subprocess::PATH(path::join(directory, "stdout")),
-      Subprocess::PATH(path::join(directory, "stderr")));
-
-  if (s.isError()) {
-    return Failure("Unable to launch docker logs: " + s.error());
-  }
-
-  return s.get().status()
-    .then(lambda::bind(&_nothing));
+  promise->fail("Failed to find container");
 }
 
 
-Future<list<Docker::Container> > Docker::ps(
+Future<list<Docker::Container>> Docker::ps(
     bool all,
     const Option<string>& prefix) const
 {
@@ -772,7 +791,7 @@ Future<list<Docker::Container> > Docker::ps(
 }
 
 
-Future<list<Docker::Container> > Docker::_ps(
+Future<list<Docker::Container>> Docker::_ps(
     const Docker& docker,
     const string& cmd,
     const Subprocess& s,
@@ -789,7 +808,7 @@ Future<list<Docker::Container> > Docker::_ps(
     CHECK_SOME(s.err());
     return io::read(s.err().get())
       .then(lambda::bind(
-                failure<list<Docker::Container> >,
+                failure<list<Docker::Container>>,
                 cmd,
                 status.get(),
                 lambda::_1));
@@ -800,7 +819,7 @@ Future<list<Docker::Container> > Docker::_ps(
 }
 
 
-Future<list<Docker::Container> > Docker::__ps(
+Future<list<Docker::Container>> Docker::__ps(
     const Docker& docker,
     const Option<string>& prefix,
     const string& output)
@@ -811,7 +830,7 @@ Future<list<Docker::Container> > Docker::__ps(
   CHECK(!lines.empty());
   lines.erase(lines.begin());
 
-  list<Future<Docker::Container> > futures;
+  list<Future<Docker::Container>> futures;
 
   foreach (const string& line, lines) {
     // Inspect the containers that we are interested in depending on

http://git-wip-us.apache.org/repos/asf/mesos/blob/3baa6096/src/docker/docker.hpp
----------------------------------------------------------------------
diff --git a/src/docker/docker.hpp b/src/docker/docker.hpp
index 3b6ea64..d06c73a 100644
--- a/src/docker/docker.hpp
+++ b/src/docker/docker.hpp
@@ -52,21 +52,26 @@ public:
     static Try<Container> create(const JSON::Object& json);
 
     // Returns the ID of the container.
-    std::string id;
+    const std::string id;
 
     // Returns the name of the container.
-    std::string name;
+    const std::string name;
 
     // Returns the pid of the container, or None if the container is
     // not running.
-    Option<pid_t> pid;
+    const Option<pid_t> pid;
+
+    // Returns if the container has already started. This field is
+    // needed since pid is empty when the container terminates.
+    const bool started;
 
   private:
     Container(
-        const std::string& _id,
-        const std::string& _name,
-        const Option<pid_t>& _pid)
-      : id(_id), name(_name), pid(_pid) {}
+        const std::string& id,
+        const std::string& name,
+        const Option<pid_t>& pid,
+        bool started)
+      : id(id), name(name), pid(pid), started(started) {}
   };
 
   class Image
@@ -74,26 +79,27 @@ public:
   public:
     static Try<Image> create(const JSON::Object& json);
 
-    Option<std::vector<std::string> > entrypoint;
+    Option<std::vector<std::string>> entrypoint;
 
   private:
-    Image(const Option<std::vector<std::string> >& _entrypoint)
+    Image(const Option<std::vector<std::string>>& _entrypoint)
       : entrypoint(_entrypoint) {}
   };
 
-  // Returns the current docker version.
-  virtual process::Future<Version> version() const;
-
   // Performs 'docker run IMAGE'.
   virtual process::Future<Nothing> run(
       const mesos::ContainerInfo& containerInfo,
       const mesos::CommandInfo& commandInfo,
-      const std::string& name,
+      const std::string& containerName,
       const std::string& sandboxDirectory,
       const std::string& mappedDirectory,
       const Option<mesos::Resources>& resources = None(),
-      const Option<std::map<std::string, std::string> >& env = None(),
-      bool detached = true) const;
+      const Option<std::map<std::string, std::string>>& env = None(),
+      const Option<std::string>& stdoutPath = None(),
+      const Option<std::string>& stderrPath = None()) const;
+
+  // Returns the current docker version.
+  virtual process::Future<Version> version() const;
 
   // Performs 'docker stop -t TIMEOUT CONTAINER'. If remove is true then a rm -f
   // will be called when stop failed, otherwise a failure is returned. The
@@ -102,33 +108,27 @@ public:
   // A value of zero (the default value) is the same as issuing a
   // 'docker kill CONTAINER'.
   virtual process::Future<Nothing> stop(
-      const std::string& container,
+      const std::string& containerName,
       const Duration& timeout = Seconds(0),
       bool remove = false) const;
 
   // Performs 'docker rm (-f) CONTAINER'.
   virtual process::Future<Nothing> rm(
-      const std::string& container,
+      const std::string& containerName,
       bool force = false) const;
 
-  // Performs 'docker inspect CONTAINER'.
+  // Performs 'docker inspect CONTAINER'. If retryInterval is set,
+  // we will keep retrying inspect until the container is started or
+  // the future is discarded.
   virtual process::Future<Container> inspect(
-      const std::string& container) const;
+      const std::string& containerName,
+      const Option<Duration>& retryInterval = None()) const;
 
   // Performs 'docker ps (-a)'.
-  virtual process::Future<std::list<Container> > ps(
+  virtual process::Future<std::list<Container>> ps(
       bool all = false,
       const Option<std::string>& prefix = None()) const;
 
-  // Performs a 'docker logs --follow' and sends the output into a
-  // 'stderr' and 'stdout' file in the specfied directory.
-  //
-  // TODO(benh): Return the file descriptors, or some struct around
-  // them so others can do what they want with stdout/stderr.
-  virtual process::Future<Nothing> logs(
-      const std::string& container,
-      const std::string& directory) const;
-
   virtual process::Future<Image> pull(
       const std::string& directory,
       const std::string& image,
@@ -139,6 +139,9 @@ protected:
   Docker(const std::string& _path) : path(_path) {};
 
 private:
+  static process::Future<Nothing> _run(
+      const Option<int>& status);
+
   static process::Future<Version> _version(
       const std::string& cmd,
       const process::Subprocess& s);
@@ -148,29 +151,36 @@ private:
 
   static process::Future<Nothing> _stop(
       const Docker& docker,
-      const std::string& container,
+      const std::string& containerName,
       const std::string& cmd,
       const process::Subprocess& s,
       bool remove);
 
-  static process::Future<Container> _inspect(
+  static void _inspect(
       const std::string& cmd,
-      const process::Subprocess& s);
+      const process::Owned<process::Promise<Container>>& promise,
+      const Option<Duration>& retryInterval);
 
-  static process::Future<Container> __inspect(
-      const std::string& output);
+  static void __inspect(
+      const std::string& cmd,
+      const process::Owned<process::Promise<Container>>& promise,
+      const Option<Duration>& retryInterval,
+      const process::Subprocess& s);
 
-  static process::Future<Nothing> _run(
-      const Option<int>& status);
+  static void ___inspect(
+      const std::string& cmd,
+      const process::Owned<process::Promise<Container>>& promise,
+      const Option<Duration>& retryInterval,
+      const process::Future<std::string>& output);
 
-  static process::Future<std::list<Container> > _ps(
+  static process::Future<std::list<Container>> _ps(
       const Docker& docker,
       const std::string& cmd,
       const process::Subprocess& s,
       const Option<std::string>& prefix,
       process::Future<std::string> output);
 
-  static process::Future<std::list<Container> > __ps(
+  static process::Future<std::list<Container>> __ps(
       const Docker& docker,
       const Option<std::string>& prefix,
       const std::string& output);

http://git-wip-us.apache.org/repos/asf/mesos/blob/3baa6096/src/docker/executor.cpp
----------------------------------------------------------------------
diff --git a/src/docker/executor.cpp b/src/docker/executor.cpp
index 847997b..075c6b5 100644
--- a/src/docker/executor.cpp
+++ b/src/docker/executor.cpp
@@ -35,6 +35,7 @@
 #include "common/status_utils.hpp"
 
 #include "docker/docker.hpp"
+#include "docker/executor.hpp"
 
 #include "logging/flags.hpp"
 #include "logging/logging.hpp"
@@ -46,6 +47,7 @@ using std::string;
 
 namespace mesos {
 namespace internal {
+namespace docker {
 
 using namespace mesos;
 using namespace process;
@@ -55,9 +57,9 @@ using namespace process;
 // redirect log output to configured stdout and stderr files.
 // Similar to the CommandExecutor, it is only responsible to launch
 // one container and exits afterwards.
-// The executor also assumes it is launched from the
-// DockerContainerizer, which already calls setsid before launching
-// the executor.
+// The executor assumes that it is launched from the
+// DockerContainerizer, which already sets up when launching the
+// executor that ensures its kept running if the slave exits.
 class DockerExecutorProcess : public ProtobufProcess<DockerExecutorProcess>
 {
 public:
@@ -65,13 +67,15 @@ public:
       const Owned<Docker>& docker,
       const string& container,
       const string& sandboxDirectory,
-      const string& mappedDirectory)
-    : launched(false),
-      killed(false),
+      const string& mappedDirectory,
+      const Duration& stopTimeout)
+    : killed(false),
       docker(docker),
       container(container),
       sandboxDirectory(sandboxDirectory),
-      mappedDirectory(mappedDirectory) {}
+      mappedDirectory(mappedDirectory),
+      stopTimeout(stopTimeout),
+      stop(Nothing()) {}
 
   virtual ~DockerExecutorProcess() {}
 
@@ -92,13 +96,16 @@ public:
     cout << "Re-registered docker executor on " << slaveInfo.hostname() << endl;
   }
 
-  void disconnected(ExecutorDriver* driver) {}
+  void disconnected(ExecutorDriver* driver)
+  {
+    cout << "Disconnected from the slave" << endl;
+  }
 
   void launchTask(ExecutorDriver* driver, const TaskInfo& task)
   {
-    if (launched) {
+    if (run.isSome()) {
       TaskStatus status;
-      status.mutable_task_id()->MergeFrom(task.task_id());
+      status.mutable_task_id()->CopyFrom(task.task_id());
       status.set_state(TASK_FAILED);
       status.set_message(
           "Attempted to run multiple tasks using a \"docker\" executor");
@@ -109,46 +116,43 @@ public:
 
     cout << "Starting task " << task.task_id().value() << endl;
 
-    // We assume the Docker executor is launched from the
-    // DockerContainerizer, which already calls setsid before
-    // launching the executor.
-
     CHECK(task.has_container());
     CHECK(task.has_command());
 
-    ContainerInfo containerInfo = task.container();
-
-    CHECK(containerInfo.type() == ContainerInfo::DOCKER);
+    CHECK(task.container().type() == ContainerInfo::DOCKER);
 
-    Future<Nothing> run = docker->run(
-        containerInfo,
+    // We're adding task and executor resources to launch docker since
+    // the DockerContainerizer updates the container cgroup limits
+    // directly and it expects it to be the sum of both task and
+    // executor resources. This does leave to a bit of unaccounted
+    // resources for running this executor, but we are assuming
+    // this is just a very small amount of overcommit.
+    run = docker->run(
+        task.container(),
         task.command(),
         container,
         sandboxDirectory,
         mappedDirectory,
         task.resources() + task.executor().resources(),
         None(),
-        false);
-
-    run.onAny(defer(
+        path::join(sandboxDirectory, "stdout"),
+        path::join(sandboxDirectory, "stderr"))
+      .onAny(defer(
         self(),
         &Self::reaped,
         driver,
         task.task_id(),
         lambda::_1));
 
-    dockerRun = run;
-
     TaskStatus status;
-    status.mutable_task_id()->MergeFrom(task.task_id());
+    status.mutable_task_id()->CopyFrom(task.task_id());
     status.set_state(TASK_RUNNING);
     driver->sendStatusUpdate(status);
-
-    launched = true;
   }
 
   void killTask(ExecutorDriver* driver, const TaskID& taskId)
   {
+    cout << "Killing docker task" << endl;
     shutdown(driver);
   }
 
@@ -158,14 +162,19 @@ public:
   {
     cout << "Shutting down" << endl;
 
-    if (dockerRun.isSome() && !killed) {
-      Future<Nothing> dockerRun_ = dockerRun.get();
-      dockerRun_.discard();
+    if (run.isSome() && !killed) {
+      // The docker daemon might still be in the process of starting the
+      // container, therefore we kill both the docker run process
+      // and also ask the daemon to stop the container.
+
+      // Making a mutable copy of the future so we can call discard.
+      Future<Nothing>(run.get()).discard();
+      stop = docker->stop(container, stopTimeout);
       killed = true;
     }
   }
 
-  virtual void error(ExecutorDriver* driver, const string& message) {}
+  void error(ExecutorDriver* driver, const string& message) {}
 
 private:
   void reaped(
@@ -173,9 +182,22 @@ private:
       const TaskID& taskId,
       const Future<Nothing>& run)
   {
+    stop.onAny(defer(self(), &Self::_reaped, driver, taskId, run, lambda::_1));
+  }
+
+  void _reaped(
+      ExecutorDriver* driver,
+      const TaskID& taskId,
+      const Future<Nothing>& run,
+      const Future<Nothing>& stop)
+  {
     TaskState state;
     string message;
-    if (killed) {
+    if (!stop.isReady()) {
+      state = TASK_FAILED;
+      message = "Unable to stop docker container, error: " +
+                (stop.isFailed() ? stop.failure() : "future discarded");
+    } else if (killed) {
       state = TASK_KILLED;
     } else if (!run.isReady()) {
       state = TASK_FAILED;
@@ -186,7 +208,7 @@ private:
     }
 
     TaskStatus taskStatus;
-    taskStatus.mutable_task_id()->MergeFrom(taskId);
+    taskStatus.mutable_task_id()->CopyFrom(taskId);
     taskStatus.set_state(state);
     taskStatus.set_message(message);
 
@@ -194,18 +216,21 @@ private:
 
     // A hack for now ... but we need to wait until the status update
     // is sent to the slave before we shut ourselves down.
+    // TODO(tnachen): Remove this hack and also the same hack in the
+    // command executor when we have the new HTTP APIs to wait until
+    // an ack.
     os::sleep(Seconds(1));
     driver->stop();
   }
 
-
-  bool launched;
   bool killed;
   Owned<Docker> docker;
   string container;
   string sandboxDirectory;
   string mappedDirectory;
-  Option<Future<Nothing>> dockerRun;
+  Duration stopTimeout;
+  Option<Future<Nothing>> run;
+  Future<Nothing> stop;
   Option<ExecutorDriver*> driver;
 };
 
@@ -217,22 +242,23 @@ public:
       const Owned<Docker>& docker,
       const string& container,
       const string& sandboxDirectory,
-      const string& mappedDirectory)
+      const string& mappedDirectory,
+      const Duration& stopTimeout)
   {
-    process = new DockerExecutorProcess(
+    process = Owned<DockerExecutorProcess>(new DockerExecutorProcess(
         docker,
         container,
         sandboxDirectory,
-        mappedDirectory);
+        mappedDirectory,
+        stopTimeout));
 
-    spawn(process);
+    spawn(process.get());
   }
 
   virtual ~DockerExecutor()
   {
-    terminate(process);
-    wait(process);
-    delete process;
+    terminate(process.get());
+    wait(process.get());
   }
 
   virtual void registered(
@@ -241,7 +267,7 @@ public:
       const FrameworkInfo& frameworkInfo,
       const SlaveInfo& slaveInfo)
   {
-    dispatch(process,
+    dispatch(process.get(),
              &DockerExecutorProcess::registered,
              driver,
              executorInfo,
@@ -253,7 +279,7 @@ public:
       ExecutorDriver* driver,
       const SlaveInfo& slaveInfo)
   {
-    dispatch(process,
+    dispatch(process.get(),
              &DockerExecutorProcess::reregistered,
              driver,
              slaveInfo);
@@ -261,38 +287,43 @@ public:
 
   virtual void disconnected(ExecutorDriver* driver)
   {
-    dispatch(process, &DockerExecutorProcess::disconnected, driver);
+    dispatch(process.get(), &DockerExecutorProcess::disconnected, driver);
   }
 
   virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
   {
-    dispatch(process, &DockerExecutorProcess::launchTask, driver, task);
+    dispatch(process.get(), &DockerExecutorProcess::launchTask, driver, task);
   }
 
   virtual void killTask(ExecutorDriver* driver, const TaskID& taskId)
   {
-    dispatch(process, &DockerExecutorProcess::killTask, driver, taskId);
+    dispatch(process.get(), &DockerExecutorProcess::killTask, driver, taskId);
   }
 
   virtual void frameworkMessage(ExecutorDriver* driver, const string& data)
   {
-    dispatch(process, &DockerExecutorProcess::frameworkMessage, driver, data);
+    dispatch(process.get(),
+             &DockerExecutorProcess::frameworkMessage,
+             driver,
+             data);
   }
 
   virtual void shutdown(ExecutorDriver* driver)
   {
-    dispatch(process, &DockerExecutorProcess::shutdown, driver);
+    dispatch(process.get(), &DockerExecutorProcess::shutdown, driver);
   }
 
   virtual void error(ExecutorDriver* driver, const string& data)
   {
-    dispatch(process, &DockerExecutorProcess::error, driver, data);
+    dispatch(process.get(), &DockerExecutorProcess::error, driver, data);
   }
 
 private:
-  DockerExecutorProcess* process;
+  Owned<DockerExecutorProcess> process;
 };
 
+
+} // namespace docker {
 } // namespace internal {
 } // namespace mesos {
 
@@ -306,41 +337,11 @@ void usage(const char* argv0, const flags::FlagsBase& flags)
 }
 
 
-class Flags : public mesos::internal::logging::Flags
-{
-public:
-  Flags()
-  {
-    add(&Flags::container,
-        "container",
-        "The name of the docker container to run.\n");
-
-    add(&Flags::docker,
-        "docker",
-        "The path to the docker cli executable.\n");
-
-    add(&Flags::sandbox_directory,
-        "sandbox_directory",
-        "The path to the container sandbox that stores stdout and stderr\n"
-        "files that is being redirected with docker container logs.\n");
-
-    add(&Flags::mapped_directory,
-        "mapped_directory",
-        "The sandbox directory path that is mapped in the docker container.\n");
-  }
-
-  Option<string> container;
-  Option<string> docker;
-  Option<string> sandbox_directory;
-  Option<string> mapped_directory;
-};
-
-
 int main(int argc, char** argv)
 {
   GOOGLE_PROTOBUF_VERIFY_VERSION;
 
-  Flags flags;
+  mesos::internal::docker::Flags flags;
 
   bool help;
   flags.add(&help,
@@ -357,6 +358,8 @@ int main(int argc, char** argv)
     return -1;
   }
 
+  std::cout << stringify(flags) << std::endl;
+
   mesos::internal::logging::initialize(argv[0], flags, true); // Catch signals.
 
   if (help) {
@@ -364,6 +367,8 @@ int main(int argc, char** argv)
     return -1;
   }
 
+  std::cout << stringify(flags) << std::endl;
+
   if (flags.docker.isNone()) {
     LOG(WARNING) << "Expected docker executable path";
     usage(argv[0], flags);
@@ -388,19 +393,27 @@ int main(int argc, char** argv)
     return 0;
   }
 
-  // We skip validation when creating docker abstraction as we
-  // don't want to be checking for cgroups in the executor.
+  if (flags.stop_timeout.isNone()) {
+    LOG(WARNING) << "Expected stop timeout";
+    usage(argv[0], flags);
+    return 0;
+  }
+
+  // The 2nd argument for docker create is set to false so we skip
+  // validation when creating a docker abstraction, as the slave
+  // should have already validated docker.
   Try<Docker*> docker = Docker::create(flags.docker.get(), false);
   if (docker.isError()) {
     LOG(WARNING) << "Unable to create docker abstraction: " << docker.error();
     return -1;
   }
 
-  mesos::internal::DockerExecutor executor(
+  mesos::internal::docker::DockerExecutor executor(
       process::Owned<Docker>(docker.get()),
       flags.container.get(),
       flags.sandbox_directory.get(),
-      flags.mapped_directory.get());
+      flags.mapped_directory.get(),
+      flags.stop_timeout.get());
 
   mesos::MesosExecutorDriver driver(&executor);
   return driver.run() == mesos::DRIVER_STOPPED ? 0 : 1;

http://git-wip-us.apache.org/repos/asf/mesos/blob/3baa6096/src/docker/executor.hpp
----------------------------------------------------------------------
diff --git a/src/docker/executor.hpp b/src/docker/executor.hpp
new file mode 100644
index 0000000..fa13b6e
--- /dev/null
+++ b/src/docker/executor.hpp
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __DOCKER_EXECUTOR_HPP__
+#define __DOCKER_EXECUTOR_HPP__
+
+#include <stdio.h>
+
+#include <process/process.hpp>
+
+#include "logging/flags.hpp"
+
+namespace mesos {
+namespace internal {
+namespace docker {
+
+struct Flags : public mesos::internal::logging::Flags
+{
+  Flags() {
+    add(&container,
+        "container",
+        "The name of the docker container to run.\n");
+
+    add(&docker,
+        "docker",
+        "The path to the docker executable.\n");
+
+    add(&sandbox_directory,
+        "sandbox_directory",
+        "The path to the container sandbox holding stdout and stderr files\n"
+        "into which docker container logs will be redirected.");
+
+    add(&mapped_directory,
+        "mapped_directory",
+        "The sandbox directory path that is mapped in the docker container.\n");
+
+    add(&stop_timeout,
+        "stop_timeout",
+        "The duration for docker to wait after stopping a running container\n"
+        "before it kills that container.");
+  }
+
+  Option<std::string> container;
+  Option<std::string> docker;
+  Option<std::string> sandbox_directory;
+  Option<std::string> mapped_directory;
+  Option<Duration> stop_timeout;
+};
+
+} // namespace docker {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __DOCKER_EXECUTOR_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/3baa6096/src/slave/constants.cpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.cpp b/src/slave/constants.cpp
index 2a99b11..d8d2f98 100644
--- a/src/slave/constants.cpp
+++ b/src/slave/constants.cpp
@@ -50,6 +50,9 @@ const std::string DEFAULT_PORTS = "[31000-32000]";
 const uint16_t DEFAULT_EPHEMERAL_PORTS_PER_CONTAINER = 1024;
 #endif
 const Duration DOCKER_REMOVE_DELAY = Hours(6);
+const Duration DOCKER_INSPECT_DELAY = Seconds(1);
+// TODO(tnachen): Make this a flag.
+const Duration DOCKER_VERSION_WAIT_TIMEOUT = Seconds(5);
 const std::string DEFAULT_AUTHENTICATEE = "crammd5";
 
 Duration MASTER_PING_TIMEOUT()

http://git-wip-us.apache.org/repos/asf/mesos/blob/3baa6096/src/slave/constants.hpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp
index fd1c1ab..206d439 100644
--- a/src/slave/constants.hpp
+++ b/src/slave/constants.hpp
@@ -97,6 +97,14 @@ extern const uint16_t DEFAULT_EPHEMERAL_PORTS_PER_CONTAINER;
 // Default duration that docker containers will be removed after exit.
 extern const Duration DOCKER_REMOVE_DELAY;
 
+// Default duration to wait before retrying the inspection of a
+// docker container.
+extern const Duration DOCKER_INSPECT_DELAY;
+
+// Default duration that docker containerizer will wait to check
+// docker version.
+extern const Duration DOCKER_VERSION_WAIT_TIMEOUT;
+
 // Name of the default, CRAM-MD5 authenticatee.
 extern const std::string DEFAULT_AUTHENTICATEE;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3baa6096/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index 952d3eb..bd58f94 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -35,8 +35,6 @@
 
 #include "common/status_utils.hpp"
 
-#include "docker/docker.hpp"
-
 #ifdef __linux__
 #include "linux/cgroups.hpp"
 #endif // __linux__
@@ -48,7 +46,6 @@
 #include "slave/containerizer/docker.hpp"
 #include "slave/containerizer/fetcher.hpp"
 
-
 #include "slave/containerizer/isolators/cgroups/constants.hpp"
 
 #include "usage/usage.hpp"
@@ -99,8 +96,8 @@ Option<ContainerID> parse(const Docker::Container& container)
     // was DOCKER_NAME_PREFIX + containerId, and starting with 0.23.0
     // it is changed to DOCKER_NAME_PREFIX + slaveId +
     // DOCKER_NAME_SEPERATOR + containerId.
-    // To be backward compatible during upgrade, we still to support
-    // the previous format.
+    // To be backward compatible during upgrade, we still have to
+    // support the previous format.
     // TODO(tnachen): Remove this check after deprecation cycle.
     if (!strings::contains(name.get(), DOCKER_NAME_SEPERATOR)) {
       ContainerID id;
@@ -124,16 +121,16 @@ Try<DockerContainerizer*> DockerContainerizer::create(
     const Flags& flags,
     Fetcher* fetcher)
 {
-  Try<Docker*> docker = Docker::create(flags.docker);
-  if (docker.isError()) {
-    return Error(docker.error());
+  Try<Docker*> create = Docker::create(flags.docker);
+  if (create.isError()) {
+    return Error("Failed to create docker: " + create.error());
   }
 
-  Shared<Docker> docker_(docker.get());
+  Shared<Docker> docker(create.get());
 
   if (flags.docker_mesos_image.isSome()) {
-    Future<Version> version = docker_->version();
-    if (!version.await(Seconds(5))) {
+    Future<Version> version = docker->version();
+    if (!version.await(DOCKER_VERSION_WAIT_TIMEOUT)) {
       return Error("Timed out waiting for docker version");
     }
 
@@ -148,7 +145,7 @@ Try<DockerContainerizer*> DockerContainerizer::create(
     }
   }
 
-  return new DockerContainerizer(flags, fetcher, docker_);
+  return new DockerContainerizer(flags, fetcher, docker);
 }
 
 
@@ -177,17 +174,18 @@ DockerContainerizer::~DockerContainerizer()
 }
 
 
-string dockerExecutorCommand(
-    const Flags& flags,
-    const string directory,
-    const string container)
+docker::Flags dockerFlags(
+  const Flags& flags,
+  const string& name,
+  const string& directory)
 {
-  string command = "mesos-docker-executor --docker=" + flags.docker +
-                   " --container=" + container +
-                   " --sandbox_directory=" + directory +
-                   " --mapped_directory=" + flags.docker_sandbox_directory;
-
-  return path::join(flags.launcher_dir, command);
+  docker::Flags dockerFlags;
+  dockerFlags.container = name;
+  dockerFlags.docker = flags.docker;
+  dockerFlags.sandbox_directory = directory;
+  dockerFlags.mapped_directory = flags.docker_sandbox_directory;
+  dockerFlags.stop_timeout = flags.docker_stop_timeout;
+  return dockerFlags;
 }
 
 
@@ -254,34 +252,26 @@ DockerContainerizerProcess::Container::create(
     symlinked = true;
   }
 
-  Container* container = new Container(
-      id,
-      taskInfo,
-      executorInfo,
-      containerWorkdir,
-      user,
-      slaveId,
-      slavePid,
-      checkpoint,
-      symlinked,
-      flags);
-
+  Option<ContainerInfo> containerInfo = None();
+  Option<CommandInfo> commandInfo = None();
+  Option<std::map<string, string>> environment = None();
+  bool launchesExecutorContainer = false;
   if (taskInfo.isSome() && flags.docker_mesos_image.isSome()) {
     // Override the container and command to launch an executor
     // in a docker container.
-    ContainerInfo containerInfo;
+    ContainerInfo newContainerInfo;
 
     // Mounting in the docker socket so the executor can communicate to
     // the host docker daemon. We are assuming the current instance is
     // launching docker containers to the host daemon as well.
-    Volume* dockerSockVolume = containerInfo.add_volumes();
+    Volume* dockerSockVolume = newContainerInfo.add_volumes();
     dockerSockVolume->set_host_path(flags.docker_socket);
     dockerSockVolume->set_container_path(flags.docker_socket);
     dockerSockVolume->set_mode(Volume::RO);
 
     // Mounting in sandbox so the logs from the executor can be
     // persisted over container failures.
-    Volume* sandboxVolume = containerInfo.add_volumes();
+    Volume* sandboxVolume = newContainerInfo.add_volumes();
     sandboxVolume->set_host_path(containerWorkdir);
     sandboxVolume->set_container_path(containerWorkdir);
     sandboxVolume->set_mode(Volume::RW);
@@ -289,27 +279,48 @@ DockerContainerizerProcess::Container::create(
     ContainerInfo::DockerInfo dockerInfo;
     dockerInfo.set_image(flags.docker_mesos_image.get());
 
-    containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+    newContainerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+    docker::Flags dockerExecutorFlags = dockerFlags(
+      flags,
+      Container::name(slaveId, stringify(id)),
+      containerWorkdir);
 
-    const string& command = dockerExecutorCommand(
-        flags, containerWorkdir, container->name());
+    CommandInfo newCommandInfo;
+    // TODO(tnachen): Pass flags directly into docker run.
+    newCommandInfo.set_value(
+      path::join(flags.launcher_dir, "mesos-docker-executor") +
+      " " + stringify(dockerExecutorFlags));
 
-    CommandInfo commandInfo;
-    commandInfo.set_value(command);
-    commandInfo.set_shell(true);
+    newCommandInfo.set_shell(true);
 
-    container->overrideContainer = containerInfo;
-    container->overrideCommand = commandInfo;
-    container->overrideEnvironment = executorEnvironment(
+    containerInfo = newContainerInfo;
+    commandInfo = newCommandInfo;
+    environment = executorEnvironment(
         executorInfo,
         containerWorkdir,
         slaveId,
         slavePid,
         checkpoint,
         flags.recovery_timeout);
+    launchesExecutorContainer = true;
   }
 
-  return container;
+  return new Container(
+      id,
+      taskInfo,
+      executorInfo,
+      containerWorkdir,
+      user,
+      slaveId,
+      slavePid,
+      checkpoint,
+      symlinked,
+      flags,
+      commandInfo,
+      containerInfo,
+      environment,
+      launchesExecutorContainer);
 }
 
 
@@ -321,7 +332,7 @@ Future<Nothing> DockerContainerizerProcess::fetch(
 
   return fetcher->fetch(
       containerId,
-      container->command(),
+      container->command,
       container->directory,
       None(),
       flags);
@@ -329,21 +340,28 @@ Future<Nothing> DockerContainerizerProcess::fetch(
 
 
 Future<Nothing> DockerContainerizerProcess::pull(
-    const ContainerID& containerId,
-    const string& directory,
-    const string& image,
-    bool forcePullImage)
+    const ContainerID& containerId)
 {
-  Future<Docker::Image> future = docker->pull(directory, image, forcePullImage);
-  containers_[containerId]->pull = future;
-  return future.then(defer(self(), &Self::_pull, image));
-}
+  if (!containers_.contains(containerId)) {
+    return Failure("Container is already destroyed");
+  }
 
+  Container* container = containers_[containerId];
+  container->state = Container::PULLING;
 
-Future<Nothing> DockerContainerizerProcess::_pull(const string& image)
-{
-  VLOG(1) << "Docker pull " << image << " completed";
-  return Nothing();
+  string image = container->image();
+
+  Future<Docker::Image> future = docker->pull(
+    container->directory,
+    image,
+    container->forcePullImage());
+
+  containers_[containerId]->pull = future;
+
+  return future.then(defer(self(), [=]() {
+    VLOG(1) << "Docker pull " << image << " completed";
+    return Nothing();
+  }));
 }
 
 
@@ -545,10 +563,16 @@ Future<Nothing> DockerContainerizerProcess::_recover(
   // checkpointed container id.
   // TODO(tnachen): Remove this explicit reconciliation 0.24.
   hashset<ContainerID> existingContainers;
+  // Tracks all the task containers that launched an executor in
+  // a docker container.
+  hashset<ContainerID> executorContainers;
   foreach (const Docker::Container& container, _containers) {
     Option<ContainerID> id = parse(container);
     if (id.isSome()) {
       existingContainers.insert(id.get());
+      if (strings::contains(container.name, ".executor")) {
+        executorContainers.insert(id.get());
+      }
     }
   }
 
@@ -623,6 +647,8 @@ Future<Nothing> DockerContainerizerProcess::_recover(
       containers_[containerId] = container;
       container->slaveId = state.id;
       container->state = Container::RUNNING;
+      container->launchesExecutorContainer =
+        executorContainers.contains(containerId);
 
       pid_t pid = run.get().forkedPid.get();
 
@@ -747,18 +773,19 @@ Future<bool> DockerContainerizerProcess::launch(
   if (taskInfo.isSome() && flags.docker_mesos_image.isNone()) {
     // Launching task by forking a subprocess to run docker executor.
     return container.get()->launch = fetch(containerId)
-      .then(defer(self(), &Self::_launch, containerId))
-      .then(defer(self(), &Self::___launch, containerId))
-      .then(defer(self(), &Self::______launch, containerId, lambda::_1))
-      .onFailed(defer(self(), &Self::destroy, containerId, true));
+      .then(defer(self(), [=]() { return pull(containerId); }))
+      .then(defer(self(), [=]() { return launchExecutorProcess(containerId); }))
+      .then(defer(self(), [=](pid_t pid) {
+        return reapExecutor(containerId, pid);
+      }));
   }
 
   string containerName = container.get()->name();
 
-  if (flags.docker_mesos_image.isSome()) {
+  if (container.get()->executorName().isSome()) {
     // Launch the container with the executor name as we expect the
     // executor will launch the docker container.
-    containerName = container.get()->executorName();
+    containerName = container.get()->executorName().get();
   }
 
   // Launching task or executor by launching a seperate docker
@@ -768,67 +795,76 @@ Future<bool> DockerContainerizerProcess::launch(
   // we want the executor to keep running when the slave container
   // dies.
   return container.get()->launch = fetch(containerId)
-    .then(defer(self(), &Self::_launch, containerId))
-    .then(defer(self(), &Self::__launch, containerId, containerName))
-    .then(defer(self(), &Self::____launch, containerId, containerName))
-    .then(defer(self(), &Self::_____launch, containerId, lambda::_1))
-    .then(defer(self(), &Self::______launch, containerId, lambda::_1));
+    .then(defer(self(), [=]() { return pull(containerId); }))
+    .then(defer(self(), [=]() {
+      return launchExecutorContainer(containerId, containerName);
+    }))
+    .then(defer(self(), [=](const Docker::Container& dockerContainer) {
+      return checkpointExecutor(containerId, dockerContainer);
+    }))
+    .then(defer(self(), [=](pid_t pid) {
+      return reapExecutor(containerId, pid);
+    }));
 }
 
 
-Future<Nothing> DockerContainerizerProcess::_launch(
-    const ContainerID& containerId)
+Future<Docker::Container> DockerContainerizerProcess::launchExecutorContainer(
+    const ContainerID& containerId,
+    const string& containerName)
 {
-  // Doing the fetch might have succeded but we were actually asked to
-  // destroy the container, which we did, so don't continue.
   if (!containers_.contains(containerId)) {
-    return Failure("Container was destroyed while launching");
+    return Failure("Container is already destroyed");
   }
 
   Container* container = containers_[containerId];
-
-  container->state = Container::PULLING;
-
-  return pull(
-      containerId,
-      container->directory,
-      container->image(),
-      container->forcePullImage());
+  container->state = Container::RUNNING;
+
+  // Start the executor in a Docker container.
+  // This executor could either be a custom executor specified by an
+  // ExecutorInfo, or the docker executor.
+  Future<Nothing> run = docker->run(
+    container->container,
+    container->command,
+    containerName,
+    container->directory,
+    flags.docker_sandbox_directory,
+    container->resources,
+    container->environment,
+    path::join(container->directory, "stdout"),
+    path::join(container->directory, "stderr"));
+
+  Owned<Promise<Docker::Container>> promise(new Promise<Docker::Container>());
+  // We'd like to propagate the run failure when run fails so slave can
+  // send this failure back to the scheduler. Otherwise we return
+  // inspect's result or its failure, which should not fail when
+  // the container isn't launched.
+  Future<Docker::Container> inspect =
+    docker->inspect(containerName, slave::DOCKER_INSPECT_DELAY)
+      .onAny([=](Future<Docker::Container> f) {
+          // We cannot associate the promise outside of the callback
+          // because we'd like to propagate run's failure when
+          // available.
+          promise->associate(f);
+      });
+
+  run.onFailed([=](const string& failure) mutable {
+    inspect.discard();
+    promise->fail(failure);
+  });
+
+  return promise->future();
 }
 
 
-Future<Nothing> DockerContainerizerProcess::__launch(
-    const ContainerID& containerId,
-    const string& container)
+Future<pid_t> DockerContainerizerProcess::launchExecutorProcess(
+    const ContainerID& containerId)
 {
   if (!containers_.contains(containerId)) {
-    return Failure("Container was destroyed while pulling image");
+    return Failure("Container is already destroyed");
   }
 
-  Container* container_ = containers_[containerId];
-
-  container_->state = Container::RUNNING;
-
-  // Try and start the Docker container.
-  return docker->run(
-      container_->container(),
-      container_->command(),
-      container,
-      container_->directory,
-      flags.docker_sandbox_directory,
-      container_->resources,
-      container_->environment());
-}
-
-
-Future<pid_t> DockerContainerizerProcess::___launch(
-    const ContainerID& containerId)
-{
-  // After we do Docker::run we shouldn't remove a container until
-  // after we set Container::status.
-  CHECK(containers_.contains(containerId));
-
   Container* container = containers_[containerId];
+  container->state = Container::RUNNING;
 
   // Prepare environment variables for the executor.
   map<string, string> environment = executorEnvironment(
@@ -850,19 +886,19 @@ Future<pid_t> DockerContainerizerProcess::___launch(
     environment["GLOG_v"] = os::getenv("GLOG_v");
   }
 
-  const string& command = dockerExecutorCommand(
-      flags, container->directory, container->name());
-
-  VLOG(1) << "Launching docker executor with command: " << command;
+  vector<string> argv;
+  argv.push_back("mesos-docker-executor");
 
   // Construct the mesos-docker-executor using the "name" we gave the
   // container (to distinguish it from Docker containers not created
   // by Mesos).
   Try<Subprocess> s = subprocess(
-      command,
+      path::join(flags.launcher_dir, "mesos-docker-executor"),
+      argv,
       Subprocess::PIPE(),
       Subprocess::PATH(path::join(container->directory, "stdout")),
       Subprocess::PATH(path::join(container->directory, "stderr")),
+      dockerFlags(flags, container->name(), container->directory),
       environment,
       lambda::bind(&setup, container->directory));
 
@@ -893,37 +929,23 @@ Future<pid_t> DockerContainerizerProcess::___launch(
   if (length != sizeof(c)) {
     string error = string(strerror(errno));
     os::close(s.get().in().get());
-    Failure failure("Failed to synchronize with child process: " + error);
-
-    container->run = failure;
-    return failure;
+    return Failure("Failed to synchronize with child process: " + error);
   }
 
   return s.get().pid();
 }
 
 
-Future<Docker::Container> DockerContainerizerProcess::____launch(
-    const ContainerID& containerId,
-    const std::string& container)
-{
-  CHECK(containers_.contains(containerId));
-
-  return docker->inspect(container);
-}
-
 
-Future<pid_t> DockerContainerizerProcess::_____launch(
+Future<pid_t> DockerContainerizerProcess::checkpointExecutor(
     const ContainerID& containerId,
-    const Docker::Container& container)
+    const Docker::Container& dockerContainer)
 {
   // After we do Docker::run we shouldn't remove a container until
   // after we set Container::status.
   CHECK(containers_.contains(containerId));
 
-  Container* container_ = containers_[containerId];
-
-  Option<int> pid = container.pid;
+  Option<int> pid = dockerContainer.pid;
 
   if (!pid.isSome()) {
     return Failure("Unable to get executor pid after launch");
@@ -936,16 +958,11 @@ Future<pid_t> DockerContainerizerProcess::_____launch(
         "Failed to checkpoint executor's pid: " + checkpointed.error());
   }
 
-  // TODO(tnachen): We need to handle the slave in container scenario.
-  // Instead of forking a log process, launch a log container for
-  // executor.
-  docker->logs(container_->name(), container_->directory);
-
   return pid.get();
 }
 
 
-Future<bool> DockerContainerizerProcess::______launch(
+Future<bool> DockerContainerizerProcess::reapExecutor(
     const ContainerID& containerId,
     pid_t pid)
 {
@@ -1355,6 +1372,25 @@ void DockerContainerizerProcess::destroy(
 
   container->state = Container::DESTROYING;
 
+  if (killed && container->executorPid.isSome()) {
+    LOG(INFO) << "Sending SIGTERM to executor with pid: "
+              << container->executorPid.get();
+    // We need to clean up the executor as the executor might not have
+    // received run task due to a failed containerizer update.
+    // We also kill the executor first since container->status below
+    // is waiting for the executor to finish.
+    Try<std::list<os::ProcessTree>> kill =
+      os::killtree(container->executorPid.get(), SIGTERM);
+
+    if (kill.isError()) {
+      // Ignoring the error from killing executor as it can already
+      // have exited.
+      VLOG(1) << "Ignoring error when killing executor pid "
+              << container->executorPid.get() << " in destroy, error: "
+              << kill.error();
+    }
+  }
+
   // Otherwise, wait for Docker::run to succeed, in which case we'll
   // continue in _destroy (calling Docker::kill) or for Docker::run to
   // fail, in which case we'll re-execute this function and cleanup
@@ -1374,17 +1410,19 @@ void DockerContainerizerProcess::_destroy(
 
   CHECK(container->state == Container::DESTROYING);
 
-  // Do a 'docker rm -f' which we'll then find out about in '_destroy'
+  // Do a 'docker stop' which we'll then find out about in '_destroy'
   // after we've reaped either the container's root process (in the
   // event that we had just launched a container for an executor) or
   // the mesos-docker-executor (in the case we launched a container
   // for a task).
-
   LOG(INFO) << "Running docker stop on container '" << containerId << "'";
 
-  docker->stop(container->name(),
-              flags.docker_stop_timeout)
-    .onAny(defer(self(), &Self::__destroy, containerId, killed, lambda::_1));
+  if (killed) {
+    docker->stop(container->name(), flags.docker_stop_timeout)
+      .onAny(defer(self(), &Self::__destroy, containerId, killed, lambda::_1));
+  } else {
+    __destroy(containerId, killed, Nothing());
+  }
 }
 
 
@@ -1397,7 +1435,7 @@ void DockerContainerizerProcess::__destroy(
 
   Container* container = containers_[containerId];
 
-  if (!kill.isReady()) {
+  if (!kill.isReady() && !container->status.future().isReady()) {
     // TODO(benh): This means we've failed to do a Docker::kill, which
     // means it's possible that the container is still going to be
     // running after we return! We either need to have a periodic
@@ -1422,7 +1460,7 @@ void DockerContainerizerProcess::__destroy(
   }
 
   // Status must be ready since we did a Docker::kill.
-  CHECK_READY(containers_[containerId]->status.future());
+  CHECK_READY(container->status.future());
 
   container->status.future().get()
     .onAny(defer(self(), &Self::___destroy, containerId, killed, lambda::_1));
@@ -1483,11 +1521,13 @@ void DockerContainerizerProcess::reaped(const ContainerID& containerId)
 
 
 void DockerContainerizerProcess::remove(
-    const string& container,
-    const string& executor)
+    const string& containerName,
+    const Option<string>& executor)
 {
-  docker->rm(container, true);
-  docker->rm(executor, true);
+  docker->rm(containerName, true);
+  if (executor.isSome()) {
+    docker->rm(executor.get(), true);
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3baa6096/src/slave/containerizer/docker.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp
index 2961048..d2cca4b 100644
--- a/src/slave/containerizer/docker.hpp
+++ b/src/slave/containerizer/docker.hpp
@@ -21,9 +21,11 @@
 
 #include <process/shared.hpp>
 
+#include <stout/flags.hpp>
 #include <stout/hashset.hpp>
 
 #include "docker/docker.hpp"
+#include "docker/executor.hpp"
 
 #include "slave/containerizer/containerizer.hpp"
 
@@ -36,7 +38,7 @@ namespace slave {
 extern const std::string DOCKER_NAME_PREFIX;
 
 // Seperator used to compose docker container name, which is made up
-// of slave id and container id.
+// of slave ID and container ID.
 extern const std::string DOCKER_NAME_SEPERATOR;
 
 // Directory that stores all the symlinked sandboxes that is mapped
@@ -152,11 +154,7 @@ public:
 
   virtual process::Future<Nothing> fetch(const ContainerID& containerId);
 
-  virtual process::Future<Nothing> pull(
-      const ContainerID& containerId,
-      const std::string& directory,
-      const std::string& image,
-      bool forcePullImage);
+  virtual process::Future<Nothing> pull(const ContainerID& containerId);
 
   virtual process::Future<hashset<ContainerID>> containers();
 
@@ -166,8 +164,6 @@ private:
       const ContainerID& containerId,
       const Option<int>& status);
 
-  process::Future<Nothing> _pull(const std::string& image);
-
   Try<Nothing> checkpoint(
       const ContainerID& containerId,
       pid_t pid);
@@ -179,32 +175,21 @@ private:
   process::Future<Nothing> __recover(
       const std::list<Docker::Container>& containers);
 
-  process::Future<Nothing> _launch(
-      const ContainerID& containerId);
-
-  process::Future<Nothing> __launch(
+  // Starts the executor in a Docker container.
+  process::Future<Docker::Container> launchExecutorContainer(
       const ContainerID& containerId,
-      const std::string& container);
+      const std::string& containerName);
 
-  // NOTE: This continuation is only applicable when launching a
-  // container for a task.
-  process::Future<pid_t> ___launch(
+  // Starts the docker executor with a subprocess.
+  process::Future<pid_t> launchExecutorProcess(
       const ContainerID& containerId);
 
-
-  // NOTE: This continuation is only applicable when launching a
-  // container for an executor.
-  process::Future<Docker::Container> ____launch(
+  process::Future<pid_t> checkpointExecutor(
       const ContainerID& containerId,
-      const std::string& container);
+      const Docker::Container& dockerContainer);
 
-  // NOTE: This continuation is only applicable when launching a
-  // container for an executor.
-  process::Future<pid_t> _____launch(
-      const ContainerID& containerId,
-      const Docker::Container& container);
-
-  process::Future<bool> ______launch(
+  // Reaps on the executor pid.
+  process::Future<bool> reapExecutor(
       const ContainerID& containerId,
       pid_t pid);
 
@@ -245,7 +230,9 @@ private:
   void reaped(const ContainerID& containerId);
 
   // Removes the docker container.
-  void remove(const std::string& container, const std::string& executor);
+  void remove(
+      const std::string& containerName,
+      const Option<std::string>& executor);
 
   const Flags flags;
 
@@ -266,6 +253,12 @@ private:
         bool checkpoint,
         const Flags& flags);
 
+    static std::string name(const SlaveID& slaveId, const std::string& id)
+    {
+      return DOCKER_NAME_PREFIX + slaveId.value() + DOCKER_NAME_SEPERATOR +
+        stringify(id);
+    }
+
     Container(const ContainerID& id)
       : state(FETCHING), id(id) {}
 
@@ -278,7 +271,11 @@ private:
               const process::PID<Slave>& slavePid,
               bool checkpoint,
               bool symlinked,
-              const Flags& flags)
+              const Flags& flags,
+              const Option<CommandInfo>& _command,
+              const Option<ContainerInfo>& _container,
+              const Option<std::map<std::string, std::string>>& _environment,
+              bool launchesExecutorContainer)
       : state(FETCHING),
         id(id),
         task(taskInfo),
@@ -289,7 +286,8 @@ private:
         slavePid(slavePid),
         checkpoint(checkpoint),
         symlinked(symlinked),
-        flags(flags)
+        flags(flags),
+        launchesExecutorContainer(launchesExecutorContainer)
     {
       // NOTE: The task's resources are included in the executor's
       // resources in order to make sure when launching the executor
@@ -307,13 +305,33 @@ private:
         CHECK(resources.contains(task.get().resources()));
       }
 
-      overrideEnvironment = executorEnvironment(
-          executor,
-          directory,
-          slaveId,
-          slavePid,
-          checkpoint,
-          flags.recovery_timeout);
+      if (_command.isSome()) {
+        command = _command.get();
+      } else if (task.isSome()) {
+        command = task.get().command();
+      } else {
+        command = executor.command();
+      }
+
+      if (_container.isSome()) {
+        container = _container.get();
+      } else if (task.isSome()) {
+        container = task.get().container();
+      } else {
+        container = executor.container();
+      }
+
+      if (_environment.isSome()) {
+        environment = _environment.get();
+      } else {
+        environment = executorEnvironment(
+            executor,
+            directory,
+            slaveId,
+            slavePid,
+            checkpoint,
+            flags.recovery_timeout);
+      }
     }
 
     ~Container()
@@ -327,13 +345,16 @@ private:
 
     std::string name()
     {
-      return DOCKER_NAME_PREFIX + slaveId.value() + DOCKER_NAME_SEPERATOR +
-        stringify(id);
+      return name(slaveId, stringify(id));
     }
 
-    std::string executorName()
+    Option<std::string> executorName()
     {
-      return name() + DOCKER_NAME_SEPERATOR + "executor";
+      if (launchesExecutorContainer) {
+        return name() + DOCKER_NAME_SEPERATOR + "executor";
+      } else {
+        return None();
+      }
     }
 
     std::string image() const
@@ -354,43 +375,6 @@ private:
       return executor.container().docker().force_pull_image();
     }
 
-    ContainerInfo container() const
-    {
-      if (overrideContainer.isSome()) {
-        return overrideContainer.get();
-      }
-
-      if (task.isSome()) {
-        return task.get().container();
-      }
-
-      return executor.container();
-    }
-
-    CommandInfo command() const
-    {
-      if (overrideCommand.isSome()) {
-        return overrideCommand.get();
-      }
-
-      if (task.isSome()) {
-        return task.get().command();
-      }
-
-      return executor.command();
-    }
-
-    // Returns any extra environment varaibles to set when launching
-    // the Docker container (beyond the those found in CommandInfo).
-    std::map<std::string, std::string> environment() const
-    {
-      if (overrideEnvironment.isSome()) {
-        return overrideEnvironment.get();
-      }
-
-      return std::map<std::string, std::string>();
-    }
-
     // The DockerContainerier needs to be able to properly clean up
     // Docker containers, regardless of when they are destroyed. For
     // example, if a container gets destroyed while we are fetching,
@@ -421,23 +405,22 @@ private:
     } state;
 
     const ContainerID id;
-    Option<TaskInfo> task;
-    ExecutorInfo executor;
+    const Option<TaskInfo> task;
+    const ExecutorInfo executor;
+    ContainerInfo container;
+    CommandInfo command;
+    std::map<std::string, std::string> environment;
 
     // The sandbox directory for the container. This holds the
     // symlinked path if symlinked boolean is true.
-    std::string directory;
+    const std::string directory;
 
-    Option<std::string> user;
+    const Option<std::string> user;
     SlaveID slaveId;
-    process::PID<Slave> slavePid;
+    const process::PID<Slave> slavePid;
     bool checkpoint;
     bool symlinked;
-    Flags flags;
-
-    Option<ContainerInfo> overrideContainer;
-    Option<CommandInfo> overrideCommand;
-    Option<std::map<std::string, std::string>> overrideEnvironment;
+    const Flags flags;
 
     // Promise for future returned from wait().
     Promise<containerizer::Termination> termination;
@@ -469,6 +452,10 @@ private:
     // container. This is stored so we can clean up the executor
     // on destroy.
     Option<pid_t> executorPid;
+
+    // Marks if this container launches a executor in a docker
+    // container.
+    bool launchesExecutorContainer;
   };
 
   hashmap<ContainerID, Container*> containers_;

http://git-wip-us.apache.org/repos/asf/mesos/blob/3baa6096/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index acf30d1..a8c7c49 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -294,22 +294,28 @@ mesos::internal::slave::Flags::Flags()
 
   add(&Flags::docker_kill_orphans,
       "docker_kill_orphans",
-      "Enable docker containerizer to kill orphaned containers",
+      "Enable docker containerizer to kill orphaned containers.\n"
+      "You should consider setting this to false when you launch multiple\n"
+      "slaves in the same OS, to avoid one of the DockerContainerizer \n"
+      "removing docker tasks launched by other slaves. However you should\n"
+      "also make sure to enable checkpoint for the slave so the same slave id\n"
+      "can be reused, otherwise docker tasks on slave restart will not be\n"
+      "cleaned up.\n",
       true);
 
   add(&Flags::docker_mesos_image,
       "docker_mesos_image",
       "The docker image used to launch this mesos slave instance.\n"
       "If an image is specified, the docker containerizer assumes the slave\n"
-      "is running in a docker container. This enables the docker\n"
-      "containerizer to launch executors in docker containers, so they keep\n"
-      "running when the slave container exits.");
+      "is running in a docker container, and launches executors with\n"
+      "docker containers in order to recover them when the slave restarts and\n"
+      "recovers.\n");
 
   add(&Flags::docker_socket,
       "docker_socket",
-      "The docker UNIX socket path that the docker CLI uses to communicate\n"
-      "to the docker daemon. Mesos needs this to launch docker containers\n"
-      "that can run the docker CLI.\n",
+      "The UNIX socket path to be mounted into the docker executor container\n"
+      "to provide docker CLI access to the docker daemon. This must be the\n"
+      "path used by the slave's docker image.\n",
       "/var/run/docker.sock");
 
   add(&Flags::default_container_info,

Reply via email to