Author: kgiusti
Date: Mon Feb  2 16:05:43 2015
New Revision: 1656505

URL: http://svn.apache.org/r1656505
Log:
QPID-5538: Implement AMQP 1.0 connection idle-timeout.

Original patch by Gordon Sim.

Added:
    qpid/trunk/qpid/cpp/src/tests/idle_timeout_tests.py   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/Transport.h
    qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp?rev=1656505&r1=1656504&r2=1656505&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.cpp Mon Feb  2 16:05:43 2015
@@ -121,7 +121,7 @@ Connection::Connection(qpid::sys::Output
       connection(pn_connection()),
       transport(pn_transport()),
       collector(0),
-      out(o), id(i), haveOutput(true), closeInitiated(false), 
closeRequested(false)
+      out(o), id(i), haveOutput(true), closeInitiated(false), 
closeRequested(false), ioRequested(false)
 {
 #ifdef HAVE_PROTON_EVENTS
     collector = pn_collector();
@@ -157,6 +157,7 @@ Connection::Connection(qpid::sys::Output
 
 void Connection::requestIO()
 {
+    ioRequested = true;
     out.activateOutput();
 }
 
@@ -179,13 +180,24 @@ size_t Connection::decode(const char* bu
 {
     QPID_LOG(trace, id << " decode(" << size << ")");
     if (size == 0) return 0;
-    //TODO: Fix pn_engine_input() to take const buffer
+
     ssize_t n = pn_transport_input(transport, const_cast<char*>(buffer), size);
     if (n > 0 || n == PN_EOS) {
-        //If engine returns EOS, have no way of knowing how many bytes
-        //it processed, but can assume none need to be reprocessed so
-        //consider them all read:
-        if (n == PN_EOS) n = size;
+        // PN_EOS either means we received a Close (which also means we've
+        // consumed all the input), OR some Very Bad Thing happened and this
+        // connection is toast.
+        if (n == PN_EOS)
+        {
+            std::string error;
+            if (checkTransportError(error)) {
+                // "He's dead, Jim."
+                QPID_LOG_CAT(error, network, id << " connection failed: " << 
error);
+                out.abort();
+                return 0;
+            } else {
+                n = size;   // assume all consumed
+            }
+        }
         QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " 
<< size);
         try {
             process();
@@ -224,6 +236,15 @@ size_t Connection::encode(char* buffer,
         QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " 
<< size)
         haveOutput = true;
         return n;
+    } else if (n == PN_EOS) {
+        haveOutput = false;
+        // Normal close, or error?
+        std::string error;
+        if (checkTransportError(error)) {
+            QPID_LOG_CAT(error, network, id << " connection failed: " << 
error);
+            out.abort();
+        }
+        return 0;
     } else if (n == PN_ERR) {
         throw Exception(qpid::amqp::error_conditions::INTERNAL_ERROR, 
QPID_MSG("Error on output: " << getError()));
     } else {
@@ -291,6 +312,7 @@ bool Connection::canEncode()
     } else {
         QPID_LOG(info, "Connection " << id << " has been closed locally");
     }
+    if (ioRequested.valueCompareAndSwap(true, false)) haveOutput = true;
     pn_transport_tick(transport, qpid::sys::Duration::FromEpoch() / 
qpid::sys::TIME_MSEC);
     QPID_LOG_CAT(trace, network, id << " canEncode(): " << haveOutput)
     return haveOutput;
@@ -303,8 +325,22 @@ void Connection::open()
     pn_connection_set_container(connection, 
getBroker().getFederationTag().c_str());
     uint32_t timeout = pn_transport_get_remote_idle_timeout(transport);
     if (timeout) {
-        ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(new 
ConnectionTickerTask(timeout, getBroker().getTimer(), *this));
-        pn_transport_set_idle_timeout(transport, timeout);
+        // if idle generate empty frames at 1/2 the timeout interval as 
keepalives:
+        ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(new 
ConnectionTickerTask((timeout+1)/2,
+                                                                               
      getBroker().getTimer(),
+                                                                               
      *this));
+        getBroker().getTimer().add(ticker);
+
+        // Note: in version 0-10 of the protocol, idle timeout applies to both
+        // ends.  AMQP 1.0 changes that - it's now asymmetric: each end can
+        // configure/disable it independently.  For backward compatibility, by
+        // default mimic the old behavior and set our local timeout.
+        // Use 2x the remote's timeout, as per the spec the remote should
+        // advertise 1/2 its actual timeout threshold
+        pn_transport_set_idle_timeout(transport, timeout * 2);
+        QPID_LOG_CAT(debug, network, id << " AMQP 1.0 idle-timeout set:"
+                     << " local=" << pn_transport_get_idle_timeout(transport)
+                     << " remote=" << 
pn_transport_get_remote_idle_timeout(transport));
     }
 
     pn_connection_open(connection);
@@ -585,4 +621,22 @@ void Connection::doDeliveryUpdated(pn_de
     }
 }
 
+// Check the Proton transport for a recorded failure.
+//
+// text: receives a human-readable description of the failure; left empty
+//       when the transport reports none.
+// Returns true if a failure was found.  decode()/encode() call this when the
+// engine returns PN_EOS to tell an orderly close from a broken connection.
+bool Connection::checkTransportError(std::string& text)
+{
+    std::stringstream info;
+
+#ifdef USE_PROTON_TRANSPORT_CONDITION
+    pn_condition_t* tcondition = pn_transport_condition(transport);
+    if (pn_condition_is_set(tcondition)) {
+        // The description is optional; streaming a null char* is undefined.
+        const char* description = pn_condition_get_description(tcondition);
+        info << "transport error: " << pn_condition_get_name(tcondition) << ", "
+             << (description ? description : "");
+    }
+#else
+    pn_error_t* terror = pn_transport_error(transport);
+    // Log the numeric error code rather than the address of the error object
+    // (which is what streaming 'terror' itself printed).
+    // NOTE(review): assumes Proton leaves the code at 0 while no error is
+    // set, so an allocated-but-empty error object is not a failure - confirm
+    // against the Proton version in use.
+    const int code = terror ? pn_error_code(terror) : 0;
+    if (code) {
+        const char* description = pn_error_text(terror);
+        info << "transport error " << (description ? description : "")
+             << " [" << code << "]";
+    }
+#endif
+
+    text = info.str();
+    return !text.empty();
+}
+
 }}} // namespace qpid::broker::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h?rev=1656505&r1=1656504&r2=1656505&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Connection.h Mon Feb  2 16:05:43 2015
@@ -24,6 +24,7 @@
 #include "qpid/sys/ConnectionCodec.h"
 #include "qpid/broker/amqp/BrokerContext.h"
 #include "qpid/broker/amqp/ManagedConnection.h"
+#include "qpid/sys/AtomicValue.h"
 #include <map>
 #include <boost/intrusive_ptr.hpp>
 #include <boost/shared_ptr.hpp>
@@ -80,6 +81,7 @@ class Connection : public BrokerContext,
     bool closeInitiated;
     bool closeRequested;
     boost::intrusive_ptr<sys::TimerTask> ticker;
+    qpid::sys::AtomicValue<bool> ioRequested;
 
     virtual void process();
     void doOutput(size_t);
@@ -92,6 +94,8 @@ class Connection : public BrokerContext,
     void closedByManagement();
 
  private:
+    bool checkTransportError(std::string&);
+
     // handle Proton engine events
     void doConnectionRemoteOpen();
     void doConnectionRemoteClose();
@@ -100,7 +104,6 @@ class Connection : public BrokerContext,
     void doLinkRemoteOpen(pn_link_t *link);
     void doLinkRemoteClose(pn_link_t *link);
     void doDeliveryUpdated(pn_delivery_t *delivery);
-
 };
 }}} // namespace qpid::broker::amqp
 

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1656505&r1=1656504&r2=1656505&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Mon Feb  2 16:05:43 2015
@@ -39,6 +39,7 @@
 #include "qpid/sys/SecurityLayer.h"
 #include "qpid/sys/SystemInfo.h"
 #include "qpid/sys/Time.h"
