Author: aconway
Date: Thu Jan 19 23:06:16 2012
New Revision: 1233664

URL: http://svn.apache.org/viewvc?rev=1233664&view=rev
Log:
QPID-3603: Use position events to synchronize queue positions between primary 
and backup

The previous code used dequeue events to synchronize queue positions, but
dequeue events are generated in a different thread from message delivery,
which led to race conditions. Position events are generated in-line with
message delivery.

Modified:
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.h
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
    qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1233664&r1=1233663&r2=1233664&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp 
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Thu Jan 
19 23:06:16 2012
@@ -42,12 +42,14 @@ const std::string QPID_SYNC_FREQUENCY("q
 namespace qpid {
 namespace ha {
 using namespace broker;
+using namespace framing;
 
 const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event");
+const std::string QueueReplicator::POSITION_EVENT_KEY("qpid.position-event");
 
 QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, 
boost::shared_ptr<Link> l)
     : Exchange(QPID_REPLICATOR_+q->getName(), 0, 0), // FIXME aconway 
2011-11-24: hidden from management?
-      queue(q), link(l), current(queue->getPosition())
+      queue(q), link(l)
 {
     QPID_LOG(debug, "HA: Replicating queue " << q->getName() << " " << 
q->getSettings());
     // Declare the replicator bridge.
@@ -96,49 +98,54 @@ void QueueReplicator::initializeBridge(B
     QPID_LOG(debug, "HA: Backup activated bridge from " << args.i_src << " to 
" << args.i_dest);
 }
 
-void QueueReplicator::route(Deliverable& msg, const std::string& key, const 
qpid::framing::FieldTable* /*args*/)
+namespace {
+template <class T> T decodeContent(Message& m) {
+    std::string content;
+    m.getFrames().getContent(content);
+    Buffer buffer(const_cast<char*>(content.c_str()), content.size());
+    T result;
+    result.decode(buffer);
+    return result;
+}
+}
+
+void QueueReplicator::dequeue(SequenceNumber n,  const 
sys::Mutex::ScopedLock&) {
+    // Thread safe: only calls thread safe Queue functions.
+    if (queue->getPosition() >= n) { // Ignore dequeues we haven't reached yet
+        QueuedMessage message;
+        if (queue->acquireMessageAt(n, message)) {
+            queue->dequeue(0, message);
+            QPID_LOG(trace, "HA: Backup dequeued: "<< QueuePos(message));
+        }
+    }
+}
+
+void QueueReplicator::route(Deliverable& msg, const std::string& key, const 
FieldTable* /*args*/)
 {
+    sys::Mutex::ScopedLock l(lock);
     if (key == DEQUEUE_EVENT_KEY) {
-        std::string content;
-        msg.getMessage().getFrames().getContent(content);
-        qpid::framing::Buffer buffer(const_cast<char*>(content.c_str()), 
content.size());
-        qpid::framing::SequenceSet latest;
-        latest.decode(buffer);
-
-        QPID_LOG(trace, "HA: Backup received dequeues: " << latest);
+        SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
+        QPID_LOG(trace, "HA: Backup received dequeues: " << dequeues);
         //TODO: should be able to optimise the following
-        for (qpid::framing::SequenceSet::iterator i = latest.begin(); i != 
latest.end(); i++) {
-            if (current < *i) {
-                //haven't got that far yet, record the dequeue
-                dequeued.add(*i);
-                QPID_LOG(trace, "HA: Recording dequeue of " << 
QueuePos(queue.get(), *i));
-            } else {
-                QueuedMessage message;
-                if (queue->acquireMessageAt(*i, message)) {
-                    queue->dequeue(0, message);
-                    QPID_LOG(trace, "HA: Backup dequeued: "<< 
QueuePos(message));
-                } else {
-                    // This can happen if we're replicating a queue that has 
initial dequeues.
-                    QPID_LOG(trace, "HA: Backup message already dequeued: "<< 
QueuePos(queue.get(), *i));
-                }
-            }
-        }
+        for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); 
i++)
+            dequeue(*i, l);
+    } else if (key == POSITION_EVENT_KEY) {
+        SequenceNumber position = 
decodeContent<SequenceNumber>(msg.getMessage());
+        assert(queue->getPosition() <= position);
+         //TODO aconway 2011-12-14: Optimize this?
+        for (SequenceNumber i = queue->getPosition(); i < position; ++i)
+            dequeue(i,l);
+        queue->setPosition(position);
+        QPID_LOG(trace, "HA: Backup advanced to: " << QueuePos(queue.get(), 
queue->getPosition()));
     } else {
-        //take account of any gaps in sequence created by messages
-        //dequeued before our subscription reached them
-        while (dequeued.contains(++current)) {
-            dequeued.remove(current);
-            QPID_LOG(trace, "HA: Backup skipping dequeued message: " << 
QueuePos(queue.get(), current));
-            queue->setPosition(current);
-        }
-        QPID_LOG(trace, "HA: Backup enqueued message: " << 
QueuePos(queue.get(), current));
+        QPID_LOG(trace, "HA: Backup enqueued message: " << 
QueuePos(queue.get(), queue->getPosition()+1));
         msg.deliverTo(queue);
     }
 }
 
-bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const 
qpid::framing::FieldTable*) { return false; }
-bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, 
const qpid::framing::FieldTable*) { return false; }
-bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* 
const, const qpid::framing::FieldTable* const) { return false; }
+bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const 
FieldTable*) { return false; }
+bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, 
const FieldTable*) { return false; }
+bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* 
const, const FieldTable* const) { return false; }
 std::string QueueReplicator::getType() const { return TYPE_NAME; }
 
 }} // namespace qpid::broker

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1233664&r1=1233663&r2=1233664&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.h Thu Jan 19 
23:06:16 2012
@@ -44,12 +44,13 @@ namespace ha {
  * Creates a ReplicatingSubscription on the primary by passing special
  * arguments to the consume command.
  *
- * THREAD SAFE.
+ * THREAD SAFE: Called in arbitrary connection threads.
  */
 class QueueReplicator : public broker::Exchange
 {
   public:
     static const std::string DEQUEUE_EVENT_KEY;
+    static const std::string POSITION_EVENT_KEY;
 
     QueueReplicator(boost::shared_ptr<broker::Queue> q, 
boost::shared_ptr<broker::Link> l);
     ~QueueReplicator();
@@ -61,12 +62,11 @@ class QueueReplicator : public broker::E
 
   private:
     void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& 
sessionHandler);
+    void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
 
     sys::Mutex lock;
     boost::shared_ptr<broker::Queue> queue;
     boost::shared_ptr<broker::Link> link;
-    framing::SequenceNumber current;
-    framing::SequenceSet dequeued;
 };
 
 }} // namespace qpid::ha

