Author: aconway
Date: Wed Dec 19 21:23:41 2012
New Revision: 1424132

URL: http://svn.apache.org/viewvc?rev=1424132&view=rev
Log:
QPID-4514: Remove obsolete cluster code: get rid of StatefulQueueObserver.

Removed:
    qpid/trunk/qpid/cpp/src/qpid/broker/StatefulQueueObserver.h
Modified:
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1424132&r1=1424131&r2=1424132&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Wed Dec 19 21:23:41 2012
@@ -727,7 +727,6 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/SessionState.h \
   qpid/broker/SignalHandler.cpp \
   qpid/broker/SignalHandler.h \
-  qpid/broker/StatefulQueueObserver.h \
   qpid/broker/System.cpp \
   qpid/broker/System.h \
   qpid/broker/ThresholdAlerts.cpp \

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp?rev=1424132&r1=1424131&r2=1424132&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp Wed Dec 19 21:23:41 2012
@@ -311,100 +311,3 @@ namespace {
     const std::string GROUP_STATE("group-state");
 }
 
-
-/** Runs on UPDATER to snapshot current state */
-void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const
-{
-    using namespace qpid::framing;
-    state.clear();
-    framing::Array groupState(TYPE_CODE_MAP);
-    for (GroupMap::const_iterator g = messageGroups.begin();
-         g != messageGroups.end(); ++g) {
-
-        framing::FieldTable group;
-        group.setString(GROUP_NAME, g->first);
-        group.setString(GROUP_OWNER, g->second.owner);
-        group.setInt(GROUP_ACQUIRED_CT, g->second.acquired);
-        framing::Array positions(TYPE_CODE_UINT32);
-        framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN);
-        for (GroupState::MessageFifo::const_iterator p = g->second.members.begin();
-             p != g->second.members.end(); ++p) {
-            positions.push_back(framing::Array::ValuePtr(new IntegerValue( p->position )));
-            acquiredMsgs.push_back(framing::Array::ValuePtr(new BoolValue( p->acquired )));
-        }
-        group.setArray(GROUP_POSITIONS, positions);
-        group.setArray(GROUP_ACQUIRED_MSGS, acquiredMsgs);
-        groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group)));
-    }
-    state.setArray(GROUP_STATE, groupState);
-
-    QPID_LOG(debug, "Queue \"" << qName << "\": replicating message group state, key=" << groupIdHeader);
-}
-
-
-/** called on UPDATEE to set state from snapshot */
-void MessageGroupManager::setState(const qpid::framing::FieldTable& state)
-{
-    using namespace qpid::framing;
-    messageGroups.clear();
-    freeGroups.clear();
-    cachedGroup = 0;
-
-    framing::Array groupState(TYPE_CODE_MAP);
-
-    bool ok = state.getArray(GROUP_STATE, groupState);
-    if (!ok) {
-        QPID_LOG(error, "Unable to find message group state information for queue \"" <<
-                 qName << "\"");
-        return;
-    }
-
-    for (framing::Array::const_iterator g = groupState.begin();
-         g != groupState.end(); ++g) {
-        framing::FieldTable group;
-        ok = framing::getEncodedValue<FieldTable>(*g, group);
-        if (!ok) {
-            QPID_LOG(error, "Invalid message group state information for queue \"" <<
-                     qName << "\": table encoding error!");
-            return;
-        }
-        MessageGroupManager::GroupState state;
-        if (!group.isSet(GROUP_NAME) || !group.isSet(GROUP_OWNER) || !group.isSet(GROUP_ACQUIRED_CT)) {
-            QPID_LOG(error, "Invalid message group state information for queue \"" <<
-                     qName << "\": fields missing error!");
-            return;
-        }
-        state.group = group.getAsString(GROUP_NAME);
-        state.owner = group.getAsString(GROUP_OWNER);
-        state.acquired = group.getAsInt(GROUP_ACQUIRED_CT);
-        framing::Array positions(TYPE_CODE_UINT32);
-        ok = group.getArray(GROUP_POSITIONS, positions);
-        if (!ok) {
-            QPID_LOG(error, "Invalid message group state information for queue \"" <<
-                     qName << "\": position encoding error!");
-            return;
-        }
-        framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN);
-        ok = group.getArray(GROUP_ACQUIRED_MSGS, acquiredMsgs);
-        if (!ok || positions.count() != acquiredMsgs.count()) {
-            QPID_LOG(error, "Invalid message group state information for queue \"" <<
-                     qName << "\": acquired flag encoding error!");
-            return;
-        }
-
-        Array::const_iterator a = acquiredMsgs.begin();
-        for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) {
-            GroupState::MessageState mState((*p)->getIntegerValue<uint32_t, 4>());
-            mState.acquired = (*a++)->getIntegerValue<bool>();
-            state.members.push_back(mState);
-        }
-
-        messageGroups[state.group] = state;
-        if (!state.owned()) {
-            assert(state.members.size());
-            freeGroups[state.members.front().position] = &messageGroups[state.group];
-        }
-    }
-
-    QPID_LOG(debug, "Queue \"" << qName << "\": message group state replicated, key =" << groupIdHeader)
-}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h?rev=1424132&r1=1424131&r2=1424132&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h Wed Dec 19 21:23:41 2012
@@ -25,11 +25,12 @@
 /* for managing message grouping on Queues */
 
 #include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/StatefulQueueObserver.h"
