Author: aconway
Date: Tue Apr 17 15:16:41 2012
New Revision: 1327137

URL: http://svn.apache.org/viewvc?rev=1327137&view=rev
Log:
NO-JIRA: Minor code clean-up in brokertest.py and ha_tests.py.

Modified:
    qpid/trunk/qpid/cpp/src/tests/brokertest.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1327137&r1=1327136&r2=1327137&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Tue Apr 17 15:16:41 2012
@@ -436,6 +436,35 @@ class Cluster:
     def __getitem__(self,index): return self._brokers[index]
     def __iter__(self): return self._brokers.__iter__()
 
+
+def browse(session, queue, timeout=0, transform=lambda m: m.content):
+    """Return a list with the contents of each message on queue."""
+    r = session.receiver("%s;{mode:browse}"%(queue))
+    r.capacity = 100
+    try:
+        contents = []
+        try:
+            while True: contents.append(transform(r.fetch(timeout=timeout)))
+        except messaging.Empty: pass
+    finally: r.close()
+    return contents
+
+def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda 
m: m.content, msg="browse failed"):
+    """Assert that the contents of messages on queue (as retrieved
+    using session and timeout) exactly match the strings in
+    expect_contents"""
+    actual_contents = browse(session, queue, timeout, transform=transform)
+    if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
+    assert expect_contents == actual_contents, "%s: %s != %s"%(msg, expect, 
actual)
+
+def assert_browse_retry(session, queue, expect_contents, timeout=1, delay=.01, 
transform=lambda m:m.content, msg="browse failed"):
+    """Wait up to timeout for contents of queue to match expect_contents"""
+    test = lambda: browse(session, queue, 0, transform=transform) == 
expect_contents
+    retry(test, timeout, delay)
+    actual_contents = browse(session, queue, 0, transform=transform)
+    if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
+    assert expect_contents == actual_contents, "%s: %s != %s"%(msg, expect, 
actual)
+
 class BrokerTest(TestCase):
     """
     Tracks processes started by test and kills at end of test.
@@ -501,33 +530,9 @@ class BrokerTest(TestCase):
         cluster = Cluster(self, count, args, expect=expect, wait=wait, 
show_cmd=show_cmd)
         return cluster
 
-    def browse(self, session, queue, timeout=0, transform=lambda m: m.content):
-        """Return a list with the contents of each message on queue."""
-        r = session.receiver("%s;{mode:browse}"%(queue))
-        r.capacity = 100
-        try:
-            contents = []
-            try:
-                while True: 
contents.append(transform(r.fetch(timeout=timeout)))
-            except messaging.Empty: pass
-        finally: r.close()
-        return contents
-
-    def assert_browse(self, session, queue, expect_contents, timeout=0, 
transform=lambda m: m.content, msg=None):
-        """Assert that the contents of messages on queue (as retrieved
-        using session and timeout) exactly match the strings in
-        expect_contents"""
-        actual_contents = self.browse(session, queue, timeout, 
transform=transform)
-        if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
-        self.assertEqual(expect_contents, actual_contents, msg)
-
-    def assert_browse_retry(self, session, queue, expect_contents, timeout=1, 
delay=.01, transform=lambda m:m.content, msg=None):
-        """Wait up to timeout for contents of queue to match expect_contents"""
-        test = lambda: self.browse(session, queue, 0, transform=transform) == 
expect_contents
-        retry(test, timeout, delay)
-        actual_contents = self.browse(session, queue, 0, transform=transform)
-        if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
-        self.assertEqual(expect_contents, actual_contents, msg)
+    def browse(self, *args, **kwargs): browse(*args, **kwargs)
+    def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs)
+    def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, 
**kwargs)
 
 def join(thread, timeout=10):
     thread.join(timeout)

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1327137&r1=1327136&r2=1327137&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Tue Apr 17 15:16:41 2012
@@ -72,6 +72,19 @@ class HaBroker(Broker):
     def connect_admin(self, **kwargs):
         return Broker.connect(self, client_properties={"qpid.ha-admin":1}, 
**kwargs)
 
+    def wait_backup(self, address):
+        """Wait for address to become valid on a backup broker."""
+        bs = self.connect_admin().session()
+        try: wait_address(bs, address)
+        finally: bs.connection.close()
+
+    def assert_browse_backup(self, queue, expected, **kwargs):
+        """Combines wait_backup and assert_browse_retry."""
+        bs = self.connect_admin().session()
+        try:
+            wait_address(bs, queue)
+            assert_browse_retry(bs, queue, expected, **kwargs)
+        finally: bs.connection.close()
 
 class HaCluster(object):
     _cluster_count = 0
@@ -106,40 +119,23 @@ class HaCluster(object):
     def __getitem__(self,index): return self._brokers[index]
     def __iter__(self): return self._brokers.__iter__()
 
-
-class HaTest(BrokerTest):
-    """Base class for HA test cases, defines convenience functions"""
-
-    # Wait for an address to become valid.
-    def wait(self, session, address):
-        def check():
-            try:
-                session.sender(address)
-                return True
-            except NotFound: return False
-        assert retry(check), "Timed out waiting for address %s"%(address)
-
-    # Wait for address to become valid on a backup broker.
-    def wait_backup(self, backup, address):
-        bs = backup.connect_admin().session()
-        self.wait(bs, address)
-        bs.connection.close()
-
-    # Combines wait_backup and assert_browse_retry
-    def assert_browse_backup(self, backup, queue, expected, **kwargs):
-        bs = backup.connect_admin().session()
-        self.wait(bs, queue)
-        self.assert_browse_retry(bs, queue, expected, **kwargs)
-        bs.connection.close()
-
-    def assert_missing(self, session, address):
+def wait_address(session, address):
+    """Wait for an address to become valid."""
+    def check():
         try:
-            session.receiver(address)
-            self.fail("Expected NotFound: %s"%(address))
-        except NotFound: pass
-
+            session.sender(address)
+            return True
+        except NotFound: return False
+    assert retry(check), "Timed out waiting for address %s"%(address)
+
+def assert_missing(session, address):
+    """Assert that the address is _not_ valid"""
+    try:
+        session.receiver(address)
+        self.fail("Expected NotFound: %s"%(address))
+    except NotFound: pass
 
-class ReplicationTests(HaTest):
+class ReplicationTests(BrokerTest):
     """Correctness tests for  HA replication."""
 
     def test_replication(self):
@@ -176,7 +172,7 @@ class ReplicationTests(HaTest):
         def verify(b, prefix, p):
             """Verify setup was replicated to backup b"""
             # Wait for configuration to replicate.
-            self.wait(b, prefix+"x");
+            wait_address(b, prefix+"x");
             self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"])
 
             self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, 
"b")
@@ -184,7 +180,7 @@ class ReplicationTests(HaTest):
             self.assert_browse_retry(b, prefix+"q1", ["1", "4"])
 
             self.assert_browse_retry(b, prefix+"q2", []) # configuration only
-            self.assert_missing(b, prefix+"q3")
+            assert_missing(b, prefix+"q3")
             b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds 
with replicate=all
             self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
             b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds 
with replicate=configuration
@@ -209,7 +205,7 @@ class ReplicationTests(HaTest):
         verify(b, "2", p)
         # Test a series of messages, enqueue all then dequeue all.
         s = p.sender(queue("foo","all"))
-        self.wait(b, "foo")
+        wait_address(b, "foo")
         msgs = [str(i) for i in range(10)]
         for m in msgs: s.send(Message(m))
         self.assert_browse_retry(p, "foo", msgs)
@@ -246,10 +242,10 @@ class ReplicationTests(HaTest):
 
         msgs = [str(i) for i in range(30)]
         b1 = backup1.connect_admin().session()
-        self.wait(b1, "q");
+        wait_address(b1, "q");
         self.assert_browse_retry(b1, "q", msgs)
         b2 = backup2.connect_admin().session()
-        self.wait(b2, "q");
+        wait_address(b2, "q");
         self.assert_browse_retry(b2, "q", msgs)
 
     def test_send_receive(self):
@@ -273,8 +269,8 @@ class ReplicationTests(HaTest):
         self.assertEqual(receiver.wait(), 0)
         expect = [long(i) for i in range(991, 1001)]
         sn = lambda m: m.properties["sn"]
-        self.assert_browse_backup(brokers[1], "q", expect, transform=sn)
-        self.assert_browse_backup(brokers[2], "q", expect, transform=sn)
+        brokers[1].assert_browse_backup("q", expect, transform=sn)
+        brokers[2].assert_browse_backup("q", expect, transform=sn)
 
     def test_failover_python(self):
         """Verify that backups rejects connections and that fail-over works in 
