Started moving libev specific functionality out of process.cpp. Review: https://reviews.apache.org/r/27504
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f78ae663 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f78ae663 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f78ae663 Branch: refs/heads/master Commit: f78ae6635ce0aae89b1b9bd3dd1eae02c1da0936 Parents: 0e19796 Author: Benjamin Hindman <[email protected]> Authored: Sun Nov 2 18:19:23 2014 -0800 Committer: Benjamin Hindman <[email protected]> Committed: Sat Nov 15 16:25:58 2014 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/Makefile.am | 2 + 3rdparty/libprocess/src/libev.cpp | 21 ++++++++++ 3rdparty/libprocess/src/libev.hpp | 70 ++++++++++++++++++++++++++++++++ 3rdparty/libprocess/src/process.cpp | 64 ++++------------------------- 4 files changed, 100 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/f78ae663/3rdparty/libprocess/Makefile.am ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am index 0008e68..2de9989 100644 --- a/3rdparty/libprocess/Makefile.am +++ b/3rdparty/libprocess/Makefile.am @@ -38,6 +38,8 @@ libprocess_la_SOURCES = \ src/help.cpp \ src/http.cpp \ src/latch.cpp \ + src/libev.hpp \ + src/libev.cpp \ src/metrics/metrics.cpp \ src/pid.cpp \ src/process.cpp \ http://git-wip-us.apache.org/repos/asf/mesos/blob/f78ae663/3rdparty/libprocess/src/libev.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/libev.cpp b/3rdparty/libprocess/src/libev.cpp new file mode 100644 index 0000000..efc89d8 --- /dev/null +++ b/3rdparty/libprocess/src/libev.cpp @@ -0,0 +1,21 @@ +#include <ev.h> + +#include <queue> + +#include <stout/lambda.hpp> + +#include "libev.hpp" + +namespace process { + +// Defines the initial values for all of the declarations made in +// libev.hpp (since these need to live in the static data space). +struct ev_loop* loop = NULL; +ev_async async_watcher; +ev_io server_watcher; +std::queue<ev_io*>* watchers = new std::queue<ev_io*>(); +synchronizable(watchers); +std::queue<lambda::function<void(void)>>* functions = + new std::queue<lambda::function<void(void)>>(); + +} // namespace process { http://git-wip-us.apache.org/repos/asf/mesos/blob/f78ae663/3rdparty/libprocess/src/libev.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/libev.hpp b/3rdparty/libprocess/src/libev.hpp new file mode 100644 index 0000000..bac8b6a --- /dev/null +++ b/3rdparty/libprocess/src/libev.hpp @@ -0,0 +1,70 @@ +#include <ev.h> + +#include <queue> + +#include <process/future.hpp> +#include <process/owned.hpp> + +#include <stout/lambda.hpp> + +#include "synchronized.hpp" + +namespace process { + +// Event loop. +extern struct ev_loop* loop; + +// Asynchronous watcher for interrupting loop to specifically deal +// with IO watchers and functions (via run_in_event_loop). +extern ev_async async_watcher; + +// Server watcher for accepting connections. +extern ev_io server_watcher; + +// Queue of I/O watchers to be asynchronously added to the event loop +// (protected by 'watchers' below). +// TODO(benh): Replace this queue with functions that we put in +// 'functions' below that perform the ev_io_start themselves. +extern std::queue<ev_io*>* watchers; +extern synchronizable(watchers); + +// Queue of functions to be invoked asynchronously within the vent +// loop (protected by 'watchers' above). +extern std::queue<lambda::function<void(void)>>* functions; + + +// Wrapper around function we want to run in the event loop. +template <typename T> +void _run_in_event_loop( + const lambda::function<Future<T>(void)>& f, + const Owned<Promise<T>>& promise) +{ + // Don't bother running the function if the future has been discarded. + if (promise->future().hasDiscard()) { + promise->discard(); + } else { + promise->set(f()); + } +} + + +// Helper for running a function in the event loop. +template <typename T> +Future<T> run_in_event_loop(const lambda::function<Future<T>(void)>& f) +{ + Owned<Promise<T>> promise(new Promise<T>()); + + Future<T> future = promise->future(); + + // Enqueue the function. + synchronized (watchers) { + functions->push(lambda::bind(&_run_in_event_loop<T>, f, promise)); + } + + // Interrupt the loop. + ev_async_send(loop, &async_watcher); + + return future; +} + +} // namespace process { http://git-wip-us.apache.org/repos/asf/mesos/blob/f78ae663/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index bac4200..d96d881 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ -83,6 +83,7 @@ #include "decoder.hpp" #include "encoder.hpp" #include "gate.hpp" +#include "libev.hpp" #include "process_reference.hpp" #include "synchronized.hpp" @@ -446,28 +447,6 @@ static SocketManager* socket_manager = NULL; // Active ProcessManager (eventually will probably be thread-local). static ProcessManager* process_manager = NULL; -// Event loop. -struct ev_loop* loop = NULL; - -// Asynchronous watcher for interrupting loop to specifically deal -// with IO watchers and functions (via run_in_event_loop). -static ev_async async_watcher; - -// Server watcher for accepting connections. -static ev_io server_watcher; - -// Queue of I/O watchers to be asynchronously added to the event loop -// (protected by 'watchers' below). -// TODO(benh): Replace this queue with functions that we put in -// 'functions' below that perform the ev_io_start themselves. -static queue<ev_io*>* watchers = new queue<ev_io*>(); -static synchronizable(watchers) = SYNCHRONIZED_INITIALIZER; - -// Queue of functions to be invoked asynchronously within the vent -// loop (protected by 'watchers' below). -static queue<lambda::function<void(void)> >* functions = - new queue<lambda::function<void(void)> >(); - // Scheduling gate that threads wait at when there is nothing to run. static Gate* gate = new Gate(); @@ -593,40 +572,6 @@ static Message* parse(Request* request) return message; } -// Wrapper around function we want to run in the event loop. -template <typename T> -void _run_in_event_loop( - const lambda::function<Future<T>(void)>& f, - const Owned<Promise<T> >& promise) -{ - // Don't bother running the function if the future has been discarded. - if (promise->future().hasDiscard()) { - promise->discard(); - } else { - promise->set(f()); - } -} - - -// Helper for running a function in the event loop. -template <typename T> -Future<T> run_in_event_loop(const lambda::function<Future<T>(void)>& f) -{ - Owned<Promise<T> > promise(new Promise<T>()); - - Future<T> future = promise->future(); - - // Enqueue the function. - synchronized (watchers) { - functions->push(lambda::bind(&_run_in_event_loop<T>, f, promise)); - } - - // Interrupt the loop. - ev_async_send(loop, &async_watcher); - - return future; -} - void handle_async(struct ev_loop* loop, ev_async* _, int revents) { @@ -1280,7 +1225,12 @@ void initialize(const string& delegate) PLOG(FATAL) << "Failed to initialize, listen"; } - // Setup event loop. + // Initialize libev. + // + // TODO(benh): Eventually move this all out of process.cpp after + // more is disentangled. + synchronizer(watchers) = SYNCHRONIZED_INITIALIZER; + #ifdef __sun__ loop = ev_default_loop(EVBACKEND_POLL | EVBACKEND_SELECT); #else
