Launch docker container and log through docker executor.

Review: https://reviews.apache.org/r/29337


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/88da1f6e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/88da1f6e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/88da1f6e

Branch: refs/heads/master
Commit: 88da1f6e05addadb9a9f559c58888cfefb58f851
Parents: 7fee619
Author: Timothy Chen <[email protected]>
Authored: Tue Dec 16 13:16:42 2014 -0800
Committer: Timothy Chen <[email protected]>
Committed: Fri May 22 23:13:51 2015 -0700

----------------------------------------------------------------------
 src/docker/docker.cpp                    |  55 ++++--
 src/docker/docker.hpp                    |   6 +-
 src/docker/executor.cpp                  | 156 +++++++++------
 src/slave/containerizer/docker.cpp       |  87 +++------
 src/slave/containerizer/docker.hpp       |   5 -
 src/tests/docker_containerizer_tests.cpp | 266 ++++++--------------------
 6 files changed, 232 insertions(+), 343 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/88da1f6e/src/docker/docker.cpp
----------------------------------------------------------------------
diff --git a/src/docker/docker.cpp b/src/docker/docker.cpp
index 3a485a2..d7a33dd 100644
--- a/src/docker/docker.cpp
+++ b/src/docker/docker.cpp
@@ -175,6 +175,13 @@ Try<Docker*> Docker::create(const string& path, bool validate)
 }
 
 
