Author: tross
Date: Tue Aug  6 16:27:50 2013
New Revision: 1511021

URL: http://svn.apache.org/r1511021
Log:
QPID-5045 - Refactored the router data structures to support message and link 
routing.
QPID-4997 - Fixed the thread safety problem.

 - Wrapped "pn_delivery" in the container to allow for explicit linkage of 
delivery pairs.
 - Removed the linkage between "message" and "delivery".  Messages can now be 
discarded as
   soon as the last copy is delivered because their status will be tracked in 
the dx_delivery
   object.
 - Removed tx_handler from the set of callbacks in the container.  There is no 
need for this
   notification (sendable delivery) because Dispatch does not create outbound 
deliveries
   until it is ready to send them (i.e. deliveries are created and advanced in 
one synchronous
   operation).
 - Replaced the out_fifo of messages per outbound-link with a pair of fifos 
(one for messages
   and one for state changes) per link.  Note that even inbound-links need to 
send state
   changes outbound.  This change addresses QPID-4997.


Modified:
    qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h
    qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/message.h
    qpid/trunk/qpid/extras/dispatch/src/container.c
    qpid/trunk/qpid/extras/dispatch/src/message.c
    qpid/trunk/qpid/extras/dispatch/src/message_private.h
    qpid/trunk/qpid/extras/dispatch/src/router_node.c

Modified: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h?rev=1511021&r1=1511020&r2=1511021&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h (original)
+++ qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h Tue Aug  
6 16:27:50 2013
@@ -51,10 +51,11 @@ typedef enum {
 } dx_direction_t;
 
 
-typedef struct dx_node_t dx_node_t;
-typedef struct dx_link_t dx_link_t;
+typedef struct dx_node_t     dx_node_t;
+typedef struct dx_link_t     dx_link_t;
+typedef struct dx_delivery_t dx_delivery_t;
 
-typedef void (*dx_container_delivery_handler_t)    (void *node_context, 
dx_link_t *link, pn_delivery_t *delivery);
+typedef void (*dx_container_delivery_handler_t)    (void *node_context, 
dx_link_t *link, dx_delivery_t *delivery);
 typedef int  (*dx_container_link_handler_t)        (void *node_context, 
dx_link_t *link);
 typedef int  (*dx_container_link_detach_handler_t) (void *node_context, 
dx_link_t *link, int closed);
 typedef void (*dx_container_node_handler_t)        (void *type_context, 
dx_node_t *node);
@@ -65,23 +66,68 @@ typedef struct {
     void *type_context;
     int   allow_dynamic_creation;
 
-    //
+    //=======================
     // Node-Instance Handlers
+    //=======================
+
+    //
+    // rx_handler - Invoked when a new received delivery is avaliable for 
processing.
+    //
+    dx_container_delivery_handler_t rx_handler;
+
+    //
+    // disp_handler - Invoked when an existing delivery changes disposition
+    //                or settlement state.
+    //
+    dx_container_delivery_handler_t disp_handler;
+
+    //
+    // incoming_handler - Invoked when an attach for a new incoming link is 
received.
+    //
+    dx_container_link_handler_t incoming_handler;
+
+    //
+    // outgoing_handler - Invoked when an attach for a new outgoing link is 
received.
+    //
+    dx_container_link_handler_t outgoing_handler;
+
+    //
+    // writable_handler - Invoked when an outgoing link is available for 
sending either
+    //                    deliveries or disposition changes.  The handler must 
check the
+    //                    link's credit to determine whether (and how many) 
message
+    //                    deliveries may be sent.
     //
-    dx_container_delivery_handler_t     rx_handler;
-    dx_container_delivery_handler_t     tx_handler;
-    dx_container_delivery_handler_t     disp_handler;
-    dx_container_link_handler_t         incoming_handler;
-    dx_container_link_handler_t         outgoing_handler;
-    dx_container_link_handler_t         writable_handler;
-    dx_container_link_detach_handler_t  link_detach_handler;
+    dx_container_link_handler_t writable_handler;
 
     //
+    // link_detach_handler - Invoked when a link is detached.
+    //
+    dx_container_link_detach_handler_t link_detach_handler;
+
+    //===================
     // Node-Type Handlers
+    //===================
+
+    //
+    // node_created_handler - Invoked when a new instance of the node-type is 
created.
     //
     dx_container_node_handler_t  node_created_handler;
+
+    //
+    // node_destroyed_handler - Invoked when an instance of the node type is 
destroyed.
+    //
     dx_container_node_handler_t  node_destroyed_handler;
+
+    //
+    // inbound_conn_open_handler - Invoked when an incoming connection (via 
listener)
+    //                             is established.
+    //
     dx_container_conn_handler_t  inbound_conn_open_handler;
+
+    //
+    // outbound_conn_open_handler - Invoked when an outgoing connection (via 
connector)
+    //                              is established.
+    //
     dx_container_conn_handler_t  outbound_conn_open_handler;
 } dx_node_type_t;
 