+#include "qpid/sys/Timer.h"
 #include "qpid/sys/urlAdd.h"
 #include "config.h"
 #include <boost/lexical_cast.hpp>
@@ -95,6 +96,27 @@ std::string get_error(pn_connection_t* c
 }
 #endif
 
+/**
+ * Self-rescheduling timer task for a client connection.
+ *
+ * Every time it fires it re-adds itself to the timer it was created with
+ * and then asks the owning ConnectionContext to activate output.
+ */
+class ConnectionTickerTask : public qpid::sys::TimerTask
+{
+  public:
+    /**
+     * @param period      interval between firings
+     * @param owningTimer timer the task re-adds itself to after each firing
+     * @param target      connection whose output is activated on each firing
+     */
+    ConnectionTickerTask(const qpid::sys::Duration& period,
+                         qpid::sys::Timer& owningTimer,
+                         ConnectionContext& target)
+        : TimerTask(period, "ConnectionTicker"),
+          scheduler(owningTimer),
+          context(target)
+    {}
+
+    void fire()
+    {
+        QPID_LOG(debug, "ConnectionTickerTask fired");
+
+        // Re-arm before doing the work so the next tick is always queued.
+        setupNextFire();
+        scheduler.add(this);
+
+        // Prod the connection so the I/O side gets a chance to run.
+        context.activateOutput();
+    }
+
+  private:
+    qpid::sys::Timer& scheduler;
+    ConnectionContext& context;
+};
 }
 
 void ConnectionContext::trace(const char* message) const