+void commandDiscarded(const Subprocess& s, const string& cmd)
+{
+  VLOG(1) << "'" << cmd << "' is being discarded";
+  os::killtree(s.pid(), SIGKILL);
+}
+
+
 Try<Docker::Container> Docker::Container::create(const JSON::Object& json)
 {
   map<string, JSON::Value>::const_iterator entry =
@@ -280,7 +287,8 @@ Future<Nothing> Docker::run(
     const string& sandboxDirectory,
     const string& mappedDirectory,
     const Option<Resources>& resources,
-    const Option<map<string, string> >& env) const
+    const Option<map<string, string> >& env,
+    bool detached) const
 {
   if (!containerInfo.has_docker()) {
     return Failure("No docker info found in container info");
@@ -291,7 +299,10 @@ Future<Nothing> Docker::run(
   vector<string> argv;
   argv.push_back(path);
   argv.push_back("run");
-  argv.push_back("-d");
+
+  if (detached) {
+    argv.push_back("-d");
+  }
 
   if (dockerInfo.privileged()) {
     argv.push_back("--privileged");
@@ -474,8 +485,12 @@ Future<Nothing> Docker::run(
       path,
       argv,
       Subprocess::PATH("/dev/null"),
-      Subprocess::PIPE(),
-      Subprocess::PIPE(),
+      (detached
+       ? Subprocess::PIPE()
+       : Subprocess::PATH(path::join(sandboxDirectory, "stdout"))),
+      (detached
+       ? Subprocess::PIPE()
+       : Subprocess::PATH(path::join(sandboxDirectory, "stderr"))),
       None(),
       environment);
 
@@ -483,9 +498,30 @@ Future<Nothing> Docker::run(
     return Failure(s.error());
   }
 
-  return checkError(cmd, s.get());
+  if (detached) {
+    return checkError(cmd, s.get());
+  }
+
+  return s.get().status()
+    .then(lambda::bind(
+        &Docker::_run,
+        lambda::_1))
+    .onDiscard(lambda::bind(&commandDiscarded, s.get(), cmd));
 }
 
+
+Future<Nothing> Docker::_run(const Option<int>& status)
+{
+  if (status.isNone()) {
+    return Failure("Failed to get exit status.");
+  } else if (status.get() != 0) {
+    return Failure("Container exited on error: " + WSTRINGIFY(status.get()));
+  }
+
+  return Nothing();
+}
+
+
 Future<Nothing> Docker::stop(
     const string& container,
     const Duration& timeout,
@@ -893,14 +929,7 @@ Future<Docker::Image> Docker::__pull(
         cmd,
         directory,
         image))
-    .onDiscard(lambda::bind(&Docker::pullDiscarded, s_.get(), cmd));
-}
-
-
-void Docker::pullDiscarded(const Subprocess& s, const string& cmd)
-{
-  VLOG(1) << "'" << cmd << "' is being discarded";
-  os::killtree(s.pid(), SIGKILL);
+    .onDiscard(lambda::bind(&commandDiscarded, s_.get(), cmd));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/88da1f6e/src/docker/docker.hpp
----------------------------------------------------------------------
diff --git a/src/docker/docker.hpp b/src/docker/docker.hpp
index 3ebbc1f..0ba7e0e 100644
--- a/src/docker/docker.hpp
+++ b/src/docker/docker.hpp
@@ -89,7 +89,8 @@ public:
       const std::string& sandboxDirectory,
       const std::string& mappedDirectory,
       const Option<mesos::Resources>& resources = None(),
-      const Option<std::map<std::string, std::string> >& env = None()) const;
+      const Option<std::map<std::string, std::string> >& env = None(),
+      bool detached = true) const;
 
   // Performs 'docker stop -t TIMEOUT CONTAINER'. If remove is true then a rm -f
   // will be called when stop failed, otherwise a failure is returned. The
@@ -149,6 +150,9 @@ private:
   static process::Future<Container> __inspect(
       const std::string& output);
 
+  static process::Future<Nothing> _run(
+      const Option<int>& status);
+
   static process::Future<std::list<Container> > _ps(
       const Docker& docker,
       const std::string& cmd,

http://git-wip-us.apache.org/repos/asf/mesos/blob/88da1f6e/src/docker/executor.cpp
----------------------------------------------------------------------
diff --git a/src/docker/executor.cpp b/src/docker/executor.cpp
index 1a5ab86..61aeae9 100644
--- a/src/docker/executor.cpp
+++ b/src/docker/executor.cpp
@@ -27,12 +27,15 @@
 #include <process/protobuf.hpp>
 #include <process/subprocess.hpp>
 #include <process/reap.hpp>
+#include <process/owned.hpp>
 
 #include <stout/flags.hpp>
 #include <stout/os.hpp>
 
 #include "common/status_utils.hpp"
 
+#include "docker/docker.hpp"
+
 #include "logging/logging.hpp"
 
 using std::cerr;
@@ -57,11 +60,17 @@ using namespace process;
 class DockerExecutorProcess : public ProtobufProcess<DockerExecutorProcess>
 {
 public:
-  DockerExecutorProcess(const string& docker, const string& container)
+  DockerExecutorProcess(
+      const Owned<Docker>& docker,
+      const string& container,
+      const string& sandboxDirectory,
+      const string& mappedDirectory)
     : launched(false),
+      killed(false),
       docker(docker),
       container(container),
-      pid(-1) {}
+      sandboxDirectory(sandboxDirectory),
+      mappedDirectory(mappedDirectory) {}
 
   virtual ~DockerExecutorProcess() {}
 
@@ -99,26 +108,35 @@ public:
 
     cout << "Starting task " << task.task_id().value() << endl;
 
-    Try<Subprocess> subprocess = process::subprocess(
-        "exit `" + docker + " wait + " + container + "`",
-        Subprocess::PATH("/dev/null"),
-        Subprocess::FD(STDOUT_FILENO),
-        Subprocess::FD(STDERR_FILENO));
+    // We assume the Docker executor is launched from the
+    // DockerContainerizer, which already calls setsid before
+    // launching the executor.
 
-    if (subprocess.isError()) {
-      cerr << "Couldn't launch docker wait process: " << subprocess.error();
-      abort();
-    }
+    CHECK(task.has_container());
+    CHECK(task.has_command());
+
+    ContainerInfo containerInfo = task.container();
 
-    pid = subprocess.get().pid();
+    CHECK(containerInfo.type() == ContainerInfo::DOCKER);
 
-    process::reap(pid)
-      .onAny(defer(self(),
-                   &Self::reaped,
-                   driver,
-                   task.task_id(),
-                   pid,
-                   lambda::_1));
+    Future<Nothing> run = docker->run(
+        containerInfo,
+        task.command(),
+        container,
+        sandboxDirectory,
+        mappedDirectory,
+        task.resources(),
+        None(),
+        false);
+
+    run.onAny(defer(
+        self(),
+        &Self::reaped,
+        driver,
+        task.task_id(),
+        lambda::_1));
+
+    dockerRun = run;
 
     TaskStatus status;
     status.mutable_task_id()->MergeFrom(task.task_id());
@@ -139,8 +157,9 @@ public:
   {
     cout << "Shutting down" << endl;
 
-    if (pid > 0 && !killed) {
-      ::kill(pid, SIGKILL);
+    if (dockerRun.isSome() && !killed) {
+      Future<Nothing> dockerRun_ = dockerRun.get();
+      dockerRun_.discard();
       killed = true;
     }
   }
@@ -151,41 +170,20 @@ private:
   void reaped(
       ExecutorDriver* driver,
       const TaskID& taskId,
-      pid_t pid,
-      const Future<Option<int>>& status)
+      const Future<Nothing>& run)
   {
     TaskState state;
     string message;
-    if (!status.isReady()) {
-      state = TASK_FAILED;
-      message =
-        "Failed to get exit status for Docker executor: " +
-        (status.isFailed() ? status.failure() : "future discarded");
-    } else if (status.get().isNone()) {
+    if (killed) {
+      state = TASK_KILLED;
+    } else if (!run.isReady()) {
       state = TASK_FAILED;
-      message = "Failed to get exit status for Docker executor";
+      message = "Docker container run error: " +
+                (run.isFailed() ? run.failure() : "future discarded");
     } else {
-      int s = status.get().get();
-
-      // Subprocess status is gathered from waitpid, therefore we can
-      // get the exit status from WIFEXITED.
-      CHECK(WIFEXITED(s) || WIFSIGNALED(s)) << "status code: " << s;
-
-      if (WIFEXITED(s) && WEXITSTATUS(s) == 0) {
-        state = TASK_FINISHED;
-      } else if (killed) {
-        // Send TASK_KILLED if the task was killed as a result of
-        // killTask() or shutdown().
-        state = TASK_KILLED;
-      } else {
-        state = TASK_FAILED;
-      }
-
-      message = "Docker  " + WSTRINGIFY(s);
+      state = TASK_FINISHED;
     }
 
-    cout << message << " (pid: " << pid << ")" << endl;
-
     TaskStatus taskStatus;
     taskStatus.mutable_task_id()->MergeFrom(taskId);
     taskStatus.set_state(state);
@@ -201,10 +199,12 @@ private:
 
 
   bool launched;
-  string docker;
-  string container;
-  pid_t pid;
   bool killed;
+  Owned<Docker> docker;
+  string container;
+  string sandboxDirectory;
+  string mappedDirectory;
+  Option<Future<Nothing>> dockerRun;
   Option<ExecutorDriver*> driver;
 };
 
@@ -212,9 +212,18 @@ private:
 class DockerExecutor : public Executor
 {
 public:
-  DockerExecutor(const string& docker, const string& container)
+  DockerExecutor(
+      const Owned<Docker>& docker,
+      const string& container,
+      const string& sandboxDirectory,
+      const string& mappedDirectory)
   {
-    process = new DockerExecutorProcess(docker, container);
+    process = new DockerExecutorProcess(
+        docker,
+        container,
+        sandboxDirectory,
+        mappedDirectory);
+
     spawn(process);
   }
 
@@ -303,15 +312,26 @@ public:
   {
     add(&Flags::container,
         "container",
-        "The name of the docker container to wait on.");
+        "The name of the docker container to run.\n");
 
     add(&Flags::docker,
         "docker",
-        "The path to the docker cli executable.");
+        "The path to the docker cli executable.\n");
+
+    add(&Flags::sandbox_directory,
+        "sandbox_directory",
+        "The path to the container sandbox that stores stdout and stderr\n"
+        "files that is being redirected with docker container logs.\n");
+
+    add(&Flags::mapped_directory,
+        "mapped_directory",
+        "The sandbox directory path that is mapped in the docker container.\n");
   }
 
   Option<string> container;
   Option<string> docker;
+  Option<string> sandbox_directory;
+  Option<string> mapped_directory;
 };
 
 
@@ -351,8 +371,30 @@ int main(int argc, char** argv)
     return 0;
   }
 
+  if (flags.sandbox_directory.isNone()) {
+    LOG(WARNING) << "Expected sandbox directory path";
+    usage(argv[0], flags);
+    return 0;
+  }
+
+  if (flags.mapped_directory.isNone()) {
+    LOG(WARNING) << "Expected mapped sandbox directory path";
+    usage(argv[0], flags);
+    return 0;
+  }
+
+  Try<Docker*> docker = Docker::create(flags.docker.get());
+  if (docker.isError()) {
+    LOG(WARNING) << "Unable to create docker abstraction: " << docker.error();
+    return -1;
+  }
+
   mesos::internal::DockerExecutor executor(
-      flags.docker.get(), flags.container.get());
+      process::Owned<Docker>(docker.get()),
+      flags.container.get(),
+      flags.sandbox_directory.get(),
+      flags.mapped_directory.get());
+
   mesos::MesosExecutorDriver driver(&executor);
   return driver.run() == mesos::DRIVER_STOPPED ? 0 : 1;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/88da1f6e/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index 50a5b72..58505f9 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -819,7 +819,10 @@ Future<pid_t> DockerContainerizerProcess::___launch(
   if (length != sizeof(c)) {
     string error = string(strerror(errno));
     os::close(s.get().in().get());
-    return Failure("Failed to synchronize with child process: " + error);
+    Failure failure("Failed to synchronize with child process: " + error);
+
+    container->run = failure;
+    return failure;
   }
 
   return s.get().pid();
@@ -851,14 +854,29 @@ Future<Nothing> DockerContainerizerProcess::___launchInContainer(
     environment[variable.name()] = variable.value();
   }
 
+  // We are launching a mesos-docker-executor in a docker container so
+  // that the containerizer can recover the executor container, as we
+  // are assuming this instance is launched in a docker container and
+  // forked processes are killed on exit.
   ContainerInfo containerInfo;
+
+  // Mounting in the docker socket so the executor can communicate to
+  // the host docker daemon. We are assuming the current instance is
+  // launching docker containers to the host daemon as well.
+  Volume* volume = containerInfo.add_volumes();
+  volume->set_host_path(flags.docker_socket);
+  volume->set_container_path(flags.docker_socket);
+  volume->set_mode(Volume::RO);
+
   ContainerInfo::DockerInfo dockerInfo;
 
   dockerInfo.set_image(flags.docker_mesos_image.get());
   containerInfo.mutable_docker()->CopyFrom(dockerInfo);
 
   string command = "mesos-docker-executor --docker=" + flags.docker +
-                   " --container=" + container->name();
+                   " --container=" + container->name() +
+                   " --sandbox_directory=" + container->directory +
+                   " --mapped_directory=" + flags.docker_sandbox_directory;
 
   command = path::join(flags.launcher_dir, command);
 
@@ -1003,58 +1021,6 @@ Future<pid_t> DockerContainerizerProcess::______launch(
 }
 
 
-Future<pid_t> DockerContainerizerProcess::______launchInContainer(
-    const ContainerID& containerId,
-    pid_t pid)
-{
-  CHECK(containers_.contains(containerId));
-  CHECK(flags.docker_mesos_image.isSome());
-
-  Container* container = containers_[containerId];
-
-  // We are launching a docker container to read the logs from
-  // a given launched container into the sandbox stdout and stderr
-  // files. This requires the docker container to run docker logs,
-  // which requires us mounting in the docker socket and docker
-  // CLI binary.
-  ContainerInfo containerInfo;
-  Volume* volume = containerInfo.add_volumes();
-  volume->set_host_path(flags.docker_socket);
-  volume->set_container_path(flags.docker_socket);
-  volume->set_mode(Volume::RO);
-
-  volume = containerInfo.add_volumes();
-  volume->set_host_path(flags.docker);
-  volume->set_container_path(flags.docker);
-  volume->set_mode(Volume::RO);
-
-  ContainerInfo::DockerInfo dockerInfo;
-  dockerInfo.set_image(flags.docker_mesos_image.get());
-  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
-
-  string command = flags.docker + " logs --follow " + container->name() +
-                   " 2>> " +
-                   path::join(flags.docker_sandbox_directory, "stderr") +
-                   " 1>> " +
-                   path::join(flags.docker_sandbox_directory, "stdout");
-
-  VLOG(1) << "Running docker logs in container with command: " << command;
-
-  CommandInfo commandInfo;
-  commandInfo.set_value(command);
-  commandInfo.set_shell(true);
-
-  docker->run(
-      containerInfo,
-      commandInfo,
-      container->logName(),
-      container->directory,
-      flags.docker_sandbox_directory);
-
-  return pid;
-}
-
-
 Future<pid_t> DockerContainerizerProcess::____launchInContainer(
     const ContainerID& containerId)
 {
@@ -1127,6 +1093,11 @@ Future<Nothing> DockerContainerizerProcess::update(
   // Store the resources for usage().
   container->resources = _resources;
 
+  if (flags.docker_mesos_image.isSome()) {
+    LOG(INFO) << "Ignoring update as slave is running under container.";
+    return Nothing();
+  }
+
 #ifdef __linux__
   if (!_resources.cpus().isSome() && !_resources.mem().isSome()) {
     LOG(WARNING) << "Ignoring update as no supported resources are present";
@@ -1486,12 +1457,11 @@ void DockerContainerizerProcess::destroy(
 
   CHECK(container->state == Container::RUNNING);
 
-  // Remove the executor and log docker containers. They might not
+  // Remove the executor docker containers. They might not
   // been configured to launch but we might have recovered containers
   // on previous slave run that has configured to launch executor in
   // docker.
-  docker->rm(container->logName(), true);
-  docker->rm(container->executorName(), true);
+  docker->stop(container->executorName(), Seconds(0), true);
 
   container->state = Container::DESTROYING;
 
@@ -1537,8 +1507,7 @@ void DockerContainerizerProcess::_destroy(
   // after we've reaped either the container's root process (in the
   // event that we had just launched a container for an executor) or
   // the mesos-docker-executor (in the case we launched a container
-  // for a task). As a reminder, the mesos-docker-executor exits
-  // because it's doing a 'docker wait' on the container.
+  // for a task).
 
   LOG(INFO) << "Running docker stop on container '" << containerId << "'";
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/88da1f6e/src/slave/containerizer/docker.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp
index 0eda1c0..e683ea4 100644
--- a/src/slave/containerizer/docker.hpp
+++ b/src/slave/containerizer/docker.hpp
@@ -349,11 +349,6 @@ private:
         stringify(id);
     }
 
-    std::string logName()
-    {
-      return name() + DOCKER_NAME_SEPERATOR + "log";
-    }
-
     std::string executorName()
     {
       return name() + DOCKER_NAME_SEPERATOR + "executor";

http://git-wip-us.apache.org/repos/asf/mesos/blob/88da1f6e/src/tests/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/docker_containerizer_tests.cpp b/src/tests/docker_containerizer_tests.cpp
index 3fa663e..378a526 100644
--- a/src/tests/docker_containerizer_tests.cpp
+++ b/src/tests/docker_containerizer_tests.cpp
@@ -178,35 +178,56 @@ public:
   }
 
   static bool exists(
-      const list<Docker::Container>& containers,
+      const process::Shared<Docker>& docker,
       const SlaveID& slaveId,
       const ContainerID& containerId)
   {
+    Duration waited = Duration::zero();
     string expectedName = containerName(slaveId, containerId);
 
-    foreach (const Docker::Container& container, containers) {
-      // Docker inspect name contains an extra slash in the beginning.
-      if (strings::contains(container.name, expectedName)) {
+    do {
+      Future<Docker::Container> container = docker->inspect(expectedName);
+
+      if (!container.await(Seconds(3))) {
+        return false;
+      }
+
+      if(!container.isFailed()) {
         return true;
       }
-    }
+
+      os::sleep(Milliseconds(200));
+      waited += Milliseconds(200);
+    } while (waited < Seconds(5));
 
     return false;
   }
 
 
   static bool running(
-      const list<Docker::Container>& containers,
+      const process::Shared<Docker>& docker,
+      const SlaveID& slaveId,
       const ContainerID& containerId)
   {
-    string expectedName = slave::DOCKER_NAME_PREFIX + stringify(containerId);
+    Duration waited = Duration::zero();
+    string expectedName = containerName(slaveId, containerId);
+
+    do {
+      Future<Docker::Container> container = docker->inspect(expectedName);
 
-    foreach (const Docker::Container& container, containers) {
-      // Docker inspect name contains an extra slash in the beginning.
-      if (strings::contains(container.name, expectedName)) {
-        return container.pid.isSome();
+      if (!container.await(Seconds(3))) {
+        return false;
       }
-    }
+
+      if (container.isReady()) {
+        if (container.get().pid.isSome()) {
+          return true;
+        }
+      }
+
+      os::sleep(Milliseconds(200));
+      waited += Milliseconds(200);
+    } while (waited < Seconds(5));
 
     return false;
   }
@@ -496,12 +517,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor)
   AWAIT_READY_FOR(statusFinished, Seconds(60));
   EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
 
-  Future<list<Docker::Container>> containers =
-    docker->ps(true, slave::DOCKER_NAME_PREFIX);
-
-  AWAIT_READY(containers);
-
-  ASSERT_TRUE(exists(containers.get(), slaveId, containerId.get()));
+  ASSERT_TRUE(exists(docker, slaveId, containerId.get()));
 
   Future<containerizer::Termination> termination =
     dockerContainerizer.wait(containerId.get());
@@ -511,11 +527,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor)
 
   AWAIT_READY(termination);
 
-  containers = docker->ps(true, slave::DOCKER_NAME_PREFIX);
-
-  AWAIT_READY(containers);
-
-  ASSERT_FALSE(running(containers.get(), containerId.get()));
+  ASSERT_FALSE(running(docker, slaveId, containerId.get()));
 
   // See above where we assign logs future for more comments.
   AWAIT_READY_FOR(logs, Seconds(30));
@@ -629,12 +641,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor_Bridged)
   AWAIT_READY_FOR(statusFinished, Seconds(60));
   EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
 
-  Future<list<Docker::Container>> containers =
-    docker->ps(true, slave::DOCKER_NAME_PREFIX);
-
-  AWAIT_READY(containers);
-
-  ASSERT_TRUE(exists(containers.get(), slaveId, containerId.get()));
+  ASSERT_TRUE(exists(docker, slaveId, containerId.get()));
 
   Future<containerizer::Termination> termination =
     dockerContainerizer.wait(containerId.get());
@@ -644,10 +651,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor_Bridged)
 
   AWAIT_READY(termination);
 
-  containers = docker->ps(true, slave::DOCKER_NAME_PREFIX);
-  AWAIT_READY(containers);
-
-  ASSERT_FALSE(running(containers.get(), containerId.get()));
+  ASSERT_FALSE(running(docker, slaveId, containerId.get()));
 
   // See above where we assign logs future for more comments.
   AWAIT_READY_FOR(logs, Seconds(30));
@@ -665,15 +669,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch)
   MockDocker* mockDocker = new MockDocker(tests::flags.docker);
   Shared<Docker> docker(mockDocker);
 
-  // We need to capture and await on the logs process's future so that
-  // we can ensure there is no child process at the end of the test.
-  // The logs future is being awaited at teardown.
-  Future<Nothing> logs;
-  EXPECT_CALL(*mockDocker, logs(_, _))
-    .WillOnce(FutureResult(&logs,
-                           Invoke((MockDocker*) docker.get(),
-                                  &MockDocker::_logs)));
-
   slave::Flags flags = CreateSlaveFlags();
 
   Fetcher fetcher;
@@ -746,14 +741,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch)
   AWAIT_READY_FOR(statusRunning, Seconds(60));
   EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
 
-  Future<list<Docker::Container>> containers =
-    docker->ps(true, slave::DOCKER_NAME_PREFIX);
-
-  AWAIT_READY(containers);
-
-  ASSERT_TRUE(containers.get().size() > 0);
-
-  ASSERT_TRUE(exists(containers.get(), slaveId, containerId.get()));
+  ASSERT_TRUE(exists(docker, slaveId, containerId.get()));
 
   Future<containerizer::Termination> termination =
     dockerContainerizer.wait(containerId.get());
@@ -763,12 +751,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch)
 
   AWAIT_READY(termination);
 
