Author: aconway Date: Tue Jun 16 21:21:09 2009 New Revision: 785408 URL: http://svn.apache.org/viewvc?rev=785408&view=rev Log: Performance improvements in AggregateOutput and SemanticState.
Replaced AggregateOutput hierarchy with a flat list per connection holding only the OutputTasks that are potentially active. Tasks are dropped from the list as soon as they return false, and added back when they may have output. Inlined frequently-used SequenceNumber functions. Replace std::list in QueueListeners with std::vector. Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h qpid/trunk/qpid/cpp/xml/cluster.xml Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=785408&r1=785407&r2=785408&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Tue Jun 16 21:21:09 2009 @@ -134,6 +134,9 @@ /** Called by cluster to mark shadow connections */ void setShadow() { shadow = true; } + // Used by cluster 
to update connection status + sys::AggregateOutput& getOutputTasks() { return outputTasks; } + private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=785408&r1=785407&r2=785408&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Jun 16 21:21:09 2009 @@ -1025,3 +1025,5 @@ { return !policy.get() || policy->isEnqueued(msg); } + +QueueListeners& Queue::getListeners() { return listeners; } Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=785408&r1=785407&r2=785408&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Jun 16 21:21:09 2009 @@ -325,6 +325,9 @@ * Notify queue that recovery has completed. */ void recoveryComplete(); + + // For cluster update + QueueListeners& getListeners(); }; } } Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp?rev=785408&r1=785407&r2=785408&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp Tue Jun 16 21:21:09 2009 @@ -46,9 +46,11 @@ { if (consumers.size()) { set.consumer = consumers.front(); - consumers.pop_front(); + consumers.erase(consumers.begin()); } else { - browsers.swap(set.browsers); + // Don't swap the vectors, hang on to the memory allocated. 
+ set.browsers = browsers; + browsers.clear(); } } @@ -70,4 +72,10 @@ else for_each(browsers.begin(), browsers.end(), boost::mem_fn(&Consumer::notify)); } +bool QueueListeners::contains(Consumer::shared_ptr c) const { + return + find(browsers.begin(), browsers.end(), c) != browsers.end() || + find(consumers.begin(), consumers.end(), c) != consumers.end(); +} + }} // namespace qpid::broker Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h?rev=785408&r1=785407&r2=785408&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h Tue Jun 16 21:21:09 2009 @@ -22,7 +22,7 @@ * */ #include "Consumer.h" -#include <list> +#include <vector> namespace qpid { namespace broker { @@ -40,7 +40,7 @@ class QueueListeners { public: - typedef std::list<Consumer::shared_ptr> Listeners; + typedef std::vector<Consumer::shared_ptr> Listeners; class NotificationSet { @@ -55,6 +55,8 @@ void addListener(Consumer::shared_ptr); void removeListener(Consumer::shared_ptr); void populate(NotificationSet&); + bool contains(Consumer::shared_ptr c) const; + private: Listeners consumers; Listeners browsers; Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=785408&r1=785407&r2=785408&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Jun 16 21:21:09 2009 @@ -61,7 +61,6 @@ deliveryAdapter(da), tagGenerator("sgen"), dtxSelected(false), - outputTasks(ss), authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()), 
userID(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))) { @@ -90,7 +89,6 @@ { ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments)); queue->consume(c, exclusive);//may throw exception - outputTasks.addOutputTask(c.get()); consumers[tag] = c; } @@ -98,7 +96,7 @@ ConsumerImplMap::iterator i = consumers.find(tag); if (i != consumers.end()) { cancel(i->second); - consumers.erase(i); + consumers.erase(i); //should cancel all unacked messages for this consumer so that //they are not redelivered on recovery for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag)); @@ -257,9 +255,9 @@ msgCredit(0), byteCredit(0), notifyEnabled(true), - queueHasMessages(1), syncFrequency(_arguments.getAsInt("qpid.sync_frequency")), - deliveryCount(0) {} + deliveryCount(0) +{} OwnershipToken* SemanticState::ConsumerImpl::getSession() { @@ -290,6 +288,11 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) { + // FIXME aconway 2009-06-08: if we have byte & message credit but + // checkCredit fails because the message is to big, we should + // remain on queue's listener list for possible smaller messages + // in future. 
+ // blocked = !(filter(msg) && checkCredit(msg)); return !blocked; } @@ -328,7 +331,8 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c) { c->disableNotify(); - outputTasks.removeOutputTask(c.get()); + if (session.isAttached()) + session.getConnection().outputTasks.removeOutputTask(c.get()); Queue::shared_ptr queue = c->getQueue(); if(queue) { queue->cancel(c); @@ -397,16 +401,18 @@ } void SemanticState::requestDispatch() -{ - for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { - requestDispatch(*(i->second)); - } +{ + for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) + i->second->requestDispatch(); } -void SemanticState::requestDispatch(ConsumerImpl& c) -{ - if(c.isBlocked()) - outputTasks.activateOutput(); +void SemanticState::ConsumerImpl::requestDispatch() +{ + if (blocked) { + parent->session.getConnection().outputTasks.addOutputTask(this); + parent->session.getConnection().outputTasks.activateOutput(); + blocked = false; + } } bool SemanticState::complete(DeliveryRecord& delivery) @@ -475,7 +481,7 @@ { ConsumerImpl& c = find(destination); c.addByteCredit(value); - requestDispatch(c); + c.requestDispatch(); } @@ -483,7 +489,7 @@ { ConsumerImpl& c = find(destination); c.addMessageCredit(value); - requestDispatch(c); + c.requestDispatch(); } void SemanticState::flush(const std::string& destination) @@ -593,11 +599,7 @@ bool SemanticState::ConsumerImpl::doOutput() { - if (!haveCredit() || !queueHasMessages.boolCompareAndSwap(1, 0)) - return false; - if (queue->dispatch(shared_from_this())) - queueHasMessages.boolCompareAndSwap(0, 1); - return queueHasMessages.get(); + return haveCredit() && queue->dispatch(shared_from_this()); } void SemanticState::ConsumerImpl::enableNotify() @@ -619,14 +621,11 @@ void SemanticState::ConsumerImpl::notify() { - queueHasMessages.boolCompareAndSwap(0, 1); - - //TODO: alter this, don't want to hold locks across external - //calls; for now its is required to 
protect the notify() from - //having part of the object chain of the invocation being - //concurrently deleted Mutex::ScopedLock l(lock); - if (notifyEnabled) parent->outputTasks.activateOutput(); + if (notifyEnabled) { + parent->session.getConnection().outputTasks.addOutputTask(this); + parent->session.getConnection().outputTasks.activateOutput(); + } } @@ -670,13 +669,16 @@ { for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { i->second->enableNotify(); + session.getConnection().outputTasks.addOutputTask(i->second.get()); } + session.getConnection().outputTasks.activateOutput(); } void SemanticState::detached() { for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { i->second->disableNotify(); + session.getConnection().outputTasks.removeOutputTask(i->second.get()); } } Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=785408&r1=785407&r2=785408&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Tue Jun 16 21:21:09 2009 @@ -55,9 +55,7 @@ * SemanticState holds the L3 and L4 state of an open session, whether * attached to a channel or suspended. */ -class SemanticState : public sys::OutputTask, - private boost::noncopyable -{ +class SemanticState : private boost::noncopyable { public: class ConsumerImpl : public Consumer, public sys::OutputTask, public boost::enable_shared_from_this<ConsumerImpl> @@ -77,9 +75,6 @@ uint32_t msgCredit; uint32_t byteCredit; bool notifyEnabled; - // queueHasMessages is boolean but valgrind has trouble with - // AtomicValue<bool> so use an int with 1 or 0. 
- sys:: AtomicValue<int> queueHasMessages; const int syncFrequency; int deliveryCount; @@ -105,6 +100,8 @@ void notify(); bool isNotifyEnabled() const; + void requestDispatch(); + void setWindowMode(); void setCreditMode(); void addByteCredit(uint32_t value); @@ -130,6 +127,8 @@ std::string getResumeId() const { return resumeId; }; uint64_t getResumeTtl() const { return resumeTtl; } const framing::FieldTable& getArguments() const { return arguments; } + + SemanticState& getParent() { return *parent; } }; private: @@ -147,7 +146,6 @@ DtxBufferMap suspendedXids; framing::SequenceSet accumulatedAck; boost::shared_ptr<Exchange> cacheExchange; - sys::AggregateOutput outputTasks; AclModule* acl; const bool authMsg; const string userID; @@ -158,7 +156,6 @@ bool complete(DeliveryRecord&); AckRange findRange(DeliveryId first, DeliveryId last); void requestDispatch(); - void requestDispatch(ConsumerImpl&); void cancel(ConsumerImpl::shared_ptr); public: @@ -208,8 +205,6 @@ void release(DeliveryId first, DeliveryId last, bool setRedelivered); void reject(DeliveryId first, DeliveryId last); void handle(boost::intrusive_ptr<Message> msg); - bool hasOutput() { return outputTasks.hasOutput(); } - bool doOutput() { return outputTasks.doOutput(); } //final 0-10 spec (completed and accepted are distinct): void completed(DeliveryId deliveryTag, DeliveryId endTag); @@ -218,10 +213,11 @@ void attached(); void detached(); - // Used by cluster to re-create replica sessions - static ConsumerImpl* castToConsumerImpl(OutputTask* p) { return boost::polymorphic_downcast<ConsumerImpl*>(p); } - - template <class F> void eachConsumer(F f) { outputTasks.eachOutput(boost::bind(f, boost::bind(castToConsumerImpl, _1))); } + // Used by cluster to re-create sessions + template <class F> void eachConsumer(F f) { + for(ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); ++i) + f(i->second); + } DeliveryRecords& getUnacked() { return unacked; } framing::SequenceSet getAccumulatedAck() 
const { return accumulatedAck; } TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; } Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h?rev=785408&r1=785407&r2=785408&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h Tue Jun 16 21:21:09 2009 @@ -40,9 +40,11 @@ public: virtual ~SessionContext(){} virtual bool isLocal(const ConnectionToken* t) const = 0; + virtual bool isAttached() const = 0; virtual ConnectionState& getConnection() = 0; virtual framing::AMQP_ClientProxy& getProxy() = 0; virtual Broker& getBroker() = 0; + virtual uint16_t getChannel() const = 0; }; }} // namespace qpid::broker Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=785408&r1=785407&r2=785408&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Tue Jun 16 21:21:09 2009 @@ -99,6 +99,11 @@ return handler->getProxy(); } +uint16_t SessionState::getChannel() const { + assert(isAttached()); + return handler->getChannel(); +} + ConnectionState& SessionState::getConnection() { assert(isAttached()); return handler->getConnection(); @@ -119,8 +124,7 @@ void SessionState::disableOutput() { - semanticState.detached();//prevents further activateOutput calls until reattached - getConnection().outputTasks.removeOutputTask(&semanticState); + semanticState.detached(); //prevents further activateOutput calls until reattached } void SessionState::attach(SessionHandler& h) { @@ -362,10 +366,6 @@ QPID_LOG(debug, getId() << ": ready to send, activating output."); assert(handler); 
semanticState.attached(); - sys::AggregateOutput& tasks = handler->getConnection().outputTasks; - tasks.addOutputTask(&semanticState); - tasks.activateOutput(); - if (rateFlowcontrol) { qpid::sys::ScopedLock<Mutex> l(rateLock); // Issue initial credit - use a heuristic here issue min of 300 messages or 1 secs worth Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=785408&r1=785407&r2=785408&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Tue Jun 16 21:21:09 2009 @@ -81,6 +81,9 @@ framing::AMQP_ClientProxy& getProxy(); /** @pre isAttached() */ + uint16_t getChannel() const; + + /** @pre isAttached() */ ConnectionState& getConnection(); bool isLocal(const ConnectionToken* t) const; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=785408&r1=785407&r2=785408&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Jun 16 21:21:09 2009 @@ -755,13 +755,16 @@ expiryPolicy->deliverExpire(id); } -void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock&) { +void Cluster::errorCheck(const MemberId& m, uint8_t type, uint64_t frameSeq, Lock&) { // If we receive an errorCheck here, it's because we have processed past the point // of the error so respond with ERROR_TYPE_NONE assert(map.getFrameSeq() >= frameSeq); - if (type != framing::cluster::ERROR_TYPE_NONE) // Don't respond if its already NONE. + if (type != framing::cluster::ERROR_TYPE_NONE) { // Don't respond to NONE. 
+ QPID_LOG(debug, "Error " << frameSeq << " on " << m << " did not occur locally"); mcast.mcastControl( - ClusterErrorCheckBody(ProtocolVersion(), framing::cluster::ERROR_TYPE_NONE, frameSeq), self); + ClusterErrorCheckBody(ProtocolVersion(), + framing::cluster::ERROR_TYPE_NONE, frameSeq), self); + } } }} // namespace qpid::cluster Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=785408&r1=785407&r2=785408&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Jun 16 21:21:09 2009 @@ -113,7 +113,7 @@ Decoder& getDecoder() { return decoder; } ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; } - + private: typedef sys::Monitor::ScopedLock Lock; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=785408&r1=785407&r2=785408&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Jun 16 21:21:09 2009 @@ -245,10 +245,13 @@ return sessionState().getSemanticState(); } -void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled) { +void Connection::consumerState( + const string& name, bool blocked, bool notifyEnabled, bool isInListener) +{ broker::SemanticState::ConsumerImpl& c = semanticState().find(name); c.setBlocked(blocked); if (notifyEnabled) c.enableNotify(); else c.disableNotify(); + if (isInListener) c.getQueue()->getListeners().addListener(c.shared_from_this()); } void Connection::sessionState( @@ -270,6 +273,17 @@ unknownCompleted, receivedIncomplete); QPID_LOG(debug, cluster << " received session state update for " << 
sessionState().getId()); + // The output tasks will be added later in the update process. + connection.getOutputTasks().removeAll(); +} + +void Connection::outputTask(uint16_t channel, const std::string& name) { + broker::SessionState* session = connection.getChannel(channel).getSession(); + if (!session) + throw Exception(QPID_MSG(cluster << " channel not attached " << *this + << "[" << channel << "] ")); + OutputTask* task = &session->getSemanticState().find(name); + connection.getOutputTasks().addOutputTask(task); } void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment, uint32_t sendMax) { Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=785408&r1=785407&r2=785408&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue Jun 16 21:21:09 2009 @@ -103,7 +103,7 @@ // Called for data delivered from the cluster. void deliveredFrame(const EventFrame&); - void consumerState(const std::string& name, bool blocked, bool notifyEnabled); + void consumerState(const std::string& name, bool blocked, bool notifyEnabled, bool isInListener); // ==== Used in catch-up mode to build initial state. 
// @@ -115,6 +115,8 @@ const framing::SequenceNumber& received, const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete); + void outputTask(uint16_t channel, const std::string& name); + void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment, uint32_t sendMax); void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq); Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=785408&r1=785407&r2=785408&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Tue Jun 16 21:21:09 2009 @@ -48,8 +48,6 @@ LATENCY_TRACK(doOutputTracker.finish(f.getBody())); parent.getCluster().checkQuorum(); { - // FIXME aconway 2009-04-28: locking around next-> may be redundant - // with the fixes to read-credit in the IO layer. Review. 
sys::Mutex::ScopedLock l(lock); next->send(f); } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=785408&r1=785407&r2=785408&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Tue Jun 16 21:21:09 2009 @@ -54,6 +54,7 @@ #include "qpid/log/Statement.h" #include "qpid/Url.h" #include <boost/bind.hpp> +#include <boost/cast.hpp> #include <algorithm> namespace qpid { @@ -64,6 +65,8 @@ using broker::Queue; using broker::QueueBinding; using broker::Message; +using broker::SemanticState; + using namespace framing; namespace arg=client::arg; using client::SessionBase_0_10Access; @@ -125,7 +128,8 @@ Broker& b = updaterBroker; b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1)); b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, this, _1)); - // Update queue is used to transfer acquired messages that are no longer on their original queue. + // Update queue is used to transfer acquired messages that are no + // longer on their original queue. 
session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); session.sync(); @@ -256,6 +260,16 @@ s.exchangeBind(queue, binding.exchange, binding.key, binding.args); } +void UpdateClient::updateOutputTask(const sys::OutputTask* task) { + const SemanticState::ConsumerImpl* cci = + boost::polymorphic_downcast<const SemanticState::ConsumerImpl*> (task); + SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci); + uint16_t channel = ci->getParent().getSession().getChannel(); + ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName()); + QPID_LOG(debug, updaterId << " updating output task " << ci->getName() + << " channel=" << channel); +} + void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) { QPID_LOG(debug, updaterId << " updating connection " << *updateConnection); shadowConnection = catchUpConnection(); @@ -266,6 +280,8 @@ bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1)); // Safe to use decoder here because we are stalled for update. std::pair<const char*, size_t> fragment = decoder.get(updateConnection->getId()).getFragment(); + bc.getOutputTasks().eachOutput( + boost::bind(&UpdateClient::updateOutputTask, this, _1)); ClusterConnectionProxy(shadowConnection).shadowReady( updateConnection->getId().getMember(), updateConnection->getId().getNumber(), @@ -294,9 +310,9 @@ QPID_LOG(debug, updaterId << " updating exclusive queues."); ss->getSessionAdapter().eachExclusiveQueue(boost::bind(&UpdateClient::updateExclusiveQueue, this, _1)); - // Update consumers. For reasons unknown, boost::bind does not work here with boost 1.33. 
QPID_LOG(debug, updaterId << " updating consumers."); - ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&UpdateClient::updateConsumer),this)); + ss->getSemanticState().eachConsumer( + boost::bind(&UpdateClient::updateConsumer, this, _1)); QPID_LOG(debug, updaterId << " updating unacknowledged messages."); broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked(); @@ -304,7 +320,7 @@ updateTxState(ss->getSemanticState()); // Tx transaction state. - // Adjust for command counter for message in progress, will be sent after state update. + // Adjust command counter for message in progress, will be sent after state update. boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress(); SequenceNumber received = ss->receiverGetReceived().command; if (inProgress) @@ -328,8 +344,11 @@ QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId()); } -void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* ci) { - QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " << shadowSession.getId()); +void UpdateClient::updateConsumer( + const broker::SemanticState::ConsumerImpl::shared_ptr& ci) +{ + QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " on " + << shadowSession.getId()); using namespace message; shadowSession.messageSubscribe( arg::queue = ci->getQueue()->getName(), @@ -344,13 +363,12 @@ shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? 
FLOW_MODE_WINDOW : FLOW_MODE_CREDIT); shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit()); - ClusterConnectionConsumerStateBody state( - ProtocolVersion(), + ClusterConnectionProxy(shadowSession).consumerState( ci->getName(), ci->isBlocked(), - ci->isNotifyEnabled() + ci->isNotifyEnabled(), + ci->getQueue()->getListeners().contains(ci) ); - client::SessionBase_0_10Access(shadowSession).get()->send(state); QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() << " on " << shadowSession.getId()); } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=785408&r1=785407&r2=785408&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h Tue Jun 16 21:21:09 2009 @@ -91,7 +91,8 @@ void updateConnection(const boost::intrusive_ptr<Connection>& connection); void updateSession(broker::SessionHandler& s); void updateTxState(broker::SemanticState& s); - void updateConsumer(const broker::SemanticState::ConsumerImpl*); + void updateOutputTask(const sys::OutputTask* task); + void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&); MemberId updaterId; MemberId updateeId; Modified: qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp?rev=785408&r1=785407&r2=785408&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp Tue Jun 16 21:21:09 2009 @@ -26,60 +26,6 @@ using qpid::framing::SequenceNumber; using qpid::framing::Buffer; 
-SequenceNumber::SequenceNumber() : value(0) {} - -SequenceNumber::SequenceNumber(uint32_t v) : value((int32_t) v) {} - -bool SequenceNumber::operator==(const SequenceNumber& other) const -{ - return value == other.value; -} - -bool SequenceNumber::operator!=(const SequenceNumber& other) const -{ - return !(value == other.value); -} - - -SequenceNumber& SequenceNumber::operator++() -{ - value = value + 1; - return *this; -} - -const SequenceNumber SequenceNumber::operator++(int) -{ - SequenceNumber old(value); - value = value + 1; - return old; -} - -SequenceNumber& SequenceNumber::operator--() -{ - value = value - 1; - return *this; -} - -bool SequenceNumber::operator<(const SequenceNumber& other) const -{ - return (value - other.value) < 0; -} - -bool SequenceNumber::operator>(const SequenceNumber& other) const -{ - return other < *this; -} - -bool SequenceNumber::operator<=(const SequenceNumber& other) const -{ - return *this == other || *this < other; -} - -bool SequenceNumber::operator>=(const SequenceNumber& other) const -{ - return *this == other || *this > other; -} - void SequenceNumber::encode(Buffer& buffer) const { buffer.putLong(value); @@ -97,12 +43,6 @@ namespace qpid { namespace framing { -int32_t operator-(const SequenceNumber& a, const SequenceNumber& b) -{ - int32_t result = a.value - b.value; - return result; -} - std::ostream& operator<<(std::ostream& o, const SequenceNumber& n) { return o << n.getValue(); } Modified: qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h?rev=785408&r1=785407&r2=785408&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h Tue Jun 16 21:21:09 2009 @@ -22,6 +22,7 @@ #define _framing_SequenceNumber_h #include "amqp_types.h" +#include <boost/operators.hpp> #include 
<iosfwd> #include "qpid/CommonImportExport.h" @@ -33,35 +34,37 @@ /** * 4-byte sequence number that 'wraps around'. */ -class SequenceNumber +class SequenceNumber : public +boost::equality_comparable< + SequenceNumber, boost::less_than_comparable< + SequenceNumber, boost::incrementable< + SequenceNumber, boost::decrementable<SequenceNumber> > > > { int32_t value; - public: - QPID_COMMON_EXTERN SequenceNumber(); - QPID_COMMON_EXTERN SequenceNumber(uint32_t v); - - QPID_COMMON_EXTERN SequenceNumber& operator++();//prefix ++ - QPID_COMMON_EXTERN const SequenceNumber operator++(int);//postfix ++ - QPID_COMMON_EXTERN SequenceNumber& operator--();//prefix ++ - QPID_COMMON_EXTERN bool operator==(const SequenceNumber& other) const; - QPID_COMMON_EXTERN bool operator!=(const SequenceNumber& other) const; - QPID_COMMON_EXTERN bool operator<(const SequenceNumber& other) const; - QPID_COMMON_EXTERN bool operator>(const SequenceNumber& other) const; - QPID_COMMON_EXTERN bool operator<=(const SequenceNumber& other) const; - QPID_COMMON_EXTERN bool operator>=(const SequenceNumber& other) const; - uint32_t getValue() const { return (uint32_t) value; } - operator uint32_t() const { return (uint32_t) value; } - - QPID_COMMON_EXTERN friend int32_t operator-(const SequenceNumber& a, const SequenceNumber& b); + public: + SequenceNumber(uint32_t v=0) : value(v) {} + + SequenceNumber& operator++() { ++value; return *this; } + SequenceNumber& operator--() { --value; return *this; } + bool operator==(const SequenceNumber& other) const { return value == other.value; } + bool operator<(const SequenceNumber& other) const { return (value - other.value) < 0; } + uint32_t getValue() const { return uint32_t(value); } + operator uint32_t() const { return uint32_t(value); } void encode(Buffer& buffer) const; void decode(Buffer& buffer); uint32_t encodedSize() const; template <class S> void serialize(S& s) { s(value); } + + friend inline int32_t operator-(const SequenceNumber& a, const 
SequenceNumber& b); }; +inline int32_t operator-(const SequenceNumber& a, const SequenceNumber& b) { + return int32_t(a.value - b.value); +} + struct Window { SequenceNumber hwm; Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp?rev=785408&r1=785407&r2=785408&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp Tue Jun 16 21:21:09 2009 @@ -26,50 +26,66 @@ namespace qpid { namespace sys { +AggregateOutput::AggregateOutput(OutputControl& c) : busy(false), control(c) {} + void AggregateOutput::abort() { control.abort(); } void AggregateOutput::activateOutput() { control.activateOutput(); } void AggregateOutput::giveReadCredit(int32_t credit) { control.giveReadCredit(credit); } -bool AggregateOutput::hasOutput() { - for (TaskList::const_iterator i = tasks.begin(); i != tasks.end(); ++i) - if ((*i)->hasOutput()) return true; - return false; +bool AggregateOutput::AggregateOutput::hasOutput() { + Mutex::ScopedLock l(lock); + return !tasks.empty(); } -bool AggregateOutput::doOutput() -{ - bool result = false; - if (!tasks.empty()) { - if (next >= tasks.size()) next = next % tasks.size(); - - size_t start = next; - //loop until a task generated some output - while (!result) { - result = tasks[next++]->doOutput(); - if (tasks.empty()) break; - if (next >= tasks.size()) next = next % tasks.size(); - if (start == next) break; +// Clear the busy flag and notify waiting threads in destructor. 
+struct ScopedBusy { + bool& flag; + Monitor& monitor; + ScopedBusy(bool& f, Monitor& m) : flag(f), monitor(m) { f = true; } + ~ScopedBusy() { flag = false; monitor.notifyAll(); } +}; + +bool AggregateOutput::doOutput() { + Mutex::ScopedLock l(lock); + ScopedBusy sb(busy, lock); + + while (!tasks.empty()) { + OutputTask* t=tasks.front(); + tasks.pop_front(); + bool didOutput; + { + // Allow concurrent call to addOutputTask. + // removeOutputTask will wait till !busy before removing a task. + Mutex::ScopedUnlock u(lock); + didOutput = t->doOutput(); + } + if (didOutput) { + tasks.push_back(t); + return true; } } - return result; + return false; } - -void AggregateOutput::addOutputTask(OutputTask* t) -{ - tasks.push_back(t); + +void AggregateOutput::addOutputTask(OutputTask* task) { + Mutex::ScopedLock l(lock); + tasks.push_back(task); } -void AggregateOutput::removeOutputTask(OutputTask* t) -{ - TaskList::iterator i = std::find(tasks.begin(), tasks.end(), t); - if (i != tasks.end()) tasks.erase(i); +void AggregateOutput::removeOutputTask(OutputTask* task) { + Mutex::ScopedLock l(lock); + while (busy) lock.wait(); + tasks.erase(std::remove(tasks.begin(), tasks.end(), task), tasks.end()); } - + void AggregateOutput::removeAll() { + Mutex::ScopedLock l(lock); + while (busy) lock.wait(); tasks.clear(); } + }} // namespace qpid::sys Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h?rev=785408&r1=785407&r2=785408&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h Tue Jun 16 21:21:09 2009 @@ -21,47 +21,58 @@ #ifndef _AggregateOutput_ #define _AggregateOutput_ -#include "Mutex.h" +#include "Monitor.h" #include "OutputControl.h" #include "OutputTask.h" #include "qpid/CommonImportExport.h" #include <algorithm> -#include 
<vector> +#include <deque> namespace qpid { namespace sys { - class AggregateOutput : public OutputTask, public OutputControl - { - typedef std::vector<OutputTask*> TaskList; - - TaskList tasks; - size_t next; - OutputControl& control; - - public: - AggregateOutput(OutputControl& c) : next(0), control(c) {}; - //this may be called on any thread - QPID_COMMON_EXTERN void abort(); - QPID_COMMON_EXTERN void activateOutput(); - QPID_COMMON_EXTERN void giveReadCredit(int32_t); - - //all the following will be called on the same thread - QPID_COMMON_EXTERN bool doOutput(); - QPID_COMMON_EXTERN bool hasOutput(); - QPID_COMMON_EXTERN void addOutputTask(OutputTask* t); - QPID_COMMON_EXTERN void removeOutputTask(OutputTask* t); - QPID_COMMON_EXTERN void removeAll(); - - /** Apply f to each OutputTask* in the tasks list */ - template <class F> void eachOutput(F f) { - std::for_each(tasks.begin(), tasks.end(), f); - } - }; +/** + * Holds a collection of output tasks, doOutput picks the next one to execute. + * + * A task is automatically removed when its doOutput() returns false; re-add it with addOutputTask() when it may have output again. + * + * Thread safe. addOutputTask may be called in one connection thread while + * doOutput is called in another. + */ + +class AggregateOutput : public OutputTask, public OutputControl +{ + typedef std::deque<OutputTask*> TaskList; + + Monitor lock; + TaskList tasks; + bool busy; + OutputControl& control; + + public: + QPID_COMMON_EXTERN AggregateOutput(OutputControl& c); + + // These may be called concurrently with any function. + QPID_COMMON_EXTERN void abort(); + QPID_COMMON_EXTERN void activateOutput(); + QPID_COMMON_EXTERN void giveReadCredit(int32_t); + QPID_COMMON_EXTERN void addOutputTask(OutputTask* t); + + // These functions must not be called concurrently with each other. 
+ QPID_COMMON_EXTERN bool doOutput(); + QPID_COMMON_EXTERN bool hasOutput(); + QPID_COMMON_EXTERN void removeOutputTask(OutputTask* t); + QPID_COMMON_EXTERN void removeAll(); + + /** Apply f to each OutputTask* in the tasks list */ + template <class F> void eachOutput(F f) { + Mutex::ScopedLock l(lock); + std::for_each(tasks.begin(), tasks.end(), f); + } +}; -} -} +}} // namespace qpid::sys #endif Modified: qpid/trunk/qpid/cpp/xml/cluster.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=785408&r1=785407&r2=785408&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/xml/cluster.xml (original) +++ qpid/trunk/qpid/cpp/xml/cluster.xml Tue Jun 16 21:21:09 2009 @@ -65,8 +65,6 @@ </class> - <!-- TODO aconway 2008-09-10: support for un-attached connections. --> - <!-- Controls associated with a specific connection. --> <class name="cluster-connection" code="0x81" label="Qpid clustering extensions."> @@ -91,6 +89,8 @@ <field name="name" type="str8"/> <field name="blocked" type="bit"/> <field name="notifyEnabled" type="bit"/> + <!-- Flag set if the consumer is in its queue's listener set. --> + <field name="is-in-listener" type="bit"/> </control> <!-- Delivery-record for outgoing messages sent but not yet accepted. --> @@ -121,8 +121,14 @@ <control name="tx-end" code="0x17"/> <control name="accumulated-ack" code="0x18"> <field name="commands" type="sequence-set"/> </control> + <!-- Consumers in the connection's output task --> + <control name="output-task" code="0x19"> + <field name="channel" type="uint16"/> + <field name="name" type="str8"/> + </control> + <!-- Complete a session state update. --> - <control name="session-state" code="0x1F" label="Set session state during a brain update."> + <control name="session-state" code="0x1F"> <!-- Target session deduced from channel number. 
--> <field name="replay-start" type="sequence-no"/> <!-- Replay frames will start from this point.--> <field name="command-point" type="sequence-no"/> <!-- Id of next command sent --> --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org