Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 0d978ef4b -> b765c1b84


DISPATCH-343 - Updated lifecycle management for deliveries


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/a19ec1eb
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/a19ec1eb
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/a19ec1eb

Branch: refs/heads/master
Commit: a19ec1ebb29a7aa7108ba2a02b5d5b5afa4bcd07
Parents: 0d978ef
Author: Ted Ross <[email protected]>
Authored: Wed Jun 1 18:04:18 2016 -0400
Committer: Ted Ross <[email protected]>
Committed: Thu Jun 2 07:50:16 2016 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h   |   6 +-
 src/router_core/connections.c         |   7 +-
 src/router_core/forwarder.c           |   6 +-
 src/router_core/router_core_private.h |   2 +-
 src/router_core/transfer.c            | 152 ++++++++++++++++++++++-------
 src/router_node.c                     |  31 +++---
 6 files changed, 152 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a19ec1eb/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 4d9b7c6..459e1a3 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -549,11 +549,13 @@ void qdr_connection_handlers(qdr_core_t                
*core,
  * Delivery functions
  ******************************************************************************
  */
-void qdr_delivery_free(qdr_delivery_t *delivery);
-void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t 
*delivery, uint64_t disp, bool settled);
+void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t 
*delivery, uint64_t disp,
+                                     bool settled, bool ref_given);
 
 void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context);
 void *qdr_delivery_get_context(qdr_delivery_t *delivery);
+void qdr_delivery_incref(qdr_delivery_t *delivery);
+void qdr_delivery_decref(qdr_delivery_t *delivery);
 void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int 
*length);
 qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a19ec1eb/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 90dd778..8b2b114 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -462,6 +462,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, 
qdr_connection_t *conn, qdr_li
     qdr_delivery_ref_t *ref = DEQ_HEAD(updated_deliveries);
     while (ref) {
         qdr_del_delivery_ref(&updated_deliveries, ref);
+        qdr_delivery_decref(ref->dlv);
         ref = DEQ_HEAD(updated_deliveries);
     }
 
@@ -475,11 +476,12 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, 
qdr_connection_t *conn, qdr_li
     while (dlv) {
         DEQ_REMOVE_HEAD(undelivered);
         peer = dlv->peer;
-        qdr_delivery_free(dlv);
         if (peer) {
             peer->peer = 0;
             qdr_delivery_release_CT(core, peer);
+            qdr_delivery_decref(peer);
         }
+        qdr_delivery_decref(dlv);
         dlv = DEQ_HEAD(undelivered);
     }
 
@@ -498,12 +500,13 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, 
qdr_connection_t *conn, qdr_li
         }
 
         peer = dlv->peer;
-        qdr_delivery_free(dlv);
         if (peer) {
             peer->peer = 0;
             if (link->link_direction == QD_OUTGOING)
                 qdr_delivery_release_CT(core, peer);
+            qdr_delivery_decref(peer);
         }
+        qdr_delivery_decref(dlv);
         dlv = DEQ_HEAD(unsettled);
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a19ec1eb/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 4890e78..996788b 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -116,7 +116,10 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t 
*core, qdr_delivery_t *in
     if (!dlv->settled) {
         if (in_dlv && in_dlv->peer == 0) {
             dlv->peer = in_dlv;
-            in_dlv->peer = dlv;  // TODO - make this a back-list for multicast tracking
+            in_dlv->peer = dlv;
+
+            dlv->ref_count = 1;
+            qdr_delivery_incref(in_dlv);
         }
     }
 
@@ -129,6 +132,7 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t 
*link, qdr_delivery_t *
     sys_mutex_lock(link->conn->work_lock);
     DEQ_INSERT_TAIL(link->undelivered, dlv);
     dlv->where = QDR_DELIVERY_IN_UNDELIVERED;
