Author: aconway
Date: Tue Jul 31 16:11:44 2012
New Revision: 1367649

URL: http://svn.apache.org/viewvc?rev=1367649&view=rev
Log:
QPID-4176: HA Error handling

Additional error handling and logging for ConnectionObserver, Primary and
ReplicatingSubscription.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp?rev=1367649&r1=1367648&r2=1367649&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp Tue Jul 31 16:11:44 
2012
@@ -52,28 +52,40 @@ ConnectionObserver::ObserverPtr Connecti
 }
 
 void ConnectionObserver::opened(broker::Connection& connection) {
-    if (connection.isLink()) return; // Allow outgoing links.
-    if (connection.getClientProperties().isSet(ADMIN_TAG)) {
-        QPID_LOG(debug, logPrefix << "Allowing admin connection: "
-                 << connection.getMgmtId());
-        return;                 // No need to call observer, always allow 
admins.
-    }
-    BrokerInfo info;            // Avoid self connections.
-    if (getBrokerInfo(connection, info)) {
-        if (info.getSystemId() == self) {
-            QPID_LOG(debug, "HA broker rejected self connection 
"+connection.getMgmtId());
-            connection.abort();
+    try {
+        if (connection.isLink()) return; // Allow outgoing links.
+        if (connection.getClientProperties().isSet(ADMIN_TAG)) {
+            QPID_LOG(debug, logPrefix << "Allowing admin connection: "
+                     << connection.getMgmtId());
+            return;                 // No need to call observer, always allow 
admins.
         }
+        BrokerInfo info;            // Avoid self connections.
+        if (getBrokerInfo(connection, info)) {
+            if (info.getSystemId() == self) {
+                QPID_LOG(debug, "HA broker rejected self connection 
"+connection.getMgmtId());
+                connection.abort();
+            }
 
+        }
+        ObserverPtr o(getObserver());
+        if (o) o->opened(connection);
+    }
+    catch (const std::exception& e) {
+        QPID_LOG(error, logPrefix << "Open error: " << e.what());
+        throw;
     }
-    ObserverPtr o(getObserver());
-    if (o) o->opened(connection);
 }
 
 void ConnectionObserver::closed(broker::Connection& connection) {
-    BrokerInfo info;
-    ObserverPtr o(getObserver());
-    if (o) o->closed(connection);
+    try {
+        BrokerInfo info;
+        ObserverPtr o(getObserver());
+        if (o) o->closed(connection);
+    }
+    catch (const std::exception& e) {
+        QPID_LOG(error, logPrefix << "Close error: " << e.what());
+        throw;
+    }
 }
 
 const std::string ConnectionObserver::ADMIN_TAG="qpid.ha-admin";

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1367649&r1=1367648&r2=1367649&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Tue Jul 31 16:11:44 2012
@@ -141,26 +141,32 @@ void Primary::checkReady(BackupMap::iter
 }
 
 void Primary::timeoutExpectedBackups() {
-    sys::Mutex::ScopedLock l(lock);
-    if (active) return;         // Already activated
-    // Remove records for any expectedBackups that are not yet connected
-    // Allow backups that are connected to continue becoming ready.
-    for (BackupSet::iterator i = expectedBackups.begin(); i != 
expectedBackups.end();)
-    {
-        boost::shared_ptr<RemoteBackup> rb = *i;
-        if (!rb->isConnected()) {
-            BrokerInfo info = rb->getBrokerInfo();
-            QPID_LOG(error, logPrefix << "Expected backup timed out: " << 
info);
-            expectedBackups.erase(i++);
-            backups.erase(info.getSystemId());
-            rb->cancel();
-            // Downgrade the broker to CATCHUP
-            info.setStatus(CATCHUP);
-            haBroker.addBroker(info);
+    try {
+        sys::Mutex::ScopedLock l(lock);
+        if (active) return;         // Already activated
+        // Remove records for any expectedBackups that are not yet connected
+        // Allow backups that are connected to continue becoming ready.
+        for (BackupSet::iterator i = expectedBackups.begin(); i != 
expectedBackups.end();)
+        {
+            boost::shared_ptr<RemoteBackup> rb = *i;
+            if (!rb->isConnected()) {
+                BrokerInfo info = rb->getBrokerInfo();
+                QPID_LOG(error, logPrefix << "Expected backup timed out: " << 
info);
+                expectedBackups.erase(i++);
+                backups.erase(info.getSystemId());
+                rb->cancel();
+                // Downgrade the broker to CATCHUP
+                info.setStatus(CATCHUP);
+                haBroker.addBroker(info);
+            }
+            else ++i;
         }
-        else ++i;
+        checkReady(l);
+    }
+    catch(const std::exception& e) {
+        QPID_LOG(error, logPrefix << "Error timing out backups: " << e.what());
+        // No-where for this exception to go.
     }
-    checkReady(l);
 }
 
 void Primary::readyReplica(const ReplicatingSubscription& rs) {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1367649&r1=1367648&r2=1367649&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Tue Jul 31 
16:11:44 2012
@@ -228,8 +228,8 @@ ReplicatingSubscription::ReplicatingSubs
         if (guard->subscriptionStart(position)) setReady();
     }
     catch (const std::exception& e) {
-        throw InvalidArgumentException(QPID_MSG(logPrefix << e.what()
-                                                << ": arguments=" << 
arguments));
+        QPID_LOG(error, logPrefix << "Creation error: " << e.what()
+                 << ": arguments=" << getArguments());
     }
 }
 
@@ -242,13 +242,19 @@ ReplicatingSubscription::~ReplicatingSub
 // shared_from_this
 //
 void ReplicatingSubscription::initialize() {
-    Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
+    try {
+        Mutex::ScopedLock l(lock); // Note dequeued() can be called 
concurrently.
 
-    // Send initial dequeues and position to the backup.
-    // There must be a shared_ptr(this) when sending.
-    sendDequeueEvent(l);
-    sendPositionEvent(position, l);
-    backupPosition = position;
+        // Send initial dequeues and position to the backup.
+        // There must be a shared_ptr(this) when sending.
+        sendDequeueEvent(l);
+        sendPositionEvent(position, l);
+        backupPosition = position;
+    }
+    catch (const std::exception& e) {
+        QPID_LOG(error, logPrefix << "Initialization error: " << e.what()
+                 << ": arguments=" << getArguments());
+    }
 }
 
 // Message is delivered in the subscription's connection thread.

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1367649&r1=1367648&r2=1367649&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Tue Jul 31 
16:11:44 2012
@@ -24,7 +24,6 @@
 
 #include "BrokerInfo.h"
 #include "qpid/broker/SemanticState.h"
-#include "qpid/broker/QueueObserver.h"
 #include "qpid/broker/ConsumerFactory.h"
 #include "qpid/types/Uuid.h"
 #include <iosfwd>



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to