# Prioritised Message Delivery from Queues
## Status
Draft
## Summary
Implementation of prioritised delivery from message queues in the C++
broker.
## Problem
Application designers occasionally want the broker to recognise
application-assigned message priorities and to deliver higher priority
messages ahead of earlier messages of lower priority.
This behaviour is defined in both the AMQP and JMS specifications and is a
common feature of message brokers.
## Solution
The solution will allow a queue to be configured to recognise a
particular number of message priority levels. Message delivery will
dispatch messages in priority order.
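
The basic behaviour can be pictured as one FIFO per configured level, with delivery always taking the oldest message from the highest non-empty level. The following is a minimal sketch under that assumption; `LevelledQueueSketch` and its methods are hypothetical names for illustration and are not part of qpid::broker::Queue.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Illustrative only: one FIFO per configured priority level.
class LevelledQueueSketch {
    std::vector<std::deque<std::string> > levels_;
public:
    explicit LevelledQueueSketch(std::size_t levels) : levels_(levels) {
        assert(levels >= 1);
    }

    // Append a message to the FIFO for its level (0 is the lowest).
    void push(std::size_t level, const std::string& body) {
        assert(level < levels_.size());
        levels_[level].push_back(body);
    }

    // Remove the oldest message from the highest non-empty level.
    // Returns false if every level is empty.
    bool pop(std::string& out) {
        for (std::size_t i = levels_.size(); i-- > 0;) {
            if (!levels_[i].empty()) {
                out = levels_[i].front();
                levels_[i].pop_front();
                return true;
            }
        }
        return false;
    }
};
```

With three levels, pushing "a" and "c" at level 0 and "b" and "d" at level 2 (in the order a, b, c, d) yields b, d, a, c on successive calls to `pop`.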
This will involve additional logic in qpid::broker::Queue and
qpid::broker::QueuePolicy. I have an initial patch (attached). However
I would like as part of this work to refactor the Queue class to
better support different logic based on configuration. This would
hopefully eliminate a lot of the confusing conditional logic in the
existing single class by allowing different implementations of a
common contract (or set of contracts) that a queue implementation
fulfills.
This only directly affects the C++ broker. The clients already allow
message priority to be set; however, they will need to be tested
against the C++ broker implementation. The Java broker already
supports prioritised message delivery.
In addition to the basic behaviour, the solution will also provide a
mechanism to customise message delivery so that high priority messages
cannot completely starve out lower priority messages. This will be done
by allowing a limit n to be configured for a particular priority level:
once n consecutive messages have been delivered from that level,
messages from lower priority levels are allowed through.
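
One way to picture the anti-starvation mechanism is as a small scheduler that tracks the level being served and how many messages it has delivered in a row. The sketch below is a simplified model, not the Fairshare class from the attached patch; `FairshareSketch`, `pick` and the `pending` vector are hypothetical names, and a single limit is applied to every level for brevity.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Illustrative only: picks the level to serve next, moving down a level
// once the current level has used up its share.
class FairshareSketch {
    std::vector<unsigned> limits_;  // per-level limit; 0 means no limit
    std::size_t level_;             // level currently being served
    unsigned served_;               // consecutive deliveries from level_

    // Step to the next lower level, wrapping from 0 back to the top.
    void advance() {
        served_ = 0;
        level_ = (level_ == 0) ? limits_.size() - 1 : level_ - 1;
    }
public:
    FairshareSketch(std::size_t levels, unsigned limit)
        : limits_(levels, limit), level_(levels - 1), served_(0) {
        assert(levels >= 1);
    }

    // pending[i] is the number of messages waiting at level i.
    // Returns the level to deliver from, or -1 if nothing is pending.
    int pick(const std::vector<unsigned>& pending) {
        assert(pending.size() == limits_.size());
        // Move on if the current level has exhausted its share.
        if (limits_[level_] != 0 && served_ >= limits_[level_]) advance();
        // Skip empty levels, giving up after one full cycle.
        for (std::size_t tries = 0; tries < limits_.size(); ++tries) {
            if (pending[level_] > 0) {
                ++served_;
                return static_cast<int>(level_);
            }
            advance();
        }
        return -1;
    }
};
```

In this model, with ten levels, a limit of 5, and ten messages waiting at each of levels 9 and 5, the first five picks come from level 9, the next five from level 5, and delivery then wraps back to level 9. The caller is expected to decrement `pending` for the level returned.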
Within a ring queue, messages should be displaced in order of
priority, then age, when the capacity is reached: lower priority
messages are removed first and, within a priority level, older
messages are removed before newer ones.
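
The displacement order can be expressed as a comparison on (priority level, position). The sketch below assumes each message is reduced to those two values; `Entry`, `displacedBefore` and `nextVictim` are hypothetical names for illustration only.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

struct Entry {
    unsigned level;          // priority level, 0 is the lowest
    unsigned long position;  // sequence number; smaller means older
};

// True if a should be displaced from the ring before b:
// lower level first, then older position within the same level.
bool displacedBefore(const Entry& a, const Entry& b) {
    if (a.level != b.level) return a.level < b.level;
    return a.position < b.position;
}

// Returns the entry to displace when the ring is full.
Entry nextVictim(const std::vector<Entry>& ring) {
    assert(!ring.empty());
    return *std::min_element(ring.begin(), ring.end(), displacedBefore);
}
```

A linear scan is used here only to keep the example short; a real policy would more likely keep its entries sorted by this comparison.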
## Rationale
The AMQP 0-10 specification defines rules on how message priority
should be treated. This solution will follow those rules (as does the
existing implementation within the Java broker).
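
As a worked illustration of those rules, the function below maps a message priority in the range 0 to 9 onto one of the configured levels. It follows the same arithmetic as the priorityLevel function in the attached patch, rewritten as a free function; the parameter names and the upper bound of 10 levels are assumptions taken from that patch rather than from the specification text.

```cpp
#include <algorithm>
#include <cassert>

// Map a message priority onto one of `levels` delivery levels.
// Level 0 is the lowest; level (levels - 1) is the highest.
unsigned priorityLevel(unsigned priority, unsigned levels) {
    assert(levels >= 1 && levels <= 10);
    // Priorities 0 .. (5 - ceil(levels / 2)) all share level 0.
    const unsigned firstLevel = 5 - (levels + 1) / 2;
    if (priority <= firstLevel) return 0;
    // Priorities above that each get their own level until the top
    // level is reached; everything higher shares the top level.
    return std::min(priority - firstLevel, levels - 1);
}
```

With two levels, priorities 0 to 4 map to level 0 and priorities 5 to 9 map to level 1. With three levels, priorities 0 to 3 map to level 0, priority 4 maps to level 1, and priorities 5 to 9 map to level 2.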
## Implementation Notes
Covered by QPID-529.
1. Refactor qpid::broker::Queue to allow conditional logic to be
supplied in different implementations of well-defined interfaces,
rather than the convoluted, hard-to-follow and error-prone situation
we have at present, where this logic is mixed in throughout all
methods.
2. Provide an implementation that supports the functionality of the
attached patch in a cleaner and more maintainable form.
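
To make the intent of the refactoring concrete, here is one possible shape for such a contract. This is only a sketch under assumed names: `MessageContainer`, `FifoContainer`, `PriorityContainer` and `makeContainer` are hypothetical and do not exist in the broker. The point is that the choice between behaviours is made once, from configuration, rather than by conditionals in every method.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <string>

// Hypothetical contract for how a queue holds and orders its messages.
class MessageContainer {
public:
    virtual ~MessageContainer() {}
    virtual void push(unsigned level, const std::string& body) = 0;
    virtual bool pop(std::string& out) = 0;  // false when empty
};

// Plain FIFO: the level is ignored.
class FifoContainer : public MessageContainer {
    std::deque<std::string> messages_;
public:
    void push(unsigned, const std::string& body) override {
        messages_.push_back(body);
    }
    bool pop(std::string& out) override {
        if (messages_.empty()) return false;
        out = messages_.front();
        messages_.pop_front();
        return true;
    }
};

// Prioritised: highest level first, FIFO within a level.
class PriorityContainer : public MessageContainer {
    // Ordered with std::greater so begin() is the highest level in use.
    std::map<unsigned, std::deque<std::string>, std::greater<unsigned> > byLevel_;
public:
    void push(unsigned level, const std::string& body) override {
        byLevel_[level].push_back(body);
    }
    bool pop(std::string& out) override {
        if (byLevel_.empty()) return false;
        std::deque<std::string>& top = byLevel_.begin()->second;
        out = top.front();
        top.pop_front();
        // Drop emptied levels so begin() always has a message.
        if (top.empty()) byLevel_.erase(byLevel_.begin());
        return true;
    }
};

// Configuration selects the implementation once, at creation time.
std::unique_ptr<MessageContainer> makeContainer(unsigned levels) {
    assert(levels >= 1);
    if (levels > 1) return std::unique_ptr<MessageContainer>(new PriorityContainer());
    return std::unique_ptr<MessageContainer>(new FifoContainer());
}
```

A queue holding a `MessageContainer` would call `push` and `pop` without knowing which implementation it has, so a queue that is not configured for priorities keeps plain FIFO behaviour.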
## Consequences
This will hopefully make developments in and around broker-side
queueing simpler and less error-prone, in particular ensuring that
different features, if combined, do not inadvertently break each other.
It will have no impact on the release artefacts.
It will introduce new queue configuration options (aligned with the
options available for the same purpose in the Java broker) and these
will need to be documented.
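
As an illustration of what such options might look like to a client, the helper below builds an address string that declares a queue with priority levels and an optional fairshare limit. The option names `x-qpid-priorities` and `x-qpid-fairshare` and the address layout are taken from the attached patch and its Python test, and may change (the patch itself carries a TODO about naming); `priorityQueueAddress` is a hypothetical helper, not an existing API.

```cpp
#include <cassert>
#include <sstream>
#include <string>

// Build an address string declaring a queue with the given number of
// priority levels. A fairshare of 0 omits the fairshare option.
std::string priorityQueueAddress(const std::string& name,
                                 unsigned levels,
                                 unsigned fairshare) {
    assert(levels >= 1 && levels <= 10);  // the patch caps levels at 10
    std::ostringstream out;
    out << name
        << "; {create: sender, node: {x-declare: {arguments: "
        << "{x-qpid-priorities: " << levels;
    if (fairshare) out << ", x-qpid-fairshare: " << fairshare;
    out << "}}}}";
    return out.str();
}
```

For example, `priorityQueueAddress("q", 10, 5)` produces an address that requests ten priority levels with a default fairshare limit of five.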
It doesn't break backwards compatibility in any way. Unless
specifically configured to support message priorities, queue behaviour
will remain unchanged.
## Contributor-in-Charge
[email protected]
## Contributors
TBD
## Version
1.0
commit 8b1fe28234716a96612fb38851a60fd148264ac3
Author: Gordon Sim <gordon@GRST500.(none)>
Date: Fri Aug 13 16:23:52 2010 +0100
Initial priority queue implementation
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 40ef605..d54905e 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -45,6 +45,7 @@
#include <boost/bind.hpp>
#include <boost/intrusive_ptr.hpp>
+#include <boost/lexical_cast.hpp>
using namespace qpid::broker;
@@ -104,7 +105,8 @@ Queue::Queue(const string& _name, bool _autodelete,
insertSeqNo(0),
broker(b),
deleted(false),
- barrier(*this)
+ barrier(*this),
+ prioritySlots(1)
{
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
@@ -212,7 +214,8 @@ void Queue::requeue(const QueuedMessage& msg){
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return;
msg.payload->enqueueComplete(); // mark the message as enqueued
- messages.insert(lower_bound(messages.begin(), messages.end(), msg), msg);
+ uint p = priorityLevel(msg.payload->getPriority());
+ messages[p].insert(lower_bound(messages[p].begin(), messages[p].end(), msg), msg);
listeners.populate(copy);
// for persistLastNode - don't force a message twice to disk, but force it if no force before
@@ -243,14 +246,14 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess
QPID_LOG(debug, "Attempting to acquire message at " << position);
Messages::iterator i = findAt(position);
- if (i != messages.end() ) {
+ if (i != messages[0].end() ) {
message = *i;
if (lastValueQueue) {
clearLVQIndex(*i);
}
QPID_LOG(debug,
"Acquired message at " << i->position << " from " << name);
- messages.erase(i);
+ messages[0].erase(i);
return true;
}
QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
@@ -261,9 +264,20 @@ bool Queue::acquire(const QueuedMessage& msg) {
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
+ if (prioritySlots > 1) {
+ for (int i = 0; i < prioritySlots; ++i) {
+ Messages::iterator m = lower_bound(messages[i].begin(), messages[i].end(), msg);
+ if (m != messages[i].end() && m->position == msg.position) {
+ messages[i].erase(m);
+ return true;
+ }
+ }
+ return false;
+ }
+
QPID_LOG(debug, "attempting to acquire " << msg.position);
Messages::iterator i = findAt(msg.position);
- if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set
+ if ((i != messages[0].end() && i->position == msg.position) && // note that in some cases payload not be set
(!lastValueQueue ||
(lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0
) {
@@ -272,7 +286,7 @@ bool Queue::acquire(const QueuedMessage& msg) {
QPID_LOG(debug,
"Match found, acquire succeeded: " <<
i->position << " == " << msg.position);
- messages.erase(i);
+ messages[0].erase(i);
return true;
}
@@ -286,7 +300,7 @@ void Queue::notifyListener()
QueueListeners::NotificationSet set;
{
Mutex::ScopedLock locker(messageLock);
- if (messages.size()) {
+ if (messages[0].size()) {
listeners.populate(set);
}
}
@@ -315,12 +329,15 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
{
while (true) {
Mutex::ScopedLock locker(messageLock);
- if (messages.empty()) {
+ int count = 0;
+ for (int i = 0; i < prioritySlots; i++)
+ count += messages[i].size();
+ if (count == 0) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
listeners.addListener(c);
return NO_MESSAGES;
} else {
- QueuedMessage msg = getFront();
+ QueuedMessage msg = getFrontWithFairshare();
if (msg.payload->hasExpired()) {
QPID_LOG(debug, "Message expired from queue '" << name << "'");
popAndDequeue();
@@ -382,7 +399,7 @@ void Queue::removeListener(Consumer::shared_ptr c)
{
Mutex::ScopedLock locker(messageLock);
listeners.removeListener(c);
- if (messages.size()) {
+ if (messages[0].size()) {
listeners.populate(set);
}
}
@@ -400,16 +417,39 @@ bool Queue::dispatch(Consumer::shared_ptr c)
}
}
+bool Queue::seekInAllSlots(QueuedMessage& msg, Consumer::shared_ptr c) {
+ //find 'next' message i.e. message with lowest position that
+ //is greater than c->position, message may be in any priority
+ //level
+ QueuedMessage match;
+ match.position = c->position;
+ QueuedMessage* next(0);
+ for (int i = 0; i < prioritySlots; ++i) {
+ Messages::iterator m = upper_bound(messages[i].begin(), messages[i].end(), match);
+ if (m != messages[i].end() && (!next || m->position < next->position)) {
+ next = &(*m);
+ }
+ }
+ if (next) {
+ msg = *next;
+ return true;
+ } else {
+ return false;
+ }
+}
+
// Find the next message
bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
Mutex::ScopedLock locker(messageLock);
- if (!messages.empty() && messages.back().position > c->position) {
+ if (prioritySlots > 1) return seekInAllSlots(msg, c);
+
+ if (!messages[0].empty() && messages[0].back().position > c->position) {
if (c->position < getFront().position) {
msg = getFront();
return true;
} else {
Messages::iterator pos = findAt(c->position);
- if (pos != messages.end() && pos+1 != messages.end()) {
+ if (pos != messages[0].end() && pos+1 != messages[0].end()) {
msg = *(pos+1);
return true;
}
@@ -421,31 +461,31 @@ bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
Queue::Messages::iterator Queue::findAt(SequenceNumber pos) {
- if(!messages.empty()){
+ if(!messages[0].empty()){
QueuedMessage compM;
compM.position = pos;
- unsigned long diff = pos.getValue() - messages.front().position.getValue();
- long maxEnd = diff < messages.size()? diff : messages.size();
+ unsigned long diff = pos.getValue() - messages[0].front().position.getValue();
+ long maxEnd = diff < messages[0].size()? diff : messages[0].size();
- Messages::iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
- if (i!= messages.end() && i->position == pos)
+ Messages::iterator i = lower_bound(messages[0].begin(),messages[0].begin()+maxEnd,compM);
+ if (i!= messages[0].end() && i->position == pos)
return i;
}
- return messages.end(); // no match found.
+ return messages[0].end(); // no match found.
}
QueuedMessage Queue::find(SequenceNumber pos) const {
Mutex::ScopedLock locker(messageLock);
- if(!messages.empty()){
+ if(!messages[0].empty()){
QueuedMessage compM;
compM.position = pos;
- unsigned long diff = pos.getValue() - messages.front().position.getValue();
- long maxEnd = diff < messages.size()? diff : messages.size();
+ unsigned long diff = pos.getValue() - messages[0].front().position.getValue();
+ long maxEnd = diff < messages[0].size()? diff : messages[0].size();
- Messages::const_iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
- if (i != messages.end())
+ Messages::const_iterator i = lower_bound(messages[0].begin(),messages[0].begin()+maxEnd,compM);
+ if (i != messages[0].end())
return *i;
}
return QueuedMessage();
@@ -483,7 +523,7 @@ QueuedMessage Queue::get(){
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
- if(!messages.empty()){
+ if(!messages[0].empty()){
msg = getFront();
popMsg(msg);
}
@@ -499,12 +539,12 @@ void Queue::purgeExpired()
Messages expired;
{
Mutex::ScopedLock locker(messageLock);
- for (Messages::iterator i = messages.begin(); i != messages.end();) {
+ for (Messages::iterator i = messages[0].begin(); i != messages[0].end();) {
if (lastValueQueue) checkLvqReplace(*i);
if (i->payload->hasExpired()) {
expired.push_back(*i);
clearLVQIndex(*i);
- i = messages.erase(i);
+ i = messages[0].erase(i);
} else {
++i;
}
@@ -534,7 +574,7 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange>
uint32_t count = 0;
// Either purge them all or just the some (purge_count) while the queue isn't empty.
- while((!purge_request || purge_count--) && !messages.empty()) {
+ while((!purge_request || purge_count--) && !messages[0].empty()) {
if (dest.get()) {
//
// If there is a destination exchange, stage the messages onto a reroute queue
@@ -566,7 +606,7 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
uint32_t move_count = qty; // only comes into play if qty >0
uint32_t count = 0; // count how many were moved for returning
- while((!qty || move_count--) && !messages.empty()) {
+ while((!qty || move_count--) && !messages[0].empty()) {
QueuedMessage qmsg = getFront();
boost::intrusive_ptr<Message> msg = qmsg.payload;
destq->deliver(msg); // deliver message to the destination queue
@@ -585,7 +625,8 @@ void Queue::popMsg(QueuedMessage& qmsg)
string key = ft->getAsString(qpidVQMatchProperty);
lvq.erase(key);
}
- messages.pop_front();
+ uint8_t priority = qmsg.payload->getPriority();
+ messages[priorityLevel(priority)].pop_front();
++dequeueTracker;
}
@@ -595,6 +636,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
{
Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
+ uint8_t priority = qm.payload->getPriority();
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
LVQ::iterator i;
@@ -604,7 +646,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
i = lvq.find(key);
if (i == lvq.end() || (broker && broker->isClusterUpdatee())) {
- messages.push_back(qm);
+ messages[priorityLevel(priority)].push_back(qm);
listeners.populate(copy);
lvq[key] = msg;
}else {
@@ -621,7 +663,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
}
}
}else {
- messages.push_back(qm);
+ messages[priorityLevel(priority)].push_back(qm);
listeners.populate(copy);
}
if (eventMode) {
@@ -637,12 +679,41 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
QueuedMessage Queue::getFront()
{
- QueuedMessage msg = messages.front();
- if (lastValueQueue) {
- boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
- if (replacement.get()) msg.payload = replacement;
+ for (int priority = prioritySlots-1; priority >= 0; priority--) {
+ if (!messages[priority].empty()) {
+ QueuedMessage msg = messages[priority].front();
+ if (lastValueQueue) {
+ boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
+ if (replacement.get()) msg.payload = replacement;
+ }
+ return msg;
+ }
}
- return msg;
+ throw InternalErrorException(QPID_MSG("No message available"));
+}
+
+QueuedMessage Queue::getFrontWithFairshare()
+{
+ if (!fairshare) return getFront();
+
+ const uint start = fairshare->current();
+ uint priority = start;
+ QPID_LOG(debug, "Front message requested, starting with level " << priority);
+ do {
+ if (!messages[priority].empty()) {
+ QueuedMessage msg = messages[priority].front();
+ if (lastValueQueue) {
+ boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
+ if (replacement.get()) msg.payload = replacement;
+ }
+ QPID_LOG(debug, "Front message taken from level " << priority);
+ return msg;
+ } else {
+ QPID_LOG(debug, "No messages on level " << priority);
+ }
+ priority = fairshare->next();
+ } while (priority != start);
+ throw InternalErrorException(QPID_MSG("No message available"));
}
QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg)
@@ -666,7 +737,7 @@ uint32_t Queue::getEnqueueCompleteMessageCount() const
{
Mutex::ScopedLock locker(messageLock);
uint32_t count = 0;
- for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
+ for ( Messages::const_iterator i = messages[0].begin(); i != messages[0].end(); ++i ) {
//NOTE: don't need to use checkLvqReplace() here as it
//is only relevant for LVQ which does not support persistence
//so the enqueueComplete check has no effect
@@ -679,7 +750,7 @@ uint32_t Queue::getEnqueueCompleteMessageCount() const
uint32_t Queue::getMessageCount() const
{
Mutex::ScopedLock locker(messageLock);
- return messages.size();
+ return messages[0].size();
}
uint32_t Queue::getConsumerCount() const
@@ -704,7 +775,7 @@ void Queue::setLastNodeFailure()
if (persistLastNode){
Mutex::ScopedLock locker(messageLock);
try {
- for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) {
+ for ( Messages::iterator i = messages[0].begin(); i != messages[0].end(); ++i ) {
if (lastValueQueue) checkLvqReplace(*i);
// don't force a message twice to disk.
if(!i->payload->isStoredOnQueue(shared_from_this())) {
@@ -845,8 +916,42 @@ void Queue::create(const FieldTable& _settings)
configure(_settings);
}
+
+int getIntegerSetting(const FieldTable& _settings, const std::string& key)
+{
+ FieldTable::ValuePtr v = _settings.get(key);
+ if (!v) {
+ return 0;
+ } else if (v->convertsTo<int>()) {
+ return v->get<int>();
+ } else if (v->convertsTo<std::string>()){
+ std::string s = v->get<std::string>();
+ try {
+ return boost::lexical_cast<int>(s);
+ } catch(const boost::bad_lexical_cast&) {
+ QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s);
+ return 0;
+ }
+ } else {
+ QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << *v);
+ return 0;
+ }
+}
+
+int getSetting(const FieldTable& _settings, const std::string& key, int minvalue, int maxvalue)
+{
+ return std::max(minvalue,std::min(getIntegerSetting(_settings, key), maxvalue));
+}
+
void Queue::configure(const FieldTable& _settings, bool recovering)
{
+ //TODO: name for priority levels chosen to match java broker, but
+ //need some consistency (without breaking backward compatibility);
+ //introduce some form of translation between conventions.
+ prioritySlots = getSetting(_settings, "x-qpid-priorities", 1, MAX_PRIORITY_SLOTS);
+ QPID_LOG(debug, "Configured queue " << getName() << " with " << (uint) prioritySlots << " priority levels");
+ fairshare = boost::shared_ptr<Fairshare>(new Fairshare(prioritySlots-1, _settings));
+ if (prioritySlots == 1 || fairshare->isNull()) fairshare.reset();
eventMode = _settings.getAsInt(qpidQueueEventGeneration);
@@ -875,6 +980,10 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
QPID_LOG(debug, "Configured queue as Last Value Queue No Browse for: " << getName());
lastValueQueue = lastValueQueueNoBrowse;
}
+ if (prioritySlots > 1 && lastValueQueue) {
+ prioritySlots = 1; fairshare.reset();
+ QPID_LOG(warning, "Prioritised delivery and last-value semantics are not currently supported on the same queue");
+ }
persistLastNode= _settings.get(qpidPersistLastNode);
if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName());
@@ -901,7 +1010,7 @@ void Queue::destroy()
{
if (alternateExchange.get()) {
Mutex::ScopedLock locker(messageLock);
- while(!messages.empty()){
+ while(!messages[0].empty()){
DeliverableMessage msg(getFront().payload);
alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
msg.getMessage().getApplicationHeaders());
@@ -1194,6 +1303,34 @@ void Queue::flush()
if (u.acquired && store) store->flush(*this);
}
+uint Queue::priorityLevel(uint priority)
+{
+ //Use AMQP 0-10 approach to mapping priorities to a fixed level
+ //(see rule priority-level-implementation)
+ const uint firstLevel = 5 - ceil((double) prioritySlots/2.0);
+ if (priority <= firstLevel) return 0;
+ return std::min(priority - firstLevel, (uint)prioritySlots-1);
+}
+
+void Queue::collectFromAllSlots(Messages& result)
+{
+ for (int p = 0; p < prioritySlots; p++) {
+ for (Messages::const_iterator i = messages[p].begin(); i != messages[p].end(); ++i) {
+ result.insert(lower_bound(result.begin(), result.end(), *i), *i);
+ }
+ }
+}
+
+bool Queue::getFairshareState(uint& priority, uint& count)
+{
+ return fairshare && fairshare->getState(priority, count);
+}
+
+bool Queue::setFairshareState(uint priority, uint count)
+{
+ return fairshare && fairshare->setState(priority, count);
+}
+
Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
bool Queue::UsageBarrier::acquire()
@@ -1219,3 +1356,56 @@ void Queue::UsageBarrier::destroy()
parent.deleted = true;
while (count) parent.messageLock.wait();
}
+
+Queue::Fairshare::Fairshare(uint p, const FieldTable& settings) : max_priority(p), priority(max_priority), count(0)
+{
+ uint defaultLimit = getIntegerSetting(settings, "x-qpid-fairshare");
+
+ for (uint i = 0; i <= max_priority; i++) {
+ std::string key = (boost::format("x-qpid-fairshare-%1%") % i).str();
+ limits.push_back(settings.isSet(key) ? getIntegerSetting(settings, key) : defaultLimit);
+ }
+}
+
+bool Queue::Fairshare::limit_reached()
+{
+ uint l = limits[priority];
+ return l && ++count > l;
+}
+
+uint Queue::Fairshare::current()
+{
+ if (limit_reached()) {
+ return next();
+ } else {
+ return priority;
+ }
+}
+
+uint Queue::Fairshare::next()
+{
+ count = 1;
+ if (priority) --priority;
+ else priority = max_priority;
+ return priority;
+}
+
+bool Queue::Fairshare::isNull()
+{
+ for (uint i = 0; i <= max_priority; i++) if (limits[i]) return false;
+ return true;
+}
+
+bool Queue::Fairshare::getState(uint& p, uint& c)
+{
+ p = priority;
+ c = count;
+ return true;
+}
+
+bool Queue::Fairshare::setState(uint p, uint c)
+{
+ priority = p;
+ count = c;
+ return true;
+}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 41c6b46..462c8f5 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -88,11 +88,31 @@ namespace qpid {
ScopedUse(UsageBarrier& b) : barrier(b), acquired(barrier.acquire()) {}
~ScopedUse() { if (acquired) barrier.release(); }
};
+
+ class Fairshare
+ {
+ const uint max_priority;
+ std::vector<uint> limits;
+
+ uint priority;
+ uint count;
+
+ bool limit_reached();
+ public:
+ Fairshare(uint maxPriority, const qpid::framing::FieldTable& limits);
+ uint current();
+ uint next();
+ bool isNull();
+ bool getState(uint& priority, uint& count);
+ bool setState(uint priority, uint count);
+ };
typedef std::deque<QueuedMessage> Messages;
typedef std::map<string,boost::intrusive_ptr<Message> > LVQ;
enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
+ const static int MAX_PRIORITY_SLOTS = 10;
+
const string name;
const bool autodelete;
MessageStore* store;
@@ -107,7 +127,7 @@ namespace qpid {
std::string traceId;
std::vector<std::string> traceExclude;
QueueListeners listeners;
- Messages messages;
+ Messages messages[MAX_PRIORITY_SLOTS];
Messages pendingDequeues;//used to avoid dequeuing during recovery
LVQ lvq;
mutable qpid::sys::Mutex consumerLock;
@@ -130,9 +150,13 @@ namespace qpid {
Broker* broker;
bool deleted;
UsageBarrier barrier;
+ int prioritySlots;
+ boost::shared_ptr<Fairshare> fairshare;
void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
+ void collectFromAllSlots(Messages& result);
+ bool seekInAllSlots(QueuedMessage& msg, Consumer::shared_ptr position);
bool seek(QueuedMessage& msg, Consumer::shared_ptr position);
bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
@@ -146,6 +170,7 @@ namespace qpid {
void dequeued(const QueuedMessage& msg);
void popAndDequeue();
QueuedMessage getFront();
+ QueuedMessage getFrontWithFairshare();
QueuedMessage& checkLvqReplace(QueuedMessage& msg);
void clearLVQIndex(const QueuedMessage& msg);
@@ -324,11 +349,18 @@ namespace qpid {
template <class F> void eachMessage(F f) {
sys::Mutex::ScopedLock l(messageLock);
if (lastValueQueue) {
- for (Messages::iterator i = messages.begin(); i != messages.end(); ++i) {
+ for (Messages::iterator i = messages[0].begin(); i != messages[0].end(); ++i) {
f(checkLvqReplace(*i));
}
+ } else if (prioritySlots > 1) {
+ //need to include messages from all slots/levels;
+ //need to include them in the correct order
+ //so that they are enqueued in the same order on all nodes
+ Messages temp;
+ collectFromAllSlots(temp);
+ std::for_each(temp.begin(), temp.end(), f);
} else {
- std::for_each(messages.begin(), messages.end(), f);
+ std::for_each(messages[0].begin(), messages[0].end(), f);
}
}
@@ -365,6 +397,10 @@ namespace qpid {
void recoverPrepared(boost::intrusive_ptr<Message>& msg);
void flush();
+
+ uint priorityLevel(uint priority);
+ QPID_BROKER_EXTERN bool getFairshareState(uint& priority, uint& count);
+ QPID_BROKER_EXTERN bool setFairshareState(uint priority, uint count);
};
}
}
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
index c8feaa8..2cf93ee 100644
--- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -194,7 +194,10 @@ RingQueuePolicy::RingQueuePolicy(const std::string& _name,
bool before(const QueuedMessage& a, const QueuedMessage& b)
{
- return a.position < b.position;
+ int priorityA = a.queue->priorityLevel(a.payload->getPriority());
+ int priorityB = b.queue->priorityLevel(b.payload->getPriority());
+ if (priorityA == priorityB) return a.position < b.position;
+ else return priorityA < priorityB;
}
void RingQueuePolicy::enqueued(const QueuedMessage& m)
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 7b51d24..fbaafe0 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -540,6 +540,13 @@ void Connection::queuePosition(const string& qname, const SequenceNumber& positi
findQueue(qname)->setPosition(position);
}
+void Connection::queueFairshareState(const std::string& qname, const uint8_t priority, const uint8_t count)
+{
+ if (!findQueue(qname)->setFairshareState(priority, count)) {
+ QPID_LOG(error, "Failed to set fair share state on queue " << qname << "; this will result in inconsistencies.");
+ }
+}
+
void Connection::expiryId(uint64_t id) {
cluster.getExpiryPolicy().setId(id);
}
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 24b8c85..cf331d1 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -152,6 +152,7 @@ class Connection :
uint32_t credit);
void queuePosition(const std::string&, const framing::SequenceNumber&);
+ void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count);
void expiryId(uint64_t);
void txStart();
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index 148526b..79120e3 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -321,6 +321,10 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr<
q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1));
q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, s, q->getName(), _1));
ClusterConnectionProxy(s).queuePosition(q->getName(), q->getPosition());
+ uint priority, count;
+ if (q->getFairshareState(priority, count)) {
+ ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count);
+ }
}
void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index 0565ecc..511c913 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -1191,5 +1191,41 @@ QPID_AUTO_TEST_CASE(testUpdateConsumerPosition) {
BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
}
+QPID_AUTO_TEST_CASE(testFairsharePriorityDelivery) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c0(cluster[0], "c0");
+
+ FieldTable arguments;
+ arguments.setInt("x-qpid-priorities", 10);
+ arguments.setInt("x-qpid-fairshare", 5);
+ c0.session.queueDeclare("q", arg::durable=durableFlag, arg::arguments=arguments);
+
+ //send messages of different priorities
+ for (int i = 0; i < 20; i++) {
+ Message msg = makeMessage((boost::format("msg-%1%") % i).str(), "q", durableFlag);
+ msg.getDeliveryProperties().setPriority(i % 2 ? 9 : 5);
+ c0.session.messageTransfer(arg::content=msg);
+ }
+
+ //pull off a couple of the messages (first four should be the top priority messages)
+ for (int i = 0; i < 4; i++) {
+ BOOST_CHECK_EQUAL((boost::format("msg-%1%") % ((i*2)+1)).str(), c0.subs.get("q", TIMEOUT).getData());
+ }
+
+ // Add another member
+ cluster.add();
+
+ //pull off some more messages
+ BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 9).str(), c0.subs.get("q", TIMEOUT).getData());
+ BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 0).str(), c0.subs.get("q", TIMEOUT).getData());
+ BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 2).str(), c0.subs.get("q", TIMEOUT).getData());
+
+ //check queue has same content on both nodes
+ Client c1(cluster[1], "c1");
+ BOOST_CHECK_EQUAL(browse(c0, "q", 13), browse(c1, "q", 13));
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/qpid_receive.cpp b/qpid/cpp/src/tests/qpid_receive.cpp
index 294a60b..5f8df40 100644
--- a/qpid/cpp/src/tests/qpid_receive.cpp
+++ b/qpid/cpp/src/tests/qpid_receive.cpp
@@ -194,6 +194,7 @@ int main(int argc, char ** argv)
if (msg.getCorrelationId().size()) std::cout << "CorrelationId: " << msg.getCorrelationId() << std::endl;
if (msg.getUserId().size()) std::cout << "UserId: " << msg.getUserId() << std::endl;
if (msg.getTtl().getMilliseconds()) std::cout << "TTL: " << msg.getTtl().getMilliseconds() << std::endl;
+ if (msg.getPriority()) std::cout << "Priority: " << msg.getPriority() << std::endl;
if (msg.getDurable()) std::cout << "Durable: true" << std::endl;
if (msg.getRedelivered()) std::cout << "Redelivered: true" << std::endl;
std::cout << "Properties: " << msg.getProperties() << std::endl;
diff --git a/qpid/cpp/src/tests/qpid_send.cpp b/qpid/cpp/src/tests/qpid_send.cpp
index 98d7cd6..30f6cba 100644
--- a/qpid/cpp/src/tests/qpid_send.cpp
+++ b/qpid/cpp/src/tests/qpid_send.cpp
@@ -55,6 +55,7 @@ struct Options : public qpid::Options
uint sendEos;
bool durable;
uint ttl;
+ uint priority;
std::string userid;
std::string correlationid;
string_vector properties;
@@ -82,6 +83,7 @@ struct Options : public qpid::Options
sendEos(0),
durable(false),
ttl(0),
+ priority(0),
contentString(),
contentSize(0),
contentStdin(false),
@@ -107,6 +109,7 @@ struct Options : public qpid::Options
("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input")
("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.")
("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds")
+ ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value implies higher priority)")
("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property")
("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message")
("user-id", qpid::optValue(userid, "USERID"), "userid for message")
@@ -262,6 +265,9 @@ int main(int argc, char ** argv)
if (opts.ttl) {
msg.setTtl(Duration(opts.ttl));
}
+ if (opts.priority) {
+ msg.setPriority(opts.priority);
+ }
if (!opts.replyto.empty()) msg.setReplyTo(Address(opts.replyto));
if (!opts.userid.empty()) msg.setUserId(opts.userid);
if (!opts.correlationid.empty()) msg.setCorrelationId(opts.correlationid);
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 9cbad82..10dcd4f 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -278,5 +278,12 @@
<control name="management-agents" code="0x37">
<field name="data" type="vbin32"/>
</control>
+
+ <!-- Set the fairshare delivery related state of a replicated queue. -->
+ <control name="queue-fairshare-state" code="0x38">
+ <field name="queue" type="str8"/>
+ <field name="position" type="uint8"/>
+ <field name="count" type="uint8"/>
+ </control>
</class>
</amqp>
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py b/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
index f9315a6..9d473e6 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
@@ -26,6 +26,7 @@ from example import *
from exchange import *
from management import *
from message import *
+from priority import *
from query import *
from queue import *
from tx import *
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py b/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py
new file mode 100644
index 0000000..81b3873
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py
@@ -0,0 +1,221 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.messaging import *
+from qpid.tests.messaging import Base
+from qpid.compat import set
+import math
+
+class PriorityTests (Base):
+    """
+    Test prioritised messaging
+    """
+
+    def setup_connection(self):
+        return Connection.establish(self.broker, **self.connection_options())
+
+    def setup_session(self):
+        return self.conn.session()
+
+    def prioritised_delivery(self, priorities, levels=10):
+        """
+        Test that messages on a queue are delivered in priority order.
+        """
+        msgs = [Message(content=str(uuid4()), priority = p) for p in priorities]
+
+        snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:%s}}}}" % levels,
+                              durable=self.durable())
+        for m in msgs: snd.send(m)
+
+        rcv = self.ssn.receiver(snd.target)
+        for expected in sorted(msgs, key=lambda m: priority_level(m.priority,levels), reverse=True):
+            msg = rcv.fetch(0)
+            #print "expected priority %s got %s" % (expected.priority, msg.priority)
+            assert msg.content == expected.content
+            self.ssn.acknowledge(msg)
+
+    def fairshare_delivery(self, priorities, default_limit=5, limits=None, levels=10):
+        msgs = [Message(content=str(uuid4()), priority = p) for p in priorities]
+
+        limit_policy = "x-qpid-fairshare:%s" % default_limit
+        if limits:
+            for k, v in limits.items():
+                limit_policy += ", x-qpid-fairshare-%s:%s" % (k, v)
+
+        snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:%s, %s}}}}"
+                              % (levels, limit_policy),
+                              durable=self.durable())
+        for m in msgs: snd.send(m)
+
+        rcv = self.ssn.receiver(snd.target)
+        if limits:
+            limit_function = lambda x : limits.get(x, 0)
+        else:
+            limit_function = lambda x : default_limit
+        for expected in fairshare(sorted(msgs, key=lambda m: priority_level(m.priority,levels), reverse=True),
+                                  limit_function, levels):
+            msg = rcv.fetch(0)
+            #print "expected priority %s got %s" % (expected.priority, msg.priority)
+            assert msg.priority == expected.priority
+            assert msg.content == expected.content
+            self.ssn.acknowledge(msg)
+
+    def test_prioritised_delivery_1(self):
+        self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels = 10)
+
+    def test_prioritised_delivery_2(self):
+        self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels = 5)
+
+    def test_fairshare_1(self):
+        self.fairshare_delivery(priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,3,10,3,10,10,10,10,10,10,2,3])
+
+    def test_fairshare_2(self):
+        self.fairshare_delivery(priorities = [10 for i in range(30)])
+
+    def test_fairshare_3(self):
+        self.fairshare_delivery(priorities = [4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3], limits={7:0,6:4,5:3,4:2,3:2,2:2,1:2}, levels=8)
+
+    def test_browsing(self):
+        priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,3,10,3,10,10,10,10,10,10,2,3]
+        msgs = [Message(content=str(uuid4()), priority = p) for p in priorities]
+        snd = self.ssn.sender("priority-queue; {create: sender, node: {x-declare:{arguments:{x-qpid-priorities:10}}}}",
+                              durable=self.durable())
+        for m in msgs: snd.send(m)
+
+        rcv = self.ssn.receiver("priority-queue; {mode: browse, delete: receiver}")
+        received = []
+        try:
+            while True: received.append(rcv.fetch(0))
+        except Empty: pass
+        #check all messages on the queue were received by the browser; don't rely on any specific ordering at present
+        assert set([m.content for m in msgs]) == set([m.content for m in received])
+
+    def ring_queue_check(self, msgs):
+        """
+        Ensure that a ring queue removes the lowest priority messages first.
+        """
+        snd = self.ssn.sender(address("priority-ring-queue", arguments="x-qpid-priorities:10, 'qpid.policy_type':ring, 'qpid.max_count':10"),
+                              durable=self.durable())
+        for m in msgs: snd.send(m)
+
+        rcv = self.ssn.receiver(snd.target)
+        received = []
+        try:
+            while True: received.append(rcv.fetch(0))
+        except Empty: pass
+
+        expected = []
+        for m in msgs:
+            while len(expected) > 9:
+                expected.sort(key=lambda x: priority_level(x.priority,10))
+                expected.pop(0)
+            expected.append(m)
+        #print "sent %s; expected %s; got %s" % ([m.content for m in msgs], [m.content for m in expected], [m.content for m in received])
+        assert [m.content for m in expected] == [m.content for m in received]
+
+    def test_ring_queue_1(self):
+        priorities = [4,5,3,6,9,9,2,9,2,9,9,1,9,9,9,3,3,3,9,9,3,9,3,9,9,9,9,9,9,2,3]
+        seq = content("msg")
+        self.ring_queue_check([Message(content=seq.next(), priority = p) for p in priorities])
+
+    def test_ring_queue_2(self):
+        priorities = [9,0,2,3,6,9,9,2,9,2,9,9,1,9,4,7,1,1,3,9,9,3,9,3,9,9,9,1,9,9,2,3,0,9]
+        seq = content("msg")
+        self.ring_queue_check([Message(content=seq.next(), priority = p) for p in priorities])
+
+    def test_requeue(self):
+        priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,3,10,3,10,10,10,10,10,10,2,3]
+        msgs = [Message(content=str(uuid4()), priority = p) for p in priorities]
+
+        snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:10}}}}",
+                              durable=self.durable())
+        #want to have some messages requeued so enable prefetch on a dummy receiver
+        other = self.conn.session()
+        dummy = other.receiver("priority-queue")
+        dummy.capacity = 10
+
+        for m in msgs: snd.send(m)
+
+        #fetch some with dummy receiver on which prefetch is also enabled
+        for i in range(5):
+            msg = dummy.fetch(0)
+        #close session without acknowledgements to requeue messages
+        other.close()
+
+        #now test delivery works as expected after that
+        rcv = self.ssn.receiver(snd.target)
+        for expected in sorted(msgs, key=lambda m: priority_level(m.priority,10), reverse=True):
+            msg = rcv.fetch(0)
+            #print "expected priority %s got %s" % (expected.priority, msg.priority)
+            assert msg.content == expected.content
+            self.ssn.acknowledge(msg)
+
+def content(base, counter=1):
+    while True:
+        yield "%s-%s" % (base, counter)
+        counter += 1
+
+def address(name, create_policy="sender", delete_policy="receiver", arguments=None):
+    if arguments: node = "node: {x-declare:{arguments:{%s}}}" % arguments
+    else: node = "node: {}"
+    return "%s; {create: %s, delete: %s, %s}" % (name, create_policy, delete_policy, node)
+
+def fairshare(msgs, limit, levels):
+    """
+    Generator to return prioritised messages in expected order for a given fairshare limit
+    """
+    count = 0
+    last_priority = None
+    postponed = []
+    while msgs or postponed:
+        if not msgs:
+            msgs = postponed
+            count = 0
+            last_priority = None
+            postponed = []
+        msg = msgs.pop(0)
+        if last_priority and priority_level(msg.priority, levels) == last_priority:
+            count += 1
+        else:
+            last_priority = priority_level(msg.priority, levels)
+            count = 1
+        l = limit(last_priority)
+        if (l and count > l):
+            postponed.append(msg)
+        else:
+            yield msg
+    return
+
+def effective_priority(value, levels):
+    """
+    Method to determine effective priority given a distinct number of
+    levels supported. Returns the lowest priority value that is of
+    equivalent priority to the value passed in.
+    """
+    if value <= 5-math.ceil(levels/2.0): return 0
+    if value >= 4+math.floor(levels/2.0): return 4+math.floor(levels/2.0)
+    return value
+
+def priority_level(value, levels):
+    """
+    Method to determine which of a distinct number of priority levels
+    a given value falls into.
+    """
+    offset = 5-math.ceil(levels/2.0)
+    return min(max(value - offset, 0), levels-1)
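
As a rough illustration of the mapping the tests above rely on, the sketch below restates `priority_level` from priority.py as a standalone script and evaluates it for 10 and 5 configured levels. The `int()` cast, the sample range 0..15, and the variable names `ten`, `five` and `order` are additions for illustration only and are not part of the patch; the priority list is the one used by `test_prioritised_delivery_2`.

```python
import math

def priority_level(value, levels):
    # Same arithmetic as the helper in priority.py; the int() cast is an
    # addition here so the results are plain integers.
    offset = 5 - int(math.ceil(levels / 2.0))
    return min(max(value - offset, 0), levels - 1)

# With 10 levels the offset is 0, so priorities 0..9 each get their own
# level and anything higher is clamped to the top level.
ten = [priority_level(p, 10) for p in range(16)]

# With 5 levels the offset is 2, so priorities 0..2 share the lowest level
# and priorities 6 and above share the highest.
five = [priority_level(p, 5) for p in range(16)]

# Expected delivery order for the priorities in test_prioritised_delivery_2:
# highest level first, publication order preserved within a level because
# sorted() is stable.
priorities = [8, 9, 5, 1, 2, 2, 3, 4, 15, 7, 8, 10, 10, 2]
order = sorted(priorities, key=lambda p: priority_level(p, 5), reverse=True)

print(ten)
print(five)
print(order)
```

Under these assumptions, messages with priorities 7 through 15 are treated as equivalent on a 5-level queue, which is why the tests compare message content rather than raw priority when checking order.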