Author: cctrieloff
Date: Tue Jul  7 15:05:11 2009
New Revision: 791858

URL: http://svn.apache.org/viewvc?rev=791858&view=rev
Log:
More tests and completion of fix for 791672

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=791858&r1=791857&r2=791858&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Jul  7 15:05:11 2009
@@ -99,7 +99,8 @@
     eventMode(0),
     eventMgr(0),
     insertSeqNo(0),
-    broker(b)
+    broker(b),
+    lastForcedPosition(0)
 {
     if (parent != 0 && broker != 0)
     {
@@ -659,6 +660,7 @@
 void Queue::clearLastNodeFailure()
 {
     inLastNodeFailure = false;
+    lastForcedPosition = sequence;
 }
 
 void Queue::setLastNodeFailure()
@@ -666,10 +668,14 @@
     if (persistLastNode){
         Mutex::ScopedLock locker(messageLock);
        for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i 
) {
-            if (lastValueQueue) checkLvqReplace(*i);
-            i->payload->forcePersistent();
-            if (i->payload->isForcedPersistent() ){
-               enqueue(0, i->payload);
+            // don't force a message twice to disk.
+            if(i->position > lastForcedPosition) {
+                if (lastValueQueue) checkLvqReplace(*i);
+                i->payload->forcePersistent();
+                if (i->payload->isForcedPersistent() ){
+                   enqueue(0, i->payload);
+                }
+                lastForcedPosition = i->position;
             }
        }
         inLastNodeFailure = true;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=791858&r1=791857&r2=791858&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Jul  7 15:05:11 2009
@@ -106,6 +106,7 @@
             bool insertSeqNo;
             std::string seqNoKey;
             Broker* broker;
+            framing::SequenceNumber lastForcedPosition;
 
             void push(boost::intrusive_ptr<Message>& msg, bool 
isRecovery=false);
             void setPolicy(std::auto_ptr<QueuePolicy> policy);

Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=791858&r1=791857&r2=791858&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Tue Jul  7 15:05:11 2009
@@ -548,6 +548,34 @@
     queue2->setLastNodeFailure();
     BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
 
+    // check they don't get stored twice
+    queue1->setLastNodeFailure();
+    queue2->setLastNodeFailure();
+    BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
+
+    intrusive_ptr<Message> msg2 = create_message("e", "B");
+    queue1->deliver(msg2);
+    queue2->deliver(msg2);
+
+    queue1->clearLastNodeFailure();
+    queue2->clearLastNodeFailure();
+    // check only new messages get forced
+    queue1->setLastNodeFailure();
+    queue2->setLastNodeFailure();
+    BOOST_CHECK_EQUAL(testStore.enqCnt, 4u);
+
+    // check no failure messages are stored
+    queue1->clearLastNodeFailure();
+    queue2->clearLastNodeFailure();
+    
+    intrusive_ptr<Message> msg3 = create_message("e", "B");
+    queue1->deliver(msg3);
+    queue2->deliver(msg3);
+    BOOST_CHECK_EQUAL(testStore.enqCnt, 4u);
+    queue1->setLastNodeFailure();
+    queue2->setLastNodeFailure();
+    BOOST_CHECK_EQUAL(testStore.enqCnt, 6u);
+    
 }
 
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to