Author: aconway
Date: Thu Oct 11 20:38:34 2012
New Revision: 1397295
URL: http://svn.apache.org/viewvc?rev=1397295&view=rev
Log:
QPID-4349: Duplicate messages with alternate exchange.
This check-in adds a test only; the issue itself was fixed as a side effect
of the fix for QPID-4350.
Modified:
qpid/trunk/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1397295&r1=1397294&r2=1397295&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Thu Oct 11 20:38:34 2012
@@ -61,6 +61,11 @@ class ReplicationTests(BrokerTest):
us = primary.connect_old().session(str(uuid4()))
us.exchange_unbind(exchange=prefix+"e4", binding_key="key4",
queue=prefix+"q4")
p.sender(prefix+"e4").send(Message("drop1")) # Should be dropped
+ # Test replication of deletes
+ p.sender(queue(prefix+"dq", "all"))
+ p.sender(exchange(prefix+"de", "all", prefix+"dq", ""))
+ p.sender(prefix+"dq;{delete:always}").close()
+ p.sender(prefix+"de;{delete:always}").close()
# Need a marker so we can wait till sync is done.
p.sender(queue(prefix+"x", "configuration"))
@@ -88,6 +93,10 @@ class ReplicationTests(BrokerTest):
b.sender(prefix+"e4/key4").send(Message("drop2")) # Verify unbind.
self.assert_browse_retry(b, prefix+"q4", ["6","7"])
+ # Verify deletes
+ assert not valid_address(b, prefix+"dq")
+ assert not valid_address(b, prefix+"de")
+
l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
try:
primary = HaBroker(self, name="primary")
@@ -736,6 +745,24 @@ acl deny all all
cluster[1].wait_queue("q") # Not timed out yet
cluster[1].wait_no_queue("q") # Wait for timeout
+ def test_alt_exchange_dup(self):
+ """QPID-4349: if a queue has an alternate exchange and is deleted, the
+ messages appear twice on the alternate: they are rerouted once by the
+ primary and again by the backup."""
+ cluster = HaCluster(self,2)
+ cluster[0].wait_status("active")
+
+ # Set up q with alternate exchange altex bound to altq.
+ s = cluster[0].connect().session()
+ s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}")
+ s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}")
+ snd = s.sender("q;{create:always,node:{x-declare:{alternate-exchange:'altex'}}}")
+ messages = [ str(n) for n in xrange(10) ]
+ for m in messages: snd.send(m)
+ cluster[1].assert_browse_backup("q", messages)
+ s.sender("q;{delete:always}").close()
+ cluster[1].assert_browse_backup("altq", messages)
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given
fairshare limit
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]