Author: cctrieloff
Date: Tue Jul 7 15:05:11 2009
New Revision: 791858
URL: http://svn.apache.org/viewvc?rev=791858&view=rev
Log:
More tests and completion of fix for 791672
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=791858&r1=791857&r2=791858&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Jul 7 15:05:11 2009
@@ -99,7 +99,8 @@
eventMode(0),
eventMgr(0),
insertSeqNo(0),
- broker(b)
+ broker(b),
+ lastForcedPosition(0)
{
if (parent != 0 && broker != 0)
{
@@ -659,6 +660,7 @@
void Queue::clearLastNodeFailure()
{
inLastNodeFailure = false;
+ lastForcedPosition = sequence;
}
void Queue::setLastNodeFailure()
@@ -666,10 +668,14 @@
if (persistLastNode){
Mutex::ScopedLock locker(messageLock);
for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i
) {
- if (lastValueQueue) checkLvqReplace(*i);
- i->payload->forcePersistent();
- if (i->payload->isForcedPersistent() ){
- enqueue(0, i->payload);
+ // don't force a message twice to disk.
+ if(i->position > lastForcedPosition) {
+ if (lastValueQueue) checkLvqReplace(*i);
+ i->payload->forcePersistent();
+ if (i->payload->isForcedPersistent() ){
+ enqueue(0, i->payload);
+ }
+ lastForcedPosition = i->position;
}
}
inLastNodeFailure = true;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=791858&r1=791857&r2=791858&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Jul 7 15:05:11 2009
@@ -106,6 +106,7 @@
bool insertSeqNo;
std::string seqNoKey;
Broker* broker;
+ framing::SequenceNumber lastForcedPosition;
void push(boost::intrusive_ptr<Message>& msg, bool
isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=791858&r1=791857&r2=791858&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Tue Jul 7 15:05:11 2009
@@ -548,6 +548,34 @@
queue2->setLastNodeFailure();
BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
+ // check they don't get stored twice
+ queue1->setLastNodeFailure();
+ queue2->setLastNodeFailure();
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
+
+ intrusive_ptr<Message> msg2 = create_message("e", "B");
+ queue1->deliver(msg2);
+ queue2->deliver(msg2);
+
+ queue1->clearLastNodeFailure();
+ queue2->clearLastNodeFailure();
+ // check only new messages get forced
+ queue1->setLastNodeFailure();
+ queue2->setLastNodeFailure();
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 4u);
+
+ // check no failure messages are stored
+ queue1->clearLastNodeFailure();
+ queue2->clearLastNodeFailure();
+
+ intrusive_ptr<Message> msg3 = create_message("e", "B");
+ queue1->deliver(msg3);
+ queue2->deliver(msg3);
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 4u);
+ queue1->setLastNodeFailure();
+ queue2->setLastNodeFailure();
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 6u);
+
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]