This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d187ce7f [cgroups2] Introduce an OomListener.
9d187ce7f is described below

commit 9d187ce7f2fa7f3aa7416d541e3741aeb03951ba
Author: Jason Zhou <[email protected]>
AuthorDate: Thu Aug 22 16:02:11 2024 -0400

    [cgroups2] Introduce an OomListener.
    
    We add an OomListener process to allow users to listen for oom events
    in a cgroup or any of its descendants.
    
    If the OomListener is terminated, any remaining unsatisfied futures will
    be failed.
    
    If the listened cgroup or any of its descendants encounters an oom
    event, then the returned future from listen() will become ready, and
    action can be taken upon the oom event via future onReady handlers.
    
    The caller can also discard a returned future to stop listening for
    events.
    
    Review: https://reviews.apache.org/r/75184/
---
 src/linux/cgroups2.cpp                     | 190 ++++++++++++++++++++++++++---
 src/linux/cgroups2.hpp                     |  40 +++++-
 src/tests/containerizer/cgroups2_tests.cpp |  68 ++++++++++-
 3 files changed, 273 insertions(+), 25 deletions(-)

diff --git a/src/linux/cgroups2.cpp b/src/linux/cgroups2.cpp
index d1c415db8..3c7382286 100644
--- a/src/linux/cgroups2.cpp
+++ b/src/linux/cgroups2.cpp
@@ -28,6 +28,8 @@
 #include <process/after.hpp>
 #include <process/loop.hpp>
 #include <process/pid.hpp>
+#include <process/io.hpp>
+#include <process/owned.hpp>
 
 #include <stout/adaptor.hpp>
 #include <stout/linkedhashmap.hpp>
@@ -45,6 +47,7 @@
 using std::ostream;
 using std::set;
 using std::string;
+using std::unique_ptr;
 using std::vector;
 
 using process::Break;
@@ -53,6 +56,9 @@ using process::ControlFlow;
 using process::Failure;
 using process::Future;
 using process::loop;
+using process::io::Watcher;
+using process::Owned;
+using process::Promise;
 
 using mesos::internal::fs::MountTable;
 
@@ -953,29 +959,175 @@ Try<Events> parse(const string& content)
 
 } // namespace events {
 
