This patch is a small refactoring of the Exchange classes around the
route() method. It was originally part of r813825, which was subsequently
rolled back. The current solution to QPID-2102 does not require this part
of the original patch, but since it is (hopefully) a useful cleanup of
duplicated code, I am submitting it here for comment before checking it in.

Comments welcome.
Index: src/qpid/broker/Exchange.cpp
===================================================================
--- src/qpid/broker/Exchange.cpp	(revision 817722)
+++ src/qpid/broker/Exchange.cpp	(working copy)
@@ -76,6 +76,36 @@
     }
 }
 
+// Delivers msg to the queue of every binding in the snapshot b (b may be
+// null) and then updates the exchange-level management counters.
+// NOTE(review): unlike the DirectExchange/XmlExchange code this replaces,
+// no "could not route" message is logged when nothing matches.
+void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
+{
+    int count = 0;
+    if (b.get()) {
+        for (std::vector<Binding::shared_ptr>::const_iterator it = b->begin(); it != b->end(); ++it) {
+            const Binding::shared_ptr& binding = *it;
+            msg.deliverTo(binding->queue);
+            if (binding->mgmtBinding != 0)
+                binding->mgmtBinding->inc_msgMatched();
+            ++count;
+        }
+    }
+
+    if (mgmtExchange == 0) return; // no management object, so no counters to update
+
+    mgmtExchange->inc_msgReceives();
+    mgmtExchange->inc_byteReceives(msg.contentSize());
+    if (count == 0) {
+        mgmtExchange->inc_msgDrops();
+        mgmtExchange->inc_byteDrops(msg.contentSize());
+    } else {
+        mgmtExchange->inc_msgRoutes(count);
+        mgmtExchange->inc_byteRoutes(count * msg.contentSize());
+    }
+}
+
 void Exchange::routeIVE(){
     if (ive && lastMsg.get()){
         DeliverableMessage dmsg(lastMsg);
Index: src/qpid/broker/Exchange.h
===================================================================
--- src/qpid/broker/Exchange.h	(revision 817722)
+++ src/qpid/broker/Exchange.h	(working copy)
@@ -79,6 +79,9 @@
         Exchange* parent;
     };
            
+    typedef boost::shared_ptr<const std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > ConstBindingList; // shared pointer to an immutable vector of bindings
+    typedef boost::shared_ptr<      std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > BindingList; // shared pointer to a mutable vector of bindings
+    void doRoute(Deliverable& msg, ConstBindingList b); // delivers msg to the queue of each binding in b and updates management statistics
     void routeIVE();
            
 
Index: src/qpid/broker/DirectExchange.cpp
===================================================================
--- src/qpid/broker/DirectExchange.cpp	(revision 817722)
+++ src/qpid/broker/DirectExchange.cpp	(working copy)
@@ -145,39 +145,12 @@
 void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
 {
     PreRoute pr(msg, this);
-    Queues::ConstPtr p;
+    ConstBindingList b; // snapshot of the bindings for routingKey, assigned under the lock below
     {
         Mutex::ScopedLock l(lock);
-        p = bindings[routingKey].queues.snapshot();
+        b = bindings[routingKey].queues.snapshot(); // take the snapshot while the lock is held
     }
-    int count(0);
-
-    if (p) {
-        for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) {
-            msg.deliverTo((*i)->queue);
-            if ((*i)->mgmtBinding != 0)
-                (*i)->mgmtBinding->inc_msgMatched();
-        }
-    }
-
-    if(!count){
-        QPID_LOG(info, "DirectExchange " << getName() << " could not route message with key " << routingKey
-                 << "; no matching binding found");
-        if (mgmtExchange != 0) {
-            mgmtExchange->inc_msgDrops();
-            mgmtExchange->inc_byteDrops(msg.contentSize());
-        }
-    } else {
-        if (mgmtExchange != 0) {
-            mgmtExchange->inc_msgRoutes(count);
-            mgmtExchange->inc_byteRoutes(count * msg.contentSize());
-        }
-    }
-
-    if (mgmtExchange != 0) {
-        mgmtExchange->inc_msgReceives();
-        mgmtExchange->inc_byteReceives(msg.contentSize());
-    }
+    doRoute(msg, b); // delivery and management counters are handled by Exchange::doRoute, after the lock scope ends
 }
 
 
Index: src/qpid/broker/TopicExchange.cpp
===================================================================
--- src/qpid/broker/TopicExchange.cpp	(revision 817722)
+++ src/qpid/broker/TopicExchange.cpp	(working copy)
@@ -293,44 +293,23 @@
     return q != qv.end();
 }
 
-void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
+// Routes msg to the queues of every binding whose key matches routingKey.
+// Matching bindings are copied into b under the read lock; delivery and
+// management statistics are handled by doRoute() after the lock scope ends.
+void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
+{
-    Binding::vector mb;
+    BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
     PreRoute pr(msg, this);
-    uint32_t count(0);
-
     {
-    RWlock::ScopedRlock l(lock);
-    for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
-        if (match(i->first, routingKey)) {
-            Binding::vector& qv(i->second.bindingVector);
-            for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){
-                mb.push_back(*j);
+        RWlock::ScopedRlock l(lock);
+        for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+            if (match(i->first, routingKey)) {
+                Binding::vector& qv(i->second.bindingVector);
+                b->insert(b->end(), qv.begin(), qv.end());
             }
         }
     }
-    }
-    
-    for (Binding::vector::iterator j = mb.begin(); j != mb.end(); ++j) {
-        msg.deliverTo((*j)->queue);
-        if ((*j)->mgmtBinding != 0)
-            (*j)->mgmtBinding->inc_msgMatched ();
-    }
-
-    if (mgmtExchange != 0)
-    {
-        mgmtExchange->inc_msgReceives  ();
-        mgmtExchange->inc_byteReceives (msg.contentSize ());
-        if (count == 0)
-        {
-            mgmtExchange->inc_msgDrops  ();
-            mgmtExchange->inc_byteDrops (msg.contentSize ());
-        }
-        else
-        {
-            mgmtExchange->inc_msgRoutes  (count);
-            mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
-        }
-    }
+    doRoute(msg, b);
 }
 
 bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