-  containers = docker->ps(true, slave::DOCKER_NAME_PREFIX);
-
-  ASSERT_FALSE(running(containers.get(), containerId.get()));
-
-  // See above where we assign logs future for more comments.
-  AWAIT_READY_FOR(logs, Seconds(30));
+  ASSERT_FALSE(running(docker, slaveId, containerId.get()));
 
   Shutdown();
 }
@@ -782,15 +765,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Kill)
   MockDocker* mockDocker = new MockDocker(tests::flags.docker);
   Shared<Docker> docker(mockDocker);
 
-  // We need to capture and await on the logs process's future so that
-  // we can ensure there is no child process at the end of the test.
-  // The logs future is being awaited at teardown.
-  Future<Nothing> logs;
-  EXPECT_CALL(*mockDocker, logs(_, _))
-    .WillOnce(FutureResult(&logs,
-                           Invoke((MockDocker*) docker.get(),
-                                  &MockDocker::_logs)));
-
   slave::Flags flags = CreateSlaveFlags();
 
   Fetcher fetcher;
@@ -822,6 +796,8 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Kill)
 
   const Offer& offer = offers.get()[0];
 
+  SlaveID slaveId = offer.slave_id();
+
   TaskInfo task;
   task.set_name("");
   task.mutable_task_id()->set_value("1");
