Author: kgiusti
Date: Wed Aug 8 14:46:04 2012
New Revision: 1370792
URL: http://svn.apache.org/viewvc?rev=1370792&view=rev
Log:
QPID-4193: prevent race when connecting a multi-homed federated cluster.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1370792&r1=1370791&r2=1370792&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Wed Aug 8 14:46:04 2012
@@ -119,6 +119,7 @@ pair<Link::shared_ptr, bool> LinkRegistr
parent, failover));
if (durable && store) store->create(*link);
links[name] = link;
+ pendingLinks[name] = link;
QPID_LOG(debug, "Creating new link; name=" << name );
return std::pair<Link::shared_ptr, bool>(link, true);
}
@@ -229,6 +230,7 @@ void LinkRegistry::linkDestroyed(Link *l
QPID_LOG(debug, "LinkRegistry::destroy(); link= " << link->getName());
Mutex::ScopedLock locker(lock);
+ pendingLinks.erase(link->getName());
LinkMap::iterator i = links.find(link->getName());
if (i != links.end())
{
@@ -322,10 +324,12 @@ void LinkRegistry::notifyConnection(cons
Link::shared_ptr link;
{
Mutex::ScopedLock locker(lock);
- for (LinkMap::iterator l = links.begin(); l != links.end(); ++l) {
+ for (LinkMap::iterator l = pendingLinks.begin(); l !=
pendingLinks.end(); ++l) {
if (l->second->pendingConnection(host, port)) {
link = l->second;
+ pendingLinks.erase(l);
connections[key] = link->getName();
+ QPID_LOG(debug, "LinkRegistry:: found pending =" <<
link->getName());
break;
}
}
@@ -347,6 +351,10 @@ void LinkRegistry::notifyClosed(const st
{
Link::shared_ptr link = findLink(key);
if (link) {
+ {
+ Mutex::ScopedLock locker(lock);
+ pendingLinks[link->getName()] = link;
+ }
link->closed(0, "Closed by peer");
}
}
@@ -355,6 +363,10 @@ void LinkRegistry::notifyConnectionForce
{
Link::shared_ptr link = findLink(key);
if (link) {
+ {
+ Mutex::ScopedLock locker(lock);
+ pendingLinks[link->getName()] = link;
+ }
link->notifyConnectionForced(text);
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=1370792&r1=1370791&r2=1370792&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h Wed Aug 8 14:46:04 2012
@@ -47,6 +47,7 @@ namespace broker {
LinkMap links; /** indexed by name of Link */
BridgeMap bridges; /** indexed by name of Bridge */
ConnectionMap connections; /** indexed by connection identifier,
gives link name */
+ LinkMap pendingLinks; /** pending connection, indexed by name of
Link */
qpid::sys::Mutex lock;
Broker* broker;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1370792&r1=1370791&r2=1370792&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Aug 8 14:46:04 2012
@@ -808,48 +808,19 @@ void Connection::config(const std::strin
else throw Exception(QPID_MSG("Update failed, invalid kind of config: " <<
kind));
}
-namespace {
- // find a Link that matches the given Address
- class LinkFinder {
- qpid::Address id;
- boost::shared_ptr<broker::Link> link;
- public:
- LinkFinder(const qpid::Address& _id) : id(_id) {}
- boost::shared_ptr<broker::Link> getLink() { return link; }
- void operator() (boost::shared_ptr<broker::Link> l)
- {
- if (!link) {
- qpid::Address addr(l->getTransport(), l->getHost(),
l->getPort());
- if (id == addr) {
- link = l;
- }
- }
- }
- };
-}
-
void Connection::internalState(const std::string& type,
const std::string& name,
const framing::FieldTable& state)
{
if (type == "link") {
- // name is the string representation of the Link's _configured_
destination address
- Url dest;
- try {
- dest = name;
- } catch(...) {
- throw Exception(QPID_MSG("Update failed, invalid format for Link
destination address: " << name));
- }
- assert(dest.size());
- LinkFinder finder(dest[0]);
- cluster.getBroker().getLinks().eachLink(boost::ref(finder));
- if (finder.getLink()) {
+ boost::shared_ptr<qpid::broker::Link>
link(cluster.getBroker().getLinks().getLink(name));
+ if (link.get()) {
try {
- finder.getLink()->setState(state);
+ link->setState(state);
} catch(...) {
throw Exception(QPID_MSG("Update failed, invalid state for
Link " << name << ", state: " << state));
}
- QPID_LOG(debug, cluster << " updated link " << dest[0] << " with
state: " << state);
+ QPID_LOG(debug, cluster << " updated link " << name << " with
state: " << state);
} else throw Exception(QPID_MSG("Update failed, unable to find Link
named: " << name));
}
else throw Exception(QPID_MSG("Update failed, invalid object type for
internal state replication: " << type));
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1370792&r1=1370791&r2=1370792&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Wed Aug 8 14:46:04 2012
@@ -690,10 +690,8 @@ void UpdateClient::updateLink(const boos
// now push the current state
framing::FieldTable state;
link->getState(state);
- std::ostringstream os;
- os << qpid::Address(link->getTransport(), link->getHost(),
link->getPort());
ClusterConnectionProxy(session).internalState(std::string("link"),
- os.str(),
+ link->getName(),
state);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]