Author: aconway
Date: Fri Dec 19 09:22:52 2008
New Revision: 728072

URL: http://svn.apache.org/viewvc?rev=728072&view=rev
Log:
cluster: Increase initial estimate controlling writes.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.h

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=728072&r1=728071&r2=728072&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Dec 19 09:22:52 2008
@@ -83,7 +83,7 @@
     bool invoke(AMQBody& body) { return framing::invoke(*this, 
body).wasHandled(); }
 };
 
-Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, 
bool quorum_, size_t readMax_) :
+Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, 
bool quorum_, size_t readMax_, size_t writeEstimate_) :
     broker(b),
     poller(b.getPoller()),
     cpg(*this),
@@ -91,6 +91,7 @@
     myUrl(url_),
     myId(cpg.self()),
     readMax(readMax_),
+    writeEstimate(writeEstimate_),
     cpgDispatchHandle(
         cpg,
         boost::bind(&Cluster::dispatch, this, _1), // read

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=728072&r1=728071&r2=728072&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Dec 19 09:22:52 2008
@@ -69,7 +69,8 @@
     /**
      * Join a cluster. 
      */
-    Cluster(const std::string& name, const Url& url, broker::Broker&, bool 
useQuorum, size_t readMax);
+    Cluster(const std::string& name, const Url& url, broker::Broker&, bool 
useQuorum,
+            size_t readMax, size_t writeEstimate);
 
     virtual ~Cluster();
 
@@ -95,6 +96,7 @@
     void checkQuorum();         // called in connection threads.
 
     size_t getReadMax() { return readMax; }
+    size_t getWriteEstimate() { return writeEstimate; } // Constructor's writeEstimate argument; seeds each OutputInterceptor's WriteEstimate.
     
   private:
     typedef sys::LockPtr<Cluster,sys::Monitor> LockPtr;
@@ -181,6 +183,7 @@
     const Url myUrl;
     const MemberId myId;
     const size_t readMax;
+    const size_t writeEstimate;
     framing::Uuid clusterId;
     NoOpConnectionOutputHandler shadowOut;
     sys::DispatchHandle cpgDispatchHandle;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=728072&r1=728071&r2=728072&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Fri Dec 19 09:22:52 
2008
@@ -41,10 +41,10 @@
     string name;
     string url;
     bool quorum;
-    size_t readMax;
+    size_t readMax, writeEstimate;
 
     // FIXME aconway 2008-12-09: revisit default.
-    ClusterValues() : quorum(false), readMax(0) {}
+    ClusterValues() : quorum(false), readMax(0), writeEstimate(64) {} // writeEstimate is in Kb; it is multiplied by 1024 when passed to Cluster.
   
     Url getUrl(uint16_t port) const {
         if (url.empty()) return Url::getIpAddressesUrl(port);
@@ -68,7 +68,10 @@
 #if HAVE_LIBCMAN
             ("cluster-cman", optValue(values.quorum), "Integrate with Cluster 
Manager (CMAN) cluster.")
 #endif
-            ("cluster-read-max", optValue(values.readMax,"N"), "Throttle read 
rate from client connections.")
+            ("cluster-read-max", optValue(values.readMax,"N"),
+             "Throttle read rate from client connections.")
+            ("cluster-write-estimate", optValue(values.writeEstimate, "Kb"),
+             "Estimate connection write rate per multicast cycle")
             ;
     }
 };
@@ -88,7 +91,7 @@
         if (values.name.empty()) return; // Only if --cluster-name option was 
specified.
         Broker* broker = dynamic_cast<Broker*>(&target);
         if (!broker) return;
-        cluster = new Cluster(values.name, 
values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), *broker, values.quorum, 
values.readMax);
+        cluster = new Cluster(values.name, 
values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), *broker, values.quorum, 
values.readMax, values.writeEstimate*1024);
         broker->setConnectionFactory(
             boost::shared_ptr<sys::ConnectionCodec::Factory>(
                 new ConnectionCodec::Factory(broker->getConnectionFactory(), 
*cluster)));

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=728072&r1=728071&r2=728072&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Fri Dec 19 
09:22:52 2008
@@ -32,8 +32,10 @@
 
 using namespace framing;
 
-OutputInterceptor::OutputInterceptor(cluster::Connection& p, 
sys::ConnectionOutputHandler& h)
-    : parent(p), next(&h), sent(), moreOutput(), doingOutput()
+OutputInterceptor::OutputInterceptor(
+    cluster::Connection& p, sys::ConnectionOutputHandler& h)
+    // Value-initialize lastDoOutput as well, so that no size_t member is left
+    // indeterminate. writeEstimate starts from the cluster's configured
+    // initial estimate (Cluster::getWriteEstimate()).
+    : parent(p), next(&h), sent(), lastDoOutput(),
+      writeEstimate(p.getCluster().getWriteEstimate()),
+      moreOutput(), doingOutput()
 {}
 
 void OutputInterceptor::send(framing::AMQFrame& f) {
@@ -69,7 +71,7 @@
 void OutputInterceptor::deliverDoOutput(size_t requested) {
     size_t buf = next->getBuffered();
     if (parent.isLocal())
-        writeEstimate.delivered(sent, buf); // Update the estimate.
+        writeEstimate.delivered(requested, sent, buf); // Update the estimate.
 
     // Run the real doOutput() till we have added the requested data or 
there's nothing to output.
     sent = 0;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h?rev=728072&r1=728071&r2=728072&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h Fri Dec 19 
09:22:52 2008
@@ -64,6 +64,7 @@
     mutable sys::Mutex lock;
     sys::ConnectionOutputHandler* next;
     size_t sent;
+    size_t lastDoOutput;
     WriteEstimate writeEstimate;
     bool moreOutput;
     bool doingOutput;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp?rev=728072&r1=728071&r2=728072&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp Fri Dec 19 09:22:52 
2008
@@ -27,7 +27,7 @@
 namespace cluster {
 
 WriteEstimate::WriteEstimate(size_t initial)
-    : growing(true), estimate(initial) {}
+    : growing(true), estimate(initial), lastEstimate(initial) {}
 
 size_t WriteEstimate::sending(size_t buffered) {
     // We want to send a doOutput request for enough data such
@@ -42,7 +42,8 @@
 
 size_t pad(size_t value) { return value + value/2; }
 
-void WriteEstimate::delivered(size_t sent, size_t buffered) {
+void WriteEstimate::delivered(size_t last, size_t sent, size_t buffered) {
+    lastEstimate = last;
     size_t wrote =  sent > buffered ? sent - buffered : 0;
     if (wrote == 0)             // No change
         return; 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.h?rev=728072&r1=728071&r2=728072&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.h Fri Dec 19 09:22:52 
2008
@@ -51,12 +51,17 @@
      * doOutput request just delivered, not yet executed. Update the estimate.
      * and estimate how much data to request in the next onOutput
      * request. 0 means don't send an onOutput request.
+     *
+     * @param delivered value in doOutput control.
      */
-    void delivered(size_t sent, size_t buffered);
+    void delivered(size_t delivered, size_t sent, size_t buffered);
+
+    /** Last estimate delivered, i.e. known to cluster: the value recorded by the most recent delivered() call (the initial estimate before any call). */
+    size_t getLastEstimate() const { return lastEstimate; }
 
   private:
     bool growing;
-    size_t estimate;
+    size_t estimate, lastEstimate;
 };
 
 }} // namespace qpid::cluster


Reply via email to