This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new 23703fbb3 [libprocess] Add io::Watcher for fs notifications.
23703fbb3 is described below

commit 23703fbb3d9deac3d50a74b67afac2b002d8fd37
Author: Jason Zhou <[email protected]>
AuthorDate: Tue Aug 20 18:02:22 2024 -0400

    [libprocess] Add io::Watcher for fs notifications.
    
    Adds basic watcher class for filesystem watch notifications. We
    currently only support Linux with inotify.
    
    We currently support inotify events for writing, deleting, and renaming
    a file. We do not support watching directories.
    
    Review: https://reviews.apache.org/r/75182/
---
 3rdparty/libprocess/include/process/io.hpp | 120 ++++++++++++++++
 3rdparty/libprocess/src/posix/io.cpp       | 213 +++++++++++++++++++++++++++++
 3rdparty/libprocess/src/tests/io_tests.cpp | 198 +++++++++++++++++++++++++++
 3 files changed, 531 insertions(+)

diff --git a/3rdparty/libprocess/include/process/io.hpp b/3rdparty/libprocess/include/process/io.hpp
index 00519d4fc..5a13f29ec 100644
--- a/3rdparty/libprocess/include/process/io.hpp
+++ b/3rdparty/libprocess/include/process/io.hpp
@@ -17,6 +17,7 @@
 #include <string>
 
 #include <process/future.hpp>
+#include <process/queue.hpp>
 
 #include <stout/nothing.hpp>
 #ifdef __WINDOWS__
@@ -166,6 +167,125 @@ Future<Nothing> redirect(
     size_t chunk = 4096,
     const std::vector<lambda::function<void(const std::string&)>>& hooks = {});
 
+
+// Forward declarations.
+class Watcher;
+namespace testing {
+Future<Nothing> watcher_read_loop(Watcher w);
+} // namespace testing {
+
+
+// This provides a high level interface for cross-platform filesystem watch
+// notifications. Currently, only Linux is supported via inotify, but macOS,
+// BSD, and Windows implementations can be added.
+//
+// On Linux, inotify provides a vast set of features and comes with a vast
+// amount of subtleties; exposing all of them through a cross-platform
+// watcher is quite challenging. Therefore, our initial implementation only
+// provides basic functionality, both to simplify the life of the user and
+// to keep a cross-platform implementation viable.
+//
+// TODO(bmahler): Add support for directories.
+class Watcher
+{
+public:
+  struct Event
+  {
+    // Path to the file for the event. In the case of a Failure event type,
+    // this will be a failure message instead.
+    std::string path;
+
+    // TODO(bmahler): Add more events (e.g. access events, close events,
+    // attribute changes).
+    enum {
+      // The read loop encountered an unrecoverable failure; the watcher is
+      // no longer running and the caller must create a new watcher if desired!
+      Failure,
+
+      // File was modified, note that more writes may follow.
+      Write,
+
+      // The path was removed; any watches on it will be removed.
+      // Some "remove" operations may trigger a Rename if the file is
+      // actually moved (for example "remove to trash" is often a rename).
+      Remove,
+
+      // The path was renamed to something else; any watches on it will be
+      // removed.
+      Rename,
+    } type;
+  };
+
+  // Adds the file for event monitoring.
+  //
+  // Returns an error if:
+  //   * we don't have read access to the provided path
+  //   * the path has already been watched (and not implicitly or
+  //     explicitly removed)
+  //   * the path doesn't exist
+  //   * the path is a directory (not currently supported)
+  //
+  // In order for the caller to not miss any updates to the file, you
+  // *must* read the file yourself after calling add(). Otherwise, if
+  // you were to read the file first, updates between reading the file
+  // and calling add() will be missed!
+  Try<Nothing> add(const std::string& path);
+
+  // Removes the file from event monitoring; removing an already removed
+  // file is a no-op and also returns Nothing.
+  Try<Nothing> remove(const std::string& path);
+
+  // Returns the queue on which this watcher's events are delivered.
+  Queue<Event> events();
+
+private:
+  friend Try<Watcher> create_watcher();
+  friend Future<Nothing> testing::watcher_read_loop(Watcher w);
+
+  Watcher(int inotify_fd);
+
+  // Start the inotify read loop.
+  void run();
+
+  struct Data
+  {
+    Data() = default;
+
+    ~Data();
+
+    // Rather than use a process to serialize access to the watch maps
+    // below, we use a 'std::atomic_flag' which will spin lock.
+    std::atomic_flag lock = ATOMIC_FLAG_INIT;
+
+    // We need a bidirectional mapping between watch descriptors and
+    // the path the watch descriptor maps to.
+    hashmap<int, std::string> wd_to_path;
+    hashmap<std::string, int> path_to_wd;
+
+    process::Future<Nothing> read_loop;
+
+    // Queue is already thread safe and doesn't require locking.
+    Queue<Watcher::Event> events;
+  };
+
+  const int inotify_fd;
+  std::shared_ptr<Data> data;
+};
+
+
+// Creates a watcher that can be used to monitor for fs changes.
+Try<Watcher> create_watcher();
+
+
+namespace testing {
+
+// Exposes the read loop future so tests can observe it being discarded.
+inline Future<Nothing> watcher_read_loop(Watcher w)
+{
+  return w.data->read_loop;
+}
+
+} // namespace testing {
 } // namespace io {
 } // namespace process {
 
