Repository: qpid-dispatch Updated Branches: refs/heads/master 0d978ef4b -> b765c1b84
DISPATCH-343 - Updated lifecycle management for deliveries Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/a19ec1eb Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/a19ec1eb Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/a19ec1eb Branch: refs/heads/master Commit: a19ec1ebb29a7aa7108ba2a02b5d5b5afa4bcd07 Parents: 0d978ef Author: Ted Ross <[email protected]> Authored: Wed Jun 1 18:04:18 2016 -0400 Committer: Ted Ross <[email protected]> Committed: Thu Jun 2 07:50:16 2016 -0400 ---------------------------------------------------------------------- include/qpid/dispatch/router_core.h | 6 +- src/router_core/connections.c | 7 +- src/router_core/forwarder.c | 6 +- src/router_core/router_core_private.h | 2 +- src/router_core/transfer.c | 152 ++++++++++++++++++++++------- src/router_node.c | 31 +++--- 6 files changed, 152 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a19ec1eb/include/qpid/dispatch/router_core.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 4d9b7c6..459e1a3 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -549,11 +549,13 @@ void qdr_connection_handlers(qdr_core_t *core, * Delivery functions ****************************************************************************** */ -void qdr_delivery_free(qdr_delivery_t *delivery); -void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disp, bool settled); +void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disp, + bool settled, bool ref_given); void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context); void *qdr_delivery_get_context(qdr_delivery_t *delivery); +void qdr_delivery_incref(qdr_delivery_t *delivery); +void qdr_delivery_decref(qdr_delivery_t *delivery); void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int *length); qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a19ec1eb/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 90dd778..8b2b114 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -462,6 +462,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li qdr_delivery_ref_t *ref = DEQ_HEAD(updated_deliveries); while (ref) { qdr_del_delivery_ref(&updated_deliveries, ref); + qdr_delivery_decref(ref->dlv); ref = DEQ_HEAD(updated_deliveries); } @@ -475,11 +476,12 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li while (dlv) { DEQ_REMOVE_HEAD(undelivered); peer = dlv->peer; - qdr_delivery_free(dlv); if (peer) { peer->peer = 0; qdr_delivery_release_CT(core, peer); + qdr_delivery_decref(peer); } + qdr_delivery_decref(dlv); dlv = DEQ_HEAD(undelivered); } @@ -498,12 +500,13 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li } peer = dlv->peer; - qdr_delivery_free(dlv); if (peer) { peer->peer = 0; if (link->link_direction == QD_OUTGOING) qdr_delivery_release_CT(core, peer); + qdr_delivery_decref(peer); } + qdr_delivery_decref(dlv); dlv = DEQ_HEAD(unsettled); } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a19ec1eb/src/router_core/forwarder.c ---------------------------------------------------------------------- diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 4890e78..996788b 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -116,7 +116,10 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in if (!dlv->settled) { if (in_dlv && in_dlv->peer == 0) { dlv->peer = in_dlv; - in_dlv->peer = dlv; // TODO - make this a back-list for multicast tracking + in_dlv->peer = dlv; + + dlv->ref_count = 1; + qdr_delivery_incref(in_dlv); } } @@ -129,6 +132,7 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t * sys_mutex_lock(link->conn->work_lock); DEQ_INSERT_TAIL(link->undelivered, dlv); dlv->where = QDR_DELIVERY_IN_UNDELIVERED; + dlv->ref_count++; // We have the lock, don't use the incref function // // If the link isn't already on the links_with_deliveries list, put it there. http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a19ec1eb/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 36e3ec0..58d73a2 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -199,6 +199,7 @@ typedef enum { struct qdr_delivery_t { DEQ_LINKS(qdr_delivery_t); void *context; + int ref_count; qdr_link_t *link; qdr_delivery_t *peer; qd_message_t *msg; @@ -591,7 +592,6 @@ void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action); void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit); void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr); void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv); -void qdr_delivery_free(qdr_delivery_t *delivery); void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery); bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *delivery); void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a19ec1eb/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 8a03875..4ac5e1a 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -43,6 +43,7 @@ qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_field_i qdr_delivery_t *dlv = new_qdr_delivery_t(); ZERO(dlv); + dlv->ref_count = 1; // referenced by the action dlv->link = link; dlv->msg = msg; dlv->to_addr = 0; @@ -64,6 +65,7 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg, qdr_delivery_t *dlv = new_qdr_delivery_t(); ZERO(dlv); + dlv->ref_count = 1; // referenced by the action dlv->link = link; dlv->msg = msg; dlv->to_addr = addr; @@ -87,9 +89,10 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t * qdr_delivery_t *dlv = new_qdr_delivery_t(); ZERO(dlv); - dlv->link = link; - dlv->msg = msg; - dlv->settled = settled; + dlv->ref_count = 1; // referenced by the action + dlv->link = link; + dlv->msg = msg; + dlv->settled = settled; action->args.connection.delivery = dlv; action->args.connection.tag_length = tag_length; @@ -119,6 +122,7 @@ void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) dlv->where = QDR_DELIVERY_IN_UNSETTLED; } else dlv->where = QDR_DELIVERY_NOWHERE; + credit--; link->total_deliveries++; offer = DEQ_SIZE(link->undelivered); @@ -130,7 +134,7 @@ void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) link->credit_to_core--; core->deliver_handler(core->user_context, link, dlv, settled); if (settled) - qdr_delivery_free(dlv); + qdr_delivery_decref(dlv); } } @@ -151,8 +155,7 @@ void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) qdr_delivery_ref_t *ref = DEQ_HEAD(updated_deliveries); while (ref) { core->delivery_update_handler(core->user_context, ref->dlv, ref->dlv->disposition, ref->dlv->settled); - if (ref->dlv->settled) - qdr_delivery_free(ref->dlv); + qdr_delivery_decref(ref->dlv); qdr_del_delivery_ref(&updated_deliveries, ref); ref = DEQ_HEAD(updated_deliveries); } @@ -205,24 +208,22 @@ void qdr_send_to2(qdr_core_t *core, qd_message_t *msg, const char *addr, bool ex } -void qdr_delivery_free(qdr_delivery_t *delivery) -{ - if (delivery->msg) - qd_message_free(delivery->msg); - if (delivery->to_addr) - qd_field_iterator_free(delivery->to_addr); - qd_bitmask_free(delivery->link_exclusion); - free_qdr_delivery_t(delivery); -} - - -void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disposition, bool settled) +void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disposition, + bool settled, bool ref_given) { qdr_action_t *action = qdr_action(qdr_update_delivery_CT, "update_delivery"); action->args.delivery.delivery = delivery; action->args.delivery.disposition = disposition; action->args.delivery.settled = settled; + // + // The delivery's ref_count must be incremented to protect its travels into the + // core thread. If the caller has given its reference to us, we can simply use + // the given ref rather than increment a new one. + // + if (!ref_given) + qdr_delivery_incref(delivery); + qdr_action_enqueue(core, action); } @@ -239,6 +240,41 @@ void *qdr_delivery_get_context(qdr_delivery_t *delivery) } +void qdr_delivery_incref(qdr_delivery_t *delivery) +{ + qdr_connection_t *conn = delivery->link ? delivery->link->conn : 0; + + if (!!conn) { + sys_mutex_lock(conn->work_lock); + delivery->ref_count++; + sys_mutex_unlock(conn->work_lock); + } +} + + +void qdr_delivery_decref(qdr_delivery_t *delivery) +{ + qdr_connection_t *conn = delivery->link ? delivery->link->conn : 0; + bool delete = false; + + if (!!conn) { + sys_mutex_lock(conn->work_lock); + assert(delivery->ref_count > 0); + delete = --delivery->ref_count == 0; + sys_mutex_unlock(conn->work_lock); + } + + if (delete) { + if (delivery->msg) + qd_message_free(delivery->msg); + if (delivery->to_addr) + qd_field_iterator_free(delivery->to_addr); + qd_bitmask_free(delivery->link_exclusion); + free_qdr_delivery_t(delivery); + } +} + + void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int *length) { *tag = (const char*) delivery->tag; @@ -266,6 +302,12 @@ void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *dlv) if (push || moved) qdr_delivery_push_CT(core, dlv); + + // + // Remove the unsettled reference + // + if (moved) + qdr_delivery_decref(dlv); } @@ -358,8 +400,7 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, qdr_address_t *addr) { - int fanout = 0; - bool presettled = dlv->settled; + int fanout = 0; if (addr) { fanout = qdr_forward_message_CT(core, addr, dlv->msg, dlv, false, link->link_type == QD_LINK_CONTROL); @@ -375,6 +416,9 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_ // Queue the message for later delivery (when the address gets // a valid destination). // + // Use the action-reference as the reference for undelivered rather + // than decrementing and incrementing the delivery ref_count. + // DEQ_INSERT_TAIL(link->undelivered, dlv); dlv->where = QDR_DELIVERY_IN_UNDELIVERED; } else { @@ -382,6 +426,7 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_ // Release the delivery // qdr_delivery_release_CT(core, dlv); + qdr_delivery_decref(dlv); if (link->link_type == QD_LINK_ROUTER) qdr_link_issue_credit_CT(core, link, 1); } @@ -394,13 +439,14 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_ qdr_link_issue_credit_CT(core, link, 1); // - // If the delivery was pre-settled, free it now. + // If the delivery has no more references, free it now. // - if (presettled) { - assert(!dlv->peer); - qdr_delivery_free(dlv); - } + assert(!dlv->peer); + qdr_delivery_decref(dlv); } else { + // + // Again, don't bother decrementing then incrementing the ref_count + // DEQ_INSERT_TAIL(link->unsettled, dlv); dlv->where = QDR_DELIVERY_IN_UNSETTLED; @@ -446,6 +492,17 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis if (!dlv->settled) { DEQ_INSERT_TAIL(link->unsettled, dlv); dlv->where = QDR_DELIVERY_IN_UNSETTLED; + + // + // Note, in this case the ref_count is left unchanged as we are transferring + // the action's reference to the unsettled list's reference. + // + } else { + // + // If the delivery is settled, decrement the ref_count on the delivery. + // This count was the owned-by-action count. + // + qdr_delivery_decref(dlv); } return; } @@ -460,8 +517,15 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis qdr_address_t *addr = link->owning_addr; if (!addr && dlv->to_addr) qd_hash_retrieve(core->addr_hash, dlv->to_addr, (void**) &addr); + + // + // Give the action reference to the qdr_link_forward function. + // qdr_link_forward_CT(core, link, dlv, addr); } else { + // + // Take the action reference and use it for undelivered. Don't decref/incref. + // DEQ_INSERT_TAIL(link->undelivered, dlv); dlv->where = QDR_DELIVERY_IN_UNDELIVERED; } @@ -496,11 +560,13 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard) static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { - qdr_delivery_t *dlv = action->args.delivery.delivery; - qdr_delivery_t *peer = dlv->peer; - bool push = false; - uint64_t disp = action->args.delivery.disposition; - bool settled = action->args.delivery.settled; + qdr_delivery_t *dlv = action->args.delivery.delivery; + qdr_delivery_t *peer = dlv->peer; + bool push = false; + bool peer_moved = false; + bool dlv_moved = false; + uint64_t disp = action->args.delivery.disposition; + bool settled = action->args.delivery.settled; // // Logic: @@ -526,21 +592,36 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool peer->settled = true; peer->peer = 0; dlv->peer = 0; + + qdr_delivery_decref(dlv); + qdr_delivery_decref(peer); + if (peer->link) { - bool moved = qdr_delivery_settled_CT(core, peer); - if (moved) + peer_moved = qdr_delivery_settled_CT(core, peer); + if (peer_moved) push = true; } } if (dlv->link) - qdr_delivery_settled_CT(core, dlv); - - qdr_delivery_free(dlv); + dlv_moved = qdr_delivery_settled_CT(core, dlv); } if (push) qdr_delivery_push_CT(core, peer); + + // + // Release the action reference, possibly freeing the delivery + // + qdr_delivery_decref(dlv); + + // + // Release the unsettled references if the deliveries were moved + // + if (dlv_moved) + qdr_delivery_decref(dlv); + if (peer_moved) + qdr_delivery_decref(peer); } @@ -639,6 +720,7 @@ void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv) sys_mutex_lock(link->conn->work_lock); if (dlv->where != QDR_DELIVERY_IN_UNDELIVERED) { + dlv->ref_count++; // We have the lock, don't use the incref function qdr_add_delivery_ref(&link->updated_deliveries, dlv); qdr_add_link_ref(&link->conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY); activate = true; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a19ec1eb/src/router_node.c ---------------------------------------------------------------------- diff --git a/src/router_node.c b/src/router_node.c index c3ef5c2..ff5c58f 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -251,6 +251,7 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd) else { pn_delivery_set_context(pnd, delivery); qdr_delivery_set_context(delivery, pnd); + qdr_delivery_incref(delivery); } } return; @@ -329,6 +330,7 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd) else { pn_delivery_set_context(pnd, delivery); qdr_delivery_set_context(delivery, pnd); + qdr_delivery_incref(delivery); } } else { // @@ -369,23 +371,28 @@ static void AMQP_disposition_handler(void* context, qd_link_t *link, pn_delivery { qd_router_t *router = (qd_router_t*) context; qdr_delivery_t *delivery = (qdr_delivery_t*) pn_delivery_get_context(pnd); - - if (!delivery) - return; + bool give_reference = false; // // If the delivery is settled, remove the linkage between the PN and QDR deliveries. // - if (pn_delivery_settled(pnd)) { + if (pn_delivery_settled(pnd) && !!delivery) { pn_delivery_set_context(pnd, 0); qdr_delivery_set_context(delivery, 0); + + // + // Don't decref the delivery here. Rather, we will _give_ the reference to the core. + // + give_reference = true; } // // Update the disposition of the delivery // - qdr_delivery_update_disposition(router->router_core, delivery, - pn_delivery_remote_state(pnd), pn_delivery_settled(pnd)); + if (!!delivery) + qdr_delivery_update_disposition(router->router_core, delivery, + pn_delivery_remote_state(pnd), pn_delivery_settled(pnd), + give_reference); // // If settled, close out the delivery @@ -789,10 +796,10 @@ static void CORE_link_push(void *context, qdr_link_t *link) static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *dlv, bool settled) { qd_router_t *router = (qd_router_t*) context; - qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link); - pn_link_t *plink = qd_link_pn(qlink); - const char *tag; - int tag_length; + qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link); + pn_link_t *plink = qd_link_pn(qlink); + const char *tag; + int tag_length; qdr_delivery_tag(dlv, &tag, &tag_length); @@ -807,13 +814,14 @@ static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *d if (!settled && !remote_snd_settled) { pn_delivery_set_context(pdlv, dlv); qdr_delivery_set_context(dlv, pdlv); + qdr_delivery_incref(dlv); } qd_message_send(qdr_delivery_message(dlv), qlink, qdr_link_strip_annotations_out(link)); if (!settled && remote_snd_settled) // Tell the core that the delivery has been accepted and settled, since we are settling on behalf of the receiver - qdr_delivery_update_disposition(router->router_core, dlv, PN_ACCEPTED, true); + qdr_delivery_update_disposition(router->router_core, dlv, PN_ACCEPTED, true, false); if (settled || remote_snd_settled) pn_delivery_settle(pdlv); @@ -842,6 +850,7 @@ static void CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di qdr_delivery_set_context(dlv, 0); pn_delivery_set_context(pnd, 0); pn_delivery_settle(pnd); + qdr_delivery_decref(dlv); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
