Author: gsim
Date: Wed Feb 25 11:02:20 2009
New Revision: 747744

URL: http://svn.apache.org/viewvc?rev=747744&view=rev
Log:
QPID-1685: Fixed ring queue policy


Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
    qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=747744&r1=747743&r2=747744&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Feb 25 11:02:20 2009
@@ -676,9 +676,9 @@
 // return true if store exists, 
 bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
 {
-    if (policy.get() && !policy->isEnqueued(msg)) return false;
     {
         Mutex::ScopedLock locker(messageLock);
+        if (policy.get() && !policy->isEnqueued(msg)) return false;
         if (!ctxt) { 
             dequeued(msg);
         }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=747744&r1=747743&r2=747744&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Wed Feb 25 11:02:20 2009
@@ -197,11 +197,12 @@
 void RingQueuePolicy::dequeued(const QueuedMessage& m)
 {
     qpid::sys::Mutex::ScopedLock l(lock);
-    QueuePolicy::dequeued(m);
     //find and remove m from queue
-    for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) {
-        if (i->position == m.position) {
+    for (Messages::iterator i = queue.begin(); i != queue.end(); i++) {
+        if (i->payload == m.payload) {
             queue.erase(i);
+            //now update count and size
+            QueuePolicy::dequeued(m);
             break;
         }
     }
@@ -210,9 +211,11 @@
 bool RingQueuePolicy::isEnqueued(const QueuedMessage& m)
 {
     qpid::sys::Mutex::ScopedLock l(lock);
-    //for non-strict ring policy, a message can be dequeued before acked; need to detect this
-    for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) {
-        if (i->position == m.position) {
+    //for non-strict ring policy, a message can be replaced (and
+    //therefore dequeued) before it is accepted or released by
+    //subscriber; need to detect this
+    for (Messages::const_iterator i = queue.begin(); i != queue.end(); i++) {
+        if (i->payload == m.payload) {
             return true;
         }
     }
@@ -236,13 +239,10 @@
         oldest = queue.front();
     }
     if (oldest.queue->acquire(oldest) || !strict) {
-        qpid::sys::Mutex::ScopedLock l(lock);
-        if (oldest.position == queue.front().position) {
-            queue.pop_front();
-            QPID_LOG(debug, "Ring policy triggered in queue " 
-                     << (m.queue ? m.queue->getName() : std::string("unknown queue"))
-                     << ": removed message " << oldest.position << " to make way for " << m.position);
-        }
+        oldest.queue->dequeue(0, oldest);
+        QPID_LOG(debug, "Ring policy triggered in queue " 
+                 << (m.queue ? m.queue->getName() : std::string("unknown queue"))
+                 << ": removed message " << oldest.position << " to make way for " << m.position);
         return true;
     } else {
         QPID_LOG(debug, "Ring policy could not be triggered in queue " 

Modified: qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp?rev=747744&r1=747743&r2=747744&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp Wed Feb 25 11:02:20 2009
@@ -158,6 +158,15 @@
         BOOST_CHECK_EQUAL((boost::format("%1%_%2%") % "Message" % (i+1)).str(), msg.getData());
     }
     BOOST_CHECK(!f.subs.get(msg, q));
+
+    for (int i = 10; i < 20; i++) {
+        f.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
+    }
+    for (int i = 15; i < 20; i++) {
+        BOOST_CHECK(f.subs.get(msg, q, qpid::sys::TIME_SEC));
+        BOOST_CHECK_EQUAL((boost::format("%1%_%2%") % "Message" % (i+1)).str(), msg.getData());
+    }
+    BOOST_CHECK(!f.subs.get(msg, q));
 }
 
 QPID_AUTO_TEST_CASE(testStrictRingPolicy) 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to