PROTON-1481: [C++ binding] simplify work_queue code by introducing work type - The work type can be created from std::function<void()> or void_function0 - and so pushes those c++11/C++03 differences into a single place
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5dd3f464 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5dd3f464 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5dd3f464 Branch: refs/heads/master Commit: 5dd3f464fbe472de49c0341a853098eba8059a14 Parents: d1c91a4 Author: Andrew Stitcher <[email protected]> Authored: Mon May 15 01:36:52 2017 -0400 Committer: Andrew Stitcher <[email protected]> Committed: Fri Jul 21 12:50:06 2017 -0400 ---------------------------------------------------------------------- examples/cpp/broker.cpp | 8 +- .../bindings/cpp/include/proton/container.hpp | 14 +-- proton-c/bindings/cpp/include/proton/fwd.hpp | 4 +- .../bindings/cpp/include/proton/work_queue.hpp | 52 +++++++--- proton-c/bindings/cpp/src/container.cpp | 6 +- .../cpp/src/include/proactor_container_impl.hpp | 12 +-- .../src/include/proactor_work_queue_impl.hpp | 6 +- .../cpp/src/proactor_container_impl.cpp | 103 ++++--------------- proton-c/bindings/cpp/src/work_queue.cpp | 10 +- 9 files changed, 77 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5dd3f464/examples/cpp/broker.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/broker.cpp b/examples/cpp/broker.cpp index d39314c..9af60ba 100644 --- a/examples/cpp/broker.cpp +++ b/examples/cpp/broker.cpp @@ -185,7 +185,7 @@ public: sender_(s), senders_(ss), work_queue_(s.work_queue()), queue_(0), pending_credit_(0) {} - void add(proton::void_function0& f) { + void add(proton::work f) { work_queue_.add(f); } @@ -241,7 +241,7 @@ public: work_queue_(c), name_(n), current_(subscriptions_.end()) {} - void add(proton::void_function0& f) { + void add(proton::work f) { work_queue_.add(f); } @@ -333,7 +333,7 @@ public: receiver_(r), work_queue_(r.work_queue()), queue_(0) {} - void add(proton::void_function0& f) { + void add(proton::work f) { work_queue_.add(f); } @@ -361,7 +361,7 @@ public: container_(c), work_queue_(c), next_id_(0) {} - void add(proton::void_function0& f) { + void add(proton::work f) { work_queue_.add(f); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5dd3f464/proton-c/bindings/cpp/include/proton/container.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/container.hpp b/proton-c/bindings/cpp/include/proton/container.hpp index 383aa3c..0739517 100644 --- a/proton-c/bindings/cpp/include/proton/container.hpp +++ b/proton-c/bindings/cpp/include/proton/container.hpp @@ -206,15 +206,11 @@ class PN_CPP_CLASS_EXTERN container { /// @copydoc receiver_options PN_CPP_EXTERN class receiver_options receiver_options() const; - /// Schedule a function to be called after the duration. C++03 - /// compatible, for C++11 use schedule(duration, - /// std::function<void()>) - PN_CPP_EXTERN void schedule(duration, void_function0&); - -#if PN_CPP_HAS_STD_FUNCTION - /// Schedule a function to be called after the duration - PN_CPP_EXTERN void schedule(duration, std::function<void()>); -#endif + /// Schedule a piece of work to happen after the duration: + /// The piece of work can be created from a function object. + /// for C++11 and on use a std::function<void()> type; for + /// C++03 compatibility you can use void_function0& + PN_CPP_EXTERN void schedule(duration, work); private: class impl; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5dd3f464/proton-c/bindings/cpp/include/proton/fwd.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/fwd.hpp b/proton-c/bindings/cpp/include/proton/fwd.hpp index 6ad216e..b839f42 100644 --- a/proton-c/bindings/cpp/include/proton/fwd.hpp +++ b/proton-c/bindings/cpp/include/proton/fwd.hpp @@ -31,7 +31,6 @@ class container; class delivery; class error_condition; class event; -class work_queue; class message; class message_id; class messaging_handler; @@ -54,7 +53,8 @@ class tracker; class transport; class url; class void_function0; - +class work; +class work_queue; namespace io { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5dd3f464/proton-c/bindings/cpp/include/proton/work_queue.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/work_queue.hpp b/proton-c/bindings/cpp/include/proton/work_queue.hpp index 7acd507..7c21710 100644 --- a/proton-c/bindings/cpp/include/proton/work_queue.hpp +++ b/proton-c/bindings/cpp/include/proton/work_queue.hpp @@ -23,6 +23,7 @@ */ #include "./fwd.hpp" +#include "./function.hpp" #include "./internal/config.hpp" #include "./internal/export.hpp" #include "./internal/pn_unique_ptr.hpp" @@ -35,12 +36,44 @@ struct pn_link_t; namespace proton { -/// **Experimental** - A serial execution context. +/// **Experimental** - A work queue for serial execution. /// /// Event handler functions associated with a single proton::connection are called in sequence. -/// The connection's @ref event_loop allows you to "inject" extra work from any thread, +/// The connection's @ref work_queue allows you to "inject" extra @ref work from any thread, /// and have it executed in the same sequence. /// +/// You may also create arbitrary @ref work_queue objects backed by a @ref container that allow +/// other objects to have their own serialised work queues that can have work injected safely +/// from other threads. The @ref container ensures that the work is correctly serialised. +/// +/// The @ref work class represents the work to be queued and can be created from a function +/// that takes no parameters and returns no value. +/// + +class work { + public: +#if PN_CPP_HAS_STD_FUNCTION + work(void_function0& f): item_( [&f]() { f(); }) {} + template <class T> + work(T f): item_(f) {} + + void operator()() { item_(); } +#else + work(void_function0& f): item_(&f) {} + + void operator()() { (*item_)(); } +#endif + ~work() {} + + + private: +#if PN_CPP_HAS_STD_FUNCTION + std::function<void()> item_; +#else + void_function0* item_; +#endif +}; + class PN_CPP_CLASS_EXTERN work_queue { /// @cond internal class impl; @@ -48,14 +81,14 @@ class PN_CPP_CLASS_EXTERN work_queue { /// @endcond public: - /// Create event_loop + /// Create work_queue PN_CPP_EXTERN work_queue(); PN_CPP_EXTERN work_queue(container&); PN_CPP_EXTERN ~work_queue(); #if PN_CPP_HAS_EXPLICIT_CONVERSIONS - /// When using C++11 (or later) you can use event_loop in a bool context + /// When using C++11 (or later) you can use work_queue in a bool context /// to indicate if there is an event loop set. PN_CPP_EXTERN explicit operator bool() const { return bool(impl_); } #endif @@ -63,17 +96,12 @@ class PN_CPP_CLASS_EXTERN work_queue { /// No event loop set. PN_CPP_EXTERN bool operator !() const { return !impl_; } - /// Arrange to have f() called in the event_loop's sequence: possibly - /// deferred, possibly in another thread. + /// Add work to the work queue: f() will be called serialised with other work in the queue: + /// deferred and possibly in another thread. /// /// @return true if f() has or will be called, false if the event_loop is ended /// and f() cannot be injected. - PN_CPP_EXTERN bool add(void_function0& f); - -#if PN_CPP_HAS_STD_FUNCTION - /// @copydoc inject(void_function0&) - PN_CPP_EXTERN bool add(std::function<void()> f); -#endif + PN_CPP_EXTERN bool add(work f); private: PN_CPP_EXTERN static work_queue& get(pn_connection_t*); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5dd3f464/proton-c/bindings/cpp/src/container.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/container.cpp b/proton-c/bindings/cpp/src/container.cpp index b98da78..c2af659 100644 --- a/proton-c/bindings/cpp/src/container.cpp +++ b/proton-c/bindings/cpp/src/container.cpp @@ -103,11 +103,7 @@ returned<receiver> container::open_receiver( std::string container::id() const { return impl_->id(); } -void container::schedule(duration d, void_function0& f) { return impl_->schedule(d, f); } - -#if PN_CPP_HAS_STD_FUNCTION -void container::schedule(duration d, std::function<void()> f) { return impl_->schedule(d, f); } -#endif +void container::schedule(duration d, work f) { return impl_->schedule(d, f); } void container::client_connection_options(const connection_options& c) { impl_->client_connection_options(c); } connection_options container::client_connection_options() const { return impl_->client_connection_options(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5dd3f464/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp index 9b4be11..fc963f7 100644 --- a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp +++ b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp @@ -73,10 +73,7 @@ class container::impl { void run(); void stop(const error_condition& err); void auto_stop(bool set); - void schedule(duration, void_function0&); -#if PN_CPP_HAS_STD_FUNCTION - void schedule(duration, std::function<void()>); -#endif + void schedule(duration, work); template <class T> static void set_handler(T s, messaging_handler* h); template <class T> static messaging_handler* get_handler(T s); static work_queue::impl* make_work_queue(container&); @@ -101,12 +98,7 @@ class container::impl { void remove_work_queue(container_work_queue*); struct scheduled { timestamp time; // duration from epoch for task -#if PN_CPP_HAS_STD_FUNCTION - std::function<void()> task; -#else - void_function0* task_; - void task(); -#endif + work task; // We want to get to get the *earliest* first so test is "reversed" bool operator < (const scheduled& r) const { return r.time < time; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5dd3f464/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp b/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp index 57fc4c0..ac0f803 100644 --- a/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp +++ b/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp @@ -24,16 +24,14 @@ #include "proton/fwd.hpp" #include "proton/internal/config.hpp" +#include "proton/work_queue.hpp" namespace proton { class work_queue::impl { public: virtual ~impl() {}; - virtual bool inject(void_function0& f) = 0; -#if PN_CPP_HAS_STD_FUNCTION - virtual bool inject(std::function<void()> f) = 0; -#endif + virtual bool add(work f) = 0; virtual void run_all_jobs() = 0; virtual void finished() = 0; }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5dd3f464/proton-c/bindings/cpp/src/proactor_container_impl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp index 78ccabf..0135c08 100644 --- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp +++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp @@ -47,11 +47,7 @@ class container::impl::common_work_queue : public work_queue::impl { public: common_work_queue(): finished_(false) {} -#if PN_CPP_HAS_STD_FUNCTION - typedef std::vector<std::function<void()> > jobs; -#else - typedef std::vector<void_function0*> jobs; -#endif + typedef std::vector<work> jobs; void run_all_jobs(); void finished() { finished_ = true; } @@ -60,100 +56,53 @@ class container::impl::common_work_queue : public work_queue::impl { bool finished_; }; -#if PN_CPP_HAS_STD_FUNCTION -void container::impl::common_work_queue::run_all_jobs() { - decltype(jobs_) j; - { - std::swap(j, jobs_); - } - // Run queued work, but ignore any exceptions - for (auto& f : j) try { - f(); - } catch (...) {}; -} -#else void container::impl::common_work_queue::run_all_jobs() { + jobs j; + // Lock this operation for mt + std::swap(j, jobs_); // Run queued work, but ignore any exceptions - for (jobs::iterator f = jobs_.begin(); f != jobs_.end(); ++f) try { - (**f)(); + for (jobs::iterator f = j.begin(); f != j.end(); ++f) try { + (*f)(); } catch (...) {}; - jobs_.clear(); return; } -#endif class container::impl::connection_work_queue : public common_work_queue { public: connection_work_queue(pn_connection_t* c): connection_(c) {} - bool inject(void_function0& f); -#if PN_CPP_HAS_STD_FUNCTION - bool inject(std::function<void()> f); -#endif + bool add(work f); pn_connection_t* connection_; }; -#if PN_CPP_HAS_STD_FUNCTION -bool container::impl::connection_work_queue::inject(std::function<void()> f) { - // Note this is an unbounded work queue. - // A resource-safe implementation should be bounded. - if (finished_) return false; - jobs_.emplace_back(std::move(f)); - pn_connection_wake(connection_); - return true; -} - -bool container::impl::connection_work_queue::inject(proton::void_function0& f) { - return inject([&f]() { f(); }); -} -#else -bool container::impl::connection_work_queue::inject(proton::void_function0& f) { +bool container::impl::connection_work_queue::add(work f) { // Note this is an unbounded work queue. // A resource-safe implementation should be bounded. if (finished_) return false; - jobs_.push_back(&f); + jobs_.push_back(f); pn_connection_wake(connection_); return true; } -#endif class container::impl::container_work_queue : public common_work_queue { public: container_work_queue(container::impl& c): container_(c) {} ~container_work_queue() { container_.remove_work_queue(this); } - bool inject(void_function0& f); -#if PN_CPP_HAS_STD_FUNCTION - bool inject(std::function<void()> f); -#endif + bool add(work f); container::impl& container_; }; -#if PN_CPP_HAS_STD_FUNCTION -bool container::impl::container_work_queue::inject(std::function<void()> f) { - // Note this is an unbounded work queue. - // A resource-safe implementation should be bounded. - if (finished_) return false; - jobs_.emplace_back(std::move(f)); - pn_proactor_set_timeout(container_.proactor_, 0); - return true; -} - -bool container::impl::container_work_queue::inject(proton::void_function0& f) { - return inject([&f]() { f(); }); -} -#else -bool container::impl::container_work_queue::inject(proton::void_function0& f) { +bool container::impl::container_work_queue::add(work f) { // Note this is an unbounded work queue. // A resource-safe implementation should be bounded. if (finished_) return false; - jobs_.push_back(&f); + jobs_.push_back(f); pn_proactor_set_timeout(container_.proactor_, 0); return true; } -#endif class work_queue::impl* container::impl::make_work_queue(container& c) { return c.impl_->add_work_queue(); @@ -277,32 +226,18 @@ proton::listener container::impl::listen(const std::string& addr, proton::listen return proton::listener(listener); } -#if PN_CPP_HAS_STD_FUNCTION -void container::impl::schedule(duration delay, void_function0& f) { - schedule(delay, [&f](){ f(); } ); -} - -void container::impl::schedule(duration delay, std::function<void()> f) { - // Set timeout - pn_proactor_set_timeout(proactor_, delay.milliseconds()); - - // Record timeout; Add callback to timeout sorted list - deferred_.emplace_back(scheduled{timestamp::now()+delay, f}); - std::push_heap(deferred_.begin(), deferred_.end()); -} -#else -void container::impl::scheduled::task() {(*task_)();} - -void container::impl::schedule(duration delay, void_function0& f) { - // Set timeout - pn_proactor_set_timeout(proactor_, delay.milliseconds()); +void container::impl::schedule(duration delay, work f) { + timestamp now = timestamp::now(); // Record timeout; Add callback to timeout sorted list - scheduled s = {timestamp::now()+delay, &f}; + scheduled s = {now+delay, f}; deferred_.push_back(s); std::push_heap(deferred_.begin(), deferred_.end()); + + // Set timeout for current head of timeout queue + scheduled* next = &deferred_.front(); + pn_proactor_set_timeout(proactor_, (next->time-now).milliseconds()); } -#endif void container::impl::client_connection_options(const connection_options &opts) { client_connection_options_ = opts; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5dd3f464/proton-c/bindings/cpp/src/work_queue.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/work_queue.cpp b/proton-c/bindings/cpp/src/work_queue.cpp index 961e5f0..db95042 100644 --- a/proton-c/bindings/cpp/src/work_queue.cpp +++ b/proton-c/bindings/cpp/src/work_queue.cpp @@ -35,16 +35,10 @@ work_queue::~work_queue() {} work_queue& work_queue::operator=(impl* i) { impl_.reset(i); return *this; } -bool work_queue::add(void_function0& f) { - return impl_->inject(f); +bool work_queue::add(work f) { + return impl_->add(f); } -#if PN_CPP_HAS_STD_FUNCTION -bool work_queue::add(std::function<void()> f) { - return impl_->inject(f); -} -#endif - work_queue& work_queue::get(pn_connection_t* c) { return connection_context::get(c).work_queue_; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
