Author: kgiusti
Date: Thu Oct 13 17:33:08 2011
New Revision: 1183001

URL: http://svn.apache.org/viewvc?rev=1183001&view=rev
Log:
QPID-3346: code cleanup - move private code out of headers, delete dead code.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp?rev=1183001&r1=1183000&r2=1183001&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp Thu Oct 13 
17:33:08 2011
@@ -43,6 +43,27 @@ const std::string MessageGroupManager::q
 const std::string 
MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp");
 
 
+void MessageGroupManager::unFree( const GroupState& state )
+{
+    GroupFifo::iterator pos = freeGroups.find( state.members.front() );
+    assert( pos != freeGroups.end() && pos->second == &state );
+    freeGroups.erase( pos );
+}
+
+void MessageGroupManager::own( GroupState& state, const std::string& owner )
+{
+    state.owner = owner;
+    unFree( state );
+}
+
+void MessageGroupManager::disown( GroupState& state )
+{
+    state.owner.clear();
+    assert(state.members.size());
+    assert(freeGroups.find(state.members.front()) == freeGroups.end());
+    freeGroups[state.members.front()] = &state;
+}
+
 const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) 
const
 {
     const qpid::framing::FieldTable* headers = 
qm.payload->getApplicationHeaders();
@@ -155,51 +176,6 @@ void MessageGroupManager::dequeued( cons
               ": dequeued message from group id=" << group << " total=" << 
total );
 }
 
-void MessageGroupManager::consumerAdded( const Consumer& /*c*/ )
-{
-#if 0
-    // allow a re-subscribing consumer
-    if (consumers.find(c.getName()) == consumers.end()) {
-        consumers[c.getName()] = 0;     // no groups owned yet
-        QPID_LOG( trace, "group queue " << qName << ": added consumer, name=" 
<< c.getName() );
-    } else {
-        QPID_LOG( trace, "group queue " << qName << ": consumer re-subscribed, 
name=" << c.getName() );
-    }
-#endif
-}
-
-void MessageGroupManager::consumerRemoved( const Consumer& /*c*/ )
-{
-#if 0
-    const std::string& name(c.getName());
-    Consumers::iterator consumer = consumers.find(name);
-    assert(consumer != consumers.end());
-    size_t count = consumer->second;
-
-    for (GroupMap::iterator gs = messageGroups.begin();
-         count && gs != messageGroups.end(); ++gs) {
-
-        GroupState& state( gs->second );
-        if (state.owner == name) {
-            if (state.acquired == 0) {
-                --count;
-                disown(state);
-                QPID_LOG( trace, "group queue " << qName <<
-                          ": consumer name=" << name << " released group id=" 
<< gs->first);
-            }
-        }
-    }
-    if (count == 0) {
-        consumers.erase( consumer );
-        QPID_LOG( trace, "group queue " << qName << ": removed consumer name=" 
<< name );
-    } else {
-        // don't release groups with outstanding acquired msgs - consumer may 
re-subscribe!
-        QPID_LOG( trace, "group queue " << qName << ": consumer name=" << name 
<< " unsubscribed with outstanding messages.");
-    }
-#endif
-}
-
-
 bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, 
QueuedMessage& next )
 {
     if (messages.empty())
@@ -215,11 +191,6 @@ bool MessageGroupManager::nextConsumable
                 return false;           // shouldn't happen - should find 
nextFree
         }
     } else {  // no free groups available
-#if 0
-        if (consumers[c->getName()] == 0) {  // and none currently owned
-            return false;       // so nothing available to consume
-        }
-#endif
         if (!messages.next( c->position, next ))
             return false;
     }
@@ -430,10 +401,7 @@ void MessageGroupManager::setState(const
         for (Array::const_iterator p = positions.begin(); p != 
positions.end(); ++p)
             state.members.push_back((*p)->getIntegerValue<uint32_t, 4>());
         messageGroups[state.group] = state;
-        if (state.owned())
-            //consumers[state.owner]++;
-            ;
-        else {
+        if (!state.owned()) {
             assert(state.members.size());
             freeGroups[state.members.front()] = &messageGroups[state.group];
         }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h?rev=1183001&r1=1183000&r2=1183001&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h Thu Oct 13 
17:33:08 2011
@@ -44,22 +44,20 @@ class MessageGroupManager : public State
     const std::string qName;            // name of parent queue (for logs)
 
     struct GroupState {
+        // note: update getState()/setState() when changing this object's 
state implementation
         typedef std::deque<framing::SequenceNumber> PositionFifo;
 
         std::string group;  // group identifier
         std::string owner;  // consumer with outstanding acquired messages
         uint32_t acquired;  // count of outstanding acquired messages
-        //uint32_t total;     // count of enqueued messages in this group
         PositionFifo members;   // msgs belonging to this group
 
         GroupState() : acquired(0) {}
         bool owned() const {return !owner.empty();}
     };
     typedef std::map<std::string, struct GroupState> GroupMap;
-    //typedef std::map<std::string, uint32_t> Consumers;  // count of owned 
groups
     typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
 
-    // note: update getState()/setState() when changing this object's state 
implementation
     GroupMap messageGroups; // index: group name
     GroupFifo freeGroups;   // ordered by oldest free msg
     //Consumers consumers;    // index: consumer name
@@ -69,27 +67,9 @@ class MessageGroupManager : public State
     static const std::string qpidMessageGroupTimestamp;
 
     const std::string getGroupId( const QueuedMessage& qm ) const;
-    void unFree( const GroupState& state )
-    {
-        GroupFifo::iterator pos = freeGroups.find( state.members.front() );
-        assert( pos != freeGroups.end() && pos->second == &state );
-        freeGroups.erase( pos );
-    }
-    void own( GroupState& state, const std::string& owner )
-    {
-        state.owner = owner;
-        //consumers[state.owner]++;
-        unFree( state );
-    }
-    void disown( GroupState& state )
-    {
-        //assert(consumers[state.owner]);
-        //consumers[state.owner]--;
-        state.owner.clear();
-        assert(state.members.size());
-        assert(freeGroups.find(state.members.front()) == freeGroups.end());
-        freeGroups[state.members.front()] = &state;
-    }
+    void unFree( const GroupState& state );
+    void own( GroupState& state, const std::string& owner );
+    void disown( GroupState& state );
 
  public:
 
@@ -102,12 +82,14 @@ class MessageGroupManager : public State
                         Messages& container, unsigned int _timestamp=0 )
       : StatefulQueueObserver(std::string("MessageGroupManager:") + header),
       groupIdHeader( header ), timestamp(_timestamp), messages(container), 
qName(_qName) {}
+
+    // QueueObserver iface
     void enqueued( const QueuedMessage& qm );
     void acquired( const QueuedMessage& qm );
     void requeued( const QueuedMessage& qm );
     void dequeued( const QueuedMessage& qm );
-    void consumerAdded( const Consumer& );
-    void consumerRemoved( const Consumer& );
+    void consumerAdded( const Consumer& ) {};
+    void consumerRemoved( const Consumer& ) {};
     void getState(qpid::framing::FieldTable& state ) const;
     void setState(const qpid::framing::FieldTable&);
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to