Author: cctrieloff
Date: Tue Jul  7 01:53:13 2009
New Revision: 791672

URL: http://svn.apache.org/viewvc?rev=791672&view=rev
Log:
Corrected the case where message on more than one queue does not persist when 
last node standing is enabled

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=791672&r1=791671&r2=791672&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Tue Jul  7 01:53:13 2009
@@ -59,7 +59,15 @@
 
 void Message::forcePersistent()
 {
-    forcePersistentPolicy = true;
+    // only set forced bit if we actually need to force.
+    if (! getAdapter().isPersistent(frames) ){
+        forcePersistentPolicy = true;
+    }
+}
+
+bool Message::isForcedPersistent()
+{
+    return forcePersistentPolicy;
 }
 
 std::string Message::getRoutingKey() const

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=791672&r1=791671&r2=791672&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Tue Jul  7 01:53:13 2009
@@ -147,6 +147,7 @@
     void addTraceId(const std::string& id);
        
        void forcePersistent();
+       bool isForcedPersistent();
     
     boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor) 
const;
     void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* 
qfor);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=791672&r1=791671&r2=791672&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Jul  7 01:53:13 2009
@@ -668,7 +668,7 @@
        for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i 
) {
             if (lastValueQueue) checkLvqReplace(*i);
             i->payload->forcePersistent();
-            if (i->payload->getPersistenceId() == 0){
+            if (i->payload->isForcedPersistent() ){
                enqueue(0, i->payload);
             }
        }

Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=791672&r1=791671&r2=791672&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Tue Jul  7 01:53:13 2009
@@ -272,20 +272,25 @@
 class TestMessageStoreOC : public NullMessageStore
 {
   public:
+
+    uint enqCnt;
+    uint deqCnt;
     
     virtual void dequeue(TransactionContext*,
                  const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
                  const PersistableQueue& /*queue*/)
     {
+        deqCnt++;
     }
 
     virtual void enqueue(TransactionContext*,
                  const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
                  const PersistableQueue& /* queue */)
     {
+        enqCnt++;
     }
 
-    TestMessageStoreOC() : NullMessageStore() {}
+    TestMessageStoreOC() : NullMessageStore(),enqCnt(0),deqCnt(0) {}
     ~TestMessageStoreOC(){}
 };
 
@@ -521,6 +526,32 @@
     BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u);
 }
 
+QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
+
+    TestMessageStoreOC  testStore;
+    client::QueueOptions args;
+    args.setPersistLastNode();
+
+    Queue::shared_ptr queue1(new Queue("queue1", true, &testStore ));
+    queue1->configure(args);
+    Queue::shared_ptr queue2(new Queue("queue2", true, &testStore ));
+    queue2->configure(args);
+       
+    intrusive_ptr<Message> msg1 = create_message("e", "A");
+
+    queue1->deliver(msg1);
+    queue2->deliver(msg1);
+
+    //change mode
+    queue1->setLastNodeFailure();
+    BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
+    queue2->setLastNodeFailure();
+    BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
+
+}
+
+
+
 QPID_AUTO_TEST_SUITE_END()
 
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to