Author: astitcher
Date: Tue Jan 8 22:17:55 2013
New Revision: 1430573
URL: http://svn.apache.org/viewvc?rev=1430573&view=rev
Log:
QPID-4315: Changed Connection management name to be supplied by Link
code on outgoing connections so that the Link code can correlate the
connection with the Link using the name.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h
qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1430573&r1=1430572&r2=1430573&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Jan 8 22:17:55 2013
@@ -1015,11 +1015,12 @@ void Broker::accept() {
}
void Broker::connect(
+ const std::string& name,
const std::string& host, const std::string& port, const std::string&
transport,
boost::function2<void, int, std::string> failed)
{
boost::shared_ptr<ProtocolFactory> pf = getProtocolFactory(transport);
- if (pf) pf->connect(poller, host, port, factory.get(), failed);
+ if (pf) pf->connect(poller, name, host, port, factory.get(), failed);
else throw NoSuchTransportException(QPID_MSG("Unsupported transport type:
" << transport));
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=1430573&r1=1430572&r2=1430573&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Tue Jan 8 22:17:55 2013
@@ -233,7 +233,8 @@ class Broker : public sys::Runnable, pub
QPID_BROKER_EXTERN void accept();
/** Create a connection to another broker. */
- void connect(const std::string& host, const std::string& port,
+ void connect(const std::string& name,
+ const std::string& host, const std::string& port,
const std::string& transport,
boost::function2<void, int, std::string> failed);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=1430573&r1=1430572&r2=1430573&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Tue Jan 8 22:17:55 2013
@@ -228,7 +228,7 @@ void Link::startConnectionLH ()
// Set the state before calling connect. It is possible that connect
// will fail synchronously and call Link::closed before returning.
setStateLH(STATE_CONNECTING);
- broker->connect (host, boost::lexical_cast<std::string>(port),
transport,
+ broker->connect (name, host, boost::lexical_cast<std::string>(port),
transport,
boost::bind (&Link::closed, this, _1, _2));
QPID_LOG (info, "Inter-broker link connecting to " << host << ":" <<
port);
} catch(const std::exception& e) {
@@ -774,14 +774,6 @@ std::string Link::createName(const std::
return linkName.str();
}
-
-bool Link::pendingConnection(const std::string& _host, uint16_t _port) const
-{
- Mutex::ScopedLock mutex(lock);
- return (isConnecting() && _port == port && _host == host);
-}
-
-
const std::string Link::exchangeTypeName("qpid.LinkExchange");
}} // namespace qpid::broker
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h?rev=1430573&r1=1430572&r2=1430573&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.h Tue Jan 8 22:17:55 2013
@@ -115,7 +115,6 @@ class Link : public PersistableConfig, p
void closed(int, std::string); // Called when connection goes away
void notifyConnectionForced(const std::string text);
void closeConnection(const std::string& reason);
- bool pendingConnection(const std::string& host, uint16_t port) const; //
is Link trying to connect to this remote?
friend class LinkRegistry; // to call established, opened, closed
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1430573&r1=1430572&r2=1430573&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Tue Jan 8 22:17:55
2013
@@ -272,38 +272,6 @@ MessageStore* LinkRegistry::getStore() c
return store;
}
-namespace {
- void extractHostPort(const std::string& connId, std::string *host,
uint16_t *port)
- {
- // Extract host and port of remote broker from connection id string.
- //
- // TODO aconway 2011-02-01: centralize code that constructs/parses
connection
- // management IDs. Currently sys:: protocol factories and IO plugins
construct the
- // IDs and LinkRegistry parses them.
- // KAG: current connection id format assumed:
- // "localhost:port-remotehost:port". In the case of IpV6, the host
addresses are
- // contained within brackets "[...]", example:
- // connId="[::1]:36859-[::1]:48603". Liberal use of "asserts" provided
to alert us
- // if this assumption changes!
- size_t separator = connId.find('-');
- assert(separator != std::string::npos);
- std::string remote = connId.substr(separator+1, std::string::npos);
- separator = remote.rfind(":");
- assert(separator != std::string::npos);
- *host = remote.substr(0, separator);
- // IPv6 - host is bracketed by "[]", strip them
- if ((*host)[0] == '[' && (*host)[host->length() - 1] == ']') {
- *host = host->substr(1, host->length() - 2);
- }
- try {
- *port = boost::lexical_cast<uint16_t>(remote.substr(separator+1,
std::string::npos));
- } catch (const boost::bad_lexical_cast&) {
- QPID_LOG(error, "Invalid format for connection identifier! '" <<
connId << "'");
- assert(false);
- }
- }
-}
-
/** find the Link that corresponds to the given connection */
Link::shared_ptr LinkRegistry::findLink(const std::string& connId)
{
@@ -323,19 +291,15 @@ void LinkRegistry::notifyConnection(cons
// create a mapping from connection id to link
QPID_LOG(debug, "LinkRegistry::notifyConnection(); key=" << key );
std::string host;
- uint16_t port = 0;
- extractHostPort( key, &host, &port );
Link::shared_ptr link;
{
Mutex::ScopedLock locker(lock);
- for (LinkMap::iterator l = pendingLinks.begin(); l !=
pendingLinks.end(); ++l) {
- if (l->second->pendingConnection(host, port)) {
- link = l->second;
- pendingLinks.erase(l);
- connections[key] = link->getName();
- QPID_LOG(debug, "LinkRegistry:: found pending =" <<
link->getName());
- break;
- }
+ LinkMap::iterator l = pendingLinks.find(key);
+ if (l != pendingLinks.end()) {
+ link = l->second;
+ pendingLinks.erase(l);
+ connections[key] = link->getName();
+ QPID_LOG(debug, "LinkRegistry:: found pending =" <<
link->getName());
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=1430573&r1=1430572&r2=1430573&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp Tue Jan 8 22:17:55
2013
@@ -51,14 +51,14 @@ struct ProtocolTimeoutTask : public sys:
}
};
-AsynchIOHandler::AsynchIOHandler(const std::string& id,
ConnectionCodec::Factory* f, bool nodict0) :
+AsynchIOHandler::AsynchIOHandler(const std::string& id,
ConnectionCodec::Factory* f, bool isClient0, bool nodict0) :
identifier(id),
aio(0),
factory(f),
codec(0),
reads(0),
readError(false),
- isClient(false),
+ isClient(isClient0),
nodict(nodict0),
readCredit(InfiniteCredit)
{}
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h?rev=1430573&r1=1430572&r2=1430573&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h Tue Jan 8 22:17:55 2013
@@ -60,12 +60,10 @@ class AsynchIOHandler : public OutputCon
void write(const framing::ProtocolInitiation&);
public:
- QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id,
qpid::sys::ConnectionCodec::Factory* f, bool nodict);
+ QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id,
qpid::sys::ConnectionCodec::Factory* f, bool isClient, bool nodict);
QPID_COMMON_EXTERN ~AsynchIOHandler();
QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime);
- QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; }
-
// Output side
QPID_COMMON_EXTERN void abort();
QPID_COMMON_EXTERN void activateOutput();
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h?rev=1430573&r1=1430572&r2=1430573&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/ProtocolFactory.h Tue Jan 8 22:17:55 2013
@@ -42,6 +42,7 @@ class ProtocolFactory : public qpid::Sha
virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*)
= 0;
virtual void connect(
boost::shared_ptr<Poller>,
+ const std::string& name,
const std::string& host, const std::string& port,
ConnectionCodec::Factory* codec,
ConnectFailedCallback failed) = 0;
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=1430573&r1=1430572&r2=1430573&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp Tue Jan 8 22:17:55 2013
@@ -23,6 +23,7 @@
#include "qpid/Plugin.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/NameGenerator.h"
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/rdma/RdmaIO.h"
@@ -83,7 +84,7 @@ class RdmaIOHandler : public OutputContr
};
RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr c,
qpid::sys::ConnectionCodec::Factory* f) :
- identifier(c->getFullName()),
+ identifier(broker::QPID_NAME_PREFIX+c->getFullName()),
factory(f),
codec(0),
readError(false),
@@ -250,7 +251,7 @@ class RdmaIOProtocolFactory : public Pro
public:
RdmaIOProtocolFactory(int16_t port, int backlog);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const string& host, const std::string&
port, ConnectionCodec::Factory*, ConnectFailedCallback);
+ void connect(Poller::shared_ptr, const std::string& name, const string&
host, const std::string& port, ConnectionCodec::Factory*,
ConnectFailedCallback);
uint16_t getPort() const;
@@ -371,6 +372,7 @@ void RdmaIOProtocolFactory::connected(Po
void RdmaIOProtocolFactory::connect(
Poller::shared_ptr poller,
+ const std::string& /*name*/,
const std::string& host, const std::string& port,
ConnectionCodec::Factory* f,
ConnectFailedCallback failed)
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp?rev=1430573&r1=1430572&r2=1430573&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp Tue Jan 8 22:17:55 2013
@@ -23,6 +23,7 @@
#include "qpid/Plugin.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/NameGenerator.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/AsynchIOHandler.h"
#include "qpid/sys/AsynchIO.h"
@@ -76,15 +77,16 @@ class SslProtocolFactory : public Protoc
SslProtocolFactory(const qpid::broker::Broker::Options& opts, const
SslServerOptions& options,
Timer& timer);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& host, const
std::string& port,
+ void connect(Poller::shared_ptr, const std::string& name, const
std::string& host, const std::string& port,
ConnectionCodec::Factory*,
ConnectFailedCallback);
uint16_t getPort() const;
private:
- void established(Poller::shared_ptr, const Socket&,
ConnectionCodec::Factory*,
- bool isClient);
+ void establishedIncoming(Poller::shared_ptr, const Socket&,
ConnectionCodec::Factory*);
+ void establishedOutgoing(Poller::shared_ptr, const Socket&,
ConnectionCodec::Factory*, const std::string&);
+ void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const
Socket&);
void connectFailed(const Socket&, int, const std::string&,
ConnectFailedCallback);
};
@@ -220,21 +222,24 @@ SslProtocolFactory::SslProtocolFactory(c
}
}
+void SslProtocolFactory::establishedIncoming(Poller::shared_ptr poller, const
Socket& s,
+ ConnectionCodec::Factory* f) {
+ AsynchIOHandler* async = new
AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false);
+ establishedCommon(async, poller, s);
+}
-void SslProtocolFactory::established(Poller::shared_ptr poller, const Socket&
s,
- ConnectionCodec::Factory* f, bool
isClient) {
-
- AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f,
nodict);
+void SslProtocolFactory::establishedOutgoing(Poller::shared_ptr poller, const
Socket& s,
+ ConnectionCodec::Factory* f,
const std::string& name) {
+ AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false);
+ establishedCommon(async, poller, s);
+}
+void SslProtocolFactory::establishedCommon(AsynchIOHandler* async,
Poller::shared_ptr poller, const Socket& s) {
if (tcpNoDelay) {
s.setTcpNoDelay();
QPID_LOG(info, "Set TCP_NODELAY on connection to " <<
s.getPeerAddress());
}
- if (isClient) {
- async->setClient();
- }
-
AsynchIO* aio = AsynchIO::create(
s,
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
@@ -257,7 +262,7 @@ void SslProtocolFactory::accept(Poller::
for (unsigned i = 0; i<listeners.size(); ++i) {
acceptors.push_back(
AsynchAcceptor::create(listeners[i],
- boost::bind(&SslProtocolFactory::established,
this, poller, _1, fact, false)));
+
boost::bind(&SslProtocolFactory::establishedIncoming, this, poller, _1, fact)));
acceptors[i].start(poller);
}
}
@@ -273,6 +278,7 @@ void SslProtocolFactory::connectFailed(
void SslProtocolFactory::connect(
Poller::shared_ptr poller,
+ const std::string& name,
const std::string& host, const std::string& port,
ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
@@ -289,8 +295,8 @@ void SslProtocolFactory::connect(
*socket,
host,
port,
- boost::bind(&SslProtocolFactory::established,
- this, poller, _1, fact, true),
+ boost::bind(&SslProtocolFactory::establishedOutgoing,
+ this, poller, _1, fact, name),
boost::bind(&SslProtocolFactory::connectFailed,
this, _1, _2, _3, failed));
c->start(poller);
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=1430573&r1=1430572&r2=1430573&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp Tue Jan 8 22:17:55 2013
@@ -23,6 +23,7 @@
#include "qpid/Plugin.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/NameGenerator.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/AsynchIOHandler.h"
#include "qpid/sys/AsynchIO.h"
@@ -50,15 +51,17 @@ class AsynchIOProtocolFactory : public P
public:
AsynchIOProtocolFactory(const qpid::broker::Broker::Options& opts, Timer&
timer, bool shouldListen);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& host, const
std::string& port,
+ void connect(Poller::shared_ptr, const std::string& name,
+ const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
ConnectFailedCallback);
uint16_t getPort() const;
private:
- void established(Poller::shared_ptr, const Socket&,
ConnectionCodec::Factory*,
- bool isClient);
+ void establishedIncoming(Poller::shared_ptr, const Socket&,
ConnectionCodec::Factory*);
+ void establishedOutgoing(Poller::shared_ptr, const Socket&,
ConnectionCodec::Factory*, const std::string&);
+ void establishedCommon(AsynchIOHandler*, Poller::shared_ptr , const
Socket&);
void connectFailed(const Socket&, int, const std::string&,
ConnectFailedCallback);
};
@@ -171,17 +174,24 @@ AsynchIOProtocolFactory::AsynchIOProtoco
}
}
-void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const
Socket& s,
- ConnectionCodec::Factory* f, bool
isClient) {
- AsynchIOHandler* async = new AsynchIOHandler(s.getFullAddress(), f, false);
+void AsynchIOProtocolFactory::establishedIncoming(Poller::shared_ptr poller,
const Socket& s,
+ ConnectionCodec::Factory* f) {
+ AsynchIOHandler* async = new
AsynchIOHandler(broker::QPID_NAME_PREFIX+s.getFullAddress(), f, false, false);
+ establishedCommon(async, poller, s);
+}
+
+void AsynchIOProtocolFactory::establishedOutgoing(Poller::shared_ptr poller,
const Socket& s,
+ ConnectionCodec::Factory* f,
const std::string& name) {
+ AsynchIOHandler* async = new AsynchIOHandler(name, f, true, false);
+ establishedCommon(async, poller, s);
+}
+void AsynchIOProtocolFactory::establishedCommon(AsynchIOHandler* async,
Poller::shared_ptr poller, const Socket& s) {
if (tcpNoDelay) {
s.setTcpNoDelay();
QPID_LOG(info, "Set TCP_NODELAY on connection to " <<
s.getPeerAddress());
}
- if (isClient)
- async->setClient();
AsynchIO* aio = AsynchIO::create
(s,
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
@@ -204,7 +214,7 @@ void AsynchIOProtocolFactory::accept(Pol
for (unsigned i = 0; i<listeners.size(); ++i) {
acceptors.push_back(
AsynchAcceptor::create(listeners[i],
- boost::bind(&AsynchIOProtocolFactory::established,
this, poller, _1, fact, false)));
+
boost::bind(&AsynchIOProtocolFactory::establishedIncoming, this, poller, _1,
fact)));
acceptors[i].start(poller);
}
}
@@ -220,6 +230,7 @@ void AsynchIOProtocolFactory::connectFai
void AsynchIOProtocolFactory::connect(
Poller::shared_ptr poller,
+ const std::string& name,
const std::string& host, const std::string& port,
ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
@@ -235,8 +246,8 @@ void AsynchIOProtocolFactory::connect(
*socket,
host,
port,
- boost::bind(&AsynchIOProtocolFactory::established,
- this, poller, _1, fact, true),
+ boost::bind(&AsynchIOProtocolFactory::establishedOutgoing,
+ this, poller, _1, fact, name),
boost::bind(&AsynchIOProtocolFactory::connectFailed,
this, _1, _2, _3, failed));
c->start(poller);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]