Author: gsim
Date: Wed Oct 29 22:24:41 2014
New Revision: 1635316

URL: http://svn.apache.org/r1635316
Log:
QPID-5538: simple heartbeat enablement over 1.0 for broker

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp?rev=1635316&r1=1635315&r2=1635316&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp Wed Oct 29 22:24:41 
2014
@@ -29,6 +29,8 @@
 #include "qpid/framing/ProtocolInitiation.h"
 #include "qpid/framing/ProtocolVersion.h"
 #include "qpid/log/Statement.h"
+#include "qpid/sys/Time.h"
+#include "qpid/sys/Timer.h"
 #include "qpid/sys/OutputControl.h"
 #include "config.h"
 #include <sstream>
@@ -89,6 +91,28 @@ void Connection::trace(const char* messa
     QPID_LOG_CAT(trace, protocol, "[" << id << "]: " << message);
 }
 
+namespace {
+struct ConnectionTickerTask : public qpid::sys::TimerTask
+{
+    qpid::sys::Timer& timer;
+    Connection& connection;
+    ConnectionTickerTask(uint64_t interval, qpid::sys::Timer& t, Connection& 
c) :
+        TimerTask(qpid::sys::Duration(interval*qpid::sys::TIME_MSEC), 
"ConnectionTicker"),
+        timer(t),
+        connection(c)
+    {}
+
+    void fire() {
+        // Setup next firing
+        setupNextFire();
+        timer.add(this);
+
+        // Send Ticker
+        connection.requestIO();
+    }
+};
+}
+
 Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, 
BrokerContext& b, bool saslInUse, bool brokerInitiated)
     : BrokerContext(b), ManagedConnection(getBroker(), i, brokerInitiated),
       connection(pn_connection()),
@@ -122,9 +146,14 @@ Connection::Connection(qpid::sys::Output
     }
 }
 
+void Connection::requestIO()
+{
+    out.activateOutput();
+}
 
 Connection::~Connection()
 {
+    if (ticker) ticker->cancel();
     getBroker().getConnectionObservers().closed(*this);
     pn_transport_free(transport);
     pn_connection_free(connection);
@@ -161,7 +190,7 @@ size_t Connection::decode(const char* bu
             pn_condition_set_description(error, e.what());
             close();
         }
-        pn_transport_tick(transport, 0);
+        pn_transport_tick(transport, qpid::sys::Duration::FromEpoch() / 
qpid::sys::TIME_MSEC);
         if (!haveOutput) {
             haveOutput = true;
             out.activateOutput();
@@ -245,8 +274,7 @@ bool Connection::canEncode()
     } else {
         QPID_LOG(info, "Connection " << id << " has been closed locally");
     }
-    //TODO: proper handling of time in and out of tick
-    pn_transport_tick(transport, 0);
+    pn_transport_tick(transport, qpid::sys::Duration::FromEpoch() / 
qpid::sys::TIME_MSEC);
     QPID_LOG_CAT(trace, network, id << " canEncode(): " << haveOutput)
     return haveOutput;
 }
@@ -256,6 +284,12 @@ void Connection::open()
     readPeerProperties();
 
     pn_connection_set_container(connection, 
getBroker().getFederationTag().c_str());
+    uint32_t timeout = pn_transport_get_remote_idle_timeout(transport);
+    if (timeout) {
+        ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(new 
ConnectionTickerTask(timeout, getBroker().getTimer(), *this));
+        pn_transport_set_idle_timeout(transport, timeout);
+    }
+
     pn_connection_open(connection);
     out.connectionEstablished();
     opened();
@@ -271,6 +305,7 @@ void Connection::readPeerProperties()
 
 void Connection::closed()
 {
+    if (ticker) ticker->cancel();
     for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
         i->second->close();
     }
@@ -372,7 +407,12 @@ void Connection::process()
         pn_connection_close(connection);
     }
 }
-
+namespace {
+std::string convert(pn_delivery_tag_t in)
+{
+    return std::string(in.bytes, in.size);
+}
+}
 void Connection::processDeliveries()
 {
     //handle deliveries
@@ -382,6 +422,7 @@ void Connection::processDeliveries()
             if (pn_link_is_receiver(link)) {
                 Sessions::iterator i = sessions.find(pn_link_session(link));
                 if (i != sessions.end()) {
+                    QPID_LOG(notice, "Processing delivery with tag " << 
convert(pn_delivery_tag(delivery)));
                     i->second->readable(link, delivery);
                 } else {
                     pn_delivery_update(delivery, PN_REJECTED);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h?rev=1635316&r1=1635315&r2=1635316&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h Wed Oct 29 22:24:41 
2014
@@ -25,6 +25,7 @@
 #include "qpid/broker/amqp/BrokerContext.h"
 #include "qpid/broker/amqp/ManagedConnection.h"
 #include <map>
+#include <boost/intrusive_ptr.hpp>
 #include <boost/shared_ptr.hpp>
 
 struct pn_connection_t;
@@ -32,6 +33,9 @@ struct pn_session_t;
 struct pn_transport_t;
 
 namespace qpid {
+namespace sys {
+class TimerTask;
+}
 namespace broker {
 
 class Broker;
@@ -60,6 +64,7 @@ class Connection : public BrokerContext,
     void setUserId(const std::string&);
     void abort();
     void trace(const char*) const;
+    void requestIO();
   protected:
     typedef std::map<pn_session_t*, boost::shared_ptr<Session> > Sessions;
     pn_connection_t* connection;
@@ -70,6 +75,7 @@ class Connection : public BrokerContext,
     Sessions sessions;
     bool closeInitiated;
     bool closeRequested;
+    boost::intrusive_ptr<sys::TimerTask> ticker;
 
     virtual void process();
     void doOutput(size_t);



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to