Author: aconway Date: Fri Apr 15 17:03:17 2011 New Revision: 1092765 URL: http://svn.apache.org/viewvc?rev=1092765&view=rev Log: QPID-3208: Exchanges make best effort to route messages if there is an error.
Previously if multiple queues were bound to the same routing key, then a failure to deliver to one of the queues (e.g. policy limit error) could prevent delivery on some of the other queues. With this commit the exchange delivers to every queue that did not have an error before raising an error. Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp qpid/trunk/qpid/cpp/src/tests/brokertest.py qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1092765&r1=1092764&r2=1092765&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Fri Apr 15 17:03:17 2011 @@ -29,6 +29,7 @@ #include "qpid/framing/MessageProperties.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/broker/DeliverableMessage.h" +#include "stdexcept" using namespace qpid::broker; using namespace qpid::framing; @@ -70,6 +71,44 @@ Exchange::PreRoute::~PreRoute(){ } } +namespace { +/** Store information about an exception to be thrown later. + * If multiple exceptions are stored, save the first of the "most severe" + * exceptions, SESSION is less severe than CONNECTION etc. 
+ */ +class ExInfo { + public: + enum Type { NONE, SESSION, CONNECTION, OTHER }; + + ExInfo(string ex) : type(NONE), code(0), exchange(ex) {} + + void store(Type type_, const char* what_, int code_, const boost::shared_ptr<Queue>& queue) { + QPID_LOG(error, "Exchange " << exchange << " cannot deliver to queue " + << queue->getName() << ": " << what_); + if (type < type_) { // Replace less severe error + type = type_; + what = what_; + code = code_; + } + } + + void raise() { + switch (type) { + case NONE: break; + case SESSION: throw qpid::SessionException(qpid::framing::execution::ErrorCode(code), what); + case CONNECTION: throw qpid::ConnectionException(qpid::framing::connection::CloseCode(code), what); + case OTHER: throw std::runtime_error(what); + } + } + + private: + Type type; + string what; + int code; + string exchange; +}; +} + void Exchange::doRoute(Deliverable& msg, ConstBindingList b) { int count = 0; @@ -80,11 +119,25 @@ void Exchange::doRoute(Deliverable& msg, msg.getMessage().blockContentRelease(); } + + ExInfo error(getName()); // Save errors to throw at the end. 
for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) { - msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched(); + try { + msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched(); + } + catch (const SessionException& e) { + error.store(ExInfo::SESSION, e.what(), int(e.code), (*i)->queue); + } + catch (const ConnectionException& e) { + error.store(ExInfo::CONNECTION, e.what(), int(e.code), (*i)->queue); + } + catch (const std::exception& e) { + error.store(ExInfo::OTHER, e.what(), 0, (*i)->queue); + } } + error.raise(); } if (mgmtExchange != 0) Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1092765&r1=1092764&r2=1092765&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original) +++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Fri Apr 15 17:03:17 2011 @@ -484,18 +484,24 @@ class BrokerTest(TestCase): cluster = Cluster(self, count, args, expect=expect, wait=wait) return cluster - def assert_browse(self, session, queue, expect_contents, timeout=0): + def browse(self, session, queue, timeout=0): """Assert that the contents of messages on queue (as retrieved using session and timeout) exactly match the strings in expect_contents""" - r = session.receiver("%s;{mode:browse}"%(queue)) - actual_contents = [] try: - for c in expect_contents: actual_contents.append(r.fetch(timeout=timeout).content) - while True: actual_contents.append(r.fetch(timeout=0).content) # Check for extra messages. 
- except messaging.Empty: pass - r.close() + contents = [] + try: + while True: contents.append(r.fetch(timeout=timeout).content) + except messaging.Empty: pass + finally: pass #FIXME aconway 2011-04-14: r.close() + return contents + + def assert_browse(self, session, queue, expect_contents, timeout=0): + """Assert that the contents of messages on queue (as retrieved + using session and timeout) exactly match the strings in + expect_contents""" + actual_contents = self.browse(session, queue, timeout) self.assertEqual(expect_contents, actual_contents) def join(thread, timeout=10): Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1092765&r1=1092764&r2=1092765&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original) +++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Fri Apr 15 17:03:17 2011 @@ -449,6 +449,36 @@ acl allow all all cluster.start() verify(cluster[1]) + def test_binding_order(self): + """Regression test for binding order inconsistency in cluster""" + cluster = self.cluster(1) + c0 = cluster[0].connect() + s0 = c0.session() + # Declare multiple queues bound to same key on amq.topic + def declare(q,max=0): + if max: declare = 'x-declare:{arguments:{"qpid.max_count":%d}}'%max + else: declare = 'x-declare:{}' + bind='x-bindings:[{queue:%s,key:key,exchange:"amq.topic"}]'%(q) + s0.sender("%s;{create:always,node:{%s,%s}}" % (q,declare,bind)) + declare('d',max=4) # Only one with a limit + for q in ['c', 'b','a']: declare(q) + # Add a cluster member, send enough messages to exceed the max count + cluster.start() + try: + s = s0.sender('amq.topic/key') + for m in xrange(1,6): s.send(Message(str(m))) + self.fail("Expected capacity exceeded exception") + except messaging.exceptions.TargetCapacityExceeded: pass + c1 = cluster[1].connect() + s1 = c1.session() + s0 = c0.session() # Old 
session s0 is broken by exception. + # Verify queue contents are consistent. + for q in ['a','b','c','d']: + self.assertEqual(self.browse(s0, q), self.browse(s1, q)) + # Verify queue contents are "best effort" + for q in ['a','b','c']: self.assert_browse(s1,q,[str(n) for n in xrange(1,6)]) + self.assert_browse(s1,'d',[str(n) for n in xrange(1,5)]) + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org