Author: aconway
Date: Mon Jun 18 18:08:09 2012
New Revision: 1351434

URL: http://svn.apache.org/viewvc?rev=1351434&view=rev
Log:
QPID-3603: Minor cleanup and test improvements in HA code.

- Enabled 10 queue failover test
- Minor cleanup in types.h
- Rewording, adding comments.
- Detect and reject invalid replication values.
- Cleaned up some unnecessary #includes

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h
    qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/types.h
    qpid/trunk/qpid/cpp/src/tests/brokertest.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1351434&r1=1351433&r2=1351434&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Mon Jun 18 18:08:09 2012
@@ -23,6 +23,7 @@
 #include "ConnectionObserver.h"
 #include "HaBroker.h"
 #include "Primary.h"
+#include "QueueReplicator.h"
 #include "ReplicatingSubscription.h"
 #include "Settings.h"
 #include "qpid/amqp_0_10/Codecs.h"

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1351434&r1=1351433&r2=1351434&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Mon Jun 18 18:08:09 2012
@@ -21,6 +21,7 @@
 #include "Backup.h"
 #include "HaBroker.h"
 #include "Primary.h"
+#include "ReplicationTest.h"
 #include "ReplicatingSubscription.h"
 #include "RemoteBackup.h"
 #include "ConnectionObserver.h"