diff --git a/3rdparty/libprocess/src/posix/io.cpp b/3rdparty/libprocess/src/posix/io.cpp
index 3862e3b9f..7b99b82c6 100644
--- a/3rdparty/libprocess/src/posix/io.cpp
+++ b/3rdparty/libprocess/src/posix/io.cpp
@@ -10,11 +10,18 @@
 // See the License for the specific language governing permissions and
 // limitations under the License
 
+#ifdef __linux__
+#include <memory>
+
+#include <sys/inotify.h>
+#endif
+
 #include <process/future.hpp>
 #include <process/io.hpp>
 #include <process/loop.hpp>
 
 #include <stout/error.hpp>
+#include <stout/hashmap.hpp>
 #include <stout/none.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>
@@ -26,6 +33,11 @@
 
 #include "io_internal.hpp"
 
+using std::default_delete;
+using std::shared_ptr;
+using std::string;
+using std::weak_ptr;
+
 namespace process {
 namespace io {
 namespace internal {
@@ -136,5 +148,206 @@ Try<bool> is_async(int_fd fd)
 }
 
 } // namespace internal {
+
+#ifdef __linux__
+
+// Stores the inotify fd and allocates the shared Data; run() starts the loop.
+Watcher::Watcher(int inotify_fd) : inotify_fd(inotify_fd), data(new Data()) {}
+
+
+Watcher::Data::~Data()
+{
+  // Once the last reference to Data is gone we discard the read loop, which
+  // should transition the loop future, at which point the fd gets closed.
+  read_loop.discard();
+}
+
+
+void Watcher::run()
+{
+  // For now we use a small buffer that can hold *at least* 32 events; the
+  // caller may want to customize its size for the expected volume of events.
+  size_t buffer_size = 32 * (sizeof(inotify_event) + NAME_MAX + 1);
+  shared_ptr<char> buffer(new char[buffer_size], default_delete<char[]>());
+
+  // We only hold a weak pointer here so that Data gets destroyed (and the
+  // loop stopped) once the caller throws away the last Watcher copy.
+  weak_ptr<Data> weak_data = data;
+  int fd = inotify_fd;
+  data->read_loop = loop(
+      [fd, buffer, buffer_size]() {
+        return io::read(fd, buffer.get(), buffer_size);
+      },
+      [weak_data, buffer](size_t read) -> Future<ControlFlow<Nothing>> {
+        if (read == 0) {
+          return Failure("Unexpected EOF");
+        }
+
+        // If we can't get the shared pointer, Data is destroyed and we
+        // need to stop the loop.
+        shared_ptr<Data> data = weak_data.lock();
+        if (!data) {
+          return Break();
+        }
+
+        // Add, don't subtract: `read - sizeof(...)` wraps on a short read.
+        size_t offset = 0;
+        while (offset + sizeof(inotify_event) <= read) {
+          inotify_event* event = (inotify_event*) &(buffer.get()[offset]);
+          offset += sizeof(inotify_event) + event->len;
+
+          if (event->mask & IN_Q_OVERFLOW) {
+            return Failure("inotify event overflow");
+          }
+
+          // For IN_IGNORED generated by inotify_rm_watch, we've already
+          // removed the path from our tracking maps. For other cases of
+          // IN_IGNORED (e.g. IN_DELETE_SELF, IN_UNMOUNT, etc), we remove the
+          // path during those events rather than the subsequent IN_IGNORED.
+          if (event->mask & IN_IGNORED) {
+            continue;
+          }
+
+          Event e;
+
+          if (event->mask & IN_MODIFY) {
+            e.type = Event::Write;
+          }
+          if (event->mask & IN_MOVE_SELF) {
+            e.type = Event::Rename;
+          }
+          if (event->mask & IN_DELETE_SELF || event->mask & IN_UNMOUNT) {
+            e.type = Event::Remove;
+          }
+
+          synchronized (data->lock) {
+            Option<string> path = data->wd_to_path.get(event->wd);
+
+            if (path.isNone()) {
+              continue; // Unknown watch, likely we just removed this watch.
+            }
+
+            e.path = std::move(*path);
+
+            if (event->mask & IN_MOVE_SELF
+                || event->mask & IN_DELETE_SELF
+                || event->mask & IN_UNMOUNT) {
+              data->wd_to_path.erase(event->wd);
+              // `*path` was moved from above, so the key must come from `e`.
+              data->path_to_wd.erase(e.path);
+            }
+
+            data->events.put(std::move(e));
+          }
+        }
+
+        if (offset != read) {
+          return Failure("Unexpected partial read from inotify");
+        }
+
+        return Continue();
+      });
+
+  data->read_loop
+    .onFailed([weak_data](const string& message) {
+      shared_ptr<Data> data = weak_data.lock();
+      if (data) {
+        Watcher::Event e;
+        e.type = Watcher::Event::Failure;
+        e.path = message;
+        data->events.put(e);
+      }
+    });
+
+  // We need to close the inotify fd whenever the loop stops.
+  data->read_loop
+    .onAny([fd]() {
+      ::close(fd);
+    });
+}
+
+
+Try<Nothing> Watcher::add(const string& path)
+{
+  // Only files can be watched for now.
+  if (os::stat::isdir(path)) {
+    return Error("Directories are not supported");
+  }
+
+  // We only care about changes to the contents of a single file, so
+  // modification, deletion and rename of the path itself are the only
+  // events we need to subscribe to:
+  int flags = IN_MODIFY | IN_DELETE_SELF | IN_MOVE_SELF;
+
+#ifdef IN_MASK_CREATE
+  // Ask for EEXIST when a watch already exists, so that a new watch can
+  // never modify an existing one (the same path added twice, or two
+  // paths referring to the same inode).
+  flags |= IN_MASK_CREATE;
+#endif
+
+  synchronized (data->lock) {
+    if (data->path_to_wd.get(path).isSome()) {
+      return Error("Path is already added");
+    }
+
+    const int descriptor = inotify_add_watch(inotify_fd, path.c_str(), flags);
+    if (descriptor < 0) {
+      return ErrnoError("Failed to inotify_add_watch");
+    }
+
+    data->path_to_wd[path] = descriptor;
+    data->wd_to_path[descriptor] = path;
+  }
+
+  return Nothing();
+}
+
+
+Try<Nothing> Watcher::remove(const string& path)
+{
+  Option<int> descriptor;
+  synchronized (data->lock) {
+    descriptor = data->path_to_wd.get(path);
+    if (descriptor.isNone()) {
+      // Nothing is tracked for this path. The read loop may have already
+      // dropped it implicitly (e.g. the file was removed), so this is
+      // currently treated as a no-op rather than as a failure.
+      return Nothing();
+    }
+
+    data->path_to_wd.erase(path);
+    data->wd_to_path.erase(*descriptor);
+  }
+
+  // The watch removal below is reported back via an IN_IGNORED event.
+  if (inotify_rm_watch(inotify_fd, *descriptor) < 0) {
+    return ErrnoError("Failed to inotify_rm_watch");
+  }
+
+  return Nothing();
+}
+
+
+Queue<Watcher::Event> Watcher::events()
+{
+  return data->events; // The queue the read loop puts events on.
+}
+
+
+Try<Watcher> create_watcher()
+{
+  const int fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC);
+  if (fd < 0) {
+    return ErrnoError("Failed to inotify_init1");
+  }
+
+  Watcher created(fd);
+  created.run(); // Kick off the inotify read loop right away.
+  return created;
+}
+
+#endif // __linux__
+
 } // namespace io {
 } // namespace process {