Modified: 
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1233664&r1=1233663&r2=1233664&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp 
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp 
Thu Jan 19 23:06:16 2012
@@ -94,19 +94,43 @@ ReplicatingSubscription::ReplicatingSubs
     // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
 
     QPID_LOG(debug, "HA: Replicating subscription " << name << " to " << 
queue->getName());
-    qpid::framing::SequenceNumber oldest;
-    if (queue->getOldest(oldest))
-        dequeues.add(0, --oldest);
-    else //local queue (i.e. master) is empty
-        dequeues.add(0, queue->getPosition());
 
-    QPID_LOG(debug, "HA: Initial dequeues for " << queue->getName() << ": " << 
dequeues);
-    // Set 'cursor' on backup queue. Will be updated by dequeue event sent 
above.
-    position = 0;
-}
+    // Note that broker::Queue::getPosition() returns the sequence
+    // number that will be assigned to the next message *minus 1*.
 
-bool ReplicatingSubscription::deliver(QueuedMessage& m)
-{
+    // this->position is inherited from ConsumerImpl. It tracks the
+    // position of the last message browsed on the local (primary)
+    // queue, or more exactly the next sequence number to browse
+    // *minus 1*
+    qpid::framing::SequenceNumber oldest;
+    position = queue->getOldest(oldest) ? --oldest : queue->getPosition();
+
+    // this->backupPosition tracks the position of the remote backup
+    // queue, i.e. the sequence number for the next delivered message
+    // *minus one*
+    backupPosition = 0;
+}
+
+// Message is delivered in the subscription's connection thread.
+bool ReplicatingSubscription::deliver(QueuedMessage& m) {
+    // Add position events for the subscribed queue, not for the internal 
event queue.
+    if (m.queue && m.queue->getName() == getQueue()->getName()) {
+        QPID_LOG(trace, "HA: replicating message to backup: " << QueuePos(m));
+        assert(position == m.position);
+        {
+             sys::Mutex::ScopedLock l(lock);
+             // this->position is the new position after enqueueing m locally.
+             // this->backupPosition is the backup position before enqueueing 
m.
+             assert(position > backupPosition);
+             if (position - backupPosition > 1) {
+                 // Position has advanced because of messages dequeued ahead 
of us.
+                 SequenceNumber send(position);
+                 // Send the position before m was enqueued.
+                 sendPositionEvent(--send, l);
+             }
+             backupPosition = position;
+        }
+    }
     return ConsumerImpl::deliver(m);
 }
 
