PROTON-1400: [C++ binding] Make proactor container thread safe
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e4eca5c3 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e4eca5c3 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e4eca5c3 Branch: refs/heads/master Commit: e4eca5c3d92b917edca4629c6bc01155c4678baf Parents: 8aee73b Author: Andrew Stitcher <[email protected]> Authored: Fri May 26 17:00:31 2017 -0400 Committer: Andrew Stitcher <[email protected]> Committed: Fri Jul 21 12:50:06 2017 -0400 ---------------------------------------------------------------------- .../bindings/cpp/include/proton/container.hpp | 22 +++- .../cpp/include/proton/internal/config.hpp | 4 + proton-c/bindings/cpp/src/container.cpp | 6 +- .../cpp/src/include/proactor_container_impl.hpp | 28 +++- .../cpp/src/proactor_container_impl.cpp | 127 +++++++++++++++---- 5 files changed, 155 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e4eca5c3/proton-c/bindings/cpp/include/proton/container.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/container.hpp b/proton-c/bindings/cpp/include/proton/container.hpp index 0739517..8b517bf 100644 --- a/proton-c/bindings/cpp/include/proton/container.hpp +++ b/proton-c/bindings/cpp/include/proton/container.hpp @@ -29,11 +29,11 @@ #include "./internal/export.hpp" #include "./internal/pn_unique_ptr.hpp" -#ifdef PN_CPP_HAS_STD_FUNCTION -#include <functional> -#endif #include <string> +/// If the library can support multithreaded containers then PN_CPP_SUPPORTS_THREADS will be set. +#define PN_CPP_SUPPORTS_THREADS PN_CPP_HAS_STD_THREAD && PN_CPP_HAS_STD_MUTEX && PN_CPP_HAS_STD_ATOMIC + namespace proton { /// A top-level container of connections, sessions, senders, and @@ -55,6 +55,13 @@ class PN_CPP_CLASS_EXTERN container { /// Create a container. PN_CPP_EXTERN container(const std::string& id=""); + /// Destroy a container. + /// Note that you may not delete a container from within any of the threads running + /// any of the container's messaging_handlers. Specifically if you delete the container + /// from within a handler you cause a deadlock or a crash. + /// + /// The only safe place to delete a container is after all of the threads running a container + /// have finished and all of the run functions have returned. PN_CPP_EXTERN ~container(); /// Connect to `url` and send an open request to the remote peer. @@ -95,9 +102,16 @@ class PN_CPP_CLASS_EXTERN container { /// Returns when the container stops. /// @see auto_stop() and stop(). /// - /// With a multithreaded container, call run() in multiple threads to create a thread pool. + /// If you are using C++11 or later you may use a multithreaded container. In this case you may + /// call run() in multiple threads to create a thread pool. Or aternatively call run with an + /// integer parameter specifying the number of threads for the thread pool. PN_CPP_EXTERN void run(); +#if PN_CPP_SUPPORTS_THREADS + /// @copydoc run() + PN_CPP_EXTERN void run(int threads); +#endif + /// If true, stop the container when all active connections and listeners are closed. /// If false the container will keep running till stop() is called. /// http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e4eca5c3/proton-c/bindings/cpp/include/proton/internal/config.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/internal/config.hpp b/proton-c/bindings/cpp/include/proton/internal/config.hpp index 79d201c..54b014b 100644 --- a/proton-c/bindings/cpp/include/proton/internal/config.hpp +++ b/proton-c/bindings/cpp/include/proton/internal/config.hpp @@ -103,6 +103,10 @@ #define PN_CPP_HAS_STD_ATOMIC PN_CPP_HAS_CPP11 #endif +#ifndef PN_CPP_HAS_STD_THREAD +#define PN_CPP_HAS_STD_THREAD PN_CPP_HAS_CPP11 +#endif + #endif // PROTON_INTERNAL_CONFIG_HPP /// @endcond http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e4eca5c3/proton-c/bindings/cpp/src/container.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/container.cpp b/proton-c/bindings/cpp/src/container.cpp index c2af659..35f645c 100644 --- a/proton-c/bindings/cpp/src/container.cpp +++ b/proton-c/bindings/cpp/src/container.cpp @@ -81,7 +81,11 @@ returned<connection> container::connect(const std::string& url, const connection listener container::listen(const std::string& url, listen_handler& l) { return impl_->listen(url, l); } -void container::run() { impl_->run(); } +void container::run() { impl_->run(1); } + +#if PN_CPP_SUPPORTS_THREADS +void container::run(int threads) { impl_->run(threads); } +#endif void container::auto_stop(bool set) { impl_->auto_stop(set); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e4eca5c3/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp index fc963f7..ac54156 100644 --- a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp +++ b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp @@ -43,6 +43,21 @@ #include <string> #include <vector> +#if PN_CPP_SUPPORTS_THREADS +#include <atomic> +#include <mutex> +# define MUTEX(x) std::mutex x; +# define GUARD(x) std::lock_guard<std::mutex> g(x) +# define ONCE_FLAG(x) std::once_flag x; +# define CALL_ONCE(x, ...) std::call_once(x, __VA_ARGS__) +# define ATOMIC_INT(x) std::atomic<int> x; +#else +# define MUTEX(x) +# define GUARD(x) +# define ONCE_FLAG(x) +# define CALL_ONCE(x, f, o) ((o)->*(f))() +# define ATOMIC_INT(x) int x; +#endif struct pn_proactor_t; struct pn_listener_t; struct pn_event_t; @@ -70,7 +85,7 @@ class container::impl { class sender_options sender_options() const { return sender_options_; } void receiver_options(const proton::receiver_options&); class receiver_options receiver_options() const { return receiver_options_; } - void run(); + void run(int threads); void stop(const error_condition& err); void auto_stop(bool set); void schedule(duration, work); @@ -86,16 +101,23 @@ class container::impl { connection connect_common(const std::string&, const connection_options&); // Event loop to run in each container thread - static void thread(impl&); + void thread(); bool handle(pn_event_t*); void run_timer_jobs(); + ATOMIC_INT(threads_) + MUTEX(lock_) + ONCE_FLAG(start_once_) + ONCE_FLAG(stop_once_) container& container_; typedef std::set<container_work_queue*> work_queues; work_queues work_queues_; container_work_queue* add_work_queue(); void remove_work_queue(container_work_queue*); + void start_event(); + void stop_event(); + struct scheduled { timestamp time; // duration from epoch for task work task; @@ -112,8 +134,8 @@ class container::impl { connection_options server_connection_options_; proton::sender_options sender_options_; proton::receiver_options receiver_options_; + error_condition disconnect_error_; - proton::error_condition stop_err_; bool auto_stop_; bool stopping_; }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e4eca5c3/proton-c/bindings/cpp/src/proactor_container_impl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp index 62115fd..b900d6f 100644 --- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp +++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp @@ -41,21 +41,27 @@ #include <algorithm> #include <vector> +#if PN_CPP_SUPPORTS_THREADS +# include <thread> +#endif + namespace proton { class container::impl::common_work_queue : public work_queue::impl { public: - common_work_queue(container::impl& c): container_(c), finished_(false) {} + common_work_queue(container::impl& c): container_(c), finished_(false), running_(false) {} typedef std::vector<work> jobs; void run_all_jobs(); - void finished() { finished_ = true; } + void finished() { GUARD(lock_); finished_ = true; } void schedule(duration, work); + MUTEX(lock_) container::impl& container_; jobs jobs_; bool finished_; + bool running_; }; void container::impl::common_work_queue::schedule(duration d, work f) { @@ -68,11 +74,22 @@ void container::impl::common_work_queue::schedule(duration d, work f) { void container::impl::common_work_queue::run_all_jobs() { jobs j; // Lock this operation for mt - std::swap(j, jobs_); + { + GUARD(lock_); + // Ensure that we never run work from this queue concurrently + if (running_) return; + running_ = true; + // But allow adding to the queue concurrently to running + std::swap(j, jobs_); + } // Run queued work, but ignore any exceptions for (jobs::iterator f = j.begin(); f != j.end(); ++f) try { (*f)(); } catch (...) {}; + { + GUARD(lock_); + running_ = false; + } return; } @@ -88,6 +105,7 @@ class container::impl::connection_work_queue : public common_work_queue { bool container::impl::connection_work_queue::add(work f) { // Note this is an unbounded work queue. // A resource-safe implementation should be bounded. + GUARD(lock_); if (finished_) return false; jobs_.push_back(f); pn_connection_wake(connection_); @@ -105,6 +123,7 @@ class container::impl::container_work_queue : public common_work_queue { bool container::impl::container_work_queue::add(work f) { // Note this is an unbounded work queue. // A resource-safe implementation should be bounded. + GUARD(lock_); if (finished_) return false; jobs_.push_back(f); pn_proactor_set_timeout(container_.proactor_, 0); @@ -116,15 +135,11 @@ class work_queue::impl* container::impl::make_work_queue(container& c) { } container::impl::impl(container& c, const std::string& id, messaging_handler* mh) - : container_(c), proactor_(pn_proactor()), handler_(mh), id_(id), + : threads_(0), container_(c), proactor_(pn_proactor()), handler_(mh), id_(id), auto_stop_(true), stopping_(false) {} container::impl::~impl() { - try { - stop(error_condition("exception", "container shut-down")); - //wait(); - } catch (...) {} pn_proactor_free(proactor_); } @@ -178,6 +193,7 @@ proton::returned<proton::connection> container::impl::connect( const proton::connection_options& user_opts) { connection conn = connect_common(addr, user_opts); + GUARD(lock_); return make_thread_safe(conn); } @@ -186,6 +202,7 @@ returned<sender> container::impl::open_sender(const std::string &url, const prot lopts.update(o1); connection conn = connect_common(url, o2); + GUARD(lock_); return make_thread_safe(conn.default_session().open_sender(proton::url(url).path(), lopts)); } @@ -194,6 +211,7 @@ returned<receiver> container::impl::open_receiver(const std::string &url, const lopts.update(o1); connection conn = connect_common(url, o2); + GUARD(lock_); return make_thread_safe( conn.default_session().open_receiver(proton::url(url).path(), lopts)); } @@ -215,11 +233,13 @@ pn_listener_t* container::impl::listen_common_lh(const std::string& addr) { } proton::listener container::impl::listen(const std::string& addr) { + GUARD(lock_); pn_listener_t* listener = listen_common_lh(addr); return proton::listener(listener); } proton::listener container::impl::listen(const std::string& addr, const proton::connection_options& opts) { + GUARD(lock_); pn_listener_t* listener = listen_common_lh(addr); listener_context& lc=listener_context::get(listener); lc.connection_options_.reset(new connection_options(opts)); @@ -227,6 +247,7 @@ proton::listener container::impl::listen(const std::string& addr, const proton:: } proton::listener container::impl::listen(const std::string& addr, proton::listen_handler& lh) { + GUARD(lock_); pn_listener_t* listener = listen_common_lh(addr); listener_context& lc=listener_context::get(listener); lc.listen_handler_ = &lh; @@ -234,6 +255,7 @@ proton::listener container::impl::listen(const std::string& addr, proton::listen } void container::impl::schedule(duration delay, work f) { + GUARD(lock_); timestamp now = timestamp::now(); // Record timeout; Add callback to timeout sorted list @@ -247,18 +269,22 @@ void container::impl::schedule(duration delay, work f) { } void container::impl::client_connection_options(const connection_options &opts) { + GUARD(lock_); client_connection_options_ = opts; } void container::impl::server_connection_options(const connection_options &opts) { + GUARD(lock_); server_connection_options_ = opts; } void container::impl::sender_options(const proton::sender_options &opts) { + GUARD(lock_); sender_options_ = opts; } void container::impl::receiver_options(const proton::receiver_options &opts) { + GUARD(lock_); receiver_options_ = opts; } @@ -294,13 +320,18 @@ bool container::impl::handle(pn_event_t* event) { switch (pn_event_type(event)) { case PN_PROACTOR_INACTIVE: /* listener and all connections closed */ - return auto_stop_; + // If we're stopping interrupt all other threads still running + if (auto_stop_) pn_proactor_interrupt(proactor_); + return false; - // We never interrupt the proactor so ignore + // We only interrupt to stop threads case PN_PROACTOR_INTERRUPT: - return false; + // Interrupt any other threads still running + if (threads_>1) pn_proactor_interrupt(proactor_); + return true; case PN_PROACTOR_TIMEOUT: { + GUARD(lock_); // Can get an immediate timeout, if we have a container event loop inject if ( deferred_.size()>0 ) { run_timer_jobs(); @@ -402,30 +433,78 @@ bool container::impl::handle(pn_event_t* event) { return false; } -void container::impl::thread(container::impl& ci) { - bool finished = false; - do { - pn_event_batch_t *events = pn_proactor_wait(ci.proactor_); - pn_event_t *e; - while ((e = pn_event_batch_next(events))) { - finished = ci.handle(e) || finished; - } - pn_proactor_done(ci.proactor_, events); - } while(!finished); +void container::impl::thread() { + ++threads_; + bool finished = false; + do { + pn_event_batch_t *events = pn_proactor_wait(proactor_); + pn_event_t *e; + try { + while ((e = pn_event_batch_next(events))) { + finished = handle(e); + if (finished) break; + } + } catch (proton::error& e) { + // If we caught an exception then shutdown the (other threads of the) container + disconnect_error_ = error_condition("exception", e.what()); + if (!stopping_) stop(disconnect_error_); + finished = true; + } catch (...) { + // If we caught an exception then shutdown the (other threads of the) container + disconnect_error_ = error_condition("exception", "container shut-down by unknown exception"); + if (!stopping_) stop(disconnect_error_); + finished = true; + } + pn_proactor_done(proactor_, events); + } while(!finished); + --threads_; } -void container::impl::run() { - // Have to "manually" generate container events +void container::impl::start_event() { if (handler_) handler_->on_container_start(container_); - thread(*this); +} + +void container::impl::stop_event() { if (handler_) handler_->on_container_stop(container_); } +void container::impl::run(int threads) { + // Have to "manually" generate container events + CALL_ONCE(start_once_, &impl::start_event, this); + +#if PN_CPP_SUPPORTS_THREADS + // Run handler threads + std::vector<std::thread> ts(threads-1); + if (threads>1) { + for (auto& t : ts) t = std::thread(&impl::thread, this); + } + + thread(); // Use this thread too. + + // Wait for the other threads to stop + if (threads>1) { + for (auto& t : ts) t.join(); + } +#else + // Run a single handler thread (As we have no threading API) + thread(); +#endif + + if (threads_==0) CALL_ONCE(stop_once_, &impl::stop_event, this); + + // Throw an exception if we disconnected the proactor because of an exception + if (!disconnect_error_.empty()) { + throw proton::error(disconnect_error_.description()); + }; +} + void container::impl::auto_stop(bool set) { + GUARD(lock_); auto_stop_ = set; } void container::impl::stop(const proton::error_condition& err) { + GUARD(lock_); auto_stop_ = true; stopping_ = true; pn_condition_t* error_condition = pn_condition(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
