PROTON-1400: [C++ binding] Removed proton_event and proton_handler. - Removed the old low-level proton event handling completely. - Events are now dispatched directly to the messaging_handler. - Moved the private message::decode directly into the message handling code.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1a513d64 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1a513d64 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1a513d64 Branch: refs/heads/master Commit: 1a513d64f6f884c228e8a0a5b691c96949b3d1f4 Parents: 9c1797c Author: Andrew Stitcher <[email protected]> Authored: Mon Jan 23 12:32:50 2017 -0500 Committer: Andrew Stitcher <[email protected]> Committed: Fri Jul 21 12:50:06 2017 -0400 ---------------------------------------------------------------------- proton-c/bindings/cpp/CMakeLists.txt | 2 - .../bindings/cpp/include/proton/message.hpp | 4 - proton-c/bindings/cpp/src/handler.cpp | 1 - proton-c/bindings/cpp/src/include/contexts.hpp | 2 - .../cpp/src/include/messaging_adapter.hpp | 30 +- .../cpp/src/include/proactor_container_impl.hpp | 1 - .../bindings/cpp/src/include/proton_event.hpp | 286 ------------------- .../bindings/cpp/src/include/proton_handler.hpp | 92 ------ .../bindings/cpp/src/io/connection_driver.cpp | 5 +- proton-c/bindings/cpp/src/message.cpp | 14 - proton-c/bindings/cpp/src/messaging_adapter.cpp | 208 ++++++++------ .../cpp/src/proactor_container_impl.cpp | 6 +- proton-c/bindings/cpp/src/proton_event.cpp | 88 ------ proton-c/bindings/cpp/src/proton_handler.cpp | 74 ----- 14 files changed, 131 insertions(+), 682 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1a513d64/proton-c/bindings/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt index 295a99e..625206f 100644 --- a/proton-c/bindings/cpp/CMakeLists.txt +++ b/proton-c/bindings/cpp/CMakeLists.txt @@ -55,8 +55,6 @@ set(qpid-proton-cpp-source src/node_options.cpp src/object.cpp src/proton_bits.cpp - src/proton_event.cpp - 
src/proton_handler.cpp src/receiver.cpp src/receiver_options.cpp src/reconnect_timer.cpp http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1a513d64/proton-c/bindings/cpp/include/proton/message.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/message.hpp b/proton-c/bindings/cpp/include/proton/message.hpp index a25f7db..a428c46 100644 --- a/proton-c/bindings/cpp/include/proton/message.hpp +++ b/proton-c/bindings/cpp/include/proton/message.hpp @@ -319,11 +319,7 @@ class message { mutable pn_message_t *pn_msg_; - /// Decode the message corresponding to a delivery from a link. - void decode(proton::delivery); - PN_CPP_EXTERN friend void swap(message&, message&); - friend class messaging_adapter; /// @endcond }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1a513d64/proton-c/bindings/cpp/src/handler.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/handler.cpp b/proton-c/bindings/cpp/src/handler.cpp index 5cf6208..3137718 100644 --- a/proton-c/bindings/cpp/src/handler.cpp +++ b/proton-c/bindings/cpp/src/handler.cpp @@ -27,7 +27,6 @@ #include "proton/session.hpp" #include "proton/transport.hpp" -#include "proton_event.hpp" #include "messaging_adapter.hpp" #include <proton/handlers.h> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1a513d64/proton-c/bindings/cpp/src/include/contexts.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/contexts.hpp b/proton-c/bindings/cpp/src/include/contexts.hpp index c096a6e..637cbec 100644 --- a/proton-c/bindings/cpp/src/include/contexts.hpp +++ b/proton-c/bindings/cpp/src/include/contexts.hpp @@ -32,8 +32,6 @@ #include "proton/io/link_namer.hpp" -#include "proton_handler.hpp" - struct pn_record_t; struct pn_link_t; struct pn_session_t; 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1a513d64/proton-c/bindings/cpp/src/include/messaging_adapter.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/messaging_adapter.hpp b/proton-c/bindings/cpp/src/include/messaging_adapter.hpp index d7eb6a0..10c7682 100644 --- a/proton-c/bindings/cpp/src/include/messaging_adapter.hpp +++ b/proton-c/bindings/cpp/src/include/messaging_adapter.hpp @@ -22,37 +22,19 @@ * */ -#include "proton/messaging_handler.hpp" - -#include "proton_handler.hpp" - -#include <proton/event.h> -#include <proton/reactor.h> - ///@cond INTERNAL +struct pn_event_t; + namespace proton { +class messaging_handler; + /// Convert the low level proton-c events to the higher level proton::messaging_handler calls -class messaging_adapter : public proton_handler +class messaging_adapter { public: - messaging_adapter(messaging_handler &delegate) : delegate_(delegate) {} - - void on_link_flow(proton_event &e); - void on_delivery(proton_event &e); - void on_connection_remote_open(proton_event &e); - void on_connection_remote_close(proton_event &e); - void on_session_remote_open(proton_event &e); - void on_session_remote_close(proton_event &e); - void on_link_local_open(proton_event &e); - void on_link_remote_open(proton_event &e); - void on_link_remote_detach(proton_event & e); - void on_link_remote_close(proton_event &e); - void on_transport_closed(proton_event &e); - - private: - messaging_handler &delegate_; // The handler for generated messaging_event's + static void dispatch(messaging_handler& delegate, pn_event_t* e); }; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1a513d64/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp index 
8c12c02..859493d 100644 --- a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp +++ b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp @@ -35,7 +35,6 @@ #include "proton/sender_options.hpp" #include "proton_bits.hpp" -#include "proton_handler.hpp" #include <list> #include <map> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1a513d64/proton-c/bindings/cpp/src/include/proton_event.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/proton_event.hpp b/proton-c/bindings/cpp/src/include/proton_event.hpp deleted file mode 100644 index be324e7..0000000 --- a/proton-c/bindings/cpp/src/include/proton_event.hpp +++ /dev/null @@ -1,286 +0,0 @@ -#ifndef PROTON_CPP_PROTONEVENT_H -#define PROTON_CPP_PROTONEVENT_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "proton/fwd.hpp" -#include "proton/error.hpp" - -#include <proton/event.h> - -namespace proton { - -class proton_handler; - -/** Event information for a proton::proton_handler */ -class proton_event -{ - public: - /// The type of an event - enum event_type { - ///@name Event types - ///@{ - - /** - * Defined as a programming convenience. 
No event of this type will - * ever be generated. - */ - EVENT_NONE=PN_EVENT_NONE, - - /** - * A reactor has been started. Events of this type point to the reactor. - */ - REACTOR_INIT=PN_REACTOR_INIT, - - /** - * A reactor has no more events to process. Events of this type - * point to the reactor. - */ - REACTOR_QUIESCED=PN_REACTOR_QUIESCED, - - /** - * A reactor has been stopped. Events of this type point to the reactor. - */ - REACTOR_FINAL=PN_REACTOR_FINAL, - - /** - * A timer event has occurred. - */ - TIMER_TASK=PN_TIMER_TASK, - - /** - * The connection has been created. This is the first event that - * will ever be issued for a connection. Events of this type point - * to the relevant connection. - */ - CONNECTION_INIT=PN_CONNECTION_INIT, - - /** - * The connection has been bound to a transport. This event is - * issued when the transport::bind() is called. - */ - CONNECTION_BOUND=PN_CONNECTION_BOUND, - - /** - * The connection has been unbound from its transport. This event is - * issued when transport::unbind() is called. - */ - CONNECTION_UNBOUND=PN_CONNECTION_UNBOUND, - - /** - * The local connection endpoint has been closed. Events of this - * type point to the relevant connection. - */ - CONNECTION_LOCAL_OPEN=PN_CONNECTION_LOCAL_OPEN, - - /** - * The remote endpoint has opened the connection. Events of this - * type point to the relevant connection. - */ - CONNECTION_REMOTE_OPEN=PN_CONNECTION_REMOTE_OPEN, - - /** - * The local connection endpoint has been closed. Events of this - * type point to the relevant connection. - */ - CONNECTION_LOCAL_CLOSE=PN_CONNECTION_LOCAL_CLOSE, - - /** - * The remote endpoint has closed the connection. Events of this - * type point to the relevant connection. - */ - CONNECTION_REMOTE_CLOSE=PN_CONNECTION_REMOTE_CLOSE, - - /** - * The connection has been freed and any outstanding processing has - * been completed. This is the final event that will ever be issued - * for a connection. 
- */ - CONNECTION_FINAL=PN_CONNECTION_FINAL, - - /** - * The session has been created. This is the first event that will - * ever be issued for a session. - */ - SESSION_INIT=PN_SESSION_INIT, - - /** - * The local session endpoint has been opened. Events of this type - * point to the relevant session. - */ - SESSION_LOCAL_OPEN=PN_SESSION_LOCAL_OPEN, - - /** - * The remote endpoint has opened the session. Events of this type - * point to the relevant session. - */ - SESSION_REMOTE_OPEN=PN_SESSION_REMOTE_OPEN, - - /** - * The local session endpoint has been closed. Events of this type - * point ot the relevant session. - */ - SESSION_LOCAL_CLOSE=PN_SESSION_LOCAL_CLOSE, - - /** - * The remote endpoint has closed the session. Events of this type - * point to the relevant session. - */ - SESSION_REMOTE_CLOSE=PN_SESSION_REMOTE_CLOSE, - - /** - * The session has been freed and any outstanding processing has - * been completed. This is the final event that will ever be issued - * for a session. - */ - SESSION_FINAL=PN_SESSION_FINAL, - - /** - * The link has been created. This is the first event that will ever - * be issued for a link. - */ - LINK_INIT=PN_LINK_INIT, - - /** - * The local link endpoint has been opened. Events of this type - * point ot the relevant link. - */ - LINK_LOCAL_OPEN=PN_LINK_LOCAL_OPEN, - - /** - * The remote endpoint has opened the link. Events of this type - * point to the relevant link. - */ - LINK_REMOTE_OPEN=PN_LINK_REMOTE_OPEN, - - /** - * The local link endpoint has been closed. Events of this type - * point ot the relevant link. - */ - LINK_LOCAL_CLOSE=PN_LINK_LOCAL_CLOSE, - - /** - * The remote endpoint has closed the link. Events of this type - * point to the relevant link. - */ - LINK_REMOTE_CLOSE=PN_LINK_REMOTE_CLOSE, - - /** - * The local link endpoint has been detached. Events of this type - * point to the relevant link. - */ - LINK_LOCAL_DETACH=PN_LINK_LOCAL_DETACH, - - /** - * The remote endpoint has detached the link. 
Events of this type - * point to the relevant link. - */ - LINK_REMOTE_DETACH=PN_LINK_REMOTE_DETACH, - - /** - * The flow control state for a link has changed. Events of this - * type point to the relevant link. - */ - LINK_FLOW=PN_LINK_FLOW, - - /** - * The link has been freed and any outstanding processing has been - * completed. This is the final event that will ever be issued for a - * link. Events of this type point to the relevant link. - */ - LINK_FINAL=PN_LINK_FINAL, - - /** - * A delivery has been created or updated. Events of this type point - * to the relevant delivery. - */ - DELIVERY=PN_DELIVERY, - - /** - * The transport has new data to read and/or write. Events of this - * type point to the relevant transport. - */ - TRANSPORT=PN_TRANSPORT, - - /** - * The transport has authenticated, if this is received by a server - * the associated transport has authenticated an incoming connection - * and transport::user() can be used to obtain the authenticated - * user. - */ - TRANSPORT_AUTHENTICATED=PN_TRANSPORT_AUTHENTICATED, - - /** - * Indicates that a transport error has occurred. Use - * transport::condition() to access the details of the error - * from the associated transport. - */ - TRANSPORT_ERROR=PN_TRANSPORT_ERROR, - - /** - * Indicates that the head of the transport has been closed. This - * means the transport will never produce more bytes for output to - * the network. Events of this type point to the relevant transport. - */ - TRANSPORT_HEAD_CLOSED=PN_TRANSPORT_HEAD_CLOSED, - - /** - * Indicates that the tail of the transport has been closed. This - * means the transport will never be able to process more bytes from - * the network. Events of this type point to the relevant transport. - */ - TRANSPORT_TAIL_CLOSED=PN_TRANSPORT_TAIL_CLOSED, - - /** - * Indicates that the both the head and tail of the transport are - * closed. Events of this type point to the relevant transport. 
- */ - TRANSPORT_CLOSED=PN_TRANSPORT_CLOSED, - - SELECTABLE_INIT=PN_SELECTABLE_INIT, - SELECTABLE_UPDATED=PN_SELECTABLE_UPDATED, - SELECTABLE_READABLE=PN_SELECTABLE_READABLE, - SELECTABLE_WRITABLE=PN_SELECTABLE_WRITABLE, - SELECTABLE_ERROR=PN_SELECTABLE_ERROR, - SELECTABLE_EXPIRED=PN_SELECTABLE_EXPIRED, - SELECTABLE_FINAL=PN_SELECTABLE_FINAL - }; - ///@} - - proton_event(pn_event_t *ce) : - pn_event_(ce) - {} - - pn_event_t* pn_event() const { return pn_event_; } - - /// Get type of event - event_type type() const { return event_type(pn_event_type(pn_event_)); } - - void dispatch(proton_handler& h); - - private: - pn_event_t *pn_event_; -}; - -} - -#endif /*!PROTON_CPP_PROTONEVENT_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1a513d64/proton-c/bindings/cpp/src/include/proton_handler.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/proton_handler.hpp b/proton-c/bindings/cpp/src/include/proton_handler.hpp deleted file mode 100644 index 9941396..0000000 --- a/proton-c/bindings/cpp/src/include/proton_handler.hpp +++ /dev/null @@ -1,92 +0,0 @@ -#ifndef PROTON_CPP_PROTONHANDLER_H -#define PROTON_CPP_PROTONHANDLER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "proton/internal/object.hpp" - -#include <vector> - -struct pn_handler_t; - -namespace proton { - -class event; -class proton_event; - -/// Handler base class, subclass and over-ride event handling member functions. -/// @see proton::proton_event for meaning of events. -class proton_handler -{ - public: - proton_handler(); - virtual ~proton_handler(); - - ///@name Over-ride these member functions to handle events - ///@{ - virtual void on_reactor_init(proton_event &e); - virtual void on_reactor_quiesced(proton_event &e); - virtual void on_reactor_final(proton_event &e); - virtual void on_timer_task(proton_event &e); - virtual void on_connection_init(proton_event &e); - virtual void on_connection_bound(proton_event &e); - virtual void on_connection_unbound(proton_event &e); - virtual void on_connection_local_open(proton_event &e); - virtual void on_connection_local_close(proton_event &e); - virtual void on_connection_remote_open(proton_event &e); - virtual void on_connection_remote_close(proton_event &e); - virtual void on_connection_final(proton_event &e); - virtual void on_session_init(proton_event &e); - virtual void on_session_local_open(proton_event &e); - virtual void on_session_local_close(proton_event &e); - virtual void on_session_remote_open(proton_event &e); - virtual void on_session_remote_close(proton_event &e); - virtual void on_session_final(proton_event &e); - virtual void on_link_init(proton_event &e); - virtual void on_link_local_open(proton_event &e); - virtual void on_link_local_close(proton_event &e); - virtual void on_link_local_detach(proton_event &e); - virtual void on_link_remote_open(proton_event &e); - virtual void on_link_remote_close(proton_event &e); - virtual void on_link_remote_detach(proton_event &e); - virtual void on_link_flow(proton_event &e); - virtual void on_link_final(proton_event &e); - virtual void 
on_delivery(proton_event &e); - virtual void on_transport(proton_event &e); - virtual void on_transport_error(proton_event &e); - virtual void on_transport_head_closed(proton_event &e); - virtual void on_transport_tail_closed(proton_event &e); - virtual void on_transport_closed(proton_event &e); - virtual void on_selectable_init(proton_event &e); - virtual void on_selectable_updated(proton_event &e); - virtual void on_selectable_readable(proton_event &e); - virtual void on_selectable_writable(proton_event &e); - virtual void on_selectable_expired(proton_event &e); - virtual void on_selectable_error(proton_event &e); - virtual void on_selectable_final(proton_event &e); - virtual void on_unhandled(proton_event &e); - ///@} -}; - -} - -#endif /*!PROTON_CPP_PROTONHANDLER_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1a513d64/proton-c/bindings/cpp/src/io/connection_driver.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/io/connection_driver.cpp b/proton-c/bindings/cpp/src/io/connection_driver.cpp index d7c5e5c..0f5bc33 100644 --- a/proton-c/bindings/cpp/src/io/connection_driver.cpp +++ b/proton-c/bindings/cpp/src/io/connection_driver.cpp @@ -28,7 +28,6 @@ #include "messaging_adapter.hpp" #include "msg.hpp" #include "proton_bits.hpp" -#include "proton_event.hpp" #include <proton/connection.h> #include <proton/transport.h> @@ -88,11 +87,9 @@ bool connection_driver::has_events() const { bool connection_driver::dispatch() { pn_event_t* c_event; while ((c_event = pn_connection_driver_next_event(&driver_)) != NULL) { - proton_event cpp_event(c_event); try { if (handler_ != 0) { - messaging_adapter adapter(*handler_); - cpp_event.dispatch(adapter); + messaging_adapter::dispatch(*handler_, c_event); } } catch (const std::exception& e) { pn_condition_t *cond = pn_transport_condition(driver_.transport); 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1a513d64/proton-c/bindings/cpp/src/message.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/message.cpp b/proton-c/bindings/cpp/src/message.cpp index d121cc8..1fbdf70 100644 --- a/proton-c/bindings/cpp/src/message.cpp +++ b/proton-c/bindings/cpp/src/message.cpp @@ -300,20 +300,6 @@ void message::decode(const std::vector<char> &s) { check(pn_message_decode(pn_msg(), &s[0], s.size())); } -void message::decode(proton::delivery delivery) { - std::vector<char> buf; - buf.resize(pn_delivery_pending(unwrap(delivery))); - if (buf.empty()) - throw error("message decode: no delivery pending on link"); - proton::receiver link = delivery.receiver(); - assert(!buf.empty()); - ssize_t n = pn_link_recv(unwrap(link), const_cast<char *>(&buf[0]), buf.size()); - if (n != ssize_t(buf.size())) throw error(MSG("receiver read failure")); - clear(); - decode(buf); - pn_link_advance(unwrap(link)); -} - bool message::durable() const { return pn_message_is_durable(pn_msg()); } void message::durable(bool b) { pn_message_set_durable(pn_msg(), b); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1a513d64/proton-c/bindings/cpp/src/messaging_adapter.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/messaging_adapter.cpp b/proton-c/bindings/cpp/src/messaging_adapter.cpp index 613808b..3a16930 100644 --- a/proton-c/bindings/cpp/src/messaging_adapter.cpp +++ b/proton-c/bindings/cpp/src/messaging_adapter.cpp @@ -23,6 +23,7 @@ #include "proton/delivery.hpp" #include "proton/error.hpp" +#include "proton/messaging_handler.hpp" #include "proton/receiver_options.hpp" #include "proton/sender.hpp" #include "proton/sender_options.hpp" @@ -32,7 +33,6 @@ #include "contexts.hpp" #include "msg.hpp" #include "proton_bits.hpp" -#include "proton_event.hpp" #include <proton/connection.h> #include <proton/delivery.h> @@ 
-42,62 +42,71 @@ #include <proton/session.h> #include <proton/transport.h> +#include <assert.h> #include <string.h> namespace proton { namespace { +// This must only be called for receiver links void credit_topup(pn_link_t *link) { - if (link && pn_link_is_receiver(link)) { - int window = link_context::get(link).credit_window; - if (window) { - int delta = window - pn_link_credit(link); - pn_link_flow(link, delta); - } + assert(pn_link_is_receiver(link)); + int window = link_context::get(link).credit_window; + if (window) { + int delta = window - pn_link_credit(link); + pn_link_flow(link, delta); } } -} -void messaging_adapter::on_link_flow(proton_event &pe) { - pn_event_t *pne = pe.pn_event(); - pn_link_t *lnk = pn_event_link(pne); + +void on_link_flow(messaging_handler& handler, pn_event_t* event) { + pn_link_t *lnk = pn_event_link(event); // TODO: process session flow data, if no link-specific data, just return. if (!lnk) return; - link_context& lctx = link_context::get(lnk); int state = pn_link_state(lnk); if ((state&PN_LOCAL_ACTIVE) && (state&PN_REMOTE_ACTIVE)) { + link_context& lctx = link_context::get(lnk); if (pn_link_is_sender(lnk)) { if (pn_link_credit(lnk) > 0) { sender s(make_wrapper<sender>(lnk)); - if (pn_link_get_drain(lnk)) { - if (!lctx.draining) { - lctx.draining = true; - delegate_.on_sender_drain_start(s); - } - } - else { - lctx.draining = false; + bool draining = pn_link_get_drain(lnk); + if ( draining && !lctx.draining) { + handler.on_sender_drain_start(s); } + lctx.draining = draining; // create on_message extended event - delegate_.on_sendable(s); + handler.on_sendable(s); } - } - else { + } else { // receiver if (!pn_link_credit(lnk) && lctx.draining) { lctx.draining = false; receiver r(make_wrapper<receiver>(lnk)); - delegate_.on_receiver_drain_finish(r); + handler.on_receiver_drain_finish(r); } + credit_topup(lnk); } } - credit_topup(lnk); } -void messaging_adapter::on_delivery(proton_event &pe) { - pn_event_t *cevent = pe.pn_event(); - 
pn_link_t *lnk = pn_event_link(cevent); - pn_delivery_t *dlv = pn_event_delivery(cevent); +// Decode the message corresponding to a delivery from a link. +void message_decode(message& msg, proton::delivery delivery) { + std::vector<char> buf; + buf.resize(pn_delivery_pending(unwrap(delivery))); + if (buf.empty()) + throw error("message decode: no delivery pending on link"); + proton::receiver link = delivery.receiver(); + assert(!buf.empty()); + ssize_t n = pn_link_recv(unwrap(link), const_cast<char *>(&buf[0]), buf.size()); + if (n != ssize_t(buf.size())) throw error(MSG("receiver read failure")); + msg.clear(); + msg.decode(buf); + pn_link_advance(unwrap(link)); +} + +void on_delivery(messaging_handler& handler, pn_event_t* event) { + pn_link_t *lnk = pn_event_link(event); + pn_delivery_t *dlv = pn_event_delivery(event); link_context& lctx = link_context::get(lnk); if (pn_link_is_receiver(lnk)) { @@ -110,29 +119,29 @@ void messaging_adapter::on_delivery(proton_event &pe) { // Avoid expensive heap malloc/free overhead. 
// See PROTON-998 class message &msg(ctx.event_message); - msg.decode(d); + message_decode(msg, d); if (pn_link_state(lnk) & PN_LOCAL_CLOSED) { if (lctx.auto_accept) d.release(); } else { - delegate_.on_message(d, msg); + handler.on_message(d, msg); if (lctx.auto_accept && !d.settled()) d.accept(); if (lctx.draining && !pn_link_credit(lnk)) { lctx.draining = false; receiver r(make_wrapper<receiver>(lnk)); - delegate_.on_receiver_drain_finish(r); + handler.on_receiver_drain_finish(r); } } } else if (pn_delivery_updated(dlv) && d.settled()) { - delegate_.on_delivery_settle(d); + handler.on_delivery_settle(d); } if (lctx.draining && pn_link_credit(lnk) == 0) { lctx.draining = false; pn_link_set_drain(lnk, false); receiver r(make_wrapper<receiver>(lnk)); - delegate_.on_receiver_drain_finish(r); + handler.on_receiver_drain_finish(r); if (lctx.pending_credit) { pn_link_flow(lnk, lctx.pending_credit); lctx.pending_credit = 0; @@ -145,17 +154,17 @@ void messaging_adapter::on_delivery(proton_event &pe) { if (pn_delivery_updated(dlv)) { uint64_t rstate = pn_delivery_remote_state(dlv); if (rstate == PN_ACCEPTED) { - delegate_.on_tracker_accept(t); + handler.on_tracker_accept(t); } else if (rstate == PN_REJECTED) { - delegate_.on_tracker_reject(t); + handler.on_tracker_reject(t); } else if (rstate == PN_RELEASED || rstate == PN_MODIFIED) { - delegate_.on_tracker_release(t); + handler.on_tracker_release(t); } if (t.settled()) { - delegate_.on_tracker_settle(t); + handler.on_tracker_settle(t); } if (lctx.auto_settle) t.settle(); @@ -163,8 +172,6 @@ void messaging_adapter::on_delivery(proton_event &pe) { } } -namespace { - bool is_local_open(pn_state_t state) { return state & PN_LOCAL_ACTIVE; } @@ -177,54 +184,48 @@ bool is_remote_unititialised(pn_state_t state) { return state & PN_REMOTE_UNINIT; } -} // namespace - -void messaging_adapter::on_link_remote_detach(proton_event & pe) { - pn_event_t *cevent = pe.pn_event(); - pn_link_t *lnk = pn_event_link(cevent); +void 
on_link_remote_detach(messaging_handler& handler, pn_event_t* event) { + pn_link_t *lnk = pn_event_link(event); if (pn_link_is_receiver(lnk)) { receiver r(make_wrapper<receiver>(lnk)); - delegate_.on_receiver_detach(r); + handler.on_receiver_detach(r); } else { sender s(make_wrapper<sender>(lnk)); - delegate_.on_sender_detach(s); + handler.on_sender_detach(s); } pn_link_detach(lnk); } -void messaging_adapter::on_link_remote_close(proton_event &pe) { - pn_event_t *cevent = pe.pn_event(); - pn_link_t *lnk = pn_event_link(cevent); +void on_link_remote_close(messaging_handler& handler, pn_event_t* event) { + pn_link_t *lnk = pn_event_link(event); if (pn_link_is_receiver(lnk)) { receiver r(make_wrapper<receiver>(lnk)); if (pn_condition_is_set(pn_link_remote_condition(lnk))) { - delegate_.on_receiver_error(r); + handler.on_receiver_error(r); } - delegate_.on_receiver_close(r); + handler.on_receiver_close(r); } else { sender s(make_wrapper<sender>(lnk)); if (pn_condition_is_set(pn_link_remote_condition(lnk))) { - delegate_.on_sender_error(s); + handler.on_sender_error(s); } - delegate_.on_sender_close(s); + handler.on_sender_close(s); } pn_link_close(lnk); } -void messaging_adapter::on_session_remote_close(proton_event &pe) { - pn_event_t *cevent = pe.pn_event(); - pn_session_t *session = pn_event_session(cevent); +void on_session_remote_close(messaging_handler& handler, pn_event_t* event) { + pn_session_t *session = pn_event_session(event); class session s(make_wrapper(session)); if (pn_condition_is_set(pn_session_remote_condition(session))) { - delegate_.on_session_error(s); + handler.on_session_error(s); } - delegate_.on_session_close(s); + handler.on_session_close(s); pn_session_close(session); } -void messaging_adapter::on_connection_remote_close(proton_event &pe) { - pn_event_t *cevent = pe.pn_event(); - pn_connection_t *conn = pn_event_connection(cevent); +void on_connection_remote_close(messaging_handler& handler, pn_event_t* event) { + pn_connection_t *conn = 
pn_event_connection(event); pn_condition_t *cond = pn_connection_remote_condition(conn); // If we got a close with a condition of amqp:connection:forced then treat this @@ -237,71 +238,108 @@ void messaging_adapter::on_connection_remote_close(proton_event &pe) { connection c(make_wrapper(conn)); if (pn_condition_is_set(cond)) { - delegate_.on_connection_error(c); + handler.on_connection_error(c); } - delegate_.on_connection_close(c); + handler.on_connection_close(c); pn_connection_close(conn); } -void messaging_adapter::on_connection_remote_open(proton_event &pe) { +void on_connection_remote_open(messaging_handler& handler, pn_event_t* event) { // Generate on_transport_open event here until we find a better place - transport t(make_wrapper(pn_event_transport(pe.pn_event()))); - delegate_.on_transport_open(t); + transport t(make_wrapper(pn_event_transport(event))); + handler.on_transport_open(t); - pn_connection_t *conn = pn_event_connection(pe.pn_event()); + pn_connection_t *conn = pn_event_connection(event); connection c(make_wrapper(conn)); - delegate_.on_connection_open(c); + handler.on_connection_open(c); if (!is_local_open(pn_connection_state(conn)) && is_local_unititialised(pn_connection_state(conn))) { pn_connection_open(conn); } } -void messaging_adapter::on_session_remote_open(proton_event &pe) { - pn_session_t *session = pn_event_session(pe.pn_event()); +void on_session_remote_open(messaging_handler& handler, pn_event_t* event) { + pn_session_t *session = pn_event_session(event); class session s(make_wrapper(session)); - delegate_.on_session_open(s); + handler.on_session_open(s); if (!is_local_open(pn_session_state(session)) && is_local_unititialised(pn_session_state(session))) { pn_session_open(session); } } -void messaging_adapter::on_link_local_open(proton_event &pe) { - credit_topup(pn_event_link(pe.pn_event())); +void on_link_local_open(messaging_handler& handler, pn_event_t* event) { + pn_link_t* lnk = pn_event_link(event); + if ( 
pn_link_is_receiver(lnk) ) { + credit_topup(lnk); + // We know local is active so don't check for it + } else if ( pn_link_state(lnk)&PN_REMOTE_ACTIVE && pn_link_credit(lnk) > 0) { + sender s(make_wrapper<sender>(lnk)); + handler.on_sendable(s); + } } -void messaging_adapter::on_link_remote_open(proton_event &pe) { - pn_link_t *lnk = pn_event_link(pe.pn_event()); +void on_link_remote_open(messaging_handler& handler, pn_event_t* event) { + pn_link_t *lnk = pn_event_link(event); if (pn_link_is_receiver(lnk)) { receiver r(make_wrapper<receiver>(lnk)); - delegate_.on_receiver_open(r); + handler.on_receiver_open(r); if (is_local_unititialised(pn_link_state(lnk))) { r.open(r.connection().receiver_options()); } + credit_topup(lnk); } else { sender s(make_wrapper<sender>(lnk)); - delegate_.on_sender_open(s); + handler.on_sender_open(s); if (is_local_unititialised(pn_link_state(lnk))) { s.open(s.connection().sender_options()); } } - credit_topup(lnk); } -void messaging_adapter::on_transport_closed(proton_event &pe) { - pn_transport_t *tspt = pn_event_transport(pe.pn_event()); +void on_transport_closed(messaging_handler& handler, pn_event_t* event) { + pn_transport_t *tspt = pn_event_transport(event); transport t(make_wrapper(tspt)); // If the connection isn't open generate on_transport_open event // because we didn't generate it yet and the events won't match. 
- pn_connection_t *conn = pn_event_connection(pe.pn_event()); + pn_connection_t *conn = pn_event_connection(event); if (!conn || is_remote_unititialised(pn_connection_state(conn))) { - delegate_.on_transport_open(t); + handler.on_transport_open(t); } if (pn_condition_is_set(pn_transport_condition(tspt))) { - delegate_.on_transport_error(t); + handler.on_transport_error(t); + } + handler.on_transport_close(t); +} + +} + +void messaging_adapter::dispatch(messaging_handler& handler, pn_event_t* event) +{ + pn_event_type_t type = pn_event_type(event); + + // Only handle events we are interested in + switch(type) { + + case PN_CONNECTION_REMOTE_OPEN: on_connection_remote_open(handler, event); break; + case PN_CONNECTION_REMOTE_CLOSE: on_connection_remote_close(handler, event); break; + + case PN_SESSION_REMOTE_OPEN: on_session_remote_open(handler, event); break; + case PN_SESSION_REMOTE_CLOSE: on_session_remote_close(handler, event); break; + + case PN_LINK_LOCAL_OPEN: on_link_local_open(handler, event); break; + case PN_LINK_REMOTE_OPEN: on_link_remote_open(handler, event); break; + case PN_LINK_REMOTE_CLOSE: on_link_remote_close(handler, event); break; + case PN_LINK_REMOTE_DETACH: on_link_remote_detach(handler, event); break; + case PN_LINK_FLOW: on_link_flow(handler, event); break; + + case PN_DELIVERY: on_delivery(handler, event); break; + + case PN_TRANSPORT_CLOSED: on_transport_closed(handler, event); break; + + // Ignore everything else + default: break; } - delegate_.on_transport_close(t); } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1a513d64/proton-c/bindings/cpp/src/proactor_container_impl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp index 2b6b1de..2486e2b 100644 --- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp +++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp @@ -35,7 
+35,6 @@ #include "contexts.hpp" #include "messaging_adapter.hpp" #include "proton_bits.hpp" -#include "proton_event.hpp" #include <assert.h> @@ -377,10 +376,7 @@ bool container::impl::handle(pn_event_t* event) { // This is pretty unusual, but possible if we use the default constructor for container if (!mh) return false; - // TODO: Currently create a throwaway messaging_adapter and proton_event so we can call dispatch, a bit inefficient - messaging_adapter ma(*mh); - proton_event pe(event); - pe.dispatch(ma); + messaging_adapter::dispatch(*mh, event); return false; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1a513d64/proton-c/bindings/cpp/src/proton_event.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/proton_event.cpp b/proton-c/bindings/cpp/src/proton_event.cpp deleted file mode 100644 index 9a1ffea..0000000 --- a/proton-c/bindings/cpp/src/proton_event.cpp +++ /dev/null @@ -1,88 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -#include "proton_event.hpp" - -#include "proton/error.hpp" - -#include "msg.hpp" -#include "proton_handler.hpp" -#include "types_internal.hpp" - -namespace proton { - -void proton_event::dispatch(proton_handler &handler) { - pn_event_type_t type = pn_event_type(pn_event_); - switch(type) { - - case PN_REACTOR_INIT: handler.on_reactor_init(*this); break; - case PN_REACTOR_QUIESCED: handler.on_reactor_quiesced(*this); break; - case PN_REACTOR_FINAL: handler.on_reactor_final(*this); break; - - case PN_TIMER_TASK: handler.on_timer_task(*this); break; - - case PN_CONNECTION_INIT: handler.on_connection_init(*this); break; - case PN_CONNECTION_BOUND: handler.on_connection_bound(*this); break; - case PN_CONNECTION_UNBOUND: handler.on_connection_unbound(*this); break; - case PN_CONNECTION_LOCAL_OPEN: handler.on_connection_local_open(*this); break; - case PN_CONNECTION_LOCAL_CLOSE: handler.on_connection_local_close(*this); break; - case PN_CONNECTION_REMOTE_OPEN: handler.on_connection_remote_open(*this); break; - case PN_CONNECTION_REMOTE_CLOSE: handler.on_connection_remote_close(*this); break; - case PN_CONNECTION_FINAL: handler.on_connection_final(*this); break; - - case PN_SESSION_INIT: handler.on_session_init(*this); break; - case PN_SESSION_LOCAL_OPEN: handler.on_session_local_open(*this); break; - case PN_SESSION_LOCAL_CLOSE: handler.on_session_local_close(*this); break; - case PN_SESSION_REMOTE_OPEN: handler.on_session_remote_open(*this); break; - case PN_SESSION_REMOTE_CLOSE: handler.on_session_remote_close(*this); break; - case PN_SESSION_FINAL: handler.on_session_final(*this); break; - - case PN_LINK_INIT: handler.on_link_init(*this); break; - case PN_LINK_LOCAL_OPEN: handler.on_link_local_open(*this); break; - case PN_LINK_LOCAL_CLOSE: handler.on_link_local_close(*this); break; - case PN_LINK_LOCAL_DETACH: handler.on_link_local_detach(*this); break; - case PN_LINK_REMOTE_OPEN: handler.on_link_remote_open(*this); break; - case PN_LINK_REMOTE_CLOSE: 
handler.on_link_remote_close(*this); break; - case PN_LINK_REMOTE_DETACH: handler.on_link_remote_detach(*this); break; - case PN_LINK_FLOW: handler.on_link_flow(*this); break; - case PN_LINK_FINAL: handler.on_link_final(*this); break; - - case PN_DELIVERY: handler.on_delivery(*this); break; - - case PN_TRANSPORT: handler.on_transport(*this); break; - case PN_TRANSPORT_ERROR: handler.on_transport_error(*this); break; - case PN_TRANSPORT_HEAD_CLOSED: handler.on_transport_head_closed(*this); break; - case PN_TRANSPORT_TAIL_CLOSED: handler.on_transport_tail_closed(*this); break; - case PN_TRANSPORT_CLOSED: handler.on_transport_closed(*this); break; - - case PN_SELECTABLE_INIT: handler.on_selectable_init(*this); break; - case PN_SELECTABLE_UPDATED: handler.on_selectable_updated(*this); break; - case PN_SELECTABLE_READABLE: handler.on_selectable_readable(*this); break; - case PN_SELECTABLE_WRITABLE: handler.on_selectable_writable(*this); break; - case PN_SELECTABLE_EXPIRED: handler.on_selectable_expired(*this); break; - case PN_SELECTABLE_ERROR: handler.on_selectable_error(*this); break; - case PN_SELECTABLE_FINAL: handler.on_selectable_final(*this); break; - default: - throw error(MSG("Invalid Proton event type " << type)); - } -} - -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1a513d64/proton-c/bindings/cpp/src/proton_handler.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/proton_handler.cpp b/proton-c/bindings/cpp/src/proton_handler.cpp deleted file mode 100644 index 87d00a3..0000000 --- a/proton-c/bindings/cpp/src/proton_handler.cpp +++ /dev/null @@ -1,74 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "proton_handler.hpp" -#include "proton_event.hpp" - -namespace proton { - -proton_handler::proton_handler() {} -proton_handler::~proton_handler() {} - -// Everything goes to on_unhandled() unless overriden by subclass - -void proton_handler::on_reactor_init(proton_event &e) { on_unhandled(e); } -void proton_handler::on_reactor_quiesced(proton_event &e) { on_unhandled(e); } -void proton_handler::on_reactor_final(proton_event &e) { on_unhandled(e); } -void proton_handler::on_timer_task(proton_event &e) { on_unhandled(e); } -void proton_handler::on_connection_init(proton_event &e) { on_unhandled(e); } -void proton_handler::on_connection_bound(proton_event &e) { on_unhandled(e); } -void proton_handler::on_connection_unbound(proton_event &e) { on_unhandled(e); } -void proton_handler::on_connection_local_open(proton_event &e) { on_unhandled(e); } -void proton_handler::on_connection_local_close(proton_event &e) { on_unhandled(e); } -void proton_handler::on_connection_remote_open(proton_event &e) { on_unhandled(e); } -void proton_handler::on_connection_remote_close(proton_event &e) { on_unhandled(e); } -void proton_handler::on_connection_final(proton_event &e) { on_unhandled(e); } -void proton_handler::on_session_init(proton_event &e) { on_unhandled(e); } -void proton_handler::on_session_local_open(proton_event &e) { on_unhandled(e); } -void 
proton_handler::on_session_local_close(proton_event &e) { on_unhandled(e); } -void proton_handler::on_session_remote_open(proton_event &e) { on_unhandled(e); } -void proton_handler::on_session_remote_close(proton_event &e) { on_unhandled(e); } -void proton_handler::on_session_final(proton_event &e) { on_unhandled(e); } -void proton_handler::on_link_init(proton_event &e) { on_unhandled(e); } -void proton_handler::on_link_local_open(proton_event &e) { on_unhandled(e); } -void proton_handler::on_link_local_close(proton_event &e) { on_unhandled(e); } -void proton_handler::on_link_local_detach(proton_event &e) { on_unhandled(e); } -void proton_handler::on_link_remote_open(proton_event &e) { on_unhandled(e); } -void proton_handler::on_link_remote_close(proton_event &e) { on_unhandled(e); } -void proton_handler::on_link_remote_detach(proton_event &e) { on_unhandled(e); } -void proton_handler::on_link_flow(proton_event &e) { on_unhandled(e); } -void proton_handler::on_link_final(proton_event &e) { on_unhandled(e); } -void proton_handler::on_delivery(proton_event &e) { on_unhandled(e); } -void proton_handler::on_transport(proton_event &e) { on_unhandled(e); } -void proton_handler::on_transport_error(proton_event &e) { on_unhandled(e); } -void proton_handler::on_transport_head_closed(proton_event &e) { on_unhandled(e); } -void proton_handler::on_transport_tail_closed(proton_event &e) { on_unhandled(e); } -void proton_handler::on_transport_closed(proton_event &e) { on_unhandled(e); } -void proton_handler::on_selectable_init(proton_event &e) { on_unhandled(e); } -void proton_handler::on_selectable_updated(proton_event &e) { on_unhandled(e); } -void proton_handler::on_selectable_readable(proton_event &e) { on_unhandled(e); } -void proton_handler::on_selectable_writable(proton_event &e) { on_unhandled(e); } -void proton_handler::on_selectable_expired(proton_event &e) { on_unhandled(e); } -void proton_handler::on_selectable_error(proton_event &e) { on_unhandled(e); } -void 
proton_handler::on_selectable_final(proton_event &e) { on_unhandled(e); } - -void proton_handler::on_unhandled(proton_event &) {} - -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
