PROTON-1481: [C++ binding] simplify work_queue code by introducing work type
- The work type can be created from std::function<void()> or void_function0
- and so pushes those c++11/C++03 differences into a single place


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5dd3f464
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5dd3f464
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5dd3f464

Branch: refs/heads/master
Commit: 5dd3f464fbe472de49c0341a853098eba8059a14
Parents: d1c91a4
Author: Andrew Stitcher <[email protected]>
Authored: Mon May 15 01:36:52 2017 -0400
Committer: Andrew Stitcher <[email protected]>
Committed: Fri Jul 21 12:50:06 2017 -0400

----------------------------------------------------------------------
 examples/cpp/broker.cpp                         |   8 +-
 .../bindings/cpp/include/proton/container.hpp   |  14 +--
 proton-c/bindings/cpp/include/proton/fwd.hpp    |   4 +-
 .../bindings/cpp/include/proton/work_queue.hpp  |  52 +++++++---
 proton-c/bindings/cpp/src/container.cpp         |   6 +-
 .../cpp/src/include/proactor_container_impl.hpp |  12 +--
 .../src/include/proactor_work_queue_impl.hpp    |   6 +-
 .../cpp/src/proactor_container_impl.cpp         | 103 ++++---------------
 proton-c/bindings/cpp/src/work_queue.cpp        |  10 +-
 9 files changed, 77 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5dd3f464/examples/cpp/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/broker.cpp b/examples/cpp/broker.cpp
index d39314c..9af60ba 100644
--- a/examples/cpp/broker.cpp
+++ b/examples/cpp/broker.cpp
@@ -185,7 +185,7 @@ public:
         sender_(s), senders_(ss), work_queue_(s.work_queue()), queue_(0), 
pending_credit_(0)
     {}
 
