Author: aconway
Date: Wed Dec 19 21:23:41 2012
New Revision: 1424132
URL: http://svn.apache.org/viewvc?rev=1424132&view=rev
Log:
QPID-4514: Remove obsolete cluster code: get rid of StatefulQueueObserver.
Removed:
qpid/trunk/qpid/cpp/src/qpid/broker/StatefulQueueObserver.h
Modified:
qpid/trunk/qpid/cpp/src/Makefile.am
qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h
qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1424132&r1=1424131&r2=1424132&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Wed Dec 19 21:23:41 2012
@@ -727,7 +727,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/SessionState.h \
qpid/broker/SignalHandler.cpp \
qpid/broker/SignalHandler.h \
- qpid/broker/StatefulQueueObserver.h \
qpid/broker/System.cpp \
qpid/broker/System.h \
qpid/broker/ThresholdAlerts.cpp \
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp?rev=1424132&r1=1424131&r2=1424132&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp Wed Dec 19 21:23:41 2012
@@ -311,100 +311,3 @@ namespace {
const std::string GROUP_STATE("group-state");
}
-
-/** Runs on UPDATER to snapshot current state */
-void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const
-{
- using namespace qpid::framing;
- state.clear();
- framing::Array groupState(TYPE_CODE_MAP);
- for (GroupMap::const_iterator g = messageGroups.begin();
- g != messageGroups.end(); ++g) {
-
- framing::FieldTable group;
- group.setString(GROUP_NAME, g->first);
- group.setString(GROUP_OWNER, g->second.owner);
- group.setInt(GROUP_ACQUIRED_CT, g->second.acquired);
- framing::Array positions(TYPE_CODE_UINT32);
- framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN);
- for (GroupState::MessageFifo::const_iterator p = g->second.members.begin();
- p != g->second.members.end(); ++p) {
- positions.push_back(framing::Array::ValuePtr(new IntegerValue( p->position )));
- acquiredMsgs.push_back(framing::Array::ValuePtr(new BoolValue( p->acquired )));
- }
- group.setArray(GROUP_POSITIONS, positions);
- group.setArray(GROUP_ACQUIRED_MSGS, acquiredMsgs);
- groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group)));
- }
- state.setArray(GROUP_STATE, groupState);
-
- QPID_LOG(debug, "Queue \"" << qName << "\": replicating message group state, key=" << groupIdHeader);
-}
-
-
-/** called on UPDATEE to set state from snapshot */
-void MessageGroupManager::setState(const qpid::framing::FieldTable& state)
-{
- using namespace qpid::framing;
- messageGroups.clear();
- freeGroups.clear();
- cachedGroup = 0;
-
- framing::Array groupState(TYPE_CODE_MAP);
-
- bool ok = state.getArray(GROUP_STATE, groupState);
- if (!ok) {
- QPID_LOG(error, "Unable to find message group state information for queue \"" <<
- qName << "\"");
- return;
- }
-
- for (framing::Array::const_iterator g = groupState.begin();
- g != groupState.end(); ++g) {
- framing::FieldTable group;
- ok = framing::getEncodedValue<FieldTable>(*g, group);
- if (!ok) {
- QPID_LOG(error, "Invalid message group state information for queue \"" <<
- qName << "\": table encoding error!");
- return;
- }
- MessageGroupManager::GroupState state;
- if (!group.isSet(GROUP_NAME) || !group.isSet(GROUP_OWNER) || !group.isSet(GROUP_ACQUIRED_CT)) {
- QPID_LOG(error, "Invalid message group state information for queue \"" <<
- qName << "\": fields missing error!");
- return;
- }
- state.group = group.getAsString(GROUP_NAME);
- state.owner = group.getAsString(GROUP_OWNER);
- state.acquired = group.getAsInt(GROUP_ACQUIRED_CT);
- framing::Array positions(TYPE_CODE_UINT32);
- ok = group.getArray(GROUP_POSITIONS, positions);
- if (!ok) {
- QPID_LOG(error, "Invalid message group state information for queue \"" <<
- qName << "\": position encoding error!");
- return;
- }
- framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN);
- ok = group.getArray(GROUP_ACQUIRED_MSGS, acquiredMsgs);
- if (!ok || positions.count() != acquiredMsgs.count()) {
- QPID_LOG(error, "Invalid message group state information for queue \"" <<
- qName << "\": acquired flag encoding error!");
- return;
- }
-
- Array::const_iterator a = acquiredMsgs.begin();
- for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) {
- GroupState::MessageState mState((*p)->getIntegerValue<uint32_t, 4>());
- mState.acquired = (*a++)->getIntegerValue<bool>();
- state.members.push_back(mState);
- }
-
- messageGroups[state.group] = state;
- if (!state.owned()) {
- assert(state.members.size());
- freeGroups[state.members.front().position] = &messageGroups[state.group];
- }
- }
-
- QPID_LOG(debug, "Queue \"" << qName << "\": message group state replicated, key =" << groupIdHeader)
-}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h?rev=1424132&r1=1424131&r2=1424132&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h Wed Dec 19 21:23:41 2012
@@ -25,11 +25,12 @@
/* for managing message grouping on Queues */
#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/StatefulQueueObserver.h"
+#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/MessageDistributor.h"
#include "qpid/framing/SequenceNumber.h"
#include "qpid/sys/unordered_map.h"
+#include "boost/shared_ptr.hpp"
#include <deque>
namespace qpid {
@@ -39,8 +40,9 @@ class QueueObserver;
struct QueueSettings;
class MessageDistributor;
class Messages;
+class Consumer;
-class MessageGroupManager : public StatefulQueueObserver, public MessageDistributor
+class MessageGroupManager : public QueueObserver, public MessageDistributor
{
static std::string defaultGroupId; // assigned if no group id header present
@@ -101,10 +103,10 @@ class MessageGroupManager : public State
MessageGroupManager(const std::string& header, const std::string& _qName,
Messages& container, unsigned int _timestamp=0 )
- : StatefulQueueObserver(std::string("MessageGroupManager:") + header),
- groupIdHeader( header ), timestamp(_timestamp), messages(container), qName(_qName),
- hits(0), misses(0),
- lastMsg(0), cachedGroup(0) {}
+ : groupIdHeader( header ), timestamp(_timestamp), messages(container),
+ qName(_qName),
+ hits(0), misses(0),
+ lastMsg(0), cachedGroup(0) {}
virtual ~MessageGroupManager();
// QueueObserver iface
@@ -114,8 +116,6 @@ class MessageGroupManager : public State
void dequeued( const Message& qm );
void consumerAdded( const Consumer& ) {};
void consumerRemoved( const Consumer& ) {};
- void getState(qpid::framing::FieldTable& state ) const;
- void setState(const qpid::framing::FieldTable&);
// MessageDistributor iface
bool acquire(const std::string& c, Message& );
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1424132&r1=1424131&r2=1424132&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Wed Dec 19 21:23:41 2012
@@ -65,7 +65,7 @@ namespace {
QueueFlowLimit::QueueFlowLimit(Queue *_queue,
uint32_t _flowStopCount, uint32_t _flowResumeCount,
uint64_t _flowStopSize, uint64_t _flowResumeSize)
- : StatefulQueueObserver(std::string("QueueFlowLimit")), queue(_queue), queueName("<unknown>"),
+ : queue(_queue), queueName("<unknown>"),
flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
flowStopped(false), count(0), size(0), broker(0)
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h?rev=1424132&r1=1424131&r2=1424132&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h Wed Dec 19 21:23:41 2012
@@ -26,7 +26,7 @@
#include <iostream>
#include <memory>
#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/StatefulQueueObserver.h"
+#include "qpid/broker/QueueObserver.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/SequenceNumber.h"
#include "qpid/sys/AtomicValue.h"
@@ -40,6 +40,7 @@ namespace broker {
class Broker;
class Queue;
+class Message;
struct QueueSettings;
/**
@@ -49,7 +50,7 @@ struct QueueSettings;
* passing _either_ level may turn flow control ON, but _both_ must be
* below level before flow control will be turned OFF.
*/
- class QueueFlowLimit : public StatefulQueueObserver
+ class QueueFlowLimit : public QueueObserver
{
static uint64_t defaultMaxSize;
static uint defaultFlowStopRatio;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]