Repository: mesos
Updated Branches:
  refs/heads/master 04f8302c0 -> dfb2794d8


Added support for listening on cgroups memory pressures.

Review: https://reviews.apache.org/r/30545


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cc5826de
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cc5826de
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cc5826de

Branch: refs/heads/master
Commit: cc5826defbebb46bc66e8bbc8900d66b8f27893c
Parents: 04f8302
Author: Chi Zhang <[email protected]>
Authored: Fri Mar 27 10:04:23 2015 -0700
Committer: Jie Yu <[email protected]>
Committed: Fri Mar 27 10:18:44 2015 -0700

----------------------------------------------------------------------
 src/linux/cgroups.cpp | 134 +++++++++++++++++++++++++++++++++++++++++++--
 src/linux/cgroups.hpp |  51 +++++++++++++++++
 2 files changed, 181 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/cc5826de/src/linux/cgroups.cpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.cpp b/src/linux/cgroups.cpp
index a533b31..df3211a 100644
--- a/src/linux/cgroups.cpp
+++ b/src/linux/cgroups.cpp
@@ -55,6 +55,7 @@
 #include <stout/proc.hpp>
 #include <stout/stringify.hpp>
 #include <stout/strings.hpp>
+#include <stout/unreachable.hpp>
 
 #include "linux/cgroups.hpp"
 #include "linux/fs.hpp"
@@ -71,6 +72,7 @@ using std::istringstream;
 using std::list;
 using std::map;
 using std::ofstream;
+using std::ostream;
 using std::ostringstream;
 using std::set;
 using std::string;
@@ -2149,11 +2151,8 @@ Try<Bytes> max_usage_in_bytes(const string& hierarchy, const string& cgroup)
 
 namespace oom {
 
-namespace {
+static Nothing _nothing() { return Nothing(); }  // File-local helper.
 
-Nothing _nothing() { return Nothing(); }
-
-} // namespace {
 
 Future<Nothing> listen(const string& hierarchy, const string& cgroup)
 {
@@ -2239,6 +2238,133 @@ Try<Nothing> disable(const string& hierarchy, const string& cgroup)
 
 } // namespace oom {
 
+
+namespace pressure {
+
+ostream& operator << (ostream& stream, Level level)  // Prints level's name.
+{
+  switch (level) {
+    case LOW:
+      stream << "low";
+      break;
+    case MEDIUM:
+      stream << "medium";
+      break;
+    case CRITICAL:
+      stream << "critical";
+      break;
+    default:  // Any value other than the three cases above.
+      UNREACHABLE();
+  }
+
+  return stream;  // Returned so that insertions can be chained.
+}
+
+
+// Drives an event::Listener created for "memory.pressure_level": it keeps
+// re-issuing listens for the cgroup and sums what each completed one reports.
+class CounterProcess : public Process<CounterProcess>
+{
+public:
+  CounterProcess(const string& hierarchy,
+                 const string& cgroup,
+                 Level level)
+    : value_(0),
+      error(None()),
+      process(new event::Listener(
+          hierarchy,
+          cgroup,
+          "memory.pressure_level",
+          stringify(level))) {}
+
+  virtual ~CounterProcess() {}
+
+  Future<uint64_t> value()  // Accumulated total, or the recorded failure.
+  {
+    if (error.isSome()) {  // Once set, the error is returned on every call.
+      return Failure(error.get());
+    }
+
+    return value_;
+  }
+
+protected:
+  virtual void initialize()
+  {
+    spawn(CHECK_NOTNULL(process.get()));  // Start the owned listener.
+    listen();  // Issue the first listen.
+  }
+
+  virtual void finalize()
+  {
+    terminate(process.get());  // Stop the owned listener ...
+    wait(process.get());  // ... and wait on it before finishing.
+  }
+
+private:
+  void listen()  // Dispatches one Listener::listen; outcome goes to _listen.
+  {
+    dispatch(process.get(), &event::Listener::listen)
+      .onAny(defer(self(), &CounterProcess::_listen, lambda::_1));
+  }
+
+  void _listen(const process::Future<uint64_t>& future)
+  {
+    CHECK(error.isNone());  // No listen is issued once an error is set.
+
+    if (future.isReady()) {
+      value_ += future.get();  // Accumulate, then listen again.
+      listen();
+    } else if (future.isFailed()) {
+      error = Error(future.failure());
+    } else if (future.isDiscarded()) {
+      error = Error("Listening stopped unexpectedly");
+    }
+  }
+
+  uint64_t value_;  // Running sum of values from completed listens.
+  Option<Error> error;  // Set when a listen fails or is discarded.
+  process::Owned<event::Listener> process;  // Spawned in initialize().
+};
+
+
+Try<Owned<Counter>> Counter::create(
+    const string& hierarchy,
+    const string& cgroup,
+    Level level)
+{
+  Option<Error> error = verify(hierarchy, cgroup);  // Checked up front.
+  if (error.isSome()) {
+    return Error(error.get());  // No Counter is constructed on error.
+  }
+
+  return Owned<Counter>(new Counter(hierarchy, cgroup, level));
+}
+
+
+Counter::Counter(const string& hierarchy,
+                 const string& cgroup,
+                 Level level)
+  : process(new CounterProcess(hierarchy, cgroup, level))
+{
+  spawn(CHECK_NOTNULL(process.get()));  // Terminated in the destructor.
+}
+
+
+Counter::~Counter()
+{
+  terminate(process.get(), true);  // 'true' is presumably 'inject'; confirm.
+  wait(process.get());  // Wait on the process before returning.
+}
+
+
+Future<uint64_t> Counter::value() const  // Forwards to CounterProcess::value.
+{
+  return dispatch(process.get(), &CounterProcess::value);
+}
+
+} // namespace pressure {
+
 } // namespace memory {
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/cc5826de/src/linux/cgroups.hpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.hpp b/src/linux/cgroups.hpp
index e07772f..f3a6c50 100644
--- a/src/linux/cgroups.hpp
+++ b/src/linux/cgroups.hpp
@@ -528,6 +528,57 @@ Try<Nothing> disable(
 
 } // namespace oom {
 
+
+// Memory pressure counters.
+namespace pressure {
+
+enum Level {
+  LOW,  // Streamed as "low".
+  MEDIUM,  // Streamed as "medium".
+  CRITICAL  // Streamed as "critical".
+};
+
+
+std::ostream& operator << (std::ostream& stream, Level level);
+
+
+// Forward declaration.
+class CounterProcess;
+
+
+// Counter is a primitive to listen on events of a given memory
+// pressure level for a cgroup and keep track of the number of
+// occurrences of that event. Use the public 'create' function to
+// create a new counter; see 'value' for how to use.
+class Counter
+{
+public:
+  // Create a memory pressure counter for the given cgroup on the
+  // specified level.
+  static Try<process::Owned<Counter>> create(
+      const std::string& hierarchy,
+      const std::string& cgroup,
+      Level level);
+
+  virtual ~Counter();  // Terminates and waits for the backing process.
+
+  // Returns the current accumulated number of occurrences of the
+  // pressure event. Returns a failure if any error occurs while
+  // monitoring the pressure events, and any subsequent calls to
+  // 'value' will return the same failure. In such a case, the user
+  // should consider creating a new Counter.
+  process::Future<uint64_t> value() const;
+
+private:
+  Counter(const std::string& hierarchy,  // Private: construct via 'create'.
+          const std::string& cgroup,
+          Level level);
+
+  process::Owned<CounterProcess> process;  // Does the actual listening.
+};
+
+} // namespace pressure {
+
 } // namespace memory {
 
 

Reply via email to