+    dlv->ref_count++; // We have the lock, don't use the incref function
 
     //
     // If the link isn't already on the links_with_deliveries list, put it there.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a19ec1eb/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 36e3ec0..58d73a2 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -199,6 +199,7 @@ typedef enum {
 struct qdr_delivery_t {
     DEQ_LINKS(qdr_delivery_t);
     void                *context;
+    int                  ref_count;
     qdr_link_t          *link;
     qdr_delivery_t      *peer;
     qd_message_t        *msg;
@@ -591,7 +592,6 @@ void qdr_action_enqueue(qdr_core_t *core, qdr_action_t 
*action);
 void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit);
 void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr);
 void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv);
-void qdr_delivery_free(qdr_delivery_t *delivery);
 void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery);
 bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *delivery);
 void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a19ec1eb/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 8a03875..4ac5e1a 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -43,6 +43,7 @@ qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, 
qd_message_t *msg, qd_field_i
     qdr_delivery_t *dlv    = new_qdr_delivery_t();
 
     ZERO(dlv);
+    dlv->ref_count      = 1;    // referenced by the action
     dlv->link           = link;
     dlv->msg            = msg;
     dlv->to_addr        = 0;
@@ -64,6 +65,7 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, 
qd_message_t *msg,
     qdr_delivery_t *dlv    = new_qdr_delivery_t();
 
     ZERO(dlv);
+    dlv->ref_count      = 1;    // referenced by the action
     dlv->link           = link;
     dlv->msg            = msg;
     dlv->to_addr        = addr;
@@ -87,9 +89,10 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t 
*link, qd_message_t *
     qdr_delivery_t *dlv    = new_qdr_delivery_t();
 
     ZERO(dlv);
-    dlv->link    = link;
-    dlv->msg     = msg;
-    dlv->settled = settled;
+    dlv->ref_count = 1;    // referenced by the action
+    dlv->link      = link;
+    dlv->msg       = msg;
+    dlv->settled   = settled;
 
     action->args.connection.delivery = dlv;
     action->args.connection.tag_length = tag_length;
@@ -119,6 +122,7 @@ void qdr_link_process_deliveries(qdr_core_t *core, 
qdr_link_t *link, int credit)
                     dlv->where = QDR_DELIVERY_IN_UNSETTLED;
                 } else
                     dlv->where = QDR_DELIVERY_NOWHERE;
+
                 credit--;
                 link->total_deliveries++;
                 offer = DEQ_SIZE(link->undelivered);
@@ -130,7 +134,7 @@ void qdr_link_process_deliveries(qdr_core_t *core, 
qdr_link_t *link, int credit)
                 link->credit_to_core--;
                 core->deliver_handler(core->user_context, link, dlv, settled);
                 if (settled)
-                    qdr_delivery_free(dlv);
+                    qdr_delivery_decref(dlv);
             }
         }
 
@@ -151,8 +155,7 @@ void qdr_link_process_deliveries(qdr_core_t *core, 
qdr_link_t *link, int credit)
     qdr_delivery_ref_t *ref = DEQ_HEAD(updated_deliveries);
     while (ref) {
         core->delivery_update_handler(core->user_context, ref->dlv, 
ref->dlv->disposition, ref->dlv->settled);
-        if (ref->dlv->settled)
-            qdr_delivery_free(ref->dlv);
+        qdr_delivery_decref(ref->dlv);
         qdr_del_delivery_ref(&updated_deliveries, ref);
         ref = DEQ_HEAD(updated_deliveries);
     }
@@ -205,24 +208,22 @@ void qdr_send_to2(qdr_core_t *core, qd_message_t *msg, 
const char *addr, bool ex
 }
 
 
-void qdr_delivery_free(qdr_delivery_t *delivery)
-{
-    if (delivery->msg)
-        qd_message_free(delivery->msg);
-    if (delivery->to_addr)
-        qd_field_iterator_free(delivery->to_addr);
-    qd_bitmask_free(delivery->link_exclusion);
-    free_qdr_delivery_t(delivery);
-}
-
-
-void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t 
*delivery, uint64_t disposition, bool settled)
+void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t 
*delivery, uint64_t disposition,
+                                     bool settled, bool ref_given)
 {
     qdr_action_t *action = qdr_action(qdr_update_delivery_CT, 
"update_delivery");
     action->args.delivery.delivery    = delivery;
     action->args.delivery.disposition = disposition;
     action->args.delivery.settled     = settled;
 
+    //
+    // The delivery's ref_count must be incremented to protect its travels into the
+    // core thread.  If the caller has given its reference to us, we can simply use
+    // the given ref rather than increment a new one.
+    //
+    if (!ref_given)
+        qdr_delivery_incref(delivery);
+
     qdr_action_enqueue(core, action);
 }
 
