Author: astitcher
Date: Tue Jun 4 14:27:55 2013
New Revision: 1489458
URL: http://svn.apache.org/r1489458
Log:
QPID-4854: Make the protocol negotiation timeout actually relate to
the protocol negotiation!
Modified:
qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h
qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
qpid/trunk/qpid/cpp/src/qpid/sys/OutputControl.h
qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=1489458&r1=1489457&r2=1489458&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Tue Jun 4 14:27:55
2013
@@ -118,6 +118,7 @@ size_t Connection::encode(char* buffer,
}
void Connection::abort() { output.abort(); }
+void Connection::connectionEstablished() { output.connectionEstablished(); }
void Connection::activateOutput() { output.activateOutput(); }
void Connection::close() {
Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=1489458&r1=1489457&r2=1489458&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h Tue Jun 4 14:27:55 2013
@@ -65,6 +65,7 @@ class Connection : public sys::Connecti
bool isClosed() const;
bool canEncode();
void abort();
+ void connectionEstablished();
void activateOutput();
void closed(); // connection closed by peer.
void close(); // closing from this end.
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1489458&r1=1489457&r2=1489458&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Tue Jun 4 14:27:55 2013
@@ -456,6 +456,7 @@ void Connection::setHeartbeatInterval(ui
timer.add(timeoutTimer);
}
}
+ out.connectionEstablished();
}
void Connection::startLinkHeartbeatTimeoutTask() {
@@ -463,6 +464,7 @@ void Connection::startLinkHeartbeatTimeo
linkHeartbeatTimer = new LinkHeartbeatTask(timer, 2 * heartbeat *
TIME_SEC, *this);
timer.add(linkHeartbeatTimer);
}
+ out.connectionEstablished();
}
void Connection::restartTimeout()
@@ -480,6 +482,7 @@ bool Connection::isOpen() { return adapt
Connection::OutboundFrameTracker::OutboundFrameTracker(Connection& _con) :
con(_con), next(0) {}
void Connection::OutboundFrameTracker::close() { next->close(); }
void Connection::OutboundFrameTracker::abort() { next->abort(); }
+void Connection::OutboundFrameTracker::connectionEstablished() {
next->connectionEstablished(); }
void Connection::OutboundFrameTracker::activateOutput() {
next->activateOutput(); }
void Connection::OutboundFrameTracker::send(framing::AMQFrame& f)
{
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=1489458&r1=1489457&r2=1489458&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Tue Jun 4 14:27:55 2013
@@ -189,6 +189,7 @@ class Connection : public sys::Connectio
OutboundFrameTracker(Connection&);
void close();
void abort();
+ void connectionEstablished();
void activateOutput();
void send(framing::AMQFrame&);
void wrap(sys::ConnectionOutputHandlerPtr&);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp?rev=1489458&r1=1489457&r2=1489458&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp Tue Jun 4 14:27:55
2013
@@ -181,6 +181,7 @@ void Connection::process()
pn_connection_set_container(connection,
broker.getFederationTag().c_str());
setContainerId(pn_connection_remote_container(connection));
pn_connection_open(connection);
+ out.connectionEstablished();
}
for (pn_session_t* s = pn_session_head(connection, REQUIRES_OPEN); s; s =
pn_session_next(s, REQUIRES_OPEN)) {
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h?rev=1489458&r1=1489457&r2=1489458&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SslTransport.h Tue Jun 4
14:27:55 2013
@@ -48,6 +48,7 @@ class SslTransport : public Transport
void activateOutput();
void abort();
+ void connectionEstablished() {};
void close();
private:
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h?rev=1489458&r1=1489457&r2=1489458&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h Tue Jun 4
14:27:55 2013
@@ -48,6 +48,7 @@ class TcpTransport : public Transport
void activateOutput();
void abort();
+ void connectionEstablished() {};
void close();
private:
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=1489458&r1=1489457&r2=1489458&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp Tue Jun 4 14:27:55
2013
@@ -100,6 +100,13 @@ void AsynchIOHandler::abort() {
aio->queueWriteClose();
}
+void AsynchIOHandler::connectionEstablished() {
+ if (timeoutTimerTask) {
+ timeoutTimerTask->cancel();
+ timeoutTimerTask.reset();
+ }
+}
+
void AsynchIOHandler::activateOutput() {
aio->notifyPendingWrite();
}
@@ -123,13 +130,6 @@ void AsynchIOHandler::readbuff(AsynchIO&
if (codec) { // Already initiated
try {
decoded = codec->decode(buff->bytes+buff->dataStart,
buff->dataCount);
- // When we've decoded 3 reads (probably frames) we will have
authenticated and
- // started heartbeats, if specified, in many (but not all) cases
so now we will cancel
- // the idle connection timeout - this is really hacky, and would
be better implemented
- // in the connection, but that isn't actually created until the
first decode.
- if (reads == 3) {
- timeoutTimerTask->cancel();
- }
}catch(const std::exception& e){
QPID_LOG(error, e.what());
readError = true;
@@ -203,10 +203,6 @@ void AsynchIOHandler::idle(AsynchIO&){
if (isClient && codec == 0) {
codec = factory->create(*this, identifier, getSecuritySettings(aio,
nodict));
write(framing::ProtocolInitiation(codec->getVersion()));
- // We've just sent the protocol negotiation so we can cancel the
timeout for that
- // This is not ideal, because we've not received anything yet, but
heartbeats will
- // be active soon
- timeoutTimerTask->cancel();
return;
}
if (codec == 0) return;
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h?rev=1489458&r1=1489457&r2=1489458&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h Tue Jun 4 14:27:55 2013
@@ -63,6 +63,7 @@ class AsynchIOHandler : public OutputCon
// Output side
QPID_COMMON_EXTERN void abort();
+ QPID_COMMON_EXTERN void connectionEstablished();
QPID_COMMON_EXTERN void activateOutput();
// Input side
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h?rev=1489458&r1=1489457&r2=1489458&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h Tue Jun 4
14:27:55 2013
@@ -43,6 +43,7 @@ class ConnectionOutputHandlerPtr : publi
void close() { next->close(); }
void abort() { next->abort(); }
+ void connectionEstablished() { next->connectionEstablished(); }
void activateOutput() { next->activateOutput(); }
void send(framing::AMQFrame& f) { next->send(f); }
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/OutputControl.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/OutputControl.h?rev=1489458&r1=1489457&r2=1489458&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/OutputControl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/OutputControl.h Tue Jun 4 14:27:55 2013
@@ -32,6 +32,7 @@ namespace sys {
public:
virtual ~OutputControl() {}
virtual void abort() = 0;
+ virtual void connectionEstablished() = 0;
virtual void activateOutput() = 0;
};
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp?rev=1489458&r1=1489457&r2=1489458&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp Tue Jun 4 14:27:55 2013
@@ -67,6 +67,7 @@ class RdmaIOHandler : public OutputContr
// Output side
void close();
void abort();
+ void connectionEstablished();
void activateOutput();
void initProtocolOut();
@@ -131,6 +132,10 @@ void RdmaIOHandler::close() {
void RdmaIOHandler::abort() {
}
+// TODO: Dummy implementation, need to fill this in for connection
establishment timeout to work
+void RdmaIOHandler::connectionEstablished() {
+}
+
void RdmaIOHandler::activateOutput() {
aio->notifyPendingWrite();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]