Repository: mesos
Updated Branches:
  refs/heads/master 04f8302c0 -> dfb2794d8
Added support for listening on cgroups memory pressures.

Review: https://reviews.apache.org/r/30545


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cc5826de
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cc5826de
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cc5826de

Branch: refs/heads/master
Commit: cc5826defbebb46bc66e8bbc8900d66b8f27893c
Parents: 04f8302
Author: Chi Zhang <[email protected]>
Authored: Fri Mar 27 10:04:23 2015 -0700
Committer: Jie Yu <[email protected]>
Committed: Fri Mar 27 10:18:44 2015 -0700

----------------------------------------------------------------------
 src/linux/cgroups.cpp | 134 +++++++++++++++++++++++++++++++++++++++++++--
 src/linux/cgroups.hpp |  51 +++++++++++++++++
 2 files changed, 181 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/cc5826de/src/linux/cgroups.cpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.cpp b/src/linux/cgroups.cpp
index a533b31..df3211a 100644
--- a/src/linux/cgroups.cpp
+++ b/src/linux/cgroups.cpp
@@ -55,6 +55,7 @@
 #include <stout/proc.hpp>
 #include <stout/stringify.hpp>
 #include <stout/strings.hpp>
+#include <stout/unreachable.hpp>

 #include "linux/cgroups.hpp"
 #include "linux/fs.hpp"
@@ -71,6 +72,7 @@ using std::istringstream;
 using std::list;
 using std::map;
 using std::ofstream;
+using std::ostream;
 using std::ostringstream;
 using std::set;
 using std::string;
@@ -2149,11 +2151,8 @@ Try<Bytes> max_usage_in_bytes(const string& hierarchy, const string& cgroup)

 namespace oom {

-namespace {
+static Nothing _nothing() { return Nothing(); }

-Nothing _nothing() { return Nothing(); }
-
-} // namespace {

 Future<Nothing> listen(const string& hierarchy, const string& cgroup)
 {
@@ -2239,6 +2238,133 @@ Try<Nothing> disable(const string& hierarchy, const string& cgroup)

 } // namespace oom {

+
+namespace pressure {
+
+ostream& operator << (ostream& stream, Level level)
+{
+  switch (level) {
+    case LOW:
+      stream << "low";
+      break;
+    case MEDIUM:
+      stream << "medium";
+      break;
+    case CRITICAL:
+      stream << "critical";
+      break;
+    default:
+      UNREACHABLE();
+  }
+
+  return stream;
+}
+
+
+// The process drives the event::Listener to keep listening on cgroups
+// memory pressure counters.
+class CounterProcess : public Process<CounterProcess>
+{
+public:
+  CounterProcess(const string& hierarchy,
+                 const string& cgroup,
+                 Level level)
+    : value_(0),
+      error(None()),
+      process(new event::Listener(
+          hierarchy,
+          cgroup,
+          "memory.pressure_level",
+          stringify(level))) {}
+
+  virtual ~CounterProcess() {}
+
+  Future<uint64_t> value()
+  {
+    if (error.isSome()) {
+      return Failure(error.get());
+    }
+
+    return value_;
+  }
+
+protected:
+  virtual void initialize()
+  {
+    spawn(CHECK_NOTNULL(process.get()));
+    listen();
+  }
+
+  virtual void finalize()
+  {
+    terminate(process.get());
+    wait(process.get());
+  }
+
+private:
+  void listen()
+  {
+    dispatch(process.get(), &event::Listener::listen)
+      .onAny(defer(self(), &CounterProcess::_listen, lambda::_1));
+  }
+
+  void _listen(const process::Future<uint64_t>& future)
+  {
+    CHECK(error.isNone());
+
+    if (future.isReady()) {
+      value_ += future.get();
+      listen();
+    } else if (future.isFailed()) {
+      error = Error(future.failure());
+    } else if (future.isDiscarded()) {
+      error = Error("Listening stopped unexpectedly");
+    }
+  }
+
+  uint64_t value_;
+  Option<Error> error;
+  process::Owned<event::Listener> process;
+};
+
+
+Try<Owned<Counter>> Counter::create(
+    const string& hierarchy,
+    const string& cgroup,
+    Level level)
+{
+  Option<Error> error = verify(hierarchy, cgroup);
+  if (error.isSome()) {
+    return Error(error.get());
+  }
+
+  return Owned<Counter>(new Counter(hierarchy, cgroup, level));
+}
+
+
+Counter::Counter(const string& hierarchy,
+                 const string& cgroup,
+                 Level level)
+  : process(new CounterProcess(hierarchy, cgroup, level))
+{
+  spawn(CHECK_NOTNULL(process.get()));
+}
+
+
+Counter::~Counter()
+{
+  terminate(process.get(), true);
+  wait(process.get());
+}
+
+
+Future<uint64_t> Counter::value() const
+{
+  return dispatch(process.get(), &CounterProcess::value);
+}
+
+} // namespace pressure {
+
 } // namespace memory {


http://git-wip-us.apache.org/repos/asf/mesos/blob/cc5826de/src/linux/cgroups.hpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.hpp b/src/linux/cgroups.hpp
index e07772f..f3a6c50 100644
--- a/src/linux/cgroups.hpp
+++ b/src/linux/cgroups.hpp
@@ -528,6 +528,57 @@ Try<Nothing> disable(

 } // namespace oom {

+
+// Memory pressure counters.
+namespace pressure {
+
+enum Level {
+  LOW,
+  MEDIUM,
+  CRITICAL
+};
+
+
+std::ostream& operator << (std::ostream& stream, Level level);
+
+
+// Forward declaration.
+class CounterProcess;
+
+
+// Counter is a primitive to listen on events of a given memory
+// pressure level for a cgroup and keep track of the number of
+// occurrence of that event. Use the public 'create' function to
+// create a new counter; see 'value' for how to use.
+class Counter
+{
+public:
+  // Create a memory pressure counter for the given cgroup on the
+  // specified level.
+  static Try<process::Owned<Counter>> create(
+      const std::string& hierarchy,
+      const std::string& cgroup,
+      Level level);
+
+  virtual ~Counter();
+
+  // Returns the current accumulated number of occurrences of the
+  // pressure event. Returns a failure if any error occurs while
+  // monitoring the pressure events, and any subsequent calls to
+  // 'value' will return the same failure. In such case, the user
+  // should consider creating a new Counter.
+  process::Future<uint64_t> value() const;
+
+private:
+  Counter(const std::string& hierarchy,
+          const std::string& cgroup,
+          Level level);
+
+  process::Owned<CounterProcess> process;
+};
+
+} // namespace pressure {
+
 } // namespace memory {