-    void add(proton::void_function0& f) {
+    void add(proton::work f) {
         work_queue_.add(f);
     }
 
@@ -241,7 +241,7 @@ public:
         work_queue_(c), name_(n), current_(subscriptions_.end())
     {}
 
-    void add(proton::void_function0& f) {
+    void add(proton::work f) {
         work_queue_.add(f);
     }
 
@@ -333,7 +333,7 @@ public:
         receiver_(r), work_queue_(r.work_queue()), queue_(0)
     {}
 
-    void add(proton::void_function0& f) {
+    void add(proton::work f) {
         work_queue_.add(f);
     }
 
@@ -361,7 +361,7 @@ public:
         container_(c), work_queue_(c), next_id_(0)
     {}
 
-    void add(proton::void_function0& f) {
+    void add(proton::work f) {
         work_queue_.add(f);
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5dd3f464/proton-c/bindings/cpp/include/proton/container.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/container.hpp 
b/proton-c/bindings/cpp/include/proton/container.hpp
index 383aa3c..0739517 100644
--- a/proton-c/bindings/cpp/include/proton/container.hpp
+++ b/proton-c/bindings/cpp/include/proton/container.hpp
@@ -206,15 +206,11 @@ class PN_CPP_CLASS_EXTERN container {
     /// @copydoc receiver_options
     PN_CPP_EXTERN class receiver_options receiver_options() const;
 
-    /// Schedule a function to be called after the duration.  C++03
-    /// compatible, for C++11 use schedule(duration,
-    /// std::function<void()>)
-    PN_CPP_EXTERN void schedule(duration, void_function0&);
-
-#if PN_CPP_HAS_STD_FUNCTION
-    /// Schedule a function to be called after the duration
-    PN_CPP_EXTERN void schedule(duration, std::function<void()>);
-#endif
+    /// Schedule a piece of work to happen after the duration:
+    /// The piece of work can be created from a function object.
+    /// for C++11 and on use a std::function<void()> type; for
+    /// C++03 compatibility you can use void_function0&
+    PN_CPP_EXTERN void schedule(duration, work);
 
   private:
     class impl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5dd3f464/proton-c/bindings/cpp/include/proton/fwd.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/fwd.hpp 
b/proton-c/bindings/cpp/include/proton/fwd.hpp
index 6ad216e..b839f42 100644
--- a/proton-c/bindings/cpp/include/proton/fwd.hpp
+++ b/proton-c/bindings/cpp/include/proton/fwd.hpp
@@ -31,7 +31,6 @@ class container;
 class delivery;
 class error_condition;
 class event;
-class work_queue;
 class message;
 class message_id;
 class messaging_handler;
@@ -54,7 +53,8 @@ class tracker;
 class transport;
 class url;
 class void_function0;
-
+class work;
+class work_queue;
 
 namespace io {
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5dd3f464/proton-c/bindings/cpp/include/proton/work_queue.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/work_queue.hpp 
b/proton-c/bindings/cpp/include/proton/work_queue.hpp
index 7acd507..7c21710 100644
--- a/proton-c/bindings/cpp/include/proton/work_queue.hpp
+++ b/proton-c/bindings/cpp/include/proton/work_queue.hpp
@@ -23,6 +23,7 @@
  */
 
 #include "./fwd.hpp"
+#include "./function.hpp"
 #include "./internal/config.hpp"
 #include "./internal/export.hpp"
 #include "./internal/pn_unique_ptr.hpp"
@@ -35,12 +36,44 @@ struct pn_link_t;
 
 namespace proton {
 
-/// **Experimental** - A serial execution context.
+/// **Experimental** - A work queue for serial execution.
 ///
 /// Event handler functions associated with a single proton::connection are 
called in sequence.
-/// The connection's @ref event_loop allows you to "inject" extra work from 
any thread,
+/// The connection's @ref work_queue allows you to "inject" extra @ref work 
from any thread,
 /// and have it executed in the same sequence.
 ///
+/// You may also create arbitrary @ref work_queue objects backed by a @ref 
container that allow
+/// other objects to have their own serialised work queues that can have work 
injected safely
+/// from other threads. The @ref container ensures that the work is correctly 
serialised.
+///
+/// The @ref work class represents the work to be queued and can be created 
from a function
+/// that takes no parameters and returns no value.
+///
+
+class work {
+  public:
+#if PN_CPP_HAS_STD_FUNCTION
+    work(void_function0& f): item_( [&f]() { f(); }) {}
+    template <class T>
+    work(T f): item_(f) {}
+
+    void operator()() { item_(); }
+#else
+    work(void_function0& f): item_(&f) {}
+
+    void operator()() { (*item_)(); }
+#endif
+    ~work() {}
+
+
+  private:
+#if PN_CPP_HAS_STD_FUNCTION
+    std::function<void()> item_;
+#else
+    void_function0* item_;
+#endif
+};
+
 class PN_CPP_CLASS_EXTERN work_queue {
     /// @cond internal
     class impl;
@@ -48,14 +81,14 @@ class PN_CPP_CLASS_EXTERN work_queue {
     /// @endcond
 
   public:
-    /// Create event_loop
+    /// Create work_queue
     PN_CPP_EXTERN work_queue();
     PN_CPP_EXTERN work_queue(container&);
 
     PN_CPP_EXTERN ~work_queue();
 
 #if PN_CPP_HAS_EXPLICIT_CONVERSIONS
-    /// When using C++11 (or later) you can use event_loop in a bool context
+    /// When using C++11 (or later) you can use work_queue in a bool context
     /// to indicate if there is an event loop set.
     PN_CPP_EXTERN explicit operator bool() const { return bool(impl_); }
 #endif
@@ -63,17 +96,12 @@ class PN_CPP_CLASS_EXTERN work_queue {
     /// No event loop set.
     PN_CPP_EXTERN bool operator !() const { return !impl_; }
 
-    /// Arrange to have f() called in the event_loop's sequence: possibly
-    /// deferred, possibly in another thread.
+    /// Add work to the work queue: f() will be called serialised with other 
work in the queue:
+    /// deferred and possibly in another thread.
     ///
     /// @return true if f() has or will be called, false if the event_loop is 
ended
     /// and f() cannot be injected.
-    PN_CPP_EXTERN bool add(void_function0& f);
-
-#if PN_CPP_HAS_STD_FUNCTION
-    /// @copydoc inject(void_function0&)
-    PN_CPP_EXTERN bool add(std::function<void()> f);
-#endif
+    PN_CPP_EXTERN bool add(work f);
 
   private:
     PN_CPP_EXTERN static work_queue& get(pn_connection_t*);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5dd3f464/proton-c/bindings/cpp/src/container.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container.cpp 
b/proton-c/bindings/cpp/src/container.cpp
index b98da78..c2af659 100644
--- a/proton-c/bindings/cpp/src/container.cpp
+++ b/proton-c/bindings/cpp/src/container.cpp
@@ -103,11 +103,7 @@ returned<receiver> container::open_receiver(
 
 std::string container::id() const { return impl_->id(); }
 
-void container::schedule(duration d, void_function0& f) { return 
impl_->schedule(d, f); }
-
-#if PN_CPP_HAS_STD_FUNCTION
-void container::schedule(duration d, std::function<void()> f) { return 
impl_->schedule(d, f); }
-#endif
+void container::schedule(duration d, work f) { return impl_->schedule(d, f); }
 
 void container::client_connection_options(const connection_options& c) { 
impl_->client_connection_options(c); }
 connection_options container::client_connection_options() const { return 
impl_->client_connection_options(); }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5dd3f464/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp 
b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
index 9b4be11..fc963f7 100644
--- a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
+++ b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
@@ -73,10 +73,7 @@ class container::impl {
     void run();
     void stop(const error_condition& err);
     void auto_stop(bool set);
-    void schedule(duration, void_function0&);
-#if PN_CPP_HAS_STD_FUNCTION
-    void schedule(duration, std::function<void()>);
-#endif
+    void schedule(duration, work);
     template <class T> static void set_handler(T s, messaging_handler* h);
     template <class T> static messaging_handler* get_handler(T s);
     static work_queue::impl* make_work_queue(container&);
@@ -101,12 +98,7 @@ class container::impl {
     void remove_work_queue(container_work_queue*);
     struct scheduled {
         timestamp time; // duration from epoch for task
-#if PN_CPP_HAS_STD_FUNCTION
-        std::function<void()>  task;
-#else
-        void_function0* task_;
-        void task();
-#endif
+        work task;
 
         // We want to get to get the *earliest* first so test is "reversed"
         bool operator < (const scheduled& r) const { return  r.time < time; }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5dd3f464/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp 
b/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp
index 57fc4c0..ac0f803 100644
--- a/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp
+++ b/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp
@@ -24,16 +24,14 @@
 
 #include "proton/fwd.hpp"
 #include "proton/internal/config.hpp"
+#include "proton/work_queue.hpp"
 
 namespace proton {
 
 class work_queue::impl {
   public:
     virtual ~impl() {};
-    virtual bool inject(void_function0& f) = 0;
-#if PN_CPP_HAS_STD_FUNCTION
-    virtual bool inject(std::function<void()> f) = 0;
-#endif
+    virtual bool add(work f) = 0;
     virtual void run_all_jobs() = 0;
     virtual void finished() = 0;
 };

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5dd3f464/proton-c/bindings/cpp/src/proactor_container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp 
b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
index 78ccabf..0135c08 100644
--- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp
+++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
@@ -47,11 +47,7 @@ class container::impl::common_work_queue : public 
work_queue::impl {
   public:
     common_work_queue(): finished_(false) {}
 
-#if PN_CPP_HAS_STD_FUNCTION
-    typedef std::vector<std::function<void()> > jobs;
-#else
-    typedef std::vector<void_function0*> jobs;
-#endif
+    typedef std::vector<work> jobs;
 
     void run_all_jobs();
     void finished() { finished_ = true; }
@@ -60,100 +56,53 @@ class container::impl::common_work_queue : public 
work_queue::impl {
     bool finished_;
 };
 
-#if PN_CPP_HAS_STD_FUNCTION
-void container::impl::common_work_queue::run_all_jobs() {
-    decltype(jobs_) j;
-    {
-        std::swap(j, jobs_);
-    }
-    // Run queued work, but ignore any exceptions
-    for (auto& f : j) try {
-        f();
-    } catch (...) {};
-}
-#else
 void container::impl::common_work_queue::run_all_jobs() {
+    jobs j;
+    // Lock this operation for mt
+    std::swap(j, jobs_);
     // Run queued work, but ignore any exceptions
-    for (jobs::iterator f = jobs_.begin(); f != jobs_.end(); ++f) try {
-        (**f)();
+    for (jobs::iterator f = j.begin(); f != j.end(); ++f) try {
+        (*f)();
     } catch (...) {};
-    jobs_.clear();
     return;
 }
-#endif
 
 class container::impl::connection_work_queue : public common_work_queue {
   public:
     connection_work_queue(pn_connection_t* c): connection_(c) {}
 
-    bool inject(void_function0& f);
-#if PN_CPP_HAS_STD_FUNCTION
-    bool inject(std::function<void()> f);
-#endif
+    bool add(work f);
 
     pn_connection_t* connection_;
 };
 
-#if PN_CPP_HAS_STD_FUNCTION
-bool container::impl::connection_work_queue::inject(std::function<void()> f) {
-    // Note this is an unbounded work queue.
-    // A resource-safe implementation should be bounded.
-    if (finished_) return false;
-    jobs_.emplace_back(std::move(f));
-    pn_connection_wake(connection_);
-    return true;
-}
-
-bool container::impl::connection_work_queue::inject(proton::void_function0& f) 
{
-    return inject([&f]() { f(); });
-}
-#else
-bool container::impl::connection_work_queue::inject(proton::void_function0& f) 
{
+bool container::impl::connection_work_queue::add(work f) {
     // Note this is an unbounded work queue.
     // A resource-safe implementation should be bounded.
     if (finished_) return false;
-    jobs_.push_back(&f);
+    jobs_.push_back(f);
     pn_connection_wake(connection_);
     return true;
 }
-#endif
 
 class container::impl::container_work_queue : public common_work_queue {
   public:
     container_work_queue(container::impl& c): container_(c) {}
     ~container_work_queue() { container_.remove_work_queue(this); }
 
-    bool inject(void_function0& f);
-#if PN_CPP_HAS_STD_FUNCTION
-    bool inject(std::function<void()> f);
-#endif
+    bool add(work f);
 
     container::impl& container_;
 };
 
-#if PN_CPP_HAS_STD_FUNCTION
-bool container::impl::container_work_queue::inject(std::function<void()> f) {
-    // Note this is an unbounded work queue.
-    // A resource-safe implementation should be bounded.
-    if (finished_) return false;
-    jobs_.emplace_back(std::move(f));
-    pn_proactor_set_timeout(container_.proactor_, 0);
-    return true;
-}
-
-bool container::impl::container_work_queue::inject(proton::void_function0& f) {
-    return inject([&f]() { f(); });
-}
-#else
-bool container::impl::container_work_queue::inject(proton::void_function0& f) {
+bool container::impl::container_work_queue::add(work f) {
     // Note this is an unbounded work queue.
     // A resource-safe implementation should be bounded.
     if (finished_) return false;
-    jobs_.push_back(&f);
+    jobs_.push_back(f);
     pn_proactor_set_timeout(container_.proactor_, 0);
     return true;
 }
-#endif
 
 class work_queue::impl* container::impl::make_work_queue(container& c) {
     return c.impl_->add_work_queue();
@@ -277,32 +226,18 @@ proton::listener container::impl::listen(const 
std::string& addr, proton::listen
     return proton::listener(listener);
 }
 
-#if PN_CPP_HAS_STD_FUNCTION
-void container::impl::schedule(duration delay, void_function0& f) {
-    schedule(delay, [&f](){ f(); } );
-}
-
-void container::impl::schedule(duration delay, std::function<void()> f) {
-    // Set timeout
-    pn_proactor_set_timeout(proactor_, delay.milliseconds());
-
-    // Record timeout; Add callback to timeout sorted list
-    deferred_.emplace_back(scheduled{timestamp::now()+delay, f});
-    std::push_heap(deferred_.begin(), deferred_.end());
-}
-#else
-void container::impl::scheduled::task() {(*task_)();}
-
-void container::impl::schedule(duration delay, void_function0& f) {
-    // Set timeout
-    pn_proactor_set_timeout(proactor_, delay.milliseconds());
+void container::impl::schedule(duration delay, work f) {
+    timestamp now = timestamp::now();
 
     // Record timeout; Add callback to timeout sorted list
-    scheduled s = {timestamp::now()+delay, &f};
+    scheduled s = {now+delay, f};
     deferred_.push_back(s);
     std::push_heap(deferred_.begin(), deferred_.end());
+
+    // Set timeout for current head of timeout queue
+    scheduled* next = &deferred_.front();
+    pn_proactor_set_timeout(proactor_, (next->time-now).milliseconds());
 }
-#endif
 
 void container::impl::client_connection_options(const connection_options 
&opts) {
     client_connection_options_ = opts;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5dd3f464/proton-c/bindings/cpp/src/work_queue.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/work_queue.cpp 
b/proton-c/bindings/cpp/src/work_queue.cpp
index 961e5f0..db95042 100644
--- a/proton-c/bindings/cpp/src/work_queue.cpp
+++ b/proton-c/bindings/cpp/src/work_queue.cpp
@@ -35,16 +35,10 @@ work_queue::~work_queue() {}
 
 work_queue& work_queue::operator=(impl* i) { impl_.reset(i); return *this; }
 
-bool work_queue::add(void_function0& f) {
-    return impl_->inject(f);
+bool work_queue::add(work f) {
+    return impl_->add(f);
 }
 
-#if PN_CPP_HAS_STD_FUNCTION
-bool work_queue::add(std::function<void()> f) {
-    return impl_->inject(f);
-}
-#endif
-
 work_queue& work_queue::get(pn_connection_t* c) {
     return connection_context::get(c).work_queue_;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to