Author: aconway
Date: Tue Oct  9 19:52:24 2012
New Revision: 1396244

URL: http://svn.apache.org/viewvc?rev=1396244&view=rev
Log:
QPID-4360: Fix test bug: Non-ready HA broker can be incorrectly promoted to 
primary.

Test test_delete_missing_response was failing with "cluster active, cannot 
promote".
- Fixed test bug: "fake" primary triggered "cannot promote".
- Backup: always create QueueReplicator if not already existing.
- Terminology change: "initial" queues -> "catch-up" queues.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h
    qpid/trunk/qpid/cpp/src/tests/ha_test.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1396244&r1=1396243&r2=1396244&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Tue Oct  9 19:52:24 
2012
@@ -302,12 +302,12 @@ void BrokerReplicator::route(Deliverable
                 else if (type == HA_BROKER) doResponseHaBroker(values);
             }
             if (MessageTransfer::isLastQMFResponse(msg.getMessage(), 
EXCHANGE)) {
-                QPID_LOG(debug, logPrefix << "Initial exchange configuration 
complete.");
+                QPID_LOG(debug, logPrefix << "All exchange responses 
received.")
                 cleaner.cleanExchanges(); // Clean up exchanges that no longer 
exist in the primary
                 alternates.clear();
             }
             if (MessageTransfer::isLastQMFResponse(msg.getMessage(), QUEUE)) {
-                QPID_LOG(debug, logPrefix << "Initial queue configuration 
complete.");
+                QPID_LOG(debug, logPrefix << "All queue responses received.")
                 cleaner.cleanQueues(); // Clean up queues that no longer exist 
in the primary
             }
         }
@@ -338,16 +338,9 @@ void BrokerReplicator::doEventQueueDecla
             deleteQueue(name);
         }
         settings.populate(args, settings.storeSettings);
-        CreateQueueResult result =
-            broker.createQueue(
-                name,
-                settings,
-                0 /*i.e. no owner regardless of exclusivity on master*/,
-                values[ALTEX].asString(),
-                userId,
-                remoteHost);
-        assert(result.second);
-        startQueueReplicator(result.first);
+        CreateQueueResult result = createQueue(
+            name, values[DURABLE].asBool(), autoDel, args, 
values[ALTEX].asString());
+        assert(result.second);  // Should be created since we destroed the 
previous queue above.
     }
 }
 
@@ -494,7 +487,9 @@ void BrokerReplicator::doResponseQueue(V
     CreateQueueResult result =
         createQueue(name, values[DURABLE].asBool(), 
values[AUTODELETE].asBool(), args,
                     getAltExchange(values[ALTEXCHANGE]));
-    if (result.second) startQueueReplicator(result.first);
+    // It is normal for the queue to already exist if we are failing over.
+    if (!result.second)
+        QPID_LOG(debug, logPrefix << "Queue already replicated: " << name);
 }
 
 void BrokerReplicator::doResponseExchange(Variant::Map& values) {
@@ -596,8 +591,8 @@ void BrokerReplicator::startQueueReplica
     }
 }
 
-void BrokerReplicator::deactivateQueue(const std::string& name) {
-    boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name);
+void BrokerReplicator::deactivateQueue(const std::string& queueName) {
+    boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(queueName);
     if (qr) {
         qr->deactivate();
         // QueueReplicator's bridge is now queued for destruction but may not
@@ -646,10 +641,12 @@ BrokerReplicator::CreateQueueResult Brok
             string(), // Set alternate exchange below
             userId,
             remoteHost);
-
-    if (!alternateExchange.empty()) {
+    boost::shared_ptr<Queue> queue = result.first;
+    if (!findQueueReplicator(queue->getName())) startQueueReplicator(queue);
+    if (result.second && !alternateExchange.empty()) {
         alternates.setAlternate(
-            alternateExchange, boost::bind(&Queue::setAlternateExchange, 
result.first, _1));
+            alternateExchange,
+            boost::bind(&Queue::setAlternateExchange, result.first, _1));
     }
     return result;
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1396244&r1=1396243&r2=1396244&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Tue Oct  9 19:52:24 2012
@@ -93,7 +93,7 @@ Primary::Primary(HaBroker& hb, const Bro
                 new RemoteBackup(*i, haBroker.getReplicationTest(), false));
             backups[i->getSystemId()] = backup;
             if (!backup->isReady()) expectedBackups.insert(backup);
-            backup->setInitialQueues(hb.getBroker().getQueues(), true); // 
Create guards
+            backup->setCatchupQueues(hb.getBroker().getQueues(), true); // 
Create guards
         }
         // Set timeout for expected brokers to connect and become ready.
         sys::Duration 