@@ -239,6 +240,41 @@ void *qdr_delivery_get_context(qdr_delivery_t *delivery)
 }
 
 
+void qdr_delivery_incref(qdr_delivery_t *delivery)
+{
+    qdr_connection_t *conn = delivery->link ? delivery->link->conn : 0;
+
+    if (!!conn) {
+        sys_mutex_lock(conn->work_lock);
+        delivery->ref_count++;
+        sys_mutex_unlock(conn->work_lock);
+    }
+}
+
+
+void qdr_delivery_decref(qdr_delivery_t *delivery)
+{
+    qdr_connection_t *conn   = delivery->link ? delivery->link->conn : 0;
+    bool              delete = false;
+    
+    if (!!conn) {
+        sys_mutex_lock(conn->work_lock);
+        assert(delivery->ref_count > 0);
+        delete = --delivery->ref_count == 0;
+        sys_mutex_unlock(conn->work_lock);
+    }
+
+    if (delete) {
+        if (delivery->msg)
+            qd_message_free(delivery->msg);
+        if (delivery->to_addr)
+            qd_field_iterator_free(delivery->to_addr);
+        qd_bitmask_free(delivery->link_exclusion);
+        free_qdr_delivery_t(delivery);
+    }
+}
+
+
 void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int 
*length)
 {
     *tag    = (const char*) delivery->tag;
@@ -266,6 +302,12 @@ void qdr_delivery_release_CT(qdr_core_t *core, 
qdr_delivery_t *dlv)
 
     if (push || moved)
         qdr_delivery_push_CT(core, dlv);
+
+    //
+    // Remove the unsettled reference
+    //
+    if (moved)
+        qdr_delivery_decref(dlv);
 }
 
 
@@ -358,8 +400,7 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t 
*action, bool discar
 
 static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, 