@@ -118,23 +140,15 @@ ConnectionContext::ConnectionContext(con
     // Concatenate all known URLs into a single URL, get rid of duplicate 
addresses.
     sys::urlAddStrings(fullUrl, urls.begin(), urls.end(), protocol.empty() ?
                        qpid::Address::TCP : protocol);
-    if (pn_transport_bind(engine, connection)) {
-        //error
-    }
     if (identifier.empty()) {
         identifier = qpid::types::Uuid(true).str();
     }
-    pn_connection_set_container(connection, identifier.c_str());
-    bool enableTrace(false);
-    QPID_LOG_TEST_CAT(trace, protocol, enableTrace);
-    if (enableTrace) {
-        pn_transport_trace(engine, PN_TRACE_FRM);
-        set_tracer(engine, this);
-    }
+    configureConnection();
 }
 
 ConnectionContext::~ConnectionContext()
 {
+    if (ticker) ticker->cancel();
     close();
     sessions.clear();
     pn_transport_free(engine);
@@ -218,6 +232,10 @@ void ConnectionContext::close()
             lock.wait();
         }
     }
+    if (ticker) {
+        ticker->cancel();
+        ticker.reset();
+    }
 }
 
 bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, 
boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, 
qpid::messaging::Duration timeout)
@@ -498,7 +516,7 @@ uint32_t ConnectionContext::getUnsettled
 void ConnectionContext::activateOutput()
 {
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
-    wakeupDriver();
+    if (state == CONNECTED) wakeupDriver();
 }
 /**
  * Expects lock to be held by caller
@@ -530,14 +548,11 @@ void ConnectionContext::reset()
 
     engine = pn_transport();
     connection = pn_connection();
-    pn_connection_set_container(connection, identifier.c_str());
-    bool enableTrace(false);
-    QPID_LOG_TEST_CAT(trace, protocol, enableTrace);
-    if (enableTrace) pn_transport_trace(engine, PN_TRACE_FRM);
+    configureConnection();
+
     for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
         i->second->reset(connection);
     }
-    pn_transport_bind(engine, connection);
 }
 
 void ConnectionContext::check() {
@@ -758,12 +773,23 @@ std::size_t ConnectionContext::decodePla
     //TODO: Fix pn_engine_input() to take const buffer
     ssize_t n = pn_transport_input(engine, const_cast<char*>(buffer), size);
     if (n > 0 || n == PN_EOS) {
-        //If engine returns EOS, have no way of knowing how many bytes
-        //it processed, but can assume none need to be reprocessed so
-        //consider them all read:
-        if (n == PN_EOS) n = size;
+        // PN_EOS either means we received a Close (which also means we've
+        // consumed all the input), OR some Very Bad Thing happened and this
+        // connection is toast.
+        if (n == PN_EOS)
+        {
+            std::string error;
+            if (checkTransportError(error)) {
+                // "He's dead, Jim."
+                QPID_LOG_CAT(error, network, id << " connection failed: " << 
error);
+                transport->abort();
+                return 0;
+            } else {
+                n = size;   // assume all consumed
+            }
+        }
         QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " 
<< size)
-        pn_transport_tick(engine, 0);
+        pn_transport_tick(engine, qpid::sys::Duration::FromEpoch() / 
qpid::sys::TIME_MSEC);
         lock.notifyAll();
         return n;
     } else if (n == PN_ERR) {
@@ -795,7 +821,13 @@ std::size_t ConnectionContext::encodePla
         throw MessagingException(QPID_MSG("Error on output: " << getError()));
     } else if (n == PN_EOS) {
         haveOutput = false;
-        return 0;//Is this right?
+        // Normal close, or error?
+        std::string error;
+        if (checkTransportError(error)) {
+            QPID_LOG_CAT(error, network, id << " connection failed: " << 
error);
+            transport->abort();
+        }
+        return 0;
     } else {
         haveOutput = false;
         return 0;
@@ -804,6 +836,7 @@ std::size_t ConnectionContext::encodePla
 bool ConnectionContext::canEncodePlain()
 {
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    pn_transport_tick(engine, qpid::sys::Duration::FromEpoch() / 
qpid::sys::TIME_MSEC);
     return haveOutput && state == CONNECTED;
 }
 void ConnectionContext::closed()
@@ -1061,7 +1094,6 @@ bool ConnectionContext::tryOpenAddr(cons
     }
 
     QPID_LOG(debug, id << " Opening...");
-    setProperties();
     pn_connection_open(connection);
     wakeupDriver(); //want to write
     while ((pn_connection_state(connection) & PN_REMOTE_UNINIT) &&
@@ -1071,6 +1103,25 @@ bool ConnectionContext::tryOpenAddr(cons
     if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
         throw qpid::messaging::ConnectionError("Failed to open connection");
     }
+
+    // Connection open - check for idle timeout from the remote and start a
+    // periodic tick to monitor for idle connections
+    pn_timestamp_t remote = pn_transport_get_remote_idle_timeout(engine);
+    pn_timestamp_t local = pn_transport_get_idle_timeout(engine);
+    uint64_t shortest = ((remote && local)
+                         ? std::min(remote, local)
+                         : (remote) ? remote : local);
+    if (shortest) {
+        // send an idle frame at least twice before timeout
+        shortest = (shortest + 1)/2;
+        qpid::sys::Duration d(shortest * qpid::sys::TIME_MSEC);
+        ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(new 
ConnectionTickerTask(d, driver->getTimer(), *this));
+        driver->getTimer().add(ticker);
+        QPID_LOG(debug, id << " AMQP 1.0 idle-timeout set:"
+                 << " local=" << pn_transport_get_idle_timeout(engine)
+                 << " remote=" << 
pn_transport_get_remote_idle_timeout(engine));
+    }
+
     QPID_LOG(debug, id << " Opened");
 
     return restartSessions();
@@ -1151,4 +1202,44 @@ bool ConnectionContext::CodecAdapter::ca
 }
 
 
+/**
+ * Apply the local settings to the current Proton connection and transport
+ * objects and bind the two together.
+ *
+ * Steps, in order: set the container id, set the connection properties,
+ * set the local idle timeout when a heartbeat is configured, enable frame
+ * tracing when protocol trace logging is on, then bind transport and
+ * connection.  A bind failure is logged, not raised.
+ */
+void ConnectionContext::configureConnection()
+{
+    pn_connection_set_container(connection, identifier.c_str());
+    setProperties();
+
+    if (heartbeat) {
+        // Local idle timeout is twice the heartbeat, expressed in msecs.
+        pn_transport_set_idle_timeout(engine, heartbeat*2*1000);
+    }
+
+    bool frameTracing(false);
+    QPID_LOG_TEST_CAT(trace, protocol, frameTracing);
+    if (frameTracing) {
+        pn_transport_trace(engine, PN_TRACE_FRM);
+        set_tracer(engine, this);
+    }
+
+    const int rc = pn_transport_bind(engine, connection);
+    if (rc != 0) {
+        QPID_LOG(error, id << " Error binding connection and transport: " << rc);
+    }
+}
+
+
+// Check the Proton transport for a recorded failure.
+//
+// text: receives a human-readable description of the failure; left empty
+//       when the transport reports none.
+// Returns true if a failure was found.  decodePlain()/encodePlain() call
+// this when the engine returns PN_EOS to tell an orderly close from a
+// broken connection.
+bool ConnectionContext::checkTransportError(std::string& text)
+{
+    std::stringstream info;
+
+#ifdef USE_PROTON_TRANSPORT_CONDITION
+    pn_condition_t* tcondition = pn_transport_condition(engine);
+    if (pn_condition_is_set(tcondition)) {
+        // The description is optional; streaming a null char* is undefined.
+        const char* description = pn_condition_get_description(tcondition);
+        info << "transport error: " << pn_condition_get_name(tcondition) << ", "
+             << (description ? description : "");
+    }
+#else
+    pn_error_t* terror = pn_transport_error(engine);
+    // Log the numeric error code rather than the address of the error object
+    // (which is what streaming 'terror' itself printed).
+    // NOTE(review): assumes Proton leaves the code at 0 while no error is
+    // set, so an allocated-but-empty error object is not a failure - confirm
+    // against the Proton version in use.
+    const int code = terror ? pn_error_code(terror) : 0;
+    if (code) {
+        const char* description = pn_error_text(terror);
+        info << "transport error " << (description ? description : "")
+             << " [" << code << "]";
+    }
+#endif
+
+    text = info.str();
+    return !text.empty();
+}
 }}} // namespace qpid::messaging::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1656505&r1=1656504&r2=1656505&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Mon Feb  2 16:05:43 2015
