Repository: qpid-dispatch Updated Branches: refs/heads/master 152185eb9 -> 0e2a34abd
DISPATCH-523 - Make sure deliveries that _should_ be deliverable but are not get released. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/0e2a34ab Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/0e2a34ab Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/0e2a34ab Branch: refs/heads/master Commit: 0e2a34abd3387f95c53325ddc92cf781070b4330 Parents: 152185e Author: Ted Ross <[email protected]> Authored: Fri Sep 23 17:11:36 2016 -0400 Committer: Ted Ross <[email protected]> Committed: Fri Sep 23 17:11:36 2016 -0400 ---------------------------------------------------------------------- src/router_core/transfer.c | 65 ++++++++++++++++++++++++----------------- 1 file changed, 39 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0e2a34ab/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 312b866..00ec719 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -483,8 +483,36 @@ static void qdr_link_check_credit_CT(qdr_core_t *core, qdr_action_t *action, boo } -static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, qdr_address_t *addr) +/** + * Return the number of outbound paths to destinations that this address has. + * Note that even if there are more than zero paths, the destination still may + * be unreachable (e.g. an rnode next hop with no link). + */ +static long qdr_addr_path_count_CT(qdr_address_t *addr) +{ + return (long) DEQ_SIZE(addr->subscriptions) + (long) DEQ_SIZE(addr->rlinks) + + (long) qd_bitmask_cardinality(addr->rnodes); +} + + +static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, qdr_address_t *addr) { + if (addr && addr == link->owning_addr && qdr_addr_path_count_CT(addr) == 0) { + // + // We are trying to forward a delivery on an address that has no outbound paths + // AND the incoming link is targeted (not anonymous). In this case, we must put + // the delivery on the incoming link's undelivered list. Note that it is safe + // to do this because the undelivered list will be flushed once the number of + // paths transitions from zero to one. + // + // Use the action-reference as the reference for undelivered rather + // than decrementing and incrementing the delivery ref_count. + // + DEQ_INSERT_TAIL(link->undelivered, dlv); + dlv->where = QDR_DELIVERY_IN_UNDELIVERED; + return; + } + int fanout = 0; if (addr) { @@ -495,28 +523,15 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_ } if (fanout == 0) { - if (link->owning_addr) { - // - // Message was not delivered and the link is not anonymous. - // Queue the message for later delivery (when the address gets - // a valid destination). - // - // Use the action-reference as the reference for undelivered rather - // than decrementing and incrementing the delivery ref_count. - // - DEQ_INSERT_TAIL(link->undelivered, dlv); - dlv->where = QDR_DELIVERY_IN_UNDELIVERED; - } else { - // - // Message was not delivered and the link is anonymous, drop the delivery. - // - // If the delivery is not settled, release it. - // - if (!dlv->settled) - qdr_delivery_release_CT(core, dlv); - qdr_delivery_decref(dlv); - qdr_link_issue_credit_CT(core, link, 1, false); - } + // + // Message was not delivered, drop the delivery. + // + // If the delivery is not settled, release it. + // + if (!dlv->settled) + qdr_delivery_release_CT(core, dlv); + qdr_delivery_decref(dlv); + qdr_link_issue_credit_CT(core, link, 1, false); } else if (fanout > 0) { if (dlv->settled) { // @@ -547,8 +562,6 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_ qdr_link_issue_credit_CT(core, link, 1, false); } } - - return fanout; } @@ -769,7 +782,7 @@ void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr) if (DEQ_SIZE(addr->inlinks) == 0) return; - if (DEQ_SIZE(addr->subscriptions) + DEQ_SIZE(addr->rlinks) + qd_bitmask_cardinality(addr->rnodes) == 1) { + if (qdr_addr_path_count_CT(addr) == 1) { qdr_link_ref_t *ref = DEQ_HEAD(addr->inlinks); while (ref) { qdr_link_t *link = ref->link; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
