This patch is a small refactor of the Exchange classes around the route()
method. It was part of r813825, which was subsequently rolled back. The
current solution to QPID-2102 does not require this part of the original
patch, but since it is hopefully a useful cleanup of some duplicated code,
I am submitting it here for comment before checking it in.
Comments welcome.
Index: src/qpid/broker/Exchange.cpp
===================================================================
--- src/qpid/broker/Exchange.cpp (revision 817722)
+++ src/qpid/broker/Exchange.cpp (working copy)
@@ -76,6 +76,36 @@
}
}
+void Exchange::doRoute(Deliverable& msg, ConstBindingList b) // Deliver msg to the queue of every binding in b, then update the management counters.
+{
+ int count = 0; // number of bindings the message was delivered to
+
+ if (b.get()) { // a null binding list is treated the same as an empty one
+ for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) {
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding != 0) // per-binding counter is only bumped when the binding has a management object
+ (*i)->mgmtBinding->inc_msgMatched();
+ }
+ }
+
+ if (mgmtExchange != 0) // exchange-level counters are only kept when a management object is attached
+ {
+ mgmtExchange->inc_msgReceives (); // receives are recorded whether or not any binding matched
+ mgmtExchange->inc_byteReceives (msg.contentSize ());
+ if (count == 0) // nothing was delivered: record the message as dropped
+ {
+ //QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found");
+ mgmtExchange->inc_msgDrops ();
+ mgmtExchange->inc_byteDrops (msg.contentSize ());
+ }
+ else
+ {
+ mgmtExchange->inc_msgRoutes (count); // one route per binding delivered to
+ mgmtExchange->inc_byteRoutes (count * msg.contentSize ()); // content size is counted once per delivery
+ }
+ }
+}
+
void Exchange::routeIVE(){
if (ive && lastMsg.get()){
DeliverableMessage dmsg(lastMsg);
Index: src/qpid/broker/Exchange.h
===================================================================
--- src/qpid/broker/Exchange.h (revision 817722)
+++ src/qpid/broker/Exchange.h (working copy)
@@ -79,6 +79,9 @@
Exchange* parent;
};
+ typedef boost::shared_ptr<const std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > ConstBindingList; // shared pointer to a read-only vector of bindings; the type doRoute() accepts
+ typedef boost::shared_ptr< std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > BindingList; // mutable counterpart, used by route() implementations to collect matched bindings
+ void doRoute(Deliverable& msg, ConstBindingList b); // delivers msg to the queue of each binding in b; b may be null
void routeIVE();
Index: src/qpid/broker/DirectExchange.cpp
===================================================================
--- src/qpid/broker/DirectExchange.cpp (revision 817722)
+++ src/qpid/broker/DirectExchange.cpp (working copy)
@@ -145,39 +145,12 @@
void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
{
PreRoute pr(msg, this);
- Queues::ConstPtr p;
+ ConstBindingList b;
{
Mutex::ScopedLock l(lock);
- p = bindings[routingKey].queues.snapshot();
+ b = bindings[routingKey].queues.snapshot();
}
- int count(0);
-
- if (p) {
- for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++, count++) {
- msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched();
- }
- }
-
- if(!count){
- QPID_LOG(info, "DirectExchange " << getName() << " could not route message with key " << routingKey
- << "; no matching binding found");
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgDrops();
- mgmtExchange->inc_byteDrops(msg.contentSize());
- }
- } else {
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgRoutes(count);
- mgmtExchange->inc_byteRoutes(count * msg.contentSize());
- }
- }
-
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgReceives();
- mgmtExchange->inc_byteReceives(msg.contentSize());
- }
+ doRoute(msg, b);
}
Index: src/qpid/broker/TopicExchange.cpp
===================================================================
--- src/qpid/broker/TopicExchange.cpp (revision 817722)
+++ src/qpid/broker/TopicExchange.cpp (working copy)
@@ -293,44 +293,23 @@
return q != qv.end();
}
-void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
+void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/)
+{
Binding::vector mb;
+ BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
PreRoute pr(msg, this);
- uint32_t count(0);
-
{
- RWlock::ScopedRlock l(lock);
- for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if (match(i->first, routingKey)) {
- Binding::vector& qv(i->second.bindingVector);
- for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++, count++){
- mb.push_back(*j);
+ RWlock::ScopedRlock l(lock);
+ for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ if (match(i->first, routingKey)) {
+ Binding::vector& qv(i->second.bindingVector);
+ for(Binding::vector::iterator j = qv.begin(); j != qv.end(); j++){
+ b->push_back(*j);
+ }
}
}
}
- }
-
- for (Binding::vector::iterator j = mb.begin(); j != mb.end(); ++j) {
- msg.deliverTo((*j)->queue);
- if ((*j)->mgmtBinding != 0)
- (*j)->mgmtBinding->inc_msgMatched ();
- }
-
- if (mgmtExchange != 0)
- {
- mgmtExchange->inc_msgReceives ();
- mgmtExchange->inc_byteReceives (msg.contentSize ());
- if (count == 0)
- {
- mgmtExchange->inc_msgDrops ();
- mgmtExchange->inc_byteDrops (msg.contentSize ());
- }
- else
- {
- mgmtExchange->inc_msgRoutes (count);
- mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
- }
- }
+ doRoute(msg, b);
}
bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const)
Index: src/qpid/broker/HeadersExchange.cpp
===================================================================
--- src/qpid/broker/HeadersExchange.cpp (revision 817722)
+++ src/qpid/broker/HeadersExchange.cpp (working copy)
@@ -104,7 +104,8 @@
}
-void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
+void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args)
+{
if (!args) {
//can't match if there were no headers passed in
if (mgmtExchange != 0) {
@@ -118,31 +119,17 @@
PreRoute pr(msg, this);
- uint32_t count(0);
-
- Bindings::ConstPtr p = bindings.snapshot();
- if (p.get()){
+ ConstBindingList p = bindings.snapshot();
+ BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
+ if (p.get())
+ {
for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) {
if (match((*i)->args, *args)) {
- msg.deliverTo((*i)->queue);
- count++;
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched();
+ b->push_back(*i);
}
}
}
-
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgReceives();
- mgmtExchange->inc_byteReceives(msg.contentSize());
- if (count == 0) {
- mgmtExchange->inc_msgDrops();
- mgmtExchange->inc_byteDrops(msg.contentSize());
- } else {
- mgmtExchange->inc_msgRoutes(count);
- mgmtExchange->inc_byteRoutes(count * msg.contentSize());
- }
- }
+ doRoute(msg, b);
}
Index: src/qpid/broker/FanOutExchange.cpp
===================================================================
--- src/qpid/broker/FanOutExchange.cpp (revision 817722)
+++ src/qpid/broker/FanOutExchange.cpp (working copy)
@@ -106,36 +106,12 @@
return true;
}
-void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){
+void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/)
+{
PreRoute pr(msg, this);
- uint32_t count(0);
-
- BindingsArray::ConstPtr p = bindings.snapshot();
- if (p.get()){
- for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){
- msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched ();
- }
- }
-
- if (mgmtExchange != 0)
- {
- mgmtExchange->inc_msgReceives ();
- mgmtExchange->inc_byteReceives (msg.contentSize ());
- if (count == 0)
- {
- mgmtExchange->inc_msgDrops ();
- mgmtExchange->inc_byteDrops (msg.contentSize ());
- }
- else
- {
- mgmtExchange->inc_msgRoutes (count);
- mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
- }
- }
+ doRoute(msg, bindings.snapshot());
}
-
+
bool FanOutExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const)
{
BindingsArray::ConstPtr ptr = bindings.snapshot();
Index: src/qpid/xml/XmlExchange.cpp
===================================================================
--- src/qpid/xml/XmlExchange.cpp (revision 817722)
+++ src/qpid/xml/XmlExchange.cpp (working copy)
@@ -206,45 +206,22 @@
PreRoute pr(msg, this);
try {
XmlBinding::vector::ConstPtr p;
- {
+ BindingList b(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
+ {
RWlock::ScopedRlock l(lock);
- p = bindingsMap[routingKey].snapshot();
- if (!p) return;
- }
- int count(0);
+ p = bindingsMap[routingKey].snapshot();
+ if (!p.get()) return;
+ }
for (std::vector<XmlBinding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); i++) {
if (matches((*i)->xquery, msg, args, (*i)->parse_message_content)) {
- msg.deliverTo((*i)->queue);
- count++;
- QPID_LOG(trace, "Delivered to queue" );
-
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched ();
+ b->push_back(*i);
}
- }
- if (!count) {
- QPID_LOG(warning, "XMLExchange " << getName() << ": could not route message with query " << routingKey);
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgDrops ();
- mgmtExchange->inc_byteDrops (msg.contentSize ());
- }
- } else {
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgRoutes (count);
- mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
- }
- }
-
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgReceives ();
- mgmtExchange->inc_byteReceives (msg.contentSize ());
- }
+ }
+ doRoute(msg, b);
} catch (...) {
QPID_LOG(warning, "XMLExchange " << getName() << ": exception routing message with query " << routingKey);
}
-
-
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]