Author: aconway
Date: Mon Jun 18 18:08:19 2012
New Revision: 1351435

URL: http://svn.apache.org/viewvc?rev=1351435&view=rev
Log:
QPID-3603: Fix & clean up in HA code.

- Fix fencepost error in getFirstSafe()
- QueueGuard::attach completes messages at or before the ReplicatingSubscription
position.
- Fix minor test issues in brokertest.py and ha_tests.py.
- ReplicatingSubscription: check for readiness when a message is acknowledged, not when it is delivered.
- HA test fix: wait_status retries on ConnectionError because the broker may not be up yet.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
    qpid/trunk/qpid/cpp/src/tests/brokertest.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1351435&r1=1351434&r2=1351435&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp Mon Jun 18 18:08:19 2012
@@ -24,6 +24,7 @@
 #include "qpid/broker/QueuedMessage.h"
 #include "qpid/broker/QueueObserver.h"
 #include "qpid/log/Statement.h"
+#include <boost/bind.hpp>
 #include <sstream>
 
 namespace qpid {
@@ -51,16 +52,13 @@ class QueueGuard::QueueObserver : public
 QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
     : queue(q), subscription(0)
 {
-    // NOTE: There is no activity on the queue while QueueGuard constructor is
-    // running It is called either from Primary before client connections are
-    // allowed or from ConfigurationObserver::queueCreate before the queue is
-    // visible.
     std::ostringstream os;
     os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": 
";
     logPrefix = os.str();
     observer.reset(new QueueObserver(*this));
     queue.addObserver(observer);
-    firstSafe = queue.getPosition(); // FIXME aconway 2012-06-13: fencepost 
error
+    // Set after addObserver to ensure we don't miss an enqueue.
+    firstSafe = queue.getPosition() + 1; // Next message will be protected by 
the guard.
 }
 
 QueueGuard::~QueueGuard() { cancel(); }
@@ -101,9 +99,23 @@ void QueueGuard::cancel() {
     queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1));
 }
 
+namespace {
+void completeBefore(QueueGuard* guard, SequenceNumber position, const 
QueuedMessage& qm) {
+    if (qm.position <= position) guard->complete(qm);
+}
+}
+
 void QueueGuard::attach(ReplicatingSubscription& rs) {
-    Mutex::ScopedLock l(lock);
+    // NOTE: attach is called before the ReplicatingSubscription is active so
+    // its position is not changing.
     assert(firstSafe >= rs.getPosition());
+    // Complete any messages before or at the ReplicatingSubscription position.
+    if (!delayed.empty() && delayed.front() <= rs.getPosition()) {
+        // FIXME aconway 2012-06-15: queue iteration, only messages in delayed
+        queue.eachMessage(boost::bind(&completeBefore, this, rs.getPosition(), 
_1));
+    }
+    Mutex::ScopedLock l(lock);
+    // FIXME aconway 2012-06-15: complete messages before rs.getPosition
     subscription = &rs;
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h?rev=1351435&r1=1351434&r2=1351435&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h Mon Jun 18 18:08:19 2012
@@ -43,12 +43,11 @@ class BrokerInfo;
 class ReplicatingSubscription;
 
 /**
- * A queue guard is a QueueObserver that delays completion of new
- * messages arriving on a queue.  It works as part of a
- * ReplicatingSubscription to ensure messages are not acknowledged
- * till they have been replicated.
+ * A queue guard is a QueueObserver that delays completion of new messages
+ * arriving on a queue.  It works as part of a ReplicatingSubscription to 
ensure
+ * messages are not acknowledged till they have been replicated.
  *
- * The guard is created before the ReplicatingSubscription to protect
+ * The guard can be created before the ReplicatingSubscription to protect
  * messages arriving before the creation of the subscription.
  *
  * THREAD SAFE: Concurrent calls:
@@ -78,8 +77,9 @@ class QueueGuard {
 
     void attach(ReplicatingSubscription&);
 
-    /** The first sequence number protected by this guard.
-     * All messages at or after this position are protected.
+    /**
+     * The first sequence number to be protected by this guard.  All messages 
at
+     * or after this position are protected.
      */
     framing::SequenceNumber getFirstSafe();
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1351435&r1=1351434&r2=1351435&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Mon Jun 18 
18:08:19 2012
@@ -248,9 +248,9 @@ ReplicatingSubscription::ReplicatingSubs
         }
         QPID_LOG(debug, logPrefix << "Subscribed: "
                  << " backup:" << backup
-                 << " backup position:" << backupPosition
                  << " primary:" << primary
                  << " position:" << position
+                 << " safe position: " << guard->getFirstSafe()
         );
 
         // Are we ready yet?
@@ -308,10 +308,7 @@ bool ReplicatingSubscription::deliver(Qu
                 backupPosition = qm.position;
             }
             // Deliver the message
-            bool delivered = ConsumerImpl::deliver(qm);
-            // If we have advanced past the initial position, the backup is 
ready.
-            if (qm.position >= guard->getFirstSafe()) setReady();
-            return delivered;
+            return ConsumerImpl::deliver(qm);
         }
         else
             return ConsumerImpl::deliver(qm); // Message is for internal event 
