Implemented launching executors in the DockerContainerizer. Also added a test that launches an executor inside a Docker image built from a Dockerfile.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2caf7b9a Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2caf7b9a Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2caf7b9a Branch: refs/heads/master Commit: 2caf7b9a42c5edd17d980ca1f24dd6adededb1ce Parents: fb27015 Author: Timothy Chen <[email protected]> Authored: Mon Jul 7 11:02:45 2014 -0700 Committer: Benjamin Hindman <[email protected]> Committed: Mon Aug 4 15:08:17 2014 -0700 ---------------------------------------------------------------------- src/Makefile.am | 3 + src/docker/docker.cpp | 36 ++-- src/docker/docker.hpp | 3 +- src/slave/containerizer/docker.cpp | 160 ++++++++++++++- src/tests/docker_containerizer_tests.cpp | 199 ++++++++++++++++--- src/tests/environment.cpp | 34 +++- .../mesos_test_executor_docker_image/Dockerfile | 16 ++ .../mesos_test_executor_docker_image/install.sh | 6 + 8 files changed, 409 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/2caf7b9a/src/Makefile.am ---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index d0b3285..850fad3 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1154,6 +1154,9 @@ EXTRA_DIST += examples/python/test_containerizer.py \ examples/python/test_framework.py +# Docker test executor image files. 
+EXTRA_DIST += tests/mesos_test_executor_docker_image/Dockerfile \ + tests/mesos_test_executor_docker_image/install.sh dist_check_SCRIPTS += \ tests/balloon_framework_test.sh \ http://git-wip-us.apache.org/repos/asf/mesos/blob/2caf7b9a/src/docker/docker.cpp ---------------------------------------------------------------------- diff --git a/src/docker/docker.cpp b/src/docker/docker.cpp index 1142ef3..4842cee 100644 --- a/src/docker/docker.cpp +++ b/src/docker/docker.cpp @@ -116,27 +116,37 @@ Future<Option<int> > Docker::run( const string& image, const string& command, const string& name, - const mesos::Resources& resources) const + const Option<mesos::Resources>& resources, + const Option<map<string, string> >& env) const { - CHECK(resources.size() != 0); string cmd = " run -d"; - // TODO(yifan): Support other resources (e.g. disk, ports). - Option<double> cpus = resources.cpus(); - if (cpus.isSome()) { - uint64_t cpuShare = - std::max((uint64_t) (CPU_SHARES_PER_CPU * cpus.get()), MIN_CPU_SHARES); - cmd += " -c " + stringify(cpuShare); + if (resources.isSome()) { + // TODO(yifan): Support other resources (e.g. disk, ports). 
+ Option<double> cpus = resources.get().cpus(); + if (cpus.isSome()) { + uint64_t cpuShare = + std::max((uint64_t) (CPU_SHARES_PER_CPU * cpus.get()), MIN_CPU_SHARES); + cmd += " -c " + stringify(cpuShare); + } + + Option<Bytes> mem = resources.get().mem(); + if (mem.isSome()) { + Bytes memLimit = std::max(mem.get(), MIN_MEMORY); + cmd += " -m " + stringify(memLimit.bytes()); + } } - Option<Bytes> mem = resources.mem(); - if (mem.isSome()) { - Bytes memLimit = std::max(mem.get(), MIN_MEMORY); - cmd += " -m " + stringify(memLimit.bytes()); + if (env.isSome()) { + foreachpair (string key, string value, env.get()) { + key = strings::replace(key, "\"", "\\\""); + value = strings::replace(value, "\"", "\\\""); + cmd += " -e \"" + key + "=" + value + "\""; + } } - cmd += " --name=" + name + " " + image + " " + command; + cmd += " --net=host --name=" + name + " " + image + " " + command; VLOG(1) << "Running " << path << cmd; http://git-wip-us.apache.org/repos/asf/mesos/blob/2caf7b9a/src/docker/docker.hpp ---------------------------------------------------------------------- diff --git a/src/docker/docker.hpp b/src/docker/docker.hpp index 912859c..c4724de 100644 --- a/src/docker/docker.hpp +++ b/src/docker/docker.hpp @@ -67,7 +67,8 @@ public: const std::string& image, const std::string& command, const std::string& name, - const mesos::Resources& resources) const; + const Option<mesos::Resources>& resources = None(), + const Option<std::map<std::string, std::string> >& env = None()) const; // Performs 'docker kill CONTAINER'. 
process::Future<Option<int> > kill( http://git-wip-us.apache.org/repos/asf/mesos/blob/2caf7b9a/src/slave/containerizer/docker.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp index 60ac262..7d3549c 100644 --- a/src/slave/containerizer/docker.cpp +++ b/src/slave/containerizer/docker.cpp @@ -134,6 +134,23 @@ private: bool checkpoint, const Option<int>& status); + process::Future<bool> _launch( + const ContainerID& containerId, + const ExecutorInfo& executorInfo, + const SlaveID& slaveId, + const PID<Slave>& slavePid, + bool checkpoint, + const Option<int>& status); + + process::Future<bool> __launch( + const ContainerID& containerId, + const ExecutorInfo& executorInfo, + const SlaveID& slaveId, + const PID<Slave>& slavePid, + bool checkpoint, + const Docker::Container& container); + + void _destroy( const ContainerID& containerId, const bool& killed, @@ -470,8 +487,68 @@ Future<bool> DockerContainerizerProcess::launch( const PID<Slave>& slavePid, bool checkpoint) { - // TODO(benh): Implement support for launching an ExecutorInfo. - return false; + if (promises.contains(containerId)) { + LOG(ERROR) << "Cannot start already running container '" + << containerId << "'"; + return Failure("Container already started"); + } + + CommandInfo command = executorInfo.command(); + + if (!command.has_container()) { + return false; + } + + string image = command.container().image(); + + // Check if we should try and launch this command. + if (!strings::startsWith(image, "docker:///")) { + return false; + } + + Owned<Promise<containerizer::Termination> > promise( + new Promise<containerizer::Termination>()); + + promises.put(containerId, promise); + + LOG(INFO) << "Starting container '" << containerId + << "' for executor '" << executorInfo.executor_id() + << "' and framework '" << executorInfo.framework_id() << "'"; + + // Extract the Docker image. 
+ image = strings::remove(image, "docker:///", strings::PREFIX); + + // Construct the Docker container name. + string name = DOCKER_NAME_PREFIX + stringify(containerId); + + map<string, string> env = executorEnvironment( + executorInfo, + directory, + slaveId, + slavePid, + checkpoint, + flags.recovery_timeout); + + // Include any environment variables from CommandInfo. + foreach (const Environment::Variable& variable, + command.environment().variables()) { + env[variable.name()] = variable.value(); + } + + Resources resources = executorInfo.resources(); + + // Start a docker container then launch the executor (but destroy + // the Docker container if launching the executor failed). + return docker.run(image, command.value(), name, resources, env) + .then(defer(self(), + &Self::_launch, + containerId, + executorInfo, + slaveId, + slavePid, + checkpoint, + lambda::_1)) + .onFailed(defer(self(), &Self::destroy, containerId, false)); } @@ -646,6 +723,84 @@ Future<bool> DockerContainerizerProcess::_launch( } +Future<bool> DockerContainerizerProcess::_launch( + const ContainerID& containerId, + const ExecutorInfo& executorInfo, + const SlaveID& slaveId, + const PID<Slave>& slavePid, + bool checkpoint, + const Option<int>& status) +{ + if (status.isSome() && status.get() != 0) { + // Best effort kill and remove the container just in case. 
+ docker.killAndRm(DOCKER_NAME_PREFIX + stringify(containerId)); + return Failure("Failed to run the container (" + + WSTRINGIFY(status.get()) + ")"); + } + + return docker.inspect(DOCKER_NAME_PREFIX + stringify(containerId)) + .then(defer(self(), + &Self::__launch, + containerId, + executorInfo, + slaveId, + slavePid, + checkpoint, + lambda::_1)); +} + + +Future<bool> DockerContainerizerProcess::__launch( + const ContainerID& containerId, + const ExecutorInfo& executorInfo, + const SlaveID& slaveId, + const PID<Slave>& slavePid, + bool checkpoint, + const Docker::Container& container) +{ + Option<int> pid = container.pid(); + + if (!pid.isSome()) { + return Failure("Unable to get executor pid after launch"); + } + + if (checkpoint) { + // TODO(tnachen): We might not be able to checkpoint + // if the slave dies before it can checkpoint while + // the executor is still running. Optinally we can consider + // recording the slave id and executor id as part of the + // docker container name so we can recover from this. 
+ const string& path = + slave::paths::getForkedPidPath( + slave::paths::getMetaRootDir(flags.work_dir), + slaveId, + executorInfo.framework_id(), + executorInfo.executor_id(), + containerId); + + LOG(INFO) << "Checkpointing executor's forked pid " + << pid.get() << " to '" << path << "'"; + + Try<Nothing> checkpointed = + slave::state::checkpoint(path, stringify(pid.get())); + + if (checkpointed.isError()) { + LOG(ERROR) << "Failed to checkpoint executor's forked pid to '" + << path << "': " << checkpointed.error(); + + return Failure("Could not checkpoint executor's pid"); + } + } + + statuses[containerId] = process::reap(pid.get()); + + statuses[containerId] + .onAny(defer(self(), &Self::reaped, containerId)); + + return true; +} + + Future<Nothing> DockerContainerizerProcess::update( const ContainerID& containerId, const Resources& _resources) @@ -865,6 +1020,7 @@ Future<ResourceStatistics> DockerContainerizerProcess::_usage( if (cpus.isSome()) { result.set_cpus_limit(cpus.get()); } + return result; } http://git-wip-us.apache.org/repos/asf/mesos/blob/2caf7b9a/src/tests/docker_containerizer_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/docker_containerizer_tests.cpp b/src/tests/docker_containerizer_tests.cpp index 2fa18e0..1f5bc60 100644 --- a/src/tests/docker_containerizer_tests.cpp +++ b/src/tests/docker_containerizer_tests.cpp @@ -54,7 +54,26 @@ using testing::DoDefault; using testing::Invoke; using testing::Return; -class DockerContainerizerTest : public MesosTest {}; +class DockerContainerizerTest : public MesosTest +{ +public: + static bool containerExists( + const list<Docker::Container>& containers, + const ContainerID& containerId) + { + string expectedName = slave::DOCKER_NAME_PREFIX + containerId.value(); + + foreach (const Docker::Container& container, containers) { + // Docker inspect name contains an extra slash in the beginning. 
+ if (strings::contains(container.name(), expectedName)) { + return true; + } + } + + return false; + } +}; + class MockDockerContainerizer : public DockerContainerizer { public: @@ -64,6 +83,9 @@ public: const Docker& docker) : DockerContainerizer(flags, local, docker) { + EXPECT_CALL(*this, launch(_, _, _, _, _, _, _)) + .WillRepeatedly(Invoke(this, &MockDockerContainerizer::_launchExecutor)); + EXPECT_CALL(*this, launch(_, _, _, _, _, _, _, _)) .WillRepeatedly(Invoke(this, &MockDockerContainerizer::_launch)); @@ -71,6 +93,19 @@ public: .WillRepeatedly(Invoke(this, &MockDockerContainerizer::_update)); } + + MOCK_METHOD7( + launch, + process::Future<bool>( + const ContainerID&, + const ExecutorInfo&, + const std::string&, + const Option<std::string>&, + const SlaveID&, + const process::PID<slave::Slave>&, + bool checkpoint)); + + MOCK_METHOD8( launch, process::Future<bool>( @@ -112,6 +147,25 @@ public: checkpoint); } + process::Future<bool> _launchExecutor( + const ContainerID& containerId, + const ExecutorInfo& executorInfo, + const string& directory, + const Option<string>& user, + const SlaveID& slaveId, + const PID<Slave>& slavePid, + bool checkpoint) + { + return DockerContainerizer::launch( + containerId, + executorInfo, + directory, + user, + slaveId, + slavePid, + checkpoint); + } + process::Future<Nothing> _update( const ContainerID& containerId, const Resources& resources) @@ -123,7 +177,11 @@ public: }; -TEST_F(DockerContainerizerTest, DOCKER_Launch) +// Only enable executor launch on linux as other platforms +// requires running linux VM and need special port forwarding +// to get host networking to work. 
+#ifdef __linux__ +TEST_F(DockerContainerizerTest, DOCKER_Launch_Executor) { Try<PID<Master> > master = StartMaster(); ASSERT_SOME(master); @@ -165,6 +223,107 @@ TEST_F(DockerContainerizerTest, DOCKER_Launch) task.mutable_slave_id()->CopyFrom(offer.slave_id()); task.mutable_resources()->CopyFrom(offer.resources()); + ExecutorInfo executorInfo; + ExecutorID executorId; + executorId.set_value("e1"); + executorInfo.mutable_executor_id()->CopyFrom(executorId); + CommandInfo command; + command.set_value("test-executor"); + command.mutable_container()->set_image("docker:///mesos/test-executor"); + executorInfo.mutable_command()->CopyFrom(command); + + task.mutable_executor()->CopyFrom(executorInfo); + + vector<TaskInfo> tasks; + tasks.push_back(task); + + Future<ContainerID> containerId; + EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _)) + .WillOnce(DoAll(FutureArg<0>(&containerId), + Invoke(&dockerContainerizer, + &MockDockerContainerizer::_launchExecutor))); + + Future<TaskStatus> statusRunning; + Future<TaskStatus> statusFinished; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&statusRunning)) + .WillOnce(FutureArg<1>(&statusFinished)); + + driver.launchTasks(offers.get()[0].id(), tasks); + + AWAIT_READY_FOR(containerId, Seconds(60)); + AWAIT_READY_FOR(statusRunning, Seconds(60)); + EXPECT_EQ(TASK_RUNNING, statusRunning.get().state()); + AWAIT_READY_FOR(statusFinished, Seconds(60)); + EXPECT_EQ(TASK_FINISHED, statusFinished.get().state()); + + Future<list<Docker::Container> > containers = + docker.ps(true, slave::DOCKER_NAME_PREFIX); + + AWAIT_READY(containers); + + ASSERT_TRUE(containerExists(containers.get(), containerId.get())); + + Future<containerizer::Termination> termination = + dockerContainerizer.wait(containerId.get()); + + driver.stop(); + driver.join(); + + AWAIT_READY(termination); + + containers = docker.ps(true, slave::DOCKER_NAME_PREFIX); + AWAIT_READY(containers); + + ASSERT_FALSE(containerExists(containers.get(), 
containerId.get())); + + Shutdown(); +} +#endif // __linux__ + + +TEST_F(DockerContainerizerTest, DOCKER_Launch) +{ + Try<PID<Master> > master = StartMaster(); + ASSERT_SOME(master); + + slave::Flags flags = CreateSlaveFlags(); + + Docker docker(tests::flags.docker); + + MockDockerContainerizer dockerContainerizer(flags, true, docker); + + Try<PID<Slave> > slave = StartSlave(&dockerContainerizer); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + Future<FrameworkID> frameworkId; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureArg<1>(&frameworkId)); + + Future<vector<Offer> > offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver.start(); + + AWAIT_READY(frameworkId); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + const Offer& offer = offers.get()[0]; + + TaskInfo task; + task.set_name(""); + task.mutable_task_id()->set_value("1"); + task.mutable_slave_id()->CopyFrom(offer.slave_id()); + task.mutable_resources()->CopyFrom(offer.resources()); + CommandInfo command; CommandInfo::ContainerInfo* containerInfo = command.mutable_container(); containerInfo->set_image("docker:///busybox"); @@ -199,18 +358,7 @@ TEST_F(DockerContainerizerTest, DOCKER_Launch) ASSERT_TRUE(containers.get().size() > 0); - bool foundContainer = false; - string expectedName = slave::DOCKER_NAME_PREFIX + containerId.get().value(); - - foreach (const Docker::Container& container, containers.get()) { - // Docker inspect name contains an extra slash in the beginning. 
- if (strings::contains(container.name(), expectedName)) { - foundContainer = true; - break; - } - } - - ASSERT_TRUE(foundContainer); + ASSERT_TRUE(containerExists(containers.get(), containerId.get())); dockerContainerizer.destroy(containerId.get()); @@ -237,7 +385,7 @@ TEST_F(DockerContainerizerTest, DOCKER_Kill) MockScheduler sched; MesosSchedulerDriver driver( - &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); Future<FrameworkID> frameworkId; EXPECT_CALL(sched, registered(&driver, _, _)) @@ -308,18 +456,7 @@ TEST_F(DockerContainerizerTest, DOCKER_Kill) AWAIT_READY(containers); - bool foundContainer = false; - string expectedName = slave::DOCKER_NAME_PREFIX + containerId.get().value(); - - foreach (const Docker::Container& container, containers.get()) { - // Docker inspect name contains an extra slash in the beginning. - if (strings::contains(container.name(), expectedName)) { - foundContainer = true; - break; - } - } - - ASSERT_FALSE(foundContainer); + ASSERT_FALSE(containerExists(containers.get(), containerId.get())); driver.stop(); driver.join(); @@ -346,7 +483,7 @@ TEST_F(DockerContainerizerTest, DOCKER_Usage) MockScheduler sched; MesosSchedulerDriver driver( - &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); Future<FrameworkID> frameworkId; EXPECT_CALL(sched, registered(&driver, _, _)) @@ -465,7 +602,7 @@ TEST_F(DockerContainerizerTest, DOCKER_Update) MockScheduler sched; MesosSchedulerDriver driver( - &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); Future<FrameworkID> frameworkId; EXPECT_CALL(sched, registered(&driver, _, _)) @@ -504,8 +641,8 @@ TEST_F(DockerContainerizerTest, DOCKER_Update) Future<ContainerID> containerId; EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _)) 
.WillOnce(DoAll(FutureArg<0>(&containerId), - Invoke(&dockerContainerizer, - &MockDockerContainerizer::_launch))); + Invoke(&dockerContainerizer, + &MockDockerContainerizer::_launch))); Future<TaskStatus> statusRunning; EXPECT_CALL(sched, statusUpdate(&driver, _)) http://git-wip-us.apache.org/repos/asf/mesos/blob/2caf7b9a/src/tests/environment.cpp ---------------------------------------------------------------------- diff --git a/src/tests/environment.cpp b/src/tests/environment.cpp index b75c485..b1c70e7 100644 --- a/src/tests/environment.cpp +++ b/src/tests/environment.cpp @@ -29,6 +29,7 @@ #include <process/gmock.hpp> #include <process/gtest.hpp> +#include <process/subprocess.hpp> #include <stout/check.hpp> #include <stout/error.hpp> @@ -143,7 +144,38 @@ static bool enable(const ::testing::TestInfo& test) } #ifdef __linux__ - return user.get() == "root" && !validate.isError(); + if (user.get() == "root" && !validate.isError()) { + // Install docker test executor image for testing launching + // executor in docker image. 
+ Try<process::Subprocess> install = + process::subprocess( + path::join( + flags.source_dir, + "src", + "tests", + "mesos_test_executor_docker_image", + "install.sh")); + + if (install.isError()) { + std::cerr + << "Unable to launch test executor install script: " + << install.error() + << std::endl; + return false; + } + + process::Future<Option<int> > status = install.get().status(); + status.await(Minutes(2)); + + if (!status.isReady() || !status.get().isSome() || status.get() != 0) { + std::cerr << "Unable to install test executor"; + return false; + } + + return true; + } + + return false; #else return !validate.isError(); #endif http://git-wip-us.apache.org/repos/asf/mesos/blob/2caf7b9a/src/tests/mesos_test_executor_docker_image/Dockerfile ---------------------------------------------------------------------- diff --git a/src/tests/mesos_test_executor_docker_image/Dockerfile b/src/tests/mesos_test_executor_docker_image/Dockerfile new file mode 100644 index 0000000..8ecc374 --- /dev/null +++ b/src/tests/mesos_test_executor_docker_image/Dockerfile @@ -0,0 +1,16 @@ +FROM stackbrew/ubuntu:13.10 +MAINTAINER Timothy Chen <[email protected]> + +ADD http://downloads.mesosphere.io/master/ubuntu/13.10/mesos-test-executor.deb /tmp/mesos.deb + +RUN ["env", "DEBIAN_FRONTEND=noninteractive", "apt-get", "update"] +RUN ["env", "DEBIAN_FRONTEND=noninteractive", "apt-get", "install", "-y", "--fix-missing", "--force-yes", "libsasl2-2", "libcurl3"] +RUN ["env", "DEBIAN_FRONTEND=noninteractive", "apt-get", "install", "-y", "--fix-missing", "--force-yes", "default-jre-headless"] + +RUN ["env", "DEBIAN_FRONTEND=noninteractive", "dpkg", "-i", "/tmp/mesos.deb"] +RUN ["bash", "-c", "echo manual > /etc/init/mesos-master.override"] +RUN ["bash", "-c", "echo manual > /etc/init/mesos-slave.override"] + +RUN ["rm", "-rf", "/tmp/mesos.deb"] + +CMD ["true"] http://git-wip-us.apache.org/repos/asf/mesos/blob/2caf7b9a/src/tests/mesos_test_executor_docker_image/install.sh 
---------------------------------------------------------------------- diff --git a/src/tests/mesos_test_executor_docker_image/install.sh b/src/tests/mesos_test_executor_docker_image/install.sh new file mode 100755 index 0000000..dcec4e0 --- /dev/null +++ b/src/tests/mesos_test_executor_docker_image/install.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env bash + +docker images | cut -d" " -f1 | grep -q mesos/test-executor +if [ $? -ne 0 ]; then + docker build -t mesos/test-executor `dirname $0` +fi
