Author: astitcher
Date: Mon May 21 23:18:50 2012
New Revision: 1341262
URL: http://svn.apache.org/viewvc?rev=1341262&view=rev
Log:
QPID-2518: Qpid C++ broker can easily be blocked by client trying to connect
over SSL port
Implement timed disconnect for TCP and for SSL/TCP mux
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1341262&r1=1341261&r2=1341262&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Mon May 21 23:18:50 2012
@@ -128,8 +128,9 @@ Broker::Options::Options(const std::stri
queueFlowResumeRatio(70),
queueThresholdEventRatio(80),
defaultMsgGroup("qpid.no-group"),
- timestampRcvMsgs(false), // set the 0.10 timestamp delivery property
- linkMaintenanceInterval(2)
+ timestampRcvMsgs(false), // set the 0.10 timestamp delivery property
+ linkMaintenanceInterval(2),
+ maxNegotiateTime(2000) // 2s
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -171,6 +172,7 @@ Broker::Options::Options(const std::stri
("default-message-group", optValue(defaultMsgGroup,
"GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a
message group queue that do not contain an identifier.")
("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add
current time to each received message.")
("link-maintenace-interval", optValue(linkMaintenanceInterval,
"SECONDS"))
+ ("max-negotiate-time", optValue(maxNegotiateTime, "MilliSeconds"),
"Maximum time a connection can take to send the initial protocol negotiation")
;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=1341262&r1=1341261&r2=1341262&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Mon May 21 23:18:50 2012
@@ -126,6 +126,7 @@ class Broker : public sys::Runnable, pub
std::string defaultMsgGroup;
bool timestampRcvMsgs;
double linkMaintenanceInterval; // FIXME aconway 2012-02-13:
consistent parsing of SECONDS values.
+ uint32_t maxNegotiateTime; // Max time in ms for connection with no negotiation
private:
std::string getHome();
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=1341262&r1=1341261&r2=1341262&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp Mon May 21 23:18:50
2012
@@ -23,6 +23,7 @@
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Socket.h"
#include "qpid/sys/SecuritySettings.h"
+#include "qpid/sys/Timer.h"
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/log/Statement.h"
@@ -41,7 +42,25 @@ struct Buff : public AsynchIO::BufferBas
{ delete [] bytes;}
};
-AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) :
+struct ProtocolTimeoutTask : public sys::TimerTask {
+ AsynchIOHandler& handler;
+ std::string id;
+
+ ProtocolTimeoutTask(const std::string& i, const Duration& timeout,
AsynchIOHandler& h) :
+ TimerTask(timeout, "ProtocolTimeout"),
+ handler(h),
+ id(i)
+ {}
+
+ void fire() {
+ // If this fires it means that we didn't negotiate the connection in the timeout period
+ // Schedule closing the connection for the io thread
+ QPID_LOG(error, "Connection " << id << " No protocol received
closing");
+ handler.abort();
+ }
+};
+
+AsynchIOHandler::AsynchIOHandler(const std::string& id,
ConnectionCodec::Factory* f) :
identifier(id),
aio(0),
factory(f),
@@ -57,9 +76,13 @@ AsynchIOHandler::~AsynchIOHandler() {
delete codec;
}
-void AsynchIOHandler::init(AsynchIO* a, int numBuffs) {
+void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer,
uint32_t maxTime, int numBuffs) {
aio = a;
+ // Start timer for this connection
+ timeoutTimerTask = new ProtocolTimeoutTask(identifier, maxTime*TIME_MSEC,
*this);
+ timer.add(timeoutTimerTask);
+
// Give connection some buffers to use
for (int i = 0; i < numBuffs; i++) {
aio->queueReadBuffer(new Buff);
@@ -143,6 +166,9 @@ void AsynchIOHandler::readbuff(AsynchIO&
framing::ProtocolInitiation protocolInit;
if (protocolInit.decode(in)) {
decoded = in.getPosition();
+ // We've just got the protocol negotiation so we can cancel the timeout for that
+ timeoutTimerTask->cancel();
+
QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" <<
protocolInit << ")");
try {
codec = factory->create(protocolInit.getVersion(), *this,
identifier, SecuritySettings());
@@ -202,6 +228,10 @@ void AsynchIOHandler::idle(AsynchIO&){
if (isClient && codec == 0) {
codec = factory->create(*this, identifier, SecuritySettings());
write(framing::ProtocolInitiation(codec->getVersion()));
+ // We've just sent the protocol negotiation so we can cancel the timeout for that
+ // This is not ideal, because we've not received anything yet, but heartbeats will
+ // be active soon
+ timeoutTimerTask->cancel();
return;
}
if (codec == 0) return;
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h?rev=1341262&r1=1341261&r2=1341262&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOHandler.h Mon May 21 23:18:50 2012
@@ -27,6 +27,8 @@
#include "qpid/sys/Mutex.h"
#include "qpid/CommonImportExport.h"
+#include <boost/intrusive_ptr.hpp>
+
namespace qpid {
namespace framing {
@@ -38,6 +40,8 @@ namespace sys {
class AsynchIO;
struct AsynchIOBufferBase;
class Socket;
+class Timer;
+class TimerTask;
class AsynchIOHandler : public OutputControl {
std::string identifier;
@@ -49,13 +53,14 @@ class AsynchIOHandler : public OutputCon
AtomicValue<int32_t> readCredit;
static const int32_t InfiniteCredit = -1;
Mutex creditLock;
+ boost::intrusive_ptr<sys::TimerTask> timeoutTimerTask;
void write(const framing::ProtocolInitiation&);
public:
- QPID_COMMON_EXTERN AsynchIOHandler(std::string id,
ConnectionCodec::Factory* f);
+ QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id,
qpid::sys::ConnectionCodec::Factory* f );
QPID_COMMON_EXTERN ~AsynchIOHandler();
- QPID_COMMON_EXTERN void init(AsynchIO* a, int numBuffs);
+ QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime,
int numBuffs);
QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; }
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp?rev=1341262&r1=1341261&r2=1341262&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/SslPlugin.cpp Mon May 21 23:18:50 2012
@@ -39,6 +39,8 @@
namespace qpid {
namespace sys {
+class Timer;
+
using namespace qpid::sys::ssl;
struct SslServerOptions : ssl::SslOptions
@@ -68,6 +70,8 @@ class SslProtocolFactoryTmpl : public Pr
typedef SslAcceptorTmpl<T> SslAcceptor;
+ Timer& brokerTimer;
+ uint32_t maxNegotiateTime;
const bool tcpNoDelay;
T listener;
const uint16_t listeningPort;
@@ -75,7 +79,7 @@ class SslProtocolFactoryTmpl : public Pr
bool nodict;
public:
- SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay);
+ SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay,
Timer& timer, uint32_t maxTime);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
void connect(Poller::shared_ptr, const std::string& host, const
std::string& port,
ConnectionCodec::Factory*,
@@ -132,16 +136,18 @@ static struct SslPlugin : public Plugin
try {
ssl::initNSS(options, true);
nssInitialized = true;
-
+
const broker::Broker::Options& opts = broker->getOptions();
ProtocolFactory::shared_ptr protocol(options.multiplex ?
static_cast<ProtocolFactory*>(new
SslMuxProtocolFactory(options,
opts.connectionBacklog,
- opts.tcpNoDelay)) :
+ opts.tcpNoDelay,
+ broker->getTimer(),
opts.maxNegotiateTime)) :
static_cast<ProtocolFactory*>(new
SslProtocolFactory(options,
opts.connectionBacklog,
- opts.tcpNoDelay)));
+ opts.tcpNoDelay,
+ broker->getTimer(),
opts.maxNegotiateTime)));
QPID_LOG(notice, "Listening for " <<
(options.multiplex ? "SSL or TCP" :
"SSL") <<
" connections on TCP port " <<
@@ -156,7 +162,9 @@ static struct SslPlugin : public Plugin
} sslPlugin;
template <class T>
-SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions&
options, int backlog, bool nodelay) :
+SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions&
options, int backlog, bool nodelay, Timer& timer, uint32_t maxTime) :
+ brokerTimer(timer),
+ maxNegotiateTime(maxTime),
tcpNoDelay(nodelay), listeningPort(listener.listen(options.port, backlog,
options.certName, options.clientAuth)),
nodict(options.nodict)
{}
@@ -239,7 +247,7 @@ void SslMuxProtocolFactory::established(
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, 4);
+ async->init(aio, brokerTimer, maxNegotiateTime, 4);
aio->start(poller);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=1341262&r1=1341261&r2=1341262&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp Mon May 21 23:18:50 2012
@@ -36,14 +36,21 @@
namespace qpid {
namespace sys {
+class Timer;
+
class AsynchIOProtocolFactory : public ProtocolFactory {
- const bool tcpNoDelay;
boost::ptr_vector<Socket> listeners;
boost::ptr_vector<AsynchAcceptor> acceptors;
+ Timer& brokerTimer;
+ uint32_t maxNegotiateTime;
uint16_t listeningPort;
+ const bool tcpNoDelay;
public:
- AsynchIOProtocolFactory(const std::string& host, const std::string& port,
int backlog, bool nodelay, bool shouldListen);
+ AsynchIOProtocolFactory(const std::string& host, const std::string& port,
+ int backlog, bool nodelay,
+ Timer& timer, uint32_t maxTime,
+ bool shouldListen);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
void connect(Poller::shared_ptr, const std::string& host, const
std::string& port,
ConnectionCodec::Factory*,
@@ -90,6 +97,7 @@ static class TCPIOPlugin : public Plugin
"", boost::lexical_cast<std::string>(opts.port),
opts.connectionBacklog,
opts.tcpNoDelay,
+ broker->getTimer(), opts.maxNegotiateTime,
shouldListen));
if (shouldListen) {
@@ -101,7 +109,12 @@ static class TCPIOPlugin : public Plugin
}
} tcpPlugin;
-AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host,
const std::string& port, int backlog, bool nodelay, bool shouldListen) :
+AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host,
const std::string& port,
+ int backlog, bool nodelay,
+ Timer& timer, uint32_t
maxTime,
+ bool shouldListen) :
+ brokerTimer(timer),
+ maxNegotiateTime(maxTime),
tcpNoDelay(nodelay)
{
if (!shouldListen) {
@@ -153,7 +166,7 @@ void AsynchIOProtocolFactory::establishe
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, 4);
+ async->init(aio, brokerTimer, maxNegotiateTime, 4);
aio->start(poller);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]