This is an automated email from the ASF dual-hosted git repository. asf-gitbox-commits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 0a4c7b2bb9fc093505e5d451fe2d9660f2213622 Author: Andrew Stitcher <[email protected]> AuthorDate: Fri May 15 18:53:49 2026 -0400 PROTON-2936: [C++] Add container quiescent callback This callback will be invoked when the event loop has no immediate events to process - it indicates that there is nothing currently to do, so is a good place for an application to do short lived background processing, or to wait for the immediate effects of some protocol action to complete, for example to initiate an interactive prompt. This code was written with the assistance of Cursor. --- cpp/include/proton/container.hpp | 7 ++++++ cpp/include/proton/messaging_handler.hpp | 3 +++ cpp/src/container.cpp | 2 ++ cpp/src/handler.cpp | 4 +++- cpp/src/proactor_container_impl.cpp | 37 +++++++++++++++++++++++++++++--- cpp/src/proactor_container_impl.hpp | 13 ++++++----- 6 files changed, 57 insertions(+), 9 deletions(-) diff --git a/cpp/include/proton/container.hpp b/cpp/include/proton/container.hpp index 839cf8137..57cebfcd0 100644 --- a/cpp/include/proton/container.hpp +++ b/cpp/include/proton/container.hpp @@ -177,6 +177,13 @@ class PN_CPP_CLASS_EXTERN container { /// until `stop()` is called. PN_CPP_EXTERN void auto_stop(bool enabled); + /// Enable or disable the quiescent callback. It is disabled by + /// default. + /// + /// If true, the container will call `messaging_handler::on_container_quiescent` + /// when the container is quiescent. + PN_CPP_EXTERN void enable_quiescent_callback(bool enabled); + /// Stop the container with error condition `err`. /// /// @copydetails stop() diff --git a/cpp/include/proton/messaging_handler.hpp b/cpp/include/proton/messaging_handler.hpp index 9baca0b72..1d368d894 100644 --- a/cpp/include/proton/messaging_handler.hpp +++ b/cpp/include/proton/messaging_handler.hpp @@ -270,6 +270,9 @@ PN_CPP_CLASS_EXTERN messaging_handler { /// This means that if the transaction successfully commits the delivery /// will be released (including modifying the delivery count). PN_CPP_EXTERN virtual void on_transactional_release(tracker&); + + /// **Unsettled API** - Called when the container is quiescent. + PN_CPP_EXTERN virtual void on_container_quiescent(container&); }; } // namespace proton diff --git a/cpp/src/container.cpp b/cpp/src/container.cpp index 8189539de..c021fc012 100644 --- a/cpp/src/container.cpp +++ b/cpp/src/container.cpp @@ -95,6 +95,8 @@ void container::run(int threads) { impl_->run(threads); } void container::auto_stop(bool set) { impl_->auto_stop(set); } +void container::enable_quiescent_callback(bool set) { impl_->enable_quiescent_callback(set); } + void container::stop(const error_condition& err) { impl_->stop(err); } returned<sender> container::open_sender( diff --git a/cpp/src/handler.cpp b/cpp/src/handler.cpp index 152c519b2..e1b0c8cab 100644 --- a/cpp/src/handler.cpp +++ b/cpp/src/handler.cpp @@ -43,8 +43,11 @@ messaging_handler::~messaging_handler() = default; void messaging_handler::on_container_start(container &) {} void messaging_handler::on_container_stop(container &) {} +void messaging_handler::on_container_quiescent(container &) {} + void messaging_handler::on_message(delivery &, message &) {} void messaging_handler::on_sendable(sender &) {} + void messaging_handler::on_transport_close(transport &) {} void messaging_handler::on_transport_error(transport &t) { on_error(t.error()); } void messaging_handler::on_transport_open(transport &) {} @@ -100,5 +103,4 @@ void messaging_handler::on_sender_drain_start(sender &) {} void messaging_handler::on_receiver_drain_finish(receiver &) {} void messaging_handler::on_error(const error_condition& c) { throw proton::error(c.what()); } - } diff --git a/cpp/src/proactor_container_impl.cpp b/cpp/src/proactor_container_impl.cpp index 6489beaef..3679649bb 100644 --- a/cpp/src/proactor_container_impl.cpp +++ b/cpp/src/proactor_container_impl.cpp @@ -140,8 +140,7 @@ class work_queue::impl* container::impl::make_work_queue(container& c) { } container::impl::impl(container& c, const std::string& id, messaging_handler* mh) - : threads_(0), container_(c), proactor_(pn_proactor()), handler_(mh), id_(id), - reconnecting_(0), auto_stop_(true), stopping_(false) + : container_(c), proactor_(pn_proactor()), handler_(mh), id_(id) {} container::impl::~impl() { @@ -771,7 +770,30 @@ void container::impl::thread() { finished = stopping_; } while (!finished) { - pn_event_batch_t *events = pn_proactor_wait(proactor_); + pn_event_batch_t* events = nullptr; + auto dispatching_threads = [this]() { + GUARD(lock_); + return dispatching_threads_; + } (); + if (dispatching_threads == 0) { + events = pn_proactor_get(proactor_); + if (!events) { + { + GUARD(lock_); + ++dispatching_threads_; + } + if (handler_ && quiescent_callback_) handler_->on_container_quiescent(container_); + { + GUARD(lock_); + --dispatching_threads_; + } + } + } + events = events ? events : pn_proactor_wait(proactor_); + { + GUARD(lock_); + ++dispatching_threads_; + } pn_event_t *e; error_condition error; try { @@ -787,6 +809,10 @@ void container::impl::thread() { error = error_condition("exception", "container shut-down by unknown exception"); } pn_proactor_done(proactor_, events); + { + GUARD(lock_); + --dispatching_threads_; + } if (!error.empty()) { finished = true; { @@ -844,6 +870,11 @@ void container::impl::run(int threads) { }; } +void container::impl::enable_quiescent_callback(bool set) { + GUARD(lock_); + quiescent_callback_ = set; +} + void container::impl::auto_stop(bool set) { GUARD(lock_); auto_stop_ = set; diff --git a/cpp/src/proactor_container_impl.hpp b/cpp/src/proactor_container_impl.hpp index 1f87e7636..4a229920e 100644 --- a/cpp/src/proactor_container_impl.hpp +++ b/cpp/src/proactor_container_impl.hpp @@ -62,7 +62,7 @@ class connector; class container::impl { public: - impl(container& c, const std::string& id, messaging_handler* = 0); + impl(container& c, const std::string& id, messaging_handler* = nullptr); ~impl(); std::string id() const { return id_; } returned<connection> connect(); @@ -85,6 +85,7 @@ class container::impl { void run(int threads); void stop(const error_condition& err); void auto_stop(bool set); + void enable_quiescent_callback(bool set); work_handle schedule(duration, work); void cancel(work_handle); template <class T> static void set_handler(T s, messaging_handler* h); @@ -109,7 +110,8 @@ class container::impl { dispatch_result dispatch(pn_event_t*); void run_timer_jobs(); - int threads_; + int threads_ = 0; + int dispatching_threads_ = 0; container& container_; MUTEX(lock_) @@ -146,9 +148,10 @@ class container::impl { proton::receiver_options receiver_options_; error_condition disconnect_error_; - unsigned reconnecting_; - bool auto_stop_; - bool stopping_; + unsigned reconnecting_ = 0; + bool auto_stop_ = true; + bool quiescent_callback_ = false; + bool stopping_ = false; friend class connector; }; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
