Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 81e58b462 -> ac57daddf


DISPATCH-1194 - Fixed a problem with credit propagation in the new 
async-link-route-setup.  There is still an accounting problem with flow credit 
which will be fixed shortly.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ac57dadd
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ac57dadd
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ac57dadd

Branch: refs/heads/master
Commit: ac57daddf54e1a79b22f6fcc7800e46762a82e90
Parents: 81e58b4
Author: Ted Ross <[email protected]>
Authored: Mon Dec 10 13:44:58 2018 -0500
Committer: Ted Ross <[email protected]>
Committed: Mon Dec 10 13:47:16 2018 -0500

----------------------------------------------------------------------
 src/router_core/forwarder.c             |   5 +-
 tests/CMakeLists.txt                    |   1 +
 tests/system_tests_link_route_credit.py | 307 +++++++++++++++++++++++++++
 3 files changed, 312 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ac57dadd/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 0cddb19..b4478c9 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -911,7 +911,6 @@ void qdr_forward_link_direct_CT(qdr_core_t       *core,
     out_link->link_type      = QD_LINK_ENDPOINT;
     out_link->link_direction = qdr_link_direction(in_link) == QD_OUTGOING ? 
QD_INCOMING : QD_OUTGOING;
     out_link->admin_enabled  = true;
-    out_link->terminus_addr  = 0;
 
     if (strip) {
         out_link->strip_prefix = strip;
@@ -938,6 +937,10 @@ void qdr_forward_link_direct_CT(qdr_core_t       *core,
     work->target    = target;
 
     qdr_connection_enqueue_work_CT(core, conn, work);
+
+    if (qdr_link_direction(in_link) == QD_OUTGOING && in_link->credit_to_core 
> 0) {
+        qdr_link_issue_credit_CT(core, out_link, in_link->credit_to_core, 
false);
+    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ac57dadd/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 45206e8..4c14aa1 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -84,6 +84,7 @@ foreach(py_test_module
 #   system_tests_broker
     system_tests_link_routes
     system_tests_link_routes_add_external_prefix
+    system_tests_link_route_credit
     system_tests_autolinks
     system_tests_drain
     system_tests_management

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ac57dadd/tests/system_tests_link_route_credit.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_route_credit.py 
b/tests/system_tests_link_route_credit.py
new file mode 100644
index 0000000..e8c824e
--- /dev/null
+++ b/tests/system_tests_link_route_credit.py
@@ -0,0 +1,307 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
+from time import sleep
+from threading import Timer
+
+import unittest2 as unittest
+from proton import Message, Timeout
+from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, MgmtMsgProxy
+from system_test import AsyncTestReceiver
+from system_test import AsyncTestSender
+from system_tests_link_routes import ConnLinkRouteService
+from test_broker import FakeService
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, DynamicNodeProperties
+from qpid_dispatch.management.client import Node
+from subprocess import PIPE, STDOUT
+import re
+
+
+class AddrTimer(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+            self.parent.check_address()
+
+
+class RouterTest(TestCase):
+
+    inter_router_port = None
+
+    @classmethod
+    def setUpClass(cls):
+        """Start a router"""
+        super(RouterTest, cls).setUpClass()
+
+        def router(name, mode, connection, extra=None):
+            config = [
+                ('router', {'mode': mode, 'id': name}),
+                ('listener', {'port': cls.tester.get_port(), 
'stripAnnotations': 'no'}),
+                ('listener', {'port': cls.tester.get_port(), 'role': 
'route-container', 'stripAnnotations': 'no'}),
+                ('linkRoute', {'prefix': 'queue', 'containerId': 'LRC_S', 
'direction': 'out'}),
+                ('linkRoute', {'prefix': 'queue', 'containerId': 'LRC_R', 
'direction': 'in'}),
+                ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+                ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
+                ('address', {'prefix': 'multicast', 'distribution': 
'multicast'}),
+                connection
+            ]
+
+            if extra:
+                config.append(extra)
+            config = Qdrouterd.Config(config)
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+        cls.routers = []
+
+        inter_router_port = cls.tester.get_port()
+        edge_port_A       = cls.tester.get_port()
+        edge_port_B       = cls.tester.get_port()
+
+        router('INT.A', 'interior',
+               ('listener', {'role': 'inter-router', 'port': 
inter_router_port}),
+               ('listener', {'role': 'edge', 'port': edge_port_A}))
+        router('INT.B', 'interior',
+               ('connector', {'name': 'connectorToA', 'role': 'inter-router', 
'port': inter_router_port}),
+               ('listener',  {'role': 'edge', 'port': edge_port_B}))
+        router('EA1',   'edge',     ('connector', {'name': 'edge', 'role': 
'edge', 'port': edge_port_A}))
+        router('EA2',   'edge',     ('connector', {'name': 'edge', 'role': 
'edge', 'port': edge_port_A}))
+        router('EB1',   'edge',     ('connector', {'name': 'edge', 'role': 
'edge', 'port': edge_port_B}))
+        router('EB2',   'edge',     ('connector', {'name': 'edge', 'role': 
'edge', 'port': edge_port_B}))
+
+        cls.routers[0].wait_router_connected('INT.B')
+        cls.routers[1].wait_router_connected('INT.A')
+
+
+    def test_01_dest_sender_same_edge(self):
+        test = LRDestSenderFlowTest(self.routers[2].addresses[0],
+                                    self.routers[2].addresses[1],
+                                    self.routers[2].addresses[0],
+                                    'queue.01', 0)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_02_dest_sender_same_interior(self):
+        test = LRDestSenderFlowTest(self.routers[0].addresses[0],
+                                    self.routers[0].addresses[1],
+                                    self.routers[0].addresses[0],
+                                    'queue.02', 0)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_03_dest_sender_edge_edge(self):
+        test = LRDestSenderFlowTest(self.routers[2].addresses[0],
+                                    self.routers[3].addresses[1],
+                                    self.routers[0].addresses[0],
+                                    'queue.03', 0)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_04_dest_sender_interior_interior(self):
+        test = LRDestSenderFlowTest(self.routers[0].addresses[0],
+                                    self.routers[1].addresses[1],
+                                    self.routers[0].addresses[0],
+                                    'queue.04', 0)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_05_dest_sender_edge_interior(self):
+        test = LRDestSenderFlowTest(self.routers[2].addresses[0],
+                                    self.routers[0].addresses[1],
+                                    self.routers[0].addresses[0],
+                                    'queue.05', 0)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_06_dest_sender_interior_edge(self):
+        test = LRDestSenderFlowTest(self.routers[0].addresses[0],
+                                    self.routers[2].addresses[1],
+                                    self.routers[0].addresses[0],
+                                    'queue.06', 0)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_07_dest_sender_edge_interior_interior_edge(self):
+        test = LRDestSenderFlowTest(self.routers[2].addresses[0],
+                                    self.routers[4].addresses[1],
+                                    self.routers[0].addresses[0],
+                                    'queue.07', 0)
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+class Entity(object):
+    def __init__(self, status_code, status_description, attrs):
+        self.status_code        = status_code
+        self.status_description = status_description
+        self.attrs              = attrs
+
+    def __getattr__(self, key):
+        return self.attrs[key]
+
+
+class RouterProxy(object):
+    def __init__(self, reply_addr):
+        self.reply_addr = reply_addr
+
+    def response(self, msg):
+        ap = msg.properties
+        return Entity(ap['statusCode'], ap['statusDescription'], msg.body)
+
+    def read_address(self, name):
+        ap = {'operation': 'READ', 'type': 
'org.apache.qpid.dispatch.router.address', 'name': name}
+        return Message(properties=ap, reply_to=self.reply_addr)
+
+    def query_addresses(self):
+        ap = {'operation': 'QUERY', 'type': 
'org.apache.qpid.dispatch.router.address'}
+        return Message(properties=ap, reply_to=self.reply_addr)
+
+
+class Timeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.timeout()
+
+
+class PollTimeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.poll_timeout()
+
+
+class LRDestSenderFlowTest(MessagingHandler):
+    def __init__(self, receiver_host, sender_host, probe_host, address, 
initial_credit):
+        super(LRDestSenderFlowTest, self).__init__(prefetch=0)
+        self.receiver_host   = receiver_host
+        self.sender_host     = sender_host
+        self.probe_host      = probe_host
+        self.address         = address
+        self.initial_credit  = initial_credit
+        self.delta_credit    = 7
+        self.final_credit    = initial_credit + 2 * self.delta_credit
+        self.expected_credit = initial_credit
+
+        self.receiver_conn  = None
+        self.sender_conn    = None
+        self.probe_conn     = None
+        self.probe_sender   = None
+        self.probe_receiver = None
+        self.probe_reply    = None
+        self.receiver       = None
+        self.sender         = None
+        self.error          = None
+        self.last_action    = "Test initialization"
+
+    def fail(self, text):
+        self.error = text
+        self.receiver_conn.close()
+        self.sender_conn.close()
+        self.probe_conn.close()
+        self.timer.cancel()
+
+    def timeout(self):
+        self.error = "Timeout Expired - last_action: %s" % (self.last_action)
+        self.receiver_conn.close()
+        self.sender_conn.close()
+        self.probe_conn.close()
+
+    def poll_timeout(self):
+        self.probe()
+
+    def on_start(self, event):
+        self.reactor        = event.reactor
+        self.timer          = event.reactor.schedule(7.0, Timeout(self))
+        self.receiver_conn  = event.container.connect(self.receiver_host)
+        self.sender_conn    = event.container.connect(self.sender_host)
+        self.probe_conn     = event.container.connect(self.probe_host)
+        self.probe_receiver = event.container.create_receiver(self.probe_conn, 
dynamic=True)
+        self.probe_receiver.flow(1000)
+        self.last_action = "on_start"
+
+    def probe(self):
+        self.probe_sender.send(self.proxy.read_address('Dqueue'))
+
+    def on_link_opened(self, event):
+        if event.receiver == self.probe_receiver:
+            self.probe_reply  = self.probe_receiver.remote_source.address
+            self.proxy        = RouterProxy(self.probe_reply)
+            self.probe_sender = event.container.create_sender(self.probe_conn, 
'$management')
+        elif event.sender == self.probe_sender:
+            self.probe()
+            self.last_action = "probing"
+        elif event.receiver == self.receiver:
+            if self.initial_credit == 0:
+                self.expected_credit += self.delta_credit
+                self.receiver.flow(self.delta_credit)
+
+    def on_link_opening(self, event):
+        if event.sender:
+            self.sender = event.sender
+            if event.sender.remote_source.address == self.address:
+                event.sender.source.address = self.address
+                event.sender.open()
+            else:
+                self.fail("Incorrect address on incoming sender: got %s, 
expected %s" %
+                          (event.sender.remote_source.address, self.address))
+
+    def on_sendable(self, event):
+        if event.sender == self.sender:
+            if event.sender.credit == self.expected_credit:
+                if self.expected_credit == self.final_credit:
+                    self.fail(None)
+                else:
+                    self.expected_credit += self.delta_credit
+                    self.receiver.flow(self.delta_credit)
+            else:
+                self.fail("Unexpected sender credit: got %d, expected %d" %
+                          (event.sender.credit, self.expected_credit))
+
+    def on_message(self, event):
+        if event.receiver == self.probe_receiver:
+            response = self.proxy.response(event.message);
+            self.last_action = "Handling probe response: remote: %d container: 
%d" \
+                               % (response.remoteCount, 
response.containerCount)
+            if response.status_code == 200 and response.remoteCount + 
response.containerCount == 1:
+                self.receiver = 
event.container.create_receiver(self.receiver_conn, self.address)
+                if self.initial_credit > 0:
+                    self.receiver.flow(self.initial_credit)
+                    self.expected_credit = self.initial_credit
+                self.last_action = "opening test receiver"
+            else:
+                self.poll_timer = self.reactor.schedule(0.5, PollTimeout(self))
+                
+
+    def run(self):
+        container = Container(self)
+        container.container_id = 'LRC_S'
+        container.run()
+
+
+if __name__== '__main__':
+    unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to