@@ -72,6 +73,9 @@ Primary::Primary(HaBroker& hb, const Bro
         QPID_LOG(debug, logPrefix << "Expected backups: none");
     }
     else {
+        // NOTE: RemoteBackups must be created before we set the 
ConfigurationObserver
+        // orr ConnectionObserver so that there is no client activity while
+        // the QueueGuards are created.
         QPID_LOG(debug, logPrefix << "Expected backups: " << expect);
         for (BrokerInfo::Set::const_iterator i = expect.begin(); i != 
expect.end(); ++i) {
             bool guard = true;  // Create queue guards immediately for 
expected backups.
@@ -126,6 +130,8 @@ void Primary::readyReplica(const Replica
 }
 
 void Primary::queueCreate(const QueuePtr& q) {
+    // Throw if there is an invalid replication level in the queue settings.
+    haBroker.getReplicationTest().replicateLevel(q->getSettings());
     Mutex::ScopedLock l(lock);
     for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) {
         i->second->queueCreate(q);
@@ -147,7 +153,7 @@ void Primary::opened(broker::Connection&
         BackupMap::iterator i = backups.find(info.getSystemId());
         if (i == backups.end()) {
             QPID_LOG(debug, logPrefix << "New backup connected: " << info);
-            bool guard = false; // Lazy-create queue guards, pre-creating them 
here could cause deadlock.
+            bool guard = false; // Lazy-create guards for new backups. 
Creating them here could deadlock.
             backups[info.getSystemId()].reset(
                 new RemoteBackup(info, haBroker.getBroker(), 
haBroker.getReplicationTest(), guard));
         }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1351434&r1=1351433&r2=1351434&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp Mon Jun 18 18:08:09 2012
@@ -51,13 +51,15 @@ class QueueGuard::QueueObserver : public
 QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
     : queue(q), subscription(0)
 {
+    // NOTE: There is no activity on the queue while QueueGuard constructor is
+    // running It is called either from Primary before client connections are
+    // allowed or from ConfigurationObserver::queueCreate before the queue is
+    // visible.
     std::ostringstream os;
     os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": 
";
     logPrefix = os.str();
     observer.reset(new QueueObserver(*this));
-    // Once we call addObserver we can get calls to enqueued and  dequeued
     queue.addObserver(observer);
-    // Must set after addObserver so we don't miss any enqueues.
     firstSafe = queue.getPosition(); // FIXME aconway 2012-06-13: fencepost 
error
 }
 
@@ -95,6 +97,7 @@ void QueueGuard::cancel() {
         Mutex::ScopedLock l(lock);
         if (delayed.empty()) return; // No need if no delayed messages.
     }
+    // FIXME aconway 2012-06-15: optimize, only messages in delayed set.
     queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1));
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1351434&r1=1351433&r2=1351434&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp Mon Jun 18 18:08:09 2012
@@ -50,8 +50,10 @@ bool RemoteBackup::isReady() {
 }
 
 void RemoteBackup::initialQueue(const QueuePtr& q) {
-    if (replicationTest.isReplicated(ALL, *q)) initialQueues.insert(q);
-    queueCreate(q);
+    if (replicationTest.isReplicated(ALL, *q)) {
+        initialQueues.insert(q);
+        queueCreate(q);
+    }
 }
 
 RemoteBackup::GuardPtr RemoteBackup::guard(const QueuePtr& q) {
@@ -81,11 +83,12 @@ std::ostream& operator<<(std::ostream& o
 
 void RemoteBackup::ready(const QueuePtr& q) {
     initialQueues.erase(q);
-    QPID_LOG(debug, logPrefix << "Queue ready: " << q->getName() << " 
remaining unready: " << QueueSetPrinter(initialQueues));
+    QPID_LOG(debug, logPrefix << "Queue ready: " << q->getName()
+             << " remaining unready: " << QueueSetPrinter(initialQueues));
     if (isReady()) QPID_LOG(debug, logPrefix << "All queues ready");
 }
 
-// Called via ConfigurationObserver
+// Called via ConfigurationObserver and from initialQueue
 void RemoteBackup::queueCreate(const QueuePtr& q) {
     if (createGuards && replicationTest.isReplicated(ALL, *q))
         guards[q].reset(new QueueGuard(*q, brokerInfo));

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1351434&r1=1351433&r2=1351434&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Mon Jun 18 
18:08:09 2012
@@ -20,6 +20,7 @@
  */
 
 #include "QueueGuard.h"
+#include "QueueReplicator.h"
 #include "ReplicatingSubscription.h"
 #include "Primary.h"
 #include "qpid/broker/Queue.h"
@@ -161,7 +162,7 @@ struct QueueRange {
 
     QueueRange(const framing::FieldTable args) {
         back = args.getAsInt(ReplicatingSubscription::QPID_BACK);
-        front = back+1;
+        front = back+1;         // Assume empty
         empty = !args.isSet(ReplicatingSubscription::QPID_FRONT);
         if (!empty) {
             front = args.getAsInt(ReplicatingSubscription::QPID_FRONT);
@@ -223,7 +224,7 @@ ReplicatingSubscription::ReplicatingSubs
 
         // We can re-use some backup messages if backup and primary queues
         // overlap and the backup is not missing messages at the front of the 
queue.
-        // FIXME aconway 2012-06-10: disable re-use of backup queue till stall 
problem is solved.
+
         /*        if (!primary.empty &&   // Primary not empty
             !backup.empty &&    // Backup not empty
             primary.front >= backup.front && // Not missing messages at the 
front

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1351434&r1=1351433&r2=1351434&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Mon Jun 18 
18:08:09 2012
@@ -22,7 +22,6 @@
  *
  */
 
-#include "QueueReplicator.h"    // For DEQUEUE_EVENT_KEY
 #include "BrokerInfo.h"
 #include "qpid/broker/SemanticState.h"
 #include "qpid/broker/QueueObserver.h"

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp?rev=1351434&r1=1351433&r2=1351434&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp Mon Jun 18 18:08:09 2012
@@ -28,9 +28,9 @@ namespace ha {
 using types::Variant;
 
 ReplicateLevel ReplicationTest::replicateLevel(const std::string& str) {
-    Enum<ReplicateLevel> rl;
-    if (rl.parseNoThrow(str)) return ReplicateLevel(rl.get());
-    else return replicateDefault;
+    Enum<ReplicateLevel> rl(replicateDefault);
+    if (!str.empty()) rl.parse(str);
+    return rl.get();
 }
 
 ReplicateLevel ReplicationTest::replicateLevel(const framing::FieldTable& f) {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h?rev=1351434&r1=1351433&r2=1351434&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h Mon Jun 18 18:08:09 2012
@@ -52,8 +52,8 @@ class ReplicationTest
     ReplicateLevel replicateLevel(const framing::FieldTable& f);
     ReplicateLevel replicateLevel(const types::Variant::Map& m);
 
-    // Return true if replication for a queue is enabled at level or
-    // higher, taking account of all settings.
+    // Return true if replication for a queue is enabled at level or higher,
+    // taking account of default level and queue settings.
     bool isReplicated(ReplicateLevel level,
                       const types::Variant::Map& args, bool autodelete, bool 
exclusive);
     bool isReplicated(ReplicateLevel level,

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp?rev=1351434&r1=1351433&r2=1351434&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp Mon Jun 18 18:08:09 2012
@@ -41,7 +41,7 @@ string EnumBase::str() const {
 
 void EnumBase::parse(const string& s) {
     if (!parseNoThrow(s))
-        throw Exception(QPID_MSG("Invalid " << names[count] << " value: " << 
s));
+        throw Exception(QPID_MSG("Invalid " << name << " value: " << s));
 }
 
 bool EnumBase::parseNoThrow(const string& s) {
@@ -50,16 +50,15 @@ bool EnumBase::parseNoThrow(const string
     return value < count;
 }
 
-template <> const char* Enum<ReplicateLevel>::NAMES[] = {
-    "none", "configuration", "all", "replication"
-};
+template <> const char* Enum<ReplicateLevel>::NAME = "replication";
+template <> const char* Enum<ReplicateLevel>::NAMES[] = { "none", 
"configuration", "all" };
 template <> const size_t Enum<ReplicateLevel>::N = 3;
 
+template <> const char* Enum<BrokerStatus>::NAME = "HA broker status";
 template <> const char* Enum<BrokerStatus>::NAMES[] = {
-    "joining", "catchup", "ready", "recovering", "active",
-    "standalone", "broker status"
+    "joining", "catchup", "ready", "recovering", "active", "standalone"
 };
-template <> const size_t Enum<BrokerStatus>::N = 7;
+template <> const size_t Enum<BrokerStatus>::N = 6;
 
 ostream& operator<<(ostream& o, EnumBase e) {
     return o << e.str();

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/types.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/types.h?rev=1351434&r1=1351433&r2=1351434&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/types.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/types.h Mon Jun 18 18:08:09 2012
@@ -39,8 +39,8 @@ namespace ha {
 /** Base class for enums with string conversion */
 class EnumBase {
   public:
-    EnumBase(const char* names_[], size_t count_, unsigned value)
-        : value(value), names(names_), count(count_) {}
+    EnumBase(const char* name_, const char* names_[], size_t count_, unsigned 
value)
+        : name(name_), names(names_), count(count_), value(value) {}
 
     /** Convert to string */
     std::string str() const;
@@ -50,9 +50,10 @@ class EnumBase {
     bool parseNoThrow(const std::string&);
 
   protected:
-    unsigned value;
+    const char* name;
     const char** names;
     size_t count;
+    unsigned value;
 };
 
 std::ostream& operator<<(std::ostream&, EnumBase);
@@ -61,12 +62,14 @@ std::istream& operator>>(std::istream&, 
 /** Wrapper template for enums with string conversion */
 template <class T> class Enum : public EnumBase {
   public:
-    Enum(T x=T()) : EnumBase(NAMES, N, x) {}
+    Enum(T x=T()) : EnumBase(NAME, NAMES, N, x) {}
     T get() const { return T(value); }
     void operator=(T x) { value = x; }
+
   private:
-    static const size_t N;
-    static const char* NAMES[];
+    static const size_t N;      // Number of enum values.
+    static const char* NAMES[]; // Names of enum values.
+    static const char* NAME;    // Descriptive name for the enum type.
 };
 
 /** To print an enum x: o << printable(x) */
@@ -94,13 +97,10 @@ inline bool isPrimary(BrokerStatus s) {
 
 inline bool isBackup(BrokerStatus s) { return !isPrimary(s); }
 
+// String constants.
 extern const std::string QPID_REPLICATE;
 
-// FIXME aconway 2012-06-04: rename types.h->types.h
-
-/**
- * Define IdSet type, not a typedef so we can overload operator <<
- */
+/** Define IdSet type, not a typedef so we can overload operator << */
 class IdSet : public std::set<types::Uuid> {};
 
 std::ostream& operator<<(std::ostream& o, const IdSet& ids);

Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1351434&r1=1351433&r2=1351434&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Mon Jun 18 18:08:09 2012
@@ -76,7 +76,7 @@ def error_line(filename, n=1):
     except: return ""
     return ":\n" + "".join(result)
 
-def retry(function, timeout=3, delay=.01):
+def retry(function, timeout=10, delay=.01):
     """Call function until it returns a true value or timeout expires.
     Double the delay for each retry. Returns what function returns if
     true, None if timeout expires."""
@@ -243,7 +243,7 @@ class Broker(Popen):
     _broker_count = 0
     _log_count = 0
 
-    def __str__(self): return "Broker<%s %s :%d>"%(self.name, self.pname, 
self.port())
+    def __str__(self): return "Broker<%s %s :%d>"%(self.log, self.pname, 
self.port())
 
     def find_log(self):
         self.log = "%03d:%s.log" % (Broker._log_count, self.name)

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1351434&r1=1351433&r2=1351434&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Mon Jun 18 18:08:09 2012
@@ -610,11 +610,13 @@ class ReplicationTests(BrokerTest):
             self.fail("Excpected no-such-queue exception")
         except NotFound: pass
 
-    def test_invalid_default(self):
-        """Verify that a queue with an invalid qpid.replicate gets default 
treatment"""
-        cluster = HaCluster(self, 2, ha_replicate="all")
-        c = cluster[0].connect().session().sender("q;{create:always, 
node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}")
-        cluster[1].wait_backup("q")
+    def test_invalid_replication(self):
+        """Verify that we reject an attempt to declare a queue with invalid 
replication value."""
+        cluster = HaCluster(self, 1, ha_replicate="all")
+        try:
+            c = cluster[0].connect().session().sender("q;{create:always, 
node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}")
+            self.fail("Expected ConnectionError")
+        except ConnectionError: pass
 
     def test_exclusive_queue(self):
         """Ensure that we can back-up exclusive queues, i.e. the replicating
@@ -720,7 +722,7 @@ class LongTests(BrokerTest):
         brokers = HaCluster(self, 3)
 
         # Start sender and receiver threads
-        n = 1;           # FIXME aconway 2012-06-10: n = 10
+        n = 10;
         senders = [NumberedSender(brokers[0], max_depth=1024, 
failover_updates=False,
                                  queue="test%s"%(i)) for i in xrange(n)]
         receivers = [NumberedReceiver(brokers[0], sender=senders[i],



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to