Author: aconway
Date: Fri Jun 22 19:28:30 2012
New Revision: 1353000

URL: http://svn.apache.org/viewvc?rev=1353000&view=rev
Log:
NO-JIRA: Cleanup in HA code.

- Better printed representation for queue ranges.
- Removed dead code in ha module related to activated/deactivated backups.
- Improved log messages in ha/BrokerReplicator.cpp

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueRange.h

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1353000&r1=1352999&r2=1353000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Fri Jun 22 19:28:30 2012
@@ -298,36 +298,33 @@ void BrokerReplicator::route(Deliverable
 }
 
 void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
-    string name = values[QNAME].asString();
     Variant::Map argsMap = asMapVoid(values[ARGS]);
-    if (!replicationTest.isReplicated(
-            CONFIGURATION,
-            values[ARGS].asMap(),
-            values[AUTODEL].asBool(),
-            values[EXCL].asBool()))
-        return;
-    if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) {
+    bool autoDel = values[AUTODEL].asBool();
+    bool excl = values[EXCL].asBool();
+    if (values[DISP] == CREATED &&
+        replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl))
+    {
+        string name = values[QNAME].asString();
+        QPID_LOG(debug, logPrefix << "Queue declare event: " << name);
         framing::FieldTable args;
         amqp_0_10::translate(argsMap, args);
         // If we already have a queue with this name, replace it.
         // The queue was definitely created on the primary.
         if (broker.getQueues().find(name)) {
+            QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name);
             broker.getQueues().destroy(name);
-            QPID_LOG(warning, logPrefix << "Queue declare event, replaced exsiting: "
-                     << name);
         }
         std::pair<boost::shared_ptr<Queue>, bool> result =
             broker.createQueue(
                 name,
                 values[DURABLE].asBool(),
-                values[AUTODEL].asBool(),
+                autoDel,
                 0, // no owner regardless of exclusivity on primary
                 values[ALTEX].asString(),
                 args,
                 values[USER].asString(),
                 values[RHOST].asString());
         assert(result.second);  // Should be true since we destroyed existing queue above
-        QPID_LOG(debug, logPrefix << "Queue declare event, starting replication: " << name);
         startQueueReplicator(result.first);
     }
 }
@@ -345,21 +342,16 @@ void BrokerReplicator::doEventQueueDelet
     // sessions may be closed by a "queue deleted" exception.
     string name = values[QNAME].asString();
     boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
-    if (!queue) {
-        QPID_LOG(warning, logPrefix << "Queue delete event, does not exist: " << name);
-    } else if (!replicationTest.replicateLevel(queue->getSettings())) {
-        QPID_LOG(warning, logPrefix << "Queue delete event, not replicated: " << name);
-    } else {
+    if (queue && replicationTest.replicateLevel(queue->getSettings())) {
+        QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
         boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name);
         if (qr) {
             qr->deactivate();
-            haBroker.deactivatedBackup(name);
             // QueueReplicator's bridge is now queued for destruction but may not
             // actually be destroyed.
             broker.getExchanges().destroy(qr->getName());
         }
         broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString());
-        QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
     }
 }
 
@@ -368,14 +360,15 @@ void BrokerReplicator::doEventExchangeDe
     if (!replicationTest.replicateLevel(argsMap)) return; // Not a replicated exchange.
     if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) {
         string name = values[EXNAME].asString();
+        QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
         framing::FieldTable args;
         amqp_0_10::translate(argsMap, args);
         // If we already have a exchange with this name, replace it.
         // The exchange was definitely created on the primary.
         if (broker.getExchanges().find(name)) {
             broker.getExchanges().destroy(name);
-            QPID_LOG(warning, logPrefix << "Exchange declare event, replaced exsiting: " << name)
-                }
+            QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " << name);
+        }
         std::pair<boost::shared_ptr<Exchange>, bool> result =
             broker.createExchange(
                 name,
@@ -386,7 +379,6 @@ void BrokerReplicator::doEventExchangeDe
                 values[USER].asString(),
                 values[RHOST].asString());
         assert(result.second);
