On 03/11/2011 12:37 AM, [email protected] wrote:
Author: cctrieloff
Date: Fri Mar 11 00:37:04 2011
New Revision: 1080411
URL: http://svn.apache.org/viewvc?rev=1080411&view=rev
Log:
QPID-3138 Topic exchange perf improvement
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
This commit causes a test failure:
0% 10 20 30 40 50 60 70 80 90 100%
|----|----|----|----|----|----|----|----|----|----|
Running 318 test cases...
*********************../../../src/tests/ExchangeTest.cpp(282): error in
"testIVEOption": check 1u == queue3->getMessageCount() failed [1 != 0]
******************************
*** 1 failure detected in test suite "Master Test Suite"
FAIL: unit_test
I believe it is also unsafe. In the route method...
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=1080411&r1=1080410&r2=1080411&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Fri Mar 11 00:37:04
2011
@@ -221,6 +221,7 @@ TopicExchange::TopicExchange(const std::
bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey,
const FieldTable* args)
{
+ ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function
exit.
string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind);
string fedTags(args ? args->getAsString(qpidFedTags) : "");
string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
@@ -288,6 +289,7 @@ bool TopicExchange::bind(Queue::shared_p
}
bool TopicExchange::unbind(Queue::shared_ptr queue, const string&
constRoutingKey, const FieldTable* /*args*/){
+ ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function
exit.
RWlock::ScopedWlock l(lock);
string routingKey = normalize(constRoutingKey);
BindingKey* bk = bindingTree.getBindingKey(routingKey);
@@ -333,13 +335,24 @@ bool TopicExchange::isBound(Queue::share
void TopicExchange::route(Deliverable& msg, const string& routingKey, const
FieldTable* /*args*/)
{
// Note: PERFORMANCE CRITICAL!!!
- BindingList b(new
std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
+ BindingList b;
+ std::map<std::string, BindingList>::iterator it;
+ { // only lock the cache for read
+ RWlock::ScopedRlock cl(cacheLock);
+ it = bindingCache.find(routingKey);
+ }
PreRoute pr(msg, this);
- BindingsFinderIter bindingsFinder(b);
+ if (it == bindingCache.end()) // no cache hit
..the iterator may be invalidated by a subsequent update to the map once
the lock is released. Testing against bindingCache.end() outside the
lock is also not safe.
{
RWlock::ScopedRlock l(lock);
+ b = BindingList(new
std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
+ BindingsFinderIter bindingsFinder(b);
bindingTree.iterateMatch(routingKey, bindingsFinder);
- }
+ RWlock::ScopedWlock cwl(cacheLock);
+ bindingCache[routingKey] = b; // update cache
+ }else {
+ b = it->second;
+ }
doRoute(msg, b);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h?rev=1080411&r1=1080410&r2=1080411&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h Fri Mar 11 00:37:04 2011
@@ -135,7 +135,19 @@ class TopicExchange : public virtual Exc
BindingNode bindingTree;
unsigned long nBindings;
qpid::sys::RWlock lock; // protects bindingTree and nBindings
-
+ qpid::sys::RWlock cacheLock; // protects cache
+ std::map<std::string, BindingList> bindingCache; // cache of matched
routes.
+ class ClearCache {
+ private:
+ qpid::sys::RWlock* cacheLock;
+ std::map<std::string, BindingList>* bindingCache;
+ public:
+ ClearCache(qpid::sys::RWlock* l, std::map<std::string,
BindingList>* bc): cacheLock(l),bindingCache(bc) {};
+ ~ClearCache(){
+ qpid::sys::RWlock::ScopedWlock l(*cacheLock);
+ bindingCache->clear();
+ };
+ };
bool isBound(Queue::shared_ptr queue, const std::string& pattern);
class ReOriginIter;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]