Author: aconway
Date: Mon Jun 18 18:08:19 2012
New Revision: 1351435
URL: http://svn.apache.org/viewvc?rev=1351435&view=rev
Log:
QPID-3603: Fix & clean up in HA code.
- Fix fencepost error in getFirstSafe()
- QueueGuard::attach completes messages at or before the ReplicatingSubscription
position.
- Fix minor test issues in brokertest.py and ha_tests.py.
- ReplicatingSubscription checks for readiness when a message is acknowledged, not in deliver().
- HA test fix: wait_status retries on ConnectionError, since the broker may not be up yet.
Modified:
qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h
qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
qpid/trunk/qpid/cpp/src/tests/brokertest.py
qpid/trunk/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1351435&r1=1351434&r2=1351435&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp Mon Jun 18 18:08:19 2012
@@ -24,6 +24,7 @@
#include "qpid/broker/QueuedMessage.h"
#include "qpid/broker/QueueObserver.h"
#include "qpid/log/Statement.h"
+#include <boost/bind.hpp>
#include <sstream>
namespace qpid {
@@ -51,16 +52,13 @@ class QueueGuard::QueueObserver : public
QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
: queue(q), subscription(0)
{
- // NOTE: There is no activity on the queue while QueueGuard constructor is
- // running It is called either from Primary before client connections are
- // allowed or from ConfigurationObserver::queueCreate before the queue is
- // visible.
std::ostringstream os;
os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ":
";
logPrefix = os.str();
observer.reset(new QueueObserver(*this));
queue.addObserver(observer);
- firstSafe = queue.getPosition(); // FIXME aconway 2012-06-13: fencepost
error
+ // Set after addObserver to ensure we dont miss an enqueue.
+ firstSafe = queue.getPosition() + 1; // Next message will be protected by
the guard.
}
QueueGuard::~QueueGuard() { cancel(); }
@@ -101,9 +99,23 @@ void QueueGuard::cancel() {
queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1));
}
+namespace {
+void completeBefore(QueueGuard* guard, SequenceNumber position, const
QueuedMessage& qm) {
+ if (qm.position <= position) guard->complete(qm);
+}
+}
+
void QueueGuard::attach(ReplicatingSubscription& rs) {
- Mutex::ScopedLock l(lock);
+ // NOTE: attach is called before the ReplicatingSubscription is active so
+ // it's position is not changing.
assert(firstSafe >= rs.getPosition());
+ // Complete any messages before or at the ReplicatingSubscription position.
+ if (!delayed.empty() && delayed.front() <= rs.getPosition()) {
+ // FIXME aconway 2012-06-15: queue iteration, only messages in delayed
+ queue.eachMessage(boost::bind(&completeBefore, this, rs.getPosition(),
_1));
+ }
+ Mutex::ScopedLock l(lock);
+ // FIXME aconway 2012-06-15: complete messages before rs.getPosition
subscription = &rs;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h?rev=1351435&r1=1351434&r2=1351435&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h Mon Jun 18 18:08:19 2012
@@ -43,12 +43,11 @@ class BrokerInfo;
class ReplicatingSubscription;
/**
- * A queue guard is a QueueObserver that delays completion of new
- * messages arriving on a queue. It works as part of a
- * ReplicatingSubscription to ensure messages are not acknowledged
- * till they have been replicated.
+ * A queue guard is a QueueObserver that delays completion of new messages
+ * arriving on a queue. It works as part of a ReplicatingSubscription to
ensure
+ * messages are not acknowledged till they have been replicated.
*
- * The guard is created before the ReplicatingSubscription to protect
+ * The guard can be created before the ReplicatingSubscription to protect
* messages arriving before the creation of the subscription.
*
* THREAD SAFE: Concurrent calls:
@@ -78,8 +77,9 @@ class QueueGuard {
void attach(ReplicatingSubscription&);
- /** The first sequence number protected by this guard.
- * All messages at or after this position are protected.
+ /**
+ * The first sequence number to be protected by this guard. All messages
at
+ * or after this position are protected.
*/
framing::SequenceNumber getFirstSafe();
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1351435&r1=1351434&r2=1351435&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Mon Jun 18
18:08:19 2012
@@ -248,9 +248,9 @@ ReplicatingSubscription::ReplicatingSubs
}
QPID_LOG(debug, logPrefix << "Subscribed: "
<< " backup:" << backup
- << " backup position:" << backupPosition
<< " primary:" << primary
<< " position:" << position
+ << " safe position: " << guard->getFirstSafe()
);
// Are we ready yet?
@@ -308,10 +308,7 @@ bool ReplicatingSubscription::deliver(Qu
backupPosition = qm.position;
}
// Deliver the message
- bool delivered = ConsumerImpl::deliver(qm);
- // If we have advanced past the initial position, the backup is
ready.
- if (qm.position >= guard->getFirstSafe()) setReady();
- return delivered;
+ return ConsumerImpl::deliver(qm);
}
else
return ConsumerImpl::deliver(qm); // Message is for internal event
queue.
@@ -329,7 +326,7 @@ void ReplicatingSubscription::setReady()
ready = true;
}
// Notify Primary that a subscription is ready.
- QPID_LOG(info, logPrefix << "Caught up at " << getPosition());
+ QPID_LOG(debug, logPrefix << "Caught up");
if (Primary::get()) Primary::get()->readyReplica(*this);
}
@@ -346,6 +343,8 @@ void ReplicatingSubscription::acknowledg
// Finish completion of message, it has been acknowledged by the
backup.
QPID_LOG(trace, logPrefix << "Acknowledged " << qm);
guard->complete(qm);
+ // If next message is protected by the guard then we are ready
+ if (qm.position+1 >= guard->getFirstSafe()) setReady();
}
ConsumerImpl::acknowledged(qm);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1351435&r1=1351434&r2=1351435&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Mon Jun 18
18:08:19 2012
@@ -48,12 +48,17 @@ class QueueGuard;
/**
* A susbcription that replicates to a remote backup.
*
- * Runs on the primary. In conjunction with a QueueGuard, delays
- * completion of messages till the backup has acknowledged, informs
- * backup of locally dequeued messages.
+ * Runs on the primary. In conjunction with a QueueGuard, delays completion of
+ * messages till the backup has acknowledged, informs backup of locally
dequeued
+ * messages.
*
- * THREAD SAFE: Called in subscription's connection thread but also
- * in arbitrary connection threads via dequeued.
+ * A ReplicatingSubscription is "ready" when all the messages on the queue have
+ * either been acknowledged by the backup, or are protected by the queue guard.
+ * On a primary broker the ReplicatingSubscription calls Primary::readyReplica
+ * when it is ready.
+ *
+ * THREAD SAFE: Called in subscription's connection thread but also in
arbitrary
+ * connection threads via dequeued.
*
* Lifecycle: broker::Queue holds shared_ptrs to this as a consumer.
*
Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1351435&r1=1351434&r2=1351435&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Mon Jun 18 18:08:19 2012
@@ -76,20 +76,20 @@ def error_line(filename, n=1):
except: return ""
return ":\n" + "".join(result)
-def retry(function, timeout=10, delay=.01):
+def retry(function, timeout=10, delay=.01, max_delay=1):
"""Call function until it returns a true value or timeout expires.
- Double the delay for each retry. Returns what function returns if
- true, None if timeout expires."""
+ Double the delay for each retry up to max_delay.
+ Returns what function returns if true, None if timeout expires."""
deadline = time.time() + timeout
ret = None
- while not ret:
+ while True:
ret = function()
+ if ret: return ret
remaining = deadline - time.time()
if remaining <= 0: return False
delay = min(delay, remaining)
time.sleep(delay)
- delay *= 2
- return ret
+ delay = min(delay*2, max_delay)
class AtomicCounter:
def __init__(self):
@@ -657,7 +657,7 @@ class NumberedReceiver(Thread):
m = self.read_message()
while m != -1:
self.receiver.assert_running()
- assert m <= self.received, "Missing message %s>%s"%(m,
self.received)
+ assert m <= self.received, "%s missing message %s>%s"%(queue,
m, self.received)
if (m == self.received): # Ignore duplicates
self.received += 1
if self.sender:
Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1351435&r1=1351434&r2=1351435&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Mon Jun 18 18:08:19 2012
@@ -77,10 +77,17 @@ class HaBroker(Broker):
if not self._agent: self._agent = QmfAgent(self.host_port())
return self._agent
- def ha_status(self): return self.agent().getHaBroker().status
+ def ha_status(self):
+ hb = self.agent().getHaBroker()
+ hb.update()
+ return hb.status
def wait_status(self, status):
- assert retry(lambda: self.ha_status() == status), "%s, %r !=
%r"%(self, self.ha_status(), status)
+ def try_get_status():
+ # Ignore ConnectionError, the broker may not be up yet.
+ try: return self.ha_status() == status;
+ except ConnectionError: return False
+ assert retry(try_get_status, timeout=20), "%s, %r != %r"%(self,
self.ha_status(), status)
# FIXME aconway 2012-05-01: do direct python call to qpid-config code.
def qpid_config(self, args):
@@ -755,12 +762,12 @@ class LongTests(BrokerTest):
return receivers[0].received > n + 100
# FIXME aconway 2012-05-17: client reconnect sometimes takes >
1 sec.
assert retry(enough, 10), "Stalled: %s <
%s+100"%(receivers[0].received, n)
- for s in senders: s.stop()
- for r in receivers: r.stop()
except:
traceback.print_exc()
raise
finally:
+ for s in senders: s.stop()
+ for r in receivers: r.stop()
dead = []
for i in xrange(3):
if not brokers[i].is_running(): dead.append(i)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]