Repository: qpid-dispatch
Updated Branches:
  refs/heads/master cab2f5da6 -> a3be94a6d


DISPATCH-1221 - Added a default parameter in the router engine to prevent 
argument mismatches in the processing of absolute MAU messages (ones that use 
the "exists" field).  Added a system test to exercise this path.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/a3be94a6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/a3be94a6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/a3be94a6

Branch: refs/heads/master
Commit: a3be94a6d6db6bfdf6942fb837350bf5829e6506
Parents: cab2f5d
Author: Ted Ross <[email protected]>
Authored: Wed Dec 12 17:37:11 2018 -0500
Committer: Ted Ross <[email protected]>
Committed: Wed Dec 12 17:37:11 2018 -0500

----------------------------------------------------------------------
 python/qpid_dispatch_internal/router/node.py |   2 +-
 tests/CMakeLists.txt                         |   1 +
 tests/system_test.py                         |   7 +-
 tests/system_tests_interior_sync_up.py       | 259 ++++++++++++++++++++++
 4 files changed, 267 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a3be94a6/python/qpid_dispatch_internal/router/node.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/node.py b/python/qpid_dispatch_internal/router/node.py
index 16930fb..6ec5227 100644
--- a/python/qpid_dispatch_internal/router/node.py
+++ b/python/qpid_dispatch_internal/router/node.py
@@ -541,7 +541,7 @@ class RouterNode(object):
         return False
 
 
-    def map_address(self, addr, treatment):
+    def map_address(self, addr, treatment = -1):
         self.mobile_addresses.append(addr)
         self.adapter.map_destination(addr, treatment, self.maskbit)
         self.log(LOG_DEBUG, "Remote destination %s mapped to router %s" % (self._logify(addr), self.id))

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a3be94a6/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 4c14aa1..9c68325 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -101,6 +101,7 @@ foreach(py_test_module
     system_tests_user_id
     system_tests_user_id_proxy
     system_tests_two_routers
+    system_tests_interior_sync_up
     system_tests_distribution
     system_tests_multi_tenancy
     system_tests_dynamic_terminus

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a3be94a6/tests/system_test.py
----------------------------------------------------------------------
diff --git a/tests/system_test.py b/tests/system_test.py
index 8820cc2..354eac6 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -921,6 +921,11 @@ class MgmtMsgProxy(object):
               'type': 'org.apache.qpid.dispatch.router.config.linkRoute'}
         return Message(properties=ap, reply_to=self.reply_addr)
 
+    def query_addresses(self):
+        ap = {'operation': 'QUERY',
+              'type': 'org.apache.qpid.dispatch.router.address'}
+        return Message(properties=ap, reply_to=self.reply_addr)
+
     def create_link_route(self, name, kwargs):
         ap = {'operation': 'CREATE',
               'type': 'org.apache.qpid.dispatch.router.config.linkRoute',
@@ -934,7 +939,7 @@ class MgmtMsgProxy(object):
               'name': name}
         return Message(properties=ap, reply_to=self.reply_addr)
 
