DISPATCH-341 - Drain now propagates across link routes and behaves correctly for router-terminated links.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/7a8aa51d Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/7a8aa51d Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/7a8aa51d Branch: refs/heads/0.6.x Commit: 7a8aa51de4bf6da2a1a49f1d37774975869f8d54 Parents: 789b73e Author: Ted Ross <[email protected]> Authored: Fri Jun 3 15:38:26 2016 -0400 Committer: Ted Ross <[email protected]> Committed: Mon Jun 13 17:22:09 2016 -0400 ---------------------------------------------------------------------- include/qpid/dispatch/router_core.h | 2 ++ src/router_core/connections.c | 24 ++++++++----- src/router_core/router_core_private.h | 4 ++- src/router_core/transfer.c | 56 +++++++++++++++++++++--------- src/router_node.c | 13 +++++++ tests/system_tests_drain.py | 8 ++--- tests/system_tests_drain_support.py | 8 ++--- tests/system_tests_link_routes.py | 16 +++++++-- 8 files changed, 94 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7a8aa51d/include/qpid/dispatch/router_core.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 459e1a3..8eaa7ef 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -527,6 +527,7 @@ typedef void (*qdr_link_detach_t) (void *context, qdr_link_t *link, qdr_e typedef void (*qdr_link_flow_t) (void *context, qdr_link_t *link, int credit); typedef void (*qdr_link_offer_t) (void *context, qdr_link_t *link, int delivery_count); typedef void (*qdr_link_drained_t) (void *context, qdr_link_t *link); +typedef void (*qdr_link_drain_t) (void *context, qdr_link_t *link, bool mode); typedef void (*qdr_link_push_t) (void *context, qdr_link_t *link); typedef void (*qdr_link_deliver_t) (void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled); typedef void (*qdr_delivery_update_t) (void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled); @@ -540,6 +541,7 @@ void qdr_connection_handlers(qdr_core_t *core, qdr_link_flow_t flow, qdr_link_offer_t offer, qdr_link_drained_t drained, + qdr_link_drain_t drain, qdr_link_push_t push, qdr_link_deliver_t deliver, qdr_delivery_update_t delivery_update); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7a8aa51d/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index a9612ea..e0543ec 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -188,8 +188,14 @@ int qdr_connection_process(qdr_connection_t *conn) sys_mutex_unlock(conn->work_lock); if (link) { - core->flow_handler(core->user_context, link, link->incremental_credit); - link->incremental_credit = 0; + if (link->incremental_credit > 0) { + core->flow_handler(core->user_context, link, link->incremental_credit); + link->incremental_credit = 0; + } + if (link->drain_mode_changed) { + core->drain_handler(core->user_context, link, link->drain_mode); + link->drain_mode_changed = false; + } event_count++; } } while (link); @@ -331,6 +337,7 @@ void qdr_connection_handlers(qdr_core_t *core, qdr_link_flow_t flow, qdr_link_offer_t offer, qdr_link_drained_t drained, + qdr_link_drain_t drain, qdr_link_push_t push, qdr_link_deliver_t deliver, qdr_delivery_update_t delivery_update) @@ -343,6 +350,7 @@ void qdr_connection_handlers(qdr_core_t *core, core->flow_handler = flow; core->offer_handler = offer; core->drained_handler = drained; + core->drain_handler = drain; core->push_handler = push; core->deliver_handler = deliver; core->delivery_update_handler = delivery_update; @@ -991,7 +999,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act if (qdr_terminus_is_anonymous(target)) { link->owning_addr = 0; qdr_link_outbound_second_attach_CT(core, link, source, target); - qdr_link_issue_credit_CT(core, link, link->capacity); + qdr_link_issue_credit_CT(core, link, link->capacity, false); } else { // @@ -1032,7 +1040,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act // Issue the initial credit only if there are destinations for the address. // if (DEQ_SIZE(addr->subscriptions) || DEQ_SIZE(addr->rlinks) || qd_bitmask_cardinality(addr->rnodes)) - qdr_link_issue_credit_CT(core, link, link->capacity); + qdr_link_issue_credit_CT(core, link, link->capacity, false); } } break; @@ -1040,12 +1048,12 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act case QD_LINK_CONTROL: qdr_link_outbound_second_attach_CT(core, link, source, target); - qdr_link_issue_credit_CT(core, link, link->capacity); + qdr_link_issue_credit_CT(core, link, link->capacity, false); break; case QD_LINK_ROUTER: qdr_link_outbound_second_attach_CT(core, link, source, target); - qdr_link_issue_credit_CT(core, link, link->capacity); + qdr_link_issue_credit_CT(core, link, link->capacity, false); break; } } else { @@ -1147,12 +1155,12 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac // qdr_address_t *addr = link->owning_addr; if (!addr || (DEQ_SIZE(addr->subscriptions) || DEQ_SIZE(addr->rlinks) || qd_bitmask_cardinality(addr->rnodes))) - qdr_link_issue_credit_CT(core, link, link->capacity); + qdr_link_issue_credit_CT(core, link, link->capacity, false); break; case QD_LINK_CONTROL: case QD_LINK_ROUTER: - qdr_link_issue_credit_CT(core, link, link->capacity); + qdr_link_issue_credit_CT(core, link, link->capacity, false); break; } } else { http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7a8aa51d/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 58d73a2..0fd5546 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -267,6 +267,7 @@ struct qdr_link_t { int incremental_credit; bool flow_started; ///< for incoming, true iff initial credit has been granted bool drain_mode; + bool drain_mode_changed; int credit_to_core; ///< Number of the available credits incrementally given to the core uint64_t total_deliveries; }; @@ -545,6 +546,7 @@ struct qdr_core_t { qdr_link_flow_t flow_handler; qdr_link_offer_t offer_handler; qdr_link_drained_t drained_handler; + qdr_link_drain_t drain_handler; qdr_link_push_t push_handler; qdr_link_deliver_t deliver_handler; qdr_delivery_update_t delivery_update_handler; @@ -589,7 +591,7 @@ void qdr_agent_setup_CT(qdr_core_t *core); void qdr_forwarder_setup_CT(qdr_core_t *core); qdr_action_t *qdr_action(qdr_action_handler_t action_handler, const char *label); void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action); -void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit); +void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bool drain); void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr); void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv); void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7a8aa51d/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 4ac5e1a..b587baf 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -353,7 +353,7 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv) // if (moved && link->link_direction == QD_INCOMING && link->link_type != QD_LINK_ROUTER && !link->connected_link) - qdr_link_issue_credit_CT(core, link, 1); + qdr_link_issue_credit_CT(core, link, 1, false); return moved; } @@ -364,24 +364,39 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar if (discard) return; - qdr_link_t *link = action->args.connection.link; - int credit = action->args.connection.credit; - bool drain = action->args.connection.drain; - bool activate = false; + qdr_link_t *link = action->args.connection.link; + int credit = action->args.connection.credit; + bool drain = action->args.connection.drain; + bool activate = false; + bool drain_was_set = !link->drain_mode && drain; + + link->drain_mode = drain; // // If this is an attach-routed link, propagate the flow data downrange. // Note that the credit value is incremental. // - if (link->connected_link) - qdr_link_issue_credit_CT(core, link->connected_link, credit); + if (link->connected_link) { + qdr_link_t *clink = link->connected_link; + + if (clink->link_direction == QD_INCOMING) + qdr_link_issue_credit_CT(core, link->connected_link, credit, drain); + else { + sys_mutex_lock(clink->conn->work_lock); + qdr_add_link_ref(&clink->conn->links_with_deliveries, clink, QDR_LINK_LIST_CLASS_DELIVERY); + sys_mutex_unlock(clink->conn->work_lock); + qdr_connection_activate_CT(core, clink->conn); + } + + return; + } // // Handle the replenishing of credit outbound // - if (link->link_direction == QD_OUTGOING && credit > 0) { + if (link->link_direction == QD_OUTGOING && (credit > 0 || drain_was_set)) { sys_mutex_lock(link->conn->work_lock); - if (DEQ_SIZE(link->undelivered) > 0) { + if (DEQ_SIZE(link->undelivered) > 0 || drain_was_set) { qdr_add_link_ref(&link->conn->links_with_deliveries, link, QDR_LINK_LIST_CLASS_DELIVERY); activate = true; } @@ -389,10 +404,8 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar } // - // Record the drain mode for the link + // Activate the connection if we have deliveries to send or drain mode was set. // - link->drain_mode = drain; - if (activate) qdr_connection_activate_CT(core, link->conn); } @@ -428,7 +441,7 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_ qdr_delivery_release_CT(core, dlv); qdr_delivery_decref(dlv); if (link->link_type == QD_LINK_ROUTER) - qdr_link_issue_credit_CT(core, link, 1); + qdr_link_issue_credit_CT(core, link, 1, false); } } else if (fanout > 0) { if (dlv->settled) { @@ -436,7 +449,7 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_ // The delivery is settled. Keep it off the unsettled list and issue // replacement credit for it now. // - qdr_link_issue_credit_CT(core, link, 1); + qdr_link_issue_credit_CT(core, link, 1, false); // // If the delivery has no more references, free it now. @@ -457,7 +470,7 @@ static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_ // are many addresses sharing the link. // if (link->link_type == QD_LINK_ROUTER) - qdr_link_issue_credit_CT(core, link, 1); + qdr_link_issue_credit_CT(core, link, 1, false); } } @@ -629,8 +642,14 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool * Check the link's accumulated credit. If the credit given to the connection thread * has been issued to Proton, provide the next batch of credit to the connection thread. */ -void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit) +void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bool drain) { + bool drain_changed = link->drain_mode |= drain; + bool activate = drain_changed; + + link->drain_mode = drain; + link->drain_mode_changed = drain_changed; + link->incremental_credit_CT += credit; link->flow_started = true; @@ -640,7 +659,10 @@ void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit) // link->incremental_credit = link->incremental_credit_CT; link->incremental_credit_CT = 0; + activate = true; + } + if (activate) { // // Put this link on the connection's has-credit list. // @@ -682,7 +704,7 @@ void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr) // Issue credit to stalled links // if (!link->flow_started) - qdr_link_issue_credit_CT(core, link, link->capacity); + qdr_link_issue_credit_CT(core, link, link->capacity, false); // // Drain undelivered deliveries via the forwarder http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7a8aa51d/src/router_node.c ---------------------------------------------------------------------- diff --git a/src/router_node.c b/src/router_node.c index 10794ac..b8e8f0e 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -789,6 +789,18 @@ static void CORE_link_drained(void *context, qdr_link_t *link) } +static void CORE_link_drain(void *context, qdr_link_t *link, bool mode) +{ + qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link); + pn_link_t *plink = qd_link_pn(qlink); + + if (plink) { + if (pn_link_is_receiver(plink)) + pn_link_set_drain(plink, mode); + } +} + + static void CORE_link_push(void *context, qdr_link_t *link) { qd_router_t *router = (qd_router_t*) context; @@ -875,6 +887,7 @@ void qd_router_setup_late(qd_dispatch_t *qd) CORE_link_flow, CORE_link_offer, CORE_link_drained, + CORE_link_drain, CORE_link_push, CORE_link_deliver, CORE_delivery_update); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7a8aa51d/tests/system_tests_drain.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_drain.py b/tests/system_tests_drain.py index 9747995..b379d27 100644 --- a/tests/system_tests_drain.py +++ b/tests/system_tests_drain.py @@ -42,22 +42,22 @@ class DrainSupportTest(TestCase): cls.router.wait_ready() cls.address = cls.router.addresses[0] - def test_drain_support_all_messages(self): + def test_drain_support_1_all_messages(self): drain_support = DrainMessagesHandler(self.address) drain_support.run() self.assertEqual(drain_support.error, None) - def test_drain_support_one_message(self): + def test_drain_support_2_one_message(self): drain_support = DrainOneMessageHandler(self.address) drain_support.run() self.assertEqual(drain_support.error, None) - def test_drain_support_no_messages(self): + def test_drain_support_3_no_messages(self): drain_support = DrainNoMessagesHandler(self.address) drain_support.run() self.assertEqual(drain_support.error, None) - def test_drain_support_no_more_messages(self): + def test_drain_support_4_no_more_messages(self): drain_support = DrainNoMoreMessagesHandler(self.address) drain_support.run() self.assertEqual(drain_support.error, None) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7a8aa51d/tests/system_tests_drain_support.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_drain_support.py b/tests/system_tests_drain_support.py index f11b8b8..aa3a23a 100644 --- a/tests/system_tests_drain_support.py +++ b/tests/system_tests_drain_support.py @@ -142,8 +142,8 @@ class DrainNoMessagesHandler(MessagingHandler): def on_sendable(self, event): self.receiver.drain(1) - def on_drained(self, event): - if sender.credit == 0: + def on_link_flow(self, event): + if self.receiver.credit == 0: self.error = None self.timer.cancel() self.conn.close() @@ -189,8 +189,8 @@ class DrainNoMoreMessagesHandler(MessagingHandler): def on_settled(self, event): self.receiver.drain(1) - def on_drained(self, event): - if sender.credit == 0: + def on_link_flow(self, event): + if self.receiver.credit == 0: self.error = None self.timer.cancel() self.conn.close() http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7a8aa51d/tests/system_tests_link_routes.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py index 37317f9..420cc96 100644 --- a/tests/system_tests_link_routes.py +++ b/tests/system_tests_link_routes.py @@ -28,7 +28,7 @@ from proton.handlers import MessagingHandler from proton.reactor import AtMostOnce, Container from proton.utils import BlockingConnection, LinkDetached -from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler +from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler, DrainNoMessagesHandler, DrainNoMoreMessagesHandler from qpid_dispatch.management.client import Node @@ -448,12 +448,22 @@ class LinkRoutePatternTest(TestCase): def test_www_drain_support_all_messages(self): drain_support = DrainMessagesHandler(self.routers[2].addresses[1]) drain_support.run() - self.assertTrue(drain_support.drain_successful) + self.assertEqual(None, drain_support.error) def test_www_drain_support_one_message(self): drain_support = DrainOneMessageHandler(self.routers[2].addresses[1]) drain_support.run() - self.assertTrue(drain_support.drain_successful) + self.assertEqual(None, drain_support.error) + + def test_www_drain_support_no_messages(self): + drain_support = DrainNoMessagesHandler(self.routers[2].addresses[1]) + drain_support.run() + self.assertEqual(None, drain_support.error) + + def test_www_drain_support_no_more_messages(self): + drain_support = DrainNoMoreMessagesHandler(self.routers[2].addresses[1]) + drain_support.run() + self.assertEqual(None, drain_support.error) class DeliveryTagsTest(MessagingHandler): --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
