Author: kgiusti
Date: Fri Nov 4 16:56:47 2011
New Revision: 1197661
URL: http://svn.apache.org/viewvc?rev=1197661&view=rev
Log:
QPID-3346: sync this branch to trunk msg group code.
Modified:
qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h
Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp?rev=1197661&r1=1197660&r2=1197661&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp Fri Nov 4 16:56:47 2011
@@ -43,13 +43,61 @@ const std::string MessageGroupManager::q
const std::string
MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
-const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm )
const
+void MessageGroupManager::unFree( const GroupState& state )
{
+ GroupFifo::iterator pos = freeGroups.find( state.members.front() );
+ assert( pos != freeGroups.end() && pos->second == &state );
+ freeGroups.erase( pos );
+}
+
+void MessageGroupManager::own( GroupState& state, const std::string& owner )
+{
+ state.owner = owner;
+ unFree( state );
+}
+
+void MessageGroupManager::disown( GroupState& state )
+{
+ state.owner.clear();
+ assert(state.members.size());
+ assert(freeGroups.find(state.members.front()) == freeGroups.end());
+ freeGroups[state.members.front()] = &state;
+}
+
+MessageGroupManager::GroupState& MessageGroupManager::findGroup( const
QueuedMessage& qm )
+{
+ uint32_t thisMsg = qm.position.getValue();
+ if (cachedGroup && lastMsg == thisMsg) {
+ hits++;
+ return *cachedGroup;
+ }
+
+ std::string group = defaultGroupId;
const qpid::framing::FieldTable* headers =
qm.payload->getApplicationHeaders();
- if (!headers) return defaultGroupId;
- qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader );
- if (!id || !id->convertsTo<std::string>()) return defaultGroupId;
- return id->get<std::string>();
+ if (headers) {
+ qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader );
+ if (id && id->convertsTo<std::string>()) {
+ std::string tmp = id->get<std::string>();
+ if (!tmp.empty()) // empty group is reserved
+ group = tmp;
+ }
+ }
+
+ if (cachedGroup && group == lastGroup) {
+ hits++;
+ lastMsg = thisMsg;
+ return *cachedGroup;
+ }
+
+ misses++;
+
+ GroupState& found = messageGroups[group];
+ if (found.group.empty())
+ found.group = group; // new group, assign name
+ lastMsg = thisMsg;
+ lastGroup = group;
+ cachedGroup = &found;
+ return found;
}
@@ -57,15 +105,13 @@ void MessageGroupManager::enqueued( cons
{
// @todo KAG optimization - store reference to group state in QueuedMessage
// issue: const-ness??
- std::string group( getGroupId(qm) );
- GroupState &state(messageGroups[group]);
+ GroupState& state = findGroup(qm);
state.members.push_back(qm.position);
uint32_t total = state.members.size();
QPID_LOG( trace, "group queue " << qName <<
- ": added message to group id=" << group << " total=" << total );
+ ": added message to group id=" << state.group << " total=" <<
total );
if (total == 1) {
// newly created group, no owner
- state.group = group;
assert(freeGroups.find(qm.position) == freeGroups.end());
freeGroups[qm.position] = &state;
}
@@ -76,13 +122,11 @@ void MessageGroupManager::acquired( cons
{
// @todo KAG avoid lookup: retrieve direct reference to group state from
QueuedMessage
// issue: const-ness??
- std::string group( getGroupId(qm) );
- GroupMap::iterator gs = messageGroups.find( group );
- assert( gs != messageGroups.end() );
- GroupState& state( gs->second );
+ GroupState& state = findGroup(qm);
+ assert(state.members.size()); // there are msgs present
state.acquired += 1;
QPID_LOG( trace, "group queue " << qName <<
- ": acquired message in group id=" << group << " acquired=" <<
state.acquired );
+ ": acquired message in group id=" << state.group << " acquired="
<< state.acquired );
}
@@ -90,19 +134,16 @@ void MessageGroupManager::requeued( cons
{
// @todo KAG avoid lookup: retrieve direct reference to group state from
QueuedMessage
// issue: const-ness??
- std::string group( getGroupId(qm) );
- GroupMap::iterator gs = messageGroups.find( group );
- assert( gs != messageGroups.end() );
- GroupState& state( gs->second );
+ GroupState& state = findGroup(qm);
assert( state.acquired != 0 );
state.acquired -= 1;
if (state.acquired == 0 && state.owned()) {
QPID_LOG( trace, "group queue " << qName <<
- ": consumer name=" << state.owner << " released group id="
<< gs->first);
+ ": consumer name=" << state.owner << " released group id="
<< state.group);
disown(state);
}
QPID_LOG( trace, "group queue " << qName <<
- ": requeued message to group id=" << group << " acquired=" <<
state.acquired );
+ ": requeued message to group id=" << state.group << " acquired="
<< state.acquired );
}
@@ -110,10 +151,7 @@ void MessageGroupManager::dequeued( cons
{
// @todo KAG avoid lookup: retrieve direct reference to group state from
QueuedMessage
// issue: const-ness??
- std::string group( getGroupId(qm) );
- GroupMap::iterator gs = messageGroups.find( group );
- assert( gs != messageGroups.end() );
- GroupState& state( gs->second );
+ GroupState& state = findGroup(qm);
assert( state.members.size() != 0 );
assert( state.acquired != 0 );
state.acquired -= 1;
@@ -141,99 +179,55 @@ void MessageGroupManager::dequeued( cons
}
uint32_t total = state.members.size();
+ QPID_LOG( trace, "group queue " << qName <<
+ ": dequeued message from group id=" << state.group << " total="
<< total );
+
if (total == 0) {
- QPID_LOG( trace, "group queue " << qName << ": deleting group id=" <<
gs->first);
- messageGroups.erase( gs );
+ QPID_LOG( trace, "group queue " << qName << ": deleting group id=" <<
state.group);
+ if (cachedGroup == &state) {
+ cachedGroup = 0;
+ }
+ std::string key(state.group);
+ messageGroups.erase( key );
} else if (state.acquired == 0 && state.owned()) {
QPID_LOG( trace, "group queue " << qName <<
- ": consumer name=" << state.owner << " released group id="
<< gs->first);
+ ": consumer name=" << state.owner << " released group id="
<< state.group);
disown(state);
} else if (reFreeNeeded) {
disown(state);
}
- QPID_LOG( trace, "group queue " << qName <<
- ": dequeued message from group id=" << group << " total=" <<
total );
-}
-
-void MessageGroupManager::consumerAdded( const Consumer& /*c*/ )
-{
-#if 0
- // allow a re-subscribing consumer
- if (consumers.find(c.getName()) == consumers.end()) {
- consumers[c.getName()] = 0; // no groups owned yet
- QPID_LOG( trace, "group queue " << qName << ": added consumer, name="
<< c.getName() );
- } else {
- QPID_LOG( trace, "group queue " << qName << ": consumer re-subscribed,
name=" << c.getName() );
- }
-#endif
}
-void MessageGroupManager::consumerRemoved( const Consumer& /*c*/ )
+MessageGroupManager::~MessageGroupManager()
{
-#if 0
- const std::string& name(c.getName());
- Consumers::iterator consumer = consumers.find(name);
- assert(consumer != consumers.end());
- size_t count = consumer->second;
-
- for (GroupMap::iterator gs = messageGroups.begin();
- count && gs != messageGroups.end(); ++gs) {
-
- GroupState& state( gs->second );
- if (state.owner == name) {
- if (state.acquired == 0) {
- --count;
- disown(state);
- QPID_LOG( trace, "group queue " << qName <<
- ": consumer name=" << name << " released group id="
<< gs->first);
- }
- }
- }
- if (count == 0) {
- consumers.erase( consumer );
- QPID_LOG( trace, "group queue " << qName << ": removed consumer name="
<< name );
- } else {
- // don't release groups with outstanding acquired msgs - consumer may
re-subscribe!
- QPID_LOG( trace, "group queue " << qName << ": consumer name=" << name
<< " unsubscribed with outstanding messages.");
- }
-#endif
+ QPID_LOG( debug, "group queue " << qName << " cache results: hits=" <<
hits << " misses=" << misses );
}
-
-
bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c,
QueuedMessage& next )
{
if (messages.empty())
return false;
+ next.position = c->position;
if (!freeGroups.empty()) {
- framing::SequenceNumber nextFree = freeGroups.begin()->first;
- if (nextFree < c->position) { // next free group's msg is older than
current position
- bool ok = messages.find(nextFree, next);
- (void) ok; assert( ok );
- } else {
- if (!messages.next( c->position, next ))
- return false; // shouldn't happen - should find
nextFree
- }
- } else { // no free groups available
-#if 0
- if (consumers[c->getName()] == 0) { // and none currently owned
- return false; // so nothing available to consume
- }
-#endif
- if (!messages.next( c->position, next ))
- return false;
- }
-
- do {
- // @todo KAG avoid lookup: retrieve direct reference to group state
from QueuedMessage
- std::string group( getGroupId( next ) );
- GroupMap::iterator gs = messageGroups.find( group );
- assert( gs != messageGroups.end() );
- GroupState& state( gs->second );
- if (!state.owned() || state.owner == c->getName()) {
+ const framing::SequenceNumber& nextFree = freeGroups.begin()->first;
+ if (nextFree < next.position) { // a free message is older than
current
+ next.position = nextFree;
+ --next.position;
+ }
+ }
+
+ while (messages.next( next.position, next )) {
+ GroupState& group = findGroup(next);
+ if (!group.owned()) {
+ if (group.members.front() == next.position) { // only take from
head!
+ return true;
+ }
+ QPID_LOG(debug, "Skipping " << next.position << " since group " <<
group.group
+ << "'s head message still pending. pos=" <<
group.members.front());
+ } else if (group.owner == c->getName()) {
return true;
}
- } while (messages.next( next.position, next ));
+ }
return false;
}
@@ -241,15 +235,12 @@ bool MessageGroupManager::nextConsumable
bool MessageGroupManager::allocate(const std::string& consumer, const
QueuedMessage& qm)
{
// @todo KAG avoid lookup: retrieve direct reference to group state from
QueuedMessage
- std::string group( getGroupId(qm) );
- GroupMap::iterator gs = messageGroups.find( group );
- assert( gs != messageGroups.end() );
- GroupState& state( gs->second );
+ GroupState& state = findGroup(qm);
if (!state.owned()) {
own( state, consumer );
QPID_LOG( trace, "group queue " << qName <<
- ": consumer name=" << consumer << " has acquired group id="
<< gs->first);
+ ": consumer name=" << consumer << " has acquired group id="
<< state.group);
return true;
}
return state.owner == consumer;
@@ -389,8 +380,8 @@ void MessageGroupManager::setState(const
{
using namespace qpid::framing;
messageGroups.clear();
- //consumers.clear();
freeGroups.clear();
+ cachedGroup = 0;
framing::Array groupState(TYPE_CODE_MAP);
@@ -430,10 +421,7 @@ void MessageGroupManager::setState(const
for (Array::const_iterator p = positions.begin(); p !=
positions.end(); ++p)
state.members.push_back((*p)->getIntegerValue<uint32_t, 4>());
messageGroups[state.group] = state;
- if (state.owned())
- //consumers[state.owner]++;
- ;
- else {
+ if (!state.owned()) {
assert(state.members.size());
freeGroups[state.members.front()] = &messageGroups[state.group];
}
Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h?rev=1197661&r1=1197660&r2=1197661&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h Fri Nov 4 16:56:47 2011
@@ -26,7 +26,7 @@
#include "qpid/broker/StatefulQueueObserver.h"
#include "qpid/broker/MessageDistributor.h"
-
+#include "qpid/sys/unordered_map.h"
namespace qpid {
namespace broker {
@@ -44,22 +44,21 @@ class MessageGroupManager : public State
const std::string qName; // name of parent queue (for logs)
struct GroupState {
+ // note: update getState()/setState() when changing this object's
state implementation
typedef std::deque<framing::SequenceNumber> PositionFifo;
std::string group; // group identifier
std::string owner; // consumer with outstanding acquired messages
uint32_t acquired; // count of outstanding acquired messages
- //uint32_t total; // count of enqueued messages in this group
PositionFifo members; // msgs belonging to this group
GroupState() : acquired(0) {}
bool owned() const {return !owner.empty();}
};
- typedef std::map<std::string, struct GroupState> GroupMap;
- //typedef std::map<std::string, uint32_t> Consumers; // count of owned
groups
+
+ typedef sys::unordered_map<std::string, struct GroupState> GroupMap;
typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
- // note: update getState()/setState() when changing this object's state
implementation
GroupMap messageGroups; // index: group name
GroupFifo freeGroups; // ordered by oldest free msg
//Consumers consumers; // index: consumer name
@@ -68,28 +67,15 @@ class MessageGroupManager : public State
static const std::string qpidSharedGroup; // if specified, one group can
be consumed by multiple receivers
static const std::string qpidMessageGroupTimestamp;
- const std::string getGroupId( const QueuedMessage& qm ) const;
- void unFree( const GroupState& state )
- {
- GroupFifo::iterator pos = freeGroups.find( state.members.front() );
- assert( pos != freeGroups.end() && pos->second == &state );
- freeGroups.erase( pos );
- }
- void own( GroupState& state, const std::string& owner )
- {
- state.owner = owner;
- //consumers[state.owner]++;
- unFree( state );
- }
- void disown( GroupState& state )
- {
- //assert(consumers[state.owner]);
- //consumers[state.owner]--;
- state.owner.clear();
- assert(state.members.size());
- assert(freeGroups.find(state.members.front()) == freeGroups.end());
- freeGroups[state.members.front()] = &state;
- }
+ GroupState& findGroup( const QueuedMessage& qm );
+ unsigned long hits, misses; // for debug
+ uint32_t lastMsg;
+ std::string lastGroup;
+ GroupState *cachedGroup;
+
+ void unFree( const GroupState& state );
+ void own( GroupState& state, const std::string& owner );
+ void disown( GroupState& state );
public:
@@ -101,13 +87,18 @@ class MessageGroupManager : public State
MessageGroupManager(const std::string& header, const std::string& _qName,
Messages& container, unsigned int _timestamp=0 )
: StatefulQueueObserver(std::string("MessageGroupManager:") + header),
- groupIdHeader( header ), timestamp(_timestamp), messages(container),
qName(_qName) {}
+ groupIdHeader( header ), timestamp(_timestamp), messages(container),
qName(_qName),
+ hits(0), misses(0),
+ lastMsg(0), cachedGroup(0) {}
+ virtual ~MessageGroupManager();
+
+ // QueueObserver iface
void enqueued( const QueuedMessage& qm );
void acquired( const QueuedMessage& qm );
void requeued( const QueuedMessage& qm );
void dequeued( const QueuedMessage& qm );
- void consumerAdded( const Consumer& );
- void consumerRemoved( const Consumer& );
+ void consumerAdded( const Consumer& ) {};
+ void consumerRemoved( const Consumer& ) {};
void getState(qpid::framing::FieldTable& state ) const;
void setState(const qpid::framing::FieldTable&);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org