This is an automated email from the ASF dual-hosted git repository.
bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push:
new 23703fbb3 [libprocess] Add io::Watcher for fs notifications.
23703fbb3 is described below
commit 23703fbb3d9deac3d50a74b67afac2b002d8fd37
Author: Jason Zhou <[email protected]>
AuthorDate: Tue Aug 20 18:02:22 2024 -0400
[libprocess] Add io::Watcher for fs notifications.
Adds basic watcher class for filesystem watch notifications. We
currently only support Linux with inotify.
We currently support inotify events for writing, deleting, and renaming
a file. We do not support watching directories.
Review: https://reviews.apache.org/r/75182/
---
3rdparty/libprocess/include/process/io.hpp | 120 ++++++++++++++++
3rdparty/libprocess/src/posix/io.cpp | 213 +++++++++++++++++++++++++++++
3rdparty/libprocess/src/tests/io_tests.cpp | 198 +++++++++++++++++++++++++++
3 files changed, 531 insertions(+)
diff --git a/3rdparty/libprocess/include/process/io.hpp
b/3rdparty/libprocess/include/process/io.hpp
index 00519d4fc..5a13f29ec 100644
--- a/3rdparty/libprocess/include/process/io.hpp
+++ b/3rdparty/libprocess/include/process/io.hpp
@@ -17,6 +17,7 @@
#include <string>
#include <process/future.hpp>
+#include <process/queue.hpp>
#include <stout/nothing.hpp>
#ifdef __WINDOWS__
@@ -166,6 +167,125 @@ Future<Nothing> redirect(
size_t chunk = 4096,
const std::vector<lambda::function<void(const std::string&)>>& hooks = {});
+
+// Forward declarations.
+class Watcher;
+namespace testing {
+Future<Nothing> watcher_read_loop(Watcher w);
+} // namespace testing {
+
+
+// This provides a high level interface for cross-platform filesystem watch
+// notifications. Currently, only Linux is supported via inotify, but macOS,
+// BSD, and Windows implementations can be added.
+//
+// On Linux, inotify provides a vast set of features and comes with a vast
+// amount of subtleties to deal with and providing a cross-platform filesystem
+// watcher while exposing all these subtleties is quite challenging. Therefore,
+// our initial implementation only provides basic functionality in order to
+// simplify the life of the user, and to make cross platform implementation
+// viable.
+//
+// TODO(bmahler): Add support for directories.
+class Watcher
+{
+public:
+ struct Event
+ {
+ // Path to the file for the event. In the case of a Failure event type,
+ // this will be a failure message instead.
+ std::string path;
+
+ // TODO(bmahler): Add more events (e.g. access events, close events,
+ // attribute changes).
+ enum {
+ // The read loop encountered an unrecoverable failure, the watcher is
+ // no longer running and the caller must create a new watcher if desired!
+ Failure,
+
+ // File was modified, note that more writes may follow.
+ Write,
+
+ // The path was removed; any watches on it will be removed.
+ // Some "remove" operations may trigger a Rename if the file is
+ // actually moved (for example "remove to trash" is often a rename).
+ Remove,
+
+ // The path was renamed to something else; any watches on it will be
+ // removed.
+ Rename,
+ } type;
+ };
+
+ // Adds the file for event monitoring.
+ //
+ // Returns an error if:
+ // * we don't have read access to the provided path
+ // * the path has already been watched (and not implicitly or
+ // explicitly removed)
+ // * the path doesn't exist
+ // * the path is a directory (not currently supported)
+ //
+ // In order for the caller to not miss any updates to the file, you
+ // *must* read the file yourself after calling add(). Otherwise, if
+ // you were to read the file first, updates made between reading the
+ // file and calling add() will be missed!
+ Try<Nothing> add(const std::string& path);
+
+ // Removes the file from event monitoring. Removing a path that is not
+ // currently watched (e.g. because it was already removed) is a no-op
+ // and also returns Nothing. Returns an error if removing the underlying
+ // watch fails.
+ Try<Nothing> remove(const std::string& path);
+
+ // Returns the queue onto which events for the watched paths are put.
+ // If the read loop fails, a single Failure event (whose `path` holds
+ // the failure message) is put onto this queue.
+ Queue<Event> events();
+
+private:
+ friend Try<Watcher> create_watcher();
+ friend Future<Nothing> testing::watcher_read_loop(Watcher w);
+
+ // Only reachable through `create_watcher()` (a friend), which also
+ // starts the read loop via `run()`.
+ Watcher(int inotify_fd);
+
+ // Start the inotify read loop.
+ void run();
+
+ // State shared between all copies of a Watcher (held via the
+ // `std::shared_ptr` below).
+ struct Data
+ {
+ Data() = default;
+
+ // Discards the read loop once the last reference goes away.
+ ~Data();
+
+ // Rather than use a process to serialize access to the watch
+ // descriptor maps below, we use a 'std::atomic_flag' which will
+ // spin lock.
+ std::atomic_flag lock = ATOMIC_FLAG_INIT;
+
+ // We need a bidirectional mapping between watch descriptors and
+ // the path the watch descriptor maps to.
+ hashmap<int, std::string> wd_to_path;
+ hashmap<std::string, int> path_to_wd;
+
+ // Future of the running read loop; set by `run()`.
+ process::Future<Nothing> read_loop;
+
+ // Queue is already thread safe and doesn't require locking.
+ Queue<Watcher::Event> events;
+ };
+
+ // The inotify instance that watches are added to / removed from.
+ const int inotify_fd;
+ std::shared_ptr<Data> data;
+};
+
+
+// Creates a watcher that can be used to monitor for fs changes.
+//
+// NOTE: This is currently only implemented on Linux (via inotify), where
+// an error is returned if the inotify instance cannot be created. The
+// returned watcher already has its read loop running.
+Try<Watcher> create_watcher();
+
+
+namespace testing {
+
+// Test-only accessor: hands back the future of the watcher's read loop
+// so that tests can observe it being discarded.
+inline Future<Nothing> watcher_read_loop(Watcher w)
+{
+  Future<Nothing> read_loop = w.data->read_loop;
+  return read_loop;
+}
+
+} // namespace testing {
} // namespace io {
} // namespace process {
diff --git a/3rdparty/libprocess/src/posix/io.cpp
b/3rdparty/libprocess/src/posix/io.cpp
index 3862e3b9f..7b99b82c6 100644
--- a/3rdparty/libprocess/src/posix/io.cpp
+++ b/3rdparty/libprocess/src/posix/io.cpp
@@ -10,11 +10,18 @@
// See the License for the specific language governing permissions and
// limitations under the License
+#ifdef __linux__
+#include <memory>
+
+#include <sys/inotify.h>
+#endif
+
#include <process/future.hpp>
#include <process/io.hpp>
#include <process/loop.hpp>
#include <stout/error.hpp>
+#include <stout/hashmap.hpp>
#include <stout/none.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
@@ -26,6 +33,11 @@
#include "io_internal.hpp"
+using std::default_delete;
+using std::shared_ptr;
+using std::string;
+using std::weak_ptr;
+
namespace process {
namespace io {
namespace internal {
@@ -136,5 +148,206 @@ Try<bool> is_async(int_fd fd)
}
} // namespace internal {
+
+#ifdef __linux__
+
+// Stores the given inotify fd and allocates a fresh `Data`, which is held
+// through a `shared_ptr` and therefore shared by all copies of this watcher.
+Watcher::Watcher(int inotify_fd) : inotify_fd(inotify_fd), data(new Data()) {}
+
+
+// Requests a discard of the read loop when the shared state is destroyed.
+Watcher::Data::~Data()
+{
+ // When the last reference of data goes away, we discard the read loop
+ // which should ensure that the loop future transitions, at which point
+ // the inotify fd will get closed (see the `onAny` callback installed
+ // in `Watcher::run`).
+ read_loop.discard();
+}
+
+
+// Starts the inotify read loop.
+//
+// The loop repeatedly reads raw inotify events from the inotify fd,
+// translates them into `Watcher::Event`s and puts them onto the event
+// queue. The loop only holds a weak reference to the shared `Data`, so
+// it stops once the last `Watcher` copy is gone. If the loop fails, a
+// `Failure` event carrying the failure message is put onto the queue.
+// The inotify fd is closed whenever the loop future transitions.
+void Watcher::run()
+{
+  // For now, we only use a small buffer that is sufficient for reading
+  // *at least* 32 events, but the caller may want to customize the buffer
+  // size depending on the expected volume of events.
+  size_t buffer_size = 32 * (sizeof(inotify_event) + NAME_MAX + 1);
+  shared_ptr<char> buffer(new char[buffer_size], default_delete<char[]>());
+
+  // We take a weak pointer here to avoid keeping Data alive forever: when
+  // the caller throws away the last Watcher copy, we want Data to be
+  // destroyed and the loop to be stopped.
+  weak_ptr<Data> weak_data = data;
+  int fd = inotify_fd;
+  data->read_loop = loop(
+      [fd, buffer, buffer_size]() {
+        return io::read(fd, buffer.get(), buffer_size);
+      },
+      [weak_data, buffer](size_t read) -> Future<ControlFlow<Nothing>> {
+        if (read == 0) {
+          return Failure("Unexpected EOF");
+        }
+
+        // If we can't get the shared pointer, Data is destroyed and we
+        // need to stop the loop.
+        shared_ptr<Data> data = weak_data.lock();
+        if (!data) {
+          return Break();
+        }
+
+        // NOTE: The bound is written as an addition on the left hand side
+        // (rather than `read - sizeof(inotify_event)`) so that a read
+        // shorter than one event header cannot underflow the unsigned
+        // arithmetic and walk past the end of the buffer. Such a short
+        // read falls through to the partial read failure below.
+        size_t offset = 0;
+        while (offset + sizeof(inotify_event) <= read) {
+          inotify_event* event =
+            reinterpret_cast<inotify_event*>(&(buffer.get()[offset]));
+          offset += sizeof(inotify_event) + event->len;
+
+          if (event->mask & IN_Q_OVERFLOW) {
+            return Failure("inotify event overflow");
+          }
+
+          // For IN_IGNORED generated by inotify_rm_watch, we've already
+          // removed the path from our tracking maps. For other cases of
+          // IN_IGNORED (e.g. IN_DELETE_SELF, IN_UNMOUNT, etc), we remove the
+          // path during those events rather than the subsequent IN_IGNORED.
+          if (event->mask & IN_IGNORED) {
+            continue;
+          }
+
+          Event e;
+
+          if (event->mask & IN_MODIFY) {
+            e.type = Event::Write;
+          }
+          if (event->mask & IN_MOVE_SELF) {
+            e.type = Event::Rename;
+          }
+          if (event->mask & IN_DELETE_SELF || event->mask & IN_UNMOUNT) {
+            e.type = Event::Remove;
+          }
+
+          synchronized (data->lock) {
+            Option<string> path = data->wd_to_path.get(event->wd);
+
+            if (path.isNone()) {
+              continue; // Unknown watch, likely we just removed this watch.
+            }
+
+            // These events mean the watch is gone, so we drop it from our
+            // tracking maps. This must happen *before* `path` is moved
+            // into the event below: erasing with a moved-from string would
+            // leave a stale `path_to_wd` entry behind, causing a later
+            // `add()` of the same path to fail as already added.
+            if (event->mask & (IN_MOVE_SELF | IN_DELETE_SELF | IN_UNMOUNT)) {
+              data->wd_to_path.erase(event->wd);
+              data->path_to_wd.erase(*path);
+            }
+
+            e.path = std::move(*path);
+
+            data->events.put(std::move(e));
+          }
+        }
+
+        if (offset != read) {
+          return Failure("Unexpected partial read from inotify");
+        }
+
+        return Continue();
+      });
+
+  // Surface loop failures to the caller as a `Failure` event, provided
+  // the shared state is still alive.
+  data->read_loop
+    .onFailed([weak_data](const string& message) {
+      shared_ptr<Data> data = weak_data.lock();
+      if (data) {
+        Watcher::Event e;
+        e.type = Watcher::Event::Failure;
+        e.path = message;
+        data->events.put(e);
+      }
+    });
+
+  // We need to close the inotify fd whenever the loop stops.
+  data->read_loop
+    .onAny([fd]() {
+      ::close(fd);
+    });
+}
+
+
+// Starts watching the file at `path` for modification, deletion and
+// rename events.
+//
+// Returns an error if `path` is a directory, if `path` is already being
+// watched by this watcher, or if `inotify_add_watch` fails.
+Try<Nothing> Watcher::add(const string& path)
+{
+  // Watching directories is not supported (yet).
+  if (os::stat::isdir(path)) {
+    return Error("Directories are not supported");
+  }
+
+  // Since only files (not directories) are watched and we only care about
+  // changes to the file contents, these are the only relevant events.
+#ifdef IN_MASK_CREATE
+  // With IN_MASK_CREATE the call fails with EEXIST if a watch already
+  // exists. This ensures that new watches don't modify existing ones,
+  // either because the path gets watched multiple times, or when two
+  // paths refer to the same inode.
+  const int mask = IN_MODIFY | IN_DELETE_SELF | IN_MOVE_SELF | IN_MASK_CREATE;
+#else
+  const int mask = IN_MODIFY | IN_DELETE_SELF | IN_MOVE_SELF;
+#endif
+
+  synchronized (data->lock) {
+    const Option<int> existing = data->path_to_wd.get(path);
+    if (existing.isSome()) {
+      return Error("Path is already added");
+    }
+
+    const int descriptor = inotify_add_watch(inotify_fd, path.c_str(), mask);
+    if (descriptor < 0) {
+      return ErrnoError("Failed to inotify_add_watch");
+    }
+
+    // Record the watch in both directions.
+    data->path_to_wd[path] = descriptor;
+    data->wd_to_path[descriptor] = path;
+  }
+
+  return Nothing();
+}
+
+
+// Stops watching `path`.
+//
+// A path that is not currently tracked is treated as a no-op. Returns an
+// error only if `inotify_rm_watch` fails.
+Try<Nothing> Watcher::remove(const string& path)
+{
+  Option<int> descriptor;
+
+  synchronized (data->lock) {
+    descriptor = data->path_to_wd.get(path);
+
+    // Note that the path may have been implicitly removed via the
+    // read loop when the file gets removed. Should we treat this as
+    // a failure? Or a no-op? For now: a no-op.
+    if (descriptor.isNone()) {
+      return Nothing();
+    }
+
+    // Forget the watch in both directions before removing it from inotify.
+    data->path_to_wd.erase(path);
+    data->wd_to_path.erase(*descriptor);
+  }
+
+  // Note that removing the watch will trigger an IN_IGNORED event.
+  const int result = inotify_rm_watch(inotify_fd, *descriptor);
+  if (result < 0) {
+    return ErrnoError("Failed to inotify_rm_watch");
+  }
+
+  return Nothing();
+}
+
+
+// Returns the event queue stored in the shared `Data`; the read loop
+// puts events onto this queue.
+Queue<Watcher::Event> Watcher::events()
+{
+ return data->events;
+}
+
+
+// Creates a watcher backed by a new inotify instance and starts its read
+// loop. Returns an error if the inotify instance cannot be created.
+Try<Watcher> create_watcher()
+{
+  // The inotify fd is opened non-blocking and close-on-exec.
+  const int fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC);
+
+  if (fd >= 0) {
+    Watcher watcher(fd);
+    watcher.run();
+    return watcher;
+  }
+
+  return ErrnoError("Failed to inotify_init1");
+}
+
+#endif // __linux__
+
} // namespace io {
} // namespace process {
diff --git a/3rdparty/libprocess/src/tests/io_tests.cpp
b/3rdparty/libprocess/src/tests/io_tests.cpp
index c2bb36b18..70ab7236e 100644
--- a/3rdparty/libprocess/src/tests/io_tests.cpp
+++ b/3rdparty/libprocess/src/tests/io_tests.cpp
@@ -20,6 +20,7 @@
#include <stout/gtest.hpp>
#include <stout/os.hpp>
+#include <stout/hashset.hpp>
#include <stout/os/pipe.hpp>
#include <stout/os/read.hpp>
@@ -33,6 +34,8 @@ namespace io = process::io;
using process::Clock;
using process::Future;
+using process::Queue;
+using process::io::Watcher;
using std::array;
using std::string;
@@ -40,6 +43,29 @@ using std::string;
class IOTest: public TemporaryDirectoryTest {};
+// Appends `message` to the file at `path`, creating the file if it does
+// not exist yet.
+//
+// Returns the error from opening or writing the file; if the write
+// succeeded but closing the file failed, the close error is returned.
+Try<Nothing> append(const string& path, const string& message)
+{
+  // NOTE: O_APPEND is required for this to actually append; without it
+  // the write would start at offset 0 and overwrite existing content.
+  Try<int_fd> fd = os::open(
+      path,
+      O_WRONLY | O_CREAT | O_APPEND | O_CLOEXEC,
+      S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+
+  if (fd.isError()) {
+    return Error(fd.error());
+  }
+
+  Try<Nothing> write = os::write(*fd, message);
+  Try<Nothing> close = os::close(*fd);
+
+  // We propagate `close` failures if `write` on the file was successful.
+  if (write.isSome() && close.isError()) {
+    write = Error(
+        "Failed to close '" + stringify(*fd) + "': " + close.error());
+  }
+
+  return write;
+}
+
+
TEST_F(IOTest, PollRead)
{
Try<std::array<int_fd, 2>> pipes = os::pipe();
@@ -482,3 +508,175 @@ TEST_F(IOTest, Redirect)
// accumulated in the redirect hook.
EXPECT_EQ(data, accumulated);
}
+
+
+// Verifies that writing to a watched file yields a Write event for
+// that file's path.
+TEST_F(IOTest, WatcherWrite)
+{
+  const string file = path::join(sandbox.get(), "test_file");
+  ASSERT_SOME(os::touch(file));
+
+  Watcher watcher = CHECK_NOTERROR(io::create_watcher());
+  CHECK_NOTERROR(watcher.add(file));
+
+  Queue<Watcher::Event> queue = watcher.events();
+
+  // Start waiting for the event before triggering it.
+  Future<Watcher::Event> written = queue.get();
+  ASSERT_SOME(os::write(file, "hello", true));
+
+  AWAIT_READY(written);
+  ASSERT_EQ(Watcher::Event::Write, written->type);
+  ASSERT_EQ(file, written->path);
+}
+
+
+// Verifies that renaming a watched file yields a Rename event carrying
+// the original (watched) path.
+TEST_F(IOTest, WatcherRename)
+{
+  const string source = path::join(sandbox.get(), "test_file");
+  const string target = path::join(sandbox.get(), "renamed");
+
+  ASSERT_SOME(os::touch(source));
+
+  Watcher watcher = CHECK_NOTERROR(io::create_watcher());
+  CHECK_NOTERROR(watcher.add(source));
+
+  Queue<Watcher::Event> queue = watcher.events();
+  Future<Watcher::Event> renamed = queue.get();
+
+  ASSERT_SOME(os::rename(source, target, true));
+
+  AWAIT_READY(renamed);
+  ASSERT_EQ(Watcher::Event::Rename, renamed->type);
+  ASSERT_EQ(source, renamed->path);
+}
+
+
+// Verifies that deleting a watched file yields a Remove event for
+// that file's path.
+TEST_F(IOTest, WatcherRm)
+{
+  const string file = path::join(sandbox.get(), "test_file");
+  ASSERT_SOME(os::touch(file));
+
+  Watcher watcher = CHECK_NOTERROR(io::create_watcher());
+  CHECK_NOTERROR(watcher.add(file));
+
+  Queue<Watcher::Event> queue = watcher.events();
+
+  ASSERT_SOME(os::rm(file));
+
+  // The event is fetched only after the removal took place.
+  Future<Watcher::Event> removed = queue.get();
+  AWAIT_READY(removed);
+  ASSERT_EQ(Watcher::Event::Remove, removed->type);
+  ASSERT_EQ(file, removed->path);
+}
+
+
+// Verifies that a single watcher reports Write events for each of
+// several watched files.
+TEST_F(IOTest, WatcherAddMultipleFiles)
+{
+  const string first = path::join(sandbox.get(), "test_file1");
+  const string second = path::join(sandbox.get(), "test_file2");
+
+  // Paths we still expect to see a Write event for.
+  hashset<string> pending = {first, second};
+
+  ASSERT_SOME(os::touch(first));
+  ASSERT_SOME(os::touch(second));
+
+  Watcher watcher = CHECK_NOTERROR(io::create_watcher());
+  CHECK_NOTERROR(watcher.add(first));
+  CHECK_NOTERROR(watcher.add(second));
+
+  Queue<Watcher::Event> queue = watcher.events();
+
+  ASSERT_SOME(append(first, "hello"));
+  ASSERT_SOME(append(second, "hello"));
+
+  // One event per file is expected; the order is not asserted.
+  for (size_t remaining = 2; remaining > 0; --remaining) {
+    Future<Watcher::Event> written = queue.get();
+    AWAIT_READY(written);
+    ASSERT_EQ(Watcher::Event::Write, written->type);
+    pending.erase(written->path);
+  }
+
+  EXPECT_EQ(0u, pending.size());
+}
+
+
+// Verifies that adding a path which is already being watched fails.
+TEST_F(IOTest, WatcherAlreadyAddAddedFile)
+{
+  const string file = path::join(sandbox.get(), "test_file");
+  ASSERT_SOME(os::touch(file));
+
+  Watcher watcher = CHECK_NOTERROR(io::create_watcher());
+
+  // The first add succeeds, the second one must be rejected.
+  CHECK_NOTERROR(watcher.add(file));
+  EXPECT_ERROR(watcher.add(file));
+}
+
+
+// Verifies that adding a directory (here: the test sandbox) is rejected.
+TEST_F(IOTest, WatcherAddDirectory)
+{
+ Watcher watcher = CHECK_NOTERROR(io::create_watcher());
+
+ EXPECT_ERROR(watcher.add(sandbox.get()));
+}
+
+
+// Verifies that no events are reported for a path after it has been
+// removed from the watcher, while other watched paths keep reporting.
+TEST_F(IOTest, WatcherRemove)
+{
+  const string unwatched = path::join(sandbox.get(), "test_file1");
+  const string watched = path::join(sandbox.get(), "test_file2");
+
+  ASSERT_SOME(os::touch(unwatched));
+  ASSERT_SOME(os::touch(watched));
+
+  Watcher watcher = CHECK_NOTERROR(io::create_watcher());
+  CHECK_NOTERROR(watcher.add(unwatched));
+  CHECK_NOTERROR(watcher.add(watched));
+
+  CHECK_NOTERROR(watcher.remove(unwatched));
+
+  Queue<Watcher::Event> queue = watcher.events();
+
+  // Write to both files; only the still-watched one should be reported.
+  ASSERT_SOME(os::write(unwatched, "hello", true));
+  ASSERT_SOME(os::write(watched, "hello", true));
+
+  Future<Watcher::Event> written = queue.get();
+  AWAIT_READY(written);
+  ASSERT_EQ(Watcher::Event::Write, written->type);
+  ASSERT_EQ(watched, written->path);
+}
+
+
+// Verifies that removing a path twice succeeds both times, i.e. removing
+// an already removed path is a no-op.
+TEST_F(IOTest, WatcherRemoveRemovedFile)
+{
+  const string file = path::join(sandbox.get(), "test_file");
+  ASSERT_SOME(os::touch(file));
+
+  Watcher watcher = CHECK_NOTERROR(io::create_watcher());
+  CHECK_NOTERROR(watcher.add(file));
+
+  // First removal drops the watch, the second finds nothing to remove.
+  CHECK_NOTERROR(watcher.remove(file));
+  CHECK_NOTERROR(watcher.remove(file));
+}
+
+
+// Verifies that the read loop gets a discard request once the last
+// copy of the watcher goes out of scope.
+TEST_F(IOTest, WatcherDestruction)
+{
+  const string file = path::join(sandbox.get(), "test_file");
+  ASSERT_SOME(os::touch(file));
+
+  Future<Nothing> future;
+
+  {
+    Watcher watcher = CHECK_NOTERROR(io::create_watcher());
+    CHECK_NOTERROR(watcher.add(file));
+
+    future = io::testing::watcher_read_loop(watcher);
+
+    // While the watcher is alive, no discard has been requested.
+    EXPECT_FALSE(future.hasDiscard());
+  }
+
+  EXPECT_TRUE(future.hasDiscard());
+}