Author: aconway
Date: Thu Oct 6 20:38:09 2011
New Revision: 1179839
URL: http://svn.apache.org/viewvc?rev=1179839&view=rev
Log:
QPID-2920: Configurable connection output prefetch.
Allow connections to collect more output than they can write immediately.
Improves performance in a cluster: while a broker has the consume lock
connections can collect extra output data to keep them busy while
waiting for the consume lock to return.
Modified:
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/amqp_0_10/Connection.h
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Broker.h
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
Modified:
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=1179839&r1=1179838&r2=1179839&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
(original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
Thu Oct 6 20:38:09 2011
@@ -30,9 +30,10 @@ namespace amqp_0_10 {
using sys::Mutex;
-Connection::Connection(sys::OutputControl& o, const std::string& id, bool
_isClient)
+Connection::Connection(
+ sys::OutputControl& o, const std::string& id, bool _isClient, size_t
prefetch_)
: pushClosed(false), popClosed(false), output(o), identifier(id),
initialized(false),
- isClient(_isClient), buffered(0), version(0,10)
+ isClient(_isClient), buffered(0), version(0,10), prefetch(prefetch_)
{}
void Connection::setInputHandler(std::auto_ptr<sys::ConnectionInputHandler> c)
{
@@ -90,6 +91,7 @@ size_t Connection::encode(const char* b
}
size_t frameSize=0;
size_t encoded=0;
+ // Encode as much as possible into the IO buffer
while (!workQueue.empty() && ((frameSize=workQueue.front().encodedSize())
<= out.available())) {
workQueue.front().encode(out);
QPID_LOG(trace, "SENT [" << identifier << "]: " << workQueue.front());
@@ -108,6 +110,12 @@ size_t Connection::encode(const char* b
workQueue.clear();
if (frameQueue.empty() && pushClosed)
popClosed = true;
+ // Prefetch frames to be encoded on the next call.
+ bool more = true;
+ while (buffered < prefetch && more) {
+ Mutex::ScopedUnlock u(frameQueueLock);
+ more = connection->doOutput();
+ }
}
return out.getPosition();
}
Modified:
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=1179839&r1=1179838&r2=1179839&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/amqp_0_10/Connection.h
(original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/amqp_0_10/Connection.h Thu
Oct 6 20:38:09 2011
@@ -56,9 +56,11 @@ class Connection : public sys::Connecti
bool isClient;
size_t buffered;
framing::ProtocolVersion version;
-
+ size_t prefetch;
+
public:
- QPID_BROKER_EXTERN Connection(sys::OutputControl&, const std::string& id,
bool isClient);
+ QPID_BROKER_EXTERN Connection(
+ sys::OutputControl&, const std::string& id, bool isClient, size_t
prefetch);
QPID_BROKER_EXTERN void
setInputHandler(std::auto_ptr<sys::ConnectionInputHandler> c);
size_t decode(const char* buffer, size_t size);
size_t encode(const char* buffer, size_t size);
Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Broker.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1179839&r1=1179838&r2=1179839&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Broker.cpp
(original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Broker.cpp Thu Oct
6 20:38:09 2011
@@ -123,7 +123,8 @@ Broker::Options::Options(const std::stri
qmf1Support(true),
queueFlowStopRatio(80),
queueFlowResumeRatio(70),
- queueThresholdEventRatio(80)
+ queueThresholdEventRatio(80),
+ outputPrefetch(0)
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -159,7 +160,9 @@ Broker::Options::Options(const std::stri
("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set
Queue Events async, used for services like replication")
("default-flow-stop-threshold", optValue(queueFlowStopRatio,
"PERCENT"), "Percent of queue's maximum capacity at which flow control is
activated.")
("default-flow-resume-threshold", optValue(queueFlowResumeRatio,
"PERCENT"), "Percent of queue's maximum capacity at which flow control is
de-activated.")
- ("default-event-threshold-ratio", optValue(queueThresholdEventRatio,
"%age of limit"), "The ratio of any specified queue limit at which an event
will be raised");
+ ("default-event-threshold-ratio", optValue(queueThresholdEventRatio,
"%age of limit"), "The ratio of any specified queue limit at which an event
will be raised")
+ // FIXME aconway 2011-10-06: in or out? Needs a bettter name &
description.
+ ("output-prefetch", optValue(outputPrefetch, "BYTES"), "Experimental:
Pre fetch limit for connection output in bytes");
}
const std::string empty;
Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Broker.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Broker.h?rev=1179839&r1=1179838&r2=1179839&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Broker.h Thu Oct 6
20:38:09 2011
@@ -122,6 +122,7 @@ public:
uint queueFlowStopRatio; // producer flow control: on
uint queueFlowResumeRatio; // producer flow control: off
uint16_t queueThresholdEventRatio;
+ size_t outputPrefetch;
private:
std::string getHome();
Modified:
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp?rev=1179839&r1=1179838&r2=1179839&view=diff
==============================================================================
---
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
(original)
+++
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
Thu Oct 6 20:38:09 2011
@@ -46,7 +46,8 @@ ConnectionFactory::create(ProtocolVersio
return 0;
}
if (v == ProtocolVersion(0, 10)) {
- ConnectionPtr c(new amqp_0_10::Connection(out, id, false));
+ ConnectionPtr c(
+ new amqp_0_10::Connection(out, id, false,
broker.getOptions().outputPrefetch));
c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker,
id, external, false)));
return c.release();
}
@@ -57,10 +58,10 @@ sys::ConnectionCodec*
ConnectionFactory::create(sys::OutputControl& out, const std::string& id,
const SecuritySettings& external) {
// used to create connections from one broker to another
- ConnectionPtr c(new amqp_0_10::Connection(out, id, true));
+ ConnectionPtr c(
+ new amqp_0_10::Connection(out, id, true,
broker.getOptions().outputPrefetch));
c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id,
external, true)));
return c.release();
}
-
}} // namespace qpid::broker
Modified:
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp?rev=1179839&r1=1179838&r2=1179839&view=diff
==============================================================================
---
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
(original)
+++
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
Thu Oct 6 20:38:09 2011
@@ -48,7 +48,8 @@ SecureConnectionFactory::create(Protocol
}
if (v == ProtocolVersion(0, 10)) {
SecureConnectionPtr sc(new SecureConnection());
- CodecPtr c(new amqp_0_10::Connection(out, id, false));
+ CodecPtr c(new amqp_0_10::Connection(
+ out, id, false, broker.getOptions().outputPrefetch));
ConnectionPtr i(new broker::Connection(c.get(), broker, id, external,
false));
i->setSecureConnection(sc.get());
c->setInputHandler(InputPtr(i.release()));
@@ -63,7 +64,8 @@ SecureConnectionFactory::create(sys::Out
const SecuritySettings& external) {
// used to create connections from one broker to another
SecureConnectionPtr sc(new SecureConnection());
- CodecPtr c(new amqp_0_10::Connection(out, id, true));
+ CodecPtr c(
+ new amqp_0_10::Connection(out, id, true,
broker.getOptions().outputPrefetch));
ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, true
));
i->setSecureConnection(sc.get());
c->setInputHandler(InputPtr(i.release()));
Modified:
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=1179839&r1=1179838&r2=1179839&view=diff
==============================================================================
---
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
(original)
+++
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
Thu Oct 6 20:38:09 2011
@@ -50,7 +50,7 @@ ConnectionCodec::Factory::create(Protoco
if (v == ProtocolVersion(0, 10))
return new ConnectionCodec(v, out, id, cluster, false, false,
external);
else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10)) // Catch-up connection
- return new ConnectionCodec(v, out, id, cluster, true, false,
external);
+ return new ConnectionCodec(v, out, id, cluster, true, false, external);
return 0;
}
@@ -63,8 +63,9 @@ ConnectionCodec::Factory::create(sys::Ou
ConnectionCodec::ConnectionCodec(
const ProtocolVersion& v, sys::OutputControl& out,
- const std::string& logId, Cluster& cluster, bool catchUp, bool isLink,
const qpid::sys::SecuritySettings& external
-) : codec(out, logId, isLink),
+ const std::string& logId, Cluster& cluster, bool catchUp,
+ bool isLink, const qpid::sys::SecuritySettings& external
+) : codec(out, logId, isLink, cluster.getBroker().getOptions().outputPrefetch),
interceptor(new Connection(cluster, codec, logId, cluster.getId(),
catchUp, isLink, external))
{
cluster.addLocalConnection(interceptor);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]