-        QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
     }
 }
 
@@ -419,10 +411,10 @@ void BrokerReplicator::doEventBind(Varia
         framing::FieldTable args;
         amqp_0_10::translate(asMapVoid(values[ARGS]), args);
         string key = values[KEY].asString();
-        exchange->bind(queue, key, &args);
         QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName()
                  << " queue=" << queue->getName()
                  << " key=" << key);
+        exchange->bind(queue, key, &args);
     }
 }
 
@@ -439,16 +431,17 @@ void BrokerReplicator::doEventUnbind(Var
         framing::FieldTable args;
         amqp_0_10::translate(asMapVoid(values[ARGS]), args);
         string key = values[KEY].asString();
-        exchange->unbind(queue, key, &args);
         QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName()
                  << " queue=" << queue->getName()
                  << " key=" << key);
+        exchange->unbind(queue, key, &args);
     }
 }
 
 void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) {
     Variant::List members = values[MEMBERS].asList();
-    haBroker.getMembership().assign(members);
+    QPID_LOG(debug, logPrefix << "Membership update event: " <<  members);
+    haBroker.setMembership(members);
 }
 
 void BrokerReplicator::doResponseQueue(Variant::Map& values) {
@@ -459,9 +452,10 @@ void BrokerReplicator::doResponseQueue(V
             values[AUTODELETE].asBool(),
             values[EXCLUSIVE].asBool()))
         return;
+    string name(values[NAME].asString());
+    QPID_LOG(debug, logPrefix << "Queue response: " << name);
     framing::FieldTable args;
     amqp_0_10::translate(argsMap, args);
-    string name(values[NAME].asString());
     std::pair<boost::shared_ptr<Queue>, bool> result =
         broker.createQueue(
             name,
@@ -473,28 +467,28 @@ void BrokerReplicator::doResponseQueue(V
             ""/*TODO: who is the user?*/,
             ""/*TODO: what should we use as connection id?*/);
     // It is normal for the queue to already exist if we are failing over.
-    QPID_LOG(debug, logPrefix << "Queue response, "
-             << (result.second ? "starting replication: " : "already replicated: ")
-             << name);
-    if (result.second) startQueueReplicator(result.first);
+     if (result.second)
+         startQueueReplicator(result.first);
+     else
+         QPID_LOG(debug, logPrefix << "Queue already replicated: " << name);
 }
 
 void BrokerReplicator::doResponseExchange(Variant::Map& values) {
     Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
     if (!replicationTest.replicateLevel(argsMap)) return;
+    string name = values[NAME].asString();
+    QPID_LOG(debug, logPrefix << "Exchange response: " << name);
     framing::FieldTable args;
     amqp_0_10::translate(argsMap, args);
     bool created = broker.createExchange(
-        values[NAME].asString(),
+        name,
         values[TYPE].asString(),
         values[DURABLE].asBool(),
         ""/*TODO: need to include alternate-exchange*/,
         args,
         ""/*TODO: who is the user?*/,
         ""/*TODO: what should we use as connection id?*/).second;
-    QPID_LOG(debug, logPrefix << "Exchange response, "
-             << (created ? "created replica: " : "already exists: ")
-             << values[NAME].asString());
+    QPID_LOG_IF(debug, !created, logPrefix << "Exchange already exists: " << name);
 }
 
 namespace {
@@ -528,13 +522,13 @@ void BrokerReplicator::doResponseBind(Va
     if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
         queue && replicationTest.replicateLevel(queue->getSettings()))
     {
+        string key = values[KEY].asString();
+        QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName
+                 << " queue:" << qName
+                 << " key:" << key);
         framing::FieldTable args;
         amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
-        string key = values[KEY].asString();
         exchange->bind(queue, key, &args);
-        QPID_LOG(debug, logPrefix << "Bind response: exchange=" << exchange->getName()
-                 << " queue=" << queue->getName()
-                 << " key=" << key);
     }
 }
 
