Author: aconway
Date: Tue Jul 24 13:33:32 2012
New Revision: 1365044
URL: http://svn.apache.org/viewvc?rev=1365044&view=rev
Log:
QPID-4159: HA Missing messages in failover test.
QueueGuard was taking its snapshot of the initial queue range *before* it
registered its QueueObserver. That means it was possible to have unguarded
messages between the end of the snapshot and the first position protected
by the guard.
Fixed race condition in QueueRange constructor: Must call getPosition() *after*
getFront() since both may be advancing and we want to end up with a valid range
front <= back+1.
Modified:
qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueGuard.cpp
qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueGuard.h
qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueRange.h
Modified: qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1365044&r1=1365043&r2=1365044&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueGuard.cpp Tue Jul 24 13:33:32
2012
@@ -50,15 +50,15 @@ class QueueGuard::QueueObserver : public
QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
- : queue(q), subscription(0), range(q)
+ : queue(q), subscription(0)
{
- // NOTE: The QueueGuard is created before the queue becomes active: either
- // when a backup is promoted, or when a new queue is created on the primary.
std::ostringstream os;
os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": ";
logPrefix = os.str();
observer.reset(new QueueObserver(*this));
queue.addObserver(observer);
+ // Set range after addObserver so we know that range.back+1 is a guarded position.
+ range = QueueRange(q);
}
QueueGuard::~QueueGuard() { cancel(); }
@@ -112,6 +112,7 @@ void completeBefore(QueueGuard* guard, S
bool QueueGuard::subscriptionStart(SequenceNumber position) {
// Complete any messages before or at the ReplicatingSubscription start position.
+ // Those messages are already on the backup.
if (!delayed.empty() && delayed.front() <= position) {
// FIXME aconway 2012-06-15: queue iteration, only messages in delayed
queue.eachMessage(boost::bind(&completeBefore, this, position, _1));
Modified: qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueGuard.h
URL:
http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueGuard.h?rev=1365044&r1=1365043&r2=1365044&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueGuard.h (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueGuard.h Tue Jul 24 13:33:32
2012
@@ -83,12 +83,14 @@ class QueueGuard {
* QueueGuard is created before the queue becomes active: either when a
* backup is promoted, or when a new queue is created on the primary.
*
- * NOTE: The first position protected by the guard is getRange().getBack()+1
+ * NOTE: The first position guaranteed to be protected by the guard is
+ * getRange().getBack()+1. It is possible that the guard has protected
+ * some messages before that point.
*/
const QueueRange& getRange() const { return range; } // range is immutable, no lock needed.
/** Inform the guard of the stating position for the attached subscription.
- * Complete messages that will not be seen by the subscriptino.
+ * Complete messages that will not be seen by the subscription.
*@return true if the subscription has already advanced to a guarded position.
*/
bool subscriptionStart(framing::SequenceNumber position);
@@ -102,7 +104,7 @@ class QueueGuard {
framing::SequenceSet delayed;
ReplicatingSubscription* subscription;
boost::shared_ptr<QueueObserver> observer;
- const QueueRange range;
+ QueueRange range;
};
}} // namespace qpid::ha
Modified: qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueRange.h
URL:
http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueRange.h?rev=1365044&r1=1365043&r2=1365044&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueRange.h (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueRange.h Tue Jul 24 13:33:32
2012
@@ -33,17 +33,31 @@ namespace ha {
/**
* Get the front/back range of a queue or from a ReplicatingSubscription arguments table.
+ *
+ * The *back* of the queue is the position of the latest (most recently pushed)
+ * message on the queue or, if the queue is empty, the back is n-1 where n is
+ * the position that will be assigned to the next message pushed onto the queue.
+ *
+ * The *front* of the queue is the position of the oldest (next to be consumed) message
+ * on the queue or, if the queue is empty, it is the position that will be occupied
+ * by the next message pushed onto the queue.
+ *
+ * This leads to the slightly surprising conclusion that for an empty queue
+ * front = back+1
*/
struct QueueRange {
public:
framing::SequenceNumber front, back;
- QueueRange() { }
+ QueueRange() : front(1), back(0) { } // Empty range.
QueueRange(broker::Queue& q) {
- back = q.getPosition();
- front = back+1; // assume empty
- ReplicatingSubscription::getFront(q, front);
+ if (ReplicatingSubscription::getFront(q, front))
+ back = q.getPosition();
+ else {
+ back = q.getPosition();
+ front = back+1; // empty
+ }
assert(front <= back + 1);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]