@@ -25,6 +25,7 @@
 #include <map>
 #include <memory>
 #include <string>
+#include <boost/intrusive_ptr.hpp>
 #include <boost/shared_ptr.hpp>
 #include "qpid/Url.h"
 #include "qpid/messaging/ConnectionOptions.h"
@@ -47,6 +48,7 @@ class ProtocolVersion;
 namespace sys {
 class SecurityLayer;
 struct SecuritySettings;
+class TimerTask;
 }
 namespace messaging {
 class Duration;
@@ -120,7 +122,7 @@ class ConnectionContext : public qpid::s
     void initSecurityLayer(qpid::sys::SecurityLayer&);
     void trace(const char*) const;
 
-  private:
+ private:
     typedef std::map<std::string, boost::shared_ptr<SessionContext> > 
SessionMap;
     class CodecAdapter : public qpid::sys::Codec
     {
@@ -155,6 +157,7 @@ class ConnectionContext : public qpid::s
     std::auto_ptr<Sasl> sasl;
     CodecAdapter codecAdapter;
     bool notifyOnWrite;
+    boost::intrusive_ptr<qpid::sys::TimerTask> ticker;
 
     void check();
     bool checkDisconnected();
@@ -191,6 +194,8 @@ class ConnectionContext : public qpid::s
     std::string getError();
     bool useSasl();
     void setProperties();
+    void configureConnection();
+    bool checkTransportError(std::string&);
 };
 
 }}} // namespace qpid::messaging::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp?rev=1656505&r1=1656504&r2=1656505&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp Mon Feb  2 16:05:43 2015
