Author: aconway
Date: Thu Oct 18 19:42:21 2012
New Revision: 1399814
URL: http://svn.apache.org/viewvc?rev=1399814&view=rev
Log:
Bug 867030 - QPID-4374: Make QueueGuard::cancel idempotent (Jason Dillaman)
Added QueueGuard::cancelled, only call cancel once.
Don't enqueue after cancel.
Modified:
qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1399814&r1=1399813&r2=1399814&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp Thu Oct 18 19:42:21 2012
@@ -50,7 +50,7 @@ class QueueGuard::QueueObserver : public
QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
- : queue(q), subscription(0)
+ : cancelled(false), queue(q), subscription(0)
{
std::ostringstream os;
os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ":
";
@@ -61,10 +61,7 @@ QueueGuard::QueueGuard(broker::Queue& q,
range = QueueRange(q);
}
-QueueGuard::~QueueGuard() {
- QPID_LOG(debug, logPrefix << "Cancelled");
- cancel();
-}
+QueueGuard::~QueueGuard() { cancel(); }
// NOTE: Called with message lock held.
void QueueGuard::enqueued(const Message& m) {
@@ -73,10 +70,9 @@ void QueueGuard::enqueued(const Message&
m.getIngressCompletion()->startCompleter();
{
Mutex::ScopedLock l(lock);
- if (!delayed.insert(Delayed::value_type(m.getSequence(),
m.getIngressCompletion())).second) {
- QPID_LOG(critical, logPrefix << "Second enqueue for message with
sequence " << m.getSequence());
- assert(false);
- }
+ if (cancelled) return; // Don't record enqueues after we are
cancelled.
+ assert(delayed.find(m.getSequence()) == delayed.end());
+ delayed[m.getSequence()] = m.getIngressCompletion();
}
}
@@ -104,7 +100,8 @@ void QueueGuard::cancel() {
Delayed removed;
{
Mutex::ScopedLock l(lock);
- if (delayed.empty()) return; // No need if no delayed messages.
+ if (cancelled) return;
+ cancelled = true;
delayed.swap(removed);
}
completeRange(removed.begin(), removed.end());
@@ -116,12 +113,13 @@ void QueueGuard::attach(ReplicatingSubsc
}
bool QueueGuard::subscriptionStart(SequenceNumber position) {
+ // Complete any messages before or at the ReplicatingSubscription start
position.
+ // Those messages are already on the backup.
Delayed removed;
{
Mutex::ScopedLock l(lock);
- // Complete any messages before or at the ReplicatingSubscription
start position.
- // Those messages are already on the backup.
- for (Delayed::iterator i = delayed.begin(); i != delayed.end() &&
i->first <= position;) {
+ Delayed::iterator i = delayed.begin();
+ while(i != delayed.end() && i->first <= position) {
removed.insert(*i);
delayed.erase(i++);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h?rev=1399814&r1=1399813&r2=1399814&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h Thu Oct 18 19:42:21 2012
@@ -106,6 +106,7 @@ class QueueGuard {
class QueueObserver;
sys::Mutex lock;
+ bool cancelled;
std::string logPrefix;
broker::Queue& queue;
typedef std::map<framing::SequenceNumber,
boost::intrusive_ptr<broker::AsyncCompletion> > Delayed;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]