Repository: qpid-dispatch Updated Branches: refs/heads/tross-DISPATCH-179-1 54c923d63 -> ccf4c7086
DISPATCH-179 - Delivery disposition and settlement now propagate across the core. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ccf4c708 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ccf4c708 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ccf4c708 Branch: refs/heads/tross-DISPATCH-179-1 Commit: ccf4c708668dc0aa8e3fc97f5f86115715a1ecb6 Parents: 54c923d Author: Ted Ross <[email protected]> Authored: Wed Jan 27 18:06:59 2016 -0500 Committer: Ted Ross <[email protected]> Committed: Wed Jan 27 18:06:59 2016 -0500 ---------------------------------------------------------------------- include/qpid/dispatch/router_core.h | 1 + src/router_core/forwarder.c | 13 +++-- src/router_core/transfer.c | 81 +++++++++++++++++++++++++++----- src/router_node.c | 16 +++++-- 4 files changed, 91 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ccf4c708/include/qpid/dispatch/router_core.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 5f63b0d..c081b6e 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -527,6 +527,7 @@ void qdr_connection_handlers(qdr_core_t *core, * Delivery functions ****************************************************************************** */ +void qdr_delivery_free(qdr_delivery_t *delivery); void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disp); void qdr_delivery_settle(qdr_core_t *core, qdr_delivery_t *delivery); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ccf4c708/src/router_core/forwarder.c ---------------------------------------------------------------------- diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 30a6d9c..90e210f 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -54,13 +54,18 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *pe ZERO(dlv); dlv->link = link; - dlv->peer = peer; dlv->msg = qd_message_copy(msg); dlv->settled = !peer || peer->settled; dlv->tag = core->next_tag++; - if (peer && peer->peer == 0) - peer->peer = dlv; // TODO - make this a back-list for multicast tracking + // + // Create peer linkage only if the delivery is not settled + // + if (!dlv->settled) { + dlv->peer = peer; + if (peer && peer->peer == 0) + peer->peer = dlv; // TODO - make this a back-list for multicast tracking + } return dlv; } @@ -244,7 +249,7 @@ int qdr_forward_closest_CT(qdr_core_t *core, // // If the incoming delivery is not settled, it should be accepted and settled here. // - if (in_delivery) { + if (in_delivery && !in_delivery->settled) { in_delivery->disposition = PN_ACCEPTED; in_delivery->settled = true; qdr_delivery_push_CT(core, in_delivery); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ccf4c708/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 5c44a7f..af88530 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -116,15 +116,19 @@ void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) dlv = DEQ_HEAD(link->undelivered); if (dlv) { DEQ_REMOVE_HEAD(link->undelivered); - DEQ_INSERT_TAIL(link->unsettled, dlv); + if (!dlv->settled) + DEQ_INSERT_TAIL(link->unsettled, dlv); credit--; offer = DEQ_SIZE(link->undelivered); } else drained = true; sys_mutex_unlock(conn->work_lock); - if (dlv) + if (dlv) { core->deliver_handler(core->user_context, link, dlv, dlv->settled); + if (dlv->settled) + qdr_delivery_free(dlv); + } } if (drained) @@ -143,6 +147,8 @@ void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) qdr_delivery_ref_t *ref = DEQ_HEAD(updated_deliveries); while (ref) { core->delivery_update_handler(core->user_context, ref->dlv, ref->dlv->disposition, ref->dlv->settled); + if (ref->dlv->settled) + qdr_delivery_free(ref->dlv); qdr_del_delivery_ref(&updated_deliveries, ref); ref = DEQ_HEAD(updated_deliveries); } @@ -173,6 +179,14 @@ void qdr_send_to2(qdr_core_t *core, qd_message_t *msg, const char *addr, bool ex } +void qdr_delivery_free(qdr_delivery_t *delivery) +{ + if (delivery->msg) + qd_message_free(delivery->msg); + free_qdr_delivery_t(delivery); +} + + void qdr_delivery_update_disposition(qdr_core_t *core, qdr_delivery_t *delivery, uint64_t disposition) { qdr_action_t *action = qdr_action(qdr_update_delivery_CT, "update_delivery"); @@ -259,7 +273,8 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis qdr_delivery_t *dlv = action->args.connection.delivery; qd_bitmask_t *link_exclude = action->args.connection.link_exclusion; qdr_link_t *link = dlv->link; - int count = 0; + int fanout = 0; + bool presettled = dlv->settled; // // NOTE: The link->undelivered list does not need to be protected by the @@ -272,11 +287,11 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis if (!addr && dlv->to_addr) qd_hash_retrieve(core->addr_hash, dlv->to_addr, (void**) &addr); if (addr) - count = qdr_forward_message_CT(core, addr, dlv->msg, dlv, false, - link->link_type == QD_LINK_CONTROL, link_exclude); + fanout = qdr_forward_message_CT(core, addr, dlv->msg, dlv, false, + link->link_type == QD_LINK_CONTROL, link_exclude); } - if (count == 0) { + if (fanout == 0) { if (link->owning_addr) { // // Message was not delivered and the link is not anonymous. @@ -288,20 +303,24 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis // // TODO - Release the delivery // + printf("TODO fanout == 0\n"); } - } else if (count == 1) { - if (dlv->settled) + } else if (fanout == 1) { + if (presettled) { // // The delivery was pre-settled. Issue replacement credit now that it's // been forwarded. // qdr_link_issue_credit_CT(core, link, 1); - else + assert(!dlv->peer); + qdr_delivery_free(dlv); + } else DEQ_INSERT_TAIL(link->unsettled, dlv); } else { // - // The count is greater than one. Do something! TODO + // The fanout is greater than one. Do something! TODO // + printf("TODO fanout > 1\n"); } } @@ -334,15 +353,55 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard) static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { qdr_delivery_t *dlv = action->args.delivery.delivery; + qdr_delivery_t *peer = dlv->peer; + bool push = false; uint64_t disp = action->args.delivery.disposition; - // bool settled = action->args.delivery.settled; + bool settled = action->args.delivery.settled; + + // + // Logic: + // + // If disposition has changed and there is a peer link, set the disposition of the peer + // If settled, the delivery must be unlinked and freed. + // If settled and there is a peer, the peer shall be settled and unlinked. It shall not + // be freed until the connection-side thread settles the PN delivery. + // if (disp != dlv->disposition) { // // Disposition has changed, propagate the change to the peer delivery. // dlv->disposition = disp; + if (peer) { + peer->disposition = disp; + push = true; + } + } + + if (settled) { + if (peer) { + peer->settled = true; + push = true; + peer->peer = 0; + dlv->peer = 0; + if (peer->link) { + DEQ_REMOVE(peer->link->unsettled, peer); + if (peer->link->link_direction == QD_INCOMING) + qdr_link_issue_credit_CT(core, peer->link, 1); + } + } + + if (dlv->link) { + DEQ_REMOVE(dlv->link->unsettled, dlv); + if (dlv->link->link_direction == QD_INCOMING) + qdr_link_issue_credit_CT(core, dlv->link, 1); + } + + qdr_delivery_free(dlv); } + + if (push) + qdr_delivery_push_CT(core, peer); } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ccf4c708/src/router_node.c ---------------------------------------------------------------------- diff --git a/src/router_node.c b/src/router_node.c index de09c5b..b16b862 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -264,7 +264,7 @@ static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd // qd_message_depth_t validation_depth = anonymous_link ? QD_DEPTH_PROPERTIES : QD_DEPTH_MESSAGE_ANNOTATIONS; bool valid_message = qd_message_check(msg, validation_depth); - qdr_delivery_t *delivery; + qdr_delivery_t *delivery = 0; if (valid_message) { qd_parsed_field_t *in_ma = qd_message_message_annotations(msg); @@ -299,8 +299,12 @@ static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd), link_exclusions); if (delivery) { - pn_delivery_set_context(pnd, delivery); - qdr_delivery_set_context(delivery, pnd); + if (pn_delivery_settled(pnd)) + pn_delivery_settle(pnd); + else { + pn_delivery_set_context(pnd, delivery); + qdr_delivery_set_context(delivery, pnd); + } } else { // // The message is now and will always be unroutable because there is no address. @@ -680,8 +684,10 @@ static void qd_router_link_deliver(void *context, qdr_link_t *link, qdr_delivery pn_delivery(plink, pn_dtag(tag, tag_length)); pn_delivery_t *pdlv = pn_link_current(plink); - pn_delivery_set_context(pdlv, dlv); - qdr_delivery_set_context(dlv, pdlv); + if (!settled) { + pn_delivery_set_context(pdlv, dlv); + qdr_delivery_set_context(dlv, pdlv); + } qd_message_send(qdr_delivery_message(dlv), qlink, qdr_link_strip_annotations_out(link)); if (settled) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
