Author: astitcher
Date: Tue Jan  8 22:17:55 2013
New Revision: 1430573

URL: http://svn.apache.org/viewvc?rev=1430573&view=rev
Log:
QPID-4315: Changed Connection management name to be supplied by Link
code on outgoing connections so that the Link code can correlate the
connection with the Link using the name.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
    qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
    qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h
    qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1430573&r1=1430572&r2=1430573&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Jan  8 22:17:55 2013
@@ -1015,11 +1015,12 @@ void Broker::accept() {
 }
 
 void Broker::connect(
+    const std::string& name,
     const std::string& host, const std::string& port, const std::string& 
transport,
     boost::function2<void, int, std::string> failed)
 {
     boost::shared_ptr<ProtocolFactory> pf = getProtocolFactory(transport);
-    if (pf) pf->connect(poller, host, port, factory.get(), failed);
+    if (pf) pf->connect(poller, name, host, port, factory.get(), failed);
     else throw NoSuchTransportException(QPID_MSG("Unsupported transport type: 
" << transport));
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=1430573&r1=1430572&r2=1430573&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Tue Jan  8 22:17:55 2013
@@ -233,7 +233,8 @@ class Broker : public sys::Runnable, pub
     QPID_BROKER_EXTERN void accept();
 
     /** Create a connection to another broker. */
-    void connect(const std::string& host, const std::string& port,
+    void connect(const std::string& name,
+                 const std::string& host, const std::string& port,
                  const std::string& transport,
                  boost::function2<void, int, std::string> failed);
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=1430573&r1=1430572&r2=1430573&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Tue Jan  8 22:17:55 2013
@@ -228,7 +228,7 @@ void Link::startConnectionLH ()
         // Set the state before calling connect.  It is possible that connect
         // will fail synchronously and call Link::closed before returning.
         setStateLH(STATE_CONNECTING);
-        broker->connect (host, boost::lexical_cast<std::string>(port), 
transport,
+        broker->connect (name, host, boost::lexical_cast<std::string>(port), 
transport,
                          boost::bind (&Link::closed, this, _1, _2));
         QPID_LOG (info, "Inter-broker link connecting to " << host << ":" << 
port);
     } catch(const std::exception& e) {
@@ -774,14 +774,6 @@ std::string Link::createName(const std::
     return linkName.str();
 }
 
-
-bool Link::pendingConnection(const std::string& _host, uint16_t _port) const
-{
-    Mutex::ScopedLock mutex(lock);
-    return (isConnecting() && _port == port && _host == host);
-}
-
-
 const std::string Link::exchangeTypeName("qpid.LinkExchange");
 
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h?rev=1430573&r1=1430572&r2=1430573&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.h Tue Jan  8 22:17:55 2013
@@ -115,7 +115,6 @@ class Link : public PersistableConfig, p
     void closed(int, std::string);   // Called when connection goes away
     void notifyConnectionForced(const std::string text);
     void closeConnection(const std::string& reason);
-    bool pendingConnection(const std::string& host, uint16_t port) const;  // 
is Link trying to connect to this remote?
 
     friend class LinkRegistry; // to call established, opened, closed
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1430573&r1=1430572&r2=1430573&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Tue Jan  8 22:17:55 
2013
@@ -272,38 +272,6 @@ MessageStore* LinkRegistry::getStore() c
     return store;
 }
 
-namespace {
-    void extractHostPort(const std::string& connId, std::string *host, 
uint16_t *port)
-    {
-        // Extract host and port of remote broker from connection id string.
-        //
-        // TODO aconway 2011-02-01: centralize code that constructs/parses 
connection
-        // management IDs. Currently sys:: protocol factories and IO plugins 
construct the
-        // IDs and LinkRegistry parses them.
-        // KAG: current connection id format assumed:
-        // "localhost:port-remotehost:port".  In the case of IpV6, the host 
addresses are
-        // contained within brackets "[...]", example:
-        // connId="[::1]:36859-[::1]:48603". Liberal use of "asserts" provided 
to alert us
-        // if this assumption changes!
-        size_t separator = connId.find('-');
-        assert(separator != std::string::npos);
-        std::string remote = connId.substr(separator+1, std::string::npos);
-        separator = remote.rfind(":");
-        assert(separator != std::string::npos);
-        *host = remote.substr(0, separator);
-        // IPv6 - host is bracketed by "[]", strip them
-        if ((*host)[0] == '[' && (*host)[host->length() - 1] == ']') {
-            *host = host->substr(1, host->length() - 2);
-        }
-        try {
-            *port = boost::lexical_cast<uint16_t>(remote.substr(separator+1, 
std::string::npos));
-        } catch (const boost::bad_lexical_cast&) {
-            QPID_LOG(error, "Invalid format for connection identifier! '" << 
connId << "'");
-            assert(false);
-        }
-    }
-}
-
 /** find the Link that corresponds to the given connection */
 Link::shared_ptr LinkRegistry::findLink(const std::string& connId)
 {
@@ -323,19 +291,15 @@ void LinkRegistry::notifyConnection(cons
     // create a mapping from connection id to link
     QPID_LOG(debug, "LinkRegistry::notifyConnection(); key=" << key );
     std::string host;
-    uint16_t port = 0;
-    extractHostPort( key, &host, &port );
     Link::shared_ptr link;
     {
         Mutex::ScopedLock locker(lock);
-        for (LinkMap::iterator l = pendingLinks.begin(); l != 
pendingLinks.end(); ++l) {
-            if (l->second->pendingConnection(host, port)) {
-                link = l->second;
-                pendingLinks.erase(l);
-                connections[key] = link->getName();
-                QPID_LOG(debug, "LinkRegistry:: found pending =" << 
link->getName());
-                break;
-            }
+        LinkMap::iterator l = pendingLinks.find(key);
+        if (l != pendingLinks.end()) {
+            link = l->second;
+            pendingLinks.erase(l);
+            connections[key] = link->getName();
+            QPID_LOG(debug, "LinkRegistry:: found pending =" << 
link->getName());
         }
     }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=1430573&r1=1430572&r2=1430573&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp Tue Jan  8 22:17:55 
2013
@@ -51,14 +51,14 @@ struct ProtocolTimeoutTask : public sys:
     }
 };
 
-AsynchIOHandler::AsynchIOHandler(const std::string& id, 
ConnectionCodec::Factory* f, bool nodict0) :
+AsynchIOHandler::AsynchIOHandler(const std::string& id, 
ConnectionCodec::Factory* f, bool isClient0, bool nodict0) :
     identifier(id),
     aio(0),
     factory(f),
     codec(0),
     reads(0),
     readError(false),
-    isClient(false),
+    isClient(isClient0),
     nodict(nodict0),
     readCredit(InfiniteCredit)
 {}

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h?rev=1430573&r1=1430572&r2=1430573&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h Tue Jan  8 22:17:55 2013
@@ -60,12 +60,10 @@ class AsynchIOHandler : public OutputCon
     void write(const framing::ProtocolInitiation&);
 
   public:
-    QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, 
qpid::sys::ConnectionCodec::Factory* f, bool nodict);
+    QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, 
qpid::sys::ConnectionCodec::Factory* f, bool isClient, bool nodict);
     QPID_COMMON_EXTERN ~AsynchIOHandler();
     QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime);
 
-    QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; }
-
     // Output side
     QPID_COMMON_EXTERN void abort();
     QPID_COMMON_EXTERN void activateOutput();

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h?rev=1430573&r1=1430572&r2=1430573&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h Tue Jan  8 22:17:55 2013
@@ -42,6 +42,7 @@ class ProtocolFactory : public qpid::Sha
     virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) 