Index: src/qpid/broker/HeadersExchange.cpp
===================================================================
--- src/qpid/broker/HeadersExchange.cpp	(revision 817722)
+++ src/qpid/broker/HeadersExchange.cpp	(working copy)
@@ -104,7 +104,8 @@
 }
 
 
-void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
+void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args)
+{
     if (!args) {
         //can't match if there were no headers passed in
         if (mgmtExchange != 0) {
@@ -118,31 +119,17 @@
 
     PreRoute pr(msg, this);
 
-    uint32_t count(0);
-
-    Bindings::ConstPtr p = bindings.snapshot();
-    if (p.get()){
+    ConstBindingList p = bindings.snapshot();
+    BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
+    if (p.get())
+    {
         for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) {
             if (match((*i)->args, *args)) {
-                msg.deliverTo((*i)->queue);
-                count++;
-                if ((*i)->mgmtBinding != 0)
-                    (*i)->mgmtBinding->inc_msgMatched();
+                b->push_back(*i);
             }
         }
     }
-
-    if (mgmtExchange != 0) {
-        mgmtExchange->inc_msgReceives();
-        mgmtExchange->inc_byteReceives(msg.contentSize());
-        if (count == 0) {
-            mgmtExchange->inc_msgDrops();
-            mgmtExchange->inc_byteDrops(msg.contentSize());
-        } else {
-            mgmtExchange->inc_msgRoutes(count);
-            mgmtExchange->inc_byteRoutes(count * msg.contentSize());
-        }
-    }
+    doRoute(msg, b);
 }
 
 
Index: src/qpid/broker/FanOutExchange.cpp
===================================================================
--- src/qpid/broker/FanOutExchange.cpp	(revision 817722)
+++ src/qpid/broker/FanOutExchange.cpp	(working copy)
@@ -106,36 +106,12 @@
     return true;
 }
 
-void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){
+void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/)
+{
     PreRoute pr(msg, this);
-    uint32_t count(0);
-
-    BindingsArray::ConstPtr p = bindings.snapshot();
-    if (p.get()){
-        for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){
-            msg.deliverTo((*i)->queue);
-            if ((*i)->mgmtBinding != 0)
-                (*i)->mgmtBinding->inc_msgMatched ();
-        }
-    }
-    
-    if (mgmtExchange != 0)
-    {
-        mgmtExchange->inc_msgReceives  ();
-        mgmtExchange->inc_byteReceives (msg.contentSize ());
-        if (count == 0)
-        {
-            mgmtExchange->inc_msgDrops  ();
-            mgmtExchange->inc_byteDrops (msg.contentSize ());
-        }
-        else
-        {
-            mgmtExchange->inc_msgRoutes  (count);
-            mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
-        }
-    }
+    doRoute(msg, bindings.snapshot()); // hand the current binding snapshot to Exchange::doRoute for delivery and statistics
 }
-
+    
 bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const)
 {
     BindingsArray::ConstPtr ptr = bindings.snapshot();
Index: src/qpid/xml/XmlExchange.cpp
===================================================================
--- src/qpid/xml/XmlExchange.cpp	(revision 817722)
+++ src/qpid/xml/XmlExchange.cpp	(working copy)
@@ -206,45 +206,22 @@
     PreRoute pr(msg, this);
     try {
         XmlBinding::vector::ConstPtr p;
-       {
+        BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
+        {
             RWlock::ScopedRlock l(lock);
-           p = bindingsMap[routingKey].snapshot();
-           if (!p) return;
-       }
-        int count(0);
+            p = bindingsMap[routingKey].snapshot();
+            if (!p.get()) return;
+        }
 
         for (std::vector<XmlBinding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++) {
             if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) { 
-                msg.deliverTo((*i)->queue);
-                count++;
-                QPID_LOG(trace, "Delivered to queue" );
-
-                if ((*i)->mgmtBinding != 0)
-                    (*i)->mgmtBinding->inc_msgMatched ();
+                b->push_back(*i);
             }
-       }
-       if (!count) {
-           QPID_LOG(warning, "XMLExchange " << getName() << ": could not route message with query " << routingKey);
-           if (mgmtExchange != 0) {
-               mgmtExchange->inc_msgDrops  ();
-               mgmtExchange->inc_byteDrops (msg.contentSize ());
-           }
-       } else {
-           if (mgmtExchange != 0) {
-               mgmtExchange->inc_msgRoutes  (count);
-               mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
-           }
-       }
-
-       if (mgmtExchange != 0) {
-           mgmtExchange->inc_msgReceives  ();
-           mgmtExchange->inc_byteReceives (msg.contentSize ());
-       }
+        }
+        doRoute(msg, b);
     } catch (...) {
         QPID_LOG(warning, "XMLExchange " << getName() << ": exception routing message with query " << routingKey);
     }
-      
-
 }
 
 

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to