Author: astitcher
Date: Mon May 21 23:18:50 2012
New Revision: 1341262

URL: http://svn.apache.org/viewvc?rev=1341262&view=rev
Log:
QPID-2518: Qpid C++ broker can easily be blocked by client trying to connect 
over SSL port
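Implement timed disconnect for TCP and for SSL/TCP mux: a connection that does
not send the initial protocol negotiation within --max-negotiate-time
(milliseconds, default 2000) is aborted.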

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
    qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1341262&r1=1341261&r2=1341262&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Mon May 21 23:18:50 2012
@@ -128,8 +128,9 @@ Broker::Options::Options(const std::stri
     queueFlowResumeRatio(70),
     queueThresholdEventRatio(80),
     defaultMsgGroup("qpid.no-group"),
-    timestampRcvMsgs(false),     // set the 0.10 timestamp delivery property
-    linkMaintenanceInterval(2)
+    timestampRcvMsgs(false),    // set the 0.10 timestamp delivery property
+    linkMaintenanceInterval(2),
+    maxNegotiateTime(2000)      // 2s
 {
     int c = sys::SystemInfo::concurrency();
     workerThreads=c+1;
@@ -171,6 +172,7 @@ Broker::Options::Options(const std::stri
         ("default-message-group", optValue(defaultMsgGroup, 
"GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a 
message group queue that do not contain an identifier.")
         ("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add 
current time to each received message.")
         ("link-maintenace-interval", optValue(linkMaintenanceInterval, 
"SECONDS"))
+        ("max-negotiate-time", optValue(maxNegotiateTime, "MilliSeconds"), 
"Maximum time a connection can take to send the initial protocol negotiation")
         ;
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=1341262&r1=1341261&r2=1341262&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Mon May 21 23:18:50 2012
@@ -126,6 +126,7 @@ class Broker : public sys::Runnable, pub
         std::string defaultMsgGroup;
         bool timestampRcvMsgs;
         double linkMaintenanceInterval; // FIXME aconway 2012-02-13: 
consistent parsing of SECONDS values.
+        uint32_t maxNegotiateTime;  // Max time in ms for connection with no 
negotiation
 
       private:
         std::string getHome();

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=1341262&r1=1341261&r2=1341262&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp Mon May 21 23:18:50 
2012
@@ -23,6 +23,7 @@
 #include "qpid/sys/AsynchIO.h"
 #include "qpid/sys/Socket.h"
 #include "qpid/sys/SecuritySettings.h"
+#include "qpid/sys/Timer.h"
 #include "qpid/framing/AMQP_HighestVersion.h"
 #include "qpid/framing/ProtocolInitiation.h"
 #include "qpid/log/Statement.h"
@@ -41,7 +42,25 @@ struct Buff : public AsynchIO::BufferBas
     { delete [] bytes;}
 };
 
-AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) :
+struct ProtocolTimeoutTask : public sys::TimerTask {
+    AsynchIOHandler& handler;
+    std::string id;
+
+    ProtocolTimeoutTask(const std::string& i, const Duration& timeout, 
AsynchIOHandler& h) :
+        TimerTask(timeout, "ProtocolTimeout"),
+        handler(h),
+        id(i)
+    {}
+
+    void fire() {
+        // If this fires it means that we didn't negotiate the connection in 
the timeout period
+        // Schedule closing the connection for the io thread
+        QPID_LOG(error, "Connection " << id << " No protocol received 
closing");
+        handler.abort();
+    }
+};
+
+AsynchIOHandler::AsynchIOHandler(const std::string& id, 
ConnectionCodec::Factory* f) :
     identifier(id),
     aio(0),
     factory(f),
@@ -57,9 +76,13 @@ AsynchIOHandler::~AsynchIOHandler() {
     delete codec;
 }
 
-void AsynchIOHandler::init(AsynchIO* a, int numBuffs) {
+void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, 
uint32_t maxTime, int numBuffs) {
     aio = a;
 
+    // Start timer for this connection
+    timeoutTimerTask = new ProtocolTimeoutTask(identifier, maxTime*TIME_MSEC, 
*this);
+    timer.add(timeoutTimerTask);
+
     // Give connection some buffers to use
     for (int i = 0; i < numBuffs; i++) {
         aio->queueReadBuffer(new Buff);
@@ -143,6 +166,9 @@ void AsynchIOHandler::readbuff(AsynchIO&
         framing::ProtocolInitiation protocolInit;
         if (protocolInit.decode(in)) {
             decoded = in.getPosition();
+            // We've just got the protocol negotiation so we can cancel the 
timeout for that
+            timeoutTimerTask->cancel();
+
             QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << 
protocolInit << ")");
             try {
                 codec = factory->create(protocolInit.getVersion(), *this, 
identifier, SecuritySettings());
@@ -202,6 +228,10 @@ void AsynchIOHandler::idle(AsynchIO&){
     if (isClient && codec == 0) {
         codec = factory->create(*this, identifier, SecuritySettings());
         write(framing::ProtocolInitiation(codec->getVersion()));
+        // We've just sent the protocol negotiation so we can cancel the 
timeout for that
+        // This is not ideal, because we've not received anything yet, but 
heartbeats will
+        // be active soon
+        timeoutTimerTask->cancel();
         return;
     }
     if (codec == 0) return;

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h?rev=1341262&r1=1341261&r2=1341262&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h Mon May 21 23:18:50 2012
@@ -27,6 +27,8 @@
 #include "qpid/sys/Mutex.h"
 #include "qpid/CommonImportExport.h"
 
+#include <boost/intrusive_ptr.hpp>
+
 namespace qpid {
 
 namespace framing {
@@ -38,6 +40,8 @@ namespace sys {
 class AsynchIO;
 struct AsynchIOBufferBase;
 class Socket;
+class Timer;
+class TimerTask;
 
 class AsynchIOHandler : public OutputControl {
     std::string identifier;
@@ -49,13 +53,14 @@ class AsynchIOHandler : public OutputCon
     AtomicValue<int32_t> readCredit;
     static const int32_t InfiniteCredit = -1;
     Mutex creditLock;
+    boost::intrusive_ptr<sys::TimerTask> timeoutTimerTask;
 
     void write(const framing::ProtocolInitiation&);
 
   public:
-    QPID_COMMON_EXTERN AsynchIOHandler(std::string id, 
ConnectionCodec::Factory* f);
+    QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, 
qpid::sys::ConnectionCodec::Factory* f );
     QPID_COMMON_EXTERN ~AsynchIOHandler();
-    QPID_COMMON_EXTERN void init(AsynchIO* a, int numBuffs);
+    QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime, 
int numBuffs);
 
     QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp?rev=1341262&r1=1341261&r2=1341262&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp Mon May 21 23:18:50 2012
@@ -39,6 +39,8 @@
 namespace qpid {
 namespace sys {
 
+class Timer;
+
 using namespace qpid::sys::ssl;
 
 struct SslServerOptions : ssl::SslOptions
@@ -68,6 +70,8 @@ class SslProtocolFactoryTmpl : public Pr
 
     typedef SslAcceptorTmpl<T> SslAcceptor;
 
+    Timer& brokerTimer;
+    uint32_t maxNegotiateTime;
     const bool tcpNoDelay;
     T listener;
     const uint16_t listeningPort;
@@ -75,7 +79,7 @@ class SslProtocolFactoryTmpl : public Pr
     bool nodict;
 
   public:
-    SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay);
+    SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay, 
Timer& timer, uint32_t maxTime);
     void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
     void connect(Poller::shared_ptr, const std::string& host, const 
std::string& port,
                  ConnectionCodec::Factory*,
@@ -132,16 +136,18 @@ static struct SslPlugin : public Plugin 
                 try {
                     ssl::initNSS(options, true);
                     nssInitialized = true;
-                    
+
                     const broker::Broker::Options& opts = broker->getOptions();
 
                     ProtocolFactory::shared_ptr protocol(options.multiplex ?
                         static_cast<ProtocolFactory*>(new 
SslMuxProtocolFactory(options,
                                                   opts.connectionBacklog,
-                                                  opts.tcpNoDelay)) :
+                                                  opts.tcpNoDelay,
+                                                  broker->getTimer(), 
opts.maxNegotiateTime)) :
                         static_cast<ProtocolFactory*>(new 
SslProtocolFactory(options,
                                                opts.connectionBacklog,
-                                               opts.tcpNoDelay)));
+                                               opts.tcpNoDelay,
+                                               broker->getTimer(), 
opts.maxNegotiateTime)));
                     QPID_LOG(notice, "Listening for " <<
                                      (options.multiplex ? "SSL or TCP" : 
"SSL") <<
                                      " connections on TCP port " <<
@@ -156,7 +162,9 @@ static struct SslPlugin : public Plugin 
 } sslPlugin;
 
 template <class T>
-SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& 
options, int backlog, bool nodelay) :
+SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& 
options, int backlog, bool nodelay, Timer& timer, uint32_t maxTime) :
+    brokerTimer(timer),
+    maxNegotiateTime(maxTime),
     tcpNoDelay(nodelay), listeningPort(listener.listen(options.port, backlog, 
options.certName, options.clientAuth)),
     nodict(options.nodict)
 {}
@@ -239,7 +247,7 @@ void SslMuxProtocolFactory::established(
        boost::bind(&AsynchIOHandler::nobuffs, async, _1),
        boost::bind(&AsynchIOHandler::idle, async, _1));
 
-    async->init(aio, 4);
+    async->init(aio, brokerTimer, maxNegotiateTime, 4);
     aio->start(poller);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=1341262&r1=1341261&r2=1341262&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp Mon May 21 23:18:50 2012
@@ -36,14 +36,21 @@
 namespace qpid {
 namespace sys {
 
+class Timer;
+
 class AsynchIOProtocolFactory : public ProtocolFactory {
-    const bool tcpNoDelay;
     boost::ptr_vector<Socket> listeners;
     boost::ptr_vector<AsynchAcceptor> acceptors;
+    Timer& brokerTimer;
+    uint32_t maxNegotiateTime;
     uint16_t listeningPort;
+    const bool tcpNoDelay;
 
   public:
-    AsynchIOProtocolFactory(const std::string& host, const std::string& port, 
int backlog, bool nodelay, bool shouldListen);
+    AsynchIOProtocolFactory(const std::string& host, const std::string& port,
+                            int backlog, bool nodelay,
+                            Timer& timer, uint32_t maxTime,
+                            bool shouldListen);
     void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
     void connect(Poller::shared_ptr, const std::string& host, const 
std::string& port,
                  ConnectionCodec::Factory*,
@@ -90,6 +97,7 @@ static class TCPIOPlugin : public Plugin
                     "", boost::lexical_cast<std::string>(opts.port),
                     opts.connectionBacklog,
                     opts.tcpNoDelay,
+                    broker->getTimer(), opts.maxNegotiateTime,
                     shouldListen));
 
             if (shouldListen) {
@@ -101,7 +109,12 @@ static class TCPIOPlugin : public Plugin
     }
 } tcpPlugin;
 
-AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, 
const std::string& port, int backlog, bool nodelay, bool shouldListen) :
+AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, 
const std::string& port,
+                                                 int backlog, bool nodelay,
+                                                 Timer& timer, uint32_t 
maxTime,
+                                                 bool shouldListen) :
+    brokerTimer(timer),
+    maxNegotiateTime(maxTime),
     tcpNoDelay(nodelay)
 {
     if (!shouldListen) {
@@ -153,7 +166,7 @@ void AsynchIOProtocolFactory::establishe
        boost::bind(&AsynchIOHandler::nobuffs, async, _1),
        boost::bind(&AsynchIOHandler::idle, async, _1));
 
-    async->init(aio, 4);
+    async->init(aio, brokerTimer, maxNegotiateTime, 4);
     aio->start(poller);
 }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to