Author: gsim
Date: Wed Feb 25 11:02:20 2009
New Revision: 747744
URL: http://svn.apache.org/viewvc?rev=747744&view=rev
Log:
QPID-1685: Fixed ring queue policy
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=747744&r1=747743&r2=747744&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Feb 25 11:02:20 2009
@@ -676,9 +676,9 @@
// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
- if (policy.get() && !policy->isEnqueued(msg)) return false;
{
Mutex::ScopedLock locker(messageLock);
+ if (policy.get() && !policy->isEnqueued(msg)) return false;
if (!ctxt) {
dequeued(msg);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=747744&r1=747743&r2=747744&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Wed Feb 25 11:02:20 2009
@@ -197,11 +197,12 @@
void RingQueuePolicy::dequeued(const QueuedMessage& m)
{
qpid::sys::Mutex::ScopedLock l(lock);
- QueuePolicy::dequeued(m);
//find and remove m from queue
- for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) {
- if (i->position == m.position) {
+ for (Messages::iterator i = queue.begin(); i != queue.end(); i++) {
+ if (i->payload == m.payload) {
queue.erase(i);
+ //now update count and size
+ QueuePolicy::dequeued(m);
break;
}
}
@@ -210,9 +211,11 @@
bool RingQueuePolicy::isEnqueued(const QueuedMessage& m)
{
qpid::sys::Mutex::ScopedLock l(lock);
- //for non-strict ring policy, a message can be dequeued before acked; need to detect this
- for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) {
- if (i->position == m.position) {
+ //for non-strict ring policy, a message can be replaced (and
+ //therefore dequeued) before it is accepted or released by
+ //subscriber; need to detect this
+ for (Messages::const_iterator i = queue.begin(); i != queue.end(); i++) {
+ if (i->payload == m.payload) {
return true;
}
}
@@ -236,13 +239,10 @@
oldest = queue.front();
}
if (oldest.queue->acquire(oldest) || !strict) {
- qpid::sys::Mutex::ScopedLock l(lock);
- if (oldest.position == queue.front().position) {
- queue.pop_front();
- QPID_LOG(debug, "Ring policy triggered in queue "
- << (m.queue ? m.queue->getName() : std::string("unknown queue"))
- << ": removed message " << oldest.position << " to make way for " << m.position);
- }
+ oldest.queue->dequeue(0, oldest);
+ QPID_LOG(debug, "Ring policy triggered in queue "
+ << (m.queue ? m.queue->getName() : std::string("unknown queue"))
+ << ": removed message " << oldest.position << " to make way for " << m.position);
return true;
} else {
QPID_LOG(debug, "Ring policy could not be triggered in queue "
Modified: qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp?rev=747744&r1=747743&r2=747744&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp Wed Feb 25 11:02:20 2009
@@ -158,6 +158,15 @@
BOOST_CHECK_EQUAL((boost::format("%1%_%2%") % "Message" % (i+1)).str(), msg.getData());
}
BOOST_CHECK(!f.subs.get(msg, q));
+
+ for (int i = 10; i < 20; i++) {
+ f.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
+ }
+ for (int i = 15; i < 20; i++) {
+ BOOST_CHECK(f.subs.get(msg, q, qpid::sys::TIME_SEC));
+ BOOST_CHECK_EQUAL((boost::format("%1%_%2%") % "Message" % (i+1)).str(), msg.getData());
+ }
+ BOOST_CHECK(!f.subs.get(msg, q));
}
QPID_AUTO_TEST_CASE(testStrictRingPolicy)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]