Author: astitcher
Date: Mon Jun 8 14:35:01 2009
New Revision: 782651
URL: http://svn.apache.org/viewvc?rev=782651&view=rev
Log:
- Added heartbeat generation to the client (it actually just echoes back any
broker-generated heartbeat)
- Broker now disconnects client if it receives no traffic in
2 heartbeat intervals (which is now the same as the client behaviour)
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=782651&r1=782650&r2=782651&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Mon Jun 8 14:35:01 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -68,8 +68,8 @@
if (parent != 0)
{
agent = broker_.getManagementAgent();
-
-
+
+
// TODO set last bool true if system connection
if (agent != 0) {
mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId,
!isLink, false);
@@ -95,9 +95,11 @@
}
if (isLink)
links.notifyClosed(mgmtId);
-
+
if (heartbeatTimer)
heartbeatTimer->cancel();
+ if (timeoutTimer)
+ timeoutTimer->cancel();
}
void Connection::received(framing::AMQFrame& frame) {
@@ -181,7 +183,9 @@
{
QPID_LOG_IF(error, code != connection::CLOSE_CODE_NORMAL, "Connection " <<
mgmtId << " closed by error: " << text << "(" << code << ")");
if (heartbeatTimer)
- heartbeatTimer->cancel();
+ heartbeatTimer->cancel();
+ if (timeoutTimer)
+ timeoutTimer->cancel();
adapter.close(code, text);
//make sure we delete dangling pointers from outputTasks before deleting
sessions
outputTasks.removeAll();
@@ -192,7 +196,9 @@
// Send a close to the client but keep the channels. Used by cluster.
void Connection::sendClose() {
if (heartbeatTimer)
- heartbeatTimer->cancel();
+ heartbeatTimer->cancel();
+ if (timeoutTimer)
+ timeoutTimer->cancel();
adapter.close(connection::CLOSE_CODE_NORMAL, "OK");
getOutput().close();
}
@@ -203,7 +209,7 @@
void Connection::closed(){ // Physically closed, suspend open sessions.
try {
- while (!channels.empty())
+ while (!channels.empty())
ptr_map_ptr(channels.begin())->handleDetach();
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
@@ -221,7 +227,7 @@
bool Connection::hasOutput() { return outputTasks.hasOutput(); }
-bool Connection::doOutput() {
+bool Connection::doOutput() {
try{
{
ScopedLock<Mutex> l(ioCallbackLock);
@@ -292,33 +298,65 @@
struct ConnectionHeartbeatTask : public TimerTask {
Timer& timer;
Connection& connection;
- ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) :
+ ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) :
TimerTask(Duration(hb*TIME_SEC)),
timer(t),
connection(c)
{}
-
+
void fire() {
// This is the best we can currently do to avoid a destruction/fire
race
if (!isCancelled()) {
// Setup next firing
reset();
timer.add(this);
-
+
// Send Heartbeat
connection.sendHeartbeat();
}
}
};
+struct ConnectionTimeoutTask : public TimerTask {
+ Timer& timer;
+ Connection& connection;
+ ConnectionTimeoutTask(uint16_t hb, Timer& t, Connection& c) :
+ TimerTask(Duration(hb*2*TIME_SEC)),
+ timer(t),
+ connection(c)
+ {}
+
+ void fire() {
+ // This is the best we can currently do to avoid a destruction/fire
race
+ if (!isCancelled()) {
+ // If we get here then we've not received any traffic in the
timeout period
+ // Schedule closing the connection for the io thread
+ QPID_LOG(error, "Connection timed out: closing");
+ connection.abort();
+ }
+ }
+};
+
+void Connection::abort()
+{
+ out.abort();
+}
+
void Connection::setHeartbeatInterval(uint16_t heartbeat)
{
setHeartbeat(heartbeat);
if (heartbeat > 0) {
- heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this);
- timer.add(heartbeatTimer);
+ heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this);
+ timer.add(heartbeatTimer);
+ timeoutTimer = new ConnectionTimeoutTask(heartbeat, timer, *this);
+ timer.add(timeoutTimer);
}
}
-}}
+void Connection::restartTimeout()
+{
+ if (timeoutTimer)
+ timeoutTimer->reset();
+}
+}}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=782651&r1=782650&r2=782651&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Mon Jun 8 14:35:01 2009
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -61,7 +61,7 @@
class LinkRegistry;
class SecureConnection;
-class Connection : public sys::ConnectionInputHandler,
+class Connection : public sys::ConnectionInputHandler,
public ConnectionState,
public RefCounted
{
@@ -115,9 +115,11 @@
/** Connection does not delete the listener. 0 resets. */
void setErrorListener(ErrorListener* l) { errorListener=l; }
ErrorListener* getErrorListener() { return errorListener; }
-
+
void setHeartbeatInterval(uint16_t heartbeat);
void sendHeartbeat();
+ void restartTimeout();
+ void abort();
template <class F> void eachSessionHandler(F f) {
for (ChannelMap::iterator i = channels.begin(); i != channels.end();
++i)
@@ -143,6 +145,7 @@
management::ManagementAgent* agent;
Timer& timer;
boost::intrusive_ptr<TimerTask> heartbeatTimer;
+ boost::intrusive_ptr<TimerTask> timeoutTimer;
ErrorListener* errorListener;
public:
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=782651&r1=782650&r2=782651&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Mon Jun 8
14:35:01 2009
@@ -8,9 +8,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -63,6 +63,9 @@
void ConnectionHandler::handle(framing::AMQFrame& frame)
{
+ // Received frame on connection so delay timeout
+ handler->connection.restartTimeout();
+
AMQMethodBody* method=frame.getBody()->getMethod();
Connection::ErrorListener* errorListener =
handler->connection.getErrorListener();
try{
@@ -186,7 +189,7 @@
{
std::vector<Url> urls = connection.broker.getKnownBrokers();
framing::Array array(0x95); // str16 array
- for (std::vector<Url>::iterator i = urls.begin(); i < urls.end(); ++i)
+ for (std::vector<Url>::iterator i = urls.begin(); i < urls.end(); ++i)
array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str())));
proxy.openOk(array);
@@ -197,7 +200,7 @@
}
}
-
+
void ConnectionHandler::Handler::close(uint16_t replyCode, const string&
replyText)
{
if (replyCode != 200) {
@@ -209,11 +212,11 @@
proxy.closeOk();
connection.getOutput().close();
-}
-
+}
+
void ConnectionHandler::Handler::closeOk(){
connection.getOutput().close();
-}
+}
void ConnectionHandler::Handler::heartbeat(){
// Do nothing - the purpose of heartbeats is just to make sure that
there is some
Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=782651&r1=782650&r2=782651&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Mon Jun 8
14:35:01 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -70,8 +70,8 @@
}
}
-ConnectionHandler::ConnectionHandler(const ConnectionSettings& s,
ProtocolVersion& v)
- : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this),
proxy(outHandler),
+ConnectionHandler::ConnectionHandler(const ConnectionSettings& s,
ProtocolVersion& v)
+ : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this),
proxy(outHandler),
errorCode(CLOSE_CODE_NORMAL), version(v)
{
insist = true;
@@ -82,7 +82,7 @@
FINISHED.insert(FAILED);
FINISHED.insert(CLOSED);
-
+
properties.setInt(SESSION_FLOW_CONTROL, SESSION_FLOW_CONTROL_VER);
properties.setString(CLIENT_PROCESS_NAME,
sys::SystemInfo::getProcessName());
properties.setInt(CLIENT_PID, sys::SystemInfo::getProcessId());
@@ -125,7 +125,7 @@
void ConnectionHandler::outgoing(AMQFrame& frame)
{
- if (getState() == OPEN)
+ if (getState() == OPEN)
out(frame);
else
throw TransportFailure(errorText.empty() ? "Connection is not open." :
errorText);
@@ -160,6 +160,10 @@
// Do nothing - the purpose of heartbeats is just to make sure that there
is some
// traffic on the connection within the heart beat interval, we check for
the
// traffic and don't need to do anything in response to heartbeats
+
+ // Although the above is still true we're now using a received heartbeat
as a trigger
+ // to send out our own heartbeat
+ proxy.heartbeat();
}
void ConnectionHandler::checkState(STATES s, const std::string& msg)
@@ -223,13 +227,13 @@
}
}
-void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t
maxFrameSizeProposed,
+void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t
maxFrameSizeProposed,
uint16_t heartbeatMin, uint16_t heartbeatMax)
{
checkState(NEGOTIATING, INVALID_STATE_TUNE);
maxChannels = std::min(maxChannels, maxChannelsProposed);
maxFrameSize = std::min(maxFrameSize, maxFrameSizeProposed);
- // Clip the requested heartbeat to the maximum/minimum offered
+ // Clip the requested heartbeat to the maximum/minimum offered
uint16_t heartbeat = ConnectionSettings::heartbeat;
heartbeat = heartbeat < heartbeatMin ? heartbeatMin :
heartbeat > heartbeatMax ? heartbeatMax :
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]