Author: aconway
Date: Mon Feb 13 16:17:42 2012
New Revision: 1243576
URL: http://svn.apache.org/viewvc?rev=1243576&view=rev
Log:
QPID-3603: HA broker closes backup link when promoted.
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.h
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp?rev=1243576&r1=1243575&r2=1243576&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp Mon Feb 13
16:17:42 2012
@@ -321,7 +321,7 @@ void Link::maintenanceVisit ()
{
visitCount = 0;
//switch host and port to next in url list if possible
- if (!tryFailover()) {
+ if (!tryFailoverLH()) {
currentInterval *= 2;
if (currentInterval > MAX_INTERVAL)
currentInterval = MAX_INTERVAL;
@@ -347,7 +347,7 @@ void Link::reconnect(const qpid::Address
}
}
-bool Link::tryFailover() { // FIXME aconway 2012-01-30: lock held?
+bool Link::tryFailoverLH() { // FIXME aconway 2012-01-30: lock held?
if (reconnectNext >= url.size()) reconnectNext = 0;
if (url.empty()) return false;
Address next = url[reconnectNext++];
@@ -440,18 +440,24 @@ ManagementObject* Link::GetManagementObj
return (ManagementObject*) mgmtObject;
}
+void Link::close() {
+ Mutex::ScopedLock mutex(lock);
+ if (!closing) {
+ closing = true;
+ if (state != STATE_CONNECTING && connection) {
+ //connection can only be closed on the connections own IO
processing thread
+ connection->requestIOProcessing(boost::bind(&Link::destroy, this));
+ }
+ }
+}
+
+
Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string&
text)
{
switch (op)
{
case _qmf::Link::METHOD_CLOSE :
- if (!closing) {
- closing = true;
- if (state != STATE_CONNECTING && connection) {
- //connection can only be closed on the connections own IO
processing thread
- connection->requestIOProcessing(boost::bind(&Link::destroy,
this));
- }
- }
+ close();
return Manageable::STATUS_OK;
case _qmf::Link::METHOD_BRIDGE :
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.h?rev=1243576&r1=1243575&r2=1243576&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.h Mon Feb 13
16:17:42 2012
@@ -84,7 +84,7 @@ namespace qpid {
void startConnectionLH(); // Start the IO Connection
void destroy(); // Called when mgmt deletes this
link
void ioThreadProcessing(); // Called on connection's IO
thread by request
- bool tryFailover(); // Called during maintenance visit
+ bool tryFailoverLH(); // Called during maintenance visit
bool hideManagement() const;
public:
@@ -119,6 +119,7 @@ namespace qpid {
void closed(int, std::string); // Called when connection goes
away
void setConnection(Connection*); // Set pointer to the AMQP
Connection
void reconnect(const Address&); //called by LinkRegistry
+ void close(); // Close the link from within the broker.
std::string getAuthMechanism() { return authMechanism; }
std::string getUsername() { return username; }
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1243576&r1=1243575&r2=1243576&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp Mon Feb 13
16:17:42 2012
@@ -65,11 +65,12 @@ Backup::Backup(broker::Broker& b, const
}
void Backup::setUrl(const Url& url) {
- // FIXME aconway 2012-01-30: locking?
+ sys::Mutex::ScopedLock l(lock);
link->setUrl(url);
}
Backup::~Backup() {
+ link->close();
broker.getExchanges().destroy(replicator->getName());
broker.getConnectionObservers().remove(excluder); // Allows client
connections.
}
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h?rev=1243576&r1=1243575&r2=1243576&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h Mon Feb 13 16:17:42
2012
@@ -24,6 +24,7 @@
#include "Settings.h"
#include "qpid/Url.h"
+#include "qpid/sys/Mutex.h"
#include <boost/shared_ptr.hpp>
namespace qpid {
@@ -41,8 +42,7 @@ class BrokerReplicator;
/**
* State associated with a backup broker. Manages connections to primary.
*
- * THREAD SAFE: trivially because currently it only has a constructor.
- * May need locking as the functionality grows.
+ * THREAD SAFE
*/
class Backup
{
@@ -52,6 +52,7 @@ class Backup
void setUrl(const Url&);
private:
+ sys::Mutex lock;
broker::Broker& broker;
Settings settings;
boost::shared_ptr<broker::Link> link;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]