-Future<Nothing> oom(const string& cgroup)
+
+class OomListenerProcess : public process::Process<OomListenerProcess>
 {
-  // TODO(dleamy): Update this to use inotify, rather than polling.
-  return loop(
-      []() {
-        return process::after(Milliseconds(100));
-      },
-      [=](const Nothing&) -> Future<ControlFlow<Nothing>> {
-        Try<string> content = cgroups2::read<string>(cgroup, control::EVENTS);
-        if (content.isError()) {
-          return Failure("Failed to read 'memory.events': " + content.error());
-        }
+public:
+  OomListenerProcess(const Watcher& _watcher)
+    : ProcessBase(process::ID::generate("oom-listener")), watcher(_watcher) {}
 
-        Try<Events> events = events::parse(strings::trim(*content));
-        if (events.isError()) {
-          return Failure("Failed to parse 'memory.events': " + events.error());
-        }
+  void initialize() override
+  {
+    event_loop = loop(
+        self(),
+        [this]() {
+          return watcher.events().get();
+        },
+        [this](const Watcher::Event& event) -> Future<ControlFlow<Nothing>> {
+          if (event.type == Watcher::Event::Failure) {
+            // event.path contains error message for Failure events.
+            return Failure("Watcher failed: " + event.path);
+          }
+
+          if (!(event.type == Watcher::Event::Write)) {
+            return Continue();
+          }
 
-        if (events->oom > 0) {
-          return Break(Nothing());
+          read_events(event.path);
+          return Continue();
+        });
+
+    event_loop
+      .onAny(defer(self(), [this](const Future<Nothing>& f) {
+        if (f.isFailed())     fail("Read loop has terminated: " + f.failure());
+        if (f.isDiscarded())  fail("Read loop has terminated: discarded");
+        if (f.isReady())      fail("Read loop has terminated: future is ready");
+        if (f.isAbandoned())  fail("Read loop has terminated: abandoned");
+      }));
+  }
+
+  void finalize() override
+  {
+    event_loop.discard();
+
+    // Must explicitly fail all remaining oom futures because we
+    // are already in finalize, so we can't dispatch into the
+    // process in the event_loop's onAny handler.
+    fail("OomListenerProcess is terminating");
+  }
+
+  Future<Nothing> listen(const string& cgroup)
+  {
+    string events_path = path::join(cgroups2::path(cgroup), control::EVENTS);
+    if (ooms.contains(events_path)) {
+      return Failure("Already listening");
+    }
+
+    Try<Nothing> add = watcher.add(events_path);
+    if (add.isError()) {
+      return Failure("Failed to add file to watcher: " + add.error());
+    }
+
+    Promise<Nothing> promise;
+    Future<Nothing> future = promise.future();
+
+    ooms.emplace(events_path, std::move(promise));
+
+    future
+      .onDiscard(defer(self(), [this, events_path]() {
+        auto it = ooms.find(events_path);
+        if (it == ooms.end()) {
+          return; // Already removed.
         }
-        return Continue();
-      });
+
+        Promise<Nothing> promise = std::move(it->second);
+        ooms.erase(events_path);
+
+        // Ignoring remove failures since caller doesn't care about the file
+        // anyway now.
+        watcher.remove(events_path);
+        promise.discard();
+      }));
+
+    // Read the events file after adding to watcher in case an oom event
+    // occurred before the add was complete.
+    read_events(events_path);
+
+    return future;
+  }
+
+  void read_events(const string& path)
+  {
+    auto it = ooms.find(path);
+    if (it == ooms.end()) {
+      return;
+    }
+
+    Try<string> content = os::read(path);
+    if (content.isError()) {
+      it->second.fail("Failed to read 'memory.events': " + content.error());
+      ooms.erase(it);
+      return;
+    }
+
+    Try<Events> events = events::parse(strings::trim(*content));
+    if (events.isError()) {
+      it->second.fail("Failed to parse 'memory.events': " + events.error());
+      ooms.erase(it);
+      return;
+    }
+
+    if (events->oom > 0) {
+      it->second.set(Nothing());
+      ooms.erase(it);
+      return;
+    }
+  }
+
+  void fail(const string& reason)
+  {
+    foreachvalue (Promise<Nothing>& promise, ooms) {
+      promise.fail(reason);
+    }
+    ooms.clear();
+  }
+
+private:
+  // A map of cgroup memory.events file paths to their respective promises.
+  hashmap<string, Promise<Nothing>> ooms;
+
+  Future<Nothing> event_loop;
+
+  Watcher watcher;
+};
+
+
+OomListener::OomListener(OomListener&&) = default;
+
+
+OomListener& OomListener::operator=(OomListener&&) = default;
+
+
+Try<OomListener> OomListener::create()
+{
+  Try<Watcher> watcher = process::io::create_watcher();
+  if (watcher.isError()) {
+    return Error("Failed to create watcher: " + watcher.error());
+  }
+  return OomListener(
+      unique_ptr<OomListenerProcess>(new OomListenerProcess(*watcher)));
+}
+
+
+OomListener::OomListener(unique_ptr<OomListenerProcess>&& _process)
+  : process(std::move(_process))
+{
+  spawn(*process);
+};
+
+
+OomListener::~OomListener()
+{
+  if (process) {
+    terminate(*process);
+    process::wait(*process);
+  }
+}
+
+
+Future<Nothing> OomListener::listen(const string& cgroup)
+{
+  return dispatch(*process, &OomListenerProcess::listen, cgroup);
 }
 
 
diff --git a/src/linux/cgroups2.hpp b/src/linux/cgroups2.hpp
index e13e57076..16521637c 100644
--- a/src/linux/cgroups2.hpp
+++ b/src/linux/cgroups2.hpp
@@ -17,11 +17,13 @@
 #ifndef __CGROUPS_V2_HPP__
 #define __CGROUPS_V2_HPP__
 
+#include <memory>
 #include <set>
 #include <string>
 #include <vector>
 
 #include <process/future.hpp>
+#include <process/owned.hpp>
 
 #include <stout/bytes.hpp>
 #include <stout/duration.hpp>
@@ -339,10 +341,42 @@ struct Events
 };
 
 
-// Listen for an OOM event for the cgroup or any descendants.
+// Forward declaration.
+class OomListenerProcess;
+
+
+// The OomListener provides an interface for the caller to listen for the first
+// oom event in any cgroup by monitoring the future returned by listen().
 //
