Author: aconway
Date: Mon Jul 23 21:29:01 2012
New Revision: 1364804

URL: http://svn.apache.org/viewvc?rev=1364804&view=rev
Log:
QPID-4159: HA Missing messages in failover test.

QueueGuard was taking its snapshot of the initial queue range *before* it
registered its QueueObserver. That means it was possible to have unguarded 
messages
between the end of the snapshot and the first position protected by the guard.

Fixed race condition in QueueRange constructor: Must call getPosition() *after*
getFront() since both may be advancing and we want to end up with a valid range
front <= back+1.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueRange.h

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1364804&r1=1364803&r2=1364804&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp Mon Jul 23 21:29:01 2012
@@ -50,15 +50,15 @@ class QueueGuard::QueueObserver : public
 
 
 QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
-    : queue(q), subscription(0), range(q)
+    : queue(q), subscription(0)
 {
-    // NOTE: The QueueGuard is created before the queue becomes active: either
-    // when a backup is promoted, or when a new queue is created on the 
primary.
     std::ostringstream os;
     os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": 
";
     logPrefix = os.str();
     observer.reset(new QueueObserver(*this));
     queue.addObserver(observer);
+    // Set range after addObserver so we know that range.back+1 is a guarded 
position.
+    range = QueueRange(q);
 }
 
 QueueGuard::~QueueGuard() { cancel(); }
@@ -112,6 +112,7 @@ void completeBefore(QueueGuard* guard, S
 
 bool QueueGuard::subscriptionStart(SequenceNumber position) {
    // Complete any messages before or at the ReplicatingSubscription start 
position.
+   // Those messages are already on the backup.
     if (!delayed.empty() && delayed.front() <= position) {
         // FIXME aconway 2012-06-15: queue iteration, only messages in delayed
         queue.eachMessage(boost::bind(&completeBefore, this, position, _1));

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h?rev=1364804&r1=1364803&r2=1364804&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h Mon Jul 23 21:29:01 2012
@@ -83,12 +83,14 @@ class QueueGuard {
      * QueueGuard is created before the queue becomes active: either when a
      * backup is promoted, or when a new queue is created on the primary.
      *
-     * NOTE: The first position protected by the guard is 
getRange().getBack()+1
+     * NOTE: The first position guaranteed to be protected by the guard is
+     * getRange().getBack()+1. It is possible that the guard has protected
+     * some messages before that point.
      */
     const QueueRange& getRange() const { return range; } // range is 
immutable, no lock needed.
 
     /** Inform the guard of the stating position for the attached subscription.
-     * Complete messages that will not be seen by the subscriptino.
+     * Complete messages that will not be seen by the subscription.
      *@return true if the subscription has already advanced to a guarded 
position.
      */
     bool subscriptionStart(framing::SequenceNumber position);
@@ -102,7 +104,7 @@ class QueueGuard {
     framing::SequenceSet delayed;
     ReplicatingSubscription* subscription;
     boost::shared_ptr<QueueObserver> observer;
-    const QueueRange range;
+    QueueRange range;
 };
 }} // namespace qpid::ha
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueRange.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueRange.h?rev=1364804&r1=1364803&r2=1364804&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueRange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueRange.h Mon Jul 23 21:29:01 2012
@@ -33,17 +33,31 @@ namespace ha {
 
 /**
  * Get the front/back range of a queue or from a ReplicatingSubscription 
arguments table.
+ *
+ * The *back* of the queue is the position of the latest (most recently pushed)
+ * message on the queue or, if the queue is empty, the back is n-1 where n is
+ * the position that will be assigned to the next message pushed onto the 
queue.
+ *
+ * The *front* of the queue is the position of the oldest (next to be 
consumed) message
+ * on the queue or, if the queue is empty, it is the position that will be 
occupied
+ * by the next message pushed onto the queue.
+ *
+ * This leads to the slightly surprising conclusion that for an empty queue
+ * front = back+1
  */
 struct QueueRange {
   public:
     framing::SequenceNumber front, back;
 
-    QueueRange() { }
+    QueueRange() : front(1), back(0) { } // Empty range.
 
     QueueRange(broker::Queue& q) {
-        back = q.getPosition();
-        front = back+1;         // assume empty
-        ReplicatingSubscription::getFront(q, front);
+        if (ReplicatingSubscription::getFront(q, front))
+            back = q.getPosition();
+        else {
+            back = q.getPosition();
+            front = back+1;     // empty
+        }
         assert(front <= back + 1);
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to