Author: gsim
Date: Tue May 13 13:03:08 2014
New Revision: 1594220

URL: http://svn.apache.org/r1594220
Log:
QPID-5758: Move purging of expired messages from timer thread to worker thread

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h
    qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1594220&r1=1594219&r2=1594220&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue May 13 13:03:08 2014
@@ -234,7 +234,7 @@ Broker::Broker(const Broker::Options& co
             conf.replayFlushLimit*1024, // convert kb to bytes.
             conf.replayHardLimit*1024),
         *this),
-    queueCleaner(queues, timer.get()),
+    queueCleaner(queues, poller, timer.get()),
     recoveryInProgress(false),
     expiryPolicy(new ExpiryPolicy),
     getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp?rev=1594220&r1=1594219&r2=1594220&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp Tue May 13 13:03:08 2014
@@ -48,10 +48,15 @@ namespace {
         fireFunction();
     }
 }
-QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer* t) : queues(q), timer(t) {}
+QueueCleaner::QueueCleaner(QueueRegistry& q, boost::shared_ptr<sys::Poller> p, sys::Timer* t)
+    : queues(q), timer(t), purging(boost::bind(&QueueCleaner::purge, this, _1), p)
+{
+    purging.start();
+}
 
 QueueCleaner::~QueueCleaner()
 {
+    purging.stop();
     if (task) task->cancel();
 }
 
@@ -66,28 +71,19 @@ void QueueCleaner::setTimer(qpid::sys::T
     this->timer = timer;
 }
 
-namespace {
-struct CollectQueues
+void QueueCleaner::fired()
 {
-    std::vector<Queue::shared_ptr>* queues;
-    CollectQueues(std::vector<Queue::shared_ptr>* q) : queues(q) {}
-    void operator()(Queue::shared_ptr q)
-    {
-        queues->push_back(q);
-    }
-};
+    queues.eachQueue(boost::bind(&PurgeSet::push, &purging, _1));
 }
 
-void QueueCleaner::fired()
+QueueCleaner::QueuePtrs::const_iterator QueueCleaner::purge(const QueueCleaner::QueuePtrs& batch)
 {
-    //collect copy of list of queues to avoid holding registry lock while we perform purge
-    std::vector<Queue::shared_ptr> copy;
-    CollectQueues collect(&copy);
-    queues.eachQueue(collect);
-    std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1, period));
-    task->setupNextFire();
+    for (QueuePtrs::const_iterator i = batch.begin(); i != batch.end(); ++i) {
+        (*i)->purgeExpired(period);
+    }
+    task->restart();
     timer->add(task);
+    return batch.end();
 }
 
-
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h?rev=1594220&r1=1594219&r2=1594220&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h Tue May 13 13:03:08 2014
@@ -23,9 +23,11 @@
  */
 
 #include "qpid/broker/BrokerImportExport.h"
+#include "qpid/sys/PollableQueue.h"
 #include "qpid/sys/Time.h"
 
 #include <boost/intrusive_ptr.hpp>
+#include <boost/shared_ptr.hpp>
 
 namespace qpid {
 
@@ -36,6 +38,7 @@ namespace sys {
 
 namespace broker {
 
+class Queue;
 class QueueRegistry;
 /**
  * TimerTask to purge expired messages from queues
@@ -43,18 +46,24 @@ class QueueRegistry;
 class QueueCleaner
 {
   public:
-    QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer* timer);
+    QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, boost::shared_ptr<sys::Poller>, sys::Timer* timer);
     QPID_BROKER_EXTERN ~QueueCleaner();
     QPID_BROKER_EXTERN void start(sys::Duration period);
     QPID_BROKER_EXTERN void setTimer(sys::Timer* timer);
 
   private:
+    typedef boost::shared_ptr<Queue> QueuePtr;
+    typedef std::deque< QueuePtr > QueuePtrs;
+    typedef qpid::sys::PollableQueue< QueuePtr > PurgeSet;
     boost::intrusive_ptr<sys::TimerTask> task;
     QueueRegistry& queues;
     sys::Timer* timer;
     sys::Duration period;
+    PurgeSet purging;
 
     void fired();
+    QueuePtrs::const_iterator purge(const QueuePtrs&);
+
 };
 }} // namespace qpid::broker
 

Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=1594220&r1=1594219&r2=1594220&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Tue May 13 13:03:08 2014
@@ -40,6 +40,7 @@
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/broker/QueueFlowLimit.h"
 #include "qpid/broker/QueueSettings.h"
+#include "qpid/sys/Thread.h"
 #include "qpid/sys/Timer.h"
 
 #include <iostream>
@@ -202,18 +203,22 @@ QPID_AUTO_TEST_CASE(testPurgeExpired) {
 }
 
 QPID_AUTO_TEST_CASE(testQueueCleaner) {
+    boost::shared_ptr<Poller> poller(new Poller);
+    Thread runner(poller.get());
     Timer timer;
     QueueRegistry queues;
     Queue::shared_ptr queue = queues.declare("my-queue", QueueSettings()).first;
     addMessagesToQueue(10, *queue, 200, 400);
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);
 
-    QueueCleaner cleaner(queues, &timer);
+    QueueCleaner cleaner(queues, poller, &timer);
     cleaner.start(100 * qpid::sys::TIME_MSEC);
     ::usleep(300*1000);
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u);
     ::usleep(300*1000);
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u);
+    poller->shutdown();
+    runner.join();
 }
 namespace {
 int getIntProperty(const Message& message, const std::string& key)



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to