Author: gsim
Date: Fri Jun 11 08:42:37 2010
New Revision: 953610
URL: http://svn.apache.org/viewvc?rev=953610&view=rev
Log:
Ensure that the AsynchConnector is disassociated from the Poller when
aborting a connection attempt due to a heartbeat timeout.
Modified:
qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp
qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h
qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp?rev=953610&r1=953609&r2=953610&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.cpp Fri Jun 11 08:42:37 2010
@@ -76,10 +76,11 @@ TCPConnector::TCPConnector(Poller::share
initiated(false),
closed(true),
shutdownHandler(0),
+ connector(0),
aio(0),
poller(p)
{
- QPID_LOG(debug, "TCPConnector created for " << version.toString());
+ QPID_LOG(debug, "TCPConnector created for " << version);
settings.configureSocket(socket);
}
@@ -90,17 +91,18 @@ TCPConnector::~TCPConnector() {
void TCPConnector::connect(const std::string& host, int port) {
Mutex::ScopedLock l(lock);
assert(closed);
- AsynchConnector* c = AsynchConnector::create(
+ connector = AsynchConnector::create(
socket,
host, port,
boost::bind(&TCPConnector::connected, this, _1),
boost::bind(&TCPConnector::connectFailed, this, _3));
closed = false;
- c->start(poller);
+ connector->start(poller);
}
void TCPConnector::connected(const Socket&) {
+ connector = 0;
aio = AsynchIO::create(socket,
boost::bind(&TCPConnector::readbuff, this, _1, _2),
boost::bind(&TCPConnector::eof, this, _1),
@@ -128,6 +130,7 @@ void TCPConnector::initAmqp() {
}
void TCPConnector::connectFailed(const std::string& msg) {
+ connector = 0;
QPID_LOG(warning, "Connect failed: " << msg);
socket.close();
if (!closed)
@@ -158,8 +161,9 @@ void TCPConnector::abort() {
if (aio) {
// Established connection
aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1));
- } else {
+ } else if (connector) {
// We're still connecting
+ connector->stop();
connectFailed("Connection timedout");
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h?rev=953610&r1=953609&r2=953610&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/TCPConnector.h Fri Jun 11 08:42:37 2010
@@ -71,6 +71,7 @@ class TCPConnector : public Connector, p
sys::Socket socket;
+ sys::AsynchConnector* connector;
sys::AsynchIO* aio;
std::string identifier;
boost::shared_ptr<sys::Poller> poller;
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h?rev=953610&r1=953609&r2=953610&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIO.h Fri Jun 11 08:42:37 2010
@@ -69,7 +69,7 @@ public:
ConnectedCallback connCb,
FailedCallback failCb);
virtual void start(boost::shared_ptr<Poller> poller) = 0;
-
+ virtual void stop() {};
protected:
AsynchConnector() {}
virtual ~AsynchConnector() {}
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=953610&r1=953609&r2=953610&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Fri Jun 11 08:42:37 2010
@@ -157,6 +157,7 @@ public:
ConnectedCallback connCb,
FailedCallback failCb);
void start(Poller::shared_ptr poller);
+ void stop();
};
AsynchConnector::AsynchConnector(const Socket& s,
@@ -183,6 +184,11 @@ void AsynchConnector::start(Poller::shar
startWatch(poller);
}
+void AsynchConnector::stop()
+{
+ stopWatch();
+}
+
void AsynchConnector::connComplete(DispatchHandle& h)
{
h.stopWatch();
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]