Author: astitcher
Date: Mon Jun  8 14:35:01 2009
New Revision: 782651

URL: http://svn.apache.org/viewvc?rev=782651&view=rev
Log:
- Added heartbeat generation to the client (actually echo back any
  broker generated heartbeat)
- Broker now disconnects client if it receives no traffic in
  2 heartbeat intervals (which is now the same as the client behaviour)

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=782651&r1=782650&r2=782651&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Mon Jun  8 14:35:01 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -68,8 +68,8 @@
     if (parent != 0)
     {
         agent = broker_.getManagementAgent();
-               
-               
+
+
         // TODO set last bool true if system connection
         if (agent != 0) {
             mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, 
!isLink, false);
@@ -95,9 +95,11 @@
     }
     if (isLink)
         links.notifyClosed(mgmtId);
-        
+
     if (heartbeatTimer)
         heartbeatTimer->cancel();
+    if (timeoutTimer)
+        timeoutTimer->cancel();
 }
 
 void Connection::received(framing::AMQFrame& frame) {
@@ -181,7 +183,9 @@
 {
     QPID_LOG_IF(error, code != connection::CLOSE_CODE_NORMAL, "Connection " << 
mgmtId << " closed by error: " << text << "(" << code << ")");
     if (heartbeatTimer)
-       heartbeatTimer->cancel();
+        heartbeatTimer->cancel();
+    if (timeoutTimer)
+        timeoutTimer->cancel();
     adapter.close(code, text);
     //make sure we delete dangling pointers from outputTasks before deleting 
sessions
     outputTasks.removeAll();
@@ -192,7 +196,9 @@
 // Send a close to the client but keep the channels. Used by cluster.
 void Connection::sendClose() {
     if (heartbeatTimer)
-       heartbeatTimer->cancel();
+        heartbeatTimer->cancel();
+    if (timeoutTimer)
+        timeoutTimer->cancel();
     adapter.close(connection::CLOSE_CODE_NORMAL, "OK");
     getOutput().close();
 }
@@ -203,7 +209,7 @@
 
 void Connection::closed(){ // Physically closed, suspend open sessions.
     try {
-        while (!channels.empty()) 
+        while (!channels.empty())
             ptr_map_ptr(channels.begin())->handleDetach();
         while (!exclusiveQueues.empty()) {
             Queue::shared_ptr q(exclusiveQueues.front());
@@ -221,7 +227,7 @@
 
 bool Connection::hasOutput() { return outputTasks.hasOutput(); }
 
-bool Connection::doOutput() {    
+bool Connection::doOutput() {
     try{
         {
         ScopedLock<Mutex> l(ioCallbackLock);
@@ -292,33 +298,65 @@
 struct ConnectionHeartbeatTask : public TimerTask {
     Timer& timer;
     Connection& connection;
-    ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) :  
+    ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) :
         TimerTask(Duration(hb*TIME_SEC)),
         timer(t),
         connection(c)
     {}
-    
+
     void fire() {
         // This is the best we can currently do to avoid a destruction/fire 
race
         if (!isCancelled()) {
             // Setup next firing
             reset();
             timer.add(this);
-            
+
             // Send Heartbeat
             connection.sendHeartbeat();
         }
     }
 };
 
+struct ConnectionTimeoutTask : public TimerTask {
+    Timer& timer;
+    Connection& connection;
+    ConnectionTimeoutTask(uint16_t hb, Timer& t, Connection& c) :
+        TimerTask(Duration(hb*2*TIME_SEC)),
+        timer(t),
+        connection(c)
+    {}
+
+    void fire() {
+        // This is the best we can currently do to avoid a destruction/fire 
race
+        if (!isCancelled()) {
+            // If we get here then we've not received any traffic in the 
timeout period
+            // Schedule closing the connection for the io thread
+            QPID_LOG(error, "Connection timed out: closing");
+            connection.abort();
+        }
+    }
+};
+
+void Connection::abort()
+{
+    out.abort();
+}
+
 void Connection::setHeartbeatInterval(uint16_t heartbeat)
 {
     setHeartbeat(heartbeat);
     if (heartbeat > 0) {
-       heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this);
-       timer.add(heartbeatTimer);
+        heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this);
+        timer.add(heartbeatTimer);
+        timeoutTimer = new ConnectionTimeoutTask(heartbeat, timer, *this);
+        timer.add(timeoutTimer);
     }
 }
 
-}}
+void Connection::restartTimeout()
+{
+    if (timeoutTimer)
+        timeoutTimer->reset();
+}
 
+}}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=782651&r1=782650&r2=782651&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Mon Jun  8 14:35:01 2009
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -61,7 +61,7 @@
 class LinkRegistry;
 class SecureConnection;
 
-class Connection : public sys::ConnectionInputHandler, 
+class Connection : public sys::ConnectionInputHandler,
                    public ConnectionState,
                    public RefCounted
 {
@@ -115,9 +115,11 @@
     /** Connection does not delete the listener. 0 resets. */
     void setErrorListener(ErrorListener* l) { errorListener=l; }
     ErrorListener* getErrorListener() { return errorListener; }
-    
+
     void setHeartbeatInterval(uint16_t heartbeat);
     void sendHeartbeat();
+    void restartTimeout();
+    void abort();
 
     template <class F> void eachSessionHandler(F f) {
         for (ChannelMap::iterator i = channels.begin(); i != channels.end(); 
++i)
@@ -143,6 +145,7 @@
     management::ManagementAgent* agent;
     Timer& timer;
     boost::intrusive_ptr<TimerTask> heartbeatTimer;
+    boost::intrusive_ptr<TimerTask> timeoutTimer;
     ErrorListener* errorListener;
 
   public:

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=782651&r1=782650&r2=782651&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Mon Jun  8 
14:35:01 2009
@@ -8,9 +8,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -63,6 +63,9 @@
 
 void ConnectionHandler::handle(framing::AMQFrame& frame)
 {
+    // Received frame on connection so delay timeout
+    handler->connection.restartTimeout();
+
     AMQMethodBody* method=frame.getBody()->getMethod();
     Connection::ErrorListener* errorListener = 
handler->connection.getErrorListener();
     try{
@@ -186,7 +189,7 @@
 {
     std::vector<Url> urls = connection.broker.getKnownBrokers();
     framing::Array array(0x95); // str16 array
-    for (std::vector<Url>::iterator i = urls.begin(); i < urls.end(); ++i) 
+    for (std::vector<Url>::iterator i = urls.begin(); i < urls.end(); ++i)
         array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str())));
     proxy.openOk(array);
 
@@ -197,7 +200,7 @@
     }
 }
 
-        
+
 void ConnectionHandler::Handler::close(uint16_t replyCode, const string& 
replyText)
 {
     if (replyCode != 200) {
@@ -209,11 +212,11 @@
 
     proxy.closeOk();
     connection.getOutput().close();
-} 
-        
+}
+
 void ConnectionHandler::Handler::closeOk(){
     connection.getOutput().close();
-} 
+}
 
 void ConnectionHandler::Handler::heartbeat(){
        // Do nothing - the purpose of heartbeats is just to make sure that 
there is some

Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=782651&r1=782650&r2=782651&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Mon Jun  8 
14:35:01 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -70,8 +70,8 @@
     }
 }
 
-ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, 
ProtocolVersion& v) 
-    : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), 
proxy(outHandler), 
+ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, 
ProtocolVersion& v)
+    : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), 
proxy(outHandler),
       errorCode(CLOSE_CODE_NORMAL), version(v)
 {
     insist = true;
@@ -82,7 +82,7 @@
 
     FINISHED.insert(FAILED);
     FINISHED.insert(CLOSED);
-    
+
     properties.setInt(SESSION_FLOW_CONTROL, SESSION_FLOW_CONTROL_VER);
     properties.setString(CLIENT_PROCESS_NAME, 
sys::SystemInfo::getProcessName());
     properties.setInt(CLIENT_PID, sys::SystemInfo::getProcessId());
@@ -125,7 +125,7 @@
 
 void ConnectionHandler::outgoing(AMQFrame& frame)
 {
-    if (getState() == OPEN) 
+    if (getState() == OPEN)
         out(frame);
     else
         throw TransportFailure(errorText.empty() ? "Connection is not open." : 
errorText);
@@ -160,6 +160,10 @@
     // Do nothing - the purpose of heartbeats is just to make sure that there 
is some
     // traffic on the connection within the heart beat interval, we check for 
the
     // traffic and don't need to do anything in response to heartbeats
+
+    // Although the above is still true we're now using a received heartbeat 
as a trigger
+    // to send out our own heartbeat
+    proxy.heartbeat();
 }
 
 void ConnectionHandler::checkState(STATES s, const std::string& msg)
@@ -223,13 +227,13 @@
     }
 }
 
-void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t 
maxFrameSizeProposed, 
+void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t 
maxFrameSizeProposed,
                              uint16_t heartbeatMin, uint16_t heartbeatMax)
 {
     checkState(NEGOTIATING, INVALID_STATE_TUNE);
     maxChannels = std::min(maxChannels, maxChannelsProposed);
     maxFrameSize = std::min(maxFrameSize, maxFrameSizeProposed);
-    // Clip the requested heartbeat to the maximum/minimum offered 
+    // Clip the requested heartbeat to the maximum/minimum offered
     uint16_t heartbeat = ConnectionSettings::heartbeat;
     heartbeat = heartbeat < heartbeatMin ? heartbeatMin :
                 heartbeat > heartbeatMax ? heartbeatMax :



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to