Author: cctrieloff
Date: Wed Jul 8 18:58:27 2009
New Revision: 792259
URL: http://svn.apache.org/viewvc?rev=792259&view=rev
Log:
fix for regression in patch & test to prevent regression again
Simulate this:
1. start 2 nodes
2. create cluster durable lvq
3. send a transient message to the queue
4. kill one of the nodes (to trigger force persistent behaviour)...
5. then restart it (to turn off force persistent behaviour)
6. send another transient message with same lvq key as in 3
7. kill the second node again (retrigger force persistent)
8. stop and recover the first node
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=792259&r1=792258&r2=792259&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Jul 8 18:58:27 2009
@@ -674,9 +674,9 @@
if (persistLastNode){
Mutex::ScopedLock locker(messageLock);
for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) {
+ if (lastValueQueue) checkLvqReplace(*i);
// don't force a message twice to disk.
if(!i->payload->isStoredOnQueue(shared_from_this())) {
- if (lastValueQueue) checkLvqReplace(*i);
i->payload->forcePersistent();
if (i->payload->isForcedPersistent() ){
enqueue(0, i->payload);
Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=792259&r1=792258&r2=792259&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Wed Jul 8 18:58:27 2009
@@ -488,6 +488,53 @@
}
+QPID_AUTO_TEST_CASE(testLVQRecover){
+
+/* simulate this
+ 1. start 2 nodes
+ 2. create cluster durable lvq
+ 3. send a transient message to the queue
+ 4. kill one of the nodes (to trigger force persistent behaviour)...
+ 5. then restart it (to turn off force persistent behaviour)
+ 6. send another transient message with same lvq key as in 3
+ 7. kill the second node again (retrigger force persistent)
+ 8. stop and recover the first node
+*/
+ TestMessageStoreOC testStore;
+ client::QueueOptions args;
+ // set queue mode
+ args.setOrdering(client::LVQ);
+ args.setPersistLastNode();
+
+ Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
+ intrusive_ptr<Message> received;
+ queue1->configure(args);
+
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg2 = create_message("e", "A");
+ // 2
+ string key;
+ args.getLVQKey(key);
+ BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
+
+ msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ // 3
+ queue1->deliver(msg1);
+ // 4
+ queue1->setLastNodeFailure();
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
+ // 5
+ queue1->clearLastNodeFailure();
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
+ // 6
+ queue1->deliver(msg2);
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
+ queue1->setLastNodeFailure();
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
+ BOOST_CHECK_EQUAL(testStore.deqCnt, 1u);
+}
+
void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0)
{
for (uint i = 0; i < count; i++) {
@@ -594,6 +641,9 @@
queue2->setLastNodeFailure();
BOOST_CHECK_EQUAL(testStore.enqCnt, 8u);
+ queue2->clearLastNodeFailure();
+ queue2->setLastNodeFailure();
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 8u);
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]