Author: cctrieloff
Date: Tue Jul  7 16:00:16 2009
New Revision: 791886

URL: http://svn.apache.org/viewvc?rev=791886&view=rev
Log:
Correcting to ensure recovered cluster-durable messages are dequeued from store

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=791886&r1=791885&r2=791886&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Tue Jul  7 16:00:16 2009
@@ -45,8 +45,7 @@
     intrusive_ptr<Message> msg;
     const uint64_t stagingThreshold;
 public:
-    RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold) 
-        : msg(_msg), stagingThreshold(_stagingThreshold) {}
+    RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold); 
     ~RecoverableMessageImpl() {};
     void setPersistenceId(uint64_t id);
     bool loadContent(uint64_t available);
@@ -160,6 +159,13 @@
     queues.eachQueue(boost::bind(&Queue::recoveryComplete, _1));
 }
 
+RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg, uint64_t _stagingThreshold) : msg(_msg), stagingThreshold(_stagingThreshold) 
+{
+    if (!msg->isPersistent()) {
+        msg->forcePersistent(); // set so that message will get dequeued from store.
+    }
+}
+
 bool RecoverableMessageImpl::loadContent(uint64_t available)
 {
     return !stagingThreshold || available < stagingThreshold;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to