Author: aconway
Date: Thu Jan 19 23:07:38 2012
New Revision: 1233675
URL: http://svn.apache.org/viewvc?rev=1233675&view=rev
Log:
QPID-3603: Code cleanup to make ReplicatingSubscription more readable.
Clarified deliver() and dequeued() logic and locking.
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1233675&r1=1233674&r2=1233675&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
Thu Jan 19 23:07:38 2012
@@ -103,14 +103,6 @@ ReplicatingSubscription::ReplicatingSubs
QPID_LOG(debug, logPrefix << "Created subscription " << name);
- // Note that broker::Queue::getPosition() returns the sequence
- // number that will be assigned to the next message *minus 1*.
-
- // this->backupPosition tracks the position of the remote backup
- // queue, i.e. the sequence number for the next delivered message
- // *minus one*
- backupPosition = 0;
-
// FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0
// so we will start consuming from the lowest numbered message.
// This is incorrect if the sequence number wraps around, but
@@ -121,22 +113,20 @@ ReplicatingSubscription::ReplicatingSubs
bool ReplicatingSubscription::deliver(QueuedMessage& m) {
// Add position events for the subscribed queue, not for the internal
event queue.
if (m.queue && m.queue == getQueue().get()) {
+ sys::Mutex::ScopedLock l(lock);
assert(position == m.position);
- {
- sys::Mutex::ScopedLock l(lock);
- // this->position is the new position after enqueueing m locally.
- // this->backupPosition is the backup position before enqueueing
m.
- assert(position > backupPosition);
- if (position - backupPosition > 1) {
- // Position has advanced because of messages dequeued ahead
of us.
- SequenceNumber send(position);
- --send; // Send the position before m was enqueued.
- sendPositionEvent(send, l);
- QPID_LOG(trace, logPrefix << "Sending position " << send
- << ", was " << backupPosition);
- }
- backupPosition = position;
+ // m.position is the position of the newly enqueued m on the local
queue.
+ // backupPosition is latest position on the backup queue (before
enqueueing m.)
+ assert(m.position > backupPosition);
+ if (m.position - backupPosition > 1) {
+ // Position has advanced because of messages dequeued ahead of us.
+ SequenceNumber send(m.position);
+ --send; // Send the position before m was enqueued.
+ sendPositionEvent(send, l);
+ QPID_LOG(trace, logPrefix << "Sending position " << send
+ << ", was " << backupPosition);
}
+ backupPosition = m.position;
QPID_LOG(trace, logPrefix << "Replicating message " << m.position);
}
return ConsumerImpl::deliver(m);
@@ -215,21 +205,25 @@ void ReplicatingSubscription::sendEvent(
}
// Called after the message has been removed from the deque and under
-// the message lock in the queue. Called in arbitrary connection threads.
+// the messageLock in the queue. Called in arbitrary connection threads.
void ReplicatingSubscription::dequeued(const QueuedMessage& m)
{
QPID_LOG(trace, logPrefix << "Dequeued message " << m.position);
{
sys::Mutex::ScopedLock l(lock);
dequeues.add(m.position);
+ // If we have not yet sent this message to the backup, then
+ // complete it now as it will never be accepted.
+
+ // FIXME aconway 2012-01-05: suspect use of position in
+ // foreign connection thread. Race with deliver() which is
+ // not under the message lock?
+ if (m.position > position) {
+ m.payload->getIngressCompletion().finishCompleter();
+ QPID_LOG(trace, logPrefix << "Completed message " << m.position <<
" early");
+ }
}
notify(); // Ensure a call to doDispatch
- // FIXME aconway 2011-12-20: not thread safe to access position here,
- // we're not in the dispatch thread.
- if (m.position > position) {
- m.payload->getIngressCompletion().finishCompleter();
- QPID_LOG(trace, logPrefix << "Completed message " << m.position << "
early");
- }
}
// Called in subscription's connection thread.
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]