Author: aconway
Date: Thu Jan 19 23:06:24 2012
New Revision: 1233665

URL: http://svn.apache.org/viewvc?rev=1233665&view=rev
Log:
QPID-3603: Fix race condition in setting initial position of 
ReplicatingSubscription.

Modified:
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Queue.h
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1233665&r1=1233664&r2=1233665&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Queue.cpp Thu Jan 19 23:06:24 2012
@@ -1471,15 +1471,6 @@ class FindLowest
 };
 }
 
-bool Queue::getOldest(qpid::framing::SequenceNumber& oldest)
-{
-    //Horribly inefficient, but saves modifying Messages interface and
-    //all its implementations at present:
-    FindLowest f;
-    eachMessage(boost::bind(&FindLowest::process, &f, _1));
-    return f.getLowest(oldest);
-}
-
 Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
 
 bool Queue::UsageBarrier::acquire()

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Queue.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Queue.h?rev=1233665&r1=1233664&r2=1233665&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Queue.h Thu Jan 19 23:06:24 2012
@@ -407,7 +407,6 @@ class Queue : public boost::enable_share
 
     uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
     void setDequeueSincePurge(uint32_t value);
-    bool getOldest(framing::SequenceNumber& result);
 };
 }
 }

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1233665&r1=1233664&r2=1233665&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Thu Jan 19 23:06:24 2012
@@ -98,24 +98,21 @@ ReplicatingSubscription::ReplicatingSubs
     // Note that broker::Queue::getPosition() returns the sequence
     // number that will be assigned to the next message *minus 1*.
 
-    // this->position is inherited from ConsumerImpl. It tracks the
-    // position of the last message browsed on the local (primary)
-    // queue, or more exactly the next sequence number to browse
-    // *minus 1*
-    qpid::framing::SequenceNumber oldest;
-    position = queue->getOldest(oldest) ? --oldest : queue->getPosition();
-
     // this->backupPosition tracks the position of the remote backup
     // queue, i.e. the sequence number for the next delivered message
     // *minus one*
     backupPosition = 0;
+
+    // FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0
+    // so we will start consuming from the lowest numbered message.
+    // This is incorrect if the sequence number wraps around, but
+    // this is what all consumers currently do.
 }
 
 // Message is delivered in the subscription's connection thread.
 bool ReplicatingSubscription::deliver(QueuedMessage& m) {
     // Add position events for the subscribed queue, not for the internal event queue.
-    if (m.queue && m.queue->getName() == getQueue()->getName()) {
-        QPID_LOG(trace, "HA: replicating message to backup: " << QueuePos(m));
+    if (m.queue && m.queue == getQueue().get()) {
         assert(position == m.position);
         {
              sys::Mutex::ScopedLock l(lock);
@@ -130,6 +127,7 @@ bool ReplicatingSubscription::deliver(Qu
              }
              backupPosition = position;
         }
+        QPID_LOG(trace, "HA: replicating message to backup: " << QueuePos(m));
     }
     return ConsumerImpl::deliver(m);
 }
@@ -213,7 +211,7 @@ void ReplicatingSubscription::dequeued(c
     {
         sys::Mutex::ScopedLock l(lock);
         dequeues.add(m.position);
-        QPID_LOG(trace, "HA: Will dequeue " << QueuePos(m) << " on " << getName());
+        QPID_LOG(trace, "HA: Dequeued " << QueuePos(m) << " on " << getName());
     }
     notify();                   // Ensure a call to doDispatch
     if (m.position > position) {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to