Author: kgiusti
Date: Wed Dec 17 22:37:37 2014
New Revision: 1646354

URL: http://svn.apache.org/r1646354
Log:
QPID-6255: Use Proton event model in qpidd when available.

Modified:
    qpid/trunk/qpid/cpp/src/amqp.cmake
    qpid/trunk/qpid/cpp/src/config.h.cmake
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.cpp

Modified: qpid/trunk/qpid/cpp/src/amqp.cmake
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/amqp.cmake?rev=1646354&r1=1646353&r2=1646354&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/amqp.cmake (original)
+++ qpid/trunk/qpid/cpp/src/amqp.cmake Wed Dec 17 22:37:37 2014
@@ -22,7 +22,7 @@
 find_package(Proton 0.5)
 
 set (amqp_default ${amqp_force})
-set (maximum_version 0.7)
+set (maximum_version 0.8)
 if (Proton_FOUND)
     if (Proton_VERSION GREATER ${maximum_version})
         message(WARNING "Qpid proton ${Proton_VERSION} is not a tested version 
and might not be compatible, ${maximum_version} is highest tested; build may 
not work")
@@ -35,6 +35,7 @@ if (Proton_FOUND)
     endif (NOT Proton_VERSION EQUAL 0.5)
     if (Proton_VERSION GREATER 0.7)
         set (USE_PROTON_TRANSPORT_CONDITION 1)
+        set (HAVE_PROTON_EVENTS 1)
     endif (Proton_VERSION GREATER 0.7)
 else ()
     message(STATUS "Qpid proton not found, amqp 1.0 support not enabled")

Modified: qpid/trunk/qpid/cpp/src/config.h.cmake
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/config.h.cmake?rev=1646354&r1=1646353&r2=1646354&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/config.h.cmake (original)
+++ qpid/trunk/qpid/cpp/src/config.h.cmake Wed Dec 17 22:37:37 2014
@@ -58,5 +58,6 @@
 #cmakedefine HAVE_LOG_FTP
 #cmakedefine HAVE_PROTON_TRACER
 #cmakedefine USE_PROTON_TRANSPORT_CONDITION
+#cmakedefine HAVE_PROTON_EVENTS
 
 #endif /* QPID_CONFIG_H */

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp?rev=1646354&r1=1646353&r2=1646354&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp Wed Dec 17 22:37:37 2014
@@ -37,6 +37,9 @@
 extern "C" {
 #include <proton/engine.h>
 #include <proton/error.h>
+#ifdef HAVE_PROTON_EVENTS
+#include <proton/event.h>
+#endif
 }
 
 namespace qpid {
@@ -117,8 +120,14 @@ Connection::Connection(qpid::sys::Output
     : BrokerContext(b), ManagedConnection(getBroker(), i, brokerInitiated),
       connection(pn_connection()),
       transport(pn_transport()),
+      collector(0),
       out(o), id(i), haveOutput(true), closeInitiated(false), 
closeRequested(false)
 {
+#ifdef HAVE_PROTON_EVENTS
+    collector = pn_collector();
+    pn_connection_collect(connection, collector);
+#endif
+
     if (pn_transport_bind(transport, connection)) {
         //error
         QPID_LOG(error, "Failed to bind transport to connection: " << 
getError());
@@ -157,6 +166,9 @@ Connection::~Connection()
     getBroker().getConnectionObservers().closed(*this);
     pn_transport_free(transport);
     pn_connection_free(connection);
+#ifdef HAVE_PROTON_EVENTS
+    pn_collector_free(collector);
+#endif
 }
 
 pn_transport_t* Connection::getTransport()
@@ -222,10 +234,15 @@ size_t Connection::encode(char* buffer,
 
 void Connection::doOutput(size_t capacity)
 {
-    for (ssize_t n = pn_transport_pending(transport); n > 0 && n < (ssize_t) 
capacity; n = pn_transport_pending(transport)) {
-        if (dispatch()) processDeliveries();
-        else break;
-    }
+    ssize_t n = 0;
+    do {
+        if (dispatch()) {
+            processDeliveries();
+            ssize_t next = pn_transport_pending(transport);
+            if (n == next) break;
+            n = next;
+        } else break;
+    } while (n > 0 && n < (ssize_t) capacity);
 }
 
 bool Connection::dispatch()
@@ -327,85 +344,70 @@ framing::ProtocolVersion Connection::get
 {
     return qpid::framing::ProtocolVersion(1,0);
 }
-namespace {
-pn_state_t REQUIRES_OPEN = PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE;
-pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED;
-}
 
 void Connection::process()
 {
     QPID_LOG(trace, id << " process()");
+#ifdef HAVE_PROTON_EVENTS
+    pn_event_t *event = pn_collector_peek(collector);
+    while (event) {
+        switch (pn_event_type(event)) {
+        case PN_CONNECTION_REMOTE_OPEN:
+            doConnectionRemoteOpen();
+            break;
+        case PN_CONNECTION_REMOTE_CLOSE:
+            doConnectionRemoteClose();
+            break;
+        case PN_SESSION_REMOTE_OPEN:
+            doSessionRemoteOpen(pn_event_session(event));
+            break;
+        case PN_SESSION_REMOTE_CLOSE:
+            doSessionRemoteClose(pn_event_session(event));
+            break;
+        case PN_LINK_REMOTE_OPEN:
+            doLinkRemoteOpen(pn_event_link(event));
+            break;
+        case PN_LINK_REMOTE_CLOSE:
+            doLinkRemoteClose(pn_event_link(event));
+            break;
+        case PN_DELIVERY:
+            doDeliveryUpdated(pn_event_delivery(event));
+            break;
+        default:
+            break;
+        }
+        pn_collector_pop(collector);
+        event = pn_collector_peek(collector);
+    }
+
+#else   // !HAVE_PROTON_EVENTS
+
+    const pn_state_t REQUIRES_OPEN = PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE;
+    const pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED;
+
     if ((pn_connection_state(connection) & REQUIRES_OPEN) == REQUIRES_OPEN) {
-        QPID_LOG_CAT(debug, model, id << " connection opened");
-        open();
-        setContainerId(pn_connection_remote_container(connection));
+        doConnectionRemoteOpen();
     }
 
     for (pn_session_t* s = pn_session_head(connection, REQUIRES_OPEN); s; s = 
pn_session_next(s, REQUIRES_OPEN)) {
-        QPID_LOG_CAT(debug, model, id << " session begun");
-        pn_session_open(s);
-        boost::shared_ptr<Session> ssn(new Session(s, *this, out));
-        sessions[s] = ssn;
+        doSessionRemoteOpen(s);
     }
     for (pn_link_t* l = pn_link_head(connection, REQUIRES_OPEN); l; l = 
pn_link_next(l, REQUIRES_OPEN)) {
-        pn_link_open(l);
-
-        Sessions::iterator session = sessions.find(pn_link_session(l));
-        if (session == sessions.end()) {
-            QPID_LOG(error, id << " Link attached on unknown session!");
-        } else {
-            try {
-                session->second->attach(l);
-                QPID_LOG_CAT(debug, protocol, id << " link " << l << " 
attached on " << pn_link_session(l));
-            } catch (const Exception& e) {
-                QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
-                pn_condition_t* error = pn_link_condition(l);
-                pn_condition_set_name(error, e.symbol());
-                pn_condition_set_description(error, e.what());
-                pn_link_close(l);
-            } catch (const qpid::framing::UnauthorizedAccessException& e) {
-                QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
-                pn_condition_t* error = pn_link_condition(l);
-                pn_condition_set_name(error, 
qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS.c_str());
-                pn_condition_set_description(error, e.what());
-                pn_link_close(l);
-            } catch (const std::exception& e) {
-                QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
-                pn_condition_t* error = pn_link_condition(l);
-                pn_condition_set_name(error, 
qpid::amqp::error_conditions::INTERNAL_ERROR.c_str());
-                pn_condition_set_description(error, e.what());
-                pn_link_close(l);
-            }
-        }
+        doLinkRemoteOpen(l);
     }
 
     processDeliveries();
 
     for (pn_link_t* l = pn_link_head(connection, REQUIRES_CLOSE); l; l = 
pn_link_next(l, REQUIRES_CLOSE)) {
-        pn_link_close(l);
-        Sessions::iterator session = sessions.find(pn_link_session(l));
-        if (session == sessions.end()) {
-            QPID_LOG(error, id << " peer attempted to detach link on unknown 
session!");
-        } else {
-            session->second->detach(l);
-            QPID_LOG_CAT(debug, model, id << " link detached");
-        }
+        doLinkRemoteClose(l);
     }
     for (pn_session_t* s = pn_session_head(connection, REQUIRES_CLOSE); s; s = 
pn_session_next(s, REQUIRES_CLOSE)) {
-        pn_session_close(s);
-        Sessions::iterator i = sessions.find(s);
-        if (i != sessions.end()) {
-            i->second->close();
-            sessions.erase(i);
-            QPID_LOG_CAT(debug, model, id << " session ended");
-        } else {
-            QPID_LOG(error, id << " peer attempted to close unrecognised 
session");
-        }
+        doSessionRemoteClose(s);
     }
     if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
-        QPID_LOG_CAT(debug, model, id << " connection closed");
-        pn_connection_close(connection);
+        doConnectionRemoteClose();
     }
+#endif  // !HAVE_PROTON_EVENTS
 }
 namespace {
 std::string convert(pn_delivery_tag_t in)
@@ -415,34 +417,15 @@ std::string convert(pn_delivery_tag_t in
 }
 void Connection::processDeliveries()
 {
-    //handle deliveries
+#ifdef HAVE_PROTON_EVENTS
+    // with the event API, there's no way to selectively process only
+    // the delivery-related events.  We have to process all events:
+    process();
+#else
     for (pn_delivery_t* delivery = pn_work_head(connection); delivery; 
delivery = pn_work_next(delivery)) {
-        pn_link_t* link = pn_delivery_link(delivery);
-        try {
-            if (pn_link_is_receiver(link)) {
-                Sessions::iterator i = sessions.find(pn_link_session(link));
-                if (i != sessions.end()) {
-                    i->second->readable(link, delivery);
-                } else {
-                    pn_delivery_update(delivery, PN_REJECTED);
-                }
-            } else { //i.e. SENDER
-                Sessions::iterator i = sessions.find(pn_link_session(link));
-                if (i != sessions.end()) {
-                    QPID_LOG(trace, id << " handling outgoing delivery for " 
<< link << " on session " << pn_link_session(link));
-                    i->second->writable(link, delivery);
-                } else {
-                    QPID_LOG(error, id << " Got delivery for non-existent 
session: " << pn_link_session(link) << ", link: " << link);
-                }
-            }
-        } catch (const Exception& e) {
-            QPID_LOG_CAT(error, protocol, "Error processing deliveries: " << 
e.what());
-            pn_condition_t* error = pn_link_condition(link);
-            pn_condition_set_name(error, e.symbol());
-            pn_condition_set_description(error, e.what());
-            pn_link_close(link);
-        }
+        doDeliveryUpdated(delivery);
     }
+#endif
 }
 
 std::string Connection::getError()
@@ -470,4 +453,132 @@ void Connection::closedByManagement()
     closeRequested = true;
     out.activateOutput();
 }
+
+// the peer has issued an Open performative
+void Connection::doConnectionRemoteOpen()
+{
+    // respond in kind if we haven't yet
+    if ((pn_connection_state(connection) & PN_LOCAL_UNINIT) == 
PN_LOCAL_UNINIT) {
+        QPID_LOG_CAT(debug, model, id << " connection opened");
+        open();
+        setContainerId(pn_connection_remote_container(connection));
+    }
+}
+
+// the peer has issued a Close performative
+void Connection::doConnectionRemoteClose()
+{
+    if ((pn_connection_state(connection) & PN_LOCAL_CLOSED) == 0) {
+        QPID_LOG_CAT(debug, model, id << " connection closed");
+        pn_connection_close(connection);
+    }
+}
+
+// the peer has issued a Begin performative
+void Connection::doSessionRemoteOpen(pn_session_t *session)
+{
+    if ((pn_session_state(session) & PN_LOCAL_UNINIT) == PN_LOCAL_UNINIT) {
+        QPID_LOG_CAT(debug, model, id << " session begun");
+        pn_session_open(session);
+        boost::shared_ptr<Session> ssn(new Session(session, *this, out));
+        sessions[session] = ssn;
+    }
+}
+
+// the peer has issued an End performative
+void Connection::doSessionRemoteClose(pn_session_t *session)
+{
+    if ((pn_session_state(session) & PN_LOCAL_CLOSED) == 0) {
+        pn_session_close(session);
+        Sessions::iterator i = sessions.find(session);
+        if (i != sessions.end()) {
+            i->second->close();
+            sessions.erase(i);
+            QPID_LOG_CAT(debug, model, id << " session ended");
+        } else {
+            QPID_LOG(error, id << " peer attempted to close unrecognised 
session");
+        }
+    }
+}
+
+// the peer has issued an Attach performative
+void Connection::doLinkRemoteOpen(pn_link_t *link)
+{
+    if ((pn_link_state(link) & PN_LOCAL_UNINIT) == PN_LOCAL_UNINIT) {
+        pn_link_open(link);
+        Sessions::iterator session = sessions.find(pn_link_session(link));
+        if (session == sessions.end()) {
+            QPID_LOG(error, id << " Link attached on unknown session!");
+        } else {
+            try {
+                session->second->attach(link);
+                QPID_LOG_CAT(debug, protocol, id << " link " << link << " 
attached on " << pn_link_session(link));
+            } catch (const Exception& e) {
+                QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
+                pn_condition_t* error = pn_link_condition(link);
+                pn_condition_set_name(error, e.symbol());
+                pn_condition_set_description(error, e.what());
+                pn_link_close(link);
+            } catch (const qpid::framing::UnauthorizedAccessException& e) {
+                QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
+                pn_condition_t* error = pn_link_condition(link);
+                pn_condition_set_name(error, 
qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS.c_str());
+                pn_condition_set_description(error, e.what());
+                pn_link_close(link);
+            } catch (const std::exception& e) {
+                QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
+                pn_condition_t* error = pn_link_condition(link);
+                pn_condition_set_name(error, 
qpid::amqp::error_conditions::INTERNAL_ERROR.c_str());
+                pn_condition_set_description(error, e.what());
+                pn_link_close(link);
+            }
+        }
+    }
+}
+
+// the peer has issued a Detach performative
+void Connection::doLinkRemoteClose(pn_link_t *link)
+{
+    if ((pn_link_state(link) & PN_LOCAL_CLOSED) == 0) {
+        pn_link_close(link);
+        Sessions::iterator session = sessions.find(pn_link_session(link));
+        if (session == sessions.end()) {
+            QPID_LOG(error, id << " peer attempted to detach link on unknown 
session!");
+        } else {
+            session->second->detach(link);
+            QPID_LOG_CAT(debug, model, id << " link detached");
+        }
+    }
+}
+
+// the status of the delivery has changed
+void Connection::doDeliveryUpdated(pn_delivery_t *delivery)
+{
+    pn_link_t* link = pn_delivery_link(delivery);
+    try {
+        if (pn_link_is_receiver(link)) {
+            Sessions::iterator i = sessions.find(pn_link_session(link));
+            if (i != sessions.end()) {
+                i->second->readable(link, delivery);
+            } else {
+                pn_delivery_update(delivery, PN_REJECTED);
+            }
+        } else { //i.e. SENDER
+            Sessions::iterator i = sessions.find(pn_link_session(link));
+            if (i != sessions.end()) {
+                QPID_LOG(trace, id << " handling outgoing delivery for " << 
link << " on session " << pn_link_session(link));
+                i->second->writable(link, delivery);
+            } else {
+                QPID_LOG(error, id << " Got delivery for non-existent session: 
" << pn_link_session(link) << ", link: " << link);
+            }
+        }
+    } catch (const Exception& e) {
+        QPID_LOG_CAT(error, protocol, "Error processing deliveries: " << 
e.what());
+        pn_condition_t* error = pn_link_condition(link);
+        pn_condition_set_name(error, e.symbol());
+        pn_condition_set_description(error, e.what());
+        pn_link_close(link);
+    }
+}
+
 }}} // namespace qpid::broker::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h?rev=1646354&r1=1646353&r2=1646354&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h Wed Dec 17 22:37:37 2014
@@ -31,6 +31,9 @@
 struct pn_connection_t;
 struct pn_session_t;
 struct pn_transport_t;
+struct pn_collector_t;
+struct pn_link_t;
+struct pn_delivery_t;
 
 namespace qpid {
 namespace sys {
@@ -69,6 +72,7 @@ class Connection : public BrokerContext,
     typedef std::map<pn_session_t*, boost::shared_ptr<Session> > Sessions;
     pn_connection_t* connection;
     pn_transport_t* transport;
+    pn_collector_t* collector;
     qpid::sys::OutputControl& out;
     const std::string id;
     bool haveOutput;
@@ -86,6 +90,17 @@ class Connection : public BrokerContext,
     void open();
     void readPeerProperties();
     void closedByManagement();
+
+ private:
+    // handle Proton engine events
+    void doConnectionRemoteOpen();
+    void doConnectionRemoteClose();
+    void doSessionRemoteOpen(pn_session_t *session);
+    void doSessionRemoteClose(pn_session_t *session);
+    void doLinkRemoteOpen(pn_link_t *link);
+    void doLinkRemoteClose(pn_link_t *link);
+    void doDeliveryUpdated(pn_delivery_t *delivery);
+
 };
 }}} // namespace qpid::broker::amqp
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.cpp?rev=1646354&r1=1646353&r2=1646354&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Relay.cpp Wed Dec 17 22:37:37 2014
@@ -126,7 +126,13 @@ bool OutgoingFromRelay::doWork()
 {
     relay->check();
     relay->setCredit(pn_link_credit(link));
-    return relay->send(link);
+    bool worked = relay->send(link);
+    pn_delivery_t *d = pn_link_current(link);
+    if (d && pn_delivery_writable(d)) {
+        handle(d);
+        return true;
+    }
+    return worked;
 }
 /**
  * Called when a delivery is writable



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to