qdr_delivery_t *dlv, qdr_address_t *addr)
 {
-    int  fanout     = 0;
-    bool presettled = dlv->settled;
+    int fanout = 0;
 
     if (addr) {
         fanout = qdr_forward_message_CT(core, addr, dlv->msg, dlv, false, 
link->link_type == QD_LINK_CONTROL);
@@ -375,6 +416,9 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t 
*link, qdr_delivery_
             // Queue the message for later delivery (when the address gets
             // a valid destination).
             //
+            // Use the action-reference as the reference for undelivered rather
+            // than decrementing and incrementing the delivery ref_count.
+            //
             DEQ_INSERT_TAIL(link->undelivered, dlv);
             dlv->where = QDR_DELIVERY_IN_UNDELIVERED;
         } else {
@@ -382,6 +426,7 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t 
*link, qdr_delivery_
             // Release the delivery
             //
             qdr_delivery_release_CT(core, dlv);
+            qdr_delivery_decref(dlv);
             if (link->link_type == QD_LINK_ROUTER)
                 qdr_link_issue_credit_CT(core, link, 1);
         }
@@ -394,13 +439,14 @@ static int qdr_link_forward_CT(qdr_core_t *core, 
qdr_link_t *link, qdr_delivery_
             qdr_link_issue_credit_CT(core, link, 1);
 
             //
-            // If the delivery was pre-settled, free it now.
+            // If the delivery has no more references, free it now.
             //
-            if (presettled) {
-                assert(!dlv->peer);
-                qdr_delivery_free(dlv);
-            }
+            assert(!dlv->peer);
+            qdr_delivery_decref(dlv);
         } else {
+            //
+            // Again, don't bother decrementing then incrementing the ref_count
+            //
             DEQ_INSERT_TAIL(link->unsettled, dlv);
             dlv->where = QDR_DELIVERY_IN_UNSETTLED;
 
@@ -446,6 +492,17 @@ static void qdr_link_deliver_CT(qdr_core_t *core, 
qdr_action_t *action, bool dis
         if (!dlv->settled) {
             DEQ_INSERT_TAIL(link->unsettled, dlv);
             dlv->where = QDR_DELIVERY_IN_UNSETTLED;
+
+            //
+            // Note, in this case the ref_count is left unchanged as we are transferring
+            // the action's reference to the unsettled list's reference.
+            //
+        } else {
+            //
+            // If the delivery is settled, decrement the ref_count on the delivery.
+            // This count was the owned-by-action count.
+            //
+            qdr_delivery_decref(dlv);
         }
         return;
     }
@@ -460,8 +517,15 @@ static void qdr_link_deliver_CT(qdr_core_t *core, 
qdr_action_t *action, bool dis
         qdr_address_t *addr = link->owning_addr;
         if (!addr && dlv->to_addr)
             qd_hash_retrieve(core->addr_hash, dlv->to_addr, (void**) &addr);
+
+        //
+        // Give the action reference to the qdr_link_forward function.
+        //
         qdr_link_forward_CT(core, link, dlv, addr);
     } else {
+        //
+        // Take the action reference and use it for undelivered.  Don't decref/incref.
+        //
         DEQ_INSERT_TAIL(link->undelivered, dlv);
         dlv->where = QDR_DELIVERY_IN_UNDELIVERED;
     }
@@ -496,11 +560,13 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t 
*action, bool discard)
 
 static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard)
 {
-    qdr_delivery_t *dlv     = action->args.delivery.delivery;
-    qdr_delivery_t *peer    = dlv->peer;
-    bool            push    = false;
-    uint64_t        disp    = action->args.delivery.disposition;
-    bool            settled = action->args.delivery.settled;
+    qdr_delivery_t *dlv        = action->args.delivery.delivery;
+    qdr_delivery_t *peer       = dlv->peer;
+    bool            push       = false;
+    bool            peer_moved = false;
+    bool            dlv_moved  = false;
+    uint64_t        disp       = action->args.delivery.disposition;
+    bool            settled    = action->args.delivery.settled;
 
     //
     // Logic:
@@ -526,21 +592,36 @@ static void qdr_update_delivery_CT(qdr_core_t *core, 
qdr_action_t *action, bool
             peer->settled = true;
             peer->peer = 0;
             dlv->peer  = 0;
+
+            qdr_delivery_decref(dlv);
+            qdr_delivery_decref(peer);
+
             if (peer->link) {
-                bool moved = qdr_delivery_settled_CT(core, peer);
-                if (moved)
+                peer_moved = qdr_delivery_settled_CT(core, peer);
+                if (peer_moved)
                     push = true;
             }
         }
 
         if (dlv->link)
-            qdr_delivery_settled_CT(core, dlv);
-
-        qdr_delivery_free(dlv);
+            dlv_moved = qdr_delivery_settled_CT(core, dlv);
     }
 
     if (push)
         qdr_delivery_push_CT(core, peer);
+
+    //
+    // Release the action reference, possibly freeing the delivery
+    //
+    qdr_delivery_decref(dlv);
+
+    //
+    // Release the unsettled references if the deliveries were moved
+    //
+    if (dlv_moved)
+        qdr_delivery_decref(dlv);
+    if (peer_moved)
+        qdr_delivery_decref(peer);
 }
 
 