-    def create_connector(self, name, kwargs):
+    def create_connector(self, name, **kwargs):
         ap = {'operation': 'CREATE',
               'type': 'org.apache.qpid.dispatch.connector',
               'name': name}

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a3be94a6/tests/system_tests_interior_sync_up.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_interior_sync_up.py b/tests/system_tests_interior_sync_up.py
new file mode 100644
index 0000000..380a802
--- /dev/null
+++ b/tests/system_tests_interior_sync_up.py
@@ -0,0 +1,259 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
+from time import sleep
+from threading import Timer
+
+import unittest2 as unittest
+from proton import Message, Timeout
+from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, MgmtMsgProxy
+from system_test import AsyncTestReceiver
+from system_test import AsyncTestSender
+from system_tests_link_routes import ConnLinkRouteService
+from test_broker import FakeService
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, DynamicNodeProperties
+from qpid_dispatch.management.client import Node
+from subprocess import PIPE, STDOUT
+import re
+
+
+class AddrTimer(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+            self.parent.check_address()
+
+
+class RouterTest(TestCase):
+
+    inter_router_port = None
+
+    @classmethod
+    def setUpClass(cls):
+        """Start a router"""
+        super(RouterTest, cls).setUpClass()
+
+        def router(name, mode, connection=None, extra=None):
+            config = [
+                ('router', {'mode': mode, 'id': name}),
+                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
+                ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+                ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
+                ('address', {'prefix': 'multicast', 'distribution': 'multicast'})
+            ]
+
+            if connection:
+                config.append(connection)
+
+            if extra:
+                config.append(extra)
+
+            config = Qdrouterd.Config(config)
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+        cls.routers = []
+
+        inter_router_port = cls.tester.get_port()
+        edge_port_A       = cls.tester.get_port()
+        edge_port_B       = cls.tester.get_port()
+
+        router('INT.A', 'interior')
+        router('INT.B', 'interior',
+               ('listener', {'role': 'inter-router', 'port': inter_router_port}))
+
+
+    def test_interior_sync_up(self):
+        test = InteriorSyncUpTest(self.routers[0].addresses[0], self.routers[1].addresses[0], self.routers[1].ports[1])
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+class Timeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.timeout()
+
+
+class DelayTimeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.delay_timeout()
+
+
+class PollTimeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.poll_timeout()
+
+
+class InteriorSyncUpTest(MessagingHandler):
+    def __init__(self, host_a, host_b, inter_router_port):
+        """
+        This test verifies that a router can join an existing network and be synced up using
+        an absolute MAU (as opposed to an incremental MAU).  This requires that the existing
+        network have sequenced through mobile address changes at least 10 times to avoid the
+        cached incremental update optimization that the routers use.
+        """
+        super(InteriorSyncUpTest, self).__init__()
+        self.host_a            = host_a
+        self.host_b            = host_b
+        self.timer             = None
+        self.poll_timer        = None
+        self.delay_timer       = None
+        self.count             = 200
+        self.delay_count       = 12   # This should be larger than MAX_KEPT_DELTAS in mobile.py
+        self.inter_router_port = inter_router_port
+
+        self.receivers      = []
+        self.n_receivers    = 0
+        self.n_setup_delays = 0
+        self.error          = None
+        self.last_action    = "test initialization"
+        self.expect         = ""
+
+    def fail(self, text):
+        self.error = text
+        self.conn_a.close()
+        self.conn_b.close()
+        self.timer.cancel()
+        if self.poll_timer:
+            self.poll_timer.cancel()
+        if self.delay_timer:
+            self.delay_timer.cancel()
+
+    def timeout(self):
+        self.error = "Timeout Expired - last_action: %s, n_receivers: %d, n_setup_delays: %d" % \
+                     (self.last_action, self.n_receivers, self.n_setup_delays)
+        self.conn_a.close()
+        self.conn_b.close()
+        if self.poll_timer:
+            self.poll_timer.cancel()
+        if self.delay_timer:
+            self.delay_timer.cancel()
+
+    def poll_timeout(self):
+        self.probe()
+
+    def delay_timeout(self):
+        self.n_setup_delays += 1
+        self.add_receivers()
+
+    def add_receivers(self):
+        if len(self.receivers) < self.count:
+            self.receivers.append(self.container.create_receiver(self.conn_b, "address.%d" % len(self.receivers)))
+        if self.n_setup_delays < self.delay_count:
+            self.delay_timer = self.reactor.schedule(2.0, DelayTimeout(self))
+        else:
+            while len(self.receivers) < self.count:
+                self.receivers.append(self.container.create_receiver(self.conn_b, "address.%d" % len(self.receivers)))
+
+    def on_start(self, event):
+        self.container      = event.container
+        self.reactor        = event.reactor
+        self.timer          = self.reactor.schedule(40.0, Timeout(self))
+        self.conn_a         = self.container.connect(self.host_a)
+        self.conn_b         = self.container.connect(self.host_b)
+        self.probe_receiver = self.container.create_receiver(self.conn_a, dynamic=True)
+        self.last_action = "on_start - opened connections"
+
+    def probe(self):
+        self.probe_sender.send(self.proxy.query_addresses())
+
+    def on_link_opened(self, event):
+        if event.receiver == self.probe_receiver:
+            self.probe_reply  = self.probe_receiver.remote_source.address
+            self.proxy        = MgmtMsgProxy(self.probe_reply)
+            self.probe_sender = self.container.create_sender(self.conn_a, '$management')
+        elif event.sender == self.probe_sender:
+            ##
+            ## Create listeners for an address per count
+            ##
+            self.add_receivers()
+            self.last_action = "started slow creation of receivers"
+        elif event.receiver in self.receivers:
+            self.n_receivers += 1
+            if self.n_receivers == self.count:
+                self.expect = "not-found"
+                self.probe()
+                self.last_action = "started probe expecting addresses not found"
+
+    def on_message(self, event):
+        if event.receiver == self.probe_receiver:
+            response = self.proxy.response(event.message);
+
+            if response.status_code < 200 or response.status_code > 299:
+                self.fail("Unexpected operation failure: (%d) %s" % (response.status_code, response.status_description))
+
+            if self.expect == "not-found":
+                response = self.proxy.response(event.message)
+                for addr in response.results:
+                    if "address." in addr.name:
+                        self.fail("Found address on host-a when we didn't expect it - %s" % addr.name)
+
+                ##
+                ## Hook up the two routers to start the sync-up
+                ##
+                self.probe_sender.send(self.proxy.create_connector("IR", port=self.inter_router_port, role="inter-router"))
+                self.expect      = "create-success"
+                self.last_action = "created inter-router connector"
+
+            elif self.expect == "create-success":
+                ##
+                ## Start polling for the addresses on host_a
+                ##
+                response  = self.proxy.response(event.message)
+                self.probe_sender.send(self.proxy.query_addresses())
+                self.expect      = "query-success"
+                self.last_action = "started probing host_a for addresses"
+
+            elif self.expect == "query-success":
+                response  = self.proxy.response(event.message)
+                got_count = 0
+                for addr in response.results:
+                    if "address." in addr.name:
+                        got_count += 1
+
+                self.last_action = "Got a query response with %d of the expected addresses" % (got_count)
+
+                if got_count == self.count:
+                    self.fail(None)
+                else:
+                    self.poll_timer = self.reactor.schedule(0.5, PollTimeout(self))
+                
+
+    def run(self):
+        container = Container(self)
+        container.run()
+
+
+if __name__== '__main__':
+    unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to