Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Messages.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Messages.h?rev=1371676&r1=1371675&r2=1371676&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Messages.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Messages.h Fri Aug 10 12:04:27 2012 @@ -29,7 +29,8 @@ namespace framing { class SequenceNumber; } namespace broker { -struct QueuedMessage; +class Message; +class QueueCursor; /** * This interface abstracts out the access to the messages held for @@ -39,8 +40,7 @@ struct QueuedMessage; class Messages { public: - typedef boost::function1<void, QueuedMessage&> Functor; - typedef boost::function1<bool, QueuedMessage&> Predicate; + typedef boost::function1<void, Message&> Functor; virtual ~Messages() {} /** @@ -51,47 +51,44 @@ class Messages /** * Called when a message is deleted from the queue. */ - virtual bool deleted(const QueuedMessage&) = 0; + virtual bool deleted(const QueueCursor&) = 0; /** - * Releases an acquired message, making it available again. + * Makes a message available. */ - virtual void release(const QueuedMessage&) = 0; + virtual void publish(const Message& added) = 0; /** - * Acquire the message at the specified position, returning true - * if found, false otherwise. The acquired message is passed back - * via the second parameter. - */ - virtual bool acquire(const framing::SequenceNumber&, QueuedMessage&) = 0; - /** - * Find the message at the specified position, returning true if - * found, false otherwise. The matched message is passed back via - * the second parameter. + * Retrieve the next message for the given cursor. A reference to + * the message is passed back via the second parameter. + * + * @return a pointer to the message if there is one, in which case + * the cursor that points to it is assigned to cursor; null + * otherwise. */ - virtual bool find(const framing::SequenceNumber&, QueuedMessage&) = 0; + virtual Message* next(QueueCursor& cursor) = 0; + /** - * Retrieve the next message to be given to a browsing - * subscription that has reached the specified position. The next - * message is passed back via the second parameter. + * Release the message i.e. return it to the available state + * unless it has already been deleted. * - * @param unacquired, if true, will only browse unacquired messages - * - * @return true if there is another message, false otherwise. + * @return a pointer to the Message if it is still in acquired state and + * hence can be released; null if it has already been deleted */ - virtual bool browse(const framing::SequenceNumber&, QueuedMessage&, bool unacquired) = 0; + virtual Message* release(const QueueCursor& cursor) = 0; /** - * Retrieve the next message available for a consuming - * subscription. - * - * @return true if there is such a message, false otherwise. + * Find the message with the specified sequence number, returning + * a pointer if found, null otherwise. A cursor to the matched + * message can be passed back via the second parameter, regardless + * of whether the message is found, using this cursor to call + * next() will give the next message greater than position if one + * exists. */ - virtual bool consume(QueuedMessage&) = 0; + virtual Message* find(const framing::SequenceNumber&, QueueCursor*) = 0; + /** - * Pushes a message to the back of the 'queue'. For some types of - * queue this may cause another message to be removed; if that is - * the case the method will return true and the removed message - * will be passed out via the second parameter. + * Find the message at the specified position, returning a pointer if + * found, null otherwise. */ - virtual bool push(const QueuedMessage& added, QueuedMessage& removed) = 0; + virtual Message* find(const QueueCursor&) = 0; /** * Add an already acquired message to the queue. @@ -99,25 +96,11 @@ class Messages * Only need be implemented by subclasses that keep track of * acquired messages. */ - virtual void updateAcquired(const QueuedMessage&) { } - - /** - * Set the position of the back of the queue. Next message enqueued will be n+1. - *@pre Any messages with seq > n must already be dequeued. - */ - virtual void setPosition(const framing::SequenceNumber& /*n*/) = 0; - + //virtual void updateAcquired(const QueuedMessage&) { } /** * Apply, the functor to each message held */ - virtual void foreach(Functor) = 0; - /** - * Remove every message held that for which the specified - * predicate returns true - */ - virtual void removeIf(Predicate) = 0; - private: }; }} // namespace qpid::broker
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Persistable.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Persistable.h?rev=1371676&r1=1371675&r2=1371676&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Persistable.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Persistable.h Fri Aug 10 12:04:27 2012 @@ -32,7 +32,7 @@ namespace broker { /** * Base class for all persistable objects */ -class Persistable : public RefCounted +class Persistable : public virtual RefCounted { public: /** Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Fri Aug 10 12:04:27 2012 @@ -29,132 +29,23 @@ using namespace qpid::broker; namespace qpid { namespace broker { -class MessageStore; - PersistableMessage::~PersistableMessage() {} - -PersistableMessage::PersistableMessage() : - asyncDequeueCounter(0), - store(0) -{} +PersistableMessage::PersistableMessage() : persistenceId(0) {} void PersistableMessage::flush() { - syncList copy; - { - sys::ScopedLock<sys::Mutex> l(storeLock); - if (store) { - copy = synclist; - } else { - return;//early exit as nothing to do - } - } - for (syncList::iterator i = copy.begin(); i != copy.end(); ++i) { - PersistableQueue::shared_ptr q(i->lock()); - if (q) { - q->flush(); - } - } -} - -void PersistableMessage::setContentReleased() -{ - contentReleaseState.released = true; -} - -bool PersistableMessage::isContentReleased() const -{ - return contentReleaseState.released; -} - - -bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){ - if (store && (queue->getPersistenceId()!=0)) { - for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { - PersistableQueue::shared_ptr q(i->lock()); - if (q && q->getPersistenceId() == queue->getPersistenceId()) return true; - } - } - return false; + //TODO: is this really the right place for this? } -void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store) { - if (_store){ - sys::ScopedLock<sys::Mutex> l(storeLock); - store = _store; - boost::weak_ptr<PersistableQueue> q(queue); - synclist.push_back(q); - } -} - -void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { - addToSyncList(queue, _store); - enqueueStart(); -} - -bool PersistableMessage::isDequeueComplete() { - sys::ScopedLock<sys::Mutex> l(asyncDequeueLock); - return asyncDequeueCounter == 0; -} - -void PersistableMessage::dequeueComplete() { - bool notify = false; - { - sys::ScopedLock<sys::Mutex> l(asyncDequeueLock); - if (asyncDequeueCounter > 0) { - if (--asyncDequeueCounter == 0) { - notify = true; - } - } - } - if (notify) allDequeuesComplete(); -} - -void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { - if (_store){ - sys::ScopedLock<sys::Mutex> l(storeLock); - store = _store; - boost::weak_ptr<PersistableQueue> q(queue); - synclist.push_back(q); - } - dequeueAsync(); -} - -void PersistableMessage::dequeueAsync() { - sys::ScopedLock<sys::Mutex> l(asyncDequeueLock); - asyncDequeueCounter++; -} - -PersistableMessage::ContentReleaseState::ContentReleaseState() : blocked(false), requested(false), released(false) {} - -void PersistableMessage::setStore(MessageStore* s) -{ - store = s; -} - -void PersistableMessage::requestContentRelease() +void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr, MessageStore*) { - contentReleaseState.requested = true; -} -void PersistableMessage::blockContentRelease() -{ - contentReleaseState.blocked = true; -} -bool PersistableMessage::checkContentReleasable() -{ - return contentReleaseState.requested && !contentReleaseState.blocked; -} - -bool PersistableMessage::isContentReleaseBlocked() -{ - return contentReleaseState.blocked; + enqueueStart(); } -bool PersistableMessage::isContentReleaseRequested() -{ - return contentReleaseState.requested; -} +bool PersistableMessage::isDequeueComplete() { return false; } +void PersistableMessage::dequeueComplete() {} +void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, MessageStore*) {} }} Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=1371676&r1=1371675&r2=1371676&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Fri Aug 10 12:04:27 2012 @@ -24,29 +24,30 @@ #include <string> #include <list> -#include <boost/shared_ptr.hpp> -#include <boost/weak_ptr.hpp> +#include <map> +#include <boost/intrusive_ptr.hpp> #include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/Persistable.h" #include "qpid/framing/amqp_types.h" +#include "qpid/framing/amqp_framing.h" #include "qpid/sys/Mutex.h" #include "qpid/broker/PersistableQueue.h" #include "qpid/broker/AsyncCompletion.h" namespace qpid { +namespace types { +class Variant; +} namespace broker { class MessageStore; +class Queue; /** * Base class for persistable messages. */ class PersistableMessage : public Persistable { - typedef std::list< boost::weak_ptr<PersistableQueue> > syncList; - sys::Mutex asyncDequeueLock; - sys::Mutex storeLock; - /** * "Ingress" messages == messages sent _to_ the broker. * Tracks the number of outstanding asynchronous operations that must @@ -56,85 +57,44 @@ class PersistableMessage : public Persis * operations have completed, the transfer of this message from the client * may be considered complete. */ - AsyncCompletion ingressCompletion; - - /** - * Tracks the number of outstanding asynchronous dequeue - * operations. When the message is dequeued asynchronously the - * count is incremented; when that dequeue completes it is - * decremented. Thus when it is 0, there are no outstanding - * dequeues. - */ - int asyncDequeueCounter; - - void dequeueAsync(); - - syncList synclist; - struct ContentReleaseState - { - bool blocked; - bool requested; - bool released; - - ContentReleaseState(); - }; - ContentReleaseState contentReleaseState; - - protected: - /** Called when all dequeues are complete for this message. */ - virtual void allDequeuesComplete() = 0; - - void setContentReleased(); - - MessageStore* store; - + boost::intrusive_ptr<AsyncCompletion> ingressCompletion; + mutable uint64_t persistenceId; public: - typedef boost::shared_ptr<PersistableMessage> shared_ptr; - - /** - * @returns the size of the headers when encoded - */ - virtual uint32_t encodedHeaderSize() const = 0; - virtual ~PersistableMessage(); - PersistableMessage(); void flush(); - - QPID_BROKER_EXTERN bool isContentReleased() const; QPID_BROKER_EXTERN void setStore(MessageStore*); - void requestContentRelease(); - void blockContentRelease(); - bool checkContentReleasable(); - bool isContentReleaseBlocked(); - bool isContentReleaseRequested(); virtual QPID_BROKER_EXTERN bool isPersistent() const = 0; /** track the progress of a message received by the broker - see ingressCompletion above */ - QPID_BROKER_INLINE_EXTERN bool isIngressComplete() { return ingressCompletion.isDone(); } - QPID_BROKER_INLINE_EXTERN AsyncCompletion& getIngressCompletion() { return ingressCompletion; } + QPID_BROKER_INLINE_EXTERN bool isIngressComplete() { return ingressCompletion->isDone(); } + QPID_BROKER_INLINE_EXTERN AsyncCompletion& getIngressCompletion() { return *ingressCompletion; } + QPID_BROKER_INLINE_EXTERN void setIngressCompletion(boost::intrusive_ptr<AsyncCompletion> i) { ingressCompletion = i; } - QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion.startCompleter(); } - QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion.finishCompleter(); } + QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion->startCompleter(); } + QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion->finishCompleter(); } QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store); QPID_BROKER_EXTERN bool isDequeueComplete(); - QPID_BROKER_EXTERN void dequeueComplete(); - QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store); - bool isStoredOnQueue(PersistableQueue::shared_ptr queue); - - void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store); + uint64_t getPersistenceId() const { return persistenceId; } + void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; } + + + virtual void decodeHeader(framing::Buffer& buffer) = 0; + virtual void decodeContent(framing::Buffer& buffer) = 0; + virtual uint32_t encodedHeaderSize() const = 0; + virtual boost::intrusive_ptr<PersistableMessage> merge(const std::map<std::string, qpid::types::Variant>& annotations) const = 0; }; }} Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp Fri Aug 10 12:04:27 2012 @@ -19,24 +19,53 @@ * */ #include "qpid/broker/PriorityQueue.h" +#include "qpid/broker/Message.h" #include "qpid/broker/Queue.h" -#include "qpid/broker/QueuedMessage.h" +#include "qpid/broker/QueueCursor.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" +#include <algorithm> #include <cmath> +#include <boost/bind.hpp> namespace qpid { namespace broker { +namespace { +class PriorityContext : public CursorContext { + public: + std::vector<QueueCursor> position; + PriorityContext(size_t levels, SubscriptionType type) : position(levels, QueueCursor(type)) {} +}; +} + PriorityQueue::PriorityQueue(int l) : levels(l), - messages(levels, Deque()), - frontLevel(0), haveFront(false), cached(false) {} + messages(levels, Deque(boost::bind(&PriorityQueue::priorityPadding, this, _1))), + counters(levels, framing::SequenceNumber()), + fifo(boost::bind(&PriorityQueue::fifoPadding, this, _1)), + frontLevel(0), haveFront(false), cached(false) +{ +} -bool PriorityQueue::deleted(const QueuedMessage& qm) { - bool deleted = fifo.deleted(qm); - if (deleted) erase(qm); - return deleted; +bool PriorityQueue::deleted(const QueueCursor& c) +{ + MessagePointer* ptr = fifo.find(c); + if (ptr && ptr->holder) { + //mark the message as deleted + ptr->holder->message.setState(DELETED); + //clean the deque for the relevant priority level + boost::shared_ptr<PriorityContext> ctxt = boost::dynamic_pointer_cast<PriorityContext>(c.context); + messages[ptr->holder->priority].clean(); + //stop referencing that message holder (it may now have been + //deleted) + ptr->holder = 0; + //clean fifo index + fifo.clean(); + return true; + } else { + return false; + } } size_t PriorityQueue::size() @@ -44,85 +73,69 @@ size_t PriorityQueue::size() return fifo.size(); } -namespace { -bool before(QueuedMessage* a, QueuedMessage* b) { return *a < *b; } -} - -void PriorityQueue::release(const QueuedMessage& message) +Message* PriorityQueue::next(QueueCursor& cursor) { - QueuedMessage* qm = fifo.releasePtr(message); - if (qm) { - uint p = getPriorityLevel(message); - messages[p].insert( - lower_bound(messages[p].begin(), messages[p].end(), qm, before), qm); - clearCache(); - } -} - - -void PriorityQueue::erase(const QueuedMessage& qm) { - size_t i = getPriorityLevel(qm); - if (!messages[i].empty()) { - long diff = qm.position.getValue() - messages[i].front()->position.getValue(); - if (diff < 0) return; - long maxEnd = std::min(size_t(diff), messages[i].size()); - QueuedMessage mutableQm = qm; // need non-const qm for lower_bound - Deque::iterator l = - lower_bound(messages[i].begin(),messages[i].begin()+maxEnd, &mutableQm, before); - if (l != messages[i].end() && (*l)->position == qm.position) { - messages[i].erase(l); - clearCache(); - return; + boost::shared_ptr<PriorityContext> ctxt = boost::dynamic_pointer_cast<PriorityContext>(cursor.context); + if (!ctxt) { + ctxt = boost::shared_ptr<PriorityContext>(new PriorityContext(levels, CONSUMER)); + cursor.context = ctxt; + } + if (cursor.type == REPLICATOR) { + //browse in fifo order + MessagePointer* ptr = fifo.next(cursor); + return ptr ? &(ptr->holder->message) : 0; + } else if (cursor.type == PURGE) { + //iterate over message in reverse priority order (i.e. purge lowest priority message first) + //ignore any fairshare configuration here as well + for (int p = 0; p < levels; ++p) { + MessageHolder* holder = messages[p].next(ctxt->position[p]); + if (holder) { + cursor.setPosition(holder->message.getSequence(), 0); + return &(holder->message); + } } + return 0; + } else { + //check each level in turn, in priority order, for any more messages + Priority p = firstLevel(); + do { + MessageHolder* holder = messages[p.current].next(ctxt->position[p.current]); + if (holder) { + cursor.setPosition(holder->message.getSequence(), 0); + return &(holder->message); + } + } while (nextLevel(p)); + return 0; } } -bool PriorityQueue::acquire(const framing::SequenceNumber& position, QueuedMessage& message) +Message* PriorityQueue::find(const QueueCursor& cursor) { - bool acquired = fifo.acquire(position, message); - if (acquired) erase(message); // No longer available - return acquired; + return find(cursor.position, 0); } -bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message) +Message* PriorityQueue::find(const framing::SequenceNumber& position, QueueCursor* cursor) { - return fifo.find(position, message); + MessagePointer* ptr = fifo.find(position, cursor); + return ptr ? &(ptr->holder->message) : 0; } -bool PriorityQueue::browse( - const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired) +void PriorityQueue::publish(const Message& published) { - return fifo.browse(position, message, unacquired); + MessageHolder holder; + holder.message = published; + holder.priority = getPriorityLevel(published); + holder.id = ++(counters[holder.priority]); + MessagePointer pointer; + pointer.holder = &(messages[holder.priority].publish(holder)); + pointer.id = published.getSequence(); + fifo.publish(pointer); } -bool PriorityQueue::consume(QueuedMessage& message) +Message* PriorityQueue::release(const QueueCursor& cursor) { - if (checkFront()) { - QueuedMessage* pm = messages[frontLevel].front(); - messages[frontLevel].pop_front(); - clearCache(); - pm->status = QueuedMessage::ACQUIRED; // Updates FIFO index - message = *pm; - return true; - } else { - return false; - } -} - -bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) -{ - QueuedMessage* qmp = fifo.pushPtr(added); - messages[getPriorityLevel(added)].push_back(qmp); - clearCache(); - return false; // Adding a message never causes one to be removed for deque -} - -void PriorityQueue::updateAcquired(const QueuedMessage& acquired) { - fifo.updateAcquired(acquired); -} - -void PriorityQueue::setPosition(const framing::SequenceNumber& n) { - fifo.setPosition(n); + MessagePointer* ptr = fifo.release(cursor); + return ptr ? &(ptr->holder->message) : 0; } void PriorityQueue::foreach(Functor f) @@ -130,62 +143,87 @@ void PriorityQueue::foreach(Functor f) fifo.foreach(f); } -void PriorityQueue::removeIf(Predicate p) -{ - for (int priority = 0; priority < levels; ++priority) { - for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) { - if (p(**i)) { - (*i)->status = QueuedMessage::DELETED; // Updates fifo index - i = messages[priority].erase(i); - clearCache(); - } else { - ++i; - } - } - } - fifo.clean(); -} - -uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const +uint PriorityQueue::getPriorityLevel(const Message& m) const { - uint priority = m.payload->getPriority(); + uint priority = m.getPriority(); //Use AMQP 0-10 approach to mapping priorities to a fixed level //(see rule priority-level-implementation) const uint firstLevel = 5 - uint(std::min(5.0, std::ceil((double) levels/2.0))); if (priority <= firstLevel) return 0; return std::min(priority - firstLevel, (uint)levels-1); } +PriorityQueue::MessagePointer PriorityQueue::fifoPadding(qpid::framing::SequenceNumber id) +{ + PriorityQueue::MessagePointer pointer; + pointer.holder = 0; + pointer.id = id; + return pointer; +} -void PriorityQueue::clearCache() +PriorityQueue::MessageHolder PriorityQueue::priorityPadding(qpid::framing::SequenceNumber id) { - cached = false; + PriorityQueue::MessageHolder holder; + holder.id = id; + holder.message.setState(DELETED); + return holder; } -bool PriorityQueue::findFrontLevel(uint& l, PriorityLevels& m) +PriorityQueue::Priority PriorityQueue::firstLevel() { - for (int p = levels-1; p >= 0; --p) { - if (!m[p].empty()) { - l = p; - return true; - } + return Priority(levels - 1); +} +bool PriorityQueue::nextLevel(Priority& p) +{ + if (p.current > 0) { + --(p.current); + return true; + } else { + return false; } - return false; } -bool PriorityQueue::checkFront() +framing::SequenceNumber PriorityQueue::MessageHolder::getSequence() const +{ + return id; +} +void PriorityQueue::MessageHolder::setState(MessageState s) { - if (!cached) { - haveFront = findFrontLevel(frontLevel, messages); - cached = true; + message.setState(s); +} +MessageState PriorityQueue::MessageHolder::getState() const +{ + return message.getState(); +} +PriorityQueue::MessageHolder::operator Message&() +{ + return message; +} +framing::SequenceNumber PriorityQueue::MessagePointer::getSequence() const +{ + if (holder) { + return holder->message.getSequence(); + } else { + //this is used when the instance is merely acting as padding + return id; } - return haveFront; } - -uint PriorityQueue::getPriority(const QueuedMessage& message) +void PriorityQueue::MessagePointer::setState(MessageState s) { - const PriorityQueue* queue = dynamic_cast<const PriorityQueue*>(&(message.queue->getMessages())); - if (queue) return queue->getPriorityLevel(message); - else return 0; + if (holder) { + holder->message.setState(s); + } +} +MessageState PriorityQueue::MessagePointer::getState() const +{ + if (holder) { + return holder->message.getState(); + } else { + return DELETED; + } +} +PriorityQueue::MessagePointer::operator Message&() +{ + assert(holder); + return holder->message; } - }} // namespace qpid::broker Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h?rev=1371676&r1=1371675&r2=1371676&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h Fri Aug 10 12:04:27 2012 @@ -22,6 +22,7 @@ * */ #include "qpid/broker/MessageDeque.h" +#include "qpid/broker/IndexedDeque.h" #include "qpid/sys/IntegerTypes.h" #include <deque> #include <vector> @@ -44,42 +45,63 @@ class PriorityQueue : public Messages virtual ~PriorityQueue() {} size_t size(); - bool deleted(const QueuedMessage&); - void release(const QueuedMessage&); - bool acquire(const framing::SequenceNumber&, QueuedMessage&); - bool find(const framing::SequenceNumber&, QueuedMessage&); - bool browse(const framing::SequenceNumber&, QueuedMessage&, bool); - bool consume(QueuedMessage&); - bool push(const QueuedMessage& added, QueuedMessage& removed); - void updateAcquired(const QueuedMessage& acquired); - void setPosition(const framing::SequenceNumber&); - void foreach(Functor); - void removeIf(Predicate); - - static uint getPriority(const QueuedMessage&); + bool deleted(const QueueCursor&); + void publish(const Message& added); + Message* next(QueueCursor&); + Message* release(const QueueCursor& cursor); + Message* find(const QueueCursor&); + Message* find(const framing::SequenceNumber&, QueueCursor*); + void foreach(Functor); + static uint getPriority(const Message&); protected: - typedef std::deque<QueuedMessage*> Deque; - typedef std::vector<Deque> PriorityLevels; - virtual bool findFrontLevel(uint& p, PriorityLevels&); - const int levels; + struct Priority + { + const int start; + int current; + Priority(int s) : start(s), current(start) {} + }; + virtual Priority firstLevel(); + virtual bool nextLevel(Priority& ); private: - /** Available messages separated by priority and sorted in priority order. - * Holds pointers to the QueuedMessages in fifo + struct MessageHolder + { + Message message; + int priority; + framing::SequenceNumber id; + framing::SequenceNumber getSequence() const; + void setState(MessageState); + MessageState getState() const; + operator Message&(); + }; + struct MessagePointer + { + MessageHolder* holder; + framing::SequenceNumber id;//used only for padding + framing::SequenceNumber getSequence() const; + void setState(MessageState); + MessageState getState() const; + operator Message&(); + }; + typedef IndexedDeque<MessageHolder> Deque; + typedef std::vector<Deque> PriorityLevels; + typedef std::vector<framing::SequenceNumber> Counters; + + /** Holds pointers to messages (stored in the fifo index) separated by priority. */ PriorityLevels messages; - /** FIFO index of all messsagse (including acquired messages) for fast browsing and indexing */ - MessageDeque fifo; + Counters counters; + /** FIFO index of messages for fast browsing and indexing */ + IndexedDeque<MessagePointer> fifo; uint frontLevel; bool haveFront; bool cached; - void erase(const QueuedMessage&); - uint getPriorityLevel(const QueuedMessage&) const; - void clearCache(); - bool checkFront(); + uint getPriorityLevel(const Message&) const; + MessageHolder priorityPadding(qpid::framing::SequenceNumber); + MessagePointer fifoPadding(qpid::framing::SequenceNumber); }; }} // namespace qpid::broker --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
