Author: aconway
Date: Thu Oct  6 20:38:09 2011
New Revision: 1179839

URL: http://svn.apache.org/viewvc?rev=1179839&view=rev
Log:
QPID-2920: Configurable connection output prefetch.

Allow connections to collect more output than they can write immediately.

Improves performance in a cluster: while a broker has the consume lock,
connections can collect extra output data to keep them busy while
waiting for the consume lock to return.

Modified:
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/amqp_0_10/Connection.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Broker.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=1179839&r1=1179838&r2=1179839&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp 
(original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp 
Thu Oct  6 20:38:09 2011
@@ -30,9 +30,10 @@ namespace amqp_0_10 {
 
 using sys::Mutex;
 
-Connection::Connection(sys::OutputControl& o, const std::string& id, bool 
_isClient)
+Connection::Connection(
+    sys::OutputControl& o, const std::string& id, bool _isClient, size_t 
prefetch_)
     : pushClosed(false), popClosed(false), output(o), identifier(id), 
initialized(false),
-      isClient(_isClient), buffered(0), version(0,10)
+      isClient(_isClient), buffered(0), version(0,10), prefetch(prefetch_)
 {}
 
 void Connection::setInputHandler(std::auto_ptr<sys::ConnectionInputHandler> c) 
{
@@ -90,6 +91,7 @@ size_t  Connection::encode(const char* b
     }
     size_t frameSize=0;
     size_t encoded=0;
+    // Encode as much as possible into the IO buffer
     while (!workQueue.empty() && ((frameSize=workQueue.front().encodedSize()) 
<= out.available())) {
         workQueue.front().encode(out);
         QPID_LOG(trace, "SENT [" << identifier << "]: " << workQueue.front());
@@ -108,6 +110,12 @@ size_t  Connection::encode(const char* b
         workQueue.clear();
         if (frameQueue.empty() && pushClosed)
             popClosed = true;
+        // Prefetch frames to be encoded on the next call.
+        bool more = true;
+        while (buffered < prefetch && more) {
+            Mutex::ScopedUnlock u(frameQueueLock);
+            more = connection->doOutput();
+        }
     }
     return out.getPosition();
 }

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=1179839&r1=1179838&r2=1179839&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/amqp_0_10/Connection.h 
(original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/amqp_0_10/Connection.h Thu 
Oct  6 20:38:09 2011
@@ -56,9 +56,11 @@ class Connection  : public sys::Connecti
     bool isClient;
     size_t buffered;
     framing::ProtocolVersion version;
-
+    size_t prefetch;
+    
   public:
-    QPID_BROKER_EXTERN Connection(sys::OutputControl&, const std::string& id, 
bool isClient);
+    QPID_BROKER_EXTERN Connection(
+        sys::OutputControl&, const std::string& id, bool isClient, size_t 
prefetch);
     QPID_BROKER_EXTERN void 
setInputHandler(std::auto_ptr<sys::ConnectionInputHandler> c);
     size_t decode(const char* buffer, size_t size);
     size_t encode(const char* buffer, size_t size);

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Broker.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1179839&r1=1179838&r2=1179839&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Broker.cpp 
(original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Broker.cpp Thu Oct  
6 20:38:09 2011
@@ -123,7 +123,8 @@ Broker::Options::Options(const std::stri
     qmf1Support(true),
     queueFlowStopRatio(80),
     queueFlowResumeRatio(70),
-    queueThresholdEventRatio(80)
+    queueThresholdEventRatio(80),
+    outputPrefetch(0)
 {
     int c = sys::SystemInfo::concurrency();
     workerThreads=c+1;
@@ -159,7 +160,9 @@ Broker::Options::Options(const std::stri
         ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set 
Queue Events async, used for services like replication")
         ("default-flow-stop-threshold", optValue(queueFlowStopRatio, 
"PERCENT"), "Percent of queue's maximum capacity at which flow control is 
activated.")
         ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, 
"PERCENT"), "Percent of queue's maximum capacity at which flow control is 
de-activated.")
-        ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, 
"%age of limit"), "The ratio of any specified queue limit at which an event 
will be raised");
+        ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, 
"%age of limit"), "The ratio of any specified queue limit at which an event 
will be raised")
+        // FIXME aconway 2011-10-06: in or out? Needs a better name & 
description.
+        ("output-prefetch", optValue(outputPrefetch, "BYTES"), "Experimental: 
Pre fetch limit for connection output in bytes");
 }
 
 const std::string empty;

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Broker.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Broker.h?rev=1179839&r1=1179838&r2=1179839&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Broker.h Thu Oct  6 
20:38:09 2011
@@ -122,6 +122,7 @@ public:
         uint queueFlowStopRatio;    // producer flow control: on
         uint queueFlowResumeRatio;  // producer flow control: off
         uint16_t queueThresholdEventRatio;
+        size_t outputPrefetch;
 
       private:
         std::string getHome();

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp?rev=1179839&r1=1179838&r2=1179839&view=diff
==============================================================================
--- 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp 
(original)
+++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp 
Thu Oct  6 20:38:09 2011
@@ -46,7 +46,8 @@ ConnectionFactory::create(ProtocolVersio
         return 0;
     }
     if (v == ProtocolVersion(0, 10)) {
-        ConnectionPtr c(new amqp_0_10::Connection(out, id, false));
+        ConnectionPtr c(
+            new amqp_0_10::Connection(out, id, false, 
broker.getOptions().outputPrefetch));
         c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, 
id, external, false)));
         return c.release();
     }
@@ -57,10 +58,10 @@ sys::ConnectionCodec*
 ConnectionFactory::create(sys::OutputControl& out, const std::string& id,
                           const SecuritySettings& external) {
     // used to create connections from one broker to another
-    ConnectionPtr c(new amqp_0_10::Connection(out, id, true));
+    ConnectionPtr c(
+        new amqp_0_10::Connection(out, id, true, 
broker.getOptions().outputPrefetch));
     c->setInputHandler(InputPtr(new broker::Connection(c.get(), broker, id, 
external, true)));
     return c.release();
 }
 
-    
 }} // namespace qpid::broker

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp?rev=1179839&r1=1179838&r2=1179839&view=diff
==============================================================================
--- 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
 (original)
+++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
 Thu Oct  6 20:38:09 2011
@@ -48,7 +48,8 @@ SecureConnectionFactory::create(Protocol
     }
     if (v == ProtocolVersion(0, 10)) {
         SecureConnectionPtr sc(new SecureConnection());
-        CodecPtr c(new amqp_0_10::Connection(out, id, false));
+        CodecPtr c(new amqp_0_10::Connection(
+                       out, id, false, broker.getOptions().outputPrefetch));
         ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, 
false));
         i->setSecureConnection(sc.get());
         c->setInputHandler(InputPtr(i.release()));
