Repository: mesos Updated Branches: refs/heads/master 454cdf42d -> 541b3d963
Fixed persistent volumes with docker tasks. Review: https://reviews.apache.org/r/43015 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/541b3d96 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/541b3d96 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/541b3d96 Branch: refs/heads/master Commit: 541b3d963cccf07e979ce5362cbb6ace0144f31a Parents: 454cdf4 Author: Timothy Chen <[email protected]> Authored: Fri Jan 29 18:09:52 2016 -0500 Committer: Timothy Chen <[email protected]> Committed: Thu Feb 18 14:02:42 2016 -0800 ---------------------------------------------------------------------- src/slave/containerizer/docker.cpp | 270 ++++++++++- src/slave/containerizer/docker.hpp | 17 +- .../docker_containerizer_tests.cpp | 480 +++++++++++++++++++ 3 files changed, 762 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/541b3d96/src/slave/containerizer/docker.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp index ed1c9a5..50248e5 100644 --- a/src/slave/containerizer/docker.cpp +++ b/src/slave/containerizer/docker.cpp @@ -22,6 +22,7 @@ #include <mesos/slave/container_logger.hpp> #include <process/check.hpp> +#include <process/collect.hpp> #include <process/defer.hpp> #include <process/delay.hpp> #include <process/io.hpp> @@ -29,6 +30,7 @@ #include <process/reap.hpp> #include <process/subprocess.hpp> +#include <stout/adaptor.hpp> #include <stout/fs.hpp> #include <stout/hashmap.hpp> #include <stout/hashset.hpp> @@ -40,6 +42,7 @@ #ifdef __linux__ #include "linux/cgroups.hpp" +#include "linux/fs.hpp" #include "linux/systemd.hpp" #endif // __linux__ @@ -155,6 +158,9 @@ Try<DockerContainerizer*> DockerContainerizer::create( } } + // TODO(tnachen): We should also mark the work directory as 
shared + // mount here, more details please refer to MESOS-3483. + return new DockerContainerizer( flags, fetcher, @@ -387,6 +393,163 @@ Future<Nothing> DockerContainerizerProcess::pull( } +Try<Nothing> DockerContainerizerProcess::updatePersistentVolumes( + const ContainerID& containerId, + const string& directory, + const Resources& current, + const Resources& updated) +{ + // Docker Containerizer currently is only expected to run on Linux. +#ifdef __linux__ + // Unmount all persistent volumes that are no longer present. + foreach (const Resource& resource, current.persistentVolumes()) { + // This is enforced by the master. + CHECK(resource.disk().has_volume()); + + // Ignore absolute and nested paths. + const string& containerPath = resource.disk().volume().container_path(); + if (strings::contains(containerPath, "/")) { + LOG(WARNING) << "Skipping updating mount for persistent volume " + << resource << " of container " << containerId + << " because the container path '" << containerPath + << "' contains slash"; + continue; + } + + if (updated.contains(resource)) { + continue; + } + + const string target = path::join( + directory, resource.disk().volume().container_path()); + + Try<Nothing> unmount = fs::unmount(target); + if (unmount.isError()) { + return Error("Failed to unmount persistent volume at '" + target + + "': " + unmount.error()); + } + + // TODO(tnachen): Remove mount point after unmounting. This requires + // making sure the work directory is marked as a shared mount. For + // more details please refer to MESOS-3483. + } + + // Set the ownership of the persistent volume to match that of the + // sandbox directory. + // + // NOTE: Currently, persistent volumes in Mesos are exclusive, + // meaning that if a persistent volume is used by one task or + // executor, it cannot be concurrently used by other task or + // executor. 
But if we allow multiple executors to use same + // persistent volume at the same time in the future, the ownership + // of the persistent volume may conflict here. + // + // TODO(haosdent): Consider letting the frameworks specify the + // user/group of the persistent volumes. + struct stat s; + if (::stat(directory.c_str(), &s) < 0) { + return Error("Failed to get ownership for '" + directory + "': " + + os::strerror(errno)); + } + + // Mount all new persistent volumes added. + foreach (const Resource& resource, updated.persistentVolumes()) { + // This is enforced by the master. + CHECK(resource.disk().has_volume()); + + if (current.contains(resource)) { + continue; + } + + const string source = + paths::getPersistentVolumePath(flags.work_dir, resource); + + // Ignore absolute and nested paths. + const string& containerPath = resource.disk().volume().container_path(); + if (strings::contains(containerPath, "/")) { + LOG(WARNING) << "Skipping updating mount for persistent volume " + << resource << " of container " << containerId + << " because the container path '" << containerPath + << "' contains slash"; + continue; + } + + const string target = path::join(directory, containerPath); + + LOG(INFO) << "Changing the ownership of the persistent volume at '" + << source << "' with uid " << s.st_uid + << " and gid " << s.st_gid; + + Try<Nothing> chown = os::chown(s.st_uid, s.st_gid, source, true); + if (chown.isError()) { + return Error( + "Failed to change the ownership of the persistent volume at '" + + source + "' with uid " + stringify(s.st_uid) + + " and gid " + stringify(s.st_gid) + ": " + chown.error()); + } + + // TODO(tnachen): We should check if the target already exists + // when we support updating persistent mounts. 
+ + Try<Nothing> mkdir = os::mkdir(target); + if (mkdir.isError()) { + return Error("Failed to create persistent mount point at '" + target + + "': " + mkdir.error()); + } + + LOG(INFO) << "Mounting '" << source << "' to '" << target + << "' for persistent volume " << resource + << " of container " << containerId; + + Try<Nothing> mount = fs::mount(source, target, None(), MS_BIND, NULL); + if (mount.isError()) { + return Error( + "Failed to mount persistent volume from '" + + source + "' to '" + target + "': " + mount.error()); + } + } +#else + if (!current.persistentVolumes().empty() || + !updated.persistentVolumes().empty()) { + return Error("Persistent volumes are only supported on linux"); + } +#endif // __linux__ + + return Nothing(); +} + + +Future<Nothing> DockerContainerizerProcess::mountPersistentVolumes( + const ContainerID& containerId) +{ + if (!containers_.contains(containerId)) { + return Failure("Container is already destroyed"); + } + + Container* container = containers_[containerId]; + container->state = Container::MOUNTING; + + if (container->task.isNone() && + !container->resources.persistentVolumes().empty()) { + LOG(ERROR) << "Persistent volumes found with container '" << containerId + << "' but are not supported with custom executors"; + return Nothing(); + } + + Try<Nothing> updateVolumes = updatePersistentVolumes( + containerId, + container->directory, + Resources(), + container->resources); + + if (updateVolumes.isError()) { + return Failure(updateVolumes.error()); + } + + return Nothing(); +} + + Try<Nothing> DockerContainerizerProcess::checkpoint( const ContainerID& containerId, pid_t pid) @@ -699,6 +862,7 @@ Future<Nothing> DockerContainerizerProcess::_recover( framework.id, executor.id, containerId); + container->directory = sandboxDirectory; // Pass recovered containers to the container logger. 
// NOTE: The current implementation of the container logger only @@ -721,9 +885,44 @@ Future<Nothing> DockerContainerizerProcess::_recover( } +/** + * Unmount persistent volumes that are mounted for a container. + */ +Try<Nothing> unmountPersistentVolumes(const ContainerID& containerId) +{ + // We assume volumes are only supported on Linux, and also + // the target path contains the containerId. +#ifdef __linux__ + Try<fs::MountInfoTable> table = fs::MountInfoTable::read(); + if (table.isError()) { + return Error("Failed to get mount table: " + table.error()); + } + + foreach (const fs::MountInfoTable::Entry& entry, + adaptor::reverse(table.get().entries)) { + // TODO(tnachen): We assume there is only one docker container + // running per container Id and no other mounts will have the + // container Id name. We might need to revisit if this is no + // longer true. + if (strings::contains(entry.target, containerId.value())) { + LOG(INFO) << "Unmounting volume for container '" << containerId + << "'"; + Try<Nothing> unmount = fs::unmount(entry.target); + if (unmount.isError()) { + return Error("Failed to unmount volume '" + entry.target + + "': " + unmount.error()); + } + } + } +#endif // __linux__ + return Nothing(); +} + + Future<Nothing> DockerContainerizerProcess::__recover( const list<Docker::Container>& _containers) { + list<Future<ContainerID>> futures; foreach (const Docker::Container& container, _containers) { VLOG(1) << "Checking if Docker container named '" << container.name << "' was started by Mesos"; @@ -742,11 +941,33 @@ Future<Nothing> DockerContainerizerProcess::__recover( // if not, rm -f the Docker container. if (!containers_.contains(id.get())) { // TODO(tnachen): Consider using executor_shutdown_grace_period. 
- docker->stop(container.id, flags.docker_stop_timeout, true); + futures.push_back( + docker->stop( + container.id, + flags.docker_stop_timeout, + true) + .then([id]() { return id.get(); })); } } - return Nothing(); + return collect(futures) + .then([](Future<list<ContainerID>> future) -> Future<Nothing> { + if (!future.isReady()) { + return Failure("Unable to stop orphaned Docker containers: " + + (future.isFailed() ? + future.failure() : "future discarded")); + } + + foreach (const ContainerID& containerId, future.get()) { + Try<Nothing> unmount = unmountPersistentVolumes(containerId); + if (unmount.isError()) { + return Failure("Unable to unmount volumes for Docker container '" + + containerId.value() + "': " + unmount.error()); + } + } + + return Nothing(); + }); } @@ -827,6 +1048,9 @@ Future<bool> DockerContainerizerProcess::launch( // Launching task by forking a subprocess to run docker executor. return container.get()->launch = fetch(containerId, slaveId) .then(defer(self(), [=]() { return pull(containerId); })) + .then(defer(self(), [=]() { + return mountPersistentVolumes(containerId); + })) .then(defer(self(), [=]() { return launchExecutorProcess(containerId); })) .then(defer(self(), [=](pid_t pid) { return reapExecutor(containerId, pid); @@ -850,6 +1074,9 @@ Future<bool> DockerContainerizerProcess::launch( return container.get()->launch = fetch(containerId, slaveId) .then(defer(self(), [=]() { return pull(containerId); })) .then(defer(self(), [=]() { + return mountPersistentVolumes(containerId); + })) + .then(defer(self(), [=]() { return launchExecutorContainer(containerId, containerName); })) .then(defer(self(), [=](const Docker::Container& dockerContainer) { @@ -1080,6 +1307,8 @@ Future<Nothing> DockerContainerizerProcess::update( return Nothing(); } + // TODO(tnachen): Support updating persistent volumes, which requires + // Docker mount propagation support. // Store the resources for usage(). 
container->resources = _resources; @@ -1468,6 +1697,9 @@ void DockerContainerizerProcess::destroy( // cleanup. Just as above, we'll need to deal with the race with // 'docker pull' returning successfully. // + // If we're MOUNTING, we want to unmount all the persistent volumes + // that have been mounted. + // // If we're RUNNING, we want to wait for the status to get set, then // do a Docker::kill, then wait for the status to complete, then // cleanup. @@ -1507,6 +1739,29 @@ void DockerContainerizerProcess::destroy( return; } + if (container->state == Container::MOUNTING) { + LOG(INFO) << "Destroying Container '" << containerId + << "' in MOUNTING state"; + + // Persistent volumes might have already been mounted, remove them + // if necessary. + Try<Nothing> unmount = unmountPersistentVolumes(containerId); + if (unmount.isError()) { + LOG(WARNING) << "Failed to remove persistent volumes on destroy for " + << "container '" << containerId << "': " + << unmount.error(); + } + + containerizer::Termination termination; + termination.set_message("Container destroyed while mounting volumes"); + container->termination.set(termination); + + containers_.erase(containerId); + delete container; + + return; + } + CHECK(container->state == Container::RUNNING); container->state = Container::DESTROYING; @@ -1613,6 +1868,17 @@ void DockerContainerizerProcess::___destroy( { CHECK(containers_.contains(containerId)); + Try<Nothing> unmount = unmountPersistentVolumes(containerId); + if (unmount.isError()) { + // TODO(tnachen): Failing to unmount a persistent volume now + // leads to leaving the volume on the host, and we won't retry + // again since the Docker container is removed. We should consider + // not removing the container so we can retry. 
+ LOG(WARNING) << "Failed to remove persistent volumes on destroy for " + << "container '" << containerId << "': " + << unmount.error(); + } + Container* container = containers_[containerId]; containerizer::Termination termination; http://git-wip-us.apache.org/repos/asf/mesos/blob/541b3d96/src/slave/containerizer/docker.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp index 77a50d8..4d70381 100644 --- a/src/slave/containerizer/docker.hpp +++ b/src/slave/containerizer/docker.hpp @@ -223,6 +223,15 @@ private: const Resources& resources, pid_t pid); + process::Future<Nothing> mountPersistentVolumes( + const ContainerID& containerId); + + Try<Nothing> updatePersistentVolumes( + const ContainerID& containerId, + const std::string& directory, + const Resources& current, + const Resources& updated); + Try<ResourceStatistics> cgroupsStatistics(pid_t pid) const; // Call back for when the executor exits. This will trigger @@ -387,6 +396,7 @@ private: // // FETCHING // PULLING + // MOUNTING // RUNNING // DESTROYING // @@ -404,8 +414,9 @@ private: { FETCHING = 1, PULLING = 2, - RUNNING = 3, - DESTROYING = 4 + MOUNTING = 3, + RUNNING = 4, + DESTROYING = 5 } state; const ContainerID id; @@ -417,7 +428,7 @@ private: // The sandbox directory for the container. This holds the // symlinked path if symlinked boolean is true. 
- const std::string directory; + std::string directory; const Option<std::string> user; SlaveID slaveId; http://git-wip-us.apache.org/repos/asf/mesos/blob/541b3d96/src/tests/containerizer/docker_containerizer_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/containerizer/docker_containerizer_tests.cpp b/src/tests/containerizer/docker_containerizer_tests.cpp index 645bdcf..8541a9a 100644 --- a/src/tests/containerizer/docker_containerizer_tests.cpp +++ b/src/tests/containerizer/docker_containerizer_tests.cpp @@ -27,7 +27,10 @@ #include <stout/duration.hpp> +#ifdef __linux__ #include "linux/cgroups.hpp" +#include "linux/fs.hpp" +#endif // __linux__ #include "messages/messages.hpp" @@ -1081,6 +1084,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Recover) RunState runState; runState.id = containerId; runState.forkedPid = wait.get().pid(); + execState.runs.put(containerId, runState); frameworkState.executors.put(execId, execState); @@ -1166,6 +1170,482 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_SkipRecoverNonDocker) } +#ifdef __linux__ +// This test verifies that we can launch a docker container with +// persistent volume. 
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchWithPersistentVolumes) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + MockDocker* mockDocker = + new MockDocker(tests::flags.docker, tests::flags.docker_socket); + + Shared<Docker> docker(mockDocker); + + slave::Flags flags = CreateSlaveFlags(); + flags.resources = "cpu:2;mem:2048;disk(role1):2048"; + + Fetcher fetcher; + + Try<ContainerLogger*> logger = + ContainerLogger::create(flags.container_logger); + + ASSERT_SOME(logger); + + MockDockerContainerizer dockerContainerizer( + flags, + &fetcher, + Owned<ContainerLogger>(logger.get()), + docker); + + Try<PID<Slave>> slave = StartSlave(&dockerContainerizer, flags); + ASSERT_SOME(slave); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_role("role1"); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); + + Future<FrameworkID> frameworkId; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureArg<1>(&frameworkId)); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. 
+ + driver.start(); + + AWAIT_READY(frameworkId); + + AWAIT_READY(offers); + ASSERT_NE(0u, offers.get().size()); + + Offer offer = offers.get()[0]; + + SlaveID slaveId = offer.slave_id(); + + Resource volume = createPersistentVolume( + Megabytes(64), + "role1", + "id1", + "path1"); + + TaskInfo task; + task.set_name(""); + task.mutable_task_id()->set_value("1"); + task.mutable_slave_id()->CopyFrom(offer.slave_id()); + task.mutable_resources()->CopyFrom( + Resources::parse("cpus:1;mem:64;").get() + volume); + + CommandInfo command; + command.set_value("echo abc > " + + path::join(flags.sandbox_directory, "path1", "file")); + + ContainerInfo containerInfo; + containerInfo.set_type(ContainerInfo::DOCKER); + + // TODO(tnachen): Use local image to test if possible. + ContainerInfo::DockerInfo dockerInfo; + dockerInfo.set_image("alpine"); + containerInfo.mutable_docker()->CopyFrom(dockerInfo); + + task.mutable_command()->CopyFrom(command); + task.mutable_container()->CopyFrom(containerInfo); + + // We use the filter explicitly here so that the resources will not + // be filtered for 5 seconds (the default). 
+ Filters filters; + filters.set_refuse_seconds(0); + + Future<ContainerID> containerId; + Future<string> directory; + EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _)) + .WillOnce(DoAll(FutureArg<0>(&containerId), + FutureArg<3>(&directory), + Invoke(&dockerContainerizer, + &MockDockerContainerizer::_launch))); + + Future<TaskStatus> statusRunning; + Future<TaskStatus> statusFinished; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&statusRunning)) + .WillOnce(FutureArg<1>(&statusFinished)) + .WillRepeatedly(DoDefault()); + + driver.acceptOffers( + {offer.id()}, + {CREATE(volume), LAUNCH({task})}, + filters); + + AWAIT_READY_FOR(containerId, Seconds(60)); + AWAIT_READY(directory); + AWAIT_READY_FOR(statusRunning, Seconds(60)); + EXPECT_EQ(TASK_RUNNING, statusRunning.get().state()); + + AWAIT_READY(statusFinished); + EXPECT_EQ(TASK_FINISHED, statusFinished.get().state()); + + Future<containerizer::Termination> termination = + dockerContainerizer.wait(containerId.get()); + + driver.stop(); + driver.join(); + + AWAIT_READY(termination); + + ASSERT_FALSE( + exists(docker, slaveId, containerId.get(), ContainerState::RUNNING)); + + const string& volumePath = getPersistentVolumePath( + flags.work_dir, + volume); + + EXPECT_SOME_EQ("abc\n", os::read(path::join(volumePath, "file"))); + + Try<fs::MountInfoTable> table = fs::MountInfoTable::read(); + EXPECT_SOME(table); + + // Verify that the persistent volume is unmounted. + foreach (const fs::MountInfoTable::Entry& entry, table.get().entries) { + EXPECT_FALSE( + strings::contains(entry.target, path::join(directory.get(), "path1"))); + } + + Shutdown(); +} + + +// This test checks the docker containerizer is able to recover containers +// with persistent volumes and destroy it properly. 
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_RecoverPersistentVolumes) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + MockDocker* mockDocker = + new MockDocker(tests::flags.docker, tests::flags.docker_socket); + + Shared<Docker> docker(mockDocker); + + slave::Flags flags = CreateSlaveFlags(); + flags.resources = "cpu:2;mem:2048;disk(role1):2048"; + + Fetcher fetcher; + + Try<ContainerLogger*> logger = + ContainerLogger::create(flags.container_logger); + + ASSERT_SOME(logger); + + MockDockerContainerizer* dockerContainerizer = new MockDockerContainerizer( + flags, + &fetcher, + Owned<ContainerLogger>(logger.get()), + docker); + + Try<PID<Slave>> slave = StartSlave(dockerContainerizer, flags); + ASSERT_SOME(slave); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_role("role1"); + frameworkInfo.set_checkpoint(true); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); + + Future<FrameworkID> frameworkId; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureArg<1>(&frameworkId)); + + Filters filters; + filters.set_refuse_seconds(0); + + // NOTE: We set filter explicitly here so that the resources will + // not be filtered for 5 seconds (the default). + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(DeclineOffers(filters)); // Ignore subsequent offers. 
+ + driver.start(); + + AWAIT_READY(frameworkId); + + AWAIT_READY(offers); + ASSERT_NE(0u, offers.get().size()); + + Offer offer = offers.get()[0]; + + SlaveID slaveId = offer.slave_id(); + + Resource volume = createPersistentVolume( + Megabytes(64), + "role1", + "id1", + "path1"); + + TaskInfo task; + task.set_name(""); + task.mutable_task_id()->set_value("1"); + task.mutable_slave_id()->CopyFrom(offer.slave_id()); + task.mutable_resources()->CopyFrom( + Resources::parse("cpus:1;mem:64;").get() + volume); + + CommandInfo command; + command.set_value("sleep 1000"); + + ContainerInfo containerInfo; + containerInfo.set_type(ContainerInfo::DOCKER); + + // TODO(tnachen): Use local image to test if possible. + ContainerInfo::DockerInfo dockerInfo; + dockerInfo.set_image("alpine"); + containerInfo.mutable_docker()->CopyFrom(dockerInfo); + + task.mutable_command()->CopyFrom(command); + task.mutable_container()->CopyFrom(containerInfo); + + Future<ContainerID> containerId; + Future<string> directory; + EXPECT_CALL(*dockerContainerizer, launch(_, _, _, _, _, _, _, _)) + .WillOnce(DoAll(FutureArg<0>(&containerId), + FutureArg<3>(&directory), + Invoke(dockerContainerizer, + &MockDockerContainerizer::_launch))); + + Future<TaskStatus> statusRunning; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&statusRunning)) + .WillRepeatedly(DoDefault()); + + driver.acceptOffers( + {offer.id()}, + {CREATE(volume), LAUNCH({task})}, + filters); + + AWAIT_READY_FOR(containerId, Seconds(60)); + AWAIT_READY(directory); + AWAIT_READY_FOR(statusRunning, Seconds(60)); + EXPECT_EQ(TASK_RUNNING, statusRunning.get().state()); + + Stop(slave.get()); + + // Recreate containerizer and start slave again. 
+ delete dockerContainerizer; + + logger = ContainerLogger::create(flags.container_logger); + ASSERT_SOME(logger); + + dockerContainerizer = new MockDockerContainerizer( + flags, + &fetcher, + Owned<ContainerLogger>(logger.get()), + docker); + + slave = StartSlave(dockerContainerizer, flags); + ASSERT_SOME(slave); + + Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover); + + // Wait until containerizer recover is complete. + AWAIT_READY(_recover); + + Future<containerizer::Termination> termination = + dockerContainerizer->wait(containerId.get()); + + dockerContainerizer->destroy(containerId.get()); + + AWAIT_READY(termination); + + Try<fs::MountInfoTable> table = fs::MountInfoTable::read(); + EXPECT_SOME(table); + + // Verify that the recovered container's persistent volume is + // unmounted. + foreach (const fs::MountInfoTable::Entry& entry, table.get().entries) { + EXPECT_FALSE( + strings::contains(entry.target, path::join(directory.get(), "path1"))); + } + + driver.stop(); + driver.join(); + + Shutdown(); + delete dockerContainerizer; +} + + +// This test checks the docker containerizer is able to clean up +// orphaned containers with persistent volumes. 
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_RecoverOrphanedPersistentVolumes) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + MockDocker* mockDocker = + new MockDocker(tests::flags.docker, tests::flags.docker_socket); + + Shared<Docker> docker(mockDocker); + + slave::Flags flags = CreateSlaveFlags(); + flags.resources = "cpu:2;mem:2048;disk(role1):2048"; + + Fetcher fetcher; + + Try<ContainerLogger*> logger = + ContainerLogger::create(flags.container_logger); + + ASSERT_SOME(logger); + + MockDockerContainerizer* dockerContainerizer = new MockDockerContainerizer( + flags, + &fetcher, + Owned<ContainerLogger>(logger.get()), + docker); + + Try<PID<Slave>> slave = StartSlave(dockerContainerizer, flags); + ASSERT_SOME(slave); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_role("role1"); + frameworkInfo.set_checkpoint(true); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); + + Future<FrameworkID> frameworkId; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureArg<1>(&frameworkId)); + + Filters filters; + filters.set_refuse_seconds(0); + + // NOTE: We set filter explicitly here so that the resources will + // not be filtered for 5 seconds (the default). + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(DeclineOffers(filters)); // Ignore subsequent offers. 
+ + driver.start(); + + AWAIT_READY(frameworkId); + + AWAIT_READY(offers); + ASSERT_NE(0u, offers.get().size()); + + Offer offer = offers.get()[0]; + + Resource volume = createPersistentVolume( + Megabytes(64), + "role1", + "id1", + "path1"); + + TaskInfo task; + task.set_name(""); + task.mutable_task_id()->set_value("1"); + task.mutable_slave_id()->CopyFrom(offer.slave_id()); + task.mutable_resources()->CopyFrom( + Resources::parse("cpus:1;mem:64;").get() + volume); + + CommandInfo command; + command.set_value("sleep 1000"); + + ContainerInfo containerInfo; + containerInfo.set_type(ContainerInfo::DOCKER); + + // TODO(tnachen): Use local image to test if possible. + ContainerInfo::DockerInfo dockerInfo; + dockerInfo.set_image("alpine"); + containerInfo.mutable_docker()->CopyFrom(dockerInfo); + + task.mutable_command()->CopyFrom(command); + task.mutable_container()->CopyFrom(containerInfo); + + Future<ContainerID> containerId; + Future<string> directory; + EXPECT_CALL(*dockerContainerizer, launch(_, _, _, _, _, _, _, _)) + .WillOnce(DoAll(FutureArg<0>(&containerId), + FutureArg<3>(&directory), + Invoke(dockerContainerizer, + &MockDockerContainerizer::_launch))); + + Future<TaskStatus> statusRunning; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&statusRunning)) + .WillRepeatedly(DoDefault()); + + driver.acceptOffers( + {offer.id()}, + {CREATE(volume), LAUNCH({task})}, + filters); + + AWAIT_READY_FOR(containerId, Seconds(60)); + AWAIT_READY(directory); + AWAIT_READY_FOR(statusRunning, Seconds(60)); + EXPECT_EQ(TASK_RUNNING, statusRunning.get().state()); + + Stop(slave.get()); + + // Wipe the framework directory so that the slave will treat the + // above running task as an orphan. We don't want to wipe the whole + // meta directory since Docker Containerizer will skip recover if + // state is not found. 
+ ASSERT_SOME( + os::rmdir(getFrameworkPath( + getMetaRootDir(flags.work_dir), + offer.slave_id(), + frameworkId.get()))); + + // Recreate containerizer and start slave again. + delete dockerContainerizer; + + logger = ContainerLogger::create(flags.container_logger); + ASSERT_SOME(logger); + + dockerContainerizer = new MockDockerContainerizer( + flags, + &fetcher, + Owned<ContainerLogger>(logger.get()), + docker); + + slave = StartSlave(dockerContainerizer, flags); + ASSERT_SOME(slave); + + Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover); + + // Wait until containerizer recover is complete. + AWAIT_READY(_recover); + + Try<fs::MountInfoTable> table = fs::MountInfoTable::read(); + EXPECT_SOME(table); + + // Verify that the orphaned container's persistent volume is + // unmounted. + foreach (const fs::MountInfoTable::Entry& entry, table.get().entries) { + EXPECT_FALSE( + strings::contains(entry.target, path::join(directory.get(), "path1"))); + } + + driver.stop(); + driver.join(); + + Shutdown(); + delete dockerContainerizer; + + EXPECT_FALSE(exists(docker, offer.slave_id(), containerId.get())); +} +#endif // __linux__ + + TEST_F(DockerContainerizerTest, ROOT_DOCKER_Logs) { Try<PID<Master> > master = StartMaster();
