Author: kgiusti
Date: Mon Nov  7 03:29:04 2011
New Revision: 1198612

URL: http://svn.apache.org/viewvc?rev=1198612&view=rev
Log:
QPID-3346: checkpoint client tracking code

Modified:
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
    qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h
    qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueTest.cpp

Modified: 
qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp?rev=1198612&r1=1198611&r2=1198612&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp 
(original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp 
Mon Nov  7 03:29:04 2011
@@ -43,28 +43,174 @@ const std::string MessageGroupManager::q
 const std::string 
MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
 
 
-void MessageGroupManager::unFree( const GroupState& state )
+/** class ConsumerState **/
+
+/** the consumer owns the given group */
+void MessageGroupManager::ConsumerState::addGroup(const GroupState& group)
+{
+    ownedGroups += 1;
+    pendingMsgs += (group.totalMsgs() - group.acquiredMsgs());
+}
+
+/** the consumer releases the given group */
+void MessageGroupManager::ConsumerState::removeGroup(const GroupState& group)
+{
+    assert(ownedGroups != 0);
+    ownedGroups -= 1;
+    uint32_t del = (group.totalMsgs() - group.acquiredMsgs());
+    assert(del <= pendingMsgs);
+    pendingMsgs -= del;
+}
+
+/** notify the consumer that a new message has arrived at one if its owned 
groups */
+void MessageGroupManager::ConsumerState::msgAvailable(const GroupState&,
+                                                      const QueuedMessage& )
+{
+    assert(ownedGroups != 0);
+    pendingMsgs += 1;
+}
+
+/** notify the consumer that an available message has been acquired */
+void MessageGroupManager::ConsumerState::msgAcquired(const GroupState&,
+                                                     const QueuedMessage& )
+{
+    assert(pendingMsgs != 0);
+    pendingMsgs -= 1;
+}
+
+
+void MessageGroupManager::consumerAdded( const Consumer& c)
+{
+    const std::string& name = c.getName();
+    ConsumerState& state = consumers[name];
+    state.setName(name);
+    state.uncancel();  // just in case old consumer resubcribed
+    QPID_LOG( trace, "group queue " << qName << ": consumer " << name << " 
added.");
+}
+
+void MessageGroupManager::consumerRemoved( const Consumer& c)
+{
+    const std::string& name = c.getName();
+    ConsumerMap::iterator cs = consumers.find( name );
+    assert(cs != consumers.end());
+    ConsumerState& state = cs->second;
+    state.cancel();
+    if (state.groupCount() == 0) {
+        assert(state.remainingMsgs() == 0);
+        consumers.erase(cs);
+        QPID_LOG( trace, "group queue " << qName << ": consumer " << name << " 
removed.");
+    }
+}
+
+
+/** GroupFifo */
+void MessageGroupManager::GroupFifo::addGroup(const GroupState& group)
+{
+    assert(group.totalMsgs() != 0);
+    const framing::SequenceNumber& next = group.nextMsg();
+    assert(fifo.find(next) == fifo.end());
+    fifo[next] = &group;
+}
+
+void MessageGroupManager::GroupFifo::removeGroup(const GroupState& group)
+{
+    Fifo::iterator pos = fifo.find( group.nextMsg() );
+    assert( pos != fifo.end() && pos->second == &group );
+    fifo.erase( pos );
+}
+
+const MessageGroupManager::GroupState& 
MessageGroupManager::GroupFifo::nextGroup() const
+{
+    return *(fifo.begin()->second);
+}
+
+
+/** GroupState */
+void MessageGroupManager::GroupState::setOwner( ConsumerState& consumer )
+{
+    assert(owner == 0);
+    owner = &consumer;
+    owner->addGroup( *this );
+}
+
+void MessageGroupManager::GroupState::resetOwner()
 {
-    GroupFifo::iterator pos = freeGroups.find( state.members.front() );
-    assert( pos != freeGroups.end() && pos->second == &state );
-    freeGroups.erase( pos );
+    assert(owner);
+    owner->removeGroup( *this );
+    owner = 0;
+}
+
+const qpid::framing::SequenceNumber& 
MessageGroupManager::GroupState::nextMsg() const
+{
+    assert(members.size() != 0);
+    return members.front();
+}
+
+
+void MessageGroupManager::GroupState::enqueueMsg(const QueuedMessage& msg)
+{
+    members.push_back(msg.position);
+    if (owner) {
+        owner->msgAvailable(*this, msg);
+    }
+}
+
+
+void MessageGroupManager::GroupState::acquireMsg(const QueuedMessage& msg)
+{
+    assert(members.size());   // there are msgs present
+    acquired += 1;
+    if (owner) {
+        owner->msgAcquired(*this, msg);
+    }
+}
+
+void MessageGroupManager::GroupState::requeueMsg(const QueuedMessage& msg)
+{
+    assert(acquired != 0);
+    acquired -= 1;
+    if (owner) {
+        owner->msgAvailable(*this, msg);
+    }
+}
+
+
+void MessageGroupManager::GroupState::dequeueMsg(const QueuedMessage& msg)
+{
+    assert( members.size() != 0 );
+    assert( acquired != 0 );
+    acquired -= 1;
+
+    // likely to be at or near begin() if dequeued in order
+    if (members.front() == msg.position) {
+        members.pop_front();
+    } else {
+        unsigned long diff = msg.position.getValue() - 
members.front().getValue();
+        long maxEnd = diff < members.size() ? (diff + 1) : members.size();
+        GroupState::PositionFifo::iterator i =
+          std::lower_bound(members.begin(), members.begin()+maxEnd, 
msg.position);
+        assert(i != members.end() && *i == msg.position);
+        members.erase(i);
+    }
 }
 
-void MessageGroupManager::own( GroupState& state, const std::string& owner )
+void MessageGroupManager::GroupState::getPositions(framing::Array& positions) 
const
 {
-    state.owner = owner;
-    unFree( state );
+    for (PositionFifo::const_iterator p = members.begin();
+         p != members.end(); ++p)
+        positions.push_back(framing::Array::ValuePtr(new 
framing::IntegerValue( *p )));
 }
 
-void MessageGroupManager::disown( GroupState& state )
+void MessageGroupManager::GroupState::setPositions(const framing::Array& 
positions)
 {
-    state.owner.clear();
-    assert(state.members.size());
-    assert(freeGroups.find(state.members.front()) == freeGroups.end());
-    freeGroups[state.members.front()] = &state;
+    members.clear();
+    for (framing::Array::const_iterator p = positions.begin(); p != 
positions.end(); ++p)
+        members.push_back((*p)->getIntegerValue<uint32_t, 4>());
 }
 
-MessageGroupManager::GroupState& MessageGroupManager::findGroup( const 
QueuedMessage& qm )
+
+
+MessageGroupManager::GroupState& MessageGroupManager::findGroup(const 
QueuedMessage& qm)
 {
     uint32_t thisMsg = qm.position.getValue();
     if (cachedGroup && lastMsg == thisMsg) {
@@ -91,42 +237,45 @@ MessageGroupManager::GroupState& Message
 
     misses++;
 
-    GroupState& found = messageGroups[group];
-    if (found.group.empty())
-        found.group = group;    // new group, assign name
+    cachedGroup = &messageGroups[group];
+    if (cachedGroup->getName().empty())
+        cachedGroup->setName(group);    // new group, assign name
     lastMsg = thisMsg;
     lastGroup = group;
-    cachedGroup = &found;
-    return found;
+    return *cachedGroup;
 }
 
 
+void MessageGroupManager::deleteGroup(GroupState& group)
+{
+    if (cachedGroup == &group)
+        cachedGroup = 0;
+    std::string name = group.getName();
+    messageGroups.erase(name);
+}
+
 void MessageGroupManager::enqueued( const QueuedMessage& qm )
 {
     // @todo KAG optimization - store reference to group state in QueuedMessage
     // issue: const-ness??
     GroupState& state = findGroup(qm);
-    state.members.push_back(qm.position);
-    uint32_t total = state.members.size();
+    state.enqueueMsg(qm);
+    uint32_t total = state.totalMsgs();
     QPID_LOG( trace, "group queue " << qName <<
-              ": added message to group id=" << state.group << " total=" << 
total );
+              ": added message to group id=" << state.getName() << " total=" 
<< total );
     if (total == 1) {
         // newly created group, no owner
-        assert(freeGroups.find(qm.position) == freeGroups.end());
-        freeGroups[qm.position] = &state;
+        freeGroups.addGroup(state);
     }
 }
 
 
 void MessageGroupManager::acquired( const QueuedMessage& qm )
 {
-    // @todo KAG  avoid lookup: retrieve direct reference to group state from 
QueuedMessage
-    // issue: const-ness??
     GroupState& state = findGroup(qm);
-    assert(state.members.size());   // there are msgs present
-    state.acquired += 1;
+    state.acquireMsg(qm);
     QPID_LOG( trace, "group queue " << qName <<
-              ": acquired message in group id=" << state.group << " acquired=" 
<< state.acquired );
+              ": acquired message in group id=" << state.getName() << " 
acquired=" << state.acquiredMsgs());
 }
 
 
@@ -135,69 +284,77 @@ void MessageGroupManager::requeued( cons
     // @todo KAG  avoid lookup: retrieve direct reference to group state from 
QueuedMessage
     // issue: const-ness??
     GroupState& state = findGroup(qm);
-    assert( state.acquired != 0 );
-    state.acquired -= 1;
-    if (state.acquired == 0 && state.owned()) {
-        QPID_LOG( trace, "group queue " << qName <<
-                  ": consumer name=" << state.owner << " released group id=" 
<< state.group);
-        disown(state);
+    state.requeueMsg(qm);
+    if (state.acquiredMsgs() == 0 && state.getOwner()) {
+        disownGroup(state);
+        freeGroups.addGroup(state);
     }
     QPID_LOG( trace, "group queue " << qName <<
-              ": requeued message to group id=" << state.group << " acquired=" 
<< state.acquired );
+              ": requeued message to group id=" << state.getName() << " 
acquired=" << state.acquiredMsgs());
 }
 
 
 void MessageGroupManager::dequeued( const QueuedMessage& qm )
 {
-    // @todo KAG  avoid lookup: retrieve direct reference to group state from 
QueuedMessage
-    // issue: const-ness??
-    GroupState& state = findGroup(qm);
-    assert( state.members.size() != 0 );
-    assert( state.acquired != 0 );
-    state.acquired -= 1;
-
-    // likely to be at or near begin() if dequeued in order
-    bool reFreeNeeded = false;
-    if (state.members.front() == qm.position) {
-        if (!state.owned()) {
-            // will be on the freeGroups list if mgmt is dequeueing rather 
than a consumer!
-            // if on freelist, it is indexed by first member, which is about 
to be removed!
-            unFree(state);
-            reFreeNeeded = true;
-        }
-        state.members.pop_front();
-    } else {
-        GroupState::PositionFifo::iterator pos = state.members.begin() + 1;
-        GroupState::PositionFifo::iterator end = state.members.end();
-        while (pos != end) {
-            if (*pos == qm.position) {
-                state.members.erase(pos);
-                break;
-            }
-            ++pos;
+    GroupState& group = findGroup(qm);
+    bool freeNeeded = false;
+    if (group.isFree()) {       // dequeue is occuring via mgmt, not 
subscriber!
+        const framing::SequenceNumber next = group.nextMsg();
+        if (next == qm.position) {
+            /* we are about to remove the head message of this group.  This 
message is
+             * used to index the freeGroups fifo, so we must temporarily 
remove it from
+             * the fifo until we are done updating the head message.
+             */
+            freeGroups.removeGroup(group);
+            freeNeeded = true;
         }
     }
+    group.dequeueMsg(qm);
+
+    uint32_t total = group.totalMsgs();
+    QPID_LOG( trace, "group queue " << qName <<
+              ": dequeued message from group id=" << group.getName() << " 
total=" << total );
+
+    // if no more outstanding acquired messages, free the group from the 
consumer
+    if (group.acquiredMsgs() == 0 && group.getOwner()) {
+        // group is now available again
+        disownGroup(group);
+        freeNeeded = true;
+    }
 
-    uint32_t total = state.members.size();
     QPID_LOG( trace, "group queue " << qName <<
-              ": dequeued message from group id=" << state.group << " total=" 
<< total );
+              ": dequeued message from group id=" << group.getName() << " 
total=" << total );
 
     if (total == 0) {
-        QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << 
state.group);
-        if (cachedGroup == &state) {
-            cachedGroup = 0;
-        }
-        std::string key(state.group);
-        messageGroups.erase( key );
-    } else if (state.acquired == 0 && state.owned()) {
-        QPID_LOG( trace, "group queue " << qName <<
-                  ": consumer name=" << state.owner << " released group id=" 
<< state.group);
-        disown(state);
-    } else if (reFreeNeeded) {
-        disown(state);
+        QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << 
group.getName());
+        deleteGroup(group);
+    } else if (freeNeeded) {
+        freeGroups.addGroup(group);
     }
 }
 
+/** remove the owner of the group */
+void MessageGroupManager::disownGroup(GroupState& group)
+{
+    ConsumerState& owner = *group.getOwner();
+    QPID_LOG( trace, "group queue " << qName <<
+              ": consumer name=" << owner.getName() << " released group id=" 
<< group.getName());
+    group.resetOwner();
+    if (owner.cancelled() && owner.groupCount() == 0) {
+        // this owner has unsubscribed, we can release it now.
+        std::string name = owner.getName();
+        consumers.erase(name);
+        QPID_LOG( error, "group queue " << qName << ": consumer " << name << " 
removed.");
+    }
+}
+
+namespace {
+    unsigned long found = 0;
+    unsigned long failed = 0;
+    unsigned long missCount = 0;
+    unsigned long earlyRet = 0;
+}
+
 MessageGroupManager::~MessageGroupManager()
 {
     QPID_LOG( debug, "group queue " << qName << " cache results: hits=" << 
hits << " misses=" << misses );
@@ -207,27 +364,38 @@ bool MessageGroupManager::nextConsumable
     if (messages.empty())
         return false;
 
+    ConsumerState& cState = consumers.find(c->getName())->second;
+
     next.position = c->position;
-    if (!freeGroups.empty()) {
-        const framing::SequenceNumber& nextFree = freeGroups.begin()->first;
-        if (nextFree < next.position) {     // a free message is older than 
current
+    if (freeGroups.groupCount() != 0) {
+        const framing::SequenceNumber& nextFree = 
freeGroups.nextGroup().nextMsg();
+        if (nextFree <= next.position) {     // a free message is older than 
current
             next.position = nextFree;
             --next.position;
         }
+    } else if (cState.remainingMsgs() == 0) {   // no more msgs from owned 
groups
+        earlyRet += 1;
+        return false;
     }
 
+    int count = 1;
     while (messages.next( next.position, next )) {
         GroupState& group = findGroup(next);
-        if (!group.owned()) {
-            if (group.members.front() == next.position) {    // only take from 
head!
+        if (group.getOwner() == &cState) {
+            found += 1;
+            return true;
+        } else if (group.isFree()) {
+            if (group.nextMsg() == next.position) {    // only take from head!
+                found += 1;
                 return true;
             }
-            QPID_LOG(debug, "Skipping " << next.position << " since group " << 
group.group
-                     << "'s head message still pending. pos=" << 
group.members.front());
-        } else if (group.owner == c->getName()) {
-            return true;
+            QPID_LOG(debug, "Skipping " << next.position << " since group " << 
group.getName()
+                     << "'s head message still pending. pos=" << 
group.nextMsg());
         }
+        count += 1;
     }
+    failed += 1;
+    missCount += 1;
     return false;
 }
 
@@ -237,13 +405,17 @@ bool MessageGroupManager::allocate(const
     // @todo KAG avoid lookup: retrieve direct reference to group state from 
QueuedMessage
     GroupState& state = findGroup(qm);
 
-    if (!state.owned()) {
-        own( state, consumer );
+    if (state.isFree()) {
+        freeGroups.removeGroup(state);
+        ConsumerMap::iterator cs = consumers.find( consumer );
+        assert(cs != consumers.end());
+        ConsumerState& owner = cs->second;
+        state.setOwner( owner );
         QPID_LOG( trace, "group queue " << qName <<
-                  ": consumer name=" << consumer << " has acquired group id=" 
<< state.group);
+                  ": consumer name=" << consumer << " has acquired group id=" 
<< state.getName());
         return true;
     }
-    return state.owner == consumer;
+    return state.getOwner()->getName() == consumer;
 }
 
 bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, 
QueuedMessage& next )
@@ -280,9 +452,13 @@ void MessageGroupManager::query(qpid::ty
          g != messageGroups.end(); ++g) {
         qpid::types::Variant::Map info;
         info[GROUP_ID_KEY] = g->first;
-        info[GROUP_MSG_COUNT] = g->second.members.size();
+        info[GROUP_MSG_COUNT] = g->second.totalMsgs();
         info[GROUP_TIMESTAMP] = 0;   /** @todo KAG - NEED HEAD MSG TIMESTAMP */
-        info[GROUP_CONSUMER] = g->second.owner;
+        if (g->second.getOwner()) {
+            info[GROUP_CONSUMER] = g->second.getOwner()->getName();
+        } else {
+            info[GROUP_CONSUMER] = std::string("");
+        }
         groups.push_back(info);
     }
     state[GROUP_STATE_KEY] = groups;
@@ -346,6 +522,11 @@ namespace {
     const std::string GROUP_ACQUIRED_CT("acquired-ct");
     const std::string GROUP_POSITIONS("positions");
     const std::string GROUP_STATE("group-state");
+    const std::string OWNER_STATE("owner-state");
+    const std::string CANCELLED("cancelled");
+    const std::string YES("yes");
+    const std::string NO("no");
+    const std::string OWNER_NAME("name");
 }
 
 
@@ -360,17 +541,29 @@ void MessageGroupManager::getState(qpid:
 
         framing::FieldTable group;
         group.setString(GROUP_NAME, g->first);
-        group.setString(GROUP_OWNER, g->second.owner);
-        group.setInt(GROUP_ACQUIRED_CT, g->second.acquired);
+        if (g->second.getOwner()) {
+            group.setString(GROUP_OWNER, g->second.getOwner()->getName());
+        } else {
+            group.setString(GROUP_OWNER, std::string(""));
+        }
+        group.setInt(GROUP_ACQUIRED_CT, g->second.acquiredMsgs());
         framing::Array positions(TYPE_CODE_UINT32);
-        for (GroupState::PositionFifo::const_iterator p = 
g->second.members.begin();
-             p != g->second.members.end(); ++p)
-            positions.push_back(framing::Array::ValuePtr(new IntegerValue( *p 
)));
+        g->second.getPositions(positions);
         group.setArray(GROUP_POSITIONS, positions);
         groupState.push_back(framing::Array::ValuePtr(new 
FieldTableValue(group)));
     }
     state.setArray(GROUP_STATE, groupState);
 
+    framing::Array ownerState(TYPE_CODE_MAP);
+    for (ConsumerMap::const_iterator c = consumers.begin();
+         c != consumers.end(); ++c) {
+        framing::FieldTable owner;
+        owner.setString(OWNER_NAME, c->first);
+        owner.setString(CANCELLED, c->second.cancelled() ? YES : NO);
+        ownerState.push_back(framing::Array::ValuePtr(new 
FieldTableValue(owner)));
+    }
+    state.setArray(OWNER_STATE, ownerState);
+
     QPID_LOG(debug, "Queue \"" << qName << "\": replicating message group 
state, key=" << groupIdHeader);
 }
 
@@ -379,51 +572,86 @@ void MessageGroupManager::getState(qpid:
 void MessageGroupManager::setState(const qpid::framing::FieldTable& state)
 {
     using namespace qpid::framing;
+    consumers.clear();
     messageGroups.clear();
     freeGroups.clear();
     cachedGroup = 0;
 
-    framing::Array groupState(TYPE_CODE_MAP);
+    // set up the known owners
+    framing::Array ownerState(TYPE_CODE_MAP);
+    bool ok = state.getArray(OWNER_STATE, ownerState);
+    if (!ok) {
+        QPID_LOG(error, "Unable to find message group owner state information 
for queue \"" <<
+                 qName << "\": cluster inconsistency error!");
+        return;
+    }
 
-    bool ok = state.getArray(GROUP_STATE, groupState);
+    for (framing::Array::const_iterator c = ownerState.begin(); c != 
ownerState.end(); ++c) {
+        framing::FieldTable ownerMap;
+        ok = framing::getEncodedValue<FieldTable>(*c, ownerMap);
+        if (!ok) {
+            QPID_LOG(error, "Invalid message group owner information for queue 
\"" <<
+                     qName << "\": table encoding error!");
+            return;
+        }
+        if (!ownerMap.isSet(OWNER_NAME) || !ownerMap.isSet(CANCELLED)) {
+            QPID_LOG(error, "Invalid message group owner information for queue 
\"" <<
+                     qName << "\": fields missing error!");
+            return;
+        }
+
+        const std::string name = ownerMap.getAsString(OWNER_NAME);
+        ConsumerState& owner = consumers[name];
+        owner.setName(name);
+        if (ownerMap.getAsString(CANCELLED) == YES) {
+            owner.cancel();
+        }
+    }
+
+    // set up the known groups
+    framing::Array groupState(TYPE_CODE_MAP);
+    ok = state.getArray(GROUP_STATE, groupState);
     if (!ok) {
         QPID_LOG(error, "Unable to find message group state information for 
queue \"" <<
                  qName << "\": cluster inconsistency error!");
         return;
     }
 
-    for (framing::Array::const_iterator g = groupState.begin();
-         g != groupState.end(); ++g) {
-        framing::FieldTable group;
-        ok = framing::getEncodedValue<FieldTable>(*g, group);
+    for (framing::Array::const_iterator g = groupState.begin(); g != 
groupState.end(); ++g) {
+        framing::FieldTable groupMap;
+        ok = framing::getEncodedValue<FieldTable>(*g, groupMap);
         if (!ok) {
             QPID_LOG(error, "Invalid message group state information for queue 
\"" <<
                      qName << "\": table encoding error!");
             return;
         }
-        MessageGroupManager::GroupState state;
-        if (!group.isSet(GROUP_NAME) || !group.isSet(GROUP_OWNER) || 
!group.isSet(GROUP_ACQUIRED_CT)) {
+        if (!groupMap.isSet(GROUP_NAME) || !groupMap.isSet(GROUP_OWNER) || 
!groupMap.isSet(GROUP_ACQUIRED_CT)) {
             QPID_LOG(error, "Invalid message group state information for queue 
\"" <<
                      qName << "\": fields missing error!");
             return;
         }
-        state.group = group.getAsString(GROUP_NAME);
-        state.owner = group.getAsString(GROUP_OWNER);
-        state.acquired = group.getAsInt(GROUP_ACQUIRED_CT);
+
+        // replicate the group state
+        std::string name = groupMap.getAsString(GROUP_NAME);
+        MessageGroupManager::GroupState& group = messageGroups[name];
+        assert(group.getName().empty());
+        group.setName(name);
+        group.setAcquired(groupMap.getAsInt(GROUP_ACQUIRED_CT));
         framing::Array positions(TYPE_CODE_UINT32);
-        ok = group.getArray(GROUP_POSITIONS, positions);
+        ok = groupMap.getArray(GROUP_POSITIONS, positions);
         if (!ok) {
             QPID_LOG(error, "Invalid message group state information for queue 
\"" <<
                      qName << "\": position encoding error!");
             return;
         }
+        group.setPositions(positions);
 
-        for (Array::const_iterator p = positions.begin(); p != 
positions.end(); ++p)
-            state.members.push_back((*p)->getIntegerValue<uint32_t, 4>());
-        messageGroups[state.group] = state;
-        if (!state.owned()) {
-            assert(state.members.size());
-            freeGroups[state.members.front()] = &messageGroups[state.group];
+        const std::string ownerName = groupMap.getAsString(GROUP_OWNER);
+        if (!ownerName.empty()) {
+            ConsumerState& owner = consumers[ownerName];
+            group.setOwner(owner);
+        } else {
+            freeGroups.addGroup(group);
         }
     }
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h?rev=1198612&r1=1198611&r2=1198612&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h 
(original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.h Mon 
Nov  7 03:29:04 2011
@@ -43,40 +43,99 @@ class MessageGroupManager : public State
     Messages& messages;                 // parent Queue's in memory message 
container
     const std::string qName;            // name of parent queue (for logs)
 
-    struct GroupState {
+    class GroupState;
+
+    /** track consumers subscribed to this queue */
+    class ConsumerState {
+        // note: update getState()/setState() when changing this object's 
state implementation
+        bool zombie;        // cancelled, but still holding messages.
+        std::string name;
+        uint32_t ownedGroups;
+        uint32_t pendingMsgs;
+
+    public:
+        ConsumerState() : zombie(false), ownedGroups(0), pendingMsgs(0) {}
+        const std::string& getName() const {return name;}
+        void setName(const std::string& n) {name = n;}
+        uint32_t groupCount() const {return ownedGroups;}
+        uint32_t remainingMsgs() const {return pendingMsgs;}
+
+        void addGroup( const GroupState& g);
+        void removeGroup( const GroupState& g );
+        bool cancelled() const {return zombie;}
+        void cancel() {zombie = true;}
+        void uncancel() {zombie = false;}   // resubscribed on existing session
+        void msgAvailable(const GroupState& g, const QueuedMessage& qm);
+        void msgAcquired(const GroupState& g, const QueuedMessage& qm);
+    };
+    typedef sys::unordered_map<std::string, ConsumerState> ConsumerMap;
+    ConsumerMap consumers;    // index: consumer name
+
+    /** track all known groups */
+    class GroupState {
         // note: update getState()/setState() when changing this object's 
state implementation
         typedef std::deque<framing::SequenceNumber> PositionFifo;
 
-        std::string group;  // group identifier
-        std::string owner;  // consumer with outstanding acquired messages
+        std::string name;  // group identifier
         uint32_t acquired;  // count of outstanding acquired messages
+        ConsumerState *owner;  // consumer with outstanding acquired messages
         PositionFifo members;   // msgs belonging to this group
 
-        GroupState() : acquired(0) {}
-        bool owned() const {return !owner.empty();}
+    public:
+        GroupState() : acquired(0), owner(0) {}
+        const std::string& getName() const {return name;}
+        void setName(const std::string& n) {name = n;}
+        uint32_t acquiredMsgs() const {return acquired;}
+        uint32_t totalMsgs() const {return members.size();}
+        bool isFree() const {return owner == 0;}
+
+        void setOwner( ConsumerState& consumer );
+        ConsumerState *getOwner() const { return owner; }
+        void resetOwner();
+        const framing::SequenceNumber& nextMsg() const;
+        void enqueueMsg(const QueuedMessage& msg);
+        void acquireMsg(const QueuedMessage& msg);
+        void requeueMsg(const QueuedMessage& msg);
+        void dequeueMsg(const QueuedMessage& msg);
+        // for clustering:
+        void getPositions(framing::Array& pos) const;
+        void setPositions(const framing::Array& pos);
+        void setAcquired(uint32_t c) {acquired = c;}
+
     };
+    typedef sys::unordered_map<std::string, GroupState> GroupMap;
+    GroupMap messageGroups;   // index: group name
+    // cache the last lookup
+    uint hits;
+    uint misses;
+    uint32_t lastMsg;
+    std::string lastGroup;
+    GroupState *cachedGroup;
 
-    typedef sys::unordered_map<std::string, struct GroupState> GroupMap;
-    typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
+    /** store free (un-owned) groups by the position of the oldest index */
+    class GroupFifo {
+        // orders groups by their next available message (oldest first)
+        typedef std::map<framing::SequenceNumber, const GroupState *> Fifo;
+        Fifo fifo;
+
+    public:
+        GroupFifo() {}
+        void addGroup(const GroupState& group);
+        void removeGroup(const GroupState& group);
+        size_t groupCount() const {return fifo.size();}
+        const GroupState& nextGroup() const;
+        void clear() {fifo.clear();}
+    };
+    GroupFifo freeGroups;
 
-    GroupMap messageGroups; // index: group name
-    GroupFifo freeGroups;   // ordered by oldest free msg
-    //Consumers consumers;    // index: consumer name
+    GroupState& findGroup( const QueuedMessage& qm );
+    void deleteGroup(GroupState& group);
+    void disownGroup(GroupState& group);        /** release a group from a 
subscriber */
 
     static const std::string qpidMessageGroupKey;
     static const std::string qpidSharedGroup;   // if specified, one group can 
be consumed by multiple receivers
     static const std::string qpidMessageGroupTimestamp;
 
-    GroupState& findGroup( const QueuedMessage& qm );
-    unsigned long hits, misses; // for debug
-    uint32_t lastMsg;
-    std::string lastGroup;
-    GroupState *cachedGroup;
-
-    void unFree( const GroupState& state );
-    void own( GroupState& state, const std::string& owner );
-    void disown( GroupState& state );
-
  public:
 
     static QPID_BROKER_EXTERN void setDefaults(const std::string& groupId);
@@ -97,8 +156,8 @@ class MessageGroupManager : public State
     void acquired( const QueuedMessage& qm );
     void requeued( const QueuedMessage& qm );
     void dequeued( const QueuedMessage& qm );
-    void consumerAdded( const Consumer& ) {};
-    void consumerRemoved( const Consumer& ) {};
+    void consumerAdded( const Consumer& );
+    void consumerRemoved( const Consumer& );
     void getState(qpid::framing::FieldTable& state ) const;
     void setState(const qpid::framing::FieldTable&);
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueTest.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueTest.cpp?rev=1198612&r1=1198611&r2=1198612&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/QueueTest.cpp Mon Nov  7 
03:29:04 2011
@@ -860,6 +860,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsu
     // Owners= ---, ---, ---
 
     TestConsumer::shared_ptr c3(new TestConsumer("C3"));
+    queue->consume(c3);
     std::deque<QueuedMessage> dequeMeC3;
 
     verifyAcquire(queue, c3, dequeMeC3, "a", 2 );



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to