-// Cannot be used for the root cgroup.
-process::Future<Nothing> oom(const std::string& cgroup);
+// TODO(jasonzhou): provide functionality to monitor other memory events such
+// as low, high, max, and oom_kill.
+class OomListener
+{
+public:
+  OomListener(OomListener&&);
+  OomListener& operator=(OomListener&&);
+
+  static Try<OomListener> create();
+
+  virtual ~OomListener();
+
+  // Listen for an OOM event for the cgroup or any descendants.
+  //
+  // The returned future will become ready if an oom occurs at the
+  // target cgroup or its descendants.
+  //
+  // The future can be discarded if no longer needed, and the cgroup
+  // will no longer be monitored for oom.
+  process::Future<Nothing> listen(const std::string& cgroup);
+
+private:
+  OomListener(std::unique_ptr<OomListenerProcess>&& process);
+
+  OomListener(const OomListener&) = delete; // Not copyable.
+  OomListener& operator=(const OomListener&) = delete; // Not assignable.
+
+  std::unique_ptr<OomListenerProcess> process;
+};
 
 
 // Current memory usage of a cgroup and its descendants in bytes.
diff --git a/src/tests/containerizer/cgroups2_tests.cpp b/src/tests/containerizer/cgroups2_tests.cpp
index fc3899526..b85444826 100644
--- a/src/tests/containerizer/cgroups2_tests.cpp
+++ b/src/tests/containerizer/cgroups2_tests.cpp
@@ -22,6 +22,7 @@
 #include <utility>
 #include <vector>
 
+#include <process/clock.hpp>
 #include <process/future.hpp>
 #include <process/reap.hpp>
 #include <process/gmock.hpp>
@@ -41,6 +42,7 @@
 
 #include "tests/containerizer/memory_test_helper.hpp"
 
+using process::Clock;
 using process::Future;
 
 using std::pair;
@@ -51,6 +53,8 @@ using std::tuple;
 using std::unique_ptr;
 using std::vector;
 
+using cgroups2::memory::OomListener;
+
 namespace cpu = cgroups2::cpu;
 namespace devices = cgroups2::devices;
 
@@ -530,15 +534,73 @@ TEST_F(Cgroups2Test, ROOT_CGROUPS2_OomDetection)
   ASSERT_SOME(cgroups2::assign(leaf_cgroup, *helper.pid()));
   ASSERT_SOME(cgroups2::memory::set_max(TEST_CGROUP, limit));
 
-  Future<Nothing> oomEvent = cgroups2::memory::oom(TEST_CGROUP);
+  Try<OomListener> listener = OomListener::create();
+
+  ASSERT_SOME(listener);
+
+  Future<Nothing> oom = listener->listen(TEST_CGROUP);
 
   // Assert that the OOM event has not been triggered.
-  EXPECT_FALSE(oomEvent.isReady());
+  EXPECT_FALSE(oom.isReady());
 
   // Increase memory usage beyond the limit.
   ASSERT_ERROR(helper.increaseRSS(limit * 2));
 
-  AWAIT_EXPECT_READY(oomEvent);
+  AWAIT_EXPECT_READY(oom);
+}
+
+
+TEST_F(Cgroups2Test, ROOT_CGROUPS2_OomListenerDestruction)
+{
+  ASSERT_SOME(enable_controllers({"memory"}));
+
+  ASSERT_SOME(cgroups2::create(TEST_CGROUP));
+  ASSERT_SOME(cgroups2::controllers::enable(TEST_CGROUP, {"memory"}));
+
+  const string leaf_cgroup = TEST_CGROUP + "/leaf";
+  ASSERT_SOME(cgroups2::create(leaf_cgroup));
+
+  Future<Nothing> oom;
+  {
+    Try<OomListener> listener = OomListener::create();
+
+    ASSERT_SOME(listener);
+
+    oom = listener->listen(TEST_CGROUP);
+
+    // We want to ensure that the dispatch for OomListenerProcess::listen goes
+    // through.
+    Clock::pause();
+    Clock::settle();
+    Clock::resume();
+  }
+
+  // Assert that the OOM event has not been triggered.
+  EXPECT_FALSE(oom.isReady());
+
+  AWAIT_EXPECT_FAILED(oom);
+}
+
+
+TEST_F(Cgroups2Test, ROOT_CGROUPS2_OomFutureDiscard)
+{
+  ASSERT_SOME(enable_controllers({"memory"}));
+
+  ASSERT_SOME(cgroups2::create(TEST_CGROUP));
+  ASSERT_SOME(cgroups2::controllers::enable(TEST_CGROUP, {"memory"}));
+
+  const string leaf_cgroup = TEST_CGROUP + "/leaf";
+  ASSERT_SOME(cgroups2::create(leaf_cgroup));
+
+  Try<OomListener> listener = OomListener::create();
+
+  ASSERT_SOME(listener);
+
+  Future<Nothing> oom = listener->listen(TEST_CGROUP);
+
+  oom.discard();
+
+  AWAIT_ASSERT_DISCARDED(oom);
 }
 
 

Reply via email to