Repository: mesos Updated Branches: refs/heads/master 4aa3ec22c -> 849fc4d36
Fixed bug in io::poll. When a future returned from process::io::poll is discarded we need to fully remove the ev_io watchers from libev otherwise the reuse of those file descriptors can cause unintended side effects because they're still being used by the kernel/libev. Review: https://reviews.apache.org/r/24194 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/849fc4d3 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/849fc4d3 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/849fc4d3 Branch: refs/heads/master Commit: 849fc4d361e40062073324153ba97e98e294fdf2 Parents: 4aa3ec2 Author: Benjamin Hindman <[email protected]> Authored: Fri Aug 1 13:01:15 2014 -0700 Committer: Benjamin Hindman <[email protected]> Committed: Wed Aug 13 00:30:08 2014 -0700 ---------------------------------------------------------------------- 3rdparty/libprocess/src/process.cpp | 186 ++++++++++++++++++++++++------- 1 file changed, 147 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/849fc4d3/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index 14cf317..c2bee98 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -560,10 +560,18 @@ static ev_timer timeouts_watcher; // Server watcher for accepting connections. static ev_io server_watcher; -// Queue of I/O watchers. +// Queue of I/O watchers to be asynchronously added to the event loop +// (protected by 'watchers' below). +// TODO(benh): Replace this queue with functions that we put in +// 'functions' below that perform the ev_io_start themselves. 
static queue<ev_io*>* watchers = new queue<ev_io*>(); static synchronizable(watchers) = SYNCHRONIZED_INITIALIZER; +// Queue of functions to be invoked asynchronously within the event +// loop (protected by 'watchers' above). +static queue<lambda::function<void(void)> >* functions = + new queue<lambda::function<void(void)> >(); + // We store the timers in a map of lists indexed by the timeout of the // timer so that we can have two timers that have the same timeout. We // exploit that the map is SORTED! @@ -876,6 +884,40 @@ static Message* parse(Request* request) return message; } +// Wrapper around function we want to run in the event loop. +template <typename T> +void _run_in_event_loop( + const lambda::function<Future<T>(void)>& f, + const Owned<Promise<T> >& promise) +{ + // Don't bother running the function if the future has been discarded. + if (promise->future().hasDiscard()) { + promise->discard(); + } else { + promise->set(f()); + } +} + + +// Helper for running a function in the event loop. +template <typename T> +Future<T> run_in_event_loop(const lambda::function<Future<T>(void)>& f) +{ + Owned<Promise<T> > promise(new Promise<T>()); + + Future<T> future = promise->future(); + + // Enqueue the function. + synchronized (watchers) { + functions->push(lambda::bind(&_run_in_event_loop<T>, f, promise)); + } + + // Interrupt the loop. + ev_async_send(loop, &async_watcher); + + return future; +} + void handle_async(struct ev_loop* loop, ev_async* _, int revents) { @@ -886,6 +928,11 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents) watchers->pop(); ev_io_start(loop, watcher); } + + while (!functions->empty()) { + (functions->front())(); + functions->pop(); + } } synchronized (timeouts) { @@ -1344,14 +1391,66 @@ void accept(struct ev_loop* loop, ev_io* watcher, int revents) } +// Data necessary for polling so we can discard polling and actually +// stop it in the event loop. 
+struct Poll +{ + Poll() + { + // Need to explicitly instantiate the watchers. + watcher.io.reset(new ev_io()); + watcher.async.reset(new ev_async()); + } + + // An I/O watcher for checking for readability or writeability and + // an async watcher for being able to discard the polling. + struct { + memory::shared_ptr<ev_io> io; + memory::shared_ptr<ev_async> async; + } watcher; + + Promise<short> promise; +}; + + +// Event loop callback when I/O is ready on polling file descriptor. void polled(struct ev_loop* loop, ev_io* watcher, int revents) { - Promise<short>* promise = (Promise<short>*) watcher->data; - promise->set(revents); - delete promise; + Poll* poll = (Poll*) watcher->data; + + ev_io_stop(loop, poll->watcher.io.get()); + + // Stop the async watcher (also clears if pending so 'discard_poll' + // will not get invoked and we can delete 'poll' here). + ev_async_stop(loop, poll->watcher.async.get()); + + poll->promise.set(revents); + + delete poll; +} + + +// Event loop callback when future associated with polling file +// descriptor has been discarded. +void discard_poll(struct ev_loop* loop, ev_async* watcher, int revents) +{ + Poll* poll = (Poll*) watcher->data; + + // Check and see if we have a pending 'polled' callback and if so + // let it "win". + if (ev_is_pending(poll->watcher.io.get())) { + return; + } - ev_io_stop(loop, watcher); - delete watcher; + ev_async_stop(loop, poll->watcher.async.get()); + + // Stop the I/O watcher (but note we check if pending above) so it + // won't get invoked and we can delete 'poll' here. + ev_io_stop(loop, poll->watcher.io.get()); + + poll->promise.discard(); + + delete poll; } @@ -3586,6 +3685,44 @@ namespace io { namespace internal { +// Helper/continuation of 'poll' on future discard. +void _poll(const memory::shared_ptr<ev_async>& async) +{ + ev_async_send(loop, async.get()); +} + + +Future<short> poll(int fd, short events) +{ + Poll* poll = new Poll(); + + // Have the watchers data point back to the struct. 
+ poll->watcher.async->data = poll; + poll->watcher.io->data = poll; + + // Get a copy of the future to avoid any races with the event loop. + Future<short> future = poll->promise.future(); + + // Initialize and start the async watcher. + ev_async_init(poll->watcher.async.get(), discard_poll); + ev_async_start(loop, poll->watcher.async.get()); + + // Make sure we stop polling if a discard occurs on our future. + // Note that it's possible that we'll invoke '_poll' when someone + // does a discard even after the polling has already completed, but + // in this case while we will interrupt the event loop since the + // async watcher has already been stopped we won't cause + // 'discard_poll' to get invoked. + future.onDiscard(lambda::bind(&_poll, poll->watcher.async)); + + // Initialize and start the I/O watcher. + ev_io_init(poll->watcher.io.get(), polled, fd, events); + ev_io_start(loop, poll->watcher.io.get()); + + return future; +} + + void read( int fd, void* data, @@ -3595,6 +3732,7 @@ void read( { // Ignore this function if the read operation has been discarded. if (promise->future().hasDiscard()) { + CHECK(!future.isPending()); promise->discard(); return; } @@ -3614,7 +3752,7 @@ void read( if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { // Restart the read operation. Future<short> future = - poll(fd, process::io::READ).onAny( + io::poll(fd, process::io::READ).onAny( lambda::bind(&internal::read, fd, data, @@ -3705,7 +3843,7 @@ void write( if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { // Restart the write operation. Future<short> future = - poll(fd, process::io::WRITE).onAny( + io::poll(fd, process::io::WRITE).onAny( lambda::bind(&internal::write, fd, data, @@ -3737,37 +3875,7 @@ Future<short> poll(int fd, short events) // TODO(benh): Check if the file descriptor is non-blocking? - Promise<short>* promise = new Promise<short>(); - - // Get a copy of the future to avoid any races with the event loop. 
- Future<short> future = promise->future(); - - // Make sure we stop polling if a discard occurs on our future. - // TODO(benh): This is actually insuffient in as much as we need to - // interrupt the libev event loop and stop and remove the - // watcher. This has been left as a TODO since (a) it's a - // non-trivial change (i.e., updating 'handle_async' to also remove - // watchers) and (b) it's most likely that the file descriptor being - // polled will be closed after the promise is discarded which will - // invoke 'polled' which will then cause the watcher to be stopped - // and deleted. Note that we needed to make Promise<T>::discard a - // 'friend' which should be removed once we clean this up. - future.onDiscard(lambda::bind(&process::internal::discarded<short>, future)); - - ev_io* watcher = new ev_io(); - watcher->data = promise; - - ev_io_init(watcher, polled, fd, events); - - // Enqueue the watcher. - synchronized (watchers) { - watchers->push(watcher); - } - - // Interrupt the loop. - ev_async_send(loop, &async_watcher); - - return future; + return run_in_event_loop<short>(lambda::bind(&internal::poll, fd, events)); }