@@ -63,7 +64,8 @@ SecureConnectionFactory::create(sys::Out
                                 const SecuritySettings& external) {
     // used to create connections from one broker to another
     SecureConnectionPtr sc(new SecureConnection());
-    CodecPtr c(new amqp_0_10::Connection(out, id, true));
+    CodecPtr c(
+        new amqp_0_10::Connection(out, id, true, 
broker.getOptions().outputPrefetch));
     ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, true 
));
     i->setSecureConnection(sc.get());
     c->setInputHandler(InputPtr(i.release()));

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=1179839&r1=1179838&r2=1179839&view=diff
==============================================================================
--- 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp 
(original)
+++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp 
Thu Oct  6 20:38:09 2011
@@ -50,7 +50,7 @@ ConnectionCodec::Factory::create(Protoco
     if (v == ProtocolVersion(0, 10))
         return new ConnectionCodec(v, out, id, cluster, false, false, 
external);
     else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10)) // Catch-up connection
-        return new ConnectionCodec(v, out, id, cluster, true, false, 
external); 
+        return new ConnectionCodec(v, out, id, cluster, true, false, external);
     return 0;
 }
 
@@ -63,8 +63,9 @@ ConnectionCodec::Factory::create(sys::Ou
 
 ConnectionCodec::ConnectionCodec(
     const ProtocolVersion& v, sys::OutputControl& out,
-    const std::string& logId, Cluster& cluster, bool catchUp, bool isLink, 
const qpid::sys::SecuritySettings& external
-) : codec(out, logId, isLink),
+    const std::string& logId, Cluster& cluster, bool catchUp,
+    bool isLink, const qpid::sys::SecuritySettings& external
+) : codec(out, logId, isLink, cluster.getBroker().getOptions().outputPrefetch),
     interceptor(new Connection(cluster, codec, logId, cluster.getId(), 
catchUp, isLink, external))
 {
     cluster.addLocalConnection(interceptor);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to