diff --git a/3rdparty/libprocess/src/tests/io_tests.cpp b/3rdparty/libprocess/src/tests/io_tests.cpp
index c2bb36b18..70ab7236e 100644
--- a/3rdparty/libprocess/src/tests/io_tests.cpp
+++ b/3rdparty/libprocess/src/tests/io_tests.cpp
@@ -20,6 +20,7 @@
 
 #include <stout/gtest.hpp>
 #include <stout/os.hpp>
+#include <stout/hashset.hpp>
 
 #include <stout/os/pipe.hpp>
 #include <stout/os/read.hpp>
@@ -33,6 +34,8 @@ namespace io = process::io;
 
 using process::Clock;
 using process::Future;
+using process::Queue;
+using process::io::Watcher;
 
 using std::array;
 using std::string;
@@ -40,6 +43,29 @@ using std::string;
 class IOTest: public TemporaryDirectoryTest {};
 
 
+Try<Nothing> append(const string& path, const string& message)
+{
+  // O_APPEND keeps existing contents intact; the file is created if missing.
+  Try<int_fd> fd = os::open(
+      path,
+      O_WRONLY | O_CREAT | O_APPEND | O_CLOEXEC,
+      S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+  if (fd.isError()) {
+    return Error(fd.error());
+  }
+
+  Try<Nothing> write = os::write(fd.get(), message);
+  Try<Nothing> close = os::close(fd.get());
+
+  // We propagate `close` failures if `write` on the file was successful.
+  if (write.isSome() && close.isError()) {
+    write = Error("Failed to close '" + stringify(*fd) + "': " + close.error());
+  }
+
+  return write;
+}
+
+
 TEST_F(IOTest, PollRead)
 {
   Try<std::array<int_fd, 2>> pipes = os::pipe();
@@ -482,3 +508,175 @@ TEST_F(IOTest, Redirect)
   // accumulated in the redirect hook.
   EXPECT_EQ(data, accumulated);
 }
+
+
+TEST_F(IOTest, WatcherWrite)
+{
+  // A write to a watched file must surface as a Write event for that path.
+  string path = path::join(sandbox.get(), "test_file");
+  ASSERT_SOME(os::touch(path));
+
+  Watcher watcher = CHECK_NOTERROR(io::create_watcher());
+  ASSERT_SOME(watcher.add(path));
+
+  Queue<Watcher::Event> events = watcher.events();
+
+  // Request the next event before triggering the write.
+  Future<Watcher::Event> event = events.get();
+  ASSERT_SOME(os::write(path, "hello", true));
+  AWAIT_READY(event);
+  ASSERT_EQ(Watcher::Event::Write, event->type);
+  ASSERT_EQ(path, event->path);
+}
+
+
+TEST_F(IOTest, WatcherRename)
+{
+  // Renaming a watched file must surface as a Rename event for the old path.
+  string path = path::join(sandbox.get(), "test_file");
+  string new_path = path::join(sandbox.get(), "renamed");
+  ASSERT_SOME(os::touch(path));
+
+  Watcher watcher = CHECK_NOTERROR(io::create_watcher());
+  ASSERT_SOME(watcher.add(path));
+
+  Queue<Watcher::Event> events = watcher.events();
+  Future<Watcher::Event> event = events.get();
+  ASSERT_SOME(os::rename(path, new_path, true));
+  AWAIT_READY(event);
+  ASSERT_EQ(Watcher::Event::Rename, event->type);
+  ASSERT_EQ(path, event->path);
+}
+
+
+TEST_F(IOTest, WatcherRm)
+{
+  // Deleting a watched file must surface as a Remove event for that path.
+  string path = path::join(sandbox.get(), "test_file");
+  ASSERT_SOME(os::touch(path));
+
+  Watcher watcher = CHECK_NOTERROR(io::create_watcher());
+  ASSERT_SOME(watcher.add(path));
+
+  Queue<Watcher::Event> events = watcher.events();
+  ASSERT_SOME(os::rm(path));
+
+  // The event is queued by the read loop, so it can be fetched afterwards.
+  Future<Watcher::Event> event = events.get();
+  AWAIT_READY(event);
+  ASSERT_EQ(Watcher::Event::Remove, event->type);
+  ASSERT_EQ(path, event->path);
+}
+
+
+TEST_F(IOTest, WatcherAddMultipleFiles)
+{
+  // Writes to two watched files must each produce a Write event.
+  string path1 = path::join(sandbox.get(), "test_file1");
+  string path2 = path::join(sandbox.get(), "test_file2");
+  hashset<string> paths = {path1, path2};
+
+  ASSERT_SOME(os::touch(path1));
+  ASSERT_SOME(os::touch(path2));
+
+  Watcher watcher = CHECK_NOTERROR(io::create_watcher());
+  ASSERT_SOME(watcher.add(path1));
+  ASSERT_SOME(watcher.add(path2));
+
+  Queue<Watcher::Event> events = watcher.events();
+
+  ASSERT_SOME(append(path1, "hello"));
+  ASSERT_SOME(append(path2, "hello"));
+  // Event order is not asserted; each path is ticked off as it shows up.
+  for (int i = 0; i < 2; ++i) {
+    Future<Watcher::Event> event = events.get();
+    AWAIT_READY(event);
+    ASSERT_EQ(Watcher::Event::Write, event->type);
+    paths.erase(event->path);
+  }
+
+  EXPECT_EQ(0u, paths.size());
+}
+
+
+TEST_F(IOTest, WatcherAlreadyAddAddedFile)
+{
+  string path = path::join(sandbox.get(), "test_file");
+  ASSERT_SOME(os::touch(path));
+
+  Watcher watcher = CHECK_NOTERROR(io::create_watcher());
+
+  // The first add must succeed; adding the same path again must fail.
+  ASSERT_SOME(watcher.add(path));
+  EXPECT_ERROR(watcher.add(path));
+}
+
+
+TEST_F(IOTest, WatcherAddDirectory)
+{
+  Watcher watcher = CHECK_NOTERROR(io::create_watcher());
+  // Per the test name the sandbox is a directory, so add() must reject it.
+  EXPECT_ERROR(watcher.add(sandbox.get()));
+}
+
+
+TEST_F(IOTest, WatcherRemove)
+{
+  // After remove(), writes to that file must no longer produce events.
+  string path1 = path::join(sandbox.get(), "test_file1");
+  string path2 = path::join(sandbox.get(), "test_file2");
+
+  ASSERT_SOME(os::touch(path1));
+  ASSERT_SOME(os::touch(path2));
+
+  Watcher watcher = CHECK_NOTERROR(io::create_watcher());
+  ASSERT_SOME(watcher.add(path1));
+  ASSERT_SOME(watcher.add(path2));
+
+  ASSERT_SOME(watcher.remove(path1));
+  Queue<Watcher::Event> events = watcher.events();
+
+  ASSERT_SOME(os::write(path1, "hello", true));
+  ASSERT_SOME(os::write(path2, "hello", true));
+
+  // path1 was written first, so a first event for path2 means it was silent.
+  Future<Watcher::Event> event = events.get();
+  AWAIT_READY(event);
+  ASSERT_EQ(Watcher::Event::Write, event->type);
+  ASSERT_EQ(path2, event->path);
+}
+
+
+TEST_F(IOTest, WatcherRemoveRemovedFile)
+{
+  string path = path::join(sandbox.get(), "test_file");
+  ASSERT_SOME(os::touch(path));
+
+  Watcher watcher = CHECK_NOTERROR(io::create_watcher());
+
+  ASSERT_SOME(watcher.add(path));
+
+  // Removing a path that is no longer watched is a no-op, not an error.
+  ASSERT_SOME(watcher.remove(path));
+  ASSERT_SOME(watcher.remove(path));
+}
+
+
+TEST_F(IOTest, WatcherDestruction)
+{
+  string path = path::join(sandbox.get(), "test_file");
+  ASSERT_SOME(os::touch(path));
+
+  Future<Nothing> read_loop;
+  {
+    Watcher watcher = CHECK_NOTERROR(io::create_watcher());
+    ASSERT_SOME(watcher.add(path));
+
+    read_loop = io::testing::watcher_read_loop(watcher);
+    // While a Watcher copy is alive, no discard has been requested.
+    EXPECT_FALSE(read_loop.hasDiscard());
+  }
+
+  // Dropping the last Watcher copy must request a discard of the read loop.
+  EXPECT_TRUE(read_loop.hasDiscard());
+}

Reply via email to