@@ -22,13 +22,14 @@
 #include "Transport.h"
 #include "qpid/messaging/exceptions.h"
 #include "qpid/sys/Poller.h"
+#include "qpid/sys/Timer.h"
 #include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace messaging {
 namespace amqp {
 
-DriverImpl::DriverImpl() : poller(new qpid::sys::Poller)
+DriverImpl::DriverImpl() : poller(new qpid::sys::Poller), timer(new 
qpid::sys::Timer)
 {
     start();
 }
@@ -48,6 +49,7 @@ void DriverImpl::stop()
     QPID_LOG(debug, "Driver stopped");
     poller->shutdown();
     thread.join();
+    timer->stop();
 }
 
 boost::shared_ptr<Transport> DriverImpl::getTransport(const std::string& 
protocol, TransportContext& connection)

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h?rev=1656505&r1=1656504&r2=1656505&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h Mon Feb  2 16:05:43 2015
@@ -29,6 +29,7 @@
 namespace qpid {
 namespace sys {
 class Poller;
+class Timer;
 }
 namespace messaging {
 namespace amqp {
@@ -47,11 +48,14 @@ class DriverImpl
     void stop();
 
     boost::shared_ptr<Transport> getTransport(const std::string& protocol, 
TransportContext& connection);
+    sys::Timer& getTimer() { return *timer; }
 
     static boost::shared_ptr<DriverImpl> getDefault();
   private:
     boost::shared_ptr<qpid::sys::Poller> poller;
     qpid::sys::Thread thread;
+    std::auto_ptr<sys::Timer> timer;
+
     static qpid::sys::Mutex defaultLock;
     static boost::weak_ptr<DriverImpl> theDefault;
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/Transport.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/Transport.h?rev=1656505&r1=1656504&r2=1656505&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/Transport.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/Transport.h Mon Feb  2 16:05:43 2015
@@ -40,6 +40,7 @@ class Transport : public qpid::sys::Outp
     virtual ~Transport() {}
     virtual void connect(const std::string& host, const std::string& port) = 0;
     virtual void close() = 0;
+    virtual void abort() = 0;
     virtual const qpid::sys::SecuritySettings* getSecuritySettings() = 0;
 
     typedef Transport* Factory(TransportContext&, 
boost::shared_ptr<qpid::sys::Poller>);

Modified: qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt?rev=1656505&r1=1656504&r2=1656505&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt Mon Feb  2 16:05:43 2015
@@ -364,6 +364,7 @@ add_test (ha_tests ${python_wrap} -- ${C
 add_test (qpidd_qmfv2_tests ${python_wrap} -- 
${CMAKE_CURRENT_SOURCE_DIR}/qpidd_qmfv2_tests.py)
 if (BUILD_AMQP)
   add_test (interlink_tests ${python_wrap} -- 
${CMAKE_CURRENT_SOURCE_DIR}/interlink_tests.py)
+  add_test (idle_timeout_tests ${python_wrap} -- 
${CMAKE_CURRENT_SOURCE_DIR}/idle_timeout_tests.py)
 endif (BUILD_AMQP)
 add_test (swig_python_tests ${test_wrap} -- 
${CMAKE_CURRENT_SOURCE_DIR}/swig_python_tests${test_script_suffix})
 add_test (ipv6_test ${test_wrap} -- 
${CMAKE_CURRENT_SOURCE_DIR}/ipv6_test${test_script_suffix})

Added: qpid/trunk/qpid/cpp/src/tests/idle_timeout_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/idle_timeout_tests.py?rev=1656505&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/idle_timeout_tests.py (added)
+++ qpid/trunk/qpid/cpp/src/tests/idle_timeout_tests.py Mon Feb  2 16:05:43 2015
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import shutil
+import signal
+import sys
+
+from brokertest import *
+
+class AmqpIdleTimeoutTest(BrokerTest):
+    """
+    Test AMQP 1.0 idle-timeout support
+    """
+    def setUp(self):
+        BrokerTest.setUp(self)
+        if not BrokerTest.amqp_lib:
+            raise Skipped("AMQP 1.0 library not found")
+        if qm != qpid_messaging:
+            raise Skipped("AMQP 1.0 client not found")
+        self._broker = self.broker()
+
+    def test_client_timeout(self):
+        """Ensure that the client disconnects should the broker stop
+        responding.
+        """
+        conn = self._broker.connect(native=False, timeout=None,
+                                    protocol="amqp1.0", heartbeat=1)
+        self.assertTrue(conn.isOpen())
+        # should disconnect within 2 seconds of broker stop
+        deadline = time.time() + 8
+        os.kill(self._broker.pid, signal.SIGSTOP)
+        while time.time() < deadline:
+            if not conn.isOpen():
+                break;
+        self.assertTrue(not conn.isOpen())
+        os.kill(self._broker.pid, signal.SIGCONT)
+
+
+    def test_broker_timeout(self):
+        """By default, the broker will adopt the same timeout as the client
+        (mimics the 0-10 timeout behavior).  Verify the broker disconnects
+        unresponsive clients.
+        """
+
+        count = len(self._broker.agent.getAllConnections())
+
+        # Create a new connection to the broker:
+        receiver_cmd = ["qpid-receive",
+                        "--broker", self._broker.host_port(),
+                        "--address=amq.fanout",
+                        "--connection-options={protocol:amqp1.0, heartbeat:1}",
+                        "--forever"]
+        receiver = self.popen(receiver_cmd, stdout=PIPE, stderr=PIPE,
+                              expect=EXPECT_UNKNOWN)
+        start = time.time()
+        deadline = time.time() + 10
+        while time.time() < deadline:
+            if count < len(self._broker.agent.getAllConnections()):
+                break;
+        self.assertTrue(count < len(self._broker.agent.getAllConnections()))
+
+        # now 'hang' the client, the broker should disconnect
+        start = time.time()
+        receiver.send_signal(signal.SIGSTOP)
+        deadline = time.time() + 10
+        while time.time() < deadline:
+            if count == len(self._broker.agent.getAllConnections()):
+                break;
+        self.assertEqual(count, len(self._broker.agent.getAllConnections()))
+        receiver.send_signal(signal.SIGCONT)
+        receiver.teardown()
+
+
+if __name__ == "__main__":
+    shutil.rmtree("brokertest.tmp", True)
+    os.execvp("qpid-python-test",
+              ["qpid-python-test", "-m", "idle_timeout_tests"] + sys.argv[1:])

Propchange: qpid/trunk/qpid/cpp/src/tests/idle_timeout_tests.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/tests/idle_timeout_tests.py
------------------------------------------------------------------------------
    svn:executable = *



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to