@@ -874,19 +850,11 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Kill)
 
   AWAIT_READY(termination);
 
-  Future<list<Docker::Container>> containers =
-    docker->ps(true, slave::DOCKER_NAME_PREFIX);
-
-  AWAIT_READY(containers);
-
-  ASSERT_FALSE(running(containers.get(), containerId.get()));
+  ASSERT_FALSE(running(docker, slaveId, containerId.get()));
 
   driver.stop();
   driver.join();
 
-  // See above where we assign logs future for more comments.
-  AWAIT_READY_FOR(logs, Seconds(30));
-
   Shutdown();
 }
 
@@ -903,15 +871,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Usage)
   MockDocker* mockDocker = new MockDocker(tests::flags.docker);
   Shared<Docker> docker(mockDocker);
 
-  // We need to capture and await on the logs process's future so that
-  // we can ensure there is no child process at the end of the test.
-  // The logs future is being awaited at teardown.
-  Future<Nothing> logs;
-  EXPECT_CALL(*mockDocker, logs(_, _))
-    .WillOnce(FutureResult(&logs,
-                           Invoke((MockDocker*) docker.get(),
-                                  &MockDocker::_logs)));
-
   Fetcher fetcher;
 
   MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
@@ -991,13 +950,15 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Usage)
   do {
     Future<ResourceStatistics> usage =
       dockerContainerizer.usage(containerId.get());
-    AWAIT_READY(usage);
+    ASSERT_TRUE(usage.await(Seconds(3)));
 
-    statistics = usage.get();
+    if (usage.isReady()) {
+      statistics = usage.get();
 
-    if (statistics.cpus_user_time_secs() > 0 &&
-        statistics.cpus_system_time_secs() > 0) {
-      break;
+      if (statistics.cpus_user_time_secs() > 0 &&
+          statistics.cpus_system_time_secs() > 0) {
+        break;
+      }
     }
 
     os::sleep(Milliseconds(200));
@@ -1027,9 +988,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Usage)
   driver.stop();
   driver.join();
 
-  // See above where we assign logs future for more comments.
-  AWAIT_READY_FOR(logs, Seconds(30));
-
   Shutdown();
 }
 
