Repository: qpid-dispatch
Updated Branches:
  refs/heads/master a94a7a9b7 -> 25d8513e3
DISPATCH-179 - Fixed race condition in the sender-settles-first case. All tests pass.

Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/25d8513e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/25d8513e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/25d8513e

Branch: refs/heads/master
Commit: 25d8513e373209e3f8ab4ca83a45c32b1e3d0382
Parents: a94a7a9
Author: Ted Ross <[email protected]>
Authored: Wed Mar 30 17:32:47 2016 -0400
Committer: Ted Ross <[email protected]>
Committed: Wed Mar 30 17:33:50 2016 -0400

----------------------------------------------------------------------
 src/router_core/router_core_private.h |  2 +-
 src/router_core/transfer.c            | 37 +++++++++++++++++-------------
 tests/system_tests_two_routers.py     |  4 ++--
 3 files changed, 24 insertions(+), 19 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/25d8513e/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 1a61620..fed9f88 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -569,7 +569,7 @@ void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr);
 void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv);
 void qdr_delivery_free(qdr_delivery_t *delivery);
 void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery);
-void qdr_delivery_remove_unsettled_CT(qdr_core_t *core, qdr_delivery_t *delivery);
+bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *delivery);
 void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query);
 void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash);
 
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/25d8513e/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 8b2d574..883801b 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -98,14 +98,16 @@ void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) qdr_connection_t *conn = link->conn; qdr_delivery_t *dlv; bool drained = false; - int offer = -1; + int offer = -1; + bool settled = false; while (credit > 0 && !drained) { sys_mutex_lock(conn->work_lock); dlv = DEQ_HEAD(link->undelivered); if (dlv) { DEQ_REMOVE_HEAD(link->undelivered); - if (!dlv->settled) { + settled = dlv->settled; + if (!settled) { DEQ_INSERT_TAIL(link->unsettled, dlv); dlv->where = QDR_DELIVERY_IN_UNSETTLED; } else @@ -119,8 +121,8 @@ void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) if (dlv) { link->credit_to_core--; - core->deliver_handler(core->user_context, link, dlv, dlv->settled); - if (dlv->settled) + core->deliver_handler(core->user_context, link, dlv, settled); + if (settled) qdr_delivery_free(dlv); } } @@ -251,18 +253,18 @@ void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery) } -void qdr_delivery_remove_unsettled_CT(qdr_core_t *core, qdr_delivery_t *dlv) +bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv) { // // Remove a delivery from its unsettled list. Side effects include issuing // replacement credit and visiting the link-quiescence algorithm // - qdr_link_t *link = dlv->link; - qdr_connection_t *conn = link ? link->conn : 0; - bool issue_credit = false; + qdr_link_t *link = dlv->link; + qdr_connection_t *conn = link ? 
link->conn : 0; + bool moved = false; if (!link || !conn) - return; + return false; // // The lock needs to be acquired only for outgoing links @@ -273,7 +275,7 @@ void qdr_delivery_remove_unsettled_CT(qdr_core_t *core, qdr_delivery_t *dlv) if (dlv->where == QDR_DELIVERY_IN_UNSETTLED) { DEQ_REMOVE(link->unsettled, dlv); dlv->where = QDR_DELIVERY_NOWHERE; - issue_credit = true; + moved = true; } if (link->link_direction == QD_OUTGOING) @@ -283,8 +285,10 @@ void qdr_delivery_remove_unsettled_CT(qdr_core_t *core, qdr_delivery_t *dlv) // If this is an incoming link and it is not link-routed, issue // one replacement credit on the link. // - if (issue_credit && link->link_direction == QD_INCOMING && !link->connected_link) + if (moved && link->link_direction == QD_INCOMING && !link->connected_link) qdr_link_issue_credit_CT(core, link, 1); + + return moved; } @@ -460,7 +464,6 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool // If settled and there is a peer, the peer shall be settled and unlinked. It shall not // be freed until the connection-side thread settles the PN delivery. // - if (disp != dlv->disposition) { // // Disposition has changed, propagate the change to the peer delivery. 
@@ -475,15 +478,17 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool if (settled) { if (peer) { peer->settled = true; - push = true; peer->peer = 0; dlv->peer = 0; - if (peer->link) - qdr_delivery_remove_unsettled_CT(core, peer); + if (peer->link) { + bool moved = qdr_delivery_settled_CT(core, peer); + if (moved) + push = true; + } } if (dlv->link) - qdr_delivery_remove_unsettled_CT(core, dlv); + qdr_delivery_settled_CT(core, dlv); qdr_delivery_free(dlv); } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/25d8513e/tests/system_tests_two_routers.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py index f7938e3..8962a7c 100644 --- a/tests/system_tests_two_routers.py +++ b/tests/system_tests_two_routers.py @@ -170,7 +170,7 @@ class RouterTest(TestCase): def test_02c_sender_settles_first(self): - addr = "amqp:/settled/senderfirst/1" + addr = "amqp:/closest.senderfirst.1" M1 = self.messenger() M2 = self.messenger() @@ -183,7 +183,7 @@ class RouterTest(TestCase): M1.start() M2.start() M2.subscribe(addr) - self.routers[0].wait_address("settled/senderfirst/1", 0, 1) + self.routers[0].wait_address("closest.senderfirst.1", 0, 1) tm = Message() rm = Message() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
