Implemented DockerContainerizer::update.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a5d683b6 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a5d683b6 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a5d683b6 Branch: refs/heads/master Commit: a5d683b6c5d4a7c8cc9608f6e57cb9cf7f172ba4 Parents: 81782c0 Author: Timothy Chen <[email protected]> Authored: Mon Jun 30 15:33:00 2014 -0700 Committer: Benjamin Hindman <[email protected]> Committed: Mon Aug 4 15:08:17 2014 -0700 ---------------------------------------------------------------------- src/linux/cgroups.cpp | 19 ++++ src/linux/cgroups.hpp | 5 + src/slave/containerizer/docker.cpp | 144 +++++++++++++++++++++++++- src/slave/containerizer/docker.hpp | 2 + src/tests/docker_containerizer_tests.cpp | 120 +++++++++++++++++++++ 5 files changed, 286 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/a5d683b6/src/linux/cgroups.cpp ---------------------------------------------------------------------- diff --git a/src/linux/cgroups.cpp b/src/linux/cgroups.cpp index ccb86cf..39a4874 100644 --- a/src/linux/cgroups.cpp +++ b/src/linux/cgroups.cpp @@ -1846,6 +1846,25 @@ Try<Nothing> shares( } +Try<uint64_t> shares( + const string& hierarchy, + const string& cgroup) +{ + Try<string> read = cgroups::read(hierarchy, cgroup, "cpu.shares"); + + if (read.isError()) { + return Error(read.error()); + } + + uint64_t shares; + std::istringstream ss(read.get()); + + ss >> shares; + + return shares; +} + + Try<Nothing> cfs_period_us( const string& hierarchy, const string& cgroup, http://git-wip-us.apache.org/repos/asf/mesos/blob/a5d683b6/src/linux/cgroups.hpp ---------------------------------------------------------------------- diff --git a/src/linux/cgroups.hpp b/src/linux/cgroups.hpp index c571e91..9dfba6e 100644 --- a/src/linux/cgroups.hpp +++ b/src/linux/cgroups.hpp @@ -392,6 +392,11 @@ Try<Nothing> shares( const std::string& cgroup, uint64_t shares); +// Returns the cpu shares from cpu.shares. +Try<uint64_t> shares( + const std::string& hierarchy, + const std::string& cgroup); + // Returns the cpu shares from cpu.shares. Try<uint64_t> shares( http://git-wip-us.apache.org/repos/asf/mesos/blob/a5d683b6/src/slave/containerizer/docker.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp index 7cd2d7d..5a68d94 100644 --- a/src/slave/containerizer/docker.cpp +++ b/src/slave/containerizer/docker.cpp @@ -30,12 +30,19 @@ #include "docker/docker.hpp" +#ifdef __linux__ +#include "linux/cgroups.hpp" +#endif // __linux__ + #include "slave/paths.hpp" #include "slave/slave.hpp" #include "slave/containerizer/containerizer.hpp" #include "slave/containerizer/docker.hpp" +#include "slave/containerizer/isolators/cgroups/cpushare.hpp" +#include "slave/containerizer/isolators/cgroups/mem.hpp" + #include "usage/usage.hpp" @@ -134,6 +141,11 @@ private: const bool& killed, const Future<Option<int > >& status); + process::Future<Nothing> _update( + const ContainerID& containerId, + const Resources& resources, + const Future<Docker::Container>& future); + Future<ResourceStatistics> _usage( const ContainerID& containerId, const Docker::Container& container); @@ -169,6 +181,30 @@ private: }; +Try<Nothing> DockerContainerizer::prepareCgroups(const Flags& flags) +{ +#ifdef __linux__ + std::vector<string> subsystems; + subsystems.push_back("cpu"); + subsystems.push_back("cpuacct"); + subsystems.push_back("memory"); + + foreach (const string& subsystem, subsystems) { + // We're assuming docker is under cgroup directory "docker". + Try<string> hierarchy = + cgroups::prepare(flags.cgroups_hierarchy, subsystem, "docker"); + + if (hierarchy.isError()) { + return Error( + "Failed to prepare cgroup hierarchy " + flags.cgroups_hierarchy + + " subsystem '" + subsystem + "' for Docker: " + hierarchy.error()); + } + } +#endif // __linux__ + return Nothing(); +} + + Try<DockerContainerizer*> DockerContainerizer::create( const Flags& flags, bool local) @@ -179,6 +215,11 @@ Try<DockerContainerizer*> DockerContainerizer::create( return Error(validation.error()); } + Try<Nothing> prepare = prepareCgroups(flags); + if (prepare.isError()) { + return Error(prepare.error()); + } + return new DockerContainerizer(flags, local, docker); } @@ -620,11 +661,106 @@ Future<bool> DockerContainerizerProcess::_launch( Future<Nothing> DockerContainerizerProcess::update( const ContainerID& containerId, - const Resources& resources) + const Resources& _resources) +{ + if (!promises.contains(containerId)) { + LOG(WARNING) + << "Ignoring updating unknown container: " + << containerId.value(); + return Nothing(); + } + +#ifdef __linux__ + if (!_resources.cpus().isSome() && !_resources.mem().isSome()) { + LOG(WARNING) << "Ignoring update as no supported resources are present"; + return Nothing(); + } + + // Store the resources for usage() + resources.put(containerId, _resources); + + return docker.inspect(DOCKER_NAME_PREFIX + stringify(containerId)) + .then(defer(self(), &Self::_update, containerId, _resources, lambda::_1)); +#else + return Nothing(); +#endif // __linux__ +} + + +Future<Nothing> DockerContainerizerProcess::_update( + const ContainerID& containerId, + const Resources& _resources, + const Future<Docker::Container>& future) { - // TODO(benh): Right now we're only launching tasks so we don't - // expect the containers to be resized. This will need to get - // implemented to support executors. +#ifdef __linux__ + const string& id = path::join("docker", future.get().id()); + + // Update CPU shares. + if (_resources.cpus().isSome()) { + double cpuShares = _resources.cpus().get(); + + uint64_t shares = + std::max((uint64_t) (CPU_SHARES_PER_CPU * cpuShares), MIN_CPU_SHARES); + + Try<Nothing> write = + cgroups::cpu::shares( + path::join(flags.cgroups_hierarchy, "cpu"), id, shares); + + if (write.isError()) { + return Failure("Failed to update 'cpu.shares': " + write.error()); + } + + LOG(INFO) + << "Updated 'cpu.shares' to " << shares + << " for container " << containerId; + } + + // Update Memory. + if (_resources.mem().isSome()) { + Bytes mem = _resources.mem().get(); + Bytes limit = std::max(mem, MIN_MEMORY); + + std::string memHierarchy = + path::join(flags.cgroups_hierarchy, "memory"); + + // Always set the soft limit. + Try<Nothing> write = + cgroups::memory::soft_limit_in_bytes(memHierarchy, id, limit); + + if (write.isError()) { + return Failure("Failed to set 'memory.soft_limit_in_bytes': " + + write.error()); + } + + LOG(INFO) + << "Updated 'memory.soft_limit_in_bytes' to " << limit + << " for container " << containerId; + + // Read the existing limit. + Try<Bytes> currentLimit = + cgroups::memory::limit_in_bytes(memHierarchy, id); + + if (currentLimit.isError()) { + return Failure("Failed to read 'memory.limit_in_bytes': " + + currentLimit.error()); + } + + // Only update if new limit is higher. + if (limit > currentLimit.get()) { + write = cgroups::memory::limit_in_bytes(memHierarchy, id, limit); + + if (write.isError()) { + return Failure("Failed to set 'memory.limit_in_bytes': " + + write.error()); + } + + LOG(INFO) + << "Updated 'memory.limit_in_bytes' to " << limit + << " for container " << containerId; + } + } +#endif // __linux__ + return Nothing(); } http://git-wip-us.apache.org/repos/asf/mesos/blob/a5d683b6/src/slave/containerizer/docker.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp index 1a5d1c2..f4eb0ff 100644 --- a/src/slave/containerizer/docker.hpp +++ b/src/slave/containerizer/docker.hpp @@ -45,6 +45,8 @@ public: const Flags& flags, bool local); + static Try<Nothing> prepareCgroups(const Flags& flags); + DockerContainerizer( const Flags& flags, bool local, http://git-wip-us.apache.org/repos/asf/mesos/blob/a5d683b6/src/tests/docker_containerizer_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/docker_containerizer_tests.cpp b/src/tests/docker_containerizer_tests.cpp index b0b8b39..cd1c88c 100644 --- a/src/tests/docker_containerizer_tests.cpp +++ b/src/tests/docker_containerizer_tests.cpp @@ -22,6 +22,8 @@ #include <process/future.hpp> #include <process/subprocess.hpp> +#include "linux/cgroups.hpp" + #include "tests/flags.hpp" #include "tests/mesos.hpp" @@ -62,6 +64,7 @@ public: const Docker& docker) : DockerContainerizer(flags, local, docker) { + DockerContainerizer::prepareCgroups(flags); EXPECT_CALL(*this, launch(_, _, _, _, _, _, _, _)) .WillRepeatedly(Invoke(this, &MockDockerContainerizer::_launch)); } @@ -317,6 +320,123 @@ TEST_F(DockerContainerizerTest, DOCKER_Usage) } +#ifdef __linux__ +TEST_F(DockerContainerizerTest, DOCKER_Update) +{ + Try<PID<Master> > master = StartMaster(); + ASSERT_SOME(master); + + slave::Flags flags = CreateSlaveFlags(); + + Docker docker(tests::flags.docker); + + MockDockerContainerizer dockerContainerizer(flags, true, docker); + + Try<PID<Slave> > slave = StartSlave(&dockerContainerizer); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + Future<FrameworkID> frameworkId; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureArg<1>(&frameworkId)); + + Future<vector<Offer> > offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver.start(); + + AWAIT_READY(frameworkId); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + const Offer& offer = offers.get()[0]; + + TaskInfo task; + task.set_name(""); + task.mutable_task_id()->set_value("1"); + task.mutable_slave_id()->CopyFrom(offer.slave_id()); + task.mutable_resources()->CopyFrom(offer.resources()); + + CommandInfo command; + CommandInfo::ContainerInfo* containerInfo = command.mutable_container(); + containerInfo->set_image("docker://busybox"); + command.set_value("sleep 180"); + + task.mutable_command()->CopyFrom(command); + + Future<TaskStatus> statusRunning; + + vector<TaskInfo> tasks; + tasks.push_back(task); + + Future<ContainerID> containerId; + EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _)) + .WillOnce(DoAll(FutureArg<0>(&containerId), + Invoke(&dockerContainerizer, + &MockDockerContainerizer::_launch))); + + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&statusRunning)) + .WillRepeatedly(DoDefault()); + + driver.launchTasks(offers.get()[0].id(), tasks); + + AWAIT_READY(containerId); + + AWAIT_READY_FOR(statusRunning, Seconds(60)); + EXPECT_EQ(TASK_RUNNING, statusRunning.get().state()); + + string containerName = slave::DOCKER_NAME_PREFIX + containerId.get().value(); + Future<Docker::Container> container = docker.inspect(containerName); + + AWAIT_READY(container); + + Try<Resources> newResources = Resources::parse("cpus:1;mem:128"); + + ASSERT_SOME(newResources); + + Future<Nothing> update = + dockerContainerizer.update(containerId.get(), newResources.get()); + + AWAIT_READY(update); + + string id = path::join("docker", container.get().id()); + + Try<Bytes> mem = + cgroups::memory::soft_limit_in_bytes( + path::join(flags.cgroups_hierarchy, "memory"), id); + ASSERT_SOME(mem); + + Try<uint64_t> cpu = + cgroups::cpu::shares( + path::join(flags.cgroups_hierarchy, "cpu"), id); + + ASSERT_SOME(cpu); + + EXPECT_EQ(1024, cpu.get()); + EXPECT_EQ(128, mem.get().megabytes()); + + Future<containerizer::Termination> termination = + dockerContainerizer.wait(containerId.get()); + + dockerContainerizer.destroy(containerId.get()); + + AWAIT_READY(termination); + + driver.stop(); + driver.join(); + + Shutdown(); +} +#endif //__linux__ + + TEST_F(DockerContainerizerTest, DOCKER_Recover) { slave::Flags flags = CreateSlaveFlags();
