Repository: mesos Updated Branches: refs/heads/master 4a1bbf7d3 -> 3bfb136e9
Add destroy tests for docker containerizer. Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3bfb136e Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3bfb136e Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3bfb136e Branch: refs/heads/master Commit: 3bfb136e9dcc83715657cf7862000987fc0699b6 Parents: 4a1bbf7 Author: Timothy Chen <[email protected]> Authored: Mon Nov 3 13:59:34 2014 -0800 Committer: Timothy Chen <[email protected]> Committed: Sun Nov 9 23:26:14 2014 -0800 ---------------------------------------------------------------------- src/slave/containerizer/docker.cpp | 408 ++++---------------------- src/slave/containerizer/docker.hpp | 332 ++++++++++++++++++++- src/tests/docker_containerizer_tests.cpp | 272 +++++++++++++++++ 3 files changed, 654 insertions(+), 358 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/3bfb136e/src/slave/containerizer/docker.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp index 37f422a..5978ec2 100644 --- a/src/slave/containerizer/docker.cpp +++ b/src/slave/containerizer/docker.cpp @@ -79,329 +79,6 @@ const string DOCKER_NAME_PREFIX = "mesos-"; // Declared in header, see explanation there. 
const string DOCKER_SYMLINK_DIRECTORY = "docker/links"; - -class DockerContainerizerProcess - : public process::Process<DockerContainerizerProcess> -{ -public: - DockerContainerizerProcess( - const Flags& _flags, - Shared<Docker> _docker) - : flags(_flags), - docker(_docker) {} - - virtual process::Future<Nothing> recover( - const Option<state::SlaveState>& state); - - virtual process::Future<bool> launch( - const ContainerID& containerId, - const ExecutorInfo& executorInfo, - const std::string& directory, - const Option<string>& user, - const SlaveID& slaveId, - const PID<Slave>& slavePid, - bool checkpoint); - - virtual process::Future<bool> launch( - const ContainerID& containerId, - const TaskInfo& taskInfo, - const ExecutorInfo& executorInfo, - const std::string& directory, - const Option<string>& user, - const SlaveID& slaveId, - const PID<Slave>& slavePid, - bool checkpoint); - - virtual process::Future<Nothing> update( - const ContainerID& containerId, - const Resources& resources); - - virtual process::Future<ResourceStatistics> usage( - const ContainerID& containerId); - - virtual Future<containerizer::Termination> wait( - const ContainerID& containerId); - - virtual void destroy( - const ContainerID& containerId, - bool killed = true); // process is either killed or reaped. - - virtual process::Future<hashset<ContainerID> > containers(); - -private: - // Continuations and helpers. 
- process::Future<Nothing> fetch(const ContainerID& containerId); - - process::Future<Nothing> _fetch( - const ContainerID& containerId, - const Option<int>& status); - - process::Future<Nothing> pull( - const ContainerID& containerId, - const std::string& directory, - const std::string& image); - - process::Future<Nothing> _pull(const std::string& image); - - Try<Nothing> checkpoint( - const ContainerID& containerId, - pid_t pid); - - process::Future<Nothing> _recover( - const std::list<Docker::Container>& containers); - - process::Future<Nothing> _launch( - const ContainerID& containerId); - - process::Future<Nothing> __launch( - const ContainerID& containerId); - - // NOTE: This continuation is only applicable when launching a - // container for a task. - process::Future<pid_t> ___launch( - const ContainerID& containerId); - - // NOTE: This continuation is only applicable when launching a - // container for an executor. - process::Future<Docker::Container> ____launch( - const ContainerID& containerId); - - // NOTE: This continuation is only applicable when launching a - // container for an executor. 
- process::Future<pid_t> _____launch( - const ContainerID& containerId, - const Docker::Container& container); - - process::Future<bool> ______launch( - const ContainerID& containerId, - pid_t pid); - - void _destroy( - const ContainerID& containerId, - bool killed); - - void __destroy( - const ContainerID& containerId, - bool killed, - const Future<Nothing>& future); - - void ___destroy( - const ContainerID& containerId, - bool killed, - const Future<Option<int> >& status); - - process::Future<Nothing> _update( - const ContainerID& containerId, - const Resources& resources, - const Docker::Container& container); - - process::Future<Nothing> __update( - const ContainerID& containerId, - const Resources& resources, - pid_t pid); - - Future<ResourceStatistics> _usage( - const ContainerID& containerId, - const Docker::Container& container); - - Future<ResourceStatistics> __usage( - const ContainerID& containerId, - pid_t pid); - - // Call back for when the executor exits. This will trigger - // container destroy. - void reaped(const ContainerID& containerId); - - // Removes the docker container. 
- void remove(const std::string& container); - - const Flags flags; - - Shared<Docker> docker; - - struct Container - { - static Try<Container*> create( - const ContainerID& id, - const Option<TaskInfo>& taskInfo, - const ExecutorInfo& executorInfo, - const std::string& directory, - const Option<std::string>& user, - const SlaveID& slaveId, - const PID<Slave>& slavePid, - bool checkpoint, - const Flags& flags); - - Container(const ContainerID& id) - : state(FETCHING), id(id) {} - - Container(const ContainerID& id, - const Option<TaskInfo>& taskInfo, - const ExecutorInfo& executorInfo, - const std::string& directory, - const Option<std::string>& user, - const SlaveID& slaveId, - const PID<Slave>& slavePid, - bool checkpoint, - bool symlinked, - const Flags& flags) - : state(FETCHING), - id(id), - task(taskInfo), - executor(executorInfo), - directory(directory), - user(user), - slaveId(slaveId), - slavePid(slavePid), - checkpoint(checkpoint), - symlinked(symlinked), - flags(flags) - { - if (task.isSome()) { - resources = task.get().resources(); - } else { - resources = executor.resources(); - } - } - - ~Container() - { - if (symlinked) { - // The sandbox directory is a symlink, remove it at container - // destroy. - os::rm(directory); - } - } - - std::string name() - { - return DOCKER_NAME_PREFIX + stringify(id); - } - - std::string image() const - { - if (task.isSome()) { - return task.get().container().docker().image(); - } - - return executor.container().docker().image(); - } - - ContainerInfo container() const - { - if (task.isSome()) { - return task.get().container(); - } - - return executor.container(); - } - - CommandInfo command() const - { - if (task.isSome()) { - return task.get().command(); - } - - return executor.command(); - } - - // Returns any extra environment varaibles to set when launching - // the Docker container (beyond the those found in CommandInfo). 
- std::map<std::string, std::string> environment() const - { - if (task.isNone()) { - return executorEnvironment( - executor, - directory, - slaveId, - slavePid, - checkpoint, - flags.recovery_timeout); - } - - return std::map<std::string, std::string>(); - } - - // The DockerContainerier needs to be able to properly clean up - // Docker containers, regardless of when they are destroyed. For - // example, if a container gets destroyed while we are fetching, - // we need to not keep running the fetch, nor should we try and - // start the Docker container. For this reason, we've split out - // the states into: - // - // FETCHING - // PULLING - // RUNNING - // DESTROYING - // - // In particular, we made 'PULLING' be it's own state so that we - // could easily destroy and cleanup when a user initiated pulling - // a really big image but we timeout due to the executor - // registration timeout. Since we curently have no way to discard - // a Docker::run, we needed to explicitely do the pull (which is - // the part that takes the longest) so that we can also explicitly - // kill it when asked. Once the functions at Docker::* get support - // for discarding, then we won't need to make pull be it's own - // state anymore, although it doesn't hurt since it gives us - // better error messages. - enum State { - FETCHING = 1, - PULLING = 2, - RUNNING = 3, - DESTROYING = 4 - } state; - - ContainerID id; - Option<TaskInfo> task; - ExecutorInfo executor; - - // The sandbox directory for the container. This holds the - // symlinked path if symlinked boolean is true. - std::string directory; - - Option<std::string> user; - SlaveID slaveId; - PID<Slave> slavePid; - bool checkpoint; - bool symlinked; - Flags flags; - - // Promise for future returned from wait(). - Promise<containerizer::Termination> termination; - - // Exit status of executor or container (depending on whether or - // not we used the command executor). 
Represented as a promise so - // that destroying can chain with it being set. - Promise<Future<Option<int> > > status; - - // Future that tells us whether or not the run is still pending or - // has failed so we know whether or not to wait for 'status'. - Future<Nothing> run; - - // We keep track of the resources for each container so we can set - // the ResourceStatistics limits in usage(). Note that this is - // different than just what we might get from TaskInfo::resources - // or ExecutorInfo::resources because they can change dynamically. - Resources resources; - - // The mesos-fetcher subprocess, kept around so that we can do a - // killtree on it if we're asked to destroy a container while we - // are fetching. - Option<Subprocess> fetcher; - - // The docker pull future is stored so we can discard when - // destroy is called while docker is pulling the image. - Future<Docker::Image> pull; - - // Once the container is running, this saves the pid of the - // running container. - Option<pid_t> pid; - }; - - hashmap<ContainerID, Container*> containers_; -}; - - // Parse the ContainerID from a Docker container and return None if // the container was not launched from Mesos. 
Option<ContainerID> parse(const Docker::Container& container) @@ -438,19 +115,26 @@ Try<DockerContainerizer*> DockerContainerizer::create(const Flags& flags) DockerContainerizer::DockerContainerizer( + const Owned<DockerContainerizerProcess>& _process) + : process(_process) +{ + spawn(process.get()); +} + + +DockerContainerizer::DockerContainerizer( const Flags& flags, Shared<Docker> docker) + : process(new DockerContainerizerProcess(flags, docker)) { - process = new DockerContainerizerProcess(flags, docker); - spawn(process); + spawn(process.get()); } DockerContainerizer::~DockerContainerizer() { - terminate(process); - process::wait(process); - delete process; + terminate(process.get()); + process::wait(process.get()); } @@ -644,7 +328,7 @@ Try<Nothing> DockerContainerizerProcess::checkpoint( Future<Nothing> DockerContainerizer::recover( const Option<SlaveState>& state) { - return dispatch(process, &DockerContainerizerProcess::recover, state); + return dispatch(process.get(), &DockerContainerizerProcess::recover, state); } @@ -657,15 +341,16 @@ Future<bool> DockerContainerizer::launch( const PID<Slave>& slavePid, bool checkpoint) { - return dispatch(process, - &DockerContainerizerProcess::launch, - containerId, - executorInfo, - directory, - user, - slaveId, - slavePid, - checkpoint); + return dispatch( + process.get(), + &DockerContainerizerProcess::launch, + containerId, + executorInfo, + directory, + user, + slaveId, + slavePid, + checkpoint); } @@ -679,16 +364,17 @@ Future<bool> DockerContainerizer::launch( const PID<Slave>& slavePid, bool checkpoint) { - return dispatch(process, - &DockerContainerizerProcess::launch, - containerId, - taskInfo, - executorInfo, - directory, - user, - slaveId, - slavePid, - checkpoint); + return dispatch( + process.get(), + &DockerContainerizerProcess::launch, + containerId, + taskInfo, + executorInfo, + directory, + user, + slaveId, + slavePid, + checkpoint); } @@ -696,36 +382,46 @@ Future<Nothing> DockerContainerizer::update( 
const ContainerID& containerId, const Resources& resources) { - return dispatch(process, - &DockerContainerizerProcess::update, - containerId, - resources); + return dispatch( + process.get(), + &DockerContainerizerProcess::update, + containerId, + resources); } Future<ResourceStatistics> DockerContainerizer::usage( const ContainerID& containerId) { - return dispatch(process, &DockerContainerizerProcess::usage, containerId); + return dispatch( + process.get(), + &DockerContainerizerProcess::usage, + containerId); } Future<containerizer::Termination> DockerContainerizer::wait( const ContainerID& containerId) { - return dispatch(process, &DockerContainerizerProcess::wait, containerId); + return dispatch( + process.get(), + &DockerContainerizerProcess::wait, + containerId); } void DockerContainerizer::destroy(const ContainerID& containerId) { - dispatch(process, &DockerContainerizerProcess::destroy, containerId, true); + dispatch( + process.get(), + &DockerContainerizerProcess::destroy, + containerId, true); } Future<hashset<ContainerID> > DockerContainerizer::containers() { - return dispatch(process, &DockerContainerizerProcess::containers); + return dispatch(process.get(), &DockerContainerizerProcess::containers); } http://git-wip-us.apache.org/repos/asf/mesos/blob/3bfb136e/src/slave/containerizer/docker.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp index ec6b9cd..f9f3ffb 100644 --- a/src/slave/containerizer/docker.hpp +++ b/src/slave/containerizer/docker.hpp @@ -56,6 +56,10 @@ public: const Flags& flags, process::Shared<Docker> docker); + // This is only public for tests. 
+ DockerContainerizer( + const process::Owned<DockerContainerizerProcess>& _process); + virtual ~DockerContainerizer(); virtual process::Future<Nothing> recover( @@ -92,12 +96,336 @@ public: virtual void destroy(const ContainerID& containerId); - virtual process::Future<hashset<ContainerID> > containers(); + virtual process::Future<hashset<ContainerID>> containers(); private: - DockerContainerizerProcess* process; + process::Owned<DockerContainerizerProcess> process; }; + + +class DockerContainerizerProcess + : public process::Process<DockerContainerizerProcess> +{ +public: + DockerContainerizerProcess( + const Flags& _flags, + process::Shared<Docker> _docker) + : flags(_flags), + docker(_docker) {} + + virtual process::Future<Nothing> recover( + const Option<state::SlaveState>& state); + + virtual process::Future<bool> launch( + const ContainerID& containerId, + const ExecutorInfo& executorInfo, + const std::string& directory, + const Option<std::string>& user, + const SlaveID& slaveId, + const process::PID<Slave>& slavePid, + bool checkpoint); + + virtual process::Future<bool> launch( + const ContainerID& containerId, + const TaskInfo& taskInfo, + const ExecutorInfo& executorInfo, + const std::string& directory, + const Option<std::string>& user, + const SlaveID& slaveId, + const process::PID<Slave>& slavePid, + bool checkpoint); + + virtual process::Future<Nothing> update( + const ContainerID& containerId, + const Resources& resources); + + virtual process::Future<ResourceStatistics> usage( + const ContainerID& containerId); + + virtual Future<containerizer::Termination> wait( + const ContainerID& containerId); + + virtual void destroy( + const ContainerID& containerId, + bool killed = true); // process is either killed or reaped. 
+ + virtual process::Future<Nothing> fetch(const ContainerID& containerId); + + virtual process::Future<Nothing> pull( + const ContainerID& containerId, + const std::string& directory, + const std::string& image); + + virtual process::Future<hashset<ContainerID>> containers(); + +private: + // Continuations and helpers. + process::Future<Nothing> _fetch( + const ContainerID& containerId, + const Option<int>& status); + + process::Future<Nothing> _pull(const std::string& image); + + Try<Nothing> checkpoint( + const ContainerID& containerId, + pid_t pid); + + process::Future<Nothing> _recover( + const std::list<Docker::Container>& containers); + + process::Future<Nothing> _launch( + const ContainerID& containerId); + + process::Future<Nothing> __launch( + const ContainerID& containerId); + + // NOTE: This continuation is only applicable when launching a + // container for a task. + process::Future<pid_t> ___launch( + const ContainerID& containerId); + + // NOTE: This continuation is only applicable when launching a + // container for an executor. + process::Future<Docker::Container> ____launch( + const ContainerID& containerId); + + // NOTE: This continuation is only applicable when launching a + // container for an executor. 
+ process::Future<pid_t> _____launch( + const ContainerID& containerId, + const Docker::Container& container); + + process::Future<bool> ______launch( + const ContainerID& containerId, + pid_t pid); + + void _destroy( + const ContainerID& containerId, + bool killed); + + void __destroy( + const ContainerID& containerId, + bool killed, + const Future<Nothing>& future); + + void ___destroy( + const ContainerID& containerId, + bool killed, + const Future<Option<int>>& status); + + process::Future<Nothing> _update( + const ContainerID& containerId, + const Resources& resources, + const Docker::Container& container); + + process::Future<Nothing> __update( + const ContainerID& containerId, + const Resources& resources, + pid_t pid); + + Future<ResourceStatistics> _usage( + const ContainerID& containerId, + const Docker::Container& container); + + Future<ResourceStatistics> __usage( + const ContainerID& containerId, + pid_t pid); + + // Call back for when the executor exits. This will trigger + // container destroy. + void reaped(const ContainerID& containerId); + + // Removes the docker container. 
+ void remove(const std::string& container); + + const Flags flags; + + process::Shared<Docker> docker; + + struct Container + { + static Try<Container*> create( + const ContainerID& id, + const Option<TaskInfo>& taskInfo, + const ExecutorInfo& executorInfo, + const std::string& directory, + const Option<std::string>& user, + const SlaveID& slaveId, + const process::PID<Slave>& slavePid, + bool checkpoint, + const Flags& flags); + + Container(const ContainerID& id) + : state(FETCHING), id(id) {} + + Container(const ContainerID& id, + const Option<TaskInfo>& taskInfo, + const ExecutorInfo& executorInfo, + const std::string& directory, + const Option<std::string>& user, + const SlaveID& slaveId, + const process::PID<Slave>& slavePid, + bool checkpoint, + bool symlinked, + const Flags& flags) + : state(FETCHING), + id(id), + task(taskInfo), + executor(executorInfo), + directory(directory), + user(user), + slaveId(slaveId), + slavePid(slavePid), + checkpoint(checkpoint), + symlinked(symlinked), + flags(flags) + { + if (task.isSome()) { + resources = task.get().resources(); + } else { + resources = executor.resources(); + } + } + + ~Container() + { + if (symlinked) { + // The sandbox directory is a symlink, remove it at container + // destroy. + os::rm(directory); + } + } + + std::string name() + { + return DOCKER_NAME_PREFIX + stringify(id); + } + + std::string image() const + { + if (task.isSome()) { + return task.get().container().docker().image(); + } + + return executor.container().docker().image(); + } + + ContainerInfo container() const + { + if (task.isSome()) { + return task.get().container(); + } + + return executor.container(); + } + + CommandInfo command() const + { + if (task.isSome()) { + return task.get().command(); + } + + return executor.command(); + } + + // Returns any extra environment varaibles to set when launching + // the Docker container (beyond the those found in CommandInfo). 
+ std::map<std::string, std::string> environment() const + { + if (task.isNone()) { + return executorEnvironment( + executor, + directory, + slaveId, + slavePid, + checkpoint, + flags.recovery_timeout); + } + + return std::map<std::string, std::string>(); + } + + // The DockerContainerier needs to be able to properly clean up + // Docker containers, regardless of when they are destroyed. For + // example, if a container gets destroyed while we are fetching, + // we need to not keep running the fetch, nor should we try and + // start the Docker container. For this reason, we've split out + // the states into: + // + // FETCHING + // PULLING + // RUNNING + // DESTROYING + // + // In particular, we made 'PULLING' be it's own state so that we + // could easily destroy and cleanup when a user initiated pulling + // a really big image but we timeout due to the executor + // registration timeout. Since we curently have no way to discard + // a Docker::run, we needed to explicitely do the pull (which is + // the part that takes the longest) so that we can also explicitly + // kill it when asked. Once the functions at Docker::* get support + // for discarding, then we won't need to make pull be it's own + // state anymore, although it doesn't hurt since it gives us + // better error messages. + enum State { + FETCHING = 1, + PULLING = 2, + RUNNING = 3, + DESTROYING = 4 + } state; + + ContainerID id; + Option<TaskInfo> task; + ExecutorInfo executor; + + // The sandbox directory for the container. This holds the + // symlinked path if symlinked boolean is true. + std::string directory; + + Option<std::string> user; + SlaveID slaveId; + process::PID<Slave> slavePid; + bool checkpoint; + bool symlinked; + Flags flags; + + // Promise for future returned from wait(). + Promise<containerizer::Termination> termination; + + // Exit status of executor or container (depending on whether or + // not we used the command executor). 
Represented as a promise so + // that destroying can chain with it being set. + Promise<Future<Option<int>>> status; + + // Future that tells us whether or not the run is still pending or + // has failed so we know whether or not to wait for 'status'. + Future<Nothing> run; + + // We keep track of the resources for each container so we can set + // the ResourceStatistics limits in usage(). Note that this is + // different than just what we might get from TaskInfo::resources + // or ExecutorInfo::resources because they can change dynamically. + Resources resources; + + // The mesos-fetcher subprocess, kept around so that we can do a + // killtree on it if we're asked to destroy a container while we + // are fetching. + Option<Subprocess> fetcher; + + // The docker pull future is stored so we can discard when + // destroy is called while docker is pulling the image. + Future<Docker::Image> pull; + + // Once the container is running, this saves the pid of the + // running container. + Option<pid_t> pid; + }; + + hashmap<ContainerID, Container*> containers_; +}; + + } // namespace slave { } // namespace internal { } // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/3bfb136e/src/tests/docker_containerizer_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/docker_containerizer_tests.cpp b/src/tests/docker_containerizer_tests.cpp index 9d4ccc5..66552ad 100644 --- a/src/tests/docker_containerizer_tests.cpp +++ b/src/tests/docker_containerizer_tests.cpp @@ -21,6 +21,7 @@ #include <process/future.hpp> #include <process/gmock.hpp> +#include <process/owned.hpp> #include <process/subprocess.hpp> #include "linux/cgroups.hpp" @@ -48,6 +49,7 @@ using mesos::internal::master::Master; using mesos::internal::slave::Slave; using mesos::internal::slave::DockerContainerizer; +using mesos::internal::slave::DockerContainerizerProcess; using process::Future; using process::Message; @@ -59,6 +61,7 @@ using std::list; 
using std::string; using testing::_; +using testing::DoAll; using testing::DoDefault; using testing::Eq; using testing::Invoke; @@ -164,6 +167,17 @@ public: Shared<Docker> docker) : DockerContainerizer(flags, docker) { + initialize(); + } + + MockDockerContainerizer(const Owned<DockerContainerizerProcess>& process) + : DockerContainerizer(process) + { + initialize(); + } + + void initialize() + { // NOTE: See TestContainerizer::setup for why we use // 'EXPECT_CALL' and 'WillRepeatedly' here instead of // 'ON_CALL' and 'WillByDefault'. @@ -259,6 +273,50 @@ public: }; +class MockDockerContainerizerProcess : public DockerContainerizerProcess +{ +public: + MockDockerContainerizerProcess( + const slave::Flags& flags, + const Shared<Docker>& docker) + : DockerContainerizerProcess(flags, docker) + { + EXPECT_CALL(*this, fetch(_)) + .WillRepeatedly(Invoke(this, &MockDockerContainerizerProcess::_fetch)); + + EXPECT_CALL(*this, pull(_, _, _)) + .WillRepeatedly(Invoke(this, &MockDockerContainerizerProcess::_pull)); + } + + MOCK_METHOD1( + fetch, + process::Future<Nothing>(const ContainerID& containerId)); + + MOCK_METHOD3( + pull, + process::Future<Nothing>( + const ContainerID& containerId, + const std::string& directory, + const std::string& image)); + + process::Future<Nothing> _fetch(const ContainerID& containerId) + { + return DockerContainerizerProcess::fetch(containerId); + } + + process::Future<Nothing> _pull( + const ContainerID& containerId, + const std::string& directory, + const std::string& image) + { + return DockerContainerizerProcess::pull( + containerId, + directory, + image); + } +}; + + // Only enable executor launch on linux as other platforms // requires running linux VM and need special port forwarding // to get host networking to work. 
@@ -2276,3 +2334,217 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchSandboxWithColon) Shutdown(); } + + +TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhileFetching) +{ + Try<PID<Master> > master = StartMaster(); + ASSERT_SOME(master); + + slave::Flags flags = CreateSlaveFlags(); + + MockDocker* mockDocker = new MockDocker(tests::flags.docker); + Shared<Docker> docker(mockDocker); + + // The docker containerizer will free the process, so we must + // allocate on the heap. + MockDockerContainerizerProcess* process = + new MockDockerContainerizerProcess(flags, docker); + + MockDockerContainerizer dockerContainerizer( + (Owned<DockerContainerizerProcess>(process))); + + Promise<Nothing> promise; + Future<Nothing> fetch; + + // We want to pause the fetch call to simulate a long fetch time. + EXPECT_CALL(*process, fetch(_)) + .WillOnce(DoAll(FutureSatisfy(&fetch), + Return(promise.future()))); + + Try<PID<Slave> > slave = StartSlave(&dockerContainerizer); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + Future<FrameworkID> frameworkId; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureArg<1>(&frameworkId)); + + Future<vector<Offer> > offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. 
+ + driver.start(); + + AWAIT_READY(frameworkId); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + const Offer& offer = offers.get()[0]; + + TaskInfo task; + task.set_name(""); + task.mutable_task_id()->set_value("1"); + task.mutable_slave_id()->CopyFrom(offer.slave_id()); + task.mutable_resources()->CopyFrom(offer.resources()); + + CommandInfo command; + command.set_value("sleep 1000"); + + ContainerInfo containerInfo; + containerInfo.set_type(ContainerInfo::DOCKER); + + ContainerInfo::DockerInfo dockerInfo; + dockerInfo.set_image("busybox"); + containerInfo.mutable_docker()->CopyFrom(dockerInfo); + + task.mutable_command()->CopyFrom(command); + task.mutable_container()->CopyFrom(containerInfo); + + vector<TaskInfo> tasks; + tasks.push_back(task); + + Future<TaskStatus> statusFailed; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&statusFailed)); + + Future<ContainerID> containerId; + EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _)) + .WillOnce(DoAll(FutureArg<0>(&containerId), + Invoke(&dockerContainerizer, + &MockDockerContainerizer::_launch))); + + driver.launchTasks(offers.get()[0].id(), tasks); + + AWAIT_READY_FOR(containerId, Seconds(60)); + + AWAIT_READY(fetch); + + dockerContainerizer.destroy(containerId.get()); + + // Resume docker launch. + promise.set(Nothing()); + + AWAIT_READY(statusFailed); + + EXPECT_EQ(TASK_FAILED, statusFailed.get().state()); + + driver.stop(); + driver.join(); + + Shutdown(); +} + + +TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhilePulling) +{ + Try<PID<Master> > master = StartMaster(); + ASSERT_SOME(master); + + slave::Flags flags = CreateSlaveFlags(); + + MockDocker* mockDocker = new MockDocker(tests::flags.docker); + Shared<Docker> docker(mockDocker); + + // The docker containerizer will free the process, so we must + // allocate on the heap. 
+ MockDockerContainerizerProcess* process = + new MockDockerContainerizerProcess(flags, docker); + + MockDockerContainerizer dockerContainerizer( + (Owned<DockerContainerizerProcess>(process))); + + Future<Nothing> fetch; + EXPECT_CALL(*process, fetch(_)) + .WillOnce(DoAll(FutureSatisfy(&fetch), + Return(Nothing()))); + + Promise<Nothing> promise; + + // We want to pause the pull call to simulate a long pull time. + EXPECT_CALL(*process, pull(_, _, _)) + .WillOnce(Return(promise.future())); + + Try<PID<Slave> > slave = StartSlave(&dockerContainerizer); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + Future<FrameworkID> frameworkId; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureArg<1>(&frameworkId)); + + Future<vector<Offer> > offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. 
+ + driver.start(); + + AWAIT_READY(frameworkId); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + const Offer& offer = offers.get()[0]; + + TaskInfo task; + task.set_name(""); + task.mutable_task_id()->set_value("1"); + task.mutable_slave_id()->CopyFrom(offer.slave_id()); + task.mutable_resources()->CopyFrom(offer.resources()); + + CommandInfo command; + command.set_value("sleep 1000"); + + ContainerInfo containerInfo; + containerInfo.set_type(ContainerInfo::DOCKER); + + ContainerInfo::DockerInfo dockerInfo; + dockerInfo.set_image("busybox"); + containerInfo.mutable_docker()->CopyFrom(dockerInfo); + + task.mutable_command()->CopyFrom(command); + task.mutable_container()->CopyFrom(containerInfo); + + Future<TaskStatus> statusFailed; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&statusFailed)); + + Future<ContainerID> containerId; + EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _)) + .WillOnce(DoAll(FutureArg<0>(&containerId), + Invoke(&dockerContainerizer, + &MockDockerContainerizer::_launch))); + + vector<TaskInfo> tasks; + tasks.push_back(task); + + driver.launchTasks(offers.get()[0].id(), tasks); + + AWAIT_READY_FOR(containerId, Seconds(60)); + + // Wait until fetch is finished. + AWAIT_READY(fetch); + + dockerContainerizer.destroy(containerId.get()); + + // Resume docker launch. + promise.set(Nothing()); + + AWAIT_READY(statusFailed); + + EXPECT_EQ(TASK_FAILED, statusFailed.get().state()); + + driver.stop(); + driver.join(); + + Shutdown(); +}