@@ -639,6 +720,7 @@ void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t 
*dlv)
 
     sys_mutex_lock(link->conn->work_lock);
     if (dlv->where != QDR_DELIVERY_IN_UNDELIVERED) {
+        dlv->ref_count++; // We have the lock, don't use the incref function
         qdr_add_delivery_ref(&link->updated_deliveries, dlv);
         qdr_add_link_ref(&link->conn->links_with_deliveries, link, 
QDR_LINK_LIST_CLASS_DELIVERY);
         activate = true;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a19ec1eb/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index c3ef5c2..ff5c58f 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -251,6 +251,7 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, 
pn_delivery_t *pnd)
             else {
                 pn_delivery_set_context(pnd, delivery);
                 qdr_delivery_set_context(delivery, pnd);
+                qdr_delivery_incref(delivery);
             }
         }
         return;
@@ -329,6 +330,7 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, 
pn_delivery_t *pnd)
             else {
                 pn_delivery_set_context(pnd, delivery);
                 qdr_delivery_set_context(delivery, pnd);
+                qdr_delivery_incref(delivery);
             }
         } else {
             //
@@ -369,23 +371,28 @@ static void AMQP_disposition_handler(void* context, 
qd_link_t *link, pn_delivery
 {
     qd_router_t    *router   = (qd_router_t*) context;
     qdr_delivery_t *delivery = (qdr_delivery_t*) pn_delivery_get_context(pnd);
-
-    if (!delivery)
-        return;
+    bool            give_reference = false;
 
     //
     // If the delivery is settled, remove the linkage between the PN and QDR deliveries.
     //
-    if (pn_delivery_settled(pnd)) {
+    if (pn_delivery_settled(pnd) && !!delivery) {
         pn_delivery_set_context(pnd, 0);
         qdr_delivery_set_context(delivery, 0);
+
+        //
+        // Don't decref the delivery here.  Rather, we will _give_ the reference to the core.
+        //
+        give_reference = true;
     }
 
     //
     // Update the disposition of the delivery
     //
-    qdr_delivery_update_disposition(router->router_core, delivery,
-                                    pn_delivery_remote_state(pnd), 
pn_delivery_settled(pnd));
+    if (!!delivery)
+        qdr_delivery_update_disposition(router->router_core, delivery,
+                                        pn_delivery_remote_state(pnd), 
pn_delivery_settled(pnd),
+                                        give_reference);
 
     //
     // If settled, close out the delivery
@@ -789,10 +796,10 @@ static void CORE_link_push(void *context, qdr_link_t 
*link)
 static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t 
*dlv, bool settled)
 {
     qd_router_t *router = (qd_router_t*) context;
-    qd_link_t  *qlink   = (qd_link_t*) qdr_link_get_context(link);
-    pn_link_t  *plink   = qd_link_pn(qlink);
-    const char *tag;
-    int         tag_length;
+    qd_link_t   *qlink  = (qd_link_t*) qdr_link_get_context(link);
+    pn_link_t   *plink  = qd_link_pn(qlink);
+    const char  *tag;
+    int          tag_length;
 
     qdr_delivery_tag(dlv, &tag, &tag_length);
 
@@ -807,13 +814,14 @@ static void CORE_link_deliver(void *context, qdr_link_t 
*link, qdr_delivery_t *d
     if (!settled && !remote_snd_settled) {
         pn_delivery_set_context(pdlv, dlv);
         qdr_delivery_set_context(dlv, pdlv);
+        qdr_delivery_incref(dlv);
     }
 
     qd_message_send(qdr_delivery_message(dlv), qlink, 
qdr_link_strip_annotations_out(link));
 
     if (!settled && remote_snd_settled)
         // Tell the core that the delivery has been accepted and settled, since we are settling on behalf of the receiver
-        qdr_delivery_update_disposition(router->router_core, dlv, PN_ACCEPTED, 
true);
+        qdr_delivery_update_disposition(router->router_core, dlv, PN_ACCEPTED, 
true, false);
 
     if (settled || remote_snd_settled)
         pn_delivery_settle(pdlv);
@@ -842,6 +850,7 @@ static void CORE_delivery_update(void *context, 
qdr_delivery_t *dlv, uint64_t di
         qdr_delivery_set_context(dlv, 0);
         pn_delivery_set_context(pnd, 0);
         pn_delivery_settle(pnd);
+        qdr_delivery_decref(dlv);
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to