@@ -121,20 +145,38 @@ ReplicatingSubscription::~ReplicatingSub
 //under the message lock in the queue
 void ReplicatingSubscription::enqueued(const QueuedMessage& m)
 {
-    QPID_LOG(trace, "HA: Enqueued message " << QueuePos(m) << " on " << 
getName());
     //delay completion
     m.payload->getIngressCompletion().startCompleter();
 }
 
 // Called with lock held.
-void ReplicatingSubscription::generateDequeueEvent()
+void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l)
 {
-    QPID_LOG(trace, "HA: Sending dequeue event " << getQueue()->getName() << " 
" << dequeues << " on " << getName());
+    QPID_LOG(trace, "HA: Sending dequeues " << getQueue()->getName() << " " << 
dequeues << " on " << getName());
     string buf(dequeues.encodedSize(),'\0');
     framing::Buffer buffer(&buf[0], buf.size());
     dequeues.encode(buffer);
     dequeues.clear();
     buffer.reset();
+    sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
+}
+
+// Called with lock held.
+void ReplicatingSubscription::sendPositionEvent(
+    SequenceNumber position, const sys::Mutex::ScopedLock&l )
+{
+    QPID_LOG(trace, "HA: Sending position " << QueuePos(getQueue().get(), 
position)
+             << " on " << getName());
+    string buf(backupPosition.encodedSize(),'\0');
+    framing::Buffer buffer(&buf[0], buf.size());
+    position.encode(buffer);
+    buffer.reset();
+    sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l);
+}
+
+void ReplicatingSubscription::sendEvent(const std::string& key, 
framing::Buffer& buffer,
+                                        const sys::Mutex::ScopedLock&)
+{
     //generate event message
     boost::intrusive_ptr<Message> event = new Message();
     AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0)));
@@ -154,8 +196,14 @@ void ReplicatingSubscription::generateDe
     event->getFrames().append(content);
 
     DeliveryProperties* props = 
event->getFrames().getHeaders()->get<DeliveryProperties>(true);
-    props->setRoutingKey(QueueReplicator::DEQUEUE_EVENT_KEY);
+    props->setRoutingKey(key);
+    // Send the event using the events queue. Consumer is a
+    // DelegatingConsumer that delegates to *this for everything but
+    // has an independent position. We put an event on events and
+    // dispatch it through ourselves to send it in line with the
+    // normal browsing messages.
     events->deliver(event);
+    events->dispatch(consumer);
 }
 
 // Called after the message has been removed from the deque and under
