On 03/11/2011 12:37 AM, [email protected] wrote:
Author: cctrieloff
Date: Fri Mar 11 00:37:04 2011
New Revision: 1080411

URL: http://svn.apache.org/viewvc?rev=1080411&view=rev
Log:
QPID-3138 Topic exchange perf improvement

Modified:
     qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
     qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h

This commit causes a test failure:

0%   10   20   30   40   50   60   70   80   90   100%
|----|----|----|----|----|----|----|----|----|----|
Running 318 test cases...
*********************../../../src/tests/ExchangeTest.cpp(282): error in 
"testIVEOption": check 1u == queue3->getMessageCount() failed [1 != 0]
******************************

*** 1 failure detected in test suite "Master Test Suite"
FAIL: unit_test

I believe it is also unsafe. In the route method...

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=1080411&r1=1080410&r2=1080411&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Fri Mar 11 00:37:04 
2011
@@ -221,6 +221,7 @@ TopicExchange::TopicExchange(const std::

  bool TopicExchange::bind(Queue::shared_ptr queue, const string&  routingKey, 
const FieldTable* args)
  {
+       ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function 
exit.
      string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind);
      string fedTags(args ? args->getAsString(qpidFedTags) : "");
      string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
@@ -288,6 +289,7 @@ bool TopicExchange::bind(Queue::shared_p
  }

  bool TopicExchange::unbind(Queue::shared_ptr queue, const string&  
constRoutingKey, const FieldTable* /*args*/){
+       ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function 
exit.
      RWlock::ScopedWlock l(lock);
      string routingKey = normalize(constRoutingKey);
      BindingKey* bk = bindingTree.getBindingKey(routingKey);
@@ -333,13 +335,24 @@ bool TopicExchange::isBound(Queue::share
  void TopicExchange::route(Deliverable&  msg, const string&  routingKey, const 
FieldTable* /*args*/)
  {
      // Note: PERFORMANCE CRITICAL!!!
-    BindingList b(new 
std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding>  >);
+    BindingList b;
+       std::map<std::string, BindingList>::iterator it;
+       {  // only lock the cache for read
+       RWlock::ScopedRlock cl(cacheLock);
+          it = bindingCache.find(routingKey);
+       }
      PreRoute pr(msg, this);
-    BindingsFinderIter bindingsFinder(b);
+    if (it == bindingCache.end())  // no cache hit

..the iterator may be invalidated by a subsequent update to the map once the lock is released. Testing against bindingCache.end() outside the lock is also not safe.

      {
          RWlock::ScopedRlock l(lock);
+       b = BindingList(new 
std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding>  >);
+        BindingsFinderIter bindingsFinder(b);
          bindingTree.iterateMatch(routingKey, bindingsFinder);
-    }
+           RWlock::ScopedWlock cwl(cacheLock);
+               bindingCache[routingKey] = b; // update cache
+    }else {
+        b = it->second;
+     }
      doRoute(msg, b);
  }


Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h?rev=1080411&r1=1080410&r2=1080411&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h Fri Mar 11 00:37:04 2011
@@ -135,7 +135,19 @@ class TopicExchange : public virtual Exc
      BindingNode bindingTree;
      unsigned long nBindings;
      qpid::sys::RWlock lock;     // protects bindingTree and nBindings
-
+    qpid::sys::RWlock cacheLock;     // protects cache
+       std::map<std::string, BindingList>  bindingCache; // cache of matched 
routes.
+       class ClearCache {
+       private:
+               qpid::sys::RWlock* cacheLock;
+               std::map<std::string, BindingList>* bindingCache;
+       public:
+               ClearCache(qpid::sys::RWlock* l, std::map<std::string, 
BindingList>* bc): cacheLock(l),bindingCache(bc) {};
+               ~ClearCache(){
+                       qpid::sys::RWlock::ScopedWlock l(*cacheLock);
+                       bindingCache->clear();
+               };
+       };
      bool isBound(Queue::shared_ptr queue, const std::string&  pattern);

      class ReOriginIter;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]




---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to