Author: aconway
Date: Wed Feb 22 18:49:55 2012
New Revision: 1292444

URL: http://svn.apache.org/viewvc?rev=1292444&view=rev
Log:
QPID-3603: Test HA replication of LVQ, priority and ring queues.

Also fix one bug causing problems with LVQ replication.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1292444&r1=1292443&r2=1292444&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Wed Feb 22 18:49:55 2012
@@ -155,9 +155,6 @@ void QueueReplicator::route(Deliverable&
         QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition()
                  << " to " << position);
         assert(queue->getPosition() <= position);
-         //TODO aconway 2011-12-14: Optimize this?
-        for (SequenceNumber i = queue->getPosition(); i < position; ++i)
-            dequeue(i,l);
         queue->setPosition(position);
     } else {
         msg.deliverTo(queue);

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1292444&r1=1292443&r2=1292444&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Wed Feb 22 18:49:55 2012
@@ -18,8 +18,9 @@
 # under the License.
 #
 
-import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math
 from qpid.messaging import Message, NotFound, ConnectionError, Connection
+from qpid.datatypes import uuid4
 from brokertest import *
 from threading import Thread, Lock, Condition
 from logging import getLogger, WARN, ERROR, DEBUG
@@ -48,7 +49,6 @@ class HaBroker(Broker):
         assert os.system(
             "qpid-ha-tool --broker-addresses=%s %s"%(url, self.host_port())) == 0
 
-
 class ShortTests(BrokerTest):
     """Short HA functionality tests."""
 
@@ -109,7 +109,7 @@ class ShortTests(BrokerTest):
             s3 = p.sender(exchange(prefix+"e4", "messages", prefix+"q4"))
             s3.send(Message("7"))
             # Use old connection to unbind
-            us = primary.connect_old().session(str(qpid.datatypes.uuid4()))
+            us = primary.connect_old().session(str(uuid4()))
             us.exchange_unbind(exchange=prefix+"e4", binding_key="", queue=prefix+"q4")
             p.sender(prefix+"e4").send(Message("drop1")) # Should be dropped
             # Need a marker so we can wait till sync is done.
@@ -298,6 +298,124 @@ class ShortTests(BrokerTest):
         self.assert_browse_backup(brokers[1], "q", ["a","b"])
         for b in brokers[1:]: b.kill()
 
+    def test_lvq(self):
+        """Verify that we replicate to an LVQ correctly"""
+        primary  = HaBroker(self, name="primary")
+        primary.promote()
+        backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+        s = primary.connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key, 'qpid.replicate':messages}}}}")
+        def send(key,value): s.send(Message(content=value,properties={"lvq-key":key}))
+        for kv in [("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]:
+            send(*kv)
+        self.assert_browse_backup(backup, "lvq", ["b-1", "a-3", "c-2"])
+        send("b","b-2")
+        self.assert_browse_backup(backup, "lvq", ["a-3", "c-2", "b-2"])
+        send("c","c-3")
+        self.assert_browse_backup(backup, "lvq", ["a-3", "b-2", "c-3"])
+        send("d","d-1")
+        self.assert_browse_backup(backup, "lvq", ["a-3", "b-2", "c-3", "d-1"])
+
+    def test_ring(self):
+        primary  = HaBroker(self, name="primary")
+        primary.promote()
+        backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+        s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.replicate':messages}}}}")
+        for i in range(10): s.send(Message(str(i)))
+        self.assert_browse_backup(backup, "q", [str(i) for i in range(5,10)])
+
+    def test_reject(self):
+        primary  = HaBroker(self, name="primary")
+        primary.promote()
+        backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+        s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5, 'qpid.replicate':messages}}}}")
+        try:
+            for i in range(10): s.send(Message(str(i)), sync=False)
+        except qpid.messaging.exceptions.TargetCapacityExceeded: pass
+        self.assert_browse_backup(backup, "q", [str(i) for i in range(0,5)])
+
+    def test_priority(self):
+        """Verify priority queues replicate correctly"""
+        primary  = HaBroker(self, name="primary")
+        primary.promote()
+        backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+        session = primary.connect().session()
+        s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10, 'qpid.replicate':messages}}}}")
+        priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
+        for p in priorities: s.send(Message(priority=p))
+        self.wait_backup(backup, "priority-queue")
+        r = self.connect_admin(backup).session().receiver("priority-queue")
+        received = [r.fetch().priority for i in priorities]
+        self.assertEqual(sorted(priorities, reverse=True), received)
+
+    def test_priority_fairshare(self):
+        """Verify priority queues replicate correctly"""
+        primary  = HaBroker(self, name="primary")
+        primary.promote()
+        backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+        session = primary.connect().session()
+        levels = 8
+        priorities = [4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3]
+        limits={7:0,6:4,5:3,4:2,3:2,2:2,1:2}
+        limit_policy = ",".join(["'qpid.fairshare':5"] + ["'qpid.fairshare-%s':%s"%(i[0],i[1]) for i in limits.iteritems()])
+        s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':%s, %s, 'qpid.replicate':messages}}}}"%(levels,limit_policy))
+        messages = [Message(content=str(uuid4()), priority = p) for p in priorities]
+        for m in messages: s.send(m)
+        self.wait_backup(backup, s.target)
+        r = self.connect_admin(backup).session().receiver("priority-queue")
+        received = [r.fetch().content for i in priorities]
+        sort = sorted(messages, key=lambda m: priority_level(m.priority, levels), reverse=True)
+        fair = [m.content for m in fairshare(sort, lambda l: limits.get(l,0), levels)]
+        self.assertEqual(received, fair)
+
+    def test_priority_ring(self):
+        primary  = HaBroker(self, name="primary")
+        primary.promote()
+        backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+        s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10, 'qpid.replicate':messages}}}}")
+        priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
+        for p in priorities: s.send(Message(priority=p))
+        # FIXME aconway 2012-02-22: there is a bug in priority ring queues that allows a low
+        # priority message to displace a high one. The following commented-out assert_browse
+        # is for the correct result, the uncommented one is for the actualy buggy result.
+        # See https://issues.apache.org/jira/browse/QPID-3866
+        #
+        # self.assert_browse_backup(backup, "q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
+        self.assert_browse_backup(backup, "q", [9,9,9,9,2], transform=lambda m: m.priority)
+
+def fairshare(msgs, limit, levels):
+    """
+    Generator to return prioritised messages in expected order for a given fairshare limit
+    """
+    count = 0
+    last_priority = None
+    postponed = []
+    while msgs or postponed:
+        if not msgs:
+            msgs = postponed
+            count = 0
+            last_priority = None
+            postponed = []
+        msg = msgs.pop(0)
+        if last_priority and priority_level(msg.priority, levels) == last_priority:
+            count += 1
+        else:
+            last_priority = priority_level(msg.priority, levels)
+            count = 1
+        l = limit(last_priority)
+        if (l and count > l):
+            postponed.append(msg)
+        else:
+            yield msg
+    return
+
+def priority_level(value, levels):
+    """
+    Method to determine which of a distinct number of priority levels
+    a given value falls into.
+    """
+    offset = 5-math.ceil(levels/2.0)
+    return min(max(value - offset, 0), levels-1)
+
 class LongTests(BrokerTest):
     """Tests that can run for a long time if -DDURATION=<minutes> is set"""
 
@@ -311,7 +429,7 @@ class LongTests(BrokerTest):
         """Test failover with continuous send-receive"""
         # FIXME aconway 2012-02-03: fails due to dropped messages,
         # known issue: sending messages to new primary before
-        # backups are ready.
+        # backups are ready. Enable when fixed.
 
         # Start a cluster, all members will be killed during the test.
         brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to