@@ -1045,15 +1003,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Update)
   MockDocker* mockDocker = new MockDocker(tests::flags.docker);
   Shared<Docker> docker(mockDocker);
 
-  // We need to capture and await on the logs process's future so that
-  // we can ensure there is no child process at the end of the test.
-  // The logs future is being awaited at teardown.
-  Future<Nothing> logs;
-  EXPECT_CALL(*mockDocker, logs(_, _))
-    .WillOnce(FutureResult(&logs,
-                           Invoke((MockDocker*) docker.get(),
-                                  &MockDocker::_logs)));
-
   Fetcher fetcher;
 
   MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
@@ -1125,6 +1074,8 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Update)
   AWAIT_READY_FOR(statusRunning, Seconds(60));
   EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
 
+  ASSERT_TRUE(running(docker, slaveId, containerId.get()));
+
   string name = containerName(slaveId, containerId.get());
 
   Future<Docker::Container> container = docker->inspect(name);
@@ -1193,9 +1144,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Update)
   driver.stop();
   driver.join();
 
-  // See above where we assign logs future for more comments.
-  AWAIT_READY_FOR(logs, Seconds(30));
-
   Shutdown();
 }
 #endif //__linux__
@@ -1370,15 +1318,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Logs)
   MockDocker* mockDocker = new MockDocker(tests::flags.docker);
   Shared<Docker> docker(mockDocker);
 
