PROTON-1164: [C++ binding] Update handlers to split link events into receiver and sender events
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/9bdea3b6 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/9bdea3b6 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/9bdea3b6 Branch: refs/heads/master Commit: 9bdea3b652b05e6cc01e60c0681411d2ab10a9b4 Parents: 3d52220 Author: Andrew Stitcher <[email protected]> Authored: Tue Mar 22 14:42:59 2016 -0400 Committer: Andrew Stitcher <[email protected]> Committed: Thu Mar 24 10:12:21 2016 -0400 ---------------------------------------------------------------------- examples/cpp/broker.hpp | 38 +++++++++----------- examples/cpp/client.cpp | 6 ++-- examples/cpp/engine/client.cpp | 5 ++- examples/cpp/server_direct.cpp | 8 ++--- .../bindings/cpp/include/proton/handler.hpp | 13 +++++-- proton-c/bindings/cpp/src/engine_test.cpp | 6 +++- proton-c/bindings/cpp/src/handler.cpp | 9 +++-- proton-c/bindings/cpp/src/messaging_adapter.cpp | 36 +++++++++++++------ proton-c/bindings/cpp/src/messaging_adapter.hpp | 3 +- 9 files changed, 72 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bdea3b6/examples/cpp/broker.hpp ---------------------------------------------------------------------- diff --git a/examples/cpp/broker.hpp b/examples/cpp/broker.hpp index 3caffbe..ba9040c 100644 --- a/examples/cpp/broker.hpp +++ b/examples/cpp/broker.hpp @@ -154,23 +154,21 @@ class broker_handler : public proton::handler { public: broker_handler(queues& qs) : queues_(qs) {} - void on_link_open(proton::event &e, proton::link &lnk) override { - - if (!!lnk.sender()) { - proton::terminus remote_source(lnk.remote_source()); - queue &q = remote_source.dynamic() ? - queues_.dynamic() : queues_.get(remote_source.address()); - lnk.local_source().address(q.name()); - - q.subscribe(lnk.sender()); - std::cout << "broker outgoing link from " << q.name() << std::endl; - } else { - // Receiver - std::string address = lnk.remote_target().address(); - if (!address.empty()) { - lnk.local_target().address(address); - std::cout << "broker incoming link to " << address << std::endl; - } + void on_sender_open(proton::event &e, proton::sender &sender) override { + proton::terminus remote_source(sender.remote_source()); + queue &q = remote_source.dynamic() ? + queues_.dynamic() : queues_.get(remote_source.address()); + sender.local_source().address(q.name()); + + q.subscribe(sender); + std::cout << "broker outgoing link from " << q.name() << std::endl; + } + + void on_receiver_open(proton::event &e, proton::receiver &receiver) override { + std::string address = receiver.remote_target().address(); + if (!address.empty()) { + receiver.local_target().address(address); + std::cout << "broker incoming link to " << address << std::endl; } } @@ -182,10 +180,8 @@ class broker_handler : public proton::handler { } } - void on_link_close(proton::event &e, proton::link &lnk) override { - if (!!lnk.sender()) { - unsubscribe(lnk.sender()); - } + void on_sender_close(proton::event &e, proton::sender &sender) override { + unsubscribe(sender); } void on_connection_close(proton::event &e, proton::connection &c) override { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bdea3b6/examples/cpp/client.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/client.cpp b/examples/cpp/client.cpp index 3a55efe..d026c51 100644 --- a/examples/cpp/client.cpp +++ b/examples/cpp/client.cpp @@ -54,10 +54,8 @@ class client : public proton::handler { sender.send(req); } - void on_link_open(proton::event &e, proton::link &l) override { - if (l == receiver) { - send_request(); - } + void on_receiver_open(proton::event &e, proton::receiver &) override { + send_request(); } void on_message(proton::event &e, proton::message &response) override { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bdea3b6/examples/cpp/engine/client.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/engine/client.cpp b/examples/cpp/engine/client.cpp index f1066fa..2b22288 100644 --- a/examples/cpp/engine/client.cpp +++ b/examples/cpp/engine/client.cpp @@ -53,9 +53,8 @@ class client : public proton::handler { sender.send(req); } - void on_link_open(proton::event &e, proton::link &l) override { - if (l == receiver) - send_request(); + void on_receiver_open(proton::event &e, proton::receiver &) override { + send_request(); } void on_message(proton::event &e, proton::message &response) override { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bdea3b6/examples/cpp/server_direct.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/server_direct.cpp b/examples/cpp/server_direct.cpp index 5ce2d21..a99adc0 100644 --- a/examples/cpp/server_direct.cpp +++ b/examples/cpp/server_direct.cpp @@ -66,10 +66,10 @@ class server : public proton::handler { return addr.str(); } - void on_link_open(proton::event& e, proton::link &link) override { - if (!!link.sender() && link.remote_source().dynamic()) { - link.local_source().address(generate_address()); - senders[link.local_source().address()] = link.sender(); + void on_sender_open(proton::event& e, proton::sender &sender) override { + if (sender.remote_source().dynamic()) { + sender.local_source().address(generate_address()); + senders[sender.local_source().address()] = sender; } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bdea3b6/proton-c/bindings/cpp/include/proton/handler.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/handler.hpp b/proton-c/bindings/cpp/include/proton/handler.hpp index 4d00f93..aaed559 100644 --- a/proton-c/bindings/cpp/include/proton/handler.hpp +++ b/proton-c/bindings/cpp/include/proton/handler.hpp @@ -104,11 +104,18 @@ PN_CPP_CLASS_EXTERN handler PN_CPP_EXTERN virtual void on_session_error(event &e, session &s); /// The remote peer opened the link. - PN_CPP_EXTERN virtual void on_link_open(event &e, link& l); + PN_CPP_EXTERN virtual void on_receiver_open(event &e, receiver& l); /// The remote peer closed the link. - PN_CPP_EXTERN virtual void on_link_close(event &e, link& l); + PN_CPP_EXTERN virtual void on_receiver_close(event &e, receiver& l); /// The remote peer closed the link with an error condition. - PN_CPP_EXTERN virtual void on_link_error(event &e, link& l); + PN_CPP_EXTERN virtual void on_receiver_error(event &e, receiver& l); + + /// The remote peer opened the link. + PN_CPP_EXTERN virtual void on_sender_open(event &e, sender& l); + /// The remote peer closed the link. + PN_CPP_EXTERN virtual void on_sender_close(event &e, sender& l); + /// The remote peer closed the link with an error condition. + PN_CPP_EXTERN virtual void on_sender_error(event &e, sender& l); /// The remote peer accepted an outgoing message. PN_CPP_EXTERN virtual void on_delivery_accept(event &e, delivery &d); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bdea3b6/proton-c/bindings/cpp/src/engine_test.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/engine_test.cpp b/proton-c/bindings/cpp/src/engine_test.cpp index 1b32fb0..ea1be2f 100644 --- a/proton-c/bindings/cpp/src/engine_test.cpp +++ b/proton-c/bindings/cpp/src/engine_test.cpp @@ -100,7 +100,11 @@ struct record_handler : public handler { std::deque<proton::session> sessions; std::deque<std::string> errors; - void on_link_open(event& e, link &l) override { + void on_receiver_open(event& e, receiver &l) override { + links.push_back(l); + } + + void on_sender_open(event& e, sender &l) override { links.push_back(l); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bdea3b6/proton-c/bindings/cpp/src/handler.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/handler.cpp b/proton-c/bindings/cpp/src/handler.cpp index a6ee246..87a9075 100644 --- a/proton-c/bindings/cpp/src/handler.cpp +++ b/proton-c/bindings/cpp/src/handler.cpp @@ -48,9 +48,12 @@ void handler::on_connection_open(event &e, connection &) { on_unhandled(e); } void handler::on_session_close(event &e, session &) { on_unhandled(e); } void handler::on_session_error(event &e, session &s) { on_unhandled_error(e, s.remote_condition()); } void handler::on_session_open(event &e, session &) { on_unhandled(e); } -void handler::on_link_close(event &e, link &) { on_unhandled(e); } -void handler::on_link_error(event &e, link &l) { on_unhandled_error(e, l.remote_condition()); } -void handler::on_link_open(event &e, link &) { on_unhandled(e); } +void handler::on_receiver_close(event &e, receiver &) { on_unhandled(e); } +void handler::on_receiver_error(event &e, receiver &l) { on_unhandled_error(e, l.remote_condition()); } +void handler::on_receiver_open(event &e, receiver &) { on_unhandled(e); } +void handler::on_sender_close(event &e, sender &) { on_unhandled(e); } +void handler::on_sender_error(event &e, sender &l) { on_unhandled_error(e, l.remote_condition()); } +void handler::on_sender_open(event &e, sender &) { on_unhandled(e); } void handler::on_delivery_accept(event &e, delivery &) { on_unhandled(e); } void handler::on_delivery_reject(event &e, delivery &) { on_unhandled(e); } void handler::on_delivery_release(event &e, delivery &) { on_unhandled(e); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bdea3b6/proton-c/bindings/cpp/src/messaging_adapter.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/messaging_adapter.cpp b/proton-c/bindings/cpp/src/messaging_adapter.cpp index a0cf9ab..f280330 100644 --- a/proton-c/bindings/cpp/src/messaging_adapter.cpp +++ b/proton-c/bindings/cpp/src/messaging_adapter.cpp @@ -147,13 +147,21 @@ bool is_local_unititialised(pn_state_t state) { void messaging_adapter::on_link_remote_close(proton_event &pe) { pn_event_t *cevent = pe.pn_event(); pn_link_t *lnk = pn_event_link(cevent); - link l(lnk); - if (pn_condition_is_set(pn_link_remote_condition(lnk))) { - messaging_event mevent(messaging_event::LINK_ERROR, pe); - delegate_.on_link_error(mevent, l); + messaging_event clevent(messaging_event::LINK_CLOSE, pe); + messaging_event eevent(messaging_event::LINK_ERROR, pe); + if (pn_link_is_receiver(lnk)) { + receiver r = link(lnk).receiver(); + if (pn_condition_is_set(pn_link_remote_condition(lnk))) { + delegate_.on_receiver_error(eevent, r); + } + delegate_.on_receiver_close(clevent, r); + } else { + sender s = link(lnk).sender(); + if (pn_condition_is_set(pn_link_remote_condition(lnk))) { + delegate_.on_sender_error(eevent, s); + } + delegate_.on_sender_close(clevent, s); } - messaging_event mevent(messaging_event::LINK_CLOSE, pe); - delegate_.on_link_close(mevent, l); pn_link_close(lnk); } @@ -209,16 +217,22 @@ void messaging_adapter::on_link_local_open(proton_event &pe) { void messaging_adapter::on_link_remote_open(proton_event &pe) { messaging_event mevent(messaging_event::LINK_OPEN, pe); - pn_link_t *link = pn_event_link(pe.pn_event()); - class link l(link); - delegate_.on_link_open(mevent, l); - if (!is_local_open(pn_link_state(link)) && is_local_unititialised(pn_link_state(link))) { + pn_link_t *lnk = pn_event_link(pe.pn_event()); + if (pn_link_is_receiver(lnk)) { + receiver r = link(lnk).receiver(); + delegate_.on_receiver_open(mevent, r); + } else { + sender s = link(lnk).sender(); + delegate_.on_sender_open(mevent, s); + } + if (!is_local_open(pn_link_state(lnk)) && is_local_unititialised(pn_link_state(lnk))) { + link l(lnk); if (pe.container_) l.open(pe.container_->impl_->link_options_); else l.open(); // No default for engine } - credit_topup(link); + credit_topup(lnk); } void messaging_adapter::on_transport_tail_closed(proton_event &pe) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bdea3b6/proton-c/bindings/cpp/src/messaging_adapter.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/messaging_adapter.hpp b/proton-c/bindings/cpp/src/messaging_adapter.hpp index 1fa4172..ce51c4a 100644 --- a/proton-c/bindings/cpp/src/messaging_adapter.hpp +++ b/proton-c/bindings/cpp/src/messaging_adapter.hpp @@ -33,8 +33,7 @@ namespace proton { -// Combine's Python's: endpoint_state_handler, incoming_message_handler, outgoing_message_handler - +/// Convert the low level proton-c events to the higher level proton::handler calls class messaging_adapter : public proton_handler { public: --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
