Author: kgiusti
Date: Mon Jan 31 22:16:53 2011
New Revision: 1065831

URL: http://svn.apache.org/viewvc?rev=1065831&view=rev
Log:
add mgmt configuration support

Modified:
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
    qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
    
qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
   (props changed)
    qpid/branches/qpid-2935/qpid/java/client/src/main/java/client.log4j   
(props changed)
    
qpid/branches/qpid-2935/qpid/java/integrationtests/src/resources/sustained-log4j.xml
   (props changed)
    qpid/branches/qpid-2935/qpid/java/management/client/etc/qman.log4j   (props 
changed)
    qpid/branches/qpid-2935/qpid/java/perftests/etc/perftests.log4j   (props 
changed)
    qpid/branches/qpid-2935/qpid/java/systests/src/main/java/systests.log4j   
(props changed)
    qpid/branches/qpid-2935/qpid/java/test-profiles/   (props changed)
    qpid/branches/qpid-2935/qpid/java/tools/etc/test.log4j   (props changed)
    qpid/branches/qpid-2935/qpid/specs/management-schema.xml
    qpid/branches/qpid-2935/qpid/tools/src/py/qpid-config

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1065831&r1=1065830&r2=1065831&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Queue.cpp Mon Jan 31 
22:16:53 2011
@@ -645,7 +645,11 @@ void Queue::push(boost::intrusive_ptr<Me
         if (policy.get()) {
             policy->enqueued(qm);
         }
-        if (flowLimit.get()) flowLimit->consume(qm);
+        if (flowLimit.get()) {
+            bool fc = flowLimit->consume(qm);
+            if (fc && mgmtObject)
+                mgmtObject->set_flowStopped(true);
+        }
     }
     copy.notify();
 }