-  // We need to capture and await on the logs process's future so that
-  // we can ensure there is no child process at the end of the test.
-  // The logs future is being awaited at teardown.
-  Future<Nothing> logs;
-  EXPECT_CALL(*mockDocker, logs(_, _))
-    .WillOnce(FutureResult(&logs,
-                           Invoke((MockDocker*) docker.get(),
-                                  &MockDocker::_logs)));
-
   // We skip stopping the docker container because stopping a container
   // even when it terminated might not flush the logs and we end up
   // not getting stdout/stderr in our tests.
@@ -1462,9 +1401,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Logs)
   AWAIT_READY_FOR(statusFinished, Seconds(60));
   EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
 
-  // See above where we assign logs future for more comments.
-  AWAIT_READY_FOR(logs, Seconds(30));
-
   // Now check that the proper output is in stderr and stdout (which
   // might also contain other things, hence the use of a UUID).
   Try<string> read = os::read(path::join(directory.get(), "stderr"));
@@ -1498,15 +1434,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD)
   MockDocker* mockDocker = new MockDocker(tests::flags.docker);
   Shared<Docker> docker(mockDocker);
 
-  // We need to capture and await on the logs process's future so that
-  // we can ensure there is no child process at the end of the test.
-  // The logs future is being awaited at teardown.
-  Future<Nothing> logs;
-  EXPECT_CALL(*mockDocker, logs(_, _))
-    .WillOnce(FutureResult(&logs,
-                           Invoke((MockDocker*) docker.get(),
-                                  &MockDocker::_logs)));
-
   // We skip stopping the docker container because stopping a container
   // even when it terminated might not flush the logs and we end up
   // not getting stdout/stderr in our tests.
