Author: kgiusti
Date: Mon Jan 31 22:16:53 2011
New Revision: 1065831
URL: http://svn.apache.org/viewvc?rev=1065831&view=rev
Log:
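Add management configuration support for queue producer flow control (flow stop/resume thresholds exposed via the management schema and qpid-config).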
Modified:
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
(props changed)
qpid/branches/qpid-2935/qpid/java/client/src/main/java/client.log4j
(props changed)
qpid/branches/qpid-2935/qpid/java/integrationtests/src/resources/sustained-log4j.xml
(props changed)
qpid/branches/qpid-2935/qpid/java/management/client/etc/qman.log4j (props
changed)
qpid/branches/qpid-2935/qpid/java/perftests/etc/perftests.log4j (props
changed)
qpid/branches/qpid-2935/qpid/java/systests/src/main/java/systests.log4j
(props changed)
qpid/branches/qpid-2935/qpid/java/test-profiles/ (props changed)
qpid/branches/qpid-2935/qpid/java/tools/etc/test.log4j (props changed)
qpid/branches/qpid-2935/qpid/specs/management-schema.xml
qpid/branches/qpid-2935/qpid/tools/src/py/qpid-config
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1065831&r1=1065830&r2=1065831&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp Mon Jan 31
22:16:53 2011
@@ -645,7 +645,11 @@ void Queue::push(boost::intrusive_ptr<Me
if (policy.get()) {
policy->enqueued(qm);
}
- if (flowLimit.get()) flowLimit->consume(qm);
+ if (flowLimit.get()) {
+ bool fc = flowLimit->consume(qm);
+ if (fc && mgmtObject)
+ mgmtObject->set_flowStopped(true);
+ }
}
copy.notify();
}
@@ -844,7 +848,11 @@ void Queue::popAndDequeue()
void Queue::dequeued(const QueuedMessage& msg)
{
if (policy.get()) policy->dequeued(msg);
- if (flowLimit.get()) flowLimit->replenish(msg);
+ if (flowLimit.get()) {
+ bool fc = flowLimit->replenish(msg);
+ if (fc && mgmtObject)
+ mgmtObject->set_flowStopped(false);
+ }
mgntDeqStats(msg.payload);
if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) {
eventMgr->dequeued(msg);
@@ -908,8 +916,16 @@ void Queue::configure(const FieldTable&
flowLimit = QueueFlowLimit::createQueueFlowLimit(this, _settings);
- if (mgmtObject != 0)
+ if (mgmtObject != 0) {
mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
+ if (flowLimit.get()) {
+ mgmtObject->set_flowStopCount(flowLimit->getFlowStopCount());
+ mgmtObject->set_flowResumeCount(flowLimit->getFlowResumeCount());
+ mgmtObject->set_flowStopSize(flowLimit->getFlowStopSize());
+ mgmtObject->set_flowResumeSize(flowLimit->getFlowResumeSize());
+ mgmtObject->set_flowStopped(flowLimit->isFlowControlActive());
+ }
+ }
if ( isDurable() && ! getPersistenceId() && ! recovering )
store->create(*this, _settings);
@@ -1184,7 +1200,11 @@ void Queue::enqueued(const QueuedMessage
policy->recoverEnqueued(m.payload);
policy->enqueued(m);
}
- if (flowLimit.get()) flowLimit->consume(m);
+ if (flowLimit.get()) {
+ bool fc = flowLimit->consume(m);
+ if (fc && mgmtObject)
+ mgmtObject->set_flowStopped(true);
+ }
mgntEnqStats(m.payload);
boost::intrusive_ptr<Message> payload = m.payload;
enqueue ( 0, payload, true );
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1065831&r1=1065830&r2=1065831&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
(original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Mon Jan
31 22:16:53 2011
@@ -86,16 +86,17 @@ namespace {
QueueFlowLimit::QueueFlowLimit(Queue *_queue,
uint32_t _flowStopCount, uint32_t
_flowResumeCount,
uint64_t _flowStopSize, uint64_t
_flowResumeSize)
- : queueName("<unknown>"), flowStopCount(_flowStopCount),
flowResumeCount(_flowResumeCount),
+ : queue(_queue), queueName("<unknown>"),
+ flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
flowStopped(false), count(0), size(0)
{
uint32_t maxCount(0);
uint64_t maxSize(0);
- if (_queue) {
+ if (queue) {
queueName = _queue->getName();
- if (_queue->getPolicy()) {
+ if (queue->getPolicy()) {
maxSize = _queue->getPolicy()->getMaxSize();
maxCount = _queue->getPolicy()->getMaxCount();
}
@@ -109,9 +110,11 @@ QueueFlowLimit::QueueFlowLimit(Queue *_q
-void QueueFlowLimit::consume(const QueuedMessage& msg)
+bool QueueFlowLimit::consume(const QueuedMessage& msg)
{
- if (!msg.payload) return;
+ bool flowChanged(false);
+
+ if (!msg.payload) return false;
sys::Mutex::ScopedLock l(pendingFlowLock);
@@ -119,32 +122,36 @@ void QueueFlowLimit::consume(const Queue
size += msg.payload->contentSize();
if (flowStopCount && !flowStopped && count > flowStopCount) {
- flowStopped = true;
+ flowChanged = flowStopped = true;
QPID_LOG(info, "Queue \"" << queueName << "\": has reached " <<
flowStopCount << " enqueued messages. Producer flow control activated." );
}
if (flowStopSize && !flowStopped && size > flowStopSize) {
- flowStopped = true;
+ flowChanged = flowStopped = true;
QPID_LOG(info, "Queue \"" << queueName << "\": has reached " <<
flowStopSize << " enqueued bytes. Producer flow control activated." );
}
- // KAG: test
+ // KAG: test - REMOVE ONCE STABLE
if (index.find(msg.payload) != index.end()) {
QPID_LOG(error, "Queue \"" << queueName << "\": has enqueued a msg
twice: " << msg.position);
}
-
+
if (flowStopped || !pendingFlow.empty()) {
msg.payload->getReceiveCompletion().startCompleter(); // don't
complete until flow resumes
pendingFlow.push_back(msg.payload);
index.insert(msg.payload);
}
+
+ return flowChanged;
}
-void QueueFlowLimit::replenish(const QueuedMessage& msg)
+bool QueueFlowLimit::replenish(const QueuedMessage& msg)
{
- if (!msg.payload) return;
+ bool flowChanged(false);
+
+ if (!msg.payload) return false;
sys::Mutex::ScopedLock l(pendingFlowLock);
@@ -165,6 +172,7 @@ void QueueFlowLimit::replenish(const Que
(flowResumeSize == 0 || size < flowResumeSize) &&
(flowResumeCount == 0 || count < flowResumeCount)) {
flowStopped = false;
+ flowChanged = true;
QPID_LOG(info, "Queue \"" << queueName << "\": has drained below the
flow control resume level. Producer flow control deactivated." );
}
@@ -197,6 +205,8 @@ void QueueFlowLimit::replenish(const Que
pendingFlow.pop_front();
}
}
+
+ return flowChanged;
}
Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h?rev=1065831&r1=1065830&r2=1065831&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h Mon Jan
31 22:16:53 2011
@@ -43,6 +43,7 @@ namespace broker {
*/
class QueueFlowLimit
{
+ Queue *queue;
std::string queueName;
uint32_t flowStopCount;
@@ -63,10 +64,10 @@ class QueueFlowLimit
virtual ~QueueFlowLimit() {}
- /** the queue has added QueuedMessage */
- void consume(const QueuedMessage&);
- /** the queue has removed QueuedMessage */
- void replenish(const QueuedMessage&);
+ /** the queue has added QueuedMessage. Returns true if flow state changes
*/
+ bool consume(const QueuedMessage&);
+ /** the queue has removed QueuedMessage. Returns true if flow state
changes */
+ bool replenish(const QueuedMessage&);
uint32_t getFlowStopCount() const { return flowStopCount; }
uint32_t getFlowResumeCount() const { return flowResumeCount; }
Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp?rev=1065831&r1=1065830&r2=1065831&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp Mon Jan
31 22:16:53 2011
@@ -61,6 +61,7 @@ QPID_AUTO_TEST_CASE(testFlowCount)
BOOST_CHECK(!flow->isFlowControlActive());
BOOST_CHECK(flow->monitorFlowControl());
+ bool fc;
std::deque<QueuedMessage> msgs;
for (size_t i = 0; i < 6; i++) {
msgs.push_back(createMessage(10));
@@ -72,11 +73,11 @@ QPID_AUTO_TEST_CASE(testFlowCount)
flow->consume(msgs.back());
BOOST_CHECK(!flow->isFlowControlActive()); // 7 on queue
msgs.push_back(createMessage(10));
- flow->consume(msgs.back());
- BOOST_CHECK(flow->isFlowControlActive()); // 8 on queue, ON
+ fc = flow->consume(msgs.back());
+ BOOST_CHECK(fc && flow->isFlowControlActive()); // 8 on queue, ON
msgs.push_back(createMessage(10));
- flow->consume(msgs.back());
- BOOST_CHECK(flow->isFlowControlActive()); // 9 on queue
+ fc = flow->consume(msgs.back());
+ BOOST_CHECK(!fc && flow->isFlowControlActive()); // 9 on queue, no
change to flow control
flow->replenish(msgs.front());
msgs.pop_front();
@@ -87,13 +88,13 @@ QPID_AUTO_TEST_CASE(testFlowCount)
flow->replenish(msgs.front());
msgs.pop_front();
BOOST_CHECK(flow->isFlowControlActive()); // 6 on queue
- flow->replenish(msgs.front());
+ fc = flow->replenish(msgs.front());
msgs.pop_front();
- BOOST_CHECK(flow->isFlowControlActive()); // 5 on queue
+ BOOST_CHECK(!fc && flow->isFlowControlActive()); // 5 on queue, no change
- flow->replenish(msgs.front());
+ fc = flow->replenish(msgs.front());
msgs.pop_front();
- BOOST_CHECK(!flow->isFlowControlActive()); // 4 on queue, OFF
+ BOOST_CHECK(fc && !flow->isFlowControlActive()); // 4 on queue, OFF
}
Propchange:
qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
(empty)
Propchange: qpid/branches/qpid-2935/qpid/java/client/src/main/java/client.log4j
('svn:mergeinfo' removed)
Propchange:
qpid/branches/qpid-2935/qpid/java/integrationtests/src/resources/sustained-log4j.xml
('svn:mergeinfo' removed)
Propchange: qpid/branches/qpid-2935/qpid/java/management/client/etc/qman.log4j
('svn:mergeinfo' removed)
Propchange: qpid/branches/qpid-2935/qpid/java/perftests/etc/perftests.log4j
('svn:mergeinfo' removed)
Propchange:
qpid/branches/qpid-2935/qpid/java/systests/src/main/java/systests.log4j
('svn:mergeinfo' removed)
Propchange: qpid/branches/qpid-2935/qpid/java/test-profiles/
------------------------------------------------------------------------------
(empty)
Propchange: qpid/branches/qpid-2935/qpid/java/tools/etc/test.log4j
('svn:mergeinfo' removed)
Modified: qpid/branches/qpid-2935/qpid/specs/management-schema.xml
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/specs/management-schema.xml?rev=1065831&r1=1065830&r2=1065831&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/specs/management-schema.xml (original)
+++ qpid/branches/qpid-2935/qpid/specs/management-schema.xml Mon Jan 31
22:16:53 2011
@@ -143,6 +143,11 @@
<property name="exclusive" type="bool" access="RC"/>
<property name="arguments" type="map" access="RO" desc="Arguments
supplied in queue.declare"/>
<property name="altExchange" type="objId" references="Exchange"
access="RO" optional="y"/>
+ <property name="flowStopCount" type="uint32" access="RO" optional="y"
desc="Flow control: # messages to force flow control."/>
+ <property name="flowResumeCount" type="uint32" access="RO" optional="y"
desc="Flow control: # messages to release flow control."/>
+ <property name="flowStopSize" type="uint64" access="RO" optional="y"
desc="Flow control: # enqueued bytes to force flow control."/>
+ <property name="flowResumeSize" type="uint64" access="RO" optional="y"
desc="Flow control: # enqueued bytes to release flow control."/>
+
<statistic name="msgTotalEnqueues" type="count64" unit="message"
desc="Total messages enqueued"/>
<statistic name="msgTotalDequeues" type="count64" unit="message"
desc="Total messages dequeued"/>
@@ -162,6 +167,7 @@
<statistic name="bindingCount" type="hilo32" unit="binding"
desc="Current bindings"/>
<statistic name="unackedMessages" type="hilo32" unit="message"
desc="Messages consumed but not yet acked"/>
<statistic name="messageLatency" type="mmaTime" unit="nanosecond"
desc="Broker latency through this queue"/>
+ <statistic name="flowStopped" type="bool" desc="Flow control
active."/>
<method name="purge" desc="Discard all or some messages on a queue">
<arg name="request" dir="I" type="uint32" desc="0 for all messages or
n>0 for n messages"/>
Modified: qpid/branches/qpid-2935/qpid/tools/src/py/qpid-config
URL:
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/tools/src/py/qpid-config?rev=1065831&r1=1065830&r2=1065831&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/tools/src/py/qpid-config (original)
+++ qpid/branches/qpid-2935/qpid/tools/src/py/qpid-config Mon Jan 31 22:16:53
2011
@@ -47,6 +47,10 @@ class Config:
self._eventGeneration = None
self._file = None
self._sasl_mechanism = None
+ self._flowStopCount = None
+ self._flowResumeCount = None
+ self._flowStopSize = None
+ self._flowResumeSize = None
config = Config()
@@ -61,6 +65,10 @@ LVQNB = "qpid.last_value_queue_no_browse
MSG_SEQUENCE = "qpid.msg_sequence"
IVE = "qpid.ive"
QUEUE_EVENT_GENERATION = "qpid.queue_event_generation"
+FLOW_STOP_COUNT = "qpid.flow_stop_count"
+FLOW_RESUME_COUNT = "qpid.flow_resume_count"
+FLOW_STOP_SIZE = "qpid.flow_stop_size"
+FLOW_RESUME_SIZE = "qpid.flow_resume_size"
class JHelpFormatter(IndentedHelpFormatter):
"""Format usage and description without stripping newlines from usage
strings
@@ -160,6 +168,14 @@ def OptionsAndArguments(argv):
group3.add_option("--limit-policy", action="store", choices=["none",
"reject", "flow-to-disk", "ring", "ring-strict"], metavar="<policy>",
help="Action to take when queue limit is reached")
group3.add_option("--order", action="store", choices=["fifo", "lvq",
"lvq-no-browse"], metavar="<ordering>", help="Queue ordering policy")
group3.add_option("--generate-queue-events", action="store", type="int",
metavar="<n>", help="If set to 1, every enqueue will generate an event that can
be processed by registered listeners (e.g. for replication). If set to 2,
events will be generated for enqueues and dequeues.")
+ group3.add_option("--flow-stop-size", action="store", type="int",
metavar="<n>",
+ help="Turn on sender flow control when the number of
queued bytes exceeds this value.")
+ group3.add_option("--flow-resume-size", action="store", type="int",
metavar="<n>",
+ help="Turn off sender flow control when the number of
queued bytes drops below this value.")
+ group3.add_option("--flow-stop-count", action="store", type="int",
metavar="<n>",
+ help="Turn on sender flow control when the number of
queued messages exceeds this value.")
+ group3.add_option("--flow-resume-count", action="store", type="int",
metavar="<n>",
+ help="Turn off sender flow control when the number of
queued messages drops below this value.")
# no option for declaring an exclusive queue - which can only be used by
the session that creates it.
parser.add_option_group(group3)
@@ -231,6 +247,14 @@ def OptionsAndArguments(argv):
config._if_unused = False
if opts.sasl_mechanism:
config._sasl_mechanism = opts.sasl_mechanism
+ if opts.flow_stop_size:
+ config._flowStopSize = opts.flow_stop_size
+ if opts.flow_resume_size:
+ config._flowResumeSize = opts.flow_resume_size
+ if opts.flow_stop_count:
+ config._flowStopCount = opts.flow_stop_count
+ if opts.flow_resume_count:
+ config._flowResumeCount = opts.flow_resume_count
return args
@@ -389,6 +413,10 @@ class BrokerManager:
if QUEUE_EVENT_GENERATION in args: print
"--generate-queue-events=%d" % args[QUEUE_EVENT_GENERATION],
if q.altExchange:
print "--alternate-exchange=%s" % q._altExchange_.name,
+ if FLOW_STOP_SIZE in args: print "--flow-stop-size=%d" %
args[FLOW_STOP_SIZE],
+ if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%d" %
args[FLOW_RESUME_SIZE],
+ if FLOW_STOP_COUNT in args: print "--flow-stop-count=%d" %
args[FLOW_STOP_COUNT],
+ if FLOW_RESUME_COUNT in args: print "--flow-resume-count=%d" %
args[FLOW_RESUME_COUNT],
print
def QueueListRecurse(self, filter):
@@ -466,11 +494,21 @@ class BrokerManager:
if config._eventGeneration:
declArgs[QUEUE_EVENT_GENERATION] = config._eventGeneration
+ if config._flowStopSize:
+ declArgs[FLOW_STOP_SIZE] = config._flowStopSize
+ if config._flowResumeSize:
+ declArgs[FLOW_RESUME_SIZE] = config._flowResumeSize
+ if config._flowStopCount:
+ declArgs[FLOW_STOP_COUNT] = config._flowStopCount
+ if config._flowResumeCount:
+ declArgs[FLOW_RESUME_COUNT] = config._flowResumeCount
+
if config._altern_ex != None:
self.broker.getAmqpSession().queue_declare(queue=qname,
alternate_exchange=config._altern_ex, passive=config._passive,
durable=config._durable, arguments=declArgs)
else:
self.broker.getAmqpSession().queue_declare(queue=qname,
passive=config._passive, durable=config._durable, arguments=declArgs)
+
def DelQueue(self, args):
if len(args) < 1:
Usage()
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]