Author: aconway
Date: Wed Feb 22 18:50:18 2012
New Revision: 1292445
URL: http://svn.apache.org/viewvc?rev=1292445&view=rev
Log:
QPID-3603: HA logging improvements.
Modified:
qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
qpid/trunk/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1292445&r1=1292444&r2=1292445&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp Wed Feb 22 18:50:18 2012
@@ -52,7 +52,7 @@ Backup::Backup(broker::Broker& b, const
void Backup::initialize(const Url& url) {
assert(!url.empty());
- QPID_LOG(notice, "Ha: Backup started: " << url);
+ QPID_LOG(notice, "HA: Backup started: " << url);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
// Declare the link
std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1292445&r1=1292444&r2=1292445&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Wed Feb 22 18:50:18
2012
@@ -266,6 +266,7 @@ void BrokerReplicator::route(Deliverable
}
void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
+ QPID_LOG(debug, "HA: Backup queue declare event " << values);
string name = values[QNAME].asString();
Variant::Map argsMap = asMapVoid(values[ARGS]);
if (values[DISP] == CREATED && replicateLevel(argsMap)) {
@@ -296,6 +297,7 @@ void BrokerReplicator::doEventQueueDecla
}
void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
+ QPID_LOG(debug, "HA: Backup queue delete event " << values);
// The remote queue has already been deleted so replicator
// sessions may be closed by a "queue deleted" exception.
string name = values[QNAME].asString();
@@ -314,6 +316,7 @@ void BrokerReplicator::doEventQueueDelet
}
void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
+ QPID_LOG(debug, "HA: Backup exchange declare event " << values);
Variant::Map argsMap(asMapVoid(values[ARGS]));
if (values[DISP] == CREATED && replicateLevel(argsMap)) {
string name = values[EXNAME].asString();
@@ -338,6 +341,7 @@ void BrokerReplicator::doEventExchangeDe
}
void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
+ QPID_LOG(debug, "HA: Backup exchange delete event " << values);
string name = values[EXNAME].asString();
try {
boost::shared_ptr<Exchange> exchange =
broker.getExchanges().find(name);
@@ -352,6 +356,7 @@ void BrokerReplicator::doEventExchangeDe
}
void BrokerReplicator::doEventBind(Variant::Map& values) {
+ QPID_LOG(debug, "HA: Backup bind event " << values);
boost::shared_ptr<Exchange> exchange =
broker.getExchanges().find(values[EXNAME].asString());
boost::shared_ptr<Queue> queue =
@@ -372,6 +377,7 @@ void BrokerReplicator::doEventBind(Varia
}
void BrokerReplicator::doEventUnbind(Variant::Map& values) {
+ QPID_LOG(debug, "HA: Backup unbind event " << values);
boost::shared_ptr<Exchange> exchange =
broker.getExchanges().find(values[EXNAME].asString());
boost::shared_ptr<Queue> queue =
@@ -392,6 +398,7 @@ void BrokerReplicator::doEventUnbind(Var
}
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
+ QPID_LOG(debug, "HA: Backup queue response " << values);
// FIXME aconway 2011-11-22: more flexible ways & defaults to indicate
replication
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
if (!replicateLevel(argsMap)) return;
@@ -419,6 +426,7 @@ void BrokerReplicator::doResponseQueue(V
}
void BrokerReplicator::doResponseExchange(Variant::Map& values) {
+ QPID_LOG(debug, "HA: Backup exchange response " << values);
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
if (!replicateLevel(argsMap)) return;
framing::FieldTable args;
@@ -460,6 +468,7 @@ const std::string QUEUE_REF("queueRef");
} // namespace
void BrokerReplicator::doResponseBind(Variant::Map& values) {
+ QPID_LOG(debug, "HA: Backup bind response " << values);
std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName);
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1292445&r1=1292444&r2=1292445&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Wed Feb 22 18:50:18 2012
@@ -118,11 +118,12 @@ void HaBroker::updateClientUrl(const sys
mgmtObject->set_clientAddresses(url.str());
knownBrokers.clear();
knownBrokers.push_back(url);
- QPID_LOG(debug, "HA: Setting client known-brokers to: " << url);
+ QPID_LOG(debug, "HA: Setting client URL to: " << url);
}
void HaBroker::setBrokerUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
if (url.empty()) throw Exception("Invalid empty URL for HA broker
failover");
+ QPID_LOG(debug, "HA: Setting broker URL to: " << url);
brokerUrl = url;
mgmtObject->set_brokerAddresses(brokerUrl.str());
if (backup.get()) backup->setBrokerUrl(brokerUrl);
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp?rev=1292445&r1=1292444&r2=1292445&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp Wed Feb 22 18:50:18 2012
@@ -56,6 +56,7 @@ struct HaPlugin : public Plugin {
void initialize(Plugin::Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
if (broker && settings.enabled) {
+ QPID_LOG(notice, "HA: Enabled");
haBroker.reset(new ha::HaBroker(*broker, settings));
} else
QPID_LOG(notice, "HA: Disabled");
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1292445&r1=1292444&r2=1292445&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Wed Feb 22 18:50:18 2012
@@ -31,7 +31,6 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/log/Statement.h"
#include <boost/shared_ptr.hpp>
-#include <sstream>
namespace {
const std::string QPID_REPLICATOR_("qpid.replicator-");
@@ -54,9 +53,7 @@ std::string QueueReplicator::replicatorN
QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q,
boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q),
link(l)
{
- std::stringstream ss;
- ss << "HA: Backup " << queue->getName() << ": ";
- logPrefix = ss.str();
+ logPrefix = "HA: Backup " + queue->getName() + ": ";
QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings());
}
Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1292445&r1=1292444&r2=1292445&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Wed Feb 22 18:50:18 2012
@@ -49,6 +49,10 @@ class HaBroker(Broker):
assert os.system(
"qpid-ha-tool --broker-addresses=%s %s"%(url, self.host_port()))
== 0
+def set_broker_urls(brokers):
+ url = ",".join([b.host_port() for b in brokers])
+ for b in brokers: b.set_broker_url(url)
+
class ShortTests(BrokerTest):
"""Short HA functionality tests."""
@@ -59,7 +63,7 @@ class ShortTests(BrokerTest):
session.sender(address)
return True
except NotFound: return False
- assert retry(check), "Timed out waiting for %s"%(address)
+ assert retry(check), "Timed out waiting for address %s"%(address)
# Wait for address to become valid on a backup broker.
def wait_backup(self, backup, address):
@@ -342,6 +346,7 @@ class ShortTests(BrokerTest):
s = session.sender("priority-queue; {create:always,
node:{x-declare:{arguments:{'qpid.priorities':10,
'qpid.replicate':messages}}}}")
priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
for p in priorities: s.send(Message(priority=p))
+ # Can't use browse_backup as browser sees messages in delivery order
not priority.
self.wait_backup(backup, "priority-queue")
r = self.connect_admin(backup).session().receiver("priority-queue")
received = [r.fetch().priority for i in priorities]
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]