Repository: qpid-dispatch Updated Branches: refs/heads/master 81e58b462 -> ac57daddf
DISPATCH-1194 - Fixed a problem with credit propagation in the new async-link-route-setup. There is still an accounting problem with flow credit which will be fixed shortly. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ac57dadd Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ac57dadd Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ac57dadd Branch: refs/heads/master Commit: ac57daddf54e1a79b22f6fcc7800e46762a82e90 Parents: 81e58b4 Author: Ted Ross <[email protected]> Authored: Mon Dec 10 13:44:58 2018 -0500 Committer: Ted Ross <[email protected]> Committed: Mon Dec 10 13:47:16 2018 -0500 ---------------------------------------------------------------------- src/router_core/forwarder.c | 5 +- tests/CMakeLists.txt | 1 + tests/system_tests_link_route_credit.py | 307 +++++++++++++++++++++++++++ 3 files changed, 312 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ac57dadd/src/router_core/forwarder.c ---------------------------------------------------------------------- diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 0cddb19..b4478c9 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -911,7 +911,6 @@ void qdr_forward_link_direct_CT(qdr_core_t *core, out_link->link_type = QD_LINK_ENDPOINT; out_link->link_direction = qdr_link_direction(in_link) == QD_OUTGOING ? QD_INCOMING : QD_OUTGOING; out_link->admin_enabled = true; - out_link->terminus_addr = 0; if (strip) { out_link->strip_prefix = strip; @@ -938,6 +937,10 @@ void qdr_forward_link_direct_CT(qdr_core_t *core, work->target = target; qdr_connection_enqueue_work_CT(core, conn, work); + + if (qdr_link_direction(in_link) == QD_OUTGOING && in_link->credit_to_core > 0) { + qdr_link_issue_credit_CT(core, out_link, in_link->credit_to_core, false); + } } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ac57dadd/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 45206e8..4c14aa1 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -84,6 +84,7 @@ foreach(py_test_module # system_tests_broker system_tests_link_routes system_tests_link_routes_add_external_prefix + system_tests_link_route_credit system_tests_autolinks system_tests_drain system_tests_management http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ac57dadd/tests/system_tests_link_route_credit.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_link_route_credit.py b/tests/system_tests_link_route_credit.py new file mode 100644 index 0000000..e8c824e --- /dev/null +++ b/tests/system_tests_link_route_credit.py @@ -0,0 +1,307 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from __future__ import unicode_literals +from __future__ import division +from __future__ import absolute_import +from __future__ import print_function + +from time import sleep +from threading import Timer + +import unittest2 as unittest +from proton import Message, Timeout +from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, MgmtMsgProxy +from system_test import AsyncTestReceiver +from system_test import AsyncTestSender +from system_tests_link_routes import ConnLinkRouteService +from test_broker import FakeService +from proton.handlers import MessagingHandler +from proton.reactor import Container, DynamicNodeProperties +from qpid_dispatch.management.client import Node +from subprocess import PIPE, STDOUT +import re + + +class AddrTimer(object): + def __init__(self, parent): + self.parent = parent + + def on_timer_task(self, event): + self.parent.check_address() + + +class RouterTest(TestCase): + + inter_router_port = None + + @classmethod + def setUpClass(cls): + """Start a router""" + super(RouterTest, cls).setUpClass() + + def router(name, mode, connection, extra=None): + config = [ + ('router', {'mode': mode, 'id': name}), + ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}), + ('listener', {'port': cls.tester.get_port(), 'role': 'route-container', 'stripAnnotations': 'no'}), + ('linkRoute', {'prefix': 'queue', 'containerId': 'LRC_S', 'direction': 'out'}), + ('linkRoute', {'prefix': 'queue', 'containerId': 'LRC_R', 'direction': 'in'}), + ('address', {'prefix': 'closest', 'distribution': 'closest'}), + ('address', {'prefix': 'spread', 'distribution': 'balanced'}), + ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), + connection + ] + + if extra: + config.append(extra) + config = Qdrouterd.Config(config) + cls.routers.append(cls.tester.qdrouterd(name, config, wait=True)) + + cls.routers = [] + + inter_router_port = cls.tester.get_port() + edge_port_A = cls.tester.get_port() + edge_port_B = cls.tester.get_port() + + router('INT.A', 'interior', + ('listener', {'role': 'inter-router', 'port': inter_router_port}), + ('listener', {'role': 'edge', 'port': edge_port_A})) + router('INT.B', 'interior', + ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port}), + ('listener', {'role': 'edge', 'port': edge_port_B})) + router('EA1', 'edge', ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_A})) + router('EA2', 'edge', ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_A})) + router('EB1', 'edge', ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_B})) + router('EB2', 'edge', ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_B})) + + cls.routers[0].wait_router_connected('INT.B') + cls.routers[1].wait_router_connected('INT.A') + + + def test_01_dest_sender_same_edge(self): + test = LRDestSenderFlowTest(self.routers[2].addresses[0], + self.routers[2].addresses[1], + self.routers[2].addresses[0], + 'queue.01', 0) + test.run() + self.assertEqual(None, test.error) + + def test_02_dest_sender_same_interior(self): + test = LRDestSenderFlowTest(self.routers[0].addresses[0], + self.routers[0].addresses[1], + self.routers[0].addresses[0], + 'queue.02', 0) + test.run() + self.assertEqual(None, test.error) + + def test_03_dest_sender_edge_edge(self): + test = LRDestSenderFlowTest(self.routers[2].addresses[0], + self.routers[3].addresses[1], + self.routers[0].addresses[0], + 'queue.03', 0) + test.run() + self.assertEqual(None, test.error) + + def test_04_dest_sender_interior_interior(self): + test = LRDestSenderFlowTest(self.routers[0].addresses[0], + self.routers[1].addresses[1], + self.routers[0].addresses[0], + 'queue.04', 0) + test.run() + self.assertEqual(None, test.error) + + def test_05_dest_sender_edge_interior(self): + test = LRDestSenderFlowTest(self.routers[2].addresses[0], + self.routers[0].addresses[1], + self.routers[0].addresses[0], + 'queue.05', 0) + test.run() + self.assertEqual(None, test.error) + + def test_06_dest_sender_interior_edge(self): + test = LRDestSenderFlowTest(self.routers[0].addresses[0], + self.routers[2].addresses[1], + self.routers[0].addresses[0], + 'queue.06', 0) + test.run() + self.assertEqual(None, test.error) + + def test_07_dest_sender_edge_interior_interior_edge(self): + test = LRDestSenderFlowTest(self.routers[2].addresses[0], + self.routers[4].addresses[1], + self.routers[0].addresses[0], + 'queue.07', 0) + test.run() + self.assertEqual(None, test.error) + + +class Entity(object): + def __init__(self, status_code, status_description, attrs): + self.status_code = status_code + self.status_description = status_description + self.attrs = attrs + + def __getattr__(self, key): + return self.attrs[key] + + +class RouterProxy(object): + def __init__(self, reply_addr): + self.reply_addr = reply_addr + + def response(self, msg): + ap = msg.properties + return Entity(ap['statusCode'], ap['statusDescription'], msg.body) + + def read_address(self, name): + ap = {'operation': 'READ', 'type': 'org.apache.qpid.dispatch.router.address', 'name': name} + return Message(properties=ap, reply_to=self.reply_addr) + + def query_addresses(self): + ap = {'operation': 'QUERY', 'type': 'org.apache.qpid.dispatch.router.address'} + return Message(properties=ap, reply_to=self.reply_addr) + + +class Timeout(object): + def __init__(self, parent): + self.parent = parent + + def on_timer_task(self, event): + self.parent.timeout() + + +class PollTimeout(object): + def __init__(self, parent): + self.parent = parent + + def on_timer_task(self, event): + self.parent.poll_timeout() + + +class LRDestSenderFlowTest(MessagingHandler): + def __init__(self, receiver_host, sender_host, probe_host, address, initial_credit): + super(LRDestSenderFlowTest, self).__init__(prefetch=0) + self.receiver_host = receiver_host + self.sender_host = sender_host + self.probe_host = probe_host + self.address = address + self.initial_credit = initial_credit + self.delta_credit = 7 + self.final_credit = initial_credit + 2 * self.delta_credit + self.expected_credit = initial_credit + + self.receiver_conn = None + self.sender_conn = None + self.probe_conn = None + self.probe_sender = None + self.probe_receiver = None + self.probe_reply = None + self.receiver = None + self.sender = None + self.error = None + self.last_action = "Test initialization" + + def fail(self, text): + self.error = text + self.receiver_conn.close() + self.sender_conn.close() + self.probe_conn.close() + self.timer.cancel() + + def timeout(self): + self.error = "Timeout Expired - last_action: %s" % (self.last_action) + self.receiver_conn.close() + self.sender_conn.close() + self.probe_conn.close() + + def poll_timeout(self): + self.probe() + + def on_start(self, event): + self.reactor = event.reactor + self.timer = event.reactor.schedule(7.0, Timeout(self)) + self.receiver_conn = event.container.connect(self.receiver_host) + self.sender_conn = event.container.connect(self.sender_host) + self.probe_conn = event.container.connect(self.probe_host) + self.probe_receiver = event.container.create_receiver(self.probe_conn, dynamic=True) + self.probe_receiver.flow(1000) + self.last_action = "on_start" + + def probe(self): + self.probe_sender.send(self.proxy.read_address('Dqueue')) + + def on_link_opened(self, event): + if event.receiver == self.probe_receiver: + self.probe_reply = self.probe_receiver.remote_source.address + self.proxy = RouterProxy(self.probe_reply) + self.probe_sender = event.container.create_sender(self.probe_conn, '$management') + elif event.sender == self.probe_sender: + self.probe() + self.last_action = "probing" + elif event.receiver == self.receiver: + if self.initial_credit == 0: + self.expected_credit += self.delta_credit + self.receiver.flow(self.delta_credit) + + def on_link_opening(self, event): + if event.sender: + self.sender = event.sender + if event.sender.remote_source.address == self.address: + event.sender.source.address = self.address + event.sender.open() + else: + self.fail("Incorrect address on incoming sender: got %s, expected %s" % + (event.sender.remote_source.address, self.address)) + + def on_sendable(self, event): + if event.sender == self.sender: + if event.sender.credit == self.expected_credit: + if self.expected_credit == self.final_credit: + self.fail(None) + else: + self.expected_credit += self.delta_credit + self.receiver.flow(self.delta_credit) + else: + self.fail("Unexpected sender credit: got %d, expected %d" % + (event.sender.credit, self.expected_credit)) + + def on_message(self, event): + if event.receiver == self.probe_receiver: + response = self.proxy.response(event.message); + self.last_action = "Handling probe response: remote: %d container: %d" \ + % (response.remoteCount, response.containerCount) + if response.status_code == 200 and response.remoteCount + response.containerCount == 1: + self.receiver = event.container.create_receiver(self.receiver_conn, self.address) + if self.initial_credit > 0: + self.receiver.flow(self.initial_credit) + self.expected_credit = self.initial_credit + self.last_action = "opening test receiver" + else: + self.poll_timer = self.reactor.schedule(0.5, PollTimeout(self)) + + + def run(self): + container = Container(self) + container.container_id = 'LRC_S' + container.run() + + +if __name__== '__main__': + unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
