Author: aconway
Date: Thu Jun 14 00:47:39 2012
New Revision: 1350069

URL: http://svn.apache.org/viewvc?rev=1350069&view=rev
Log:
QPID-3603: Bug fixes to HA code, passing test_failover_send_receive

- Updated HA logging messages to conform to new [Category] log format.
- QueueGuard: fix a fencepost error so that firstSafe is set correctly.
- ReplicatingSubscription: pass the correct broker info to getGuard().
- Don't remove RemoteBackups on disconnect. This fixes a race where a backup
connection is rejected while the broker is still a backup, but the close is
only seen after the broker has become primary.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1350069&r1=1350068&r2=1350069&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp Thu Jun 14 00:47:39 2012
@@ -45,7 +45,7 @@ using types::Variant;
 using std::string;
 
 Backup::Backup(HaBroker& hb, const Settings& s) :
-    logPrefix("HA backup: "), haBroker(hb), broker(hb.getBroker()), settings(s)
+    logPrefix("Backup: "), haBroker(hb), broker(hb.getBroker()), settings(s)
 {
     // Empty brokerUrl means delay initialization until seBrokertUrl() is 
called.
     if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h?rev=1350069&r1=1350068&r2=1350069&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h Thu Jun 14 
00:47:39 2012
@@ -24,6 +24,7 @@
 
 #include "qpid/broker/ConnectionObserver.h"
 #include "qpid/broker/Connection.h"
+#include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace ha {
@@ -36,7 +37,9 @@ class BackupConnectionExcluder : public 
 {
   public:
     void opened(broker::Connection& connection) {
-        throw Exception("HA backup rejected connection 
"+connection.getMgmtId());
+        // FIXME aconway 2012-06-13: suppress caught error message, make this 
an info message.
+        QPID_LOG(error, "Backup broker rejected connection 
"+connection.getMgmtId());
+        throw Exception("Backup broker rejected connection 
"+connection.getMgmtId());
     }
 
     void closed(broker::Connection&) {}

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1350069&r1=1350068&r2=1350069&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Thu Jun 14 00:47:39 
2012
@@ -170,7 +170,7 @@ Variant::Map asMapVoid(const Variant& va
 
 BrokerReplicator::BrokerReplicator(HaBroker& hb, const 
boost::shared_ptr<Link>& l)
     : Exchange(QPID_CONFIGURATION_REPLICATOR),
-      logPrefix("HA backup: "), replicationTest(hb.getReplicationTest()),
+      logPrefix("Backup configuration: "), 
replicationTest(hb.getReplicationTest()),
       haBroker(hb), broker(hb.getBroker()), link(l)
 {}
 
@@ -243,7 +243,9 @@ void BrokerReplicator::initializeBridge(
     sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler);
     sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler);
     sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler);
-    QPID_LOG(debug, logPrefix << "Opened configuration bridge: " << queueName);
+    qpid::Address primary;
+    link->getRemoteAddress(primary);
+    QPID_LOG(info, logPrefix << "Connected to " << primary << "(" << queueName 
<< ")");
 }
 
 void BrokerReplicator::route(Deliverable& msg) {
@@ -320,7 +322,7 @@ void BrokerReplicator::doEventQueueDecla
                 values[USER].asString(),
                 values[RHOST].asString());
         assert(result.second);  // Should be true since we destroyed existing 
queue above
-        QPID_LOG(debug, logPrefix << "Queue declare event: " << name);
+        QPID_LOG(debug, logPrefix << "Queue declare event, starting 
replication: " << name);
         startQueueReplicator(result.first);
     }
 }
@@ -466,8 +468,10 @@ void BrokerReplicator::doResponseQueue(V
             ""/*TODO: who is the user?*/,
             ""/*TODO: what should we use as connection id?*/);
     // It is normal for the queue to already exist if we are failing over.
+    QPID_LOG(debug, logPrefix << "Queue response, "
+             << (result.second ? "starting replication: " : "already 
replicated: ")
+             << name);
     if (result.second) startQueueReplicator(result.first);
-    QPID_LOG(debug, logPrefix << "Queue response: " << name);
 }
 
 void BrokerReplicator::doResponseExchange(Variant::Map& values) {
@@ -475,20 +479,17 @@ void BrokerReplicator::doResponseExchang
     if (!replicationTest.replicateLevel(argsMap)) return;
     framing::FieldTable args;
     amqp_0_10::translate(argsMap, args);
-    if (broker.createExchange(
-            values[NAME].asString(),
-            values[TYPE].asString(),
-            values[DURABLE].asBool(),
-            ""/*TODO: need to include alternate-exchange*/,
-            args,
-            ""/*TODO: who is the user?*/,
-            ""/*TODO: what should we use as connection id?*/).second)
-    {
-        QPID_LOG(debug, logPrefix << "Exchange response: " << 
values[NAME].asString());
-    } else {
-        QPID_LOG(warning, logPrefix << "Exchange response, already exists: " <<
-                 values[NAME].asString());
-    }
+    bool created = broker.createExchange(
+        values[NAME].asString(),
+        values[TYPE].asString(),
+        values[DURABLE].asBool(),
+        ""/*TODO: need to include alternate-exchange*/,
+        args,
+        ""/*TODO: who is the user?*/,
+        ""/*TODO: what should we use as connection id?*/).second;
+    QPID_LOG(debug, logPrefix << "Exchange response, "
+             << (created ? "created replica: " : "already exists: ")
+             << values[NAME].asString());
 }
 
 namespace {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp?rev=1350069&r1=1350068&r2=1350069&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp Thu Jun 14 00:47:39 
2012
@@ -30,7 +30,7 @@ namespace qpid {
 namespace ha {
 
 ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid)
-    : haBroker(hb), logPrefix("HA connections: "), self(uuid) {}
+    : haBroker(hb), logPrefix("Connections: "), self(uuid) {}
 
 // FIXME aconway 2012-06-06: move to BrokerInfo
 bool ConnectionObserver::getBrokerInfo(broker::Connection& connection, 
BrokerInfo& info) {
@@ -61,8 +61,12 @@ void ConnectionObserver::opened(broker::
     }
     BrokerInfo info;            // Avoid self connections.
     if (getBrokerInfo(connection, info)) {
-        if (info.getSystemId() == self)
-            throw Exception(QPID_MSG(logPrefix << "Rejected connection from 
self"));
+        if (info.getSystemId() == self) {
+            // FIXME aconway 2012-06-13: suppress caught error message, make 
this an info message.
+            QPID_LOG(error, "HA broker rejected self connection 
"+connection.getMgmtId());
+            throw Exception("HA broker rejected self connection 
"+connection.getMgmtId());
+        }
+
     }
     ObserverPtr o(getObserver());
     if (o) o->opened(connection);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1350069&r1=1350068&r2=1350069&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Thu Jun 14 00:47:39 2012
@@ -55,7 +55,7 @@ using types::Uuid;
 using sys::Mutex;
 
 HaBroker::HaBroker(broker::Broker& b, const Settings& s)
-    : logPrefix("HA: "),
+    : logPrefix("Broker: "),
       broker(b),
       systemId(broker.getSystem()->getSystemId().data()),
       settings(s),

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1350069&r1=1350068&r2=1350069&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Thu Jun 14 00:47:39 2012
@@ -64,7 +64,7 @@ class PrimaryConfigurationObserver : pub
 Primary* Primary::instance = 0;
 
 Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
-    haBroker(hb), logPrefix("HA primary: "), active(false)
+    haBroker(hb), logPrefix("Primary: "), active(false)
 {
     assert(instance == 0);
     instance = this;            // Let queue replicators find us.
@@ -163,10 +163,14 @@ void Primary::closed(broker::Connection&
     BrokerInfo info;
     if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
         haBroker.getMembership().remove(info.getSystemId());
-        QPID_LOG(debug, "HA primary: Backup disconnected: " << info);
-        backups.erase(info.getSystemId());
-        // FIXME aconway 2012-06-01: changes to expected backup set for 
unready queues.
+        QPID_LOG(debug, logPrefix << "Backup disconnected: " << info);
     }
+    // NOTE: we do not modify backups here, we only add to the known backups 
set
+    // we never remove from it.
+
+    // It is possible for a backup connection to be rejected while we are a 
backup,
+    // but the closed is seen when we have become primary. Removing the entry
+    // from backups in this case would be incorrect.
 }
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h?rev=1350069&r1=1350068&r2=1350069&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h Thu Jun 14 00:47:39 2012
@@ -86,6 +86,11 @@ class Primary
     std::string logPrefix;
     bool active;
     BackupSet initialBackups;
+    /**
+     * Backups is a map of all the remote backups we know about: any expected
+     * backups plus all actual backups that have connected. We do not remove
+     * entries when a backup disconnects. @see Primary::closed()
+     */
     BackupMap backups;
     boost::shared_ptr<broker::ConnectionObserver> connectionObserver;
     boost::shared_ptr<broker::ConfigurationObserver> configurationObserver;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1350069&r1=1350068&r2=1350069&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp Thu Jun 14 00:47:39 2012
@@ -52,13 +52,13 @@ QueueGuard::QueueGuard(broker::Queue& q,
     : queue(q), subscription(0)
 {
     std::ostringstream os;
-    os << "HA primary guard " << queue.getName() << "@" << info.getLogId() << 
": ";
+    os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": 
";
     logPrefix = os.str();
     observer.reset(new QueueObserver(*this));
     // Once we call addObserver we can get calls to enqueued and  dequeued
     queue.addObserver(observer);
     // Must set after addObserver so we don't miss any enqueues.
-    firstSafe = queue.getPosition()+1; // Next message will be safe.
+    firstSafe = queue.getPosition(); // FIXME aconway 2012-06-13: fencepost 
error
 }
 
 QueueGuard::~QueueGuard() { cancel(); }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1350069&r1=1350068&r2=1350069&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Thu Jun 14 00:47:39 2012
@@ -63,12 +63,11 @@ QueueReplicator::QueueReplicator(const B
                                  boost::shared_ptr<Queue> q,
                                  boost::shared_ptr<Link> l)
     : Exchange(replicatorName(q->getName()), 0, q->getBroker()),
-      logPrefix("HA backup of "+q->getName()+": "),
+      logPrefix("Backup queue "+q->getName()+": "),
       queue(q), link(l), brokerInfo(info)
 {
     Uuid uuid(true);
     bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
-    QPID_LOG(info, logPrefix << "Created");
 }
 
 // This must be separate from the constructor so we can call shared_from_this.
@@ -128,7 +127,11 @@ void QueueReplicator::initializeBridge(B
     // FIXME aconway 2012-05-22: use a finite credit window
     peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
     peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
-    QPID_LOG(debug, logPrefix << "Subscribed: " << bridgeName);
+
+    qpid::Address primary;
+    link->getRemoteAddress(primary);
+    QPID_LOG(info, logPrefix << "Connected to " << primary << "(" << 
bridgeName << ")");
+    QPID_LOG(trace, logPrefix << "Subscription settings: " << settings);
 }
 
 namespace {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1350069&r1=1350068&r2=1350069&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp Thu Jun 14 00:47:39 2012
@@ -32,7 +32,7 @@ using sys::Mutex;
 
 RemoteBackup::RemoteBackup(
     const BrokerInfo& info, broker::Broker& broker, ReplicationTest rt, bool 
cg) :
-    logPrefix("HA primary, backup to "+info.getLogId()+": "), 
brokerInfo(info), replicationTest(rt),
+    logPrefix("Primary remote backup "+info.getLogId()+": "), 
brokerInfo(info), replicationTest(rt),
     createGuards(cg)
 {
     QPID_LOG(debug, logPrefix << "Guarding queues for backup broker.");
@@ -85,11 +85,13 @@ void RemoteBackup::ready(const QueuePtr&
     if (isReady()) QPID_LOG(debug, logPrefix << "All queues ready");
 }
 
+// Called via ConfigurationObserver
 void RemoteBackup::queueCreate(const QueuePtr& q) {
     if (createGuards && replicationTest.isReplicated(ALL, *q))
         guards[q].reset(new QueueGuard(*q, brokerInfo));
 }
 
+// Called via ConfigurationObserver
 void RemoteBackup::queueDestroy(const QueuePtr& q) {
     initialQueues.erase(q);
     GuardMap::iterator i = guards.find(q);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h?rev=1350069&r1=1350068&r2=1350069&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h Thu Jun 14 00:47:39 2012
@@ -51,20 +51,25 @@ class RemoteBackup
     typedef boost::shared_ptr<QueueGuard> GuardPtr;
     typedef boost::shared_ptr<broker::Queue> QueuePtr;
 
+    /** Note: isReady() can be true after construction */
     RemoteBackup(const BrokerInfo& info, broker::Broker&, ReplicationTest rt, 
bool createGuards);
     ~RemoteBackup();
 
     /** Return guard associated with a queue. Used to create 
ReplicatingSubscription. */
     GuardPtr guard(const QueuePtr&);
 
-    /** ReplicatingSubscription associated with queue is ready. */
+    /** ReplicatingSubscription associated with queue is ready.
+     * Note: may set isReady()
+     */
     void ready(const QueuePtr& queue);
 
-    // Called ConfigurationObserver
+    /** Called via ConfigurationObserver */
     void queueCreate(const QueuePtr&);
+
+    /** Called via ConfigurationObserver. Note: may set isReady() */
     void queueDestroy(const QueuePtr&);
 
-    /**@return true when all initial queues for this backup are ready */
+    /**@return true when all initial queues for this backup are ready. */
     bool isReady();
 
     BrokerInfo getBrokerInfo() const { return brokerInfo; }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1350069&r1=1350068&r2=1350069&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Thu Jun 14 
00:47:39 2012
@@ -201,7 +201,7 @@ ReplicatingSubscription::ReplicatingSubs
 
         // Set a log prefix message that identifies the remote broker.
         ostringstream os;
-        os << "HA primary replica " << queue->getName() << "@" << 
info.getLogId() << ": ";
+        os << "Primary " << queue->getName() << "@" << info.getLogId() << ": ";
         logPrefix = os.str();
 
         // FIXME aconway 2012-06-10: unsafe to rely in queue front or position 
they are changing?
@@ -217,7 +217,7 @@ ReplicatingSubscription::ReplicatingSubs
         // However we must attach the guard _before_ we scan for
         // initial dequeues to be sure we don't miss any dequeues
         // between the scan and attaching the guard.
-        if (Primary::get()) guard = Primary::get()->getGuard(queue, 
getBrokerInfo());
+        if (Primary::get()) guard = Primary::get()->getGuard(queue, info);
         if (!guard) guard.reset(new QueueGuard(*queue, getBrokerInfo()));
         guard->attach(*this);
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to