@@ -1592,9 +1519,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD)
   AWAIT_READY_FOR(statusFinished, Seconds(60));
   EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
 
-  // See above where we assign logs future for more comments.
-  AWAIT_READY_FOR(logs, Seconds(30));
-
   Try<string> read = os::read(path::join(directory.get(), "stdout"));
 
   ASSERT_SOME(read);
@@ -1627,15 +1551,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override)
   MockDocker* mockDocker = new MockDocker(tests::flags.docker);
   Shared<Docker> docker(mockDocker);
 
-  // We need to capture and await on the logs process's future so that
-  // we can ensure there is no child process at the end of the test.
-  // The logs future is being awaited at teardown.
-  Future<Nothing> logs;
-  EXPECT_CALL(*mockDocker, logs(_, _))
-    .WillOnce(FutureResult(&logs,
-                           Invoke((MockDocker*) docker.get(),
-                                  &MockDocker::_logs)));
-
   // We skip stopping the docker container because stopping  a container
   // even when it terminated might not flush the logs and we end up
   // not getting stdout/stderr in our tests.
@@ -1723,9 +1638,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override)
   AWAIT_READY_FOR(statusFinished, Seconds(60));
   EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
 
-  // See above where we assign logs future for more comments.
-  AWAIT_READY_FOR(logs, Seconds(30));
-
   // Now check that the proper output is in stderr and stdout.
   Try<string> read = os::read(path::join(directory.get(), "stdout"));
 
@@ -1761,15 +1673,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args)
   MockDocker* mockDocker = new MockDocker(tests::flags.docker);
   Shared<Docker> docker(mockDocker);
 
-  // We need to capture and await on the logs process's future so that
-  // we can ensure there is no child process at the end of the test.
-  // The logs future is being awaited at teardown.
-  Future<Nothing> logs;
-  EXPECT_CALL(*mockDocker, logs(_, _))
-    .WillOnce(FutureResult(&logs,
-                           Invoke((MockDocker*) docker.get(),
-                                  &MockDocker::_logs)));
-
   // We skip stopping the docker container because stopping a container
   // even when it terminated might not flush the logs and we end up
   // not getting stdout/stderr in our tests.
@@ -1858,9 +1761,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args)
   AWAIT_READY_FOR(statusFinished, Seconds(60));
   EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
 
-  // See above where we assign logs future for more comments.
-  AWAIT_READY_FOR(logs, Seconds(30));
-
   // Now check that the proper output is in stderr and stdout.
   Try<string> read = os::read(path::join(directory.get(), "stdout"));
 
@@ -1897,15 +1797,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_SlaveRecoveryTaskContainer)
   MockDocker* mockDocker = new MockDocker(tests::flags.docker);
   Shared<Docker> docker(mockDocker);
 
-  // We need to capture and await on the logs process's future so that
-  // we can ensure there is no child process at the end of the test.
-  // The logs future is being awaited at teardown.
-  Future<Nothing> logs;
-  EXPECT_CALL(*mockDocker, logs(_, _))
-    .WillOnce(FutureResult(&logs,
-                           Invoke((MockDocker*) docker.get(),
-                                  &MockDocker::_logs)));
-
   Fetcher fetcher;
 
   // We put the containerizer on the heap so we can more easily
