Repository: mesos Updated Branches: refs/heads/master 1d5cc168b -> 95499fd9b
Exposed container memory pressures in the cgroups memory isolator. Review: https://reviews.apache.org/r/30546 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/95499fd9 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/95499fd9 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/95499fd9 Branch: refs/heads/master Commit: 95499fd9b001c7797f18281ddb1149bc446582b5 Parents: 1d5cc16 Author: Chi Zhang <[email protected]> Authored: Tue Mar 31 15:43:41 2015 -0700 Committer: Jie Yu <[email protected]> Committed: Tue Mar 31 17:47:52 2015 -0700 ---------------------------------------------------------------------- include/mesos/mesos.proto | 9 + src/Makefile.am | 1 + .../containerizer/isolators/cgroups/mem.cpp | 108 +++++++ .../containerizer/isolators/cgroups/mem.hpp | 21 +- src/tests/memory_pressure_tests.cpp | 294 +++++++++++++++++++ 5 files changed, 432 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/95499fd9/include/mesos/mesos.proto ---------------------------------------------------------------------- diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto index 3c592d5..0cbee3b 100644 --- a/include/mesos/mesos.proto +++ b/include/mesos/mesos.proto @@ -463,6 +463,15 @@ message ResourceStatistics { optional uint64 mem_anon_bytes = 11; optional uint64 mem_mapped_file_bytes = 12; + // Number of occurrences of different levels of memory pressure + // events reported by memory cgroup. Pressure listening (re)starts + // with these values set to 0 when slave (re)starts. See + // https://www.kernel.org/doc/Documentation/cgroups/memory.txt for + // more details. + optional uint64 mem_low_pressure_counter = 32; + optional uint64 mem_medium_pressure_counter = 33; + optional uint64 mem_critical_pressure_counter = 34; + // Disk Usage Information for executor working directory. optional uint64 disk_limit_bytes = 26; optional uint64 disk_used_bytes = 27; http://git-wip-us.apache.org/repos/asf/mesos/blob/95499fd9/src/Makefile.am ---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index 56ed9d9..9c01f5d 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1417,6 +1417,7 @@ if OS_LINUX mesos_tests_SOURCES += tests/cgroups_isolator_tests.cpp mesos_tests_SOURCES += tests/cgroups_tests.cpp mesos_tests_SOURCES += tests/fs_tests.cpp + mesos_tests_SOURCES += tests/memory_pressure_tests.cpp mesos_tests_SOURCES += tests/ns_tests.cpp mesos_tests_SOURCES += tests/perf_tests.cpp mesos_tests_SOURCES += tests/setns_test_helper.cpp http://git-wip-us.apache.org/repos/asf/mesos/blob/95499fd9/src/slave/containerizer/isolators/cgroups/mem.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/isolators/cgroups/mem.cpp b/src/slave/containerizer/isolators/cgroups/mem.cpp index eaeb301..a7a83ef 100644 --- a/src/slave/containerizer/isolators/cgroups/mem.cpp +++ b/src/slave/containerizer/isolators/cgroups/mem.cpp @@ -18,6 +18,7 @@ #include <stdint.h> +#include <list> #include <vector> #include <mesos/resources.hpp> @@ -45,6 +46,9 @@ using namespace process; +using cgroups::memory::pressure::Level; +using cgroups::memory::pressure::Counter; + using std::list; using std::ostringstream; using std::set; @@ -64,6 +68,13 @@ using mesos::slave::Limitation; template<class T> static Future<Option<T>> none() { return None(); } + +static const vector<Level> levels() +{ + return {Level::LOW, Level::MEDIUM, Level::CRITICAL}; +} + + CgroupsMemIsolatorProcess::CgroupsMemIsolatorProcess( const Flags& _flags, const string& _hierarchy, @@ -110,6 +121,22 @@ Try<Isolator*> CgroupsMemIsolatorProcess::create(const Flags& flags) return Error(enable.error()); } + // Test if memory pressure listening is enabled. We test that on the + // root cgroup. We rely on 'Counter::create' to test if memory + // pressure listening is enabled or not. The created counters will + // be destroyed immediately. + foreach (Level level, levels()) { + Try<Owned<Counter>> counter = Counter::create( + hierarchy.get(), + flags.cgroups_root, + level); + + if (counter.isError()) { + return Error("Failed to listen on " + stringify(level) + + " memory events: " + counter.error()); + } + } + // Determine whether to limit swap or not. bool limitSwap = false; @@ -167,6 +194,7 @@ Future<Nothing> CgroupsMemIsolatorProcess::recover( cgroups.insert(cgroup); oomListen(containerId); + pressureListen(containerId); } Try<vector<string>> orphans = cgroups::get( @@ -245,6 +273,7 @@ Future<Option<CommandInfo>> CgroupsMemIsolatorProcess::prepare( } oomListen(containerId); + pressureListen(containerId); return update(containerId, executorInfo.resources()) .then(lambda::bind(none<CommandInfo>)); @@ -417,6 +446,59 @@ Future<ResourceStatistics> CgroupsMemIsolatorProcess::usage( result.set_mem_mapped_file_bytes(total_mapped_file.get()); } + // Get pressure counter readings. + list<Level> levels; + list<Future<uint64_t>> values; + foreachpair (Level level, + const Owned<Counter>& counter, + info->pressureCounters) { + levels.push_back(level); + values.push_back(counter->value()); + } + + return await(values) + .then(defer(PID<CgroupsMemIsolatorProcess>(this), + &CgroupsMemIsolatorProcess::_usage, + containerId, + result, + levels, + lambda::_1)); +} + + +Future<ResourceStatistics> CgroupsMemIsolatorProcess::_usage( + const ContainerID& containerId, + ResourceStatistics result, + const list<Level>& levels, + const list<Future<uint64_t>>& values) +{ + if (!infos.contains(containerId)) { + return Failure("Unknown container"); + } + + list<Level>::const_iterator iterator = levels.begin(); + foreach (const Future<uint64_t>& value, values) { + if (value.isReady()) { + switch (*iterator) { + case Level::LOW: + result.set_mem_low_pressure_counter(value.get()); + break; + case Level::MEDIUM: + result.set_mem_medium_pressure_counter(value.get()); + break; + case Level::CRITICAL: + result.set_mem_critical_pressure_counter(value.get()); + break; + } + } else { + LOG(ERROR) << "Failed to listen on " << stringify(*iterator) + << " pressure events for container " << containerId << ": " + << (value.isFailed() ? value.failure() : "discarded"); + } + + ++iterator; + } + return result; } @@ -580,6 +662,32 @@ void CgroupsMemIsolatorProcess::oom(const ContainerID& containerId) info->limitation.set(Limitation(mem, message.str())); } + +void CgroupsMemIsolatorProcess::pressureListen( + const ContainerID& containerId) +{ + CHECK(infos.contains(containerId)); + Info* info = CHECK_NOTNULL(infos[containerId]); + + foreach (Level level, levels()) { + Try<Owned<Counter>> counter = Counter::create( + hierarchy, + info->cgroup, + level); + + if (counter.isError()) { + LOG(ERROR) << "Failed to listen on " << level << " memory pressure " + << "events for container " << containerId << ": " + << counter.error(); + } else { + info->pressureCounters[level] = counter.get(); + + LOG(INFO) << "Started listening on " << level << " memory pressure " + << "events for container " << containerId; + } + } +} + } // namespace slave { } // namespace internal { } // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/95499fd9/src/slave/containerizer/isolators/cgroups/mem.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/isolators/cgroups/mem.hpp b/src/slave/containerizer/isolators/cgroups/mem.hpp index a00f723..d510bc0 100644 --- a/src/slave/containerizer/isolators/cgroups/mem.hpp +++ b/src/slave/containerizer/isolators/cgroups/mem.hpp @@ -19,10 +19,16 @@ #ifndef __MEM_ISOLATOR_HPP__ #define __MEM_ISOLATOR_HPP__ +#include <list> + #include <mesos/slave/isolator.hpp> +#include <process/owned.hpp> + #include <stout/hashmap.hpp> +#include "linux/cgroups.hpp" + #include "slave/flags.hpp" #include "slave/containerizer/isolators/cgroups/constants.hpp" @@ -70,7 +76,13 @@ private: const std::string& hierarchy, bool limitSwap); - virtual process::Future<Nothing> _cleanup( + process::Future<ResourceStatistics> _usage( + const ContainerID& containerId, + ResourceStatistics result, + const std::list<cgroups::memory::pressure::Level>& levels, + const std::list<process::Future<uint64_t>>& values); + + process::Future<Nothing> _cleanup( const ContainerID& containerId, const process::Future<Nothing>& future); @@ -87,6 +99,10 @@ private: // Used to cancel the OOM listening. process::Future<Nothing> oomNotifier; + + hashmap<cgroups::memory::pressure::Level, + process::Owned<cgroups::memory::pressure::Counter>> + pressureCounters; }; // Start listening on OOM events. This function will create an @@ -102,6 +118,9 @@ private: // This function is invoked when the OOM event happens. void oom(const ContainerID& containerId); + // Start listening on memory pressure events. + void pressureListen(const ContainerID& containerId); + const Flags flags; // The path to the cgroups subsystem hierarchy root. http://git-wip-us.apache.org/repos/asf/mesos/blob/95499fd9/src/tests/memory_pressure_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/memory_pressure_tests.cpp b/src/tests/memory_pressure_tests.cpp new file mode 100644 index 0000000..e0b33ae --- /dev/null +++ b/src/tests/memory_pressure_tests.cpp @@ -0,0 +1,294 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include <vector> + +#include <mesos/resources.hpp> +#include <mesos/scheduler.hpp> + +#include <process/gtest.hpp> + +#include <stout/gtest.hpp> +#include <stout/os.hpp> + +#include "master/master.hpp" + +#include "slave/slave.hpp" + +#include "slave/containerizer/containerizer.hpp" +#include "slave/containerizer/fetcher.hpp" + +#include "messages/messages.hpp" + +#include "tests/mesos.hpp" + +using namespace process; + +using mesos::internal::master::Master; + +using mesos::internal::slave::Fetcher; +using mesos::internal::slave::MesosContainerizer; +using mesos::internal::slave::Slave; + +using std::vector; + +using testing::_; +using testing::Eq; +using testing::Return; + +namespace mesos { +namespace internal { +namespace tests { + +class MemoryPressureMesosTest : public ContainerizerTest<MesosContainerizer> +{ +public: + static void SetUpTestCase() + { + // Verify that the dd command and its flags used in a bit are valid + // on this system. + ASSERT_EQ(0, os::system("dd count=1 bs=1M if=/dev/zero of=/dev/null")) + << "Cannot find a compatible 'dd' command"; + } +}; + + +TEST_F(MemoryPressureMesosTest, CGROUPS_ROOT_Statistics) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + slave::Flags flags = CreateSlaveFlags(); + + // We only care about memory cgroup for this test. + flags.isolation = "cgroups/mem"; + flags.slave_subsystems = None(); + + Fetcher fetcher; + + Try<MesosContainerizer*> containerizer = + MesosContainerizer::create(flags, true, &fetcher); + + ASSERT_SOME(containerizer); + + Try<PID<Slave>> slave = StartSlave(containerizer.get(), flags); + ASSERT_SOME(slave); + + MockScheduler sched; + + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(_, _, _)); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver.start(); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + Offer offer = offers.get()[0]; + + // Run a task that triggers memory pressure event. We request 1G + // disk because we are going to write a 512 MB file repeatedly. + TaskInfo task = createTask( + offer.slave_id(), + Resources::parse("cpus:1;mem:256;disk:1024").get(), + "while true; do dd count=512 bs=1M if=/dev/zero of=./temp; done"); + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status)) + .WillRepeatedly(Return()); // Ignore subsequent updates. + + driver.launchTasks(offer.id(), {task}); + + AWAIT_READY(status); + EXPECT_EQ(task.task_id(), status.get().task_id()); + EXPECT_EQ(TASK_RUNNING, status.get().state()); + + Future<hashset<ContainerID>> containers = containerizer.get()->containers(); + AWAIT_READY(containers); + ASSERT_EQ(1u, containers.get().size()); + + ContainerID containerId = *(containers.get().begin()); + + Duration waited = Duration::zero(); + do { + Future<ResourceStatistics> usage = containerizer.get()->usage(containerId); + AWAIT_READY(usage); + + if (usage.get().mem_low_pressure_counter() > 0) { + EXPECT_GE(usage.get().mem_low_pressure_counter(), + usage.get().mem_medium_pressure_counter()); + EXPECT_GE(usage.get().mem_medium_pressure_counter(), + usage.get().mem_critical_pressure_counter()); + break; + } + + os::sleep(Milliseconds(100)); + waited += Milliseconds(100); + } while (waited < Seconds(5)); + + EXPECT_LE(waited, Seconds(5)); + + driver.stop(); + driver.join(); + + Shutdown(); + delete containerizer.get(); +} + + +// Test that memory pressure listening is restarted after recovery. +TEST_F(MemoryPressureMesosTest, CGROUPS_ROOT_SlaveRecovery) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + slave::Flags flags = CreateSlaveFlags(); + + // We only care about memory cgroup for this test. + flags.isolation = "cgroups/mem"; + flags.slave_subsystems = None(); + + Fetcher fetcher; + + Try<MesosContainerizer*> containerizer1 = + MesosContainerizer::create(flags, true, &fetcher); + + ASSERT_SOME(containerizer1); + + Try<PID<Slave>> slave = StartSlave(containerizer1.get(), flags); + ASSERT_SOME(slave); + + MockScheduler sched; + + // Enable checkpointing for the framework. + FrameworkInfo frameworkInfo; + frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO); + frameworkInfo.set_checkpoint(true); + + MesosSchedulerDriver driver( + &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(_, _, _)); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver.start(); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + Offer offer = offers.get()[0]; + + // Run a task that triggers memory pressure event. We request 1G + // disk because we are going to write a 512 MB file repeatedly. + TaskInfo task = createTask( + offer.slave_id(), + Resources::parse("cpus:1;mem:256;disk:1024").get(), + "while true; do dd count=512 bs=1M if=/dev/zero of=./temp; done"); + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status)) + .WillRepeatedly(Return()); // Ignore subsequent updates. + + driver.launchTasks(offers.get()[0].id(), {task}); + + AWAIT_READY(status); + EXPECT_EQ(task.task_id(), status.get().task_id()); + EXPECT_EQ(TASK_RUNNING, status.get().state()); + + // We restart the slave to let it recover. + Stop(slave.get()); + delete containerizer1.get(); + + Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover); + + Future<SlaveReregisteredMessage> slaveReregisteredMessage = + FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); + + // Use the same flags. + Try<MesosContainerizer*> containerizer2 = + MesosContainerizer::create(flags, true, &fetcher); + + ASSERT_SOME(containerizer2); + + slave = StartSlave(containerizer2.get(), flags); + ASSERT_SOME(slave); + + Clock::pause(); + + AWAIT_READY(_recover); + + // Wait for slave to schedule reregister timeout. + Clock::settle(); + + // Ensure the slave considers itself recovered. + Clock::advance(slave::EXECUTOR_REREGISTER_TIMEOUT); + + Clock::resume(); + + // Wait for the slave to re-register. + AWAIT_READY(slaveReregisteredMessage); + + Future<hashset<ContainerID>> containers = containerizer2.get()->containers(); + AWAIT_READY(containers); + ASSERT_EQ(1u, containers.get().size()); + + ContainerID containerId = *(containers.get().begin()); + + Duration waited = Duration::zero(); + do { + Future<ResourceStatistics> usage = containerizer2.get()->usage(containerId); + AWAIT_READY(usage); + + if (usage.get().mem_low_pressure_counter() > 0) { + EXPECT_GE(usage.get().mem_low_pressure_counter(), + usage.get().mem_medium_pressure_counter()); + EXPECT_GE(usage.get().mem_medium_pressure_counter(), + usage.get().mem_critical_pressure_counter()); + break; + } + + os::sleep(Milliseconds(100)); + waited += Milliseconds(100); + } while (waited < Seconds(5)); + + EXPECT_LE(waited, Seconds(5)); + + driver.stop(); + driver.join(); + + Shutdown(); + delete containerizer2.get(); +} + +} // namespace tests { +} // namespace internal { +} // namespace mesos {
