Author: aconway
Date: Thu Oct 18 19:42:21 2012
New Revision: 1399814

URL: http://svn.apache.org/viewvc?rev=1399814&view=rev
Log:
Bug 867030 - QPID-4374: Make QueueGuard::cancel idempotent (Jason Dillaman)

Added the QueueGuard::cancelled flag so that cancel() takes effect only once.
Enqueues that arrive after cancel() are no longer recorded.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1399814&r1=1399813&r2=1399814&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp Thu Oct 18 19:42:21 2012
@@ -50,7 +50,7 @@ class QueueGuard::QueueObserver : public
 
 
 QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
-    : queue(q), subscription(0)
+    : cancelled(false), queue(q), subscription(0)
 {
     std::ostringstream os;
     os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": ";
@@ -61,10 +61,7 @@ QueueGuard::QueueGuard(broker::Queue& q,
     range = QueueRange(q);
 }
 
-QueueGuard::~QueueGuard() {
-    QPID_LOG(debug, logPrefix << "Cancelled");
-    cancel();
-}
+QueueGuard::~QueueGuard() { cancel(); }
 
 // NOTE: Called with message lock held.
 void QueueGuard::enqueued(const Message& m) {
@@ -73,10 +70,9 @@ void QueueGuard::enqueued(const Message&
     m.getIngressCompletion()->startCompleter();
     {
         Mutex::ScopedLock l(lock);
-        if (!delayed.insert(Delayed::value_type(m.getSequence(), m.getIngressCompletion())).second) {
-            QPID_LOG(critical, logPrefix << "Second enqueue for message with sequence " << m.getSequence());
-            assert(false);
-        }
+        if (cancelled) return;  // Don't record enqueues after we are cancelled.
+        assert(delayed.find(m.getSequence()) == delayed.end());
+        delayed[m.getSequence()] = m.getIngressCompletion();
     }
 }
 
@@ -104,7 +100,8 @@ void QueueGuard::cancel() {
     Delayed removed;
     {
         Mutex::ScopedLock l(lock);
-        if (delayed.empty()) return; // No need if no delayed messages.
+        if (cancelled) return;
+        cancelled = true;
         delayed.swap(removed);
     }
     completeRange(removed.begin(), removed.end());
@@ -116,12 +113,13 @@ void QueueGuard::attach(ReplicatingSubsc
 }
 
 bool QueueGuard::subscriptionStart(SequenceNumber position) {
+    // Complete any messages before or at the ReplicatingSubscription start position.
+    // Those messages are already on the backup.
     Delayed removed;
     {
         Mutex::ScopedLock l(lock);
-        // Complete any messages before or at the ReplicatingSubscription start position.
-        // Those messages are already on the backup.
-        for (Delayed::iterator i = delayed.begin(); i != delayed.end() && i->first <= position;) {
+        Delayed::iterator i = delayed.begin();
+        while(i != delayed.end() && i->first <= position) {
             removed.insert(*i);
             delayed.erase(i++);
         }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h?rev=1399814&r1=1399813&r2=1399814&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h Thu Oct 18 19:42:21 2012
@@ -106,6 +106,7 @@ class QueueGuard {
     class QueueObserver;
 
     sys::Mutex lock;
+    bool cancelled;
     std::string logPrefix;
     broker::Queue& queue;
     typedef std::map<framing::SequenceNumber, boost::intrusive_ptr<broker::AsyncCompletion> > Delayed;



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to