= 0;
     virtual void connect(
         boost::shared_ptr<Poller>,
+        const std::string& name,
         const std::string& host, const std::string& port,
         ConnectionCodec::Factory* codec,
         ConnectFailedCallback failed) = 0;

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=1430573&r1=1430572&r2=1430573&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp Tue Jan  8 22:17:55 2013
@@ -23,6 +23,7 @@
 
 #include "qpid/Plugin.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/NameGenerator.h"
 #include "qpid/framing/AMQP_HighestVersion.h"
 #include "qpid/log/Statement.h"
 #include "qpid/sys/rdma/RdmaIO.h"
@@ -83,7 +84,7 @@ class RdmaIOHandler : public OutputContr
 };
 
 RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr c, 
qpid::sys::ConnectionCodec::Factory* f) :
-    identifier(c->getFullName()),
+    identifier(broker::QPID_NAME_PREFIX+c->getFullName()),
     factory(f),
     codec(0),
     readError(false),
@@ -250,7 +251,7 @@ class RdmaIOProtocolFactory : public Pro
   public:
     RdmaIOProtocolFactory(int16_t port, int backlog);
     void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
-    void connect(Poller::shared_ptr, const string& host, const std::string& 
port, ConnectionCodec::Factory*, ConnectFailedCallback);
+    void connect(Poller::shared_ptr, const std::string& name, const string& 
host, const std::string& port, ConnectionCodec::Factory*, 
ConnectFailedCallback);
 
     uint16_t getPort() const;
 
