PROTON-1557: c++ fix race conditions in proactor_container_impl Fixed: - using of option member variables outside of guard. - reference couting of pn_ objects outside of handler after pn_proactor_connect - creating returned<> wrappers from raw pointers.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/38b27b50 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/38b27b50 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/38b27b50 Branch: refs/heads/go1 Commit: 38b27b5035658454be9ddc2bae80c589576960f2 Parents: bd10259 Author: Alan Conway <[email protected]> Authored: Tue Aug 29 18:08:01 2017 -0400 Committer: Alan Conway <[email protected]> Committed: Tue Aug 29 18:29:45 2017 -0400 ---------------------------------------------------------------------- examples/cpp/multithreaded_client.cpp | 30 ++++---- .../cpp/multithreaded_client_flow_control.cpp | 35 +++++---- .../bindings/cpp/include/proton/returned.hpp | 15 ++-- .../cpp/src/include/proactor_container_impl.hpp | 3 +- .../bindings/cpp/src/include/proton_bits.hpp | 15 ++-- .../cpp/src/proactor_container_impl.cpp | 75 ++++++++++++-------- proton-c/bindings/cpp/src/returned.cpp | 6 +- tools/py/proctest.py | 2 + 8 files changed, 110 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38b27b50/examples/cpp/multithreaded_client.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/multithreaded_client.cpp b/examples/cpp/multithreaded_client.cpp index 955655c..4119bbf 100644 --- a/examples/cpp/multithreaded_client.cpp +++ b/examples/cpp/multithreaded_client.cpp @@ -47,6 +47,10 @@ #include <string> #include <thread> +// Lock output from threads to avoid scramblin +std::mutex out_lock; +#define OUT(x) do { std::lock_guard<std::mutex> l(out_lock); x; } while (false) + // Handler for a single thread-safe sending and receiving connection. class client : public proton::messaging_handler { // Invariant @@ -64,7 +68,7 @@ class client : public proton::messaging_handler { std::condition_variable messages_ready_; public: - client(const std::string& url, const std::string& address) : url_(url), address_(address) {} + client(const std::string& url, const std::string& address) : url_(url), address_(address), work_queue_(0) {} // Thread safe void send(const proton::message& msg) { @@ -112,25 +116,21 @@ class client : public proton::messaging_handler { } void on_sender_open(proton::sender& s) override { - { - // sender_ and work_queue_ must be set atomically - std::lock_guard<std::mutex> l(lock_); - sender_ = s; - work_queue_ = &s.work_queue(); - } + // sender_ and work_queue_ must be set atomically + std::lock_guard<std::mutex> l(lock_); + sender_ = s; + work_queue_ = &s.work_queue(); sender_ready_.notify_all(); } void on_message(proton::delivery& dlv, proton::message& msg) override { - { - std::lock_guard<std::mutex> l(lock_); - messages_.push(msg); - } + std::lock_guard<std::mutex> l(lock_); + messages_.push(msg); messages_ready_.notify_all(); } void on_error(const proton::error_condition& e) override { - std::cerr << "unexpected error: " << e << std::endl; + OUT(std::cerr << "unexpected error: " << e << std::endl); exit(1); } }; @@ -157,7 +157,7 @@ int main(int argc, const char** argv) { for (int i = 0; i < n_messages; ++i) { proton::message msg(std::to_string(i + 1)); cl.send(msg); - std::cout << "sent: " << msg.body() << std::endl; + OUT(std::cout << "sent: " << msg.body() << std::endl); } }); @@ -165,7 +165,7 @@ int main(int argc, const char** argv) { std::thread receiver([&]() { for (int i = 0; i < n_messages; ++i) { auto msg = cl.receive(); - std::cout << "received: " << msg.body() << std::endl; + OUT(std::cout << " received: " << msg.body() << std::endl); ++received; } }); @@ -174,7 +174,7 @@ int main(int argc, const char** argv) { receiver.join(); cl.close(); container_thread.join(); - std::cout << "received " << received << " messages" << std::endl; + std::cout << received << " messages sent and received" << std::endl; return 0; } catch (const std::exception& e) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38b27b50/examples/cpp/multithreaded_client_flow_control.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/multithreaded_client_flow_control.cpp b/examples/cpp/multithreaded_client_flow_control.cpp index 9eec782..8764793 100644 --- a/examples/cpp/multithreaded_client_flow_control.cpp +++ b/examples/cpp/multithreaded_client_flow_control.cpp @@ -56,6 +56,10 @@ #include <string> #include <thread> +// Lock output from threads to avoid scramblin +std::mutex out_lock; +#define OUT(x) do { std::lock_guard<std::mutex> l(out_lock); x; } while (false) + // A thread-safe sending connection that blocks sending threads when there // is no AMQP credit to send messages. class sender : private proton::messaging_handler { @@ -127,7 +131,7 @@ class sender : private proton::messaging_handler { } void on_error(const proton::error_condition& e) override { - std::cerr << "unexpected error: " << e << std::endl; + OUT(std::cerr << "unexpected error: " << e << std::endl); exit(1); } }; @@ -202,7 +206,7 @@ class receiver : private proton::messaging_handler { } void on_error(const proton::error_condition& e) override { - std::cerr << "unexpected error: " << e << std::endl; + OUT(std::cerr << "unexpected error: " << e << std::endl); exit(1); } }; @@ -210,28 +214,31 @@ class receiver : private proton::messaging_handler { // ==== Example code using the sender and receiver // Send n messages -void send_thread(sender& s, int n, bool print) { +void send_thread(sender& s, int n) { auto id = std::this_thread::get_id(); for (int i = 0; i < n; ++i) { std::ostringstream ss; ss << std::this_thread::get_id() << ":" << i; s.send(proton::message(ss.str())); - if (print) std::cout << "received: " << ss.str() << std::endl; + OUT(std::cout << id << " received: " << ss.str() << std::endl); } - std::cout << id << " sent " << n << std::endl; + OUT(std::cout << id << " sent " << n << std::endl); } // Receive messages till atomic remaining count is 0. // remaining is shared among all receiving threads -void receive_thread(receiver& r, std::atomic_int& remaining, bool print) { +void receive_thread(receiver& r, std::atomic_int& remaining) { auto id = std::this_thread::get_id(); int n = 0; + // atomically check and decrement remaining *before* receiving. + // If it is 0 or less then return, as there are no more + // messages to receive so calling r.receive() would block forever. while (remaining-- > 0) { auto m = r.receive(); ++n; - if (print) std::cout << id << "received: " << m.body() << std::endl; + OUT(std::cout << id << " received: " << m.body() << std::endl); } - std::cout << id << " received " << n << " messages" << std::endl; + OUT(std::cout << id << " received " << n << " messages" << std::endl); } int main(int argc, const char **argv) { @@ -250,10 +257,10 @@ int main(int argc, const char **argv) { const char *address = argv[2]; int n_messages = atoi(argv[3]); int n_threads = atoi(argv[4]); + int count = n_messages * n_threads; // Total messages to be received, multiple receiver threads will decrement this. - std::atomic_int remaining(n_messages * n_threads); - bool print = remaining < 1000; // Don't print for long runs, dominates run time + std::atomic_int remaining(count); // Run the proton container proton::container container; @@ -267,17 +274,19 @@ int main(int argc, const char **argv) { // Starting receivers first gives all receivers a chance to compete for messages. std::vector<std::thread> threads; for (int i = 0; i < n_threads; ++i) - threads.push_back(std::thread([&]() { receive_thread(recv, remaining, print); })); + threads.push_back(std::thread([&]() { receive_thread(recv, remaining); })); for (int i = 0; i < n_threads; ++i) - threads.push_back(std::thread([&]() { send_thread(send, n_messages, print); })); + threads.push_back(std::thread([&]() { send_thread(send, n_messages); })); // Wait for threads to finish for (auto& t : threads) t.join(); send.close(); recv.close(); - container_thread.join(); + if (remaining > 0) + throw std::runtime_error("not all messages were received"); + std::cout << count << " messages sent and received" << std::endl; return 0; } catch (const std::exception& e) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38b27b50/proton-c/bindings/cpp/include/proton/returned.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/returned.hpp b/proton-c/bindings/cpp/include/proton/returned.hpp index 28c45a0..45de4c4 100644 --- a/proton-c/bindings/cpp/include/proton/returned.hpp +++ b/proton-c/bindings/cpp/include/proton/returned.hpp @@ -36,7 +36,7 @@ namespace proton { namespace internal { -template <class T> class factory; +class returned_factory; } /// Return type for container functions @@ -51,13 +51,20 @@ template <class T> class PN_CPP_CLASS_EXTERN returned { public: - PN_CPP_EXTERN returned(const T&); + /// Copy operator required to return a value + /// @note thread safe + PN_CPP_EXTERN returned(const returned<T>&); + + /// Convert to the proton::object + /// + /// @note **Thread unsafe** do not use in a multi-threaded application. PN_CPP_EXTERN operator T() const; private: typename T::pn_type* ptr_; - returned& operator=(const returned&); - template <class U> friend class internal::factory; + returned(typename T::pn_type*); + returned& operator=(const returned&); // Not defined + friend class internal::returned_factory; }; } // proton http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38b27b50/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp index ac54156..0aa62a5 100644 --- a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp +++ b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp @@ -98,7 +98,8 @@ class container::impl { class connection_work_queue; class container_work_queue; pn_listener_t* listen_common_lh(const std::string&); - connection connect_common(const std::string&, const connection_options&); + pn_connection_t* make_connection_lh(const url& url, const connection_options&); + void start_connection(const url& url, pn_connection_t* c); // Event loop to run in each container thread void thread(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38b27b50/proton-c/bindings/cpp/src/include/proton_bits.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/proton_bits.hpp b/proton-c/bindings/cpp/src/include/proton_bits.hpp index 035ffb7..675249f 100644 --- a/proton-c/bindings/cpp/src/include/proton_bits.hpp +++ b/proton-c/bindings/cpp/src/include/proton_bits.hpp @@ -124,7 +124,6 @@ class factory { public: static T wrap(typename wrapped<T>::type* t) { return t; } static typename wrapped<T>::type* unwrap(const T& t) { return t.pn_object(); } - static returned<T> make_returned(const T& t) { return returned<T>(t); } }; template <class T> struct context {}; @@ -140,7 +139,14 @@ inline void set_messaging_handler(T t, messaging_handler* mh) { context<T>::type template <class T> inline messaging_handler* get_messaging_handler(T* t) { return context<typename internal::wrapper<T>::type>::type::get(t).handler; } -} +class returned_factory { + public: + template <class T> static returned<T> make(typename internal::wrapped<T>::type* pn) { + return returned<T>(pn); + } +}; + +} // namespace internal template <class T> typename internal::wrapper<T>::type make_wrapper(T* t) { return internal::factory<typename internal::wrapper<T>::type>::wrap(t); } @@ -151,9 +157,8 @@ U make_wrapper(typename internal::wrapped<U>::type* t) { return internal::factor template <class T> typename internal::wrapped<T>::type* unwrap(const T& t) { return internal::factory<T>::unwrap(t); } -template <class T> -returned<T> make_returned(const T& t) { - return internal::factory<T>::make_returned(t); +template <class T> returned<T> make_returned(typename internal::wrapped<T>::type* pn) { + return internal::returned_factory::make<T>(pn); } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38b27b50/proton-c/bindings/cpp/src/proactor_container_impl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp index 1389306..9870210 100644 --- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp +++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp @@ -152,9 +152,9 @@ void container::impl::remove_work_queue(container::impl::container_work_queue* l work_queues_.erase(l); } -proton::connection container::impl::connect_common( - const std::string& addr, - const proton::connection_options& user_opts) +pn_connection_t* container::impl::make_connection_lh( + const url& url, + const connection_options& user_opts) { if (stopping_) throw proton::error("container is stopping"); @@ -163,7 +163,6 @@ proton::connection container::impl::connect_common( opts.update(user_opts); messaging_handler* mh = opts.handler(); - proton::url url(addr); pn_connection_t *pnc = pn_connection(); connection_context& cc(connection_context::get(pnc)); cc.container = &container_; @@ -177,42 +176,58 @@ proton::connection container::impl::connect_common( if (!url.password().empty()) pn_connection_set_password(pnc, url.password().c_str()); - connection conn = make_wrapper(pnc); - conn.open(opts); - // Figure out correct string len then create connection address - int len = pn_proactor_addr(0, 0, url.host().c_str(), url.port().c_str()); - std::vector<char> caddr(len+1); - pn_proactor_addr(&caddr[0], len+1, url.host().c_str(), url.port().c_str()); - pn_proactor_connect(proactor_, pnc, &caddr[0]); - return conn; + make_wrapper(pnc).open(opts); + return pnc; // 1 refcount from pn_connection() } -returned<proton::connection> container::impl::connect( +void container::impl::start_connection(const url& url, pn_connection_t *pnc) { + char caddr[PN_MAX_ADDR]; + pn_proactor_addr(caddr, sizeof(caddr), url.host().c_str(), url.port().c_str()); + pn_proactor_connect(proactor_, pnc, caddr); // Takes ownership of pnc +} + +returned<connection> container::impl::connect( const std::string& addr, const proton::connection_options& user_opts) { - connection conn = connect_common(addr, user_opts); GUARD(lock_); - return make_returned(conn); + proton::url url(addr); + pn_connection_t *pnc = make_connection_lh(url, user_opts); + start_connection(url, pnc); + return make_returned<proton::connection>(pnc); } -returned<sender> container::impl::open_sender(const std::string &url, const proton::sender_options &o1, const connection_options &o2) { - proton::sender_options lopts(sender_options_); - lopts.update(o1); - connection conn = connect_common(url, o2); - - GUARD(lock_); - return make_returned(conn.default_session().open_sender(proton::url(url).path(), lopts)); +returned<sender> container::impl::open_sender(const std::string &urlstr, const proton::sender_options &o1, const connection_options &o2) +{ + proton::url url(urlstr); + pn_link_t* pnl = 0; + pn_connection_t* pnc = 0; + { + GUARD(lock_); + proton::sender_options lopts(sender_options_); + lopts.update(o1); + pnc = make_connection_lh(url, o2); + connection conn(make_wrapper(pnc)); + pnl = unwrap(conn.default_session().open_sender(url.path(), lopts)); + } // There must be no refcounting after here + start_connection(url, pnc); // Takes ownership of pnc + return make_returned<sender>(pnl); // Unsafe returned pointer } -returned<receiver> container::impl::open_receiver(const std::string &url, const proton::receiver_options &o1, const connection_options &o2) { - proton::receiver_options lopts(receiver_options_); - lopts.update(o1); - connection conn = connect_common(url, o2); - - GUARD(lock_); - return make_returned( - conn.default_session().open_receiver(proton::url(url).path(), lopts)); +returned<receiver> container::impl::open_receiver(const std::string &urlstr, const proton::receiver_options &o1, const connection_options &o2) { + proton::url url(urlstr); + pn_link_t* pnl = 0; + pn_connection_t* pnc = 0; + { + GUARD(lock_); + proton::receiver_options lopts(receiver_options_); + lopts.update(o1); + pnc = make_connection_lh(url, o2); + connection conn(make_wrapper(pnc)); + pnl = unwrap(conn.default_session().open_receiver(url.path(), lopts)); + } // There must be no refcounting after here + start_connection(url, pnc); + return make_returned<receiver>(pnl); } pn_listener_t* container::impl::listen_common_lh(const std::string& addr) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38b27b50/proton-c/bindings/cpp/src/returned.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/returned.cpp b/proton-c/bindings/cpp/src/returned.cpp index a6c6101..cd8c1cb 100644 --- a/proton-c/bindings/cpp/src/returned.cpp +++ b/proton-c/bindings/cpp/src/returned.cpp @@ -28,14 +28,14 @@ namespace proton { -template <class T> PN_CPP_EXTERN returned<T>::returned(const T& t) : ptr_(unwrap(t)) {} - -//template <class T> PN_CPP_EXTERN returned<T>::returned(const returned<T>& x) : ptr_(x.ptr_) {} +template <class T> PN_CPP_EXTERN returned<T>::returned(const returned<T>& x) : ptr_(x.ptr_) {} template <class T> PN_CPP_EXTERN returned<T>::operator T() const { return internal::factory<T>::wrap(ptr_); } +template <class T> returned<T>::returned(typename T::pn_type* p) : ptr_(p) {} + // Explicit instantiations for allowed types template class PN_CPP_CLASS_EXTERN returned<connection>; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38b27b50/tools/py/proctest.py ---------------------------------------------------------------------- diff --git a/tools/py/proctest.py b/tools/py/proctest.py index 047dc44..ebd7db2 100644 --- a/tools/py/proctest.py +++ b/tools/py/proctest.py @@ -95,6 +95,8 @@ class Proc(Popen): self.kwargs = kwargs self._out = tempfile.TemporaryFile() try: + if (os.getenv("PROCTEST_VERBOSE")): + sys.stderr.write("\nstart proc: %s\n" % self.args) Popen.__init__(self, self.args, stdout=self._out, stderr=STDOUT, **kwargs) except OSError, e: if e.errno == errno.ENOENT: --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
