Started moving libev specific functionality out of process.cpp.

Review: https://reviews.apache.org/r/27504


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f78ae663
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f78ae663
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f78ae663

Branch: refs/heads/master
Commit: f78ae6635ce0aae89b1b9bd3dd1eae02c1da0936
Parents: 0e19796
Author: Benjamin Hindman <[email protected]>
Authored: Sun Nov 2 18:19:23 2014 -0800
Committer: Benjamin Hindman <[email protected]>
Committed: Sat Nov 15 16:25:58 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am     |  2 +
 3rdparty/libprocess/src/libev.cpp   | 21 ++++++++++
 3rdparty/libprocess/src/libev.hpp   | 70 ++++++++++++++++++++++++++++++++
 3rdparty/libprocess/src/process.cpp | 64 ++++-------------------------
 4 files changed, 100 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f78ae663/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 0008e68..2de9989 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -38,6 +38,8 @@ libprocess_la_SOURCES =               \
   src/help.cpp                 \
   src/http.cpp                 \
   src/latch.cpp                        \
+  src/libev.hpp                        \
+  src/libev.cpp                        \
   src/metrics/metrics.cpp      \
   src/pid.cpp                  \
   src/process.cpp              \

http://git-wip-us.apache.org/repos/asf/mesos/blob/f78ae663/3rdparty/libprocess/src/libev.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.cpp 
b/3rdparty/libprocess/src/libev.cpp
new file mode 100644
index 0000000..efc89d8
--- /dev/null
+++ b/3rdparty/libprocess/src/libev.cpp
@@ -0,0 +1,21 @@
+#include <ev.h>
+
+#include <queue>
+
+#include <stout/lambda.hpp>
+
+#include "libev.hpp"
+
+namespace process {
+
+// Defines the initial values for all of the declarations made in
+// libev.hpp (since these need to live in the static data space).
+struct ev_loop* loop = NULL;
+ev_async async_watcher;
+ev_io server_watcher;
+std::queue<ev_io*>* watchers = new std::queue<ev_io*>();
+synchronizable(watchers);
+std::queue<lambda::function<void(void)>>* functions =
+  new std::queue<lambda::function<void(void)>>();
+
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/f78ae663/3rdparty/libprocess/src/libev.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.hpp 
b/3rdparty/libprocess/src/libev.hpp
new file mode 100644
index 0000000..bac8b6a
--- /dev/null
+++ b/3rdparty/libprocess/src/libev.hpp
@@ -0,0 +1,70 @@
+#include <ev.h>
+
+#include <queue>
+
+#include <process/future.hpp>
+#include <process/owned.hpp>
+
+#include <stout/lambda.hpp>
+
+#include "synchronized.hpp"
+
+namespace process {
+
+// Event loop.
+extern struct ev_loop* loop;
+
+// Asynchronous watcher for interrupting loop to specifically deal
+// with IO watchers and functions (via run_in_event_loop).
+extern ev_async async_watcher;
+
+// Server watcher for accepting connections.
+extern ev_io server_watcher;
+
+// Queue of I/O watchers to be asynchronously added to the event loop
+// (protected by 'watchers' below).
+// TODO(benh): Replace this queue with functions that we put in
+// 'functions' below that perform the ev_io_start themselves.
+extern std::queue<ev_io*>* watchers;
+extern synchronizable(watchers);
+
+// Queue of functions to be invoked asynchronously within the vent
+// loop (protected by 'watchers' above).
+extern std::queue<lambda::function<void(void)>>* functions;
+
+
+// Wrapper around function we want to run in the event loop.
+template <typename T>
+void _run_in_event_loop(
+    const lambda::function<Future<T>(void)>& f,
+    const Owned<Promise<T>>& promise)
+{
+  // Don't bother running the function if the future has been discarded.
+  if (promise->future().hasDiscard()) {
+    promise->discard();
+  } else {
+    promise->set(f());
+  }
+}
+
+
+// Helper for running a function in the event loop.
+template <typename T>
+Future<T> run_in_event_loop(const lambda::function<Future<T>(void)>& f)
+{
+  Owned<Promise<T>> promise(new Promise<T>());
+
+  Future<T> future = promise->future();
+
+  // Enqueue the function.
+  synchronized (watchers) {
+    functions->push(lambda::bind(&_run_in_event_loop<T>, f, promise));
+  }
+
+  // Interrupt the loop.
+  ev_async_send(loop, &async_watcher);
+
+  return future;
+}
+
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/f78ae663/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp 
b/3rdparty/libprocess/src/process.cpp
index bac4200..d96d881 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -83,6 +83,7 @@
 #include "decoder.hpp"
 #include "encoder.hpp"
 #include "gate.hpp"
+#include "libev.hpp"
 #include "process_reference.hpp"
 #include "synchronized.hpp"
 
@@ -446,28 +447,6 @@ static SocketManager* socket_manager = NULL;
 // Active ProcessManager (eventually will probably be thread-local).
 static ProcessManager* process_manager = NULL;
 
-// Event loop.
-struct ev_loop* loop = NULL;
-
-// Asynchronous watcher for interrupting loop to specifically deal
-// with IO watchers and functions (via run_in_event_loop).
-static ev_async async_watcher;
-
-// Server watcher for accepting connections.
-static ev_io server_watcher;
-
-// Queue of I/O watchers to be asynchronously added to the event loop
-// (protected by 'watchers' below).
-// TODO(benh): Replace this queue with functions that we put in
-// 'functions' below that perform the ev_io_start themselves.
-static queue<ev_io*>* watchers = new queue<ev_io*>();
-static synchronizable(watchers) = SYNCHRONIZED_INITIALIZER;
-
-// Queue of functions to be invoked asynchronously within the vent
-// loop (protected by 'watchers' below).
-static queue<lambda::function<void(void)> >* functions =
-  new queue<lambda::function<void(void)> >();
-
 // Scheduling gate that threads wait at when there is nothing to run.
 static Gate* gate = new Gate();
 
@@ -593,40 +572,6 @@ static Message* parse(Request* request)
   return message;
 }
 
-// Wrapper around function we want to run in the event loop.
-template <typename T>
-void _run_in_event_loop(
-    const lambda::function<Future<T>(void)>& f,
-    const Owned<Promise<T> >& promise)
-{
-  // Don't bother running the function if the future has been discarded.
-  if (promise->future().hasDiscard()) {
-    promise->discard();
-  } else {
-    promise->set(f());
-  }
-}
-
-
-// Helper for running a function in the event loop.
-template <typename T>
-Future<T> run_in_event_loop(const lambda::function<Future<T>(void)>& f)
-{
-  Owned<Promise<T> > promise(new Promise<T>());
-
-  Future<T> future = promise->future();
-
-  // Enqueue the function.
-  synchronized (watchers) {
-    functions->push(lambda::bind(&_run_in_event_loop<T>, f, promise));
-  }
-
-  // Interrupt the loop.
-  ev_async_send(loop, &async_watcher);
-
-  return future;
-}
-
 
 void handle_async(struct ev_loop* loop, ev_async* _, int revents)
 {
@@ -1280,7 +1225,12 @@ void initialize(const string& delegate)
     PLOG(FATAL) << "Failed to initialize, listen";
   }
 
-  // Setup event loop.
+  // Initialize libev.
+  //
+  // TODO(benh): Eventually move this all out of process.cpp after
+  // more is disentangled.
+  synchronizer(watchers) = SYNCHRONIZED_INITIALIZER;
+
 #ifdef __sun__
   loop = ev_default_loop(EVBACKEND_POLL | EVBACKEND_SELECT);
 #else

Reply via email to