PROTON-1400: [C++ binding] Implement container level event_loops
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/570d0a1a Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/570d0a1a Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/570d0a1a Branch: refs/heads/master Commit: 570d0a1aae29adfab861a76f37d2aa9f14488a9a Parents: ec2364f Author: Andrew Stitcher <[email protected]> Authored: Thu Apr 20 15:20:40 2017 -0400 Committer: Andrew Stitcher <[email protected]> Committed: Fri Jul 21 12:50:06 2017 -0400 ---------------------------------------------------------------------- .../bindings/cpp/include/proton/container.hpp | 1 + .../bindings/cpp/include/proton/event_loop.hpp | 1 + proton-c/bindings/cpp/src/event_loop.cpp | 3 + .../cpp/src/include/proactor_container_impl.hpp | 10 ++ .../src/include/proactor_event_loop_impl.hpp | 23 +-- .../cpp/src/proactor_container_impl.cpp | 156 ++++++++++++++----- 6 files changed, 139 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/570d0a1a/proton-c/bindings/cpp/include/proton/container.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/container.hpp b/proton-c/bindings/cpp/include/proton/container.hpp index be83e5e..0262e0f 100644 --- a/proton-c/bindings/cpp/include/proton/container.hpp +++ b/proton-c/bindings/cpp/include/proton/container.hpp @@ -224,6 +224,7 @@ class PN_CPP_CLASS_EXTERN container { friend class session_options; friend class receiver_options; friend class sender_options; + friend class event_loop; }; } // proton http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/570d0a1a/proton-c/bindings/cpp/include/proton/event_loop.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/event_loop.hpp b/proton-c/bindings/cpp/include/proton/event_loop.hpp index f49d211..6d1646e 100644 --- a/proton-c/bindings/cpp/include/proton/event_loop.hpp +++ b/proton-c/bindings/cpp/include/proton/event_loop.hpp @@ -50,6 +50,7 @@ class PN_CPP_CLASS_EXTERN event_loop { public: /// Create event_loop PN_CPP_EXTERN event_loop(); + PN_CPP_EXTERN event_loop(container&); PN_CPP_EXTERN ~event_loop(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/570d0a1a/proton-c/bindings/cpp/src/event_loop.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/event_loop.cpp b/proton-c/bindings/cpp/src/event_loop.cpp index ab39aa7..5320011 100644 --- a/proton-c/bindings/cpp/src/event_loop.cpp +++ b/proton-c/bindings/cpp/src/event_loop.cpp @@ -20,6 +20,7 @@ #include "proton/event_loop.hpp" #include "contexts.hpp" +#include "proactor_container_impl.hpp" #include "proactor_event_loop_impl.hpp" #include <proton/session.h> @@ -28,6 +29,8 @@ namespace proton { event_loop::event_loop() {} +event_loop::event_loop(container& c) { *this = container::impl::make_event_loop(c); } + event_loop::~event_loop() {} event_loop& event_loop::operator=(impl* i) { impl_.reset(i); return *this; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/570d0a1a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp index 859493d..4b84a6e 100644 --- a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp +++ b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp @@ -28,6 +28,7 @@ #include "proton/connection_options.hpp" #include "proton/duration.hpp" #include "proton/error_condition.hpp" +#include "proton/event_loop.hpp" #include "proton/messaging_handler.hpp" #include "proton/receiver.hpp" #include "proton/receiver_options.hpp" @@ -38,6 +39,7 @@ #include <list> #include <map> +#include <set> #include <string> #include <vector> @@ -77,8 +79,12 @@ class container::impl { #endif template <class T> static void set_handler(T s, messaging_handler* h); template <class T> static messaging_handler* get_handler(T s); + static event_loop::impl* make_event_loop(container&); private: + class common_event_loop; + class connection_event_loop; + class container_event_loop; pn_listener_t* listen_common_lh(const std::string&); connection connect_common(const std::string&, const connection_options&); @@ -89,6 +95,10 @@ class container::impl { container& container_; + typedef std::set<container_event_loop*> event_loops; + event_loops event_loops_; + container_event_loop* add_event_loop(); + void remove_event_loop(container_event_loop*); struct scheduled { timestamp time; // duration from epoch for task #if PN_CPP_HAS_STD_FUNCTION http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/570d0a1a/proton-c/bindings/cpp/src/include/proactor_event_loop_impl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/proactor_event_loop_impl.hpp b/proton-c/bindings/cpp/src/include/proactor_event_loop_impl.hpp index 8fa7acf..82ec129 100644 --- a/proton-c/bindings/cpp/src/include/proactor_event_loop_impl.hpp +++ b/proton-c/bindings/cpp/src/include/proactor_event_loop_impl.hpp @@ -23,30 +23,19 @@ */ #include "proton/fwd.hpp" - -struct pn_connection_t; +#include "proton/internal/config.hpp" namespace proton { class event_loop::impl { public: - impl(pn_connection_t*); - - bool inject(void_function0& f); + virtual ~impl() {}; + virtual bool inject(void_function0& f) = 0; #if PN_CPP_HAS_STD_FUNCTION - bool inject(std::function<void()> f); - typedef std::vector<std::function<void()> > jobs; -#else - typedef std::vector<void_function0*> jobs; + virtual bool inject(std::function<void()> f) = 0; #endif - - - void run_all_jobs(); - void finished(); - - jobs jobs_; - pn_connection_t* connection_; - bool finished_; + virtual void run_all_jobs() = 0; + virtual void finished() = 0; }; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/570d0a1a/proton-c/bindings/cpp/src/proactor_container_impl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp index 2486e2b..4d526f2 100644 --- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp +++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp @@ -43,30 +43,25 @@ namespace proton { -event_loop::impl::impl(pn_connection_t* c) - : connection_(c), finished_(false) -{} - -void event_loop::impl::finished() { - finished_ = true; -} +class container::impl::common_event_loop : public event_loop::impl { + public: + common_event_loop(): finished_(false) {} #if PN_CPP_HAS_STD_FUNCTION -bool event_loop::impl::inject(std::function<void()> f) { - // Note this is an unbounded work queue. - // A resource-safe implementation should be bounded. - if (finished_) - return false; - jobs_.push_back(f); - pn_connection_wake(connection_); - return true; -} + typedef std::vector<std::function<void()> > jobs; +#else + typedef std::vector<void_function0*> jobs; +#endif -bool event_loop::impl::inject(proton::void_function0& f) { - return inject([&f]() { f(); }); -} + void run_all_jobs(); + void finished() { finished_ = true; } -void event_loop::impl::run_all_jobs() { + jobs jobs_; + bool finished_; +}; + +#if PN_CPP_HAS_STD_FUNCTION +void container::impl::common_event_loop::run_all_jobs() { decltype(jobs_) j; { std::swap(j, jobs_); @@ -77,25 +72,93 @@ void event_loop::impl::run_all_jobs() { } catch (...) {}; } #else -bool event_loop::impl::inject(proton::void_function0& f) { +void container::impl::common_event_loop::run_all_jobs() { + // Run queued work, but ignore any exceptions + for (jobs::iterator f = jobs_.begin(); f != jobs_.end(); ++f) try { + (**f)(); + } catch (...) {}; + jobs_.clear(); + return; +} +#endif + +class container::impl::connection_event_loop : public common_event_loop { + public: + connection_event_loop(pn_connection_t* c): connection_(c) {} + + bool inject(void_function0& f); +#if PN_CPP_HAS_STD_FUNCTION + bool inject(std::function<void()> f); +#endif + + pn_connection_t* connection_; +}; + +#if PN_CPP_HAS_STD_FUNCTION +bool container::impl::connection_event_loop::inject(std::function<void()> f) { + // Note this is an unbounded work queue. + // A resource-safe implementation should be bounded. + if (finished_) return false; + jobs_.emplace_back(std::move(f)); + pn_connection_wake(connection_); + return true; +} + +bool container::impl::connection_event_loop::inject(proton::void_function0& f) { + return inject([&f]() { f(); }); +} +#else +bool container::impl::connection_event_loop::inject(proton::void_function0& f) { // Note this is an unbounded work queue. // A resource-safe implementation should be bounded. - if (finished_) - return false; + if (finished_) return false; jobs_.push_back(&f); pn_connection_wake(connection_); return true; } +#endif -void event_loop::impl::run_all_jobs() { - // Run queued work, but ignore any exceptions - for (event_loop::impl::jobs::iterator f = jobs_.begin(); f != jobs_.end(); ++f) try { - (**f)(); - } catch (...) {}; - jobs_.clear(); - return; +class container::impl::container_event_loop : public common_event_loop { + public: + container_event_loop(container::impl& c): container_(c) {} + ~container_event_loop() { container_.remove_event_loop(this); } + + bool inject(void_function0& f); +#if PN_CPP_HAS_STD_FUNCTION + bool inject(std::function<void()> f); +#endif + + container::impl& container_; +}; + +#if PN_CPP_HAS_STD_FUNCTION +bool container::impl::container_event_loop::inject(std::function<void()> f) { + // Note this is an unbounded work queue. + // A resource-safe implementation should be bounded. + if (finished_) return false; + jobs_.emplace_back(std::move(f)); + pn_proactor_set_timeout(container_.proactor_, 0); + return true; +} + +bool container::impl::container_event_loop::inject(proton::void_function0& f) { + return inject([&f]() { f(); }); +} +#else +bool container::impl::container_event_loop::inject(proton::void_function0& f) { + // Note this is an unbounded work queue. + // A resource-safe implementation should be bounded. + if (finished_) return false; + jobs_.push_back(&f); + pn_proactor_set_timeout(container_.proactor_, 0); + return true; } #endif + +class event_loop::impl* container::impl::make_event_loop(container& c) { + return c.impl_->add_event_loop(); +} + container::impl::impl(container& c, const std::string& id, messaging_handler* mh) : container_(c), proactor_(pn_proactor()), handler_(mh), id_(id), auto_stop_(true), stopping_(false) @@ -109,6 +172,16 @@ container::impl::~impl() { pn_proactor_free(proactor_); } +container::impl::container_event_loop* container::impl::add_event_loop() { + container_event_loop* c = new container_event_loop(*this); + event_loops_.insert(c); + return c; +} + +void container::impl::remove_event_loop(container::impl::container_event_loop* l) { + event_loops_.erase(l); +} + proton::connection container::impl::connect_common( const std::string& addr, const proton::connection_options& user_opts) @@ -125,7 +198,7 @@ proton::connection container::impl::connect_common( connection_context& cc(connection_context::get(pnc)); cc.container = &container_; cc.handler = mh; - cc.event_loop_ = new event_loop::impl(pnc); + cc.event_loop_ = new container::impl::connection_event_loop(pnc); pn_connection_set_container(pnc, id_.c_str()); pn_connection_set_hostname(pnc, url.host().c_str()); @@ -225,7 +298,7 @@ void container::impl::schedule(duration delay, void_function0& f) { pn_proactor_set_timeout(proactor_, delay.milliseconds()); // Record timeout; Add callback to timeout sorted list - scheduled s={timestamp::now()+delay, &f}; + scheduled s = {timestamp::now()+delay, &f}; deferred_.push_back(s); std::push_heap(deferred_.begin(), deferred_.end()); } @@ -285,13 +358,20 @@ bool container::impl::handle(pn_event_t* event) { case PN_PROACTOR_INTERRUPT: return false; - case PN_PROACTOR_TIMEOUT: - // Maybe we got a timeout and have nothing scheduled (not sure if this is possible) - if ( deferred_.size()==0 ) return false; + case PN_PROACTOR_TIMEOUT: { + // Can get an immediate timeout, if we have a container event loop inject + if ( deferred_.size()>0 ) { + run_timer_jobs(); + } - run_timer_jobs(); + // Run every container event loop job + // This is not at all efficient and single threads all these jobs, but it does correctly + // serialise them + for (event_loops::iterator loop = event_loops_.begin(); loop!=event_loops_.end(); ++loop) { + (*loop)->run_all_jobs(); + } return false; - + } case PN_LISTENER_OPEN: return false; @@ -312,7 +392,7 @@ bool container::impl::handle(pn_event_t* event) { cc.container = &container_; cc.listener_context_ = &lc; cc.handler = opts.handler(); - cc.event_loop_ = new event_loop::impl(c); + cc.event_loop_ = new container::impl::connection_event_loop(c); pn_listener_accept(l, c); return false; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