queue.
@@ -329,7 +326,7 @@ void ReplicatingSubscription::setReady()
         ready = true;
     }
     // Notify Primary that a subscription is ready.
-    QPID_LOG(info, logPrefix << "Caught up at " << getPosition());
+    QPID_LOG(debug, logPrefix << "Caught up");
     if (Primary::get()) Primary::get()->readyReplica(*this);
 }
 
@@ -346,6 +343,8 @@ void ReplicatingSubscription::acknowledg
         // Finish completion of message, it has been acknowledged by the 
backup.
         QPID_LOG(trace, logPrefix << "Acknowledged " << qm);
         guard->complete(qm);
+        // If next message is protected by the guard then we are ready
+        if (qm.position+1 >= guard->getFirstSafe()) setReady();
     }
     ConsumerImpl::acknowledged(qm);
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1351435&r1=1351434&r2=1351435&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Mon Jun 18 
18:08:19 2012
@@ -48,12 +48,17 @@ class QueueGuard;
 /**
  * A susbcription that replicates to a remote backup.
  *
- * Runs on the primary. In conjunction with a QueueGuard, delays
- * completion of messages till the backup has acknowledged, informs
- * backup of locally dequeued messages.
+ * Runs on the primary. In conjunction with a QueueGuard, delays completion of
+ * messages till the backup has acknowledged, informs backup of locally 
dequeued
+ * messages.
  *
- * THREAD SAFE: Called in subscription's connection thread but also
- * in arbitrary connection threads via dequeued.
+ * A ReplicatingSubscription is "ready" when all the messages on the queue have
+ * either been acknowledged by the backup, or are protected by the queue guard.
+ * On a primary broker the ReplicatingSubscription calls Primary::readyReplica
+ * when it is ready.
+ *
+ * THREAD SAFE: Called in subscription's connection thread but also in 
arbitrary
+ * connection threads via dequeued.
  *
  * Lifecycle: broker::Queue holds shared_ptrs to this as a consumer.
  *

Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1351435&r1=1351434&r2=1351435&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Mon Jun 18 18:08:19 2012
@@ -76,20 +76,20 @@ def error_line(filename, n=1):
     except: return ""
     return ":\n" + "".join(result)
 
-def retry(function, timeout=10, delay=.01):
+def retry(function, timeout=10, delay=.01, max_delay=1):
     """Call function until it returns a true value or timeout expires.
-    Double the delay for each retry. Returns what function returns if
-    true, None if timeout expires."""
+    Double the delay for each retry up to max_delay.
+    Returns what function returns if true, None if timeout expires."""
     deadline = time.time() + timeout
     ret = None
-    while not ret:
+    while True:
         ret = function()
+        if ret: return ret
         remaining = deadline - time.time()
         if remaining <= 0: return False
         delay = min(delay, remaining)
         time.sleep(delay)
-        delay *= 2
-    return ret
+        delay = min(delay*2, max_delay)
 
 class AtomicCounter:
     def __init__(self):
@@ -657,7 +657,7 @@ class NumberedReceiver(Thread):
             m = self.read_message()
             while m != -1:
                 self.receiver.assert_running()
-                assert m <= self.received, "Missing message %s>%s"%(m, 
self.received)
+                assert m <= self.received, "%s missing message %s>%s"%(queue, 
m, self.received)
                 if (m == self.received): # Ignore duplicates
                     self.received += 1
                     if self.sender:

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1351435&r1=1351434&r2=1351435&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Mon Jun 18 18:08:19 2012
@@ -77,10 +77,17 @@ class HaBroker(Broker):
         if not self._agent: self._agent = QmfAgent(self.host_port())
         return self._agent
 
-    def ha_status(self): return self.agent().getHaBroker().status
+    def ha_status(self):
+        hb = self.agent().getHaBroker()
+        hb.update()
+        return hb.status
 
     def wait_status(self, status):
-        assert retry(lambda: self.ha_status() == status), "%s, %r != 
%r"%(self, self.ha_status(), status)
+        def try_get_status():
+            # Ignore ConnectionError, the broker may not be up yet.
+            try: return self.ha_status() == status;
+            except ConnectionError: return False
+        assert retry(try_get_status, timeout=20), "%s, %r != %r"%(self, 
self.ha_status(), status)
 
     # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
     def qpid_config(self, args):
@@ -755,12 +762,12 @@ class LongTests(BrokerTest):
                     return receivers[0].received > n + 100
                 # FIXME aconway 2012-05-17: client reconnect sometimes takes > 
1 sec.
                 assert retry(enough, 10), "Stalled: %s < 
%s+100"%(receivers[0].received, n)
-            for s in senders: s.stop()
-            for r in receivers: r.stop()
         except:
             traceback.print_exc()
             raise
         finally:
+            for s in senders: s.stop()
+            for r in receivers: r.stop()
             dead = []
             for i in xrange(3):
                 if not brokers[i].is_running(): dead.append(i)



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to