Repository: qpid-dispatch Updated Branches: refs/heads/master 33cbeede4 -> 522760691
DISPATCH-334 - Ensure that undelivered messages are only pushed out on outgoing links. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/52276069 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/52276069 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/52276069 Branch: refs/heads/master Commit: 522760691e20976e298352253d25c805881dcd5f Parents: 33cbeed Author: Ted Ross <[email protected]> Authored: Mon May 16 13:23:30 2016 -0400 Committer: Ted Ross <[email protected]> Committed: Mon May 16 13:26:15 2016 -0400 ---------------------------------------------------------------------- src/router_core/connections.c | 13 +++++++++ src/router_core/transfer.c | 56 ++++++++++++++++++++------------------ 2 files changed, 42 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/52276069/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 547b49e..229a49b 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -440,7 +440,20 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li sys_mutex_lock(conn->work_lock); DEQ_MOVE(link->updated_deliveries, updated_deliveries); DEQ_MOVE(link->undelivered, undelivered); + qdr_delivery_t *d = DEQ_HEAD(undelivered); + while (d) { + assert(d->where == QDR_DELIVERY_IN_UNDELIVERED); + d->where = QDR_DELIVERY_NOWHERE; + d = DEQ_NEXT(d); + } + DEQ_MOVE(link->unsettled, unsettled); + d = DEQ_HEAD(unsettled); + while (d) { + assert(d->where == QDR_DELIVERY_IN_UNSETTLED); + d->where = QDR_DELIVERY_NOWHERE; + d = DEQ_NEXT(d); + } sys_mutex_unlock(conn->work_lock); // http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/52276069/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 54a4448..8a03875 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -107,36 +107,38 @@ void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) int offer = -1; bool settled = false; - while (credit > 0 && !drained) { - sys_mutex_lock(conn->work_lock); - dlv = DEQ_HEAD(link->undelivered); - if (dlv) { - DEQ_REMOVE_HEAD(link->undelivered); - settled = dlv->settled; - if (!settled) { - DEQ_INSERT_TAIL(link->unsettled, dlv); - dlv->where = QDR_DELIVERY_IN_UNSETTLED; + if (link->link_direction == QD_OUTGOING) { + while (credit > 0 && !drained) { + sys_mutex_lock(conn->work_lock); + dlv = DEQ_HEAD(link->undelivered); + if (dlv) { + DEQ_REMOVE_HEAD(link->undelivered); + settled = dlv->settled; + if (!settled) { + DEQ_INSERT_TAIL(link->unsettled, dlv); + dlv->where = QDR_DELIVERY_IN_UNSETTLED; + } else + dlv->where = QDR_DELIVERY_NOWHERE; + credit--; + link->total_deliveries++; + offer = DEQ_SIZE(link->undelivered); } else - dlv->where = QDR_DELIVERY_NOWHERE; - credit--; - link->total_deliveries++; - offer = DEQ_SIZE(link->undelivered); - } else - drained = true; - sys_mutex_unlock(conn->work_lock); - - if (dlv) { - link->credit_to_core--; - core->deliver_handler(core->user_context, link, dlv, settled); - if (settled) - qdr_delivery_free(dlv); + drained = true; + sys_mutex_unlock(conn->work_lock); + + if (dlv) { + link->credit_to_core--; + core->deliver_handler(core->user_context, link, dlv, settled); + if (settled) + qdr_delivery_free(dlv); + } } - } - if (drained) - core->drained_handler(core->user_context, link); - else if (offer != -1) - core->offer_handler(core->user_context, link, offer); + if (drained) + core->drained_handler(core->user_context, link); + else if (offer != -1) + core->offer_handler(core->user_context, link, offer); + } // // Handle disposition/settlement updates --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
