Author: aconway
Date: Thu Jan 19 23:08:11 2012
New Revision: 1233679
URL: http://svn.apache.org/viewvc?rev=1233679&view=rev
Log:
QPID-3603: Logging improvements for bridges, links and HA classes.
Modified:
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.h
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1233679&r1=1233678&r2=1233679&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Bridge.cpp Thu Jan 19
23:08:11 2012
@@ -75,7 +75,7 @@ Bridge::Bridge(Link* _link, framing::Cha
args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync);
agent->addObject(mgmtObject);
}
- QPID_LOG(debug, "Bridge created from " << args.i_src << " to " <<
args.i_dest);
+ QPID_LOG(debug, "Bridge " << name << " created from " << args.i_src << "
to " << args.i_dest);
}
Bridge::~Bridge()
@@ -114,7 +114,7 @@ void Bridge::create(Connection& c)
peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0
: 1, 0, false, "", 0, options);
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
- QPID_LOG(debug, "Activated route from queue " << args.i_src << " to "
<< args.i_dest);
+ QPID_LOG(debug, "Activated bridge " << name << " for route from queue
" << args.i_src << " to " << args.i_dest);
} else {
FieldTable queueSettings;
@@ -148,9 +148,9 @@ void Bridge::create(Connection& c)
if (exchange.get() == 0)
throw Exception("Exchange not found for dynamic route");
exchange->registerDynamicBridge(this);
- QPID_LOG(debug, "Activated dynamic route for exchange " <<
args.i_src);
+ QPID_LOG(debug, "Activated bridge " << name << " for dynamic route
for exchange " << args.i_src);
} else {
- QPID_LOG(debug, "Activated static route from exchange " <<
args.i_src << " to " << args.i_dest);
+ QPID_LOG(debug, "Activated bridge " << name << " for static route
from exchange " << args.i_src << " to " << args.i_dest);
}
}
if (args.i_srcIsLocal)
sessionHandler.getSession()->enableReceiverTracking();
@@ -162,15 +162,16 @@ void Bridge::cancel(Connection&)
peer->getMessage().cancel(args.i_dest);
peer->getSession().detach(name);
}
+ QPID_LOG(debug, "Cancelled bridge " << name);
}
void Bridge::closed()
{
if (args.i_dynamic) {
- Exchange::shared_ptr exchange =
link->getBroker()->getExchanges().get(args.i_src);
- if (exchange.get() != 0)
- exchange->removeDynamicBridge(this);
+ Exchange::shared_ptr exchange =
link->getBroker()->getExchanges().find(args.i_src);
+ if (exchange.get()) exchange->removeDynamicBridge(this);
}
+ QPID_LOG(debug, "Closed bridge " << name);
}
void Bridge::destroy()
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp?rev=1233679&r1=1233678&r2=1233679&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp Thu Jan 19
23:08:11 2012
@@ -221,26 +221,28 @@ void Link::add(Bridge::shared_ptr bridge
void Link::cancel(Bridge::shared_ptr bridge)
{
- Mutex::ScopedLock mutex(lock);
+ bool needIOProcessing = false;
+ {
+ Mutex::ScopedLock mutex(lock);
- for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
- if ((*i).get() == bridge.get()) {
- created.erase(i);
- break;
+ for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
+ if ((*i).get() == bridge.get()) {
+ created.erase(i);
+ break;
+ }
}
- }
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- if ((*i).get() == bridge.get()) {
- cancellations.push_back(bridge);
- bridge->closed();
- active.erase(i);
- break;
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ if ((*i).get() == bridge.get()) {
+ cancellations.push_back(bridge);
+ bridge->closed();
+ active.erase(i);
+ break;
+ }
}
+ needIOProcessing = !cancellations.empty();
}
-
- if (!cancellations.empty()) {
+ if (needIOProcessing)
connection->requestIOProcessing
(boost::bind(&Link::ioThreadProcessing, this));
- }
}
void Link::ioThreadProcessing()
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1233679&r1=1233678&r2=1233679&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Thu Jan
19 23:08:11 2012
@@ -65,6 +65,7 @@ void QueueReplicator::activate() {
// Take a reference to myself to ensure not deleted before initializeBridge
// is called.
self = shared_from_this();
+ // Note this may create a new bridge or use an existing one.
queue->getBroker()->getLinks().declare(
link->getHost(), link->getPort(),
false, // durable
@@ -77,7 +78,8 @@ void QueueReplicator::activate() {
"", // excludes
false, // dynamic
0, // sync?
- // Include shared_ptr to self to ensure we not deleted before
initializeBridge is called.
+ // Include shared_ptr to self to ensure we are not deleted
+ // before initializeBridge is called.
boost::bind(&QueueReplicator::initializeBridge, this, _1, _2, self)
);
}
@@ -88,6 +90,7 @@ void QueueReplicator::deactivate() {
sys::Mutex::ScopedLock l(lock);
queue->getBroker()->getLinks().destroy(
link->getHost(), link->getPort(), queue->getName(), getName(),
string());
+ QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName);
}
// Called in a broker connection thread when the bridge is created.
@@ -95,7 +98,7 @@ void QueueReplicator::deactivate() {
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler&
sessionHandler,
boost::shared_ptr<QueueReplicator>
/*self*/) {
sys::Mutex::ScopedLock l(lock);
-
+ bridgeName = bridge.getName();
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge&
args(bridge.getArgs());
framing::FieldTable settings;
@@ -117,7 +120,7 @@ void QueueReplicator::initializeBridge(B
peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/,
1/*not-acquired*/, false, "", 0, settings);
peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
- QPID_LOG(debug, logPrefix << "Activated bridge from " << args.i_src << "
to " << args.i_dest);
+ QPID_LOG(debug, logPrefix << "Activated bridge " << bridgeName);
}
namespace {
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1233679&r1=1233678&r2=1233679&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.h Thu Jan 19
23:08:11 2012
@@ -75,6 +75,7 @@ class QueueReplicator : public broker::E
void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
std::string logPrefix;
+ std::string bridgeName;
sys::Mutex lock;
boost::shared_ptr<broker::Queue> queue;
boost::shared_ptr<broker::Link> link;
Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1233679&r1=1233678&r2=1233679&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Thu Jan
19 23:08:11 2012
@@ -260,7 +260,7 @@ void WiringReplicator::doEventQueueDecla
string name = values[QNAME].asString();
Variant::Map argsMap = values[ARGS].asMap();
if (values[DISP] == CREATED && replicateLevel(argsMap)) {
- framing::FieldTable args;
+ framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
std::pair<boost::shared_ptr<Queue>, bool> result =
broker.createQueue(
@@ -287,21 +287,20 @@ void WiringReplicator::doEventQueueDecla
}
void WiringReplicator::doEventQueueDelete(Variant::Map& values) {
+ // The remote queue has already been deleted so replicator
+ // sessions may be closed by a "queue deleted" exception.
string name = values[QNAME].asString();
boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
if (queue && replicateLevel(queue->getSettings())) {
QPID_LOG(debug, "HA: Backup deleting queue: " << name);
- broker.deleteQueue(
- name,
- values[USER].asString(),
- values[RHOST].asString());
- // Delete the QueueReplicator exchange for this queue.
- boost::shared_ptr<broker::Exchange> ex =
- broker.getExchanges().find(QueueReplicator::replicatorName(name));
- boost::shared_ptr<QueueReplicator> qr =
- boost::dynamic_pointer_cast<QueueReplicator>(ex);
+ string rname = QueueReplicator::replicatorName(name);
+ boost::shared_ptr<broker::Exchange> ex =
broker.getExchanges().find(rname);
+ boost::shared_ptr<QueueReplicator> qr =
boost::dynamic_pointer_cast<QueueReplicator>(ex);
if (qr) qr->deactivate();
- broker.getExchanges().destroy(QueueReplicator::replicatorName(name));
+ // QueueReplicator's bridge is now queued for destruction but may not
+ // actually be destroyed, deleting the exchange
+ broker.getExchanges().destroy(rname);
+ broker.deleteQueue(name, values[USER].asString(),
values[RHOST].asString());
}
}
@@ -455,8 +454,8 @@ void WiringReplicator::doResponseBind(Va
void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>&
queue) {
if (replicateLevel(queue->getSettings()) == RL_ALL) {
boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue,
link));
- qr->activate();
broker.getExchanges().registerExchange(qr);
+ qr->activate();
}
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]