@@ -371,6 +372,7 @@ void RdmaIOProtocolFactory::connected(Po
 
 void RdmaIOProtocolFactory::connect(
     Poller::shared_ptr poller,
+    const std::string& /*name*/,
     const std::string& host, const std::string& port,
     ConnectionCodec::Factory* f,
     ConnectFailedCallback failed)

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp?rev=1430573&r1=1430572&r2=1430573&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp Tue Jan  8 22:17:55 2013
@@ -23,6 +23,7 @@
 
 #include "qpid/Plugin.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/NameGenerator.h"
 #include "qpid/log/Statement.h"
 #include "qpid/sys/AsynchIOHandler.h"
 #include "qpid/sys/AsynchIO.h"
@@ -76,15 +77,16 @@ class SslProtocolFactory : public Protoc
     SslProtocolFactory(const qpid::broker::Broker::Options& opts, const 
SslServerOptions& options,
                        Timer& timer);
     void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
-    void connect(Poller::shared_ptr, const std::string& host, const 
std::string& port,
+    void connect(Poller::shared_ptr, const std::string& name, const 
std::string& host, const std::string& port,
                  ConnectionCodec::Factory*,
                  ConnectFailedCallback);
 
     uint16_t getPort() const;
 
   private:
-    void established(Poller::shared_ptr, const Socket&, 
ConnectionCodec::Factory*,
-                     bool isClient);
+    void establishedIncoming(Poller::shared_ptr, const Socket&, 
ConnectionCodec::Factory*);
+    void establishedOutgoing(Poller::shared_ptr, const Socket&, 
ConnectionCodec::Factory*, const std::string&);
+    void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const 
Socket&);
     void connectFailed(const Socket&, int, const std::string&, 
ConnectFailedCallback);
 };
 
@@ -220,21 +222,24 @@ SslProtocolFactory::SslProtocolFactory(c
     }
 }
 
+void SslProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const 
Socket& s,
+                                          ConnectionCodec::Factory* f) {
+    AsynchIOHandler* async = new 
AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false);
+    establishedCommon(async, poller, s);
+}
 
-void SslProtocolFactory::established(Poller::shared_ptr poller, const Socket& 
s,
-                                     ConnectionCodec::Factory* f, bool 
isClient) {
-
-    AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f, 
nodict);
+void SslProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const 
Socket& s,
+                                             ConnectionCodec::Factory* f, 
const std::string& name) {
+    AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false);
+    establishedCommon(async, poller, s);
+}
 
+void SslProtocolFactory::establishedCommon(AsynchIOHandler* async, 
Poller::shared_ptr poller, const Socket& s) {
     if (tcpNoDelay) {
         s.setTcpNoDelay();
         QPID_LOG(info, "Set TCP_NODELAY on connection to " << 
s.getPeerAddress());
     }
 
-    if (isClient) {
-        async->setClient();
-    }
-
     AsynchIO* aio = AsynchIO::create(
         s,
         boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
@@ -257,7 +262,7 @@ void SslProtocolFactory::accept(Poller::
     for (unsigned i = 0; i<listeners.size(); ++i) {
         acceptors.push_back(
             AsynchAcceptor::create(listeners[i],
-                            boost::bind(&SslProtocolFactory::established, 
this, poller, _1, fact, false)));
+                            
boost::bind(&SslProtocolFactory::establishedIncoming, this, poller, _1, fact)));
         acceptors[i].start(poller);
     }
 }