python client"""
@@ -293,7 +289,7 @@ class ReplicationTests(HaTest):
         c = backup.connect(reconnect_urls=[primary.host_port(), 
backup.host_port()], reconnect=True)
         s = c.session()
         sender = s.sender("q;{create:always}")
-        self.wait_backup(backup, "q")
+        backup.wait_backup("q")
         sender.send("foo")
         primary.kill()
         assert retry(lambda: not is_running(primary.pid))
@@ -308,13 +304,13 @@ class ReplicationTests(HaTest):
         backup = HaBroker(self, name="backup", broker_url=primary.host_port())
         url="%s,%s"%(primary.host_port(), backup.host_port())
         primary.connect().session().sender("q;{create:always}")
-        self.wait_backup(backup, "q")
+        backup.wait_backup("q")
 
         sender = NumberedSender(primary, url=url, queue="q", failover_updates 
= False)
         receiver = NumberedReceiver(primary, url=url, queue="q", 
failover_updates = False)
         receiver.start()
         sender.start()
-        self.wait_backup(backup, "q")
+        backup.wait_backup("q")
         assert retry(lambda: receiver.received > 10) # Wait for some messages 
to get thru
 
         primary.kill()
@@ -329,22 +325,22 @@ class ReplicationTests(HaTest):
         """Verify that a backup broker fails over and recovers queue state"""
         brokers = HaCluster(self, 3)
         brokers[0].connect().session().sender("q;{create:always}").send("a")
-        for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"])
+        for b in brokers[1:]: b.assert_browse_backup("q", ["a"])
         brokers[0].expect = EXPECT_EXIT_FAIL
         brokers.kill(0)
         brokers[1].connect().session().sender("q").send("b")
-        self.assert_browse_backup(brokers[2], "q", ["a","b"])
+        brokers[2].assert_browse_backup("q", ["a","b"])
         s = brokers[1].connect().session()
         self.assertEqual("a", s.receiver("q").fetch().content)
         s.acknowledge()
-        self.assert_browse_backup(brokers[2], "q", ["b"])
+        brokers[2].assert_browse_backup("q", ["b"])
 
     def test_qpid_config_replication(self):
         """Set up replication via qpid-config"""
         brokers = HaCluster(self,2)
         brokers[0].config_declare("q","all")
         brokers[0].connect().session().sender("q").send("foo")
-        self.assert_browse_backup(brokers[1], "q", ["foo"])
+        brokers[1].assert_browse_backup("q", ["foo"])
 
     def test_standalone_queue_replica(self):
         """Test replication of individual queues outside of cluster mode"""
@@ -358,18 +354,18 @@ class ReplicationTests(HaTest):
         # Set up replication with qpid-ha
         backup.replicate(primary.host_port(), "q")
         ps.send("a")
-        self.assert_browse_backup(backup, "q", ["a"])
+        backup.assert_browse_backup("q", ["a"])
         ps.send("b")
-        self.assert_browse_backup(backup, "q", ["a", "b"])
+        backup.assert_browse_backup("q", ["a", "b"])
         self.assertEqual("a", pr.fetch().content)
         pr.session.acknowledge()
-        self.assert_browse_backup(backup, "q", ["b"])
+        backup.assert_browse_backup("q", ["b"])
 
         # Set up replication with qpid-config
         ps2 = pc.session().sender("q2;{create:always}")
         backup.config_replicate(primary.host_port(), "q2");
         ps2.send("x")
-        self.assert_browse_backup(backup, "q2", ["x"])
+        backup.assert_browse_backup("q2", ["x"])
 
 
     def test_queue_replica_failover(self):
@@ -383,15 +379,15 @@ class ReplicationTests(HaTest):
         br = backup.connect().session().receiver("q;{create:always}")
         backup.replicate(cluster.url, "q")
         ps.send("a")
-        self.assert_browse_backup(backup, "q", ["a"])
+        backup.assert_browse_backup("q", ["a"])
         cluster.bounce(0)
-        self.assert_browse_backup(backup, "q", ["a"])
+        backup.assert_browse_backup("q", ["a"])
         ps.send("b")
-        self.assert_browse_backup(backup, "q", ["a", "b"])
+        backup.assert_browse_backup("q", ["a", "b"])
         cluster.bounce(1)
         self.assertEqual("a", pr.fetch().content)
         pr.session.acknowledge()
-        self.assert_browse_backup(backup, "q", ["b"])
+        backup.assert_browse_backup("q", ["b"])
 
     def test_lvq(self):
         """Verify that we replicate to an LVQ correctly"""
@@ -402,13 +398,13 @@ class ReplicationTests(HaTest):
         def send(key,value): 
s.send(Message(content=value,properties={"lvq-key":key}))
         for kv in 
[("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]:
             send(*kv)
-        self.assert_browse_backup(backup, "lvq", ["b-1", "a-3", "c-2"])
+        backup.assert_browse_backup("lvq", ["b-1", "a-3", "c-2"])
         send("b","b-2")
-        self.assert_browse_backup(backup, "lvq", ["a-3", "c-2", "b-2"])
+        backup.assert_browse_backup("lvq", ["a-3", "c-2", "b-2"])
         send("c","c-3")
-        self.assert_browse_backup(backup, "lvq", ["a-3", "b-2", "c-3"])
+        backup.assert_browse_backup("lvq", ["a-3", "b-2", "c-3"])
         send("d","d-1")
-        self.assert_browse_backup(backup, "lvq", ["a-3", "b-2", "c-3", "d-1"])
+        backup.assert_browse_backup("lvq", ["a-3", "b-2", "c-3", "d-1"])
 
     def test_ring(self):
         """Test replication with the ring queue policy"""
@@ -417,7 +413,7 @@ class ReplicationTests(HaTest):
         backup = HaBroker(self, name="backup", broker_url=primary.host_port())
         s = primary.connect().session().sender("q; {create:always, 
node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5}}}}")
         for i in range(10): s.send(Message(str(i)))
-        self.assert_browse_backup(backup, "q", [str(i) for i in range(5,10)])
+        backup.assert_browse_backup("q", [str(i) for i in range(5,10)])
 
     def test_reject(self):
         """Test replication with the reject queue policy"""
@@ -428,7 +424,7 @@ class ReplicationTests(HaTest):
         try:
             for i in range(10): s.send(Message(str(i)), sync=False)
         except qpid.messaging.exceptions.TargetCapacityExceeded: pass
-        self.assert_browse_backup(backup, "q", [str(i) for i in range(0,5)])
+        backup.assert_browse_backup("q", [str(i) for i in range(0,5)])
 
     def test_priority(self):
         """Verify priority queues replicate correctly"""
@@ -440,7 +436,7 @@ class ReplicationTests(HaTest):
         priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
         for p in priorities: s.send(Message(priority=p))
         # Can't use browse_backup as browser sees messages in delivery order 
not priority.
-        self.wait_backup(backup, "priority-queue")
+        backup.wait_backup("priority-queue")
         r = backup.connect_admin().session().receiver("priority-queue")
         received = [r.fetch().priority for i in priorities]
         self.assertEqual(sorted(priorities, reverse=True), received)
@@ -458,7 +454,7 @@ class ReplicationTests(HaTest):
         s = session.sender("priority-queue; {create:always, 
node:{x-declare:{arguments:{'qpid.priorities':%s, %s}}}}"%(levels,limit_policy))
         messages = [Message(content=str(uuid4()), priority = p) for p in 
priorities]
         for m in messages: s.send(m)
-        self.wait_backup(backup, s.target)
+        backup.wait_backup(s.target)
         r = backup.connect_admin().session().receiver("priority-queue")
         received = [r.fetch().content for i in priorities]
         sort = sorted(messages, key=lambda m: priority_level(m.priority, 
levels), reverse=True)
@@ -478,8 +474,8 @@ class ReplicationTests(HaTest):
         # correct result, the uncommented one is for the actualy buggy
         # result.  See https://issues.apache.org/jira/browse/QPID-3866
         #
-        # self.assert_browse_backup(backup, "q", 
sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
-        self.assert_browse_backup(backup, "q", [9,9,9,9,2], transform=lambda 
m: m.priority)
+        # backup.assert_browse_backup("q", 
sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
+        backup.assert_browse_backup("q", [9,9,9,9,2], transform=lambda m: 
m.priority)
 
     def test_backup_acquired(self):
         """Verify that acquired messages are backed up, for all queue types."""
@@ -497,11 +493,10 @@ class ReplicationTests(HaTest):
                 s.receiver(self.address).fetch()
 
             def wait(self, brokertest, backup):
-                brokertest.wait_backup(backup, self.queue)
+                backup.wait_backup(self.queue)
 
             def verify(self, brokertest, backup):
-                brokertest.assert_browse_backup(
-                    backup, self.queue, self.expect, msg=self.queue)
+                backup.assert_browse_backup(self.queue, self.expect, 
msg=self.queue)
 
         tests = [
             Test("plain",[],range(10)),
@@ -546,7 +541,7 @@ class ReplicationTests(HaTest):
         """Verify that a queue with an invalid qpid.replicate gets default 
treatment"""
         cluster = HaCluster(self, 2, ha_replicate="all")
         c = cluster[0].connect().session().sender("q;{create:always, 
node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}")
-        self.wait_backup(cluster[1], "q")
+        cluster[1].wait_backup("q")
 
     def test_exclusive_queue(self):
         """Ensure that we can back-up exclusive queues, i.e. the replicating
@@ -559,7 +554,7 @@ class ReplicationTests(HaTest):
             try: c.session().receiver(addr); self.fail("Expected exclusive 
exception")
             except ReceiverError: pass
             s = c.session().sender(q).send(q)
-            self.assert_browse_backup(cluster[1], q, [q])
+            cluster[1].assert_browse_backup(q, [q])
         test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}");
         test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}")
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to