Moved event loop specific polling into poll.cpp. Review: https://reviews.apache.org/r/27505
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/413ce94f Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/413ce94f Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/413ce94f Branch: refs/heads/master Commit: 413ce94f8d74b8c29657eef7dbc2f6ade4143bc7 Parents: f78ae66 Author: Benjamin Hindman <[email protected]> Authored: Sun Nov 2 18:22:15 2014 -0800 Committer: Benjamin Hindman <[email protected]> Committed: Sat Nov 15 16:25:58 2014 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/Makefile.am | 1 + 3rdparty/libprocess/src/poll.cpp | 129 +++++++++++++++++++++++++++++++ 3rdparty/libprocess/src/process.cpp | 112 --------------------------- 3 files changed, 130 insertions(+), 112 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/413ce94f/3rdparty/libprocess/Makefile.am ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am index 2de9989..41c3bd1 100644 --- a/3rdparty/libprocess/Makefile.am +++ b/3rdparty/libprocess/Makefile.am @@ -42,6 +42,7 @@ libprocess_la_SOURCES = \ src/libev.cpp \ src/metrics/metrics.cpp \ src/pid.cpp \ + src/poll.cpp \ src/process.cpp \ src/process_reference.hpp \ src/reap.cpp \ http://git-wip-us.apache.org/repos/asf/mesos/blob/413ce94f/3rdparty/libprocess/src/poll.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/poll.cpp b/3rdparty/libprocess/src/poll.cpp new file mode 100644 index 0000000..324e4dd --- /dev/null +++ b/3rdparty/libprocess/src/poll.cpp @@ -0,0 +1,129 @@ +#include <ev.h> + +#include <process/future.hpp> +#include <process/process.hpp> // For process::initialize. + +#include <stout/lambda.hpp> +#include <stout/memory.hpp> + +#include "libev.hpp" + +namespace process { + +// Data necessary for polling so we can discard polling and actually +// stop it in the event loop. +struct Poll +{ + Poll() + { + // Need to explicitly instantiate the watchers. + watcher.io.reset(new ev_io()); + watcher.async.reset(new ev_async()); + } + + // An I/O watcher for checking for readability or writeability and + // an async watcher for being able to discard the polling. + struct { + memory::shared_ptr<ev_io> io; + memory::shared_ptr<ev_async> async; + } watcher; + + Promise<short> promise; +}; + + +// Event loop callback when I/O is ready on polling file descriptor. +void polled(struct ev_loop* loop, ev_io* watcher, int revents) +{ + Poll* poll = (Poll*) watcher->data; + + ev_io_stop(loop, poll->watcher.io.get()); + + // Stop the async watcher (also clears if pending so 'discard_poll' + // will not get invoked and we can delete 'poll' here). + ev_async_stop(loop, poll->watcher.async.get()); + + poll->promise.set(revents); + + delete poll; +} + + +// Event loop callback when future associated with polling file +// descriptor has been discarded. +void discard_poll(struct ev_loop* loop, ev_async* watcher, int revents) +{ + Poll* poll = (Poll*) watcher->data; + + // Check and see if we have a pending 'polled' callback and if so + // let it "win". + if (ev_is_pending(poll->watcher.io.get())) { + return; + } + + ev_async_stop(loop, poll->watcher.async.get()); + + // Stop the I/O watcher (but note we check if pending above) so it + // won't get invoked and we can delete 'poll' here. + ev_io_stop(loop, poll->watcher.io.get()); + + poll->promise.discard(); + + delete poll; +} + + +namespace io { +namespace internal { + +// Helper/continuation of 'poll' on future discard. +void _poll(const memory::shared_ptr<ev_async>& async) +{ + ev_async_send(loop, async.get()); +} + + +Future<short> poll(int fd, short events) +{ + Poll* poll = new Poll(); + + // Have the watchers data point back to the struct. + poll->watcher.async->data = poll; + poll->watcher.io->data = poll; + + // Get a copy of the future to avoid any races with the event loop. + Future<short> future = poll->promise.future(); + + // Initialize and start the async watcher. + ev_async_init(poll->watcher.async.get(), discard_poll); + ev_async_start(loop, poll->watcher.async.get()); + + // Make sure we stop polling if a discard occurs on our future. + // Note that it's possible that we'll invoke '_poll' when someone + // does a discard even after the polling has already completed, but + // in this case while we will interrupt the event loop since the + // async watcher has already been stopped we won't cause + // 'discard_poll' to get invoked. + future.onDiscard(lambda::bind(&_poll, poll->watcher.async)); + + // Initialize and start the I/O watcher. + ev_io_init(poll->watcher.io.get(), polled, fd, events); + ev_io_start(loop, poll->watcher.io.get()); + + return future; +} + +} // namespace internal { + + +Future<short> poll(int fd, short events) +{ + process::initialize(); + + // TODO(benh): Check if the file descriptor is non-blocking? + + return run_in_event_loop<short>(lambda::bind(&internal::poll, fd, events)); +} + +} // namespace io { +} // namespace process { http://git-wip-us.apache.org/repos/asf/mesos/blob/413ce94f/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index d96d881..1fc8874 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -927,69 +927,6 @@ void accept(struct ev_loop* loop, ev_io* watcher, int revents) } -// Data necessary for polling so we can discard polling and actually -// stop it in the event loop. -struct Poll -{ - Poll() - { - // Need to explicitly instantiate the watchers. - watcher.io.reset(new ev_io()); - watcher.async.reset(new ev_async()); - } - - // An I/O watcher for checking for readability or writeability and - // an async watcher for being able to discard the polling. - struct { - memory::shared_ptr<ev_io> io; - memory::shared_ptr<ev_async> async; - } watcher; - - Promise<short> promise; -}; - - -// Event loop callback when I/O is ready on polling file descriptor. -void polled(struct ev_loop* loop, ev_io* watcher, int revents) -{ - Poll* poll = (Poll*) watcher->data; - - ev_io_stop(loop, poll->watcher.io.get()); - - // Stop the async watcher (also clears if pending so 'discard_poll' - // will not get invoked and we can delete 'poll' here). - ev_async_stop(loop, poll->watcher.async.get()); - - poll->promise.set(revents); - - delete poll; -} - - -// Event loop callback when future associated with polling file -// descriptor has been discarded. -void discard_poll(struct ev_loop* loop, ev_async* watcher, int revents) -{ - Poll* poll = (Poll*) watcher->data; - - // Check and see if we have a pending 'polled' callback and if so - // let it "win". - if (ev_is_pending(poll->watcher.io.get())) { - return; - } - - ev_async_stop(loop, poll->watcher.async.get()); - - // Stop the I/O watcher (but note we check if pending above) so it - // won't get invoked and we can delete 'poll' here. - ev_io_stop(loop, poll->watcher.io.get()); - - poll->promise.discard(); - - delete poll; -} - - void* serve(void* arg) { ev_loop(((struct ev_loop*) arg), 0); @@ -3179,47 +3116,8 @@ void post(const UPID& from, namespace io { - namespace internal { -// Helper/continuation of 'poll' on future discard. -void _poll(const memory::shared_ptr<ev_async>& async) -{ - ev_async_send(loop, async.get()); -} - - -Future<short> poll(int fd, short events) -{ - Poll* poll = new Poll(); - - // Have the watchers data point back to the struct. - poll->watcher.async->data = poll; - poll->watcher.io->data = poll; - - // Get a copy of the future to avoid any races with the event loop. - Future<short> future = poll->promise.future(); - - // Initialize and start the async watcher. - ev_async_init(poll->watcher.async.get(), discard_poll); - ev_async_start(loop, poll->watcher.async.get()); - - // Make sure we stop polling if a discard occurs on our future. - // Note that it's possible that we'll invoke '_poll' when someone - // does a discard even after the polling has already completed, but - // in this case while we will interrupt the event loop since the - // async watcher has already been stopped we won't cause - // 'discard_poll' to get invoked. - future.onDiscard(lambda::bind(&_poll, poll->watcher.async)); - - // Initialize and start the I/O watcher. - ev_io_init(poll->watcher.io.get(), polled, fd, events); - ev_io_start(loop, poll->watcher.io.get()); - - return future; -} - - void read( int fd, void* data, @@ -3366,16 +3264,6 @@ void write( } // namespace internal { -Future<short> poll(int fd, short events) -{ - process::initialize(); - - // TODO(benh): Check if the file descriptor is non-blocking? - - return run_in_event_loop<short>(lambda::bind(&internal::poll, fd, events)); -} - - Future<size_t> read(int fd, void* data, size_t size) { process::initialize();