@@ -273,6 +278,7 @@ void SslProtocolFactory::connectFailed(
 
 void SslProtocolFactory::connect(
     Poller::shared_ptr poller,
+    const std::string& name,
     const std::string& host, const std::string& port,
     ConnectionCodec::Factory* fact,
     ConnectFailedCallback failed)
@@ -289,8 +295,8 @@ void SslProtocolFactory::connect(
         *socket,
         host,
         port,
-        boost::bind(&SslProtocolFactory::established,
-                    this, poller, _1, fact, true),
+        boost::bind(&SslProtocolFactory::establishedOutgoing,
+                    this, poller, _1, fact, name),
         boost::bind(&SslProtocolFactory::connectFailed,
                     this, _1, _2, _3, failed));
     c->start(poller);

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=1430573&r1=1430572&r2=1430573&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp Tue Jan  8 22:17:55 2013
@@ -23,6 +23,7 @@
 
 #include "qpid/Plugin.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/NameGenerator.h"
 #include "qpid/log/Statement.h"
 #include "qpid/sys/AsynchIOHandler.h"
 #include "qpid/sys/AsynchIO.h"
@@ -50,15 +51,17 @@ class AsynchIOProtocolFactory : public P
   public:
     AsynchIOProtocolFactory(const qpid::broker::Broker::Options& opts, Timer& 
timer, bool shouldListen);
     void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
-    void connect(Poller::shared_ptr, const std::string& host, const 
std::string& port,
+    void connect(Poller::shared_ptr, const std::string& name,
+                 const std::string& host, const std::string& port,
                  ConnectionCodec::Factory*,
                  ConnectFailedCallback);
 
     uint16_t getPort() const;
 
   private:
-    void established(Poller::shared_ptr, const Socket&, 
ConnectionCodec::Factory*,
-                     bool isClient);
+    void establishedIncoming(Poller::shared_ptr, const Socket&, 
ConnectionCodec::Factory*);
+    void establishedOutgoing(Poller::shared_ptr, const Socket&, 
ConnectionCodec::Factory*, const std::string&);
+    void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const 
Socket&);
     void connectFailed(const Socket&, int, const std::string&, 
ConnectFailedCallback);
 };
 
@@ -171,17 +174,24 @@ AsynchIOProtocolFactory::AsynchIOProtoco
     }
 }
 
-void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const 
Socket& s,
-                                          ConnectionCodec::Factory* f, bool 
isClient) {
-    AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f, false);
+void AsynchIOProtocolFactory::establishedIncoming(Poller::shared_ptr poller, 
const Socket& s,
+                                          ConnectionCodec::Factory* f) {
+    AsynchIOHandler* async = new 
AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false);
+    establishedCommon(async, poller, s);
+}
+
+void AsynchIOProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, 
const Socket& s,
+                                              ConnectionCodec::Factory* f, 
const std::string& name) {
+    AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false);
+    establishedCommon(async, poller, s);
+}
 
+void AsynchIOProtocolFactory::establishedCommon(AsynchIOHandler* async, 
Poller::shared_ptr poller, const Socket& s) {
     if (tcpNoDelay) {
         s.setTcpNoDelay();
         QPID_LOG(info, "Set TCP_NODELAY on connection to " << 
s.getPeerAddress());
     }
 
-    if (isClient)
-        async->setClient();
     AsynchIO* aio = AsynchIO::create
       (s,
        boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
@@ -204,7 +214,7 @@ void AsynchIOProtocolFactory::accept(Pol
     for (unsigned i = 0; i<listeners.size(); ++i) {
         acceptors.push_back(
             AsynchAcceptor::create(listeners[i],
-                            boost::bind(&AsynchIOProtocolFactory::established, 
this, poller, _1, fact, false)));
+                            
boost::bind(&AsynchIOProtocolFactory::establishedIncoming, this, poller, _1, 
fact)));
         acceptors[i].start(poller);
     }
 }
@@ -220,6 +230,7 @@ void AsynchIOProtocolFactory::connectFai
 
 void AsynchIOProtocolFactory::connect(
     Poller::shared_ptr poller,
+    const std::string& name,
     const std::string& host, const std::string& port,
     ConnectionCodec::Factory* fact,
     ConnectFailedCallback failed)
@@ -235,8 +246,8 @@ void AsynchIOProtocolFactory::connect(
         *socket,
         host,
         port,
-        boost::bind(&AsynchIOProtocolFactory::established,
-                    this, poller, _1, fact, true),
+        boost::bind(&AsynchIOProtocolFactory::establishedOutgoing,
+                    this, poller, _1, fact, name),
         boost::bind(&AsynchIOProtocolFactory::connectFailed,
                     this, _1, _2, _3, failed));
     c->start(poller);



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to