Launch the Docker container from the Docker executor and redirect its stdout/stderr logs through the executor. Review: https://reviews.apache.org/r/29337
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/88da1f6e Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/88da1f6e Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/88da1f6e Branch: refs/heads/master Commit: 88da1f6e05addadb9a9f559c58888cfefb58f851 Parents: 7fee619 Author: Timothy Chen <[email protected]> Authored: Tue Dec 16 13:16:42 2014 -0800 Committer: Timothy Chen <[email protected]> Committed: Fri May 22 23:13:51 2015 -0700 ---------------------------------------------------------------------- src/docker/docker.cpp | 55 ++++-- src/docker/docker.hpp | 6 +- src/docker/executor.cpp | 156 +++++++++------ src/slave/containerizer/docker.cpp | 87 +++------ src/slave/containerizer/docker.hpp | 5 - src/tests/docker_containerizer_tests.cpp | 266 ++++++-------------------- 6 files changed, 232 insertions(+), 343 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/88da1f6e/src/docker/docker.cpp ---------------------------------------------------------------------- diff --git a/src/docker/docker.cpp b/src/docker/docker.cpp index 3a485a2..d7a33dd 100644 --- a/src/docker/docker.cpp +++ b/src/docker/docker.cpp @@ -175,6 +175,13 @@ Try<Docker*> Docker::create(const string& path, bool validate) } +void commandDiscarded(const Subprocess& s, const string& cmd) +{ + VLOG(1) << "'" << cmd << "' is being discarded"; + os::killtree(s.pid(), SIGKILL); +} + + Try<Docker::Container> Docker::Container::create(const JSON::Object& json) { map<string, JSON::Value>::const_iterator entry = @@ -280,7 +287,8 @@ Future<Nothing> Docker::run( const string& sandboxDirectory, const string& mappedDirectory, const Option<Resources>& resources, - const Option<map<string, string> >& env) const + const Option<map<string, string> >& env, + bool detached) const { if (!containerInfo.has_docker()) { return Failure("No docker info found in 
container info"); @@ -291,7 +299,10 @@ Future<Nothing> Docker::run( vector<string> argv; argv.push_back(path); argv.push_back("run"); - argv.push_back("-d"); + + if (detached) { + argv.push_back("-d"); + } if (dockerInfo.privileged()) { argv.push_back("--privileged"); @@ -474,8 +485,12 @@ Future<Nothing> Docker::run( path, argv, Subprocess::PATH("/dev/null"), - Subprocess::PIPE(), - Subprocess::PIPE(), + (detached + ? Subprocess::PIPE() + : Subprocess::PATH(path::join(sandboxDirectory, "stdout"))), + (detached + ? Subprocess::PIPE() + : Subprocess::PATH(path::join(sandboxDirectory, "stderr"))), None(), environment); @@ -483,9 +498,30 @@ Future<Nothing> Docker::run( return Failure(s.error()); } - return checkError(cmd, s.get()); + if (detached) { + return checkError(cmd, s.get()); + } + + return s.get().status() + .then(lambda::bind( + &Docker::_run, + lambda::_1)) + .onDiscard(lambda::bind(&commandDiscarded, s.get(), cmd)); } + +Future<Nothing> Docker::_run(const Option<int>& status) +{ + if (status.isNone()) { + return Failure("Failed to get exit status."); + } else if (status.get() != 0) { + return Failure("Container exited on error: " + WSTRINGIFY(status.get())); + } + + return Nothing(); +} + + Future<Nothing> Docker::stop( const string& container, const Duration& timeout, @@ -893,14 +929,7 @@ Future<Docker::Image> Docker::__pull( cmd, directory, image)) - .onDiscard(lambda::bind(&Docker::pullDiscarded, s_.get(), cmd)); -} - - -void Docker::pullDiscarded(const Subprocess& s, const string& cmd) -{ - VLOG(1) << "'" << cmd << "' is being discarded"; - os::killtree(s.pid(), SIGKILL); + .onDiscard(lambda::bind(&commandDiscarded, s_.get(), cmd)); } http://git-wip-us.apache.org/repos/asf/mesos/blob/88da1f6e/src/docker/docker.hpp ---------------------------------------------------------------------- diff --git a/src/docker/docker.hpp b/src/docker/docker.hpp index 3ebbc1f..0ba7e0e 100644 --- a/src/docker/docker.hpp +++ b/src/docker/docker.hpp @@ -89,7 +89,8 @@ public: 
const std::string& sandboxDirectory, const std::string& mappedDirectory, const Option<mesos::Resources>& resources = None(), - const Option<std::map<std::string, std::string> >& env = None()) const; + const Option<std::map<std::string, std::string> >& env = None(), + bool detached = true) const; // Performs 'docker stop -t TIMEOUT CONTAINER'. If remove is true then a rm -f // will be called when stop failed, otherwise a failure is returned. The @@ -149,6 +150,9 @@ private: static process::Future<Container> __inspect( const std::string& output); + static process::Future<Nothing> _run( + const Option<int>& status); + static process::Future<std::list<Container> > _ps( const Docker& docker, const std::string& cmd, http://git-wip-us.apache.org/repos/asf/mesos/blob/88da1f6e/src/docker/executor.cpp ---------------------------------------------------------------------- diff --git a/src/docker/executor.cpp b/src/docker/executor.cpp index 1a5ab86..61aeae9 100644 --- a/src/docker/executor.cpp +++ b/src/docker/executor.cpp @@ -27,12 +27,15 @@ #include <process/protobuf.hpp> #include <process/subprocess.hpp> #include <process/reap.hpp> +#include <process/owned.hpp> #include <stout/flags.hpp> #include <stout/os.hpp> #include "common/status_utils.hpp" +#include "docker/docker.hpp" + #include "logging/logging.hpp" using std::cerr; @@ -57,11 +60,17 @@ using namespace process; class DockerExecutorProcess : public ProtobufProcess<DockerExecutorProcess> { public: - DockerExecutorProcess(const string& docker, const string& container) + DockerExecutorProcess( + const Owned<Docker>& docker, + const string& container, + const string& sandboxDirectory, + const string& mappedDirectory) : launched(false), + killed(false), docker(docker), container(container), - pid(-1) {} + sandboxDirectory(sandboxDirectory), + mappedDirectory(mappedDirectory) {} virtual ~DockerExecutorProcess() {} @@ -99,26 +108,35 @@ public: cout << "Starting task " << task.task_id().value() << endl; - Try<Subprocess> 
subprocess = process::subprocess( - "exit `" + docker + " wait + " + container + "`", - Subprocess::PATH("/dev/null"), - Subprocess::FD(STDOUT_FILENO), - Subprocess::FD(STDERR_FILENO)); + // We assume the Docker executor is launched from the + // DockerContainerizer, which already calls setsid before + // launching the executor. - if (subprocess.isError()) { - cerr << "Couldn't launch docker wait process: " << subprocess.error(); - abort(); - } + CHECK(task.has_container()); + CHECK(task.has_command()); + + ContainerInfo containerInfo = task.container(); - pid = subprocess.get().pid(); + CHECK(containerInfo.type() == ContainerInfo::DOCKER); - process::reap(pid) - .onAny(defer(self(), - &Self::reaped, - driver, - task.task_id(), - pid, - lambda::_1)); + Future<Nothing> run = docker->run( + containerInfo, + task.command(), + container, + sandboxDirectory, + mappedDirectory, + task.resources(), + None(), + false); + + run.onAny(defer( + self(), + &Self::reaped, + driver, + task.task_id(), + lambda::_1)); + + dockerRun = run; TaskStatus status; status.mutable_task_id()->MergeFrom(task.task_id()); @@ -139,8 +157,9 @@ public: { cout << "Shutting down" << endl; - if (pid > 0 && !killed) { - ::kill(pid, SIGKILL); + if (dockerRun.isSome() && !killed) { + Future<Nothing> dockerRun_ = dockerRun.get(); + dockerRun_.discard(); killed = true; } } @@ -151,41 +170,20 @@ private: void reaped( ExecutorDriver* driver, const TaskID& taskId, - pid_t pid, - const Future<Option<int>>& status) + const Future<Nothing>& run) { TaskState state; string message; - if (!status.isReady()) { - state = TASK_FAILED; - message = - "Failed to get exit status for Docker executor: " + - (status.isFailed() ? status.failure() : "future discarded"); - } else if (status.get().isNone()) { + if (killed) { + state = TASK_KILLED; + } else if (!run.isReady()) { state = TASK_FAILED; - message = "Failed to get exit status for Docker executor"; + message = "Docker container run error: " + + (run.isFailed() ? 
run.failure() : "future discarded"); } else { - int s = status.get().get(); - - // Subprocess status is gathered from waitpid, therefore we can - // get the exit status from WIFEXITED. - CHECK(WIFEXITED(s) || WIFSIGNALED(s)) << "status code: " << s; - - if (WIFEXITED(s) && WEXITSTATUS(s) == 0) { - state = TASK_FINISHED; - } else if (killed) { - // Send TASK_KILLED if the task was killed as a result of - // killTask() or shutdown(). - state = TASK_KILLED; - } else { - state = TASK_FAILED; - } - - message = "Docker " + WSTRINGIFY(s); + state = TASK_FINISHED; } - cout << message << " (pid: " << pid << ")" << endl; - TaskStatus taskStatus; taskStatus.mutable_task_id()->MergeFrom(taskId); taskStatus.set_state(state); @@ -201,10 +199,12 @@ private: bool launched; - string docker; - string container; - pid_t pid; bool killed; + Owned<Docker> docker; + string container; + string sandboxDirectory; + string mappedDirectory; + Option<Future<Nothing>> dockerRun; Option<ExecutorDriver*> driver; }; @@ -212,9 +212,18 @@ private: class DockerExecutor : public Executor { public: - DockerExecutor(const string& docker, const string& container) + DockerExecutor( + const Owned<Docker>& docker, + const string& container, + const string& sandboxDirectory, + const string& mappedDirectory) { - process = new DockerExecutorProcess(docker, container); + process = new DockerExecutorProcess( + docker, + container, + sandboxDirectory, + mappedDirectory); + spawn(process); } @@ -303,15 +312,26 @@ public: { add(&Flags::container, "container", - "The name of the docker container to wait on."); + "The name of the docker container to run.\n"); add(&Flags::docker, "docker", - "The path to the docker cli executable."); + "The path to the docker cli executable.\n"); + + add(&Flags::sandbox_directory, + "sandbox_directory", + "The path to the container sandbox that stores stdout and stderr\n" + "files that is being redirected with docker container logs.\n"); + + add(&Flags::mapped_directory, + 
"mapped_directory", + "The sandbox directory path that is mapped in the docker container.\n"); } Option<string> container; Option<string> docker; + Option<string> sandbox_directory; + Option<string> mapped_directory; }; @@ -351,8 +371,30 @@ int main(int argc, char** argv) return 0; } + if (flags.sandbox_directory.isNone()) { + LOG(WARNING) << "Expected sandbox directory path"; + usage(argv[0], flags); + return 0; + } + + if (flags.mapped_directory.isNone()) { + LOG(WARNING) << "Expected mapped sandbox directory path"; + usage(argv[0], flags); + return 0; + } + + Try<Docker*> docker = Docker::create(flags.docker.get()); + if (docker.isError()) { + LOG(WARNING) << "Unable to create docker abstraction: " << docker.error(); + return -1; + } + mesos::internal::DockerExecutor executor( - flags.docker.get(), flags.container.get()); + process::Owned<Docker>(docker.get()), + flags.container.get(), + flags.sandbox_directory.get(), + flags.mapped_directory.get()); + mesos::MesosExecutorDriver driver(&executor); return driver.run() == mesos::DRIVER_STOPPED ? 
0 : 1; } http://git-wip-us.apache.org/repos/asf/mesos/blob/88da1f6e/src/slave/containerizer/docker.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp index 50a5b72..58505f9 100644 --- a/src/slave/containerizer/docker.cpp +++ b/src/slave/containerizer/docker.cpp @@ -819,7 +819,10 @@ Future<pid_t> DockerContainerizerProcess::___launch( if (length != sizeof(c)) { string error = string(strerror(errno)); os::close(s.get().in().get()); - return Failure("Failed to synchronize with child process: " + error); + Failure failure("Failed to synchronize with child process: " + error); + + container->run = failure; + return failure; } return s.get().pid(); @@ -851,14 +854,29 @@ Future<Nothing> DockerContainerizerProcess::___launchInContainer( environment[variable.name()] = variable.value(); } + // We are launching a mesos-docker-executor in a docker container so + // that the containerizer can recover the executor container, as we + // are assuming this instance is launched in a docker container and + // forked processes are killed on exit. ContainerInfo containerInfo; + + // Mounting in the docker socket so the executor can communicate to + // the host docker daemon. We are assuming the current instance is + // launching docker containers to the host daemon as well. 
+ Volume* volume = containerInfo.add_volumes(); + volume->set_host_path(flags.docker_socket); + volume->set_container_path(flags.docker_socket); + volume->set_mode(Volume::RO); + ContainerInfo::DockerInfo dockerInfo; dockerInfo.set_image(flags.docker_mesos_image.get()); containerInfo.mutable_docker()->CopyFrom(dockerInfo); string command = "mesos-docker-executor --docker=" + flags.docker + - " --container=" + container->name(); + " --container=" + container->name() + + " --sandbox_directory=" + container->directory + + " --mapped_directory=" + flags.docker_sandbox_directory; command = path::join(flags.launcher_dir, command); @@ -1003,58 +1021,6 @@ Future<pid_t> DockerContainerizerProcess::______launch( } -Future<pid_t> DockerContainerizerProcess::______launchInContainer( - const ContainerID& containerId, - pid_t pid) -{ - CHECK(containers_.contains(containerId)); - CHECK(flags.docker_mesos_image.isSome()); - - Container* container = containers_[containerId]; - - // We are launching a docker container to read the logs from - // a given launched container into the sandbox stdout and stderr - // files. This requires the docker container to run docker logs, - // which requires us mounting in the docker socket and docker - // CLI binary. 
- ContainerInfo containerInfo; - Volume* volume = containerInfo.add_volumes(); - volume->set_host_path(flags.docker_socket); - volume->set_container_path(flags.docker_socket); - volume->set_mode(Volume::RO); - - volume = containerInfo.add_volumes(); - volume->set_host_path(flags.docker); - volume->set_container_path(flags.docker); - volume->set_mode(Volume::RO); - - ContainerInfo::DockerInfo dockerInfo; - dockerInfo.set_image(flags.docker_mesos_image.get()); - containerInfo.mutable_docker()->CopyFrom(dockerInfo); - - string command = flags.docker + " logs --follow " + container->name() + - " 2>> " + - path::join(flags.docker_sandbox_directory, "stderr") + - " 1>> " + - path::join(flags.docker_sandbox_directory, "stdout"); - - VLOG(1) << "Running docker logs in container with command: " << command; - - CommandInfo commandInfo; - commandInfo.set_value(command); - commandInfo.set_shell(true); - - docker->run( - containerInfo, - commandInfo, - container->logName(), - container->directory, - flags.docker_sandbox_directory); - - return pid; -} - - Future<pid_t> DockerContainerizerProcess::____launchInContainer( const ContainerID& containerId) { @@ -1127,6 +1093,11 @@ Future<Nothing> DockerContainerizerProcess::update( // Store the resources for usage(). container->resources = _resources; + if (flags.docker_mesos_image.isSome()) { + LOG(INFO) << "Ignoring update as slave is running under container."; + return Nothing(); + } + #ifdef __linux__ if (!_resources.cpus().isSome() && !_resources.mem().isSome()) { LOG(WARNING) << "Ignoring update as no supported resources are present"; @@ -1486,12 +1457,11 @@ void DockerContainerizerProcess::destroy( CHECK(container->state == Container::RUNNING); - // Remove the executor and log docker containers. They might not + // Remove the executor docker containers. They might not // been configured to launch but we might have recovered containers // on previous slave run that has configured to launch executor in // docker. 
- docker->rm(container->logName(), true); - docker->rm(container->executorName(), true); + docker->stop(container->executorName(), Seconds(0), true); container->state = Container::DESTROYING; @@ -1537,8 +1507,7 @@ void DockerContainerizerProcess::_destroy( // after we've reaped either the container's root process (in the // event that we had just launched a container for an executor) or // the mesos-docker-executor (in the case we launched a container - // for a task). As a reminder, the mesos-docker-executor exits - // because it's doing a 'docker wait' on the container. + // for a task). LOG(INFO) << "Running docker stop on container '" << containerId << "'"; http://git-wip-us.apache.org/repos/asf/mesos/blob/88da1f6e/src/slave/containerizer/docker.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp index 0eda1c0..e683ea4 100644 --- a/src/slave/containerizer/docker.hpp +++ b/src/slave/containerizer/docker.hpp @@ -349,11 +349,6 @@ private: stringify(id); } - std::string logName() - { - return name() + DOCKER_NAME_SEPERATOR + "log"; - } - std::string executorName() { return name() + DOCKER_NAME_SEPERATOR + "executor"; http://git-wip-us.apache.org/repos/asf/mesos/blob/88da1f6e/src/tests/docker_containerizer_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/docker_containerizer_tests.cpp b/src/tests/docker_containerizer_tests.cpp index 3fa663e..378a526 100644 --- a/src/tests/docker_containerizer_tests.cpp +++ b/src/tests/docker_containerizer_tests.cpp @@ -178,35 +178,56 @@ public: } static bool exists( - const list<Docker::Container>& containers, + const process::Shared<Docker>& docker, const SlaveID& slaveId, const ContainerID& containerId) { + Duration waited = Duration::zero(); string expectedName = containerName(slaveId, containerId); - foreach (const Docker::Container& container, containers) { - // 
Docker inspect name contains an extra slash in the beginning. - if (strings::contains(container.name, expectedName)) { + do { + Future<Docker::Container> container = docker->inspect(expectedName); + + if (!container.await(Seconds(3))) { + return false; + } + + if(!container.isFailed()) { return true; } - } + + os::sleep(Milliseconds(200)); + waited += Milliseconds(200); + } while (waited < Seconds(5)); return false; } static bool running( - const list<Docker::Container>& containers, + const process::Shared<Docker>& docker, + const SlaveID& slaveId, const ContainerID& containerId) { - string expectedName = slave::DOCKER_NAME_PREFIX + stringify(containerId); + Duration waited = Duration::zero(); + string expectedName = containerName(slaveId, containerId); + + do { + Future<Docker::Container> container = docker->inspect(expectedName); - foreach (const Docker::Container& container, containers) { - // Docker inspect name contains an extra slash in the beginning. - if (strings::contains(container.name, expectedName)) { - return container.pid.isSome(); + if (!container.await(Seconds(3))) { + return false; } - } + + if (container.isReady()) { + if (container.get().pid.isSome()) { + return true; + } + } + + os::sleep(Milliseconds(200)); + waited += Milliseconds(200); + } while (waited < Seconds(5)); return false; } @@ -496,12 +517,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor) AWAIT_READY_FOR(statusFinished, Seconds(60)); EXPECT_EQ(TASK_FINISHED, statusFinished.get().state()); - Future<list<Docker::Container>> containers = - docker->ps(true, slave::DOCKER_NAME_PREFIX); - - AWAIT_READY(containers); - - ASSERT_TRUE(exists(containers.get(), slaveId, containerId.get())); + ASSERT_TRUE(exists(docker, slaveId, containerId.get())); Future<containerizer::Termination> termination = dockerContainerizer.wait(containerId.get()); @@ -511,11 +527,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor) AWAIT_READY(termination); - containers = docker->ps(true, 
slave::DOCKER_NAME_PREFIX); - - AWAIT_READY(containers); - - ASSERT_FALSE(running(containers.get(), containerId.get())); + ASSERT_FALSE(running(docker, slaveId, containerId.get())); // See above where we assign logs future for more comments. AWAIT_READY_FOR(logs, Seconds(30)); @@ -629,12 +641,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor_Bridged) AWAIT_READY_FOR(statusFinished, Seconds(60)); EXPECT_EQ(TASK_FINISHED, statusFinished.get().state()); - Future<list<Docker::Container>> containers = - docker->ps(true, slave::DOCKER_NAME_PREFIX); - - AWAIT_READY(containers); - - ASSERT_TRUE(exists(containers.get(), slaveId, containerId.get())); + ASSERT_TRUE(exists(docker, slaveId, containerId.get())); Future<containerizer::Termination> termination = dockerContainerizer.wait(containerId.get()); @@ -644,10 +651,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor_Bridged) AWAIT_READY(termination); - containers = docker->ps(true, slave::DOCKER_NAME_PREFIX); - AWAIT_READY(containers); - - ASSERT_FALSE(running(containers.get(), containerId.get())); + ASSERT_FALSE(running(docker, slaveId, containerId.get())); // See above where we assign logs future for more comments. AWAIT_READY_FOR(logs, Seconds(30)); @@ -665,15 +669,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch) MockDocker* mockDocker = new MockDocker(tests::flags.docker); Shared<Docker> docker(mockDocker); - // We need to capture and await on the logs process's future so that - // we can ensure there is no child process at the end of the test. - // The logs future is being awaited at teardown. 
- Future<Nothing> logs; - EXPECT_CALL(*mockDocker, logs(_, _)) - .WillOnce(FutureResult(&logs, - Invoke((MockDocker*) docker.get(), - &MockDocker::_logs))); - slave::Flags flags = CreateSlaveFlags(); Fetcher fetcher; @@ -746,14 +741,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch) AWAIT_READY_FOR(statusRunning, Seconds(60)); EXPECT_EQ(TASK_RUNNING, statusRunning.get().state()); - Future<list<Docker::Container>> containers = - docker->ps(true, slave::DOCKER_NAME_PREFIX); - - AWAIT_READY(containers); - - ASSERT_TRUE(containers.get().size() > 0); - - ASSERT_TRUE(exists(containers.get(), slaveId, containerId.get())); + ASSERT_TRUE(exists(docker, slaveId, containerId.get())); Future<containerizer::Termination> termination = dockerContainerizer.wait(containerId.get()); @@ -763,12 +751,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch) AWAIT_READY(termination); - containers = docker->ps(true, slave::DOCKER_NAME_PREFIX); - - ASSERT_FALSE(running(containers.get(), containerId.get())); - - // See above where we assign logs future for more comments. - AWAIT_READY_FOR(logs, Seconds(30)); + ASSERT_FALSE(running(docker, slaveId, containerId.get())); Shutdown(); } @@ -782,15 +765,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Kill) MockDocker* mockDocker = new MockDocker(tests::flags.docker); Shared<Docker> docker(mockDocker); - // We need to capture and await on the logs process's future so that - // we can ensure there is no child process at the end of the test. - // The logs future is being awaited at teardown. 
- Future<Nothing> logs; - EXPECT_CALL(*mockDocker, logs(_, _)) - .WillOnce(FutureResult(&logs, - Invoke((MockDocker*) docker.get(), - &MockDocker::_logs))); - slave::Flags flags = CreateSlaveFlags(); Fetcher fetcher; @@ -822,6 +796,8 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Kill) const Offer& offer = offers.get()[0]; + SlaveID slaveId = offer.slave_id(); + TaskInfo task; task.set_name(""); task.mutable_task_id()->set_value("1"); @@ -874,19 +850,11 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Kill) AWAIT_READY(termination); - Future<list<Docker::Container>> containers = - docker->ps(true, slave::DOCKER_NAME_PREFIX); - - AWAIT_READY(containers); - - ASSERT_FALSE(running(containers.get(), containerId.get())); + ASSERT_FALSE(running(docker, slaveId, containerId.get())); driver.stop(); driver.join(); - // See above where we assign logs future for more comments. - AWAIT_READY_FOR(logs, Seconds(30)); - Shutdown(); } @@ -903,15 +871,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Usage) MockDocker* mockDocker = new MockDocker(tests::flags.docker); Shared<Docker> docker(mockDocker); - // We need to capture and await on the logs process's future so that - // we can ensure there is no child process at the end of the test. - // The logs future is being awaited at teardown. 
- Future<Nothing> logs; - EXPECT_CALL(*mockDocker, logs(_, _)) - .WillOnce(FutureResult(&logs, - Invoke((MockDocker*) docker.get(), - &MockDocker::_logs))); - Fetcher fetcher; MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker); @@ -991,13 +950,15 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Usage) do { Future<ResourceStatistics> usage = dockerContainerizer.usage(containerId.get()); - AWAIT_READY(usage); + ASSERT_TRUE(usage.await(Seconds(3))); - statistics = usage.get(); + if (usage.isReady()) { + statistics = usage.get(); - if (statistics.cpus_user_time_secs() > 0 && - statistics.cpus_system_time_secs() > 0) { - break; + if (statistics.cpus_user_time_secs() > 0 && + statistics.cpus_system_time_secs() > 0) { + break; + } } os::sleep(Milliseconds(200)); @@ -1027,9 +988,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Usage) driver.stop(); driver.join(); - // See above where we assign logs future for more comments. - AWAIT_READY_FOR(logs, Seconds(30)); - Shutdown(); } @@ -1045,15 +1003,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Update) MockDocker* mockDocker = new MockDocker(tests::flags.docker); Shared<Docker> docker(mockDocker); - // We need to capture and await on the logs process's future so that - // we can ensure there is no child process at the end of the test. - // The logs future is being awaited at teardown. 
- Future<Nothing> logs; - EXPECT_CALL(*mockDocker, logs(_, _)) - .WillOnce(FutureResult(&logs, - Invoke((MockDocker*) docker.get(), - &MockDocker::_logs))); - Fetcher fetcher; MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker); @@ -1125,6 +1074,8 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Update) AWAIT_READY_FOR(statusRunning, Seconds(60)); EXPECT_EQ(TASK_RUNNING, statusRunning.get().state()); + ASSERT_TRUE(running(docker, slaveId, containerId.get())); + string name = containerName(slaveId, containerId.get()); Future<Docker::Container> container = docker->inspect(name); @@ -1193,9 +1144,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Update) driver.stop(); driver.join(); - // See above where we assign logs future for more comments. - AWAIT_READY_FOR(logs, Seconds(30)); - Shutdown(); } #endif //__linux__ @@ -1370,15 +1318,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Logs) MockDocker* mockDocker = new MockDocker(tests::flags.docker); Shared<Docker> docker(mockDocker); - // We need to capture and await on the logs process's future so that - // we can ensure there is no child process at the end of the test. - // The logs future is being awaited at teardown. - Future<Nothing> logs; - EXPECT_CALL(*mockDocker, logs(_, _)) - .WillOnce(FutureResult(&logs, - Invoke((MockDocker*) docker.get(), - &MockDocker::_logs))); - // We skip stopping the docker container because stopping a container // even when it terminated might not flush the logs and we end up // not getting stdout/stderr in our tests. @@ -1462,9 +1401,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Logs) AWAIT_READY_FOR(statusFinished, Seconds(60)); EXPECT_EQ(TASK_FINISHED, statusFinished.get().state()); - // See above where we assign logs future for more comments. - AWAIT_READY_FOR(logs, Seconds(30)); - // Now check that the proper output is in stderr and stdout (which // might also contain other things, hence the use of a UUID). 
Try<string> read = os::read(path::join(directory.get(), "stderr")); @@ -1498,15 +1434,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD) MockDocker* mockDocker = new MockDocker(tests::flags.docker); Shared<Docker> docker(mockDocker); - // We need to capture and await on the logs process's future so that - // we can ensure there is no child process at the end of the test. - // The logs future is being awaited at teardown. - Future<Nothing> logs; - EXPECT_CALL(*mockDocker, logs(_, _)) - .WillOnce(FutureResult(&logs, - Invoke((MockDocker*) docker.get(), - &MockDocker::_logs))); - // We skip stopping the docker container because stopping a container // even when it terminated might not flush the logs and we end up // not getting stdout/stderr in our tests. @@ -1592,9 +1519,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD) AWAIT_READY_FOR(statusFinished, Seconds(60)); EXPECT_EQ(TASK_FINISHED, statusFinished.get().state()); - // See above where we assign logs future for more comments. - AWAIT_READY_FOR(logs, Seconds(30)); - Try<string> read = os::read(path::join(directory.get(), "stdout")); ASSERT_SOME(read); @@ -1627,15 +1551,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override) MockDocker* mockDocker = new MockDocker(tests::flags.docker); Shared<Docker> docker(mockDocker); - // We need to capture and await on the logs process's future so that - // we can ensure there is no child process at the end of the test. - // The logs future is being awaited at teardown. - Future<Nothing> logs; - EXPECT_CALL(*mockDocker, logs(_, _)) - .WillOnce(FutureResult(&logs, - Invoke((MockDocker*) docker.get(), - &MockDocker::_logs))); - // We skip stopping the docker container because stopping a container // even when it terminated might not flush the logs and we end up // not getting stdout/stderr in our tests. 
@@ -1723,9 +1638,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override) AWAIT_READY_FOR(statusFinished, Seconds(60)); EXPECT_EQ(TASK_FINISHED, statusFinished.get().state()); - // See above where we assign logs future for more comments. - AWAIT_READY_FOR(logs, Seconds(30)); - // Now check that the proper output is in stderr and stdout. Try<string> read = os::read(path::join(directory.get(), "stdout")); @@ -1761,15 +1673,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args) MockDocker* mockDocker = new MockDocker(tests::flags.docker); Shared<Docker> docker(mockDocker); - // We need to capture and await on the logs process's future so that - // we can ensure there is no child process at the end of the test. - // The logs future is being awaited at teardown. - Future<Nothing> logs; - EXPECT_CALL(*mockDocker, logs(_, _)) - .WillOnce(FutureResult(&logs, - Invoke((MockDocker*) docker.get(), - &MockDocker::_logs))); - // We skip stopping the docker container because stopping a container // even when it terminated might not flush the logs and we end up // not getting stdout/stderr in our tests. @@ -1858,9 +1761,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args) AWAIT_READY_FOR(statusFinished, Seconds(60)); EXPECT_EQ(TASK_FINISHED, statusFinished.get().state()); - // See above where we assign logs future for more comments. - AWAIT_READY_FOR(logs, Seconds(30)); - // Now check that the proper output is in stderr and stdout. Try<string> read = os::read(path::join(directory.get(), "stdout")); @@ -1897,15 +1797,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_SlaveRecoveryTaskContainer) MockDocker* mockDocker = new MockDocker(tests::flags.docker); Shared<Docker> docker(mockDocker); - // We need to capture and await on the logs process's future so that - // we can ensure there is no child process at the end of the test. - // The logs future is being awaited at teardown. 
- Future<Nothing> logs; - EXPECT_CALL(*mockDocker, logs(_, _)) - .WillOnce(FutureResult(&logs, - Invoke((MockDocker*) docker.get(), - &MockDocker::_logs))); - Fetcher fetcher; // We put the containerizer on the heap so we can more easily @@ -2019,13 +1910,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_SlaveRecoveryTaskContainer) AWAIT_READY(status); ASSERT_EQ(TASK_RUNNING, status.get().state()); - // Make sure the container is still running. - Future<list<Docker::Container>> containers = - docker->ps(true, slave::DOCKER_NAME_PREFIX); - - AWAIT_READY(containers); - - ASSERT_TRUE(exists(containers.get(), slaveId, containerId.get())); + ASSERT_TRUE(exists(docker, slaveId, containerId.get())); Future<containerizer::Termination> termination = dockerContainerizer2->wait(containerId.get()); @@ -2035,9 +1920,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_SlaveRecoveryTaskContainer) AWAIT_READY(termination); - // See above where we assign logs future for more comments. - AWAIT_READY_FOR(logs, Seconds(30)); - Shutdown(); delete dockerContainerizer2; @@ -2212,13 +2094,7 @@ TEST_F(DockerContainerizerTest, AWAIT_READY(status); ASSERT_EQ(TASK_RUNNING, status.get().state()); - // Make sure the container is still running. - Future<list<Docker::Container>> containers = - docker->ps(true, slave::DOCKER_NAME_PREFIX); - - AWAIT_READY(containers); - - ASSERT_TRUE(exists(containers.get(), slaveId.get(), containerId.get())); + ASSERT_TRUE(exists(docker, slaveId.get(), containerId.get())); driver.stop(); driver.join(); @@ -2246,15 +2122,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_PortMapping) MockDocker* mockDocker = new MockDocker(tests::flags.docker); Shared<Docker> docker(mockDocker); - // We need to capture and await on the logs process's future so that - // we can ensure there is no child process at the end of the test. - // The logs future is being awaited at teardown. 
- Future<Nothing> logs; - EXPECT_CALL(*mockDocker, logs(_, _)) - .WillOnce(FutureResult(&logs, - Invoke((MockDocker*) docker.get(), - &MockDocker::_logs))); - // We skip stopping the docker container because stopping a container // even when it terminated might not flush the logs and we end up // not getting stdout/stderr in our tests. @@ -2290,6 +2157,8 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_PortMapping) const Offer& offer = offers.get()[0]; + SlaveID slaveId = offer.slave_id(); + TaskInfo task; task.set_name(""); task.mutable_task_id()->set_value("1"); @@ -2345,6 +2214,8 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_PortMapping) AWAIT_READY_FOR(statusRunning, Seconds(60)); EXPECT_EQ(TASK_RUNNING, statusRunning.get().state()); + ASSERT_TRUE(running(docker, slaveId, containerId.get())); + string uuid = UUID::random().toString(); // Write uuid to docker mapped host port. @@ -2357,9 +2228,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_PortMapping) AWAIT_READY_FOR(statusFinished, Seconds(60)); EXPECT_EQ(TASK_FINISHED, statusFinished.get().state()); - // See above where we assign logs future for more comments. - AWAIT_READY_FOR(logs, Seconds(30)); - // Now check that the proper output is in stdout. Try<string> read = os::read(path::join(directory.get(), "stdout")); @@ -2390,14 +2258,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchSandboxWithColon) MockDocker* mockDocker = new MockDocker(tests::flags.docker); Shared<Docker> docker(mockDocker); - // We need to capture and await on the logs process's future so that - // we can ensure there is no child process at the end of the test. - // The logs future is being awaited at teardown. 
- Future<Nothing> logs; - EXPECT_CALL(*mockDocker, logs(_, _)) - .WillRepeatedly(FutureResult( - &logs, Invoke((MockDocker*)docker.get(), &MockDocker::_logs))); - Fetcher fetcher; MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker); @@ -2468,14 +2328,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchSandboxWithColon) AWAIT_READY_FOR(statusRunning, Seconds(60)); EXPECT_EQ(TASK_RUNNING, statusRunning.get().state()); - Future<list<Docker::Container> > containers = - docker->ps(true, slave::DOCKER_NAME_PREFIX); - - AWAIT_READY(containers); - - ASSERT_TRUE(containers.get().size() > 0); - - ASSERT_TRUE(exists(containers.get(), slaveId, containerId.get())); + ASSERT_TRUE(exists(docker, slaveId, containerId.get())); Future<containerizer::Termination> termination = dockerContainerizer.wait(containerId.get()); @@ -2485,9 +2338,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchSandboxWithColon) AWAIT_READY(termination); - // See above where we assign logs future for more comments. - AWAIT_READY_FOR(logs, Seconds(30)); - Shutdown(); }
