Author: tross
Date: Thu Feb 2 17:14:51 2012
New Revision: 1239728
URL: http://svn.apache.org/viewvc?rev=1239728&view=rev
Log:
QPID-3481 - After queue deletion, route re-queued messages to the alternate
exchange.
Added:
qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1239728&r1=1239727&r2=1239728&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Feb 2 17:14:51 2012
@@ -213,15 +213,26 @@ void Queue::requeue(const QueuedMessage&
{
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return;
- messages->reinsert(msg);
- listeners.populate(copy);
+ if (deleted) {
+ //
+ // If the queue has been deleted, requeued messages must be sent
to the alternate exchange
+ // if one is configured.
+ //
+ if (alternateExchange.get()) {
+ DeliverableMessage dmsg(msg.payload);
+ alternateExchange->routeWithAlternate(dmsg);
+ }
+ } else {
+ messages->reinsert(msg);
+ listeners.populate(copy);
- // for persistLastNode - don't force a message twice to disk, but
force it if no force before
- if(inLastNodeFailure && persistLastNode &&
!msg.payload->isStoredOnQueue(shared_from_this())) {
- msg.payload->forcePersistent();
- if (msg.payload->isForcedPersistent() ){
- boost::intrusive_ptr<Message> payload = msg.payload;
- enqueue(0, payload);
+ // for persistLastNode - don't force a message twice to disk, but
force it if no force before
+ if(inLastNodeFailure && persistLastNode &&
!msg.payload->isStoredOnQueue(shared_from_this())) {
+ msg.payload->forcePersistent();
+ if (msg.payload->isForcedPersistent() ){
+ boost::intrusive_ptr<Message> payload = msg.payload;
+ enqueue(0, payload);
+ }
}
}
observeRequeue(msg, locker);
Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py?rev=1239728&r1=1239727&r2=1239728&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py Thu Feb 2
17:14:51 2012
@@ -34,3 +34,4 @@ from priority import *
from threshold import *
from extensions import *
from msg_groups import *
+from new_api import *
Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py?rev=1239728&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py Thu Feb 2
17:14:51 2012
@@ -0,0 +1,124 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.messaging import *
+from qpid.tests.messaging import Base
+import qmf.console
+from time import sleep
+
+#
+# Tests the Broker's support for message groups
+#
+
+class GeneralTests(Base):
+ """
+ Tests of the API and broker via the new API.
+ """
+
+ def assertEqual(self, left, right, text=None):
+ if not left == right:
+ print "assertEqual failure: %r != %r" % (left, right)
+ if text:
+ print " %r" % text
+ assert None
+
+ def fail(self, text=None):
+ if text:
+ print "Fail: %r" % text
+ assert None
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def test_qpid_3481_acquired_to_alt_exchange(self):
+ """
+ Verify that acquired messages are routed to the alternate when the
queue is deleted.
+ """
+ sess1 = self.setup_session()
+ sess2 = self.setup_session()
+
+ tx = sess1.sender("amq.direct/key")
+ rx_main =
sess1.receiver("amq.direct/key;{link:{x-declare:{alternate-exchange:'amq.fanout'}}}")
+ rx_alt = sess2.receiver("amq.fanout")
+ rx_alt.capacity = 10
+
+ tx.send("DATA")
+ tx.send("DATA")
+ tx.send("DATA")
+ tx.send("DATA")
+ tx.send("DATA")
+
+ msg = rx_main.fetch()
+ msg = rx_main.fetch()
+ msg = rx_main.fetch()
+
+ self.assertEqual(rx_alt.available(), 0, "No messages should have been
routed to the alt_exchange")
+
+ sess1.close()
+
+ self.assertEqual(rx_alt.available(), 5, "All 5 messages should have
been routed to the alt_exchange")
+
+ sess2.close()
+
+ def test_qpid_3481_acquired_to_alt_exchange_2_consumers(self):
+ """
+ Verify that acquired messages are routed to the alternate when the
queue is deleted.
+ """
+ sess1 = self.setup_session()
+ sess2 = self.setup_session()
+ sess3 = self.setup_session()
+ sess4 = self.setup_session()
+
+ tx =
sess1.sender("test_acquired;{create:always,delete:always,node:{x-declare:{alternate-exchange:'amq.fanout'}}}")
+ rx_main1 = sess2.receiver("test_acquired")
+ rx_main2 = sess3.receiver("test_acquired")
+ rx_alt = sess4.receiver("amq.fanout")
+ rx_alt.capacity = 10
+
+ tx.send("DATA")
+ tx.send("DATA")
+ tx.send("DATA")
+ tx.send("DATA")
+ tx.send("DATA")
+
+ msg = rx_main1.fetch()
+ msg = rx_main1.fetch()
+ msg = rx_main1.fetch()
+
+ self.assertEqual(rx_alt.available(), 0, "No messages should have been
routed to the alt_exchange")
+
+ # Close sess1; This will cause the queue to be deleted
+ sess1.close()
+ self.assertEqual(rx_alt.available(), 2, "2 of the messages should have
been routed to the alt_exchange")
+
+ # Close sess2; This will cause the acquired messages to be requeued
and routed to the alternate
+ sess2.close()
+ for i in range(5):
+ try:
+ m = rx_alt.fetch(0)
+ except:
+ self.fail("failed to receive all 5 messages via alternate
exchange")
+
+ sess3.close()
+ self.assertEqual(rx_alt.available(), 0, "No further messages should be
received via the alternate exchange")
+
+ sess4.close()
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]