Repository: qpid-dispatch Updated Branches: refs/heads/master 7968e159e -> 4c12a62b9
DISPATCH-458 - Added flush for credits that can, in some cases, be stuck on an incoming link. Added a test that causes the credit-hang. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/4c12a62b Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/4c12a62b Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/4c12a62b Branch: refs/heads/master Commit: 4c12a62b964ab27b14ef41dc917d53a43e8df0fb Parents: 7968e15 Author: Ted Ross <[email protected]> Authored: Mon Jul 25 14:49:00 2016 -0400 Committer: Ted Ross <[email protected]> Committed: Mon Jul 25 14:49:00 2016 -0400 ---------------------------------------------------------------------- src/router_core/connections.c | 10 +++++ src/router_core/router_core_private.h | 7 ++++ src/router_core/transfer.c | 19 ++++++++++ tests/system_tests_one_router.py | 60 ++++++++++++++++++++++++++++++ 4 files changed, 96 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/4c12a62b/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 336b90f..2b95b41 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -191,6 +191,16 @@ int qdr_connection_process(qdr_connection_t *conn) if (link->incremental_credit > 0) { core->flow_handler(core->user_context, link, link->incremental_credit); link->incremental_credit = 0; + + // + // Note: This unprotected read of a CT-only value is safe in this case. + // If there is pending credit on the link that needs to be pushed down to + // Proton, we need to give the core a kick to make sure it is sent. It is + // possible that no more credit will be issued to cause the movement of CT + // credit to Proton credit (see DISPATCH-458). + // + if (link->incremental_credit_CT > 0) + qdr_link_check_credit(core, link); } if (link->drain_mode_changed) { core->drain_handler(core->user_context, link, link->drain_mode); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/4c12a62b/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 3113165..a578810 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -627,4 +627,11 @@ qdr_query_t *qdr_query(qdr_core_t *core, void *context, qd_router_entity_type_t type, qd_composed_field_t *body); + +// +// Cause the core to check credit on an incoming link that might have CT credit but +// no IO/Proton credit. +// +void qdr_link_check_credit(qdr_core_t *core, qdr_link_t *link); + #endif http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/4c12a62b/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 40ba82e..b6b4f21 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -24,6 +24,7 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_link_check_credit_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard); @@ -184,6 +185,14 @@ void qdr_link_flow(qdr_core_t *core, qdr_link_t *link, int credit, bool drain_mo } +void qdr_link_check_credit(qdr_core_t *core, qdr_link_t *link) +{ + qdr_action_t *action = qdr_action(qdr_link_check_credit_CT, "link_check_credit"); + action->args.connection.link = link; + qdr_action_enqueue(core, action); +} + + void qdr_send_to1(qdr_core_t *core, qd_message_t *msg, qd_field_iterator_t *addr, bool exclude_inprocess, bool control) { qdr_action_t *action = qdr_action(qdr_send_to_CT, "send_to"); @@ -430,6 +439,16 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar } +static void qdr_link_check_credit_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +{ + if (discard) + return; + + qdr_link_t *link = action->args.connection.link; + qdr_link_issue_credit_CT(core, link, 0, false); +} + + static int qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, qdr_address_t *addr) { int fanout = 0; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/4c12a62b/tests/system_tests_one_router.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py index fbd5672..026e076 100644 --- a/tests/system_tests_one_router.py +++ b/tests/system_tests_one_router.py @@ -1076,6 +1076,11 @@ class RouterTest(TestCase): test.run() self.assertEqual(None, test.error) + def test_20_batched_settlement(self): + test = BatchedSettlementTest(self.address) + test.run() + self.assertEqual(None, test.error) + def test_connection_properties(self): connection = BlockingConnection(self.router.addresses[0], timeout=60, @@ -1388,5 +1393,60 @@ class AppearanceOfBalanceTest(MessagingHandler): Container(self).run() +class BatchedSettlementTest(MessagingHandler): + def __init__(self, address): + super(BatchedSettlementTest, self).__init__(auto_accept=False) + self.address = address + self.dest = "balanced.BatchedSettlement" + self.error = None + self.count = 20000 + self.batch_count = 200 + self.n_sent = 0 + self.n_received = 0 + self.n_settled = 0 + self.batch = [] + + def check_if_done(self): + if self.n_settled == self.count: + self.timer.cancel() + self.conn.close() + + def timeout(self): + self.error = "Timeout Expired: sent=%d rcvd=%d settled=%d" % \ + (self.n_sent, self.n_received, self.n_settled) + self.conn.close() + + def on_start(self, event): + self.timer = event.reactor.schedule(20, Timeout(self)) + self.conn = event.container.connect(self.address) + self.sender = event.container.create_sender(self.conn, self.dest) + self.receiver = event.container.create_receiver(self.conn, self.dest) + + def send(self): + if self.n_sent < self.count: + while self.sender.credit > 0: + msg = Message(body="Batch-Test") + self.sender.send(msg) + self.n_sent += 1 + + def on_sendable(self, event): + if self.n_sent < self.count: + self.send() + + def on_message(self, event): + self.n_received += 1 + self.batch.insert(0, event.delivery) + if len(self.batch) == self.batch_count: + while len(self.batch) > 0: + self.accept(self.batch.pop()) + + def on_accepted(self, event): + self.n_settled += 1 + self.check_if_done() + + def run(self): + Container(self).run() + + if __name__ == '__main__': unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
