Author: kgiusti
Date: Wed Aug  8 14:46:04 2012
New Revision: 1370792

URL: http://svn.apache.org/viewvc?rev=1370792&view=rev
Log:
QPID-4193: prevent race when connecting a multi-homed federated cluster.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1370792&r1=1370791&r2=1370792&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Wed Aug  8 14:46:04 2012
@@ -119,6 +119,7 @@ pair<Link::shared_ptr, bool> LinkRegistr
                       parent, failover));
         if (durable && store) store->create(*link);
         links[name] = link;
+        pendingLinks[name] = link;
         QPID_LOG(debug, "Creating new link; name=" << name );
         return std::pair<Link::shared_ptr, bool>(link, true);
     }
@@ -229,6 +230,7 @@ void LinkRegistry::linkDestroyed(Link *l
     QPID_LOG(debug, "LinkRegistry::destroy(); link= " << link->getName());
     Mutex::ScopedLock   locker(lock);
 
+    pendingLinks.erase(link->getName());
     LinkMap::iterator i = links.find(link->getName());
     if (i != links.end())
     {
@@ -322,10 +324,12 @@ void LinkRegistry::notifyConnection(cons
     Link::shared_ptr link;
     {
         Mutex::ScopedLock locker(lock);
-        for (LinkMap::iterator l = links.begin(); l != links.end(); ++l) {
+        for (LinkMap::iterator l = pendingLinks.begin(); l != pendingLinks.end(); ++l) {
             if (l->second->pendingConnection(host, port)) {
                 link = l->second;
+                pendingLinks.erase(l);
                 connections[key] = link->getName();
+                QPID_LOG(debug, "LinkRegistry:: found pending =" << link->getName());
                 break;
             }
         }
@@ -347,6 +351,10 @@ void LinkRegistry::notifyClosed(const st
 {
     Link::shared_ptr link = findLink(key);
     if (link) {
+        {
+            Mutex::ScopedLock locker(lock);
+            pendingLinks[link->getName()] = link;
+        }
         link->closed(0, "Closed by peer");
     }
 }
@@ -355,6 +363,10 @@ void LinkRegistry::notifyConnectionForce
 {
     Link::shared_ptr link = findLink(key);
     if (link) {
+        {
+            Mutex::ScopedLock locker(lock);
+            pendingLinks[link->getName()] = link;
+        }
         link->notifyConnectionForced(text);
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=1370792&r1=1370791&r2=1370792&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h Wed Aug  8 14:46:04 2012
@@ -47,6 +47,7 @@ namespace broker {
         LinkMap   links;    /** indexed by name of Link */
         BridgeMap bridges;  /** indexed by name of Bridge */
         ConnectionMap   connections;  /** indexed by connection identifier, gives link name */
+        LinkMap   pendingLinks;  /** pending connection, indexed by name of Link */
 
         qpid::sys::Mutex lock;
         Broker* broker;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1370792&r1=1370791&r2=1370792&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Aug  8 14:46:04 2012
@@ -808,48 +808,19 @@ void Connection::config(const std::strin
     else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind));
 }
 
-namespace {
-    // find a Link that matches the given Address
-    class LinkFinder {
-        qpid::Address id;
-        boost::shared_ptr<broker::Link> link;
-    public:
-        LinkFinder(const qpid::Address& _id) : id(_id) {}
-        boost::shared_ptr<broker::Link> getLink() { return link; }
-        void operator() (boost::shared_ptr<broker::Link> l)
-        {
-            if (!link) {
-                qpid::Address addr(l->getTransport(), l->getHost(), l->getPort());
-                if (id == addr) {
-                    link = l;
-                }
-            }
-        }
-    };
-}
-
 void Connection::internalState(const std::string& type,
                                const std::string& name,
                                const framing::FieldTable& state)
 {
     if (type == "link") {
-        // name is the string representation of the Link's _configured_ destination address
-        Url dest;
-        try {
-            dest = name;
-        } catch(...) {
-            throw Exception(QPID_MSG("Update failed, invalid format for Link destination address: " << name));
-        }
-        assert(dest.size());
-        LinkFinder finder(dest[0]);
-        cluster.getBroker().getLinks().eachLink(boost::ref(finder));
-        if (finder.getLink()) {
+        boost::shared_ptr<qpid::broker::Link> link(cluster.getBroker().getLinks().getLink(name));
+        if (link.get()) {
             try {
-                finder.getLink()->setState(state);
+                link->setState(state);
             } catch(...) {
                 throw Exception(QPID_MSG("Update failed, invalid state for Link " << name << ", state: " << state));
             }
-            QPID_LOG(debug, cluster << " updated link " << dest[0] << " with state: " << state);
+            QPID_LOG(debug, cluster << " updated link " << name << " with state: " << state);
         } else throw Exception(QPID_MSG("Update failed, unable to find Link named: " << name));
     }
     else throw Exception(QPID_MSG("Update failed, invalid object type for internal state replication: " << type));

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1370792&r1=1370791&r2=1370792&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Wed Aug  8 14:46:04 2012
@@ -690,10 +690,8 @@ void UpdateClient::updateLink(const boos
     // now push the current state
     framing::FieldTable state;
     link->getState(state);
-    std::ostringstream os;
-    os << qpid::Address(link->getTransport(), link->getHost(), link->getPort());
     ClusterConnectionProxy(session).internalState(std::string("link"),
-                                                  os.str(),
+                                                  link->getName(),
                                                   state);
 }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to