@@ -165,8 +213,7 @@ void ReplicatingSubscription::dequeued(c
     {
         sys::Mutex::ScopedLock l(lock);
         dequeues.add(m.position);
-        QPID_LOG(trace, "HA: Added " << QueuePos(m)
-                 << " to dequeue event; subscription at " << position);
+        QPID_LOG(trace, "HA: Will dequeue " << QueuePos(m) << " on " << 
getName());
     }
     notify();                   // Ensure a call to doDispatch
     if (m.position > position) {
@@ -179,13 +226,9 @@ bool ReplicatingSubscription::doDispatch
 {
     {
         sys::Mutex::ScopedLock l(lock);
-        if (!dequeues.empty()) {
-            generateDequeueEvent();
-        }
+        if (!dequeues.empty()) sendDequeueEvent(l);
     }
-    bool r1 = events->dispatch(consumer);
-    bool r2 = ConsumerImpl::doDispatch();
-    return r1 || r2;
+    return ConsumerImpl::doDispatch();
 }
 
 
ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription&
 c) : Consumer(c.getName(), true), delegate(c) {}

Modified: 
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1233664&r1=1233663&r2=1233664&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h 
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h 
Thu Jan 19 23:06:16 2012
@@ -36,6 +36,10 @@ class QueuedMessage;
 class OwnershipToken;
 }
 
+namespace framing {
+class Buffer;
+}
+
 namespace ha {
 
 /**
@@ -44,7 +48,8 @@ namespace ha {
  * Runs on the primary. Delays completion of messages till the backup
  * has acknowledged, informs backup of locally dequeued messages.
  *
- * THREAD UNSAFE: used only in broker connection thread.
+ * THREAD SAFE: Used as a consumer in the subscription's connection
+ * thread, and as a QueueObserver in arbitrary connection threads.
  */
 class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
                                 public broker::QueueObserver
@@ -85,8 +90,12 @@ class ReplicatingSubscription : public b
     boost::shared_ptr<broker::Queue> events;
     boost::shared_ptr<broker::Consumer> consumer;
     qpid::framing::SequenceSet dequeues;
+    framing::SequenceNumber backupPosition;
 
-    void generateDequeueEvent();
+    void sendDequeueEvent(const sys::Mutex::ScopedLock&);
+    void sendPositionEvent(framing::SequenceNumber, const 
sys::Mutex::ScopedLock&);
+    void sendEvent(const std::string& key, framing::Buffer&,
+                   const sys::Mutex::ScopedLock&);
     class DelegatingConsumer : public Consumer
     {
       public:

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py?rev=1233664&r1=1233663&r2=1233664&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py Thu Jan 19 
23:06:16 2012
@@ -106,18 +106,28 @@ class ShortTests(BrokerTest):
         verify(b, "1", p)
         verify(b, "2", p)
 
-        # Test a series of messages, enqueue and dequeue.
+        # Test a series of messages, enqueue all then dequeue all.
         s = p.sender(queue("foo","all"))
         msgs = [str(i) for i in range(10)]
         for m in msgs: s.send(Message(m))
-        self.assert_browse_retry(b, "foo", msgs)
         self.assert_browse_retry(p, "foo", msgs)
+        self.assert_browse_retry(b, "foo", msgs)
         r = p.receiver("foo")
         for m in msgs: self.assertEqual(m, r.fetch(timeout=0).content)
         p.acknowledge()
         self.assert_browse_retry(p, "foo", [])
         self.assert_browse_retry(b, "foo", [])
 
+        # Another series, this time verify each dequeue individually.
+        for m in msgs: s.send(Message(m))
+        self.assert_browse_retry(p, "foo", msgs)
+        self.assert_browse_retry(b, "foo", msgs)
+        for i in range(len(msgs)):
+            self.assertEqual(msgs[i], r.fetch(timeout=0).content)
+            p.acknowledge()
+            self.assert_browse_retry(p, "foo", msgs[i+1:])
+            self.assert_browse_retry(b, "foo", msgs[i+1:])
+
     def qpid_replicate(self, value="all"):
         return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
 
@@ -172,6 +182,7 @@ class ShortTests(BrokerTest):
         except:
             print self.browse(primary.connect().session(), "q", transform=sn)
             print self.browse(backup1.connect().session(), "q", transform=sn)
+            print self.browse(backup2.connect().session(), "q", transform=sn)
             raise
 
 if __name__ == "__main__":



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to