Added cgroups memory pressure counter tests. Review: https://reviews.apache.org/r/31276
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dfb2794d Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dfb2794d Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dfb2794d Branch: refs/heads/master Commit: dfb2794d8b0fb5d41c725d3944395606d24fbf26 Parents: cc5826d Author: Chi Zhang <[email protected]> Authored: Fri Mar 27 10:20:46 2015 -0700 Committer: Jie Yu <[email protected]> Committed: Fri Mar 27 12:56:24 2015 -0700 ---------------------------------------------------------------------- src/Makefile.am | 11 + src/tests/cgroups_tests.cpp | 197 +++++++++++++++++- src/tests/memory_test_helper.cpp | 320 +++++++++++++++++++++++++++++ src/tests/memory_test_helper.hpp | 89 ++++++++ src/tests/memory_test_helper_main.cpp | 32 +++ 5 files changed, 647 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/dfb2794d/src/Makefile.am ---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index 7a06c70..84a62d4 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -562,6 +562,7 @@ libmesos_no_3rdparty_la_SOURCES += \ tests/isolator.hpp \ tests/launcher.hpp \ tests/limiter.hpp \ + tests/memory_test_helper.hpp \ tests/mesos.hpp \ tests/module.hpp \ tests/script.hpp \ @@ -1277,8 +1278,17 @@ if OS_LINUX tests/setns_test_helper.cpp setns_test_helper_CPPFLAGS = $(MESOS_CPPFLAGS) setns_test_helper_LDADD = libmesos.la $(LDADD) + endif +check_PROGRAMS += memory-test-helper +memory_test_helper_SOURCES = \ + tests/flags.cpp \ + tests/memory_test_helper_main.cpp \ + tests/memory_test_helper.cpp +memory_test_helper_CPPFLAGS = $(mesos_tests_CPPFLAGS) +memory_test_helper_LDADD = libmesos.la $(LDADD) + check_PROGRAMS += active-user-test-helper active_user_test_helper_SOURCES = tests/active_user_test_helper.cpp active_user_test_helper_CPPFLAGS = 
$(MESOS_CPPFLAGS) @@ -1363,6 +1373,7 @@ mesos_tests_SOURCES = \ tests/master_slave_reconciliation_tests.cpp \ tests/master_tests.cpp \ tests/master_validation_tests.cpp \ + tests/memory_test_helper.cpp \ tests/mesos.cpp \ tests/metrics_tests.cpp \ tests/module.cpp \ http://git-wip-us.apache.org/repos/asf/mesos/blob/dfb2794d/src/tests/cgroups_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/cgroups_tests.cpp b/src/tests/cgroups_tests.cpp index 75c61aa..8483738 100644 --- a/src/tests/cgroups_tests.cpp +++ b/src/tests/cgroups_tests.cpp @@ -21,7 +21,6 @@ #include <signal.h> #include <stdint.h> #include <stdio.h> -#include <stdlib.h> #include <string.h> #include <unistd.h> @@ -37,6 +36,7 @@ #include <gmock/gmock.h> #include <process/gtest.hpp> +#include <process/owned.hpp> #include <stout/gtest.hpp> #include <stout/hashmap.hpp> @@ -51,10 +51,15 @@ #include "linux/cgroups.hpp" #include "linux/perf.hpp" +#include "tests/memory_test_helper.hpp" #include "tests/mesos.hpp" // For TEST_CGROUPS_(HIERARCHY|ROOT). +#include "tests/utils.hpp" using namespace process; +using cgroups::memory::pressure::Level; +using cgroups::memory::pressure::Counter; + using std::set; namespace mesos { @@ -62,7 +67,7 @@ namespace internal { namespace tests { -class CgroupsTest : public ::testing::Test +class CgroupsTest : public TemporaryDirectoryTest { public: static void SetUpTestCase() @@ -117,6 +122,8 @@ public: protected: virtual void SetUp() { + CgroupsTest::SetUp(); + foreach (const std::string& subsystem, strings::tokenize(subsystems, ",")) { // Establish the base hierarchy if this is the first subsystem checked. if (baseHierarchy.empty()) { @@ -182,6 +189,8 @@ protected: } } } + + CgroupsTest::TearDown(); } const std::string subsystems; // Subsystems required to run tests. 
@@ -1033,6 +1042,190 @@ TEST_F(CgroupsAnyHierarchyWithPerfEventTest, ROOT_CGROUPS_Perf) AWAIT_READY(destroy); } + +class CgroupsAnyHierarchyMemoryPressureTest + : public CgroupsAnyHierarchyTest +{ +public: + CgroupsAnyHierarchyMemoryPressureTest() + : CgroupsAnyHierarchyTest("memory"), + cgroup(TEST_CGROUPS_ROOT) {} + +protected: + virtual void SetUp() + { + CgroupsAnyHierarchyTest::SetUp(); + + hierarchy = path::join(baseHierarchy, "memory"); + + ASSERT_SOME(cgroups::create(hierarchy, cgroup)); + } + + void listen() + { + const std::vector<Level> levels = { + Level::LOW, + Level::MEDIUM, + Level::CRITICAL + }; + + foreach (Level level, levels) { + Try<Owned<Counter>> counter = Counter::create(hierarchy, cgroup, level); + ASSERT_SOME(counter); + + counters[level] = counter.get(); + } + } + + std::string hierarchy; + const std::string cgroup; + + hashmap<Level, Owned<Counter>> counters; +}; + + +TEST_F(CgroupsAnyHierarchyMemoryPressureTest, ROOT_IncreaseUnlockedRSS) +{ + MemoryTestHelper helper; + ASSERT_SOME(helper.spawn()); + ASSERT_SOME(helper.pid()); + + const Bytes limit = Megabytes(16); + + // Move the memory test helper into a cgroup and set the limit. + EXPECT_SOME(cgroups::memory::limit_in_bytes(hierarchy, cgroup, limit)); + EXPECT_SOME(cgroups::assign(hierarchy, cgroup, helper.pid().get())); + + listen(); + + // Used to save the counter readings from last iteration. + uint64_t previousLow = 0; + uint64_t previousMedium = 0; + uint64_t previousCritical = 0; + + // Used to save the counter readings from this iteration. + uint64_t low = 0; + uint64_t medium = 0; + uint64_t critical = 0; + + // Use a guard to error out if it's been too long. + // TODO(chzhcn): Use a better way to set testing time limit. 
+ uint64_t iterationLimit = limit.bytes() / getpagesize() * 10; + + for (uint64_t i = 0; i < iterationLimit; i++) { + EXPECT_SOME(helper.increaseRSS(getpagesize())); + + Future<uint64_t> _low = counters[Level::LOW]->value(); + Future<uint64_t> _medium = counters[Level::MEDIUM]->value(); + Future<uint64_t> _critical = counters[Level::CRITICAL]->value(); + + AWAIT_READY(_low); + AWAIT_READY(_medium); + AWAIT_READY(_critical); + + low = _low.get(); + medium = _medium.get(); + critical = _critical.get(); + + // We need to know the readings are the same as last time to be + // sure they are stable, because the reading is not atomic. For + // example, the medium could turn positive after we read low to be + // 0, but this should be fixed by the next read immediately. + if ((low == previousLow && + medium == previousMedium && + critical == previousCritical)) { + if (low != 0) { + EXPECT_LE(medium, low); + EXPECT_LE(critical, medium); + + // When child's RSS is full, it will be OOM-kill'ed if we + // don't stop it right away. + break; + } else { + EXPECT_EQ(0u, medium); + EXPECT_EQ(0u, critical); + } + } + + previousLow = low; + previousMedium = medium; + previousCritical = critical; + } +} + + +TEST_F(CgroupsAnyHierarchyMemoryPressureTest, ROOT_IncreasePageCache) +{ + MemoryTestHelper helper; + ASSERT_SOME(helper.spawn()); + ASSERT_SOME(helper.pid()); + + const Bytes limit = Megabytes(16); + + // Move the memory test helper into a cgroup and set the limit. + EXPECT_SOME(cgroups::memory::limit_in_bytes(hierarchy, cgroup, limit)); + EXPECT_SOME(cgroups::assign(hierarchy, cgroup, helper.pid().get())); + + listen(); + + // Used to save the counter readings from last iteration. + uint64_t previousLow = 0; + uint64_t previousMedium = 0; + uint64_t previousCritical = 0; + + // Used to save the counter readings from this iteration. + uint64_t low = 0; + uint64_t medium = 0; + uint64_t critical = 0; + + // Use a guard to error out if it's been too long. 
+ // TODO(chzhcn): Use a better way to set testing time limit. + uint64_t iterationLimit = limit.bytes() / Megabytes(1).bytes() * 2; + + for (uint64_t i = 0; i < iterationLimit; i++) { + EXPECT_SOME(helper.increasePageCache(Megabytes(1))); + + Future<uint64_t> _low = counters[Level::LOW]->value(); + Future<uint64_t> _medium = counters[Level::MEDIUM]->value(); + Future<uint64_t> _critical = counters[Level::CRITICAL]->value(); + + AWAIT_READY(_low); + AWAIT_READY(_medium); + AWAIT_READY(_critical); + + low = _low.get(); + medium = _medium.get(); + critical = _critical.get(); + + // We need to know the readings are the same as last time to be + // sure they are stable, because the reading is not atomic. For + // example, the medium could turn positive after we read low to be + // 0, but this should be fixed by the next read immediately. + if ((low == previousLow && + medium == previousMedium && + critical == previousCritical)) { + if (low != 0) { + EXPECT_LE(medium, low); + EXPECT_LE(critical, medium); + + // Different from the RSS test, since the child is only + // consuming at a slow rate the page cache, which is evictable + // and reclaimable, we could therefore be in this state + // forever. Our guard will let us out shortly. + } else { + EXPECT_EQ(0u, medium); + EXPECT_EQ(0u, critical); + } + } + + previousLow = low; + previousMedium = medium; + previousCritical = critical; + } + + EXPECT_LT(0u, low); +} + } // namespace tests { } // namespace internal { } // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/dfb2794d/src/tests/memory_test_helper.cpp ---------------------------------------------------------------------- diff --git a/src/tests/memory_test_helper.cpp b/src/tests/memory_test_helper.cpp new file mode 100644 index 0000000..cdf769b --- /dev/null +++ b/src/tests/memory_test_helper.cpp @@ -0,0 +1,320 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <signal.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> + +#include <sys/mman.h> +#include <sys/types.h> +#include <sys/wait.h> + +#include <string> +#include <vector> + +#include <stout/bytes.hpp> +#include <stout/error.hpp> +#include <stout/hashmap.hpp> +#include <stout/lambda.hpp> +#include <stout/os.hpp> +#include <stout/stringify.hpp> +#include <stout/strings.hpp> +#include <stout/try.hpp> + +#include "tests/flags.hpp" +#include "tests/memory_test_helper.hpp" + +using process::Subprocess; + +using std::cerr; +using std::cin; +using std::cout; +using std::endl; +using std::flush; +using std::getline; +using std::string; +using std::vector; + +namespace mesos { +namespace internal { +namespace tests { + +// Constants used to sync MemoryTestHelper and its subprocess. + +// Used by the subprocess to inform that it has started. +const char STARTED = 'S'; + +// Used by the subprocess to inform that the work requested is done. +const char DONE = 'D'; + +// Used to signal an increaseRSS request. +const char INCREASE_RSS[] = "INCREASE_RSS"; + +// Used to signal an increasePageCache request. +const char INCREASE_PAGE_CACHE[] = "INCREASE_PAGE_CACHE"; + + +// This helper allocates and locks specified anonymous memory (RSS). 
+// It uses mlock and memset to make sure allocated memory is mapped. +static Try<void*> allocateRSS(const Bytes& size, bool lock = true) +{ + void* rss = NULL; + + if ((errno = posix_memalign(&rss, getpagesize(), size.bytes())) != 0) { + return ErrnoError("Failed to increase RSS memory, posix_memalign"); + } + + // Locking a page makes it unevictable in the kernel. + if (lock && mlock(rss, size.bytes()) != 0) { + return ErrnoError("Failed to lock memory, mlock"); + } + + // Use memset to actually page in the memory in the kernel. + memset(rss, 1, size.bytes()); + + return rss; +} + + +static Try<Nothing> increaseRSS(const vector<string>& tokens) +{ + if (tokens.size() < 2) { + return Error("Expect at least one argument"); + } + + Try<Bytes> size = Bytes::parse(tokens[1]); + if (size.isError()) { + return Error("The first argument '" + tokens[1] + "' is not a byte size"); + } + + Try<void*> memory = allocateRSS(size.get()); + if (memory.isError()) { + return Error("Failed to allocate RSS memory: " + memory.error()); + } + + return Nothing(); +} + + +static Try<Nothing> increasePageCache(const vector<string>& tokens) +{ + const Bytes UNIT = Megabytes(1); + + if (tokens.size() < 2) { + return Error("Expect at least one argument"); + } + + Try<Bytes> size = Bytes::parse(tokens[1]); + if (size.isError()) { + return Error("The first argument '" + tokens[1] + "' is not a byte size"); + } + + // TODO(chzhcn): Currently, we assume the current working directory + // is a temporary directory and will be cleaned up when the test + // finishes. Since the child process will inherit the current + // working directory from the parent process, that means the test + // that uses this helper probably needs to inherit from + // TemporaryDirectoryTest. Consider relaxing this constraint. 
+ Try<string> path = os::mktemp(path::join(os::getcwd(), "XXXXXX")); + if (path.isError()) { + return Error("Failed to create a temporary file: " + path.error()); + } + + Try<int> fd = os::open(path.get(), O_WRONLY); + if (fd.isError()) { + return Error("Failed to open file: " + fd.error()); + } + + // NOTE: We are doing round-down here to calculate the number of + // writes to do. + for (uint64_t i = 0; i < size.get().bytes() / UNIT.bytes(); i++) { + // Write UNIT size to disk at a time. The content isn't important. + Try<Nothing> write = os::write(fd.get(), string(UNIT.bytes(), 'a')); + if (write.isError()) { + os::close(fd.get()); + return Error("Failed to write file: " + write.error()); + } + + // Use fsync to make sure data is written to disk. + if (fsync(fd.get()) == -1) { + // Save the error message because os::close below might + // overwrite the errno. + const string message = strerror(errno); + + os::close(fd.get()); + return Error("Failed to fsync: " + message); + } + } + + os::close(fd.get()); + return Nothing(); +} + + +MemoryTestHelper::~MemoryTestHelper() +{ + cleanup(); +} + + +Try<Nothing> MemoryTestHelper::spawn() +{ + if (s.isSome()) { + return Error("A subprocess has been spawned already"); + } + + vector<string> argv; + argv.push_back("memory-test-helper"); + argv.push_back(MemoryTestHelperMain::NAME); + + Try<Subprocess> process = subprocess( + path::join(flags.build_dir, + "src", + "memory-test-helper"), + argv, + Subprocess::PIPE(), + Subprocess::PIPE(), + Subprocess::FD(STDERR_FILENO)); + + if (process.isError()) { + return Error("Failed to spawn a subprocess: " + process.error()); + } + + s = process.get(); + + // Wait for the child to inform it has started before returning. + // Otherwise, the user might set the memory limit too early, and + // cause the child to be oom-killed because 'ld' could use a lot of + // memory. 
+ Result<string> read = os::read(s.get().out().get(), sizeof(STARTED)); + if (!read.isSome() || read.get() != string(sizeof(STARTED), STARTED)) { + cleanup(); + return Error("Failed to sync with the subprocess"); + } + + return Nothing(); +} + + +void MemoryTestHelper::cleanup() +{ + if (s.isSome()) { + // We just want to make sure the subprocess is terminated in case + // it's stuck, but we don't care about its status. Any error + // should have been logged in the subprocess directly. + ::kill(s.get().pid(), SIGKILL); + ::waitpid(s.get().pid(), NULL, 0); + s = None(); + } +} + + +Try<pid_t> MemoryTestHelper::pid() +{ + if (s.isNone()) { + return Error("The subprocess has not been spawned yet"); + } + + return s.get().pid(); +} + + +// Send a request to the subprocess and wait for its signal that the +// work has been done. +Try<Nothing> MemoryTestHelper::requestAndWait(const string& request) +{ + if (s.isNone()) { + return Error("The subprocess has not been spawned yet"); + } + + Try<Nothing> write = os::write(s.get().in().get(), request + "\n"); + if (write.isError()) { + cleanup(); + return Error("Fail to sync with the subprocess: " + write.error()); + } + + Result<string> read = os::read(s.get().out().get(), sizeof(DONE)); + if (!read.isSome() || read.get() != string(sizeof(DONE), DONE)) { + cleanup(); + return Error("Failed to sync with the subprocess"); + } + + return Nothing(); +} + + +Try<Nothing> MemoryTestHelper::increaseRSS(const Bytes& size) +{ + return requestAndWait(string(INCREASE_RSS) + " " + stringify(size)); +} + + +Try<Nothing> MemoryTestHelper::increasePageCache(const Bytes& size) +{ + return requestAndWait(string(INCREASE_PAGE_CACHE) + " " + stringify(size)); +} + + +const char MemoryTestHelperMain::NAME[] = "MemoryTestHelperMain"; + + +int MemoryTestHelperMain::execute() +{ + hashmap<string, Try<Nothing>(*)(const vector<string>&)> commands; + commands[INCREASE_RSS] = &increaseRSS; + commands[INCREASE_PAGE_CACHE] = &increasePageCache; + + // 
Tell the parent that child has started. + cout << STARTED << flush; + + string line; + while (getline(cin, line)) { + // Testing the read itself avoids parsing a stale line at EOF. + vector<string> tokens = strings::tokenize(line, " "); + + if (tokens.empty()) { + cerr << "No command from the parent" << endl; + return 1; + } + + if (!commands.contains(tokens[0])) { + cerr << "Unknown command from the parent '" << tokens[0] << "'" << endl; + return 1; + } + + Try<Nothing> result = commands[tokens[0]](tokens); + if (result.isError()) { + cerr << result.error() << endl; + return 1; + } + + cout << DONE << flush; + } + + if (cin.bad()) { + cerr << "Failed to sync with the parent" << endl; + return 1; + } + + return 0; +} + +} // namespace tests { +} // namespace internal { +} // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/dfb2794d/src/tests/memory_test_helper.hpp ---------------------------------------------------------------------- diff --git a/src/tests/memory_test_helper.hpp b/src/tests/memory_test_helper.hpp new file mode 100644 index 0000000..11712d7 --- /dev/null +++ b/src/tests/memory_test_helper.hpp @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef __MEMORY_TEST_HELPER_HPP__ +#define __MEMORY_TEST_HELPER_HPP__ + +#include <process/subprocess.hpp> + +#include <stout/bytes.hpp> +#include <stout/option.hpp> +#include <stout/subcommand.hpp> +#include <stout/try.hpp> + +namespace mesos { +namespace internal { +namespace tests { + +// The abstraction for controlling the memory usage of a subprocess. +// TODO(chzhcn): Currently, this helper is only supposed to be used by +// one thread. Consider making it thread safe. +class MemoryTestHelper +{ +public: + MemoryTestHelper() {}; + ~MemoryTestHelper(); + + // Spawns a subprocess. + // TODO(chzhcn): Consider returning a future instead of blocking. + Try<Nothing> spawn(); + + // Kill and reap the subprocess if exists. + // TODO(chzhcn): Consider returning a future instead of blocking. + void cleanup(); + + // Returns the pid of the subprocess. + Try<pid_t> pid(); + + // Allocate and lock specified page-aligned anonymous memory (RSS) + // in the subprocess. It uses mlock and memset to make sure + // allocated memory is mapped. + // TODO(chzhcn): Consider returning a future instead of blocking. + Try<Nothing> increaseRSS(const Bytes& size); + + // This function attempts to generate requested size of page cache + // in the subprocess by using a small buffer and writing it to disk + // multiple times. + // TODO(chzhcn): Consider returning a future instead of blocking. + Try<Nothing> increasePageCache(const Bytes& size = Megabytes(1)); + +private: + Try<Nothing> requestAndWait(const std::string& request); + + Option<process::Subprocess> s; +}; + + +// The actual subprocess behind MemoryTestHelper. It runs in a loop +// and executes commands passed from stdin. 
+class MemoryTestHelperMain : public Subcommand +{ +public: + static const char NAME[]; + + MemoryTestHelperMain() : Subcommand(NAME) {}; + +protected: + virtual int execute(); +}; + +} // namespace tests { +} // namespace internal { +} // namespace mesos { + +#endif // __MEMORY_TEST_HELPER_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/dfb2794d/src/tests/memory_test_helper_main.cpp ---------------------------------------------------------------------- diff --git a/src/tests/memory_test_helper_main.cpp b/src/tests/memory_test_helper_main.cpp new file mode 100644 index 0000000..362535f --- /dev/null +++ b/src/tests/memory_test_helper_main.cpp @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <stout/subcommand.hpp> + +#include "tests/memory_test_helper.hpp" + +using mesos::internal::tests::MemoryTestHelperMain; + +int main(int argc, char** argv) +{ + return Subcommand::dispatch( + None(), + argc, + argv, + new MemoryTestHelperMain()); +}
