Author: aconway
Date: Fri Dec 19 09:22:52 2008
New Revision: 728072
URL: http://svn.apache.org/viewvc?rev=728072&view=rev
Log:
cluster: Increase initial estimate controlling writes.
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.h Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=728072&r1=728071&r2=728072&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Dec 19 09:22:52 2008 @@ -83,7 +83,7 @@ bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } }; -Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_) : +Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_, size_t writeEstimate_) : broker(b), poller(b.getPoller()), cpg(*this), @@ -91,6 +91,7 @@ myUrl(url_), myId(cpg.self()), readMax(readMax_), + writeEstimate(writeEstimate_), cpgDispatchHandle( cpg, boost::bind(&Cluster::dispatch, this, _1), // read Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=728072&r1=728071&r2=728072&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Dec 19 09:22:52 2008 @@ -69,7 +69,8 @@ /** * Join a cluster. 
*/ - Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum, size_t readMax); + Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum, + size_t readMax, size_t writeEstimate); virtual ~Cluster(); @@ -95,6 +96,7 @@ void checkQuorum(); // called in connection threads. size_t getReadMax() { return readMax; } + size_t getWriteEstimate() { return writeEstimate; } private: typedef sys::LockPtr<Cluster,sys::Monitor> LockPtr; @@ -181,6 +183,7 @@ const Url myUrl; const MemberId myId; const size_t readMax; + const size_t writeEstimate; framing::Uuid clusterId; NoOpConnectionOutputHandler shadowOut; sys::DispatchHandle cpgDispatchHandle; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=728072&r1=728071&r2=728072&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Fri Dec 19 09:22:52 2008 @@ -41,10 +41,10 @@ string name; string url; bool quorum; - size_t readMax; + size_t readMax, writeEstimate; // FIXME aconway 2008-12-09: revisit default. 
- ClusterValues() : quorum(false), readMax(0) {} + ClusterValues() : quorum(false), readMax(0), writeEstimate(64) {} Url getUrl(uint16_t port) const { if (url.empty()) return Url::getIpAddressesUrl(port); @@ -68,7 +68,10 @@ #if HAVE_LIBCMAN ("cluster-cman", optValue(values.quorum), "Integrate with Cluster Manager (CMAN) cluster.") #endif - ("cluster-read-max", optValue(values.readMax,"N"), "Throttle read rate from client connections.") + ("cluster-read-max", optValue(values.readMax,"N"), + "Throttle read rate from client connections.") + ("cluster-write-estimate", optValue(values.writeEstimate, "Kb"), + "Estimate connection write rate per multicast cycle") ; } }; @@ -88,7 +91,7 @@ if (values.name.empty()) return; // Only if --cluster-name option was specified. Broker* broker = dynamic_cast<Broker*>(&target); if (!broker) return; - cluster = new Cluster(values.name, values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), *broker, values.quorum, values.readMax); + cluster = new Cluster(values.name, values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), *broker, values.quorum, values.readMax, values.writeEstimate*1024); broker->setConnectionFactory( boost::shared_ptr<sys::ConnectionCodec::Factory>( new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=728072&r1=728071&r2=728072&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Fri Dec 19 09:22:52 2008 @@ -32,8 +32,10 @@ using namespace framing; -OutputInterceptor::OutputInterceptor(cluster::Connection& p, sys::ConnectionOutputHandler& h) - : parent(p), next(&h), sent(), moreOutput(), doingOutput() +OutputInterceptor::OutputInterceptor( + cluster::Connection& p, 
sys::ConnectionOutputHandler& h) + : parent(p), next(&h), sent(), writeEstimate(p.getCluster().getWriteEstimate()), + moreOutput(), doingOutput() {} void OutputInterceptor::send(framing::AMQFrame& f) { @@ -69,7 +71,7 @@ void OutputInterceptor::deliverDoOutput(size_t requested) { size_t buf = next->getBuffered(); if (parent.isLocal()) - writeEstimate.delivered(sent, buf); // Update the estimate. + writeEstimate.delivered(requested, sent, buf); // Update the estimate. // Run the real doOutput() till we have added the requested data or there's nothing to output. sent = 0; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h?rev=728072&r1=728071&r2=728072&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h Fri Dec 19 09:22:52 2008 @@ -64,6 +64,7 @@ mutable sys::Mutex lock; sys::ConnectionOutputHandler* next; size_t sent; + size_t lastDoOutput; WriteEstimate writeEstimate; bool moreOutput; bool doingOutput; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp?rev=728072&r1=728071&r2=728072&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp Fri Dec 19 09:22:52 2008 @@ -27,7 +27,7 @@ namespace cluster { WriteEstimate::WriteEstimate(size_t initial) - : growing(true), estimate(initial) {} + : growing(true), estimate(initial), lastEstimate(initial) {} size_t WriteEstimate::sending(size_t buffered) { // We want to send a doOutput request for enough data such @@ -42,7 +42,8 @@ size_t pad(size_t value) { return value + value/2; } -void 
WriteEstimate::delivered(size_t sent, size_t buffered) { +void WriteEstimate::delivered(size_t last, size_t sent, size_t buffered) { + lastEstimate = last; size_t wrote = sent > buffered ? sent - buffered : 0; if (wrote == 0) // No change return; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.h?rev=728072&r1=728071&r2=728072&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.h Fri Dec 19 09:22:52 2008 @@ -51,12 +51,17 @@ * doOutput request just delivered, not yet executed. Update the estimate. * and estimate how much data to request in the next onOutput * request. 0 means don't send an onOutput request. + * + * @param delivered value in doOutput control. */ - void delivered(size_t sent, size_t buffered); + void delivered(size_t delivered, size_t sent, size_t buffered); + + /** Last estimate delivered, i.e. known to cluster */ + size_t getLastEstimate() const { return estimate; } private: bool growing; - size_t estimate; + size_t estimate, lastEstimate; }; }} // namespace qpid::cluster