@@ -116,6 +162,26 @@ pn_terminus_t *dx_link_remote_target(dx_
 void dx_link_activate(dx_link_t *link);
 void dx_link_close(dx_link_t *link);
 
+/**
+ * Important: dx_delivery must never be called twice in a row without an 
intervening pn_link_advance.
+ *            The Disatch architecture provides a hook for discovering when an 
outgoing link is writable
+ *            and has credit.  When a link is writable, a delivery is 
allocated, written, and advanced
+ *            in one operation.  If a backlog of pending deliveries is 
created, an assertion will be
+ *            thrown.
+ */
+dx_delivery_t *dx_delivery(dx_link_t *link, pn_delivery_tag_t tag);
+void dx_delivery_free(dx_delivery_t *delivery, uint64_t final_disposition);
+void dx_delivery_set_peer(dx_delivery_t *delivery, dx_delivery_t *peer);
+dx_delivery_t *dx_delivery_peer(dx_delivery_t *delivery);
+void dx_delivery_set_context(dx_delivery_t *delivery, void *context);
+void *dx_delivery_context(dx_delivery_t *delivery);
+pn_delivery_t *dx_delivery_pn(dx_delivery_t *delivery);
+void dx_delivery_settle(dx_delivery_t *delivery);
+bool dx_delivery_settled(dx_delivery_t *delivery);
+bool dx_delivery_disp_changed(dx_delivery_t *delivery);
+uint64_t dx_delivery_disp(dx_delivery_t *delivery);
+dx_link_t *dx_delivery_link(dx_delivery_t *delivery);
+
 
 typedef struct dx_link_item_t dx_link_item_t;
 

Modified: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/message.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/message.h?rev=1511021&r1=1511020&r2=1511021&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/message.h (original)
+++ qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/message.h Tue Aug  6 
16:27:50 2013
@@ -19,12 +19,12 @@
  * under the License.
  */
 
-#include <proton/engine.h>
 #include <qpid/dispatch/ctools.h>
 #include <qpid/dispatch/alloc.h>
 #include <qpid/dispatch/iterator.h>
 #include <qpid/dispatch/buffer.h>
 #include <qpid/dispatch/compose.h>
+#include <qpid/dispatch/container.h>
 
 // Callback for status change (confirmed persistent, loaded-in-memory, etc.)
 
@@ -97,16 +97,11 @@ dx_message_t *dx_message_copy(dx_message
 int dx_message_persistent(dx_message_t *qm);
 int dx_message_in_memory(dx_message_t *qm);
 
-void dx_message_set_out_delivery(dx_message_t *msg, pn_delivery_t *delivery);
-pn_delivery_t *dx_message_out_delivery(dx_message_t *msg);
-void dx_message_set_in_delivery(dx_message_t *msg, pn_delivery_t *delivery);
-pn_delivery_t *dx_message_in_delivery(dx_message_t *msg);
-
 //
 // Functions for received messages
 //
-dx_message_t *dx_message_receive(pn_delivery_t *delivery);
-void dx_message_send(dx_message_t *msg, pn_link_t *link);
+dx_message_t *dx_message_receive(dx_delivery_t *delivery);
+void dx_message_send(dx_message_t *msg, dx_link_t *link);
 
 int dx_message_check(dx_message_t *msg, dx_message_depth_t depth);
 dx_field_iterator_t *dx_message_field_iterator(dx_message_t *msg, 
dx_message_field_t field);
@@ -114,8 +109,6 @@ dx_field_iterator_t *dx_message_field_it
 ssize_t dx_message_field_length(dx_message_t *msg, dx_message_field_t field);
 ssize_t dx_message_field_copy(dx_message_t *msg, dx_message_field_t field, 
void *buffer);
 
-pn_delivery_t *dx_message_inbound_delivery(dx_message_t *qm);
-
 //
 // Functions for composed messages
 //

Modified: qpid/trunk/qpid/extras/dispatch/src/container.c
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/container.c?rev=1511021&r1=1511020&r2=1511021&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/container.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/container.c Tue Aug  6 16:27:50 2013
@@ -47,6 +47,7 @@ ALLOC_DECLARE(dx_node_t);
 ALLOC_DEFINE(dx_node_t);
 ALLOC_DEFINE(dx_link_item_t);
 
+
 struct dx_link_t {
     pn_link_t *pn_link;
     void      *context;
@@ -56,6 +57,19 @@ struct dx_link_t {
 ALLOC_DECLARE(dx_link_t);
 ALLOC_DEFINE(dx_link_t);
 
+
+struct dx_delivery_t {
+    pn_delivery_t *pn_delivery;
+    dx_delivery_t *peer;
+    void          *context;
+    uint64_t       disposition;
+    dx_link_t     *link;
+};
+
+ALLOC_DECLARE(dx_delivery_t);
+ALLOC_DEFINE(dx_delivery_t);
+
+
 typedef struct dxc_node_type_t {
     DEQ_LINKS(struct dxc_node_type_t);
     const dx_node_type_t *ntype;
@@ -180,14 +194,25 @@ static int do_writable(pn_link_t *pn_lin
 }
 
 
-static void process_receive(pn_delivery_t *delivery)
+static void do_receive(pn_delivery_t *pnd)
 {
-    pn_link_t *pn_link = pn_delivery_link(delivery);
-    dx_link_t *link    = (dx_link_t*) pn_link_get_context(pn_link);
+    pn_link_t     *pn_link  = pn_delivery_link(pnd);
+    dx_link_t     *link     = (dx_link_t*) pn_link_get_context(pn_link);
+    dx_delivery_t *delivery = (dx_delivery_t*) pn_delivery_get_context(pnd);
 
     if (link) {
         dx_node_t *node = link->node;
         if (node) {
+            if (!delivery) {
+                delivery = new_dx_delivery_t();
+                delivery->pn_delivery = pnd;
+                delivery->peer        = 0;
+                delivery->context     = 0;
+                delivery->disposition = 0;
+                delivery->link        = link;
+                pn_delivery_set_context(pnd, delivery);
+            }
+
             node->ntype->rx_handler(node->context, link, delivery);
             return;
         }
@@ -198,34 +223,18 @@ static void process_receive(pn_delivery_
     //
     pn_link_advance(pn_link);
     pn_link_flow(pn_link, 1);
-    pn_delivery_update(delivery, PN_REJECTED);
-    pn_delivery_settle(delivery);
-}
-
-
-static void do_send(pn_delivery_t *delivery)
-{
-    pn_link_t *pn_link = pn_delivery_link(delivery);
-    dx_link_t *link    = (dx_link_t*) pn_link_get_context(pn_link);
-
-    if (link) {
-        dx_node_t *node = link->node;
-        if (node) {
-            node->ntype->tx_handler(node->context, link, delivery);
-            return;
-        }
-    }
-
-    // TODO - Cancel the delivery
+    pn_delivery_update(pnd, PN_REJECTED);
+    pn_delivery_settle(pnd);
 }
 
 
-static void do_updated(pn_delivery_t *delivery)
+static void do_updated(pn_delivery_t *pnd)
 {
-    pn_link_t *pn_link = pn_delivery_link(delivery);
-    dx_link_t *link    = (dx_link_t*) pn_link_get_context(pn_link);
+    pn_link_t     *pn_link  = pn_delivery_link(pnd);
+    dx_link_t     *link     = (dx_link_t*) pn_link_get_context(pn_link);
+    dx_delivery_t *delivery = (dx_delivery_t*) pn_delivery_get_context(pnd);
 
-    if (link) {
+    if (link && delivery) {
         dx_node_t *node = link->node;
         if (node)
             node->ntype->disp_handler(node->context, link, delivery);
@@ -239,15 +248,15 @@ static int close_handler(void* unused, p
     // Close all links, passing False as the 'closed' argument.  These links 
are not
     // being properly 'detached'.  They are being orphaned.
     //
-    pn_link_t *pn_link = pn_link_head(conn, 0);
+    pn_link_t *pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE);
     while (pn_link) {
         dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link);
         dx_node_t *node = link->node;
-        if (node)
+        if (node && link)
             node->ntype->link_detach_handler(node->context, link, 0);
         pn_link_close(pn_link);
         free_dx_link_t(link);
-        pn_link = pn_link_next(pn_link, 0);
+        pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE);
     }
 
     // teardown all sessions
@@ -305,9 +314,7 @@ static int process_handler(dx_container_
     delivery = pn_work_head(conn);
     while (delivery) {
         if (pn_delivery_readable(delivery))
-            process_receive(delivery);
-        else if (pn_delivery_writable(delivery))
-            do_send(delivery);
+            do_receive(delivery);
 
         if (pn_delivery_updated(delivery)) {
             do_updated(delivery);
@@ -325,7 +332,7 @@ static int process_handler(dx_container_
     pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
     while (pn_link) {
         assert(pn_session_connection(pn_link_session(pn_link)) == conn);
-        if (pn_link_is_sender(pn_link) && pn_link_credit(pn_link) > 0)
+        if (pn_link_is_sender(pn_link))
             event_count += do_writable(pn_link);
         pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
     }
@@ -701,3 +708,110 @@ void dx_link_close(dx_link_t *link)
 }
 
 
+dx_delivery_t *dx_delivery(dx_link_t *link, pn_delivery_tag_t tag)
+{
+    pn_link_t *pnl = dx_link_pn(link);
+
+    //
+    // If there is a current delivery on this outgoing link, something
+    // is wrong with the delivey algorithm.  We assume that the current
+    // delivery ('pnd' below) is the one created by pn_delivery.  If it is
+    // not, then my understanding of how proton works is incorrect.
+    //
+    assert(!pn_link_current(pnl));
+
+    pn_delivery(pnl, tag);
+    pn_delivery_t *pnd = pn_link_current(pnl);
+
+    if (!pnd)
+        return 0;
+
+    dx_delivery_t *delivery = new_dx_delivery_t();
+    delivery->pn_delivery = pnd;
+    delivery->peer        = 0;
+    delivery->context     = 0;
+    delivery->disposition = 0;
+    delivery->link        = link;
+    pn_delivery_set_context(pnd, delivery);
+
+    return delivery;
+}
+
+
+void dx_delivery_free(dx_delivery_t *delivery, uint64_t final_disposition)
+{
+    if (delivery->pn_delivery) {
+        if (final_disposition > 0)
+            pn_delivery_update(delivery->pn_delivery, final_disposition);
+        pn_delivery_set_context(delivery->pn_delivery, 0);
+        pn_delivery_settle(delivery->pn_delivery);
+    }
+    if (delivery->peer)
+        delivery->peer->peer = 0;
+    free_dx_delivery_t(delivery);
+}
+
+
+void dx_delivery_set_peer(dx_delivery_t *delivery, dx_delivery_t *peer)
+{
+    delivery->peer = peer;
+}
+
+
+void dx_delivery_set_context(dx_delivery_t *delivery, void *context)
+{
+    delivery->context = context;
+}
+
+
+void *dx_delivery_context(dx_delivery_t *delivery)
+{
+    return delivery->context;
+}
+
+
+dx_delivery_t *dx_delivery_peer(dx_delivery_t *delivery)
+{
+    return delivery->peer;
+}
+
+
+pn_delivery_t *dx_delivery_pn(dx_delivery_t *delivery)
+{
+    return delivery->pn_delivery;
+}
+
+
+void dx_delivery_settle(dx_delivery_t *delivery)
+{
+    if (delivery->pn_delivery) {
+        pn_delivery_settle(delivery->pn_delivery);
+        delivery->pn_delivery = 0;
+    }
+}
+
+
+bool dx_delivery_settled(dx_delivery_t *delivery)
+{
+    return pn_delivery_settled(delivery->pn_delivery);
+}
+
+
+bool dx_delivery_disp_changed(dx_delivery_t *delivery)
+{
+    return delivery->disposition != 
pn_delivery_remote_state(delivery->pn_delivery);
+}
+
+
+uint64_t dx_delivery_disp(dx_delivery_t *delivery)
+{
+    delivery->disposition = pn_delivery_remote_state(delivery->pn_delivery);
+    return delivery->disposition;
+}
+
+
+dx_link_t *dx_delivery_link(dx_delivery_t *delivery)
+{
+    return delivery->link;
+}
+

Modified: qpid/trunk/qpid/extras/dispatch/src/message.c
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/message.c?rev=1511021&r1=1511020&r2=1511021&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/message.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/message.c Tue Aug  6 16:27:50 2013
@@ -343,8 +343,7 @@ dx_message_t *dx_allocate_message()
         return 0;
 
     DEQ_ITEM_INIT(msg);
-    msg->content      = new_dx_message_content_t();
-    msg->out_delivery = 0;
+    msg->content = new_dx_message_content_t();
 
     if (msg->content == 0) {
         free_dx_message_t((dx_message_t*) msg);
@@ -397,8 +396,7 @@ dx_message_t *dx_message_copy(dx_message
         return 0;
 
     DEQ_ITEM_INIT(copy);
-    copy->content      = content;
-    copy->out_delivery = 0;
+    copy->content = content;
 
     sys_mutex_lock(content->lock);
     content->ref_count++;
@@ -408,36 +406,11 @@ dx_message_t *dx_message_copy(dx_message
 }
 
 
-void dx_message_set_out_delivery(dx_message_t *msg, pn_delivery_t *delivery)
+dx_message_t *dx_message_receive(dx_delivery_t *delivery)
 {
-    ((dx_message_pvt_t*) msg)->out_delivery = delivery;
-}
-
-
-pn_delivery_t *dx_message_out_delivery(dx_message_t *msg)
-{
-    return ((dx_message_pvt_t*) msg)->out_delivery;
-}
-
-
-void dx_message_set_in_delivery(dx_message_t *msg, pn_delivery_t *delivery)
-{
-    dx_message_content_t *content = MSG_CONTENT(msg);
-    content->in_delivery = delivery;
-}
-
-
-pn_delivery_t *dx_message_in_delivery(dx_message_t *msg)
-{
-    dx_message_content_t *content = MSG_CONTENT(msg);
-    return content->in_delivery;
-}
-
-
-dx_message_t *dx_message_receive(pn_delivery_t *delivery)
-{
-    pn_link_t        *link = pn_delivery_link(delivery);
-    dx_message_pvt_t *msg  = (dx_message_pvt_t*) 
pn_delivery_get_context(delivery);
+    pn_delivery_t    *pnd  = dx_delivery_pn(delivery);
+    dx_message_pvt_t *msg  = (dx_message_pvt_t*) dx_delivery_context(delivery);
+    pn_link_t        *link = pn_delivery_link(pnd);
     ssize_t           rc;
     dx_buffer_t      *buf;
 
@@ -448,15 +421,7 @@ dx_message_t *dx_message_receive(pn_deli
     //
     if (!msg) {
         msg = (dx_message_pvt_t*) dx_allocate_message();
-        pn_delivery_set_context(delivery, (void*) msg);
-
-        //
-        // Record the incoming delivery only if it is not settled.  If it is 
-        // settled, it should not be recorded as no future operations on it are
-        // permitted.
-        //
-        if (!pn_delivery_settled(delivery))
-            msg->content->in_delivery = delivery;
+        dx_delivery_set_context(delivery, (void*) msg);
     }
 
     //
@@ -489,6 +454,7 @@ dx_message_t *dx_message_receive(pn_deli
                 DEQ_REMOVE_TAIL(msg->content->buffers);
                 dx_free_buffer(buf);
             }
+            dx_delivery_set_context(delivery, 0);
             return (dx_message_t*) msg;
         }
 
@@ -520,14 +486,14 @@ dx_message_t *dx_message_receive(pn_deli
 }
 
 
-void dx_message_send(dx_message_t *in_msg, pn_link_t *link)
+void dx_message_send(dx_message_t *in_msg, dx_link_t *link)
 {
     dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg;
     dx_buffer_t      *buf = DEQ_HEAD(msg->content->buffers);
 
     // TODO - Handle cases where annotations have been added or modified
     while (buf) {
-        pn_link_send(link, (char*) dx_buffer_base(buf), dx_buffer_size(buf));
+        pn_link_send(dx_link_pn(link), (char*) dx_buffer_base(buf), 
dx_buffer_size(buf));
         buf = DEQ_NEXT(buf);
     }
 }

Modified: qpid/trunk/qpid/extras/dispatch/src/message_private.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/message_private.h?rev=1511021&r1=1511020&r2=1511021&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/message_private.h (original)
+++ qpid/trunk/qpid/extras/dispatch/src/message_private.h Tue Aug  6 16:27:50 
2013
@@ -63,7 +63,6 @@ typedef struct {
     sys_mutex_t         *lock;
     uint32_t             ref_count;                       // The number of 
qmessages referencing this
     dx_buffer_list_t     buffers;                         // The buffer chain 
containing the message
-    pn_delivery_t       *in_delivery;                     // The delivery on 
which the message arrived
     dx_field_location_t  section_message_header;          // The message 
header list
     dx_field_location_t  section_delivery_annotation;     // The delivery 
annotation map
     dx_field_location_t  section_message_annotation;      // The message 
annotation map
@@ -83,7 +82,6 @@ typedef struct {
 typedef struct {
     DEQ_LINKS(dx_message_t);                              // Deq linkage that 
overlays the dx_message_t
     dx_message_content_t *content;
-    pn_delivery_t        *out_delivery;
 } dx_message_pvt_t;
 
 ALLOC_DECLARE(dx_message_t);

Modified: qpid/trunk/qpid/extras/dispatch/src/router_node.c
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_node.c?rev=1511021&r1=1511020&r2=1511021&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_node.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_node.c Tue Aug  6 16:27:50 2013
@@ -62,21 +62,27 @@ typedef enum {
 
 typedef struct dx_routed_event_t {
     DEQ_LINKS(struct dx_routed_event_t);
-    dx_message_t *message;
-    bool          settled;
-    uint64_t      disposition;
+    dx_delivery_t *delivery;
+    dx_message_t  *message;
+    bool           settle;
+    uint64_t       disposition;
 } dx_routed_event_t;
 
+ALLOC_DECLARE(dx_routed_event_t);
+ALLOC_DEFINE(dx_routed_event_t);
+DEQ_DECLARE(dx_routed_event_t, dx_routed_event_list_t);
+
 
 struct dx_router_link_t {
     DEQ_LINKS(dx_router_link_t);
-    dx_direction_t     link_direction;
-    dx_link_type_t     link_type;
-    dx_address_t      *owning_addr;     // [ref] Address record that owns this 
link
-    dx_link_t         *link;            // [own] Link pointer
-    dx_router_link_t  *connected_link;  // [ref] If this is a link-route, 
reference the connected link
-    dx_router_link_t  *peer_link;       // [ref] If this is a bidirectional 
link-route, reference the peer link
-    dx_message_list_t  out_fifo;        // Message FIFO for outgoing messages. 
 Unused for incoming links
+    dx_direction_t          link_direction;
+    dx_link_type_t          link_type;
+    dx_address_t           *owning_addr;     // [ref] Address record that owns 
this link
+    dx_link_t              *link;            // [own] Link pointer
+    dx_router_link_t       *connected_link;  // [ref] If this is a link-route, 
reference the connected link
+    dx_router_link_t       *peer_link;       // [ref] If this is a 
bidirectional link-route, reference the peer link
+    dx_routed_event_list_t  event_fifo;      // FIFO of outgoing delivery/link 
events (no messages)
+    dx_routed_event_list_t  msg_fifo;        // FIFO of outgoing message 
deliveries
 };
 
 ALLOC_DECLARE(dx_router_link_t);
@@ -125,54 +131,128 @@ struct dx_router_t {
 
 
 /**
- * Outbound Delivery Handler
+ * Outgoing Link Writable Handler
  */
-static void router_tx_handler(void* context, dx_link_t *link, pn_delivery_t 
*delivery)
+static int router_writable_link_handler(void* context, dx_link_t *link)
 {
-    dx_router_t      *router  = (dx_router_t*) context;
-    pn_link_t        *pn_link = pn_delivery_link(delivery);
-    dx_router_link_t *rlink   = (dx_router_link_t*) dx_link_get_context(link);
-    dx_message_t     *msg;
-    size_t            size;
+    dx_router_t            *router = (dx_router_t*) context;
+    dx_delivery_t          *delivery;
+    dx_router_link_t       *rlink = (dx_router_link_t*) 
dx_link_get_context(link);
+    pn_link_t              *pn_link = dx_link_pn(link);
+    uint64_t                tag;
+    int                     link_credit = pn_link_credit(pn_link);
+    dx_routed_event_list_t  to_send;
+    dx_routed_event_list_t  events;
+    dx_routed_event_t      *re;
+    size_t                  offer;
+    int                     event_count = 0;
+
+    DEQ_INIT(to_send);
+    DEQ_INIT(events);
 
     sys_mutex_lock(router->lock);
-    msg = DEQ_HEAD(rlink->out_fifo);
-    if (!msg) {
-        // TODO - Recind the delivery
-        sys_mutex_unlock(router->lock);
-        return;
+
+    //
+    // Pull the non-delivery events into a local list so they can be processed 
without
+    // the lock being held.
+    //
+    re = DEQ_HEAD(rlink->event_fifo);
+    while (re) {
+        DEQ_REMOVE_HEAD(rlink->event_fifo);
+        DEQ_INSERT_TAIL(events, re);
+        re = DEQ_HEAD(rlink->event_fifo);
     }
 
-    DEQ_REMOVE_HEAD(rlink->out_fifo);
-    size = (DEQ_SIZE(rlink->out_fifo));
+    //
+    // Under lock, move available deliveries from the msg_fifo to the local 
to_send
+    // list.  Don't move more than we have credit to send.
+    //
+    if (link_credit > 0) {
+        tag = router->dtag;
+        re = DEQ_HEAD(rlink->msg_fifo);
+        while (re) {
+            DEQ_REMOVE_HEAD(rlink->msg_fifo);
+            DEQ_INSERT_TAIL(to_send, re);
+            if (DEQ_SIZE(to_send) == link_credit)
+                break;
+            re = DEQ_HEAD(rlink->msg_fifo);
+        }
+        router->dtag += DEQ_SIZE(to_send);
+    }
+
+    offer = DEQ_SIZE(rlink->msg_fifo);
     sys_mutex_unlock(router->lock);
 
-    dx_message_send(msg, pn_link);
+    //
+    // Deliver all the to_send messages downrange
+    //
+    re = DEQ_HEAD(to_send);
+    while (re) {
+        DEQ_REMOVE_HEAD(to_send);
+
+        //
+        // Get a delivery for the send.  This will be the current deliver on 
the link.
+        //
+        tag++;
+        delivery = dx_delivery(link, pn_dtag((char*) &tag, 8));
+
+        //
+        // Send the message
+        //
+        dx_message_send(re->message, link);
+
+        //
+        // If there is an incoming delivery associated with this message, link 
it
+        // with the outgoing delivery.  Otherwise, the message arrived 
pre-settled
+        // and should be sent presettled.
+        //
+        if (re->delivery) {
+            dx_delivery_set_peer(re->delivery, delivery);
+            dx_delivery_set_peer(delivery, re->delivery);
+        } else
+            dx_delivery_free(delivery, 0);  // settle and free
+
+        pn_link_advance(pn_link);
+        event_count++;
+
+        dx_free_message(re->message);
+        free_dx_routed_event_t(re);
+        re = DEQ_HEAD(to_send);
+    }
 
     //
-    // If there is no incoming delivery, it was pre-settled.  In this case,
-    // we must pre-settle the outgoing delivery as well.
+    // Process the non-delivery events.
     //
-    if (dx_message_in_delivery(msg)) {
-        pn_delivery_set_context(delivery, (void*) msg);
-        dx_message_set_out_delivery(msg, delivery);
-    } else {
-        pn_delivery_settle(delivery);
-        dx_free_message(msg);
+    re = DEQ_HEAD(events);
+    while (re) {
+        DEQ_REMOVE_HEAD(events);
+
+        if (re->delivery) {
+            if (re->disposition)
+                pn_delivery_update(dx_delivery_pn(re->delivery), 
re->disposition);
+            if (re->settle)
+                dx_delivery_free(re->delivery, 0);
+        }
+
+        free_dx_routed_event_t(re);
+        re = DEQ_HEAD(events);
     }
 
-    pn_link_advance(pn_link);
-    pn_link_offered(pn_link, size);
+    //
+    // Set the offer to the number of messages remaining to be sent.
+    //
+    pn_link_offered(pn_link, offer);
+    return event_count;
 }
 
 
 /**
  * Inbound Delivery Handler
  */
-static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t 
*delivery)
+static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t 
*delivery)
 {
     dx_router_t      *router  = (dx_router_t*) context;
-    pn_link_t        *pn_link = pn_delivery_link(delivery);
+    pn_link_t        *pn_link = dx_link_pn(link);
     dx_router_link_t *rlink   = (dx_router_link_t*) dx_link_get_context(link);
     dx_message_t     *msg;
     int               valid_message = 0;
@@ -202,15 +282,27 @@ static void router_rx_handler(void* cont
     // the message in this case.
     //
     if (rlink->connected_link) {
-        dx_router_link_t *clink      = rlink->connected_link;
-        pn_link_t        *pn_outlink = dx_link_pn(clink->link);
-        DEQ_INSERT_TAIL(clink->out_fifo, msg);
-        sys_mutex_unlock(router->lock);
+        dx_router_link_t  *clink = rlink->connected_link;
+        dx_routed_event_t *re    = new_dx_routed_event_t();
 
-        pn_link_offered(pn_outlink, DEQ_SIZE(clink->out_fifo));
-        dx_link_activate(clink->link);
-        sys_mutex_unlock(router->lock);
+        DEQ_ITEM_INIT(re);
+        re->delivery    = 0; 
+        re->message     = msg;
+        re->settle      = false;
+        re->disposition = 0;
+        DEQ_INSERT_TAIL(clink->msg_fifo, re);
 
+        //
+        // If the incoming delivery is settled (pre-settled), don't link it 
into the routed
+        // event.  If it's not settled, link it into the event for later 
handling.
+        //
+        if (dx_delivery_settled(delivery))
+            dx_delivery_free(delivery, 0);
+        else
+            re->delivery = delivery;
+
+        sys_mutex_unlock(router->lock);
+        dx_link_activate(clink->link);
         return;
     }
 
@@ -227,6 +319,8 @@ static void router_rx_handler(void* cont
     if (valid_message) {
         dx_field_iterator_t *iter = dx_message_field_iterator(msg, 
DX_FIELD_TO);
         dx_address_t        *addr;
+        int                  fanout = 0;
+
         if (iter) {
             dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
             hash_retrieve(router->out_hash, iter, (void*) &addr);
@@ -261,10 +355,18 @@ static void router_rx_handler(void* cont
                     //
                     dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks);
                     while (dest_link) {
-                        pn_link_t    *pn_outlink = dx_link_pn(dest_link->link);
-                        dx_message_t *copy       = dx_message_copy(msg);
-                        DEQ_INSERT_TAIL(dest_link->out_fifo, copy);
-                        pn_link_offered(pn_outlink, 
DEQ_SIZE(dest_link->out_fifo));
+                        dx_routed_event_t *re = new_dx_routed_event_t();
+                        DEQ_ITEM_INIT(re);
+                        re->delivery    = 0;
+                        re->message     = dx_message_copy(msg);
+                        re->settle      = 0;
+                        re->disposition = 0;
+                        DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+
+                        fanout++;
+                        if (fanout == 1 && !dx_delivery_settled(delivery))
+                            re->delivery = delivery;
+
                         dx_link_activate(dest_link->link);
                         dest_link = DEQ_NEXT(dest_link);
                     }
@@ -279,39 +381,41 @@ static void router_rx_handler(void* cont
                         else
                             dest_link = dest_node->peer_link;
                         if (dest_link) {
-                            pn_link_t    *pn_outlink = 
dx_link_pn(dest_link->link);
-                            dx_message_t *copy       = dx_message_copy(msg);
-                            DEQ_INSERT_TAIL(dest_link->out_fifo, copy);
-                            pn_link_offered(pn_outlink, 
DEQ_SIZE(dest_link->out_fifo));
+                            dx_routed_event_t *re = new_dx_routed_event_t();
+                            DEQ_ITEM_INIT(re);
+                            re->delivery    = 0;
+                            re->message     = dx_message_copy(msg);
+                            re->settle      = 0;
+                            re->disposition = 0;
+                            DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+
+                            fanout++;
+                            if (fanout == 1)
+                                re->delivery = delivery;
+
                             dx_link_activate(dest_link->link);
                         }
                         dest_node = DEQ_NEXT(dest_node);
                     }
                 }
-
-            } else {
-                //
-                // To field contains an unknown address.  Release the message.
-                //
-                // TODO - Undeliverable processing
-                pn_delivery_update(delivery, PN_RELEASED);
-                pn_delivery_settle(delivery);
             }
 
             //
-            // Since we are message-routing, there is no end-to-end 
disposition or
-            // settlement.  Accept and settle the delivery now.
+            // In message-routing mode, the handling of the incoming delivery 
depends on the
+            // number of copies of the received message that were forwarded.
             //
-            pn_delivery_update(delivery, PN_ACCEPTED);
-            pn_delivery_settle(delivery);
+            if (handler) {
+                dx_delivery_free(delivery, PN_ACCEPTED);
+            } else if (fanout == 0) {
+                dx_delivery_free(delivery, PN_RELEASED);
+            } else if (fanout > 1)
+                dx_delivery_free(delivery, PN_ACCEPTED);
         }
     } else {
         //
         // Message is invalid.  Reject the message.
         //
-        pn_delivery_update(delivery, PN_REJECTED);
-        pn_delivery_settle(delivery);
-        pn_delivery_set_context(delivery, 0);
+        dx_delivery_free(delivery, PN_REJECTED);
     }
 
     sys_mutex_unlock(router->lock);
@@ -328,54 +432,41 @@ static void router_rx_handler(void* cont
 /**
  * Delivery Disposition Handler
  */
-static void router_disp_handler(void* context, dx_link_t *link, pn_delivery_t 
*delivery)
+static void router_disp_handler(void* context, dx_link_t *link, dx_delivery_t 
*delivery)
 {
-    pn_link_t        *pn_link = pn_delivery_link(delivery);
-    //dx_router_link_t *rlink   = (dx_router_link_t*) 
dx_link_get_context(link);
-
-    //
-    // TODO - Propagate disposition and settlement between deliveries on a 
link-routed
-    //        link pair.
-    //
-    return;
-
-    if (pn_link_is_sender(pn_link)) {
-        uint64_t       disp     = pn_delivery_remote_state(delivery);
-        dx_message_t  *msg      = pn_delivery_get_context(delivery);
-        pn_delivery_t *activate = 0;
-
-        if (msg) {
-            assert(delivery == dx_message_out_delivery(msg));
-            if (disp != 0) {
-                activate = dx_message_in_delivery(msg);
-                pn_delivery_update(activate, disp);
-                // TODO - handling of the data accompanying RECEIVED/MODIFIED
-            }
+    dx_router_t   *router  = (dx_router_t*) context;
+    bool           changed = dx_delivery_disp_changed(delivery);
+    uint64_t       disp    = dx_delivery_disp(delivery);
+    bool           settled = dx_delivery_settled(delivery);
+    dx_delivery_t *peer    = dx_delivery_peer(delivery);
 
-            if (pn_delivery_settled(delivery)) {
-                //
-                // Downstream delivery has been settled.  Propagate the 
settlement
-                // upstream.
-                //
-                activate = dx_message_in_delivery(msg);
-                pn_delivery_settle(activate);
-                pn_delivery_settle(delivery);
-                dx_free_message(msg);
-            }
-
-            if (activate) {
-                //
-                // Activate the upstream/incoming link so that the settlement 
will
-                // get pushed out.
-                //
-                dx_link_t *act_link = (dx_link_t*) 
pn_link_get_context(pn_delivery_link(activate));
-                dx_link_activate(act_link);
-            }
+    if (peer) {
+        //
+        // The case where this delivery has a peer.
+        //
+        if (changed || settled) {
+            dx_link_t         *peer_link = dx_delivery_link(peer);
+            dx_router_link_t  *prl       = (dx_router_link_t*) 
dx_link_get_context(peer_link);
+            dx_routed_event_t *re        = new_dx_routed_event_t();
+            DEQ_ITEM_INIT(re);
+            re->delivery    = peer;
+            re->message     = 0;
+            re->settle      = settled;
+            re->disposition = changed ? disp : 0;
+
+            sys_mutex_lock(router->lock);
+            DEQ_INSERT_TAIL(prl->event_fifo, re);
+            sys_mutex_unlock(router->lock);
 
-            return;
+            dx_link_activate(peer_link);
         }
+
     } else {
-        // TODO - Handle disposition updates from upstream
+        //
+        // The no-peer case.  Ignore status changes and echo settlement.
+        //
+        if (settled)
+            dx_delivery_free(delivery, 0);
     }
 }
 
@@ -396,7 +487,8 @@ static int router_incoming_link_handler(
     rlink->link           = link;
     rlink->connected_link = 0;
     rlink->peer_link      = 0;
-    DEQ_INIT(rlink->out_fifo);  // Won't be used
+    DEQ_INIT(rlink->event_fifo);
+    DEQ_INIT(rlink->msg_fifo);
 
     dx_link_set_context(link, rlink);
 
@@ -443,7 +535,8 @@ static int router_outgoing_link_handler(
     rlink->link           = link;
     rlink->connected_link = 0;
     rlink->peer_link      = 0;
-    DEQ_INIT(rlink->out_fifo);
+    DEQ_INIT(rlink->event_fifo);
+    DEQ_INIT(rlink->msg_fifo);
 
     dx_link_set_context(link, rlink);
 
@@ -475,38 +568,6 @@ static int router_outgoing_link_handler(
 
 
 /**
- * Outgoing Link Writable Handler
- */
-static int router_writable_link_handler(void* context, dx_link_t *link)
-{
-    dx_router_t      *router = (dx_router_t*) context;
-    int               grant_delivery = 0;
-    pn_delivery_t    *delivery;
-    dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
-    pn_link_t        *pn_link = dx_link_pn(link);
-    uint64_t          tag;
-
-    sys_mutex_lock(router->lock);
-    if (DEQ_SIZE(rlink->out_fifo) > 0) {
-        grant_delivery = 1;
-        tag = router->dtag++;
-    }
-    sys_mutex_unlock(router->lock);
-
-    if (grant_delivery) {
-        pn_delivery(pn_link, pn_dtag((char*) &tag, 8));
-        delivery = pn_link_current(pn_link);
-        if (delivery) {
-            router_tx_handler(context, link, delivery);
-            return 1;
-        }
-    }
-
-    return 0;
-}
-
-
-/**
  * Link Detached Handler
  */
 static int router_link_detach_handler(void* context, dx_link_t *link, int 
closed)
@@ -539,8 +600,10 @@ static int router_link_detach_handler(vo
                 dx_field_iterator_free(iter);
             }
         }
-    } else
+    } else {
         DEQ_REMOVE(router->in_links, rlink);
+        free_dx_router_link_t(rlink);
+    }
 
     sys_mutex_unlock(router->lock);
     return 0;
@@ -580,7 +643,8 @@ static void router_outbound_open_handler
     rlink->link           = receiver;
     rlink->connected_link = 0;
     rlink->peer_link      = 0;
-    DEQ_INIT(rlink->out_fifo);  // Won't be used
+    DEQ_INIT(rlink->event_fifo);
+    DEQ_INIT(rlink->msg_fifo);
 
     dx_link_set_context(receiver, rlink);
 
@@ -604,7 +668,8 @@ static void router_outbound_open_handler
     rlink->link           = sender;
     rlink->connected_link = 0;
     rlink->peer_link      = 0;
-    DEQ_INIT(rlink->out_fifo);
+    DEQ_INIT(rlink->event_fifo);
+    DEQ_INIT(rlink->msg_fifo);
 
     dx_link_set_context(sender, rlink);
 
@@ -647,7 +712,6 @@ static void dx_router_timer_handler(void
 
 static dx_node_type_t router_node = {"router", 0, 0,
                                      router_rx_handler,
-                                     router_tx_handler,
                                      router_disp_handler,
                                      router_incoming_link_handler,
                                      router_outgoing_link_handler,
@@ -779,10 +843,14 @@ void dx_router_send(dx_dispatch_t       
         //
         dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks);
         while (dest_link) {
-            pn_link_t    *pn_outlink = dx_link_pn(dest_link->link);
-            dx_message_t *copy       = dx_message_copy(msg);
-            DEQ_INSERT_TAIL(dest_link->out_fifo, copy);
-            pn_link_offered(pn_outlink, DEQ_SIZE(dest_link->out_fifo));
+            dx_routed_event_t *re = new_dx_routed_event_t();
+            DEQ_ITEM_INIT(re);
+            re->delivery    = 0;
+            re->message     = dx_message_copy(msg);
+            re->settle      = 0;
+            re->disposition = 0;
+            DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+
             dx_link_activate(dest_link->link);
             dest_link = DEQ_NEXT(dest_link);
         }
@@ -797,10 +865,13 @@ void dx_router_send(dx_dispatch_t       
             else
                 dest_link = dest_node->peer_link;
             if (dest_link) {
-                pn_link_t    *pn_outlink = dx_link_pn(dest_link->link);
-                dx_message_t *copy       = dx_message_copy(msg);
-                DEQ_INSERT_TAIL(dest_link->out_fifo, copy);
-                pn_link_offered(pn_outlink, DEQ_SIZE(dest_link->out_fifo));
+                dx_routed_event_t *re = new_dx_routed_event_t();
+                DEQ_ITEM_INIT(re);
+                re->delivery    = 0;
+                re->message     = dx_message_copy(msg);
+                re->settle      = 0;
+                re->disposition = 0;
+                DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
                 dx_link_activate(dest_link->link);
             }
             dest_node = DEQ_NEXT(dest_node);



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to