Author: gsim
Date: Tue May 13 13:03:08 2014
New Revision: 1594220
URL: http://svn.apache.org/r1594220
Log:
QPID-5758: Move purging of expired messages from timer thread to worker thread
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h
qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1594220&r1=1594219&r2=1594220&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue May 13 13:03:08 2014
@@ -234,7 +234,7 @@ Broker::Broker(const Broker::Options& co
conf.replayFlushLimit*1024, // convert kb to bytes.
conf.replayHardLimit*1024),
*this),
- queueCleaner(queues, timer.get()),
+ queueCleaner(queues, poller, timer.get()),
recoveryInProgress(false),
expiryPolicy(new ExpiryPolicy),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp?rev=1594220&r1=1594219&r2=1594220&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.cpp Tue May 13 13:03:08 2014
@@ -48,10 +48,15 @@ namespace {
fireFunction();
}
}
-QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer* t) : queues(q), timer(t) {}
+QueueCleaner::QueueCleaner(QueueRegistry& q, boost::shared_ptr<sys::Poller> p, sys::Timer* t)
+ : queues(q), timer(t), purging(boost::bind(&QueueCleaner::purge, this, _1), p)
+{
+ purging.start();
+}
QueueCleaner::~QueueCleaner()
{
+ purging.stop();
if (task) task->cancel();
}
@@ -66,28 +71,19 @@ void QueueCleaner::setTimer(qpid::sys::T
this->timer = timer;
}
-namespace {
-struct CollectQueues
+void QueueCleaner::fired()
{
- std::vector<Queue::shared_ptr>* queues;
- CollectQueues(std::vector<Queue::shared_ptr>* q) : queues(q) {}
- void operator()(Queue::shared_ptr q)
- {
- queues->push_back(q);
- }
-};
+ queues.eachQueue(boost::bind(&PurgeSet::push, &purging, _1));
}
-void QueueCleaner::fired()
+QueueCleaner::QueuePtrs::const_iterator QueueCleaner::purge(const QueueCleaner::QueuePtrs& batch)
{
- //collect copy of list of queues to avoid holding registry lock while we perform purge
- std::vector<Queue::shared_ptr> copy;
- CollectQueues collect(&copy);
- queues.eachQueue(collect);
- std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1, period));
- task->setupNextFire();
+ for (QueuePtrs::const_iterator i = batch.begin(); i != batch.end(); ++i) {
+ (*i)->purgeExpired(period);
+ }
+ task->restart();
timer->add(task);
+ return batch.end();
}
-
}} // namespace qpid::broker
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h?rev=1594220&r1=1594219&r2=1594220&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueCleaner.h Tue May 13 13:03:08 2014
@@ -23,9 +23,11 @@
*/
#include "qpid/broker/BrokerImportExport.h"
+#include "qpid/sys/PollableQueue.h"
#include "qpid/sys/Time.h"
#include <boost/intrusive_ptr.hpp>
+#include <boost/shared_ptr.hpp>
namespace qpid {
@@ -36,6 +38,7 @@ namespace sys {
namespace broker {
+class Queue;
class QueueRegistry;
/**
* TimerTask to purge expired messages from queues
@@ -43,18 +46,24 @@ class QueueRegistry;
class QueueCleaner
{
public:
- QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer* timer);
+ QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, boost::shared_ptr<sys::Poller>, sys::Timer* timer);
QPID_BROKER_EXTERN ~QueueCleaner();
QPID_BROKER_EXTERN void start(sys::Duration period);
QPID_BROKER_EXTERN void setTimer(sys::Timer* timer);
private:
+ typedef boost::shared_ptr<Queue> QueuePtr;
+ typedef std::deque< QueuePtr > QueuePtrs;
+ typedef qpid::sys::PollableQueue< QueuePtr > PurgeSet;
boost::intrusive_ptr<sys::TimerTask> task;
QueueRegistry& queues;
sys::Timer* timer;
sys::Duration period;
+ PurgeSet purging;
void fired();
+ QueuePtrs::const_iterator purge(const QueuePtrs&);
+
};
}} // namespace qpid::broker
Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=1594220&r1=1594219&r2=1594220&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Tue May 13 13:03:08 2014
@@ -40,6 +40,7 @@
#include "qpid/framing/reply_exceptions.h"
#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/broker/QueueSettings.h"
+#include "qpid/sys/Thread.h"
#include "qpid/sys/Timer.h"
#include <iostream>
@@ -202,18 +203,22 @@ QPID_AUTO_TEST_CASE(testPurgeExpired) {
}
QPID_AUTO_TEST_CASE(testQueueCleaner) {
+ boost::shared_ptr<Poller> poller(new Poller);
+ Thread runner(poller.get());
Timer timer;
QueueRegistry queues;
Queue::shared_ptr queue = queues.declare("my-queue", QueueSettings()).first;
addMessagesToQueue(10, *queue, 200, 400);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);
- QueueCleaner cleaner(queues, &timer);
+ QueueCleaner cleaner(queues, poller, &timer);
cleaner.start(100 * qpid::sys::TIME_MSEC);
::usleep(300*1000);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u);
::usleep(300*1000);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u);
+ poller->shutdown();
+ runner.join();
}
namespace {
int getIntProperty(const Message& message, const std::string& key)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]