@@ -2019,13 +1910,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_SlaveRecoveryTaskContainer)
   AWAIT_READY(status);
   ASSERT_EQ(TASK_RUNNING, status.get().state());
 
-  // Make sure the container is still running.
-  Future<list<Docker::Container>> containers =
-    docker->ps(true, slave::DOCKER_NAME_PREFIX);
-
-  AWAIT_READY(containers);
-
-  ASSERT_TRUE(exists(containers.get(), slaveId, containerId.get()));
+  ASSERT_TRUE(exists(docker, slaveId, containerId.get()));
 
   Future<containerizer::Termination> termination =
     dockerContainerizer2->wait(containerId.get());
@@ -2035,9 +1920,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_SlaveRecoveryTaskContainer)
 
   AWAIT_READY(termination);
 
-  // See above where we assign logs future for more comments.
-  AWAIT_READY_FOR(logs, Seconds(30));
-
   Shutdown();
 
   delete dockerContainerizer2;
@@ -2212,13 +2094,7 @@ TEST_F(DockerContainerizerTest,
   AWAIT_READY(status);
   ASSERT_EQ(TASK_RUNNING, status.get().state());
 
-  // Make sure the container is still running.
-  Future<list<Docker::Container>> containers =
-    docker->ps(true, slave::DOCKER_NAME_PREFIX);
-
-  AWAIT_READY(containers);
-
-  ASSERT_TRUE(exists(containers.get(), slaveId.get(), containerId.get()));
+  ASSERT_TRUE(exists(docker, slaveId.get(), containerId.get()));
 
   driver.stop();
   driver.join();
@@ -2246,15 +2122,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_PortMapping)
   MockDocker* mockDocker = new MockDocker(tests::flags.docker);
   Shared<Docker> docker(mockDocker);
 
-  // We need to capture and await on the logs process's future so that
-  // we can ensure there is no child process at the end of the test.
-  // The logs future is being awaited at teardown.
-  Future<Nothing> logs;
-  EXPECT_CALL(*mockDocker, logs(_, _))
-    .WillOnce(FutureResult(&logs,
-                           Invoke((MockDocker*) docker.get(),
-                                  &MockDocker::_logs)));
-
   // We skip stopping the docker container because stopping a container
   // even when it terminated might not flush the logs and we end up
   // not getting stdout/stderr in our tests.
@@ -2290,6 +2157,8 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_PortMapping)
 
   const Offer& offer = offers.get()[0];
 
+  SlaveID slaveId = offer.slave_id();
+
   TaskInfo task;
   task.set_name("");
   task.mutable_task_id()->set_value("1");
@@ -2345,6 +2214,8 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_PortMapping)
   AWAIT_READY_FOR(statusRunning, Seconds(60));
   EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
 
+  ASSERT_TRUE(running(docker, slaveId, containerId.get()));
+
   string uuid = UUID::random().toString();
 
   // Write uuid to docker mapped host port.
@@ -2357,9 +2228,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_PortMapping)
   AWAIT_READY_FOR(statusFinished, Seconds(60));
   EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
 
-  // See above where we assign logs future for more comments.
-  AWAIT_READY_FOR(logs, Seconds(30));
-
   // Now check that the proper output is in stdout.
   Try<string> read = os::read(path::join(directory.get(), "stdout"));
 
@@ -2390,14 +2258,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchSandboxWithColon)
   MockDocker* mockDocker = new MockDocker(tests::flags.docker);
   Shared<Docker> docker(mockDocker);
 
-  // We need to capture and await on the logs process's future so that
-  // we can ensure there is no child process at the end of the test.
-  // The logs future is being awaited at teardown.
-  Future<Nothing> logs;
-  EXPECT_CALL(*mockDocker, logs(_, _))
-    .WillRepeatedly(FutureResult(
-        &logs, Invoke((MockDocker*)docker.get(), &MockDocker::_logs)));
-
   Fetcher fetcher;
 
   MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
@@ -2468,14 +2328,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchSandboxWithColon)
   AWAIT_READY_FOR(statusRunning, Seconds(60));
   EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
 
-  Future<list<Docker::Container> > containers =
-    docker->ps(true, slave::DOCKER_NAME_PREFIX);
-
-  AWAIT_READY(containers);
-
-  ASSERT_TRUE(containers.get().size() > 0);
-
-  ASSERT_TRUE(exists(containers.get(), slaveId, containerId.get()));
+  ASSERT_TRUE(exists(docker, slaveId, containerId.get()));
 
   Future<containerizer::Termination> termination =
     dockerContainerizer.wait(containerId.get());
@@ -2485,9 +2338,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchSandboxWithColon)
 
   AWAIT_READY(termination);
 
-  // See above where we assign logs future for more comments.
-  AWAIT_READY_FOR(logs, Seconds(30));
-
   Shutdown();
 }
 

Reply via email to