@@ -844,7 +848,11 @@ void Queue::popAndDequeue()
 void Queue::dequeued(const QueuedMessage& msg)
 {
     if (policy.get()) policy->dequeued(msg);
-    if (flowLimit.get()) flowLimit->replenish(msg);
+    if (flowLimit.get()) {
+        bool fc = flowLimit->replenish(msg);
+        if (fc && mgmtObject)
+            mgmtObject->set_flowStopped(false);
+    }
     mgntDeqStats(msg.payload);
     if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) {
         eventMgr->dequeued(msg);
@@ -908,8 +916,16 @@ void Queue::configure(const FieldTable& 
 
     flowLimit = QueueFlowLimit::createQueueFlowLimit(this, _settings);
 
-    if (mgmtObject != 0)
+    if (mgmtObject != 0) {
         mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
+        if (flowLimit.get()) {
+            mgmtObject->set_flowStopCount(flowLimit->getFlowStopCount());
+            mgmtObject->set_flowResumeCount(flowLimit->getFlowResumeCount());
+            mgmtObject->set_flowStopSize(flowLimit->getFlowStopSize());
+            mgmtObject->set_flowResumeSize(flowLimit->getFlowResumeSize());
+            mgmtObject->set_flowStopped(flowLimit->isFlowControlActive());
+        }
+    }
 
     if ( isDurable() && ! getPersistenceId() && ! recovering )
       store->create(*this, _settings);
@@ -1184,7 +1200,11 @@ void Queue::enqueued(const QueuedMessage
             policy->recoverEnqueued(m.payload);
             policy->enqueued(m);
         }
-        if (flowLimit.get()) flowLimit->consume(m);
+        if (flowLimit.get()) {
+            bool fc = flowLimit->consume(m);
+            if (fc && mgmtObject)
+                mgmtObject->set_flowStopped(true);
+        }
         mgntEnqStats(m.payload);
         boost::intrusive_ptr<Message> payload = m.payload;
         enqueue ( 0, payload, true );

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1065831&r1=1065830&r2=1065831&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp 
(original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Mon Jan 
31 22:16:53 2011
@@ -86,16 +86,17 @@ namespace {
 QueueFlowLimit::QueueFlowLimit(Queue *_queue,
                                uint32_t _flowStopCount, uint32_t 
_flowResumeCount,
                                uint64_t _flowStopSize,  uint64_t 
_flowResumeSize)
-    : queueName("<unknown>"), flowStopCount(_flowStopCount), 
flowResumeCount(_flowResumeCount),
+    : queue(_queue), queueName("<unknown>"),
+      flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
       flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
       flowStopped(false), count(0), size(0)
 {
     uint32_t maxCount(0);
     uint64_t maxSize(0);
 
-    if (_queue) {
+    if (queue) {
         queueName = _queue->getName();
-        if (_queue->getPolicy()) {
+        if (queue->getPolicy()) {
             maxSize = _queue->getPolicy()->getMaxSize();
             maxCount = _queue->getPolicy()->getMaxCount();
         }
@@ -109,9 +110,11 @@ QueueFlowLimit::QueueFlowLimit(Queue *_q
 
 
 
-void QueueFlowLimit::consume(const QueuedMessage& msg)
+bool QueueFlowLimit::consume(const QueuedMessage& msg)
 {
-    if (!msg.payload) return;
+    bool flowChanged(false);
+
+    if (!msg.payload) return false;
 
     sys::Mutex::ScopedLock l(pendingFlowLock);
 
@@ -119,32 +122,36 @@ void QueueFlowLimit::consume(const Queue
     size += msg.payload->contentSize();
 
     if (flowStopCount && !flowStopped && count > flowStopCount) {
-        flowStopped = true;
+        flowChanged = flowStopped = true;
         QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << 
flowStopCount << " enqueued messages. Producer flow control activated." );
     }
 
     if (flowStopSize && !flowStopped && size > flowStopSize) {
-        flowStopped = true;
+        flowChanged = flowStopped = true;
         QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << 
flowStopSize << " enqueued bytes. Producer flow control activated." );
     }
 
-    // KAG: test
+    // KAG: test - REMOVE ONCE STABLE
     if (index.find(msg.payload) != index.end()) {
         QPID_LOG(error, "Queue \"" << queueName << "\": has enqueued a msg 
twice: " << msg.position);
     }
-    
+
     if (flowStopped || !pendingFlow.empty()) {
         msg.payload->getReceiveCompletion().startCompleter();    // don't 
complete until flow resumes
         pendingFlow.push_back(msg.payload);
         index.insert(msg.payload);
     }
+
+    return flowChanged;
 }
 
 
 
-void QueueFlowLimit::replenish(const QueuedMessage& msg)
+bool QueueFlowLimit::replenish(const QueuedMessage& msg)
 {
-    if (!msg.payload) return;
+    bool flowChanged(false);
+
+    if (!msg.payload) return false;
 
     sys::Mutex::ScopedLock l(pendingFlowLock);
 
@@ -165,6 +172,7 @@ void QueueFlowLimit::replenish(const Que
         (flowResumeSize == 0 || size < flowResumeSize) &&
         (flowResumeCount == 0 || count < flowResumeCount)) {
         flowStopped = false;
+        flowChanged = true;
         QPID_LOG(info, "Queue \"" << queueName << "\": has drained below the 
flow control resume level. Producer flow control deactivated." );
     }
 
@@ -197,6 +205,8 @@ void QueueFlowLimit::replenish(const Que
             pendingFlow.pop_front();
         }
     }
+
+    return flowChanged;
 }
 
 

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h?rev=1065831&r1=1065830&r2=1065831&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h Mon Jan 
31 22:16:53 2011
@@ -43,6 +43,7 @@ namespace broker {
  */
 class QueueFlowLimit
 {
+    Queue *queue;
     std::string queueName;
 
     uint32_t flowStopCount;
@@ -63,10 +64,10 @@ class QueueFlowLimit
 
     virtual ~QueueFlowLimit() {}
 
-    /** the queue has added QueuedMessage */
-    void consume(const QueuedMessage&);
-    /** the queue has removed QueuedMessage */
-    void replenish(const QueuedMessage&);
+    /** the queue has added QueuedMessage.  Returns true if flow state changes 
*/
+    bool consume(const QueuedMessage&);
+    /** the queue has removed QueuedMessage.  Returns true if flow state 
changes */
+    bool replenish(const QueuedMessage&);
 
     uint32_t getFlowStopCount() const { return flowStopCount; }
     uint32_t getFlowResumeCount() const { return flowResumeCount; }

Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp?rev=1065831&r1=1065830&r2=1065831&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp Mon Jan 
31 22:16:53 2011
@@ -61,6 +61,7 @@ QPID_AUTO_TEST_CASE(testFlowCount)
     BOOST_CHECK(!flow->isFlowControlActive());
     BOOST_CHECK(flow->monitorFlowControl());
 
+    bool fc;
     std::deque<QueuedMessage> msgs;
     for (size_t i = 0; i < 6; i++) {
         msgs.push_back(createMessage(10));
@@ -72,11 +73,11 @@ QPID_AUTO_TEST_CASE(testFlowCount)
     flow->consume(msgs.back());
     BOOST_CHECK(!flow->isFlowControlActive());  // 7 on queue
     msgs.push_back(createMessage(10));
-    flow->consume(msgs.back());
-    BOOST_CHECK(flow->isFlowControlActive());   // 8 on queue, ON
+    fc = flow->consume(msgs.back());
+    BOOST_CHECK(fc && flow->isFlowControlActive());   // 8 on queue, ON
     msgs.push_back(createMessage(10));
-    flow->consume(msgs.back());
-    BOOST_CHECK(flow->isFlowControlActive());   // 9 on queue
+    fc = flow->consume(msgs.back());
+    BOOST_CHECK(!fc && flow->isFlowControlActive());   // 9 on queue, no 
change to flow control
 
     flow->replenish(msgs.front());
     msgs.pop_front();
@@ -87,13 +88,13 @@ QPID_AUTO_TEST_CASE(testFlowCount)
     flow->replenish(msgs.front());
     msgs.pop_front();
     BOOST_CHECK(flow->isFlowControlActive());   // 6 on queue
-    flow->replenish(msgs.front());
+    fc = flow->replenish(msgs.front());
     msgs.pop_front();
-    BOOST_CHECK(flow->isFlowControlActive());   // 5 on queue
+    BOOST_CHECK(!fc && flow->isFlowControlActive());   // 5 on queue, no change
 
-    flow->replenish(msgs.front());
+    fc = flow->replenish(msgs.front());
     msgs.pop_front();
-    BOOST_CHECK(!flow->isFlowControlActive());  // 4 on queue, OFF
+    BOOST_CHECK(fc && !flow->isFlowControlActive());  // 4 on queue, OFF
 }
 
 

Propchange: 
qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
    (empty)

Propchange: qpid/branches/qpid-2935/qpid/java/client/src/main/java/client.log4j
            ('svn:mergeinfo' removed)

Propchange: 
qpid/branches/qpid-2935/qpid/java/integrationtests/src/resources/sustained-log4j.xml
            ('svn:mergeinfo' removed)

Propchange: qpid/branches/qpid-2935/qpid/java/management/client/etc/qman.log4j
            ('svn:mergeinfo' removed)

Propchange: qpid/branches/qpid-2935/qpid/java/perftests/etc/perftests.log4j
            ('svn:mergeinfo' removed)

Propchange: 
qpid/branches/qpid-2935/qpid/java/systests/src/main/java/systests.log4j
            ('svn:mergeinfo' removed)

Propchange: qpid/branches/qpid-2935/qpid/java/test-profiles/
------------------------------------------------------------------------------
    (empty)

Propchange: qpid/branches/qpid-2935/qpid/java/tools/etc/test.log4j
            ('svn:mergeinfo' removed)

Modified: qpid/branches/qpid-2935/qpid/specs/management-schema.xml
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/specs/management-schema.xml?rev=1065831&r1=1065830&r2=1065831&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/specs/management-schema.xml (original)
+++ qpid/branches/qpid-2935/qpid/specs/management-schema.xml Mon Jan 31 
22:16:53 2011
@@ -143,6 +143,11 @@
     <property name="exclusive"   type="bool"  access="RC"/>
     <property name="arguments"   type="map"   access="RO" desc="Arguments 
supplied in queue.declare"/>
     <property name="altExchange" type="objId" references="Exchange" 
access="RO" optional="y"/>
+    <property name="flowStopCount"   type="uint32" access="RO" optional="y" 
desc="Flow control: # messages to force flow control."/>
+    <property name="flowResumeCount" type="uint32" access="RO" optional="y" 
desc="Flow control: # messages to release flow control."/>
+    <property name="flowStopSize"    type="uint64" access="RO" optional="y" 
desc="Flow control: # enqueued bytes to force flow control."/>
+    <property name="flowResumeSize"  type="uint64" access="RO" optional="y" 
desc="Flow control: # enqueued bytes to release flow control."/>
+
 
     <statistic name="msgTotalEnqueues"    type="count64"  unit="message"     
desc="Total messages enqueued"/>
     <statistic name="msgTotalDequeues"    type="count64"  unit="message"     
desc="Total messages dequeued"/>
@@ -162,6 +167,7 @@
     <statistic name="bindingCount"        type="hilo32"   unit="binding"     
desc="Current bindings"/>
     <statistic name="unackedMessages"     type="hilo32"   unit="message"     
desc="Messages consumed but not yet acked"/>
     <statistic name="messageLatency"      type="mmaTime"  unit="nanosecond"  
desc="Broker latency through this queue"/>
+    <statistic name="flowStopped"         type="bool"     desc="Flow control 
active."/>
 
     <method name="purge" desc="Discard all or some messages on a queue">
       <arg name="request" dir="I" type="uint32" desc="0 for all messages or 
n>0 for n messages"/>

Modified: qpid/branches/qpid-2935/qpid/tools/src/py/qpid-config
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/tools/src/py/qpid-config?rev=1065831&r1=1065830&r2=1065831&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/tools/src/py/qpid-config (original)
+++ qpid/branches/qpid-2935/qpid/tools/src/py/qpid-config Mon Jan 31 22:16:53 
2011
@@ -47,6 +47,10 @@ class Config:
         self._eventGeneration   = None
         self._file              = None
         self._sasl_mechanism    = None
+        self._flowStopCount     = None
+        self._flowResumeCount   = None
+        self._flowStopSize      = None
+        self._flowResumeSize    = None
 
 config = Config()
 
@@ -61,6 +65,10 @@ LVQNB = "qpid.last_value_queue_no_browse
 MSG_SEQUENCE = "qpid.msg_sequence"
 IVE = "qpid.ive"
 QUEUE_EVENT_GENERATION = "qpid.queue_event_generation"
+FLOW_STOP_COUNT   = "qpid.flow_stop_count"
+FLOW_RESUME_COUNT = "qpid.flow_resume_count"
+FLOW_STOP_SIZE    = "qpid.flow_stop_size"
+FLOW_RESUME_SIZE  = "qpid.flow_resume_size"
 
 class JHelpFormatter(IndentedHelpFormatter):
     """Format usage and description without stripping newlines from usage 
strings
@@ -160,6 +168,14 @@ def OptionsAndArguments(argv):
     group3.add_option("--limit-policy", action="store", choices=["none", 
"reject", "flow-to-disk", "ring", "ring-strict"], metavar="<policy>", 
help="Action to take when queue limit is reached")
     group3.add_option("--order", action="store", choices=["fifo", "lvq", 
"lvq-no-browse"], metavar="<ordering>", help="Queue ordering policy")
     group3.add_option("--generate-queue-events", action="store", type="int", 
metavar="<n>", help="If set to 1, every enqueue will generate an event that can 
be processed by registered listeners (e.g. for replication). If set to 2, 
events will be generated for enqueues and dequeues.")
+    group3.add_option("--flow-stop-size", action="store", type="int", 
metavar="<n>",
+                      help="Turn on sender flow control when the number of 
queued bytes exceeds this value.")
+    group3.add_option("--flow-resume-size", action="store", type="int", 
metavar="<n>",
+                      help="Turn off sender flow control when the number of 
queued bytes drops below this value.")
+    group3.add_option("--flow-stop-count", action="store", type="int", 
metavar="<n>",
+                      help="Turn on sender flow control when the number of 
queued messages exceeds this value.")
+    group3.add_option("--flow-resume-count", action="store", type="int", 
metavar="<n>",
+                      help="Turn off sender flow control when the number of 
queued messages drops below this value.")
     # no option for declaring an exclusive queue - which can only be used by 
the session that creates it.
     parser.add_option_group(group3)
 
@@ -231,6 +247,14 @@ def OptionsAndArguments(argv):
         config._if_unused = False
     if opts.sasl_mechanism:
         config._sasl_mechanism = opts.sasl_mechanism
+    if opts.flow_stop_size:
+        config._flowStopSize = opts.flow_stop_size
+    if opts.flow_resume_size:
+        config._flowResumeSize = opts.flow_resume_size
+    if opts.flow_stop_count:
+        config._flowStopCount = opts.flow_stop_count
+    if opts.flow_resume_count:
+        config._flowResumeCount = opts.flow_resume_count
     return args
 
 
@@ -389,6 +413,10 @@ class BrokerManager:
                 if QUEUE_EVENT_GENERATION in args: print 
"--generate-queue-events=%d" % args[QUEUE_EVENT_GENERATION],
                 if q.altExchange:
                     print "--alternate-exchange=%s" % q._altExchange_.name,
+                if FLOW_STOP_SIZE in args: print "--flow-stop-size=%d" % 
args[FLOW_STOP_SIZE],
+                if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%d" % 
args[FLOW_RESUME_SIZE],
+                if FLOW_STOP_COUNT in args: print "--flow-stop-count=%d" % 
args[FLOW_STOP_COUNT],
+                if FLOW_RESUME_COUNT in args: print "--flow-resume-count=%d" % 
args[FLOW_RESUME_COUNT],
                 print
 
     def QueueListRecurse(self, filter):
@@ -466,11 +494,21 @@ class BrokerManager:
         if config._eventGeneration:
             declArgs[QUEUE_EVENT_GENERATION]  = config._eventGeneration
 
+        if config._flowStopSize:
+            declArgs[FLOW_STOP_SIZE]  = config._flowStopSize
+        if config._flowResumeSize:
+            declArgs[FLOW_RESUME_SIZE]  = config._flowResumeSize
+        if config._flowStopCount:
+            declArgs[FLOW_STOP_COUNT]  = config._flowStopCount
+        if config._flowResumeCount:
+            declArgs[FLOW_RESUME_COUNT]  = config._flowResumeCount
+
         if config._altern_ex != None:
             self.broker.getAmqpSession().queue_declare(queue=qname, 
alternate_exchange=config._altern_ex, passive=config._passive, 
durable=config._durable, arguments=declArgs)
         else:
             self.broker.getAmqpSession().queue_declare(queue=qname, 
passive=config._passive, durable=config._durable, arguments=declArgs)
 
+
     def DelQueue(self, args):
         if len(args) < 1:
             Usage()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to