@@ -545,6 +539,7 @@ const string REPLICATE_DEFAULT="replicat
 // Received the ha-broker configuration object for the primary broker.
 void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
     try {
+        QPID_LOG(debug, logPrefix << "HA Broker response: " << values);
         ReplicateLevel mine = haBroker.getSettings().replicateDefault.get();
         ReplicateLevel primary = replicationTest.replicateLevel(
             values[REPLICATE_DEFAULT].asString());
@@ -566,7 +561,6 @@ void BrokerReplicator::startQueueReplica
         if (!broker.getExchanges().registerExchange(qr))
             throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName()));
         qr->activate();
-        haBroker.activatedBackup(queue->getName());
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1353000&r1=1352999&r2=1353000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Fri Jun 22 19:28:30 2012
@@ -297,20 +297,4 @@ void HaBroker::setLinkProperties(Mutex::
     broker.setLinkClientProperties(linkProperties);
 }
 
-void HaBroker::activatedBackup(const std::string& queue) {
-    Mutex::ScopedLock l(lock);
-    activeBackups.insert(queue);
-}
-
-void HaBroker::deactivatedBackup(const std::string& queue) {
-    Mutex::ScopedLock l(lock);
-    activeBackups.erase(queue);
-}
-
-// FIXME aconway 2012-05-31: strip out.
-HaBroker::QueueNames HaBroker::getActiveBackups() const {
-    Mutex::ScopedLock l(lock);
-    return activeBackups;
-}
-
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1353000&r1=1352999&r2=1353000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h Fri Jun 22 19:28:30 2012
@@ -84,13 +84,6 @@ class HaBroker : public management::Mana
     Backup* getBackup() { return backup.get(); }
     ReplicationTest getReplicationTest() const { return replicationTest; }
 
-    // Keep track of the set of actively replicated queues on a backup
-    // so that it can be transferred to the Primary on promotion.
-    typedef std::set<std::string> QueueNames;
-    void activatedBackup(const std::string& queue);
-    void deactivatedBackup(const std::string& queue);
-    QueueNames getActiveBackups() const;
-
     boost::shared_ptr<ConnectionObserver> getObserver() { return observer; }
 
     const BrokerInfo& getBrokerInfo() const { return brokerInfo; }
@@ -124,7 +117,6 @@ class HaBroker : public management::Mana
     Url clientUrl, brokerUrl;
     std::vector<Url> knownBrokers;
     BrokerStatus status;
-    QueueNames activeBackups;
     boost::shared_ptr<ConnectionObserver> observer;
     BrokerInfo brokerInfo;
     Membership membership;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h?rev=1353000&r1=1352999&r2=1353000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h Fri Jun 22 19:28:30 2012
@@ -85,11 +85,15 @@ class Primary
     HaBroker& haBroker;
     std::string logPrefix;
     bool active;
+    /**
+     * Set of expected backups that must be ready before we declare ourselves
+     * active
+     */
     BackupSet initialBackups;
     /**
-     * Backups is a map of all the remote backups we know about: any expected
-     * backups plus all actual backups that have connected. We do not remove
-     * entries when a backup disconnects. @see Primary::closed()
+     * Map of all the remote backups we know about: any expected backups plus
+     * all actual backups that have connected. We do not remove entries when a
+     * backup disconnects. @see Primary::closed()
      */
     BackupMap backups;
     boost::shared_ptr<broker::ConnectionObserver> connectionObserver;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueRange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueRange.h?rev=1353000&r1=1352999&r2=1353000&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueRange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueRange.h Fri Jun 22 19:28:30 2012
@@ -61,7 +61,7 @@ struct QueueRange {
 
 
 inline std::ostream& operator<<(std::ostream& o, const QueueRange& qr) {
-    if (qr.front > qr.back) return o << "[-" << qr.back << "]";
+    if (qr.front > qr.back) return o << "[-," << qr.back << "]";
     else return o << "[" << qr.front << "," << qr.back << "]";
 }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to