Author: aconway
Date: Mon Feb 13 16:18:21 2012
New Revision: 1243580
URL: http://svn.apache.org/viewvc?rev=1243580&view=rev
Log:
QPID-3603: HA brokers set known-hosts to the HA broker-url.
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Settings.h
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1243580&r1=1243579&r2=1243580&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp Mon Feb 13
16:18:21 2012
@@ -46,10 +46,12 @@ using std::string;
Backup::Backup(broker::Broker& b, const Settings& s) :
broker(b), settings(s), excluder(new ConnectionExcluder())
{
+ // Empty brokerUrl means delay initialization until setUrl() is called.
if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
}
void Backup::initialize(const Url& url) {
+ assert(!url.empty());
QPID_LOG(notice, "Ha: Backup started: " << url);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
// Declare the link
@@ -66,20 +68,23 @@ void Backup::initialize(const Url& url)
broker.getConnectionObservers().add(excluder);
}
-void Backup::setUrl(const Url& url) {
+void Backup::setBrokerUrl(const Url& url) {
+ // Ignore empty URLs seen during start-up for some tests.
+ if (url.empty()) return;
sys::Mutex::ScopedLock l(lock);
- if (!replicator.get())
- initialize(url);
- else {
- QPID_LOG(info, "HA: Backup URL set to " << url);
+ if (link) { // URL changed after we initialized.
+ QPID_LOG(info, "HA: Backup failover URL set to " << url);
link->setUrl(url);
}
+ else {
+ initialize(url); // Deferred initialization
+ }
}
Backup::~Backup() {
if (link) link->close();
if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
- broker.getConnectionObservers().remove(excluder); // Allows client
connections.
+ broker.getConnectionObservers().remove(excluder); // This allows client
connections.
}
}} // namespace qpid::ha
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h?rev=1243580&r1=1243579&r2=1243580&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.h Mon Feb 13 16:18:21
2012
@@ -49,7 +49,7 @@ class Backup
public:
Backup(broker::Broker&, const Settings&);
~Backup();
- void setUrl(const Url&);
+ void setBrokerUrl(const Url&);
private:
void initialize(const Url&);
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1243580&r1=1243579&r2=1243580&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp Mon Feb 13
16:18:21 2012
@@ -39,21 +39,13 @@ using namespace management;
using namespace std;
namespace {
-Url url(const std::string& s, const std::string& id) {
- try {
- // Allow the URL to be empty, used in tests that set the URL
- // after starting broker
- return s.empty() ? Url() : Url(s);
- } catch (const std::exception& e) {
- throw Exception(Msg() << "Invalid URL for " << id << ": '" << s <<
"'");
- }
-}
const std::string PRIMARY="primary";
const std::string BACKUP="backup";
} // namespace
+
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
: broker(b),
settings(s),
@@ -65,6 +57,8 @@ HaBroker::HaBroker(broker::Broker& b, co
boost::shared_ptr<ReplicatingSubscription::Factory>(
new ReplicatingSubscription::Factory()));
+ broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this);
+
ManagementAgent* ma = broker.getManagementAgent();
if (!ma)
throw Exception("Cannot start HA: management is disabled");
@@ -74,6 +68,9 @@ HaBroker::HaBroker(broker::Broker& b, co
mgmtObject->set_status(BACKUP);
ma->addObject(mgmtObject);
}
+ sys::Mutex::ScopedLock l(lock);
+ if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l);
+ if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l);
}
HaBroker::~HaBroker() {}
@@ -92,17 +89,15 @@ Manageable::status_t HaBroker::Managemen
break;
}
case _qmf::HaBroker::METHOD_SETCLIENTADDRESSES: {
- string url =
dynamic_cast<_qmf::ArgsHaBrokerSetClientAddresses&>(args)
- .i_clientAddresses;
- mgmtObject->set_clientAddresses(url);
- // FIXME aconway 2012-01-30: upate status for new URL
+ setClientUrl(
+ Url(dynamic_cast<_qmf::ArgsHaBrokerSetClientAddresses&>(args).
+ i_clientAddresses), l);
break;
}
case _qmf::HaBroker::METHOD_SETBROKERADDRESSES: {
- string url =
dynamic_cast<_qmf::ArgsHaBrokerSetBrokerAddresses&>(args)
- .i_brokerAddresses;
- mgmtObject->set_brokerAddresses(url);
- if (backup.get()) backup->setUrl(Url(url));
+ setBrokerUrl(
+ Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokerAddresses&>(args)
+ .i_brokerAddresses), l);
break;
}
default:
@@ -111,4 +106,32 @@ Manageable::status_t HaBroker::Managemen
return Manageable::STATUS_OK;
}
+void HaBroker::setClientUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
+ if (url.empty()) throw Exception("Invalid empty URL for HA client
failover");
+ clientUrl = url;
+ updateClientUrl(l);
+}
+
+void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) {
+ Url url = clientUrl.empty() ? brokerUrl : clientUrl;
+ assert(!url.empty());
+ mgmtObject->set_clientAddresses(url.str());
+ knownBrokers.clear();
+ knownBrokers.push_back(url);
+ QPID_LOG(debug, "HA: Setting client known-brokers to: " << url);
+}
+
+void HaBroker::setBrokerUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
+ if (url.empty()) throw Exception("Invalid empty URL for HA broker
failover");
+ brokerUrl = url;
+ mgmtObject->set_brokerAddresses(brokerUrl.str());
+ if (backup.get()) backup->setBrokerUrl(brokerUrl);
+ // Updating broker URL also updates defaulted client URL:
+ if (clientUrl.empty()) updateClientUrl(l);
+}
+
+std::vector<Url> HaBroker::getKnownBrokers() const {
+ return knownBrokers;
+}
+
}} // namespace qpid::ha
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1243580&r1=1243579&r2=1243580&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.h Mon Feb 13
16:18:21 2012
@@ -54,11 +54,20 @@ class HaBroker : public management::Mana
uint32_t methodId, management::Args& args, std::string& text);
private:
- sys::Mutex lock;
+ void setClientUrl(const Url&, const sys::Mutex::ScopedLock&);
+ void setBrokerUrl(const Url&, const sys::Mutex::ScopedLock&);
+ void updateClientUrl(const sys::Mutex::ScopedLock&);
+ bool isPrimary(const sys::Mutex::ScopedLock&) { return !backup.get(); }
+ std::vector<Url> getKnownBrokers() const;
+
broker::Broker& broker;
- Settings settings;
+ const Settings settings;
+
+ sys::Mutex lock;
std::auto_ptr<Backup> backup;
qmf::org::apache::qpid::ha::HaBroker* mgmtObject;
+ Url clientUrl, brokerUrl;
+ std::vector<Url> knownBrokers;
};
}} // namespace qpid::ha
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Settings.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Settings.h?rev=1243580&r1=1243579&r2=1243580&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Settings.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Settings.h Mon Feb 13
16:18:21 2012
@@ -27,8 +27,6 @@
namespace qpid {
namespace ha {
-using std::string;
-
/**
* Configurable settings for HA.
*/
@@ -37,9 +35,9 @@ class Settings
public:
Settings() : enabled(false) {}
bool enabled;
- string clientUrl;
- string brokerUrl;
- string username, password, mechanism;
+ std::string clientUrl;
+ std::string brokerUrl;
+ std::string username, password, mechanism;
private:
};
}} // namespace qpid::ha
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]