timeout(int64_t(hb.getSettings().backupTimeout*sys::TIME_SEC));
@@ -207,7 +207,7 @@ void Primary::opened(broker::Connection&
             {
                 // Avoid deadlock with queue registry lock.
                 Mutex::ScopedUnlock u(lock);
-                backup->setInitialQueues(haBroker.getBroker().getQueues(), 
false);
+                backup->setCatchupQueues(haBroker.getBroker().getQueues(), 
false);
             }
             backups[info.getSystemId()] = backup;
         }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1396244&r1=1396243&r2=1396244&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp Tue Oct  9 19:52:24 2012
@@ -37,10 +37,10 @@ RemoteBackup::RemoteBackup(const BrokerI
     brokerInfo(info), replicationTest(rt), connected(con), reportedReady(false)
 {}
 
-void RemoteBackup::setInitialQueues(broker::QueueRegistry& queues, bool 
createGuards)
+void RemoteBackup::setCatchupQueues(broker::QueueRegistry& queues, bool 
createGuards)
 {
-    QPID_LOG(debug, logPrefix << "Setting initial queues" << (createGuards ? " 
and guards" : ""));
-    queues.eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1, 
createGuards));
+    QPID_LOG(debug, logPrefix << "Setting catch-up queues" << (createGuards ? 
" and guards" : ""));
+    queues.eachQueue(boost::bind(&RemoteBackup::catchupQueue, this, _1, 
createGuards));
 }
 
 RemoteBackup::~RemoteBackup() { cancel(); }
@@ -52,12 +52,14 @@ void RemoteBackup::cancel() {
 }
 
 bool RemoteBackup::isReady() {
-    return connected && initialQueues.empty();
+    return connected && catchupQueues.empty();
 }
 
-void RemoteBackup::initialQueue(const QueuePtr& q, bool createGuard) {
+void RemoteBackup::catchupQueue(const QueuePtr& q, bool createGuard) {
     if (replicationTest.isReplicated(ALL, *q)) {
-        initialQueues.insert(q);
+        QPID_LOG(debug, logPrefix << "Catch-up queue"
+                 << (createGuard ? " and guard" : "") << ": " << q->getName());
+        catchupQueues.insert(q);
         if (createGuard) guards[q].reset(new QueueGuard(*q, brokerInfo));
     }
 }
@@ -88,13 +90,13 @@ std::ostream& operator<<(std::ostream& o
 }
 
 void RemoteBackup::ready(const QueuePtr& q) {
-    initialQueues.erase(q);
+    catchupQueues.erase(q);
     QPID_LOG(debug, logPrefix << "Queue ready: " << q->getName()
-             <<  QueueSetPrinter(", waiting for: ", initialQueues));
+             <<  QueueSetPrinter(", waiting for: ", catchupQueues));
     if (isReady()) QPID_LOG(debug, logPrefix << "All queues ready");
 }
 
-// Called via ConfigurationObserver::queueCreate and from initialQueue
+// Called via ConfigurationObserver::queueCreate and from catchupQueue
 void RemoteBackup::queueCreate(const QueuePtr& q) {
     if (replicationTest.isReplicated(ALL, *q))
         guards[q].reset(new QueueGuard(*q, brokerInfo));
@@ -102,7 +104,7 @@ void RemoteBackup::queueCreate(const Que
 
 // Called via ConfigurationObserver
 void RemoteBackup::queueDestroy(const QueuePtr& q) {
-    initialQueues.erase(q);
+    catchupQueues.erase(q);
     GuardMap::iterator i = guards.find(q);
     if (i != guards.end()) {
         i->second->cancel();

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h?rev=1396244&r1=1396243&r2=1396244&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h Tue Oct  9 19:52:24 2012
@@ -57,10 +57,10 @@ class RemoteBackup
     RemoteBackup(const BrokerInfo& info, ReplicationTest, bool connected);
     ~RemoteBackup();
 
-    /** Set the initial queues for all queues in the registry.
-     *@createGuards if true create guards also, if false guards will be 
created on demand.
+    /** Set all queues in the registry as catch-up queues.
+     *@createGuards if true create guards also, if false guards are created on 
demand.
      */
-    void setInitialQueues(broker::QueueRegistry&, bool createGuards);
+    void setCatchupQueues(broker::QueueRegistry&, bool createGuards);
 
     /** Return guard associated with a queue. Used to create 
ReplicatingSubscription. */
     GuardPtr guard(const QueuePtr&);
@@ -80,7 +80,7 @@ class RemoteBackup
     /** Called via ConfigurationObserver. Note: may set isReady() */
     void queueDestroy(const QueuePtr&);
 
-    /**@return true when all initial queues for this backup are ready. */
+    /**@return true when all catch-up queues for this backup are ready. */
     bool isReady();
 
     /**@return true if isReady() and this is the first call to reportReady */
@@ -94,14 +94,13 @@ class RemoteBackup
     typedef std::map<QueuePtr, GuardPtr> GuardMap;
     typedef std::set<QueuePtr> QueueSet;
 
-    /** Add queue to guard as an initial queue */
-    void initialQueue(const QueuePtr&, bool createGuard);
+    void catchupQueue(const QueuePtr&, bool createGuard);
 
     std::string logPrefix;
     BrokerInfo brokerInfo;
     ReplicationTest replicationTest;
     GuardMap guards;
-    QueueSet initialQueues;
+    QueueSet catchupQueues;
     bool connected;
     bool reportedReady;
 };

Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1396244&r1=1396243&r2=1396244&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Tue Oct  9 19:52:24 2012
@@ -199,7 +199,7 @@ class HaCluster(object):
         HaCluster._cluster_count += 1
         for i in xrange(n): self.start(False)
         self.update_urls()
-        self[0].promote()
+        if promote: self[0].promote()
 
     def next_name(self):
         name="cluster%s-%s"%(self.id, self.broker_id)

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1396244&r1=1396243&r2=1396244&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Tue Oct  9 19:52:24 2012
@@ -627,25 +627,29 @@ acl deny all all
     def test_delete_missing_response(self):
         """Check that a backup correctly deletes leftover queues and exchanges 
that are
         missing from the initial reponse set."""
-        cluster = HaCluster(self,2)
-        s = cluster[0].connect().session()
+        # This test is a bit contrived, we set up the situation on backup 
brokers
+        # and then promote one.
+        cluster = HaCluster(self, 2, promote=False)
+
+        # cluster[0] Will be the primary
+        s = cluster[0].connect_admin().session()
         s.sender("q1;{create:always}")
-        s.sender("q2;{create:always}")
         s.sender("e1;{create:always, node:{type:topic}}")
-        s.sender("e2;{create:always, node:{type:topic}}")
-        cluster.bounce(0, promote_next=False)
-        # Fake a primary that has deleted some queues and exchanges.
-        s = cluster[0].connect_admin().session()
+
+        # cluster[1] will be the backup, has extra queues/exchanges
+        s = cluster[1].connect_admin().session()
+        s.sender("q1;{create:always}")
         s.sender("q2;{create:always}")
+        s.sender("e1;{create:always, node:{type:topic}}")
         s.sender("e2;{create:always, node:{type:topic}}")
-        s.sender("x;{create:always}") # A new queue so we can wait for the 
update.
+        for a in ["q1", "q2", "e1", "e2"]: cluster[1].wait_backup(a)
+
         cluster[0].promote()
-        # Verify the backup has deleted the missing queues and exchanges
+        # Verify the backup deletes the surpluis queue and exchange
         cluster[1].wait_status("ready")
         s = cluster[1].connect_admin().session()
-        cluster[1].wait_backup("x");
-        self.assertRaises(NotFound, s.receiver, ("q1"));
-        self.assertRaises(NotFound, s.receiver, ("e1"));
+        self.assertRaises(NotFound, s.receiver, ("q2"));
+        self.assertRaises(NotFound, s.receiver, ("e2"));
 
 
     def test_auto_delete_qpid_4285(self):



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to