+#include "qpid/broker/QueueObserver.h"
 #include "qpid/broker/MessageDistributor.h"
 #include "qpid/framing/SequenceNumber.h"
 #include "qpid/sys/unordered_map.h"
 
+#include "boost/shared_ptr.hpp"
 #include <deque>
 
 namespace qpid {
@@ -39,8 +40,9 @@ class QueueObserver;
 struct QueueSettings;
 class MessageDistributor;
 class Messages;
+class Consumer;
 
-class MessageGroupManager : public StatefulQueueObserver, public MessageDistributor
+class MessageGroupManager : public QueueObserver, public MessageDistributor
 {
     static std::string defaultGroupId;  // assigned if no group id header present
 
@@ -101,10 +103,10 @@ class MessageGroupManager : public State
 
     MessageGroupManager(const std::string& header, const std::string& _qName,
                         Messages& container, unsigned int _timestamp=0 )
-      : StatefulQueueObserver(std::string("MessageGroupManager:") + header),
-      groupIdHeader( header ), timestamp(_timestamp), messages(container), qName(_qName),
-      hits(0), misses(0),
-      lastMsg(0), cachedGroup(0) {}
+      : groupIdHeader( header ), timestamp(_timestamp), messages(container),
+        qName(_qName),
+        hits(0), misses(0),
+        lastMsg(0), cachedGroup(0) {}
     virtual ~MessageGroupManager();
 
     // QueueObserver iface
@@ -114,8 +116,6 @@ class MessageGroupManager : public State
     void dequeued( const Message& qm );
     void consumerAdded( const Consumer& ) {};
     void consumerRemoved( const Consumer& ) {};
-    void getState(qpid::framing::FieldTable& state ) const;
-    void setState(const qpid::framing::FieldTable&);
 
     // MessageDistributor iface
     bool acquire(const std::string& c, Message& );

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1424132&r1=1424131&r2=1424132&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Wed Dec 19 21:23:41 2012
@@ -65,7 +65,7 @@ namespace {
 QueueFlowLimit::QueueFlowLimit(Queue *_queue,
                                uint32_t _flowStopCount, uint32_t _flowResumeCount,
                                uint64_t _flowStopSize,  uint64_t _flowResumeSize)
-    : StatefulQueueObserver(std::string("QueueFlowLimit")), queue(_queue), queueName("<unknown>"),
+    : queue(_queue), queueName("<unknown>"),
       flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
       flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
       flowStopped(false), count(0), size(0), broker(0)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h?rev=1424132&r1=1424131&r2=1424132&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h Wed Dec 19 21:23:41 2012
@@ -26,7 +26,7 @@
 #include <iostream>
 #include <memory>
 #include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/StatefulQueueObserver.h"
+#include "qpid/broker/QueueObserver.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/framing/SequenceNumber.h"
 #include "qpid/sys/AtomicValue.h"
@@ -40,6 +40,7 @@ namespace broker {
 
 class Broker;
 class Queue;
+class Message;
 struct QueueSettings;
 
 /**
@@ -49,7 +50,7 @@ struct QueueSettings;
  * passing _either_ level may turn flow control ON, but _both_ must be
  * below level before flow control will be turned OFF.
  */
- class QueueFlowLimit : public StatefulQueueObserver
+ class QueueFlowLimit : public QueueObserver
 {
     static uint64_t defaultMaxSize;
     static uint defaultFlowStopRatio;



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to