PROTON-1482: [C++ binding] Implemented scheduling of delayed work on a work_queue
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/88c2d7d4 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/88c2d7d4 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/88c2d7d4 Branch: refs/heads/master Commit: 88c2d7d481a6da96f85a6781ae27b488cfacd801 Parents: ca446ea Author: Andrew Stitcher <[email protected]> Authored: Fri May 19 00:59:55 2017 -0400 Committer: Andrew Stitcher <[email protected]> Committed: Fri Jul 21 12:50:06 2017 -0400 ---------------------------------------------------------------------- proton-c/bindings/cpp/include/proton/fwd.hpp | 1 + .../bindings/cpp/include/proton/work_queue.hpp | 8 ++++++++ .../src/include/proactor_work_queue_impl.hpp | 3 +-- .../cpp/src/proactor_container_impl.cpp | 21 +++++++++++++------- proton-c/bindings/cpp/src/work_queue.cpp | 8 ++++++++ 5 files changed, 32 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/88c2d7d4/proton-c/bindings/cpp/include/proton/fwd.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/fwd.hpp b/proton-c/bindings/cpp/include/proton/fwd.hpp index b839f42..5ade5fd 100644 --- a/proton-c/bindings/cpp/include/proton/fwd.hpp +++ b/proton-c/bindings/cpp/include/proton/fwd.hpp @@ -29,6 +29,7 @@ class connection; class connection_options; class container; class delivery; +class duration; class error_condition; class event; class message; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/88c2d7d4/proton-c/bindings/cpp/include/proton/work_queue.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/work_queue.hpp b/proton-c/bindings/cpp/include/proton/work_queue.hpp index fe739f5..bef041c 100644 --- a/proton-c/bindings/cpp/include/proton/work_queue.hpp 
+++ b/proton-c/bindings/cpp/include/proton/work_queue.hpp @@ -94,6 +94,14 @@ class PN_CPP_CLASS_EXTERN work_queue { /// or f() cannot be injected for any other reason. PN_CPP_EXTERN bool add(work f); + /// Add work to the work queue after duration: f() will be called after the duration + /// serialised with other work in the queue: possibly in another thread. + /// + /// The scheduled execution is "best effort" and it is possible that after the elapsed duration + /// the work will not be able to be injected into the serialised context - there will be no + /// indication of this. + PN_CPP_EXTERN void schedule(duration, work); + private: PN_CPP_EXTERN static work_queue& get(pn_connection_t*); PN_CPP_EXTERN static work_queue& get(pn_session_t*); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/88c2d7d4/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp b/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp index ac0f803..1c94254 100644 --- a/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp +++ b/proton-c/bindings/cpp/src/include/proactor_work_queue_impl.hpp @@ -23,8 +23,6 @@ */ #include "proton/fwd.hpp" -#include "proton/internal/config.hpp" -#include "proton/work_queue.hpp" namespace proton { @@ -32,6 +30,7 @@ class work_queue::impl { public: virtual ~impl() {}; virtual bool add(work f) = 0; + virtual void schedule(duration, work) = 0; virtual void run_all_jobs() = 0; virtual void finished() = 0; }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/88c2d7d4/proton-c/bindings/cpp/src/proactor_container_impl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp index 0135c08..62115fd 100644 --- 
a/proton-c/bindings/cpp/src/proactor_container_impl.cpp +++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp @@ -45,17 +45,26 @@ namespace proton { class container::impl::common_work_queue : public work_queue::impl { public: - common_work_queue(): finished_(false) {} + common_work_queue(container::impl& c): container_(c), finished_(false) {} typedef std::vector<work> jobs; void run_all_jobs(); void finished() { finished_ = true; } + void schedule(duration, work); + container::impl& container_; jobs jobs_; bool finished_; }; +void container::impl::common_work_queue::schedule(duration d, work f) { + // Note this is an unbounded work queue. + // A resource-safe implementation should be bounded. + if (finished_) return; + container_.schedule(d, make_work(&work_queue::impl::add, (work_queue::impl*)this, f)); +} + void container::impl::common_work_queue::run_all_jobs() { jobs j; // Lock this operation for mt @@ -69,7 +78,7 @@ void container::impl::common_work_queue::run_all_jobs() { class container::impl::connection_work_queue : public common_work_queue { public: - connection_work_queue(pn_connection_t* c): connection_(c) {} + connection_work_queue(container::impl& ct, pn_connection_t* c): common_work_queue(ct), connection_(c) {} bool add(work f); @@ -87,12 +96,10 @@ bool container::impl::connection_work_queue::add(work f) { class container::impl::container_work_queue : public common_work_queue { public: - container_work_queue(container::impl& c): container_(c) {} + container_work_queue(container::impl& c): common_work_queue(c) {} ~container_work_queue() { container_.remove_work_queue(this); } bool add(work f); - - container::impl& container_; }; bool container::impl::container_work_queue::add(work f) { @@ -147,7 +154,7 @@ proton::connection container::impl::connect_common( connection_context& cc(connection_context::get(pnc)); cc.container = &container_; cc.handler = mh; - cc.work_queue_ = new container::impl::connection_work_queue(pnc); + cc.work_queue_ = new 
container::impl::connection_work_queue(*container_.impl_, pnc); pn_connection_set_container(pnc, id_.c_str()); pn_connection_set_hostname(pnc, url.host().c_str()); @@ -327,7 +334,7 @@ bool container::impl::handle(pn_event_t* event) { cc.container = &container_; cc.listener_context_ = &lc; cc.handler = opts.handler(); - cc.work_queue_ = new container::impl::connection_work_queue(c); + cc.work_queue_ = new container::impl::connection_work_queue(*container_.impl_, c); pn_listener_accept(l, c); return false; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/88c2d7d4/proton-c/bindings/cpp/src/work_queue.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/work_queue.cpp b/proton-c/bindings/cpp/src/work_queue.cpp index 77fa3fb..f8fc45c 100644 --- a/proton-c/bindings/cpp/src/work_queue.cpp +++ b/proton-c/bindings/cpp/src/work_queue.cpp @@ -19,6 +19,8 @@ #include "proton/work_queue.hpp" +#include "proton/duration.hpp" + #include "contexts.hpp" #include "proactor_container_impl.hpp" #include "proactor_work_queue_impl.hpp" @@ -41,6 +43,12 @@ bool work_queue::add(work f) { return impl_->add(f); } +void work_queue::schedule(duration d, work f) { + // If we have no actual work queue, then can't defer + if (!impl_) return; + return impl_->schedule(d, f); +} + work_queue& work_queue::get(pn_connection_t* c) { return connection_context::get(c).work_queue_; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
