Author: kgiusti
Date: Tue May 3 22:04:51 2011
New Revision: 1099278
URL: http://svn.apache.org/viewvc?rev=1099278&view=rev
Log:
QPID-3243: correctly use --max-queue-count value to compute flow limit.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp
qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
qpid/trunk/qpid/cpp/src/tests/queue_flow_limit_tests.py
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1099278&r1=1099277&r2=1099278&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Tue May 3 22:04:51 2011
@@ -282,7 +282,9 @@ QueueFlowLimit *QueueFlowLimit::createLi
return 0;
}
- if (settings.get(flowStopCountKey) || settings.get(flowStopSizeKey)) {
+ if (settings.get(flowStopCountKey) || settings.get(flowStopSizeKey) ||
+ settings.get(flowResumeCountKey) || settings.get(flowResumeSizeKey)) {
+ // user provided (some) flow settings manually...
uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0);
uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey,
0);
uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0);
@@ -293,11 +295,15 @@ QueueFlowLimit *QueueFlowLimit::createLi
return new QueueFlowLimit(queue, flowStopCount, flowResumeCount,
flowStopSize, flowResumeSize);
}
- if (defaultFlowStopRatio) {
+ if (defaultFlowStopRatio) { // broker has a default ratio setup...
uint64_t maxByteCount = getCapacity(settings, QueuePolicy::maxSizeKey,
defaultMaxSize);
uint64_t flowStopSize = (uint64_t)(maxByteCount *
(defaultFlowStopRatio/100.0) + 0.5);
uint64_t flowResumeSize = (uint64_t)(maxByteCount *
(defaultFlowResumeRatio/100.0));
- return new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize);
+ uint32_t maxMsgCount = getCapacity(settings, QueuePolicy::maxCountKey,
0); // no size by default
+ uint32_t flowStopCount = (uint32_t)(maxMsgCount *
(defaultFlowStopRatio/100.0) + 0.5);
+ uint32_t flowResumeCount = (uint32_t)(maxMsgCount *
(defaultFlowResumeRatio/100.0));
+
+ return new QueueFlowLimit(queue, flowStopCount, flowResumeCount,
flowStopSize, flowResumeSize);
}
return 0;
}
Modified: qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp?rev=1099278&r1=1099277&r2=1099278&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp Tue May 3 22:04:51 2011
@@ -386,6 +386,7 @@ QPID_AUTO_TEST_CASE(testCapacityConversi
{
FieldTable args;
args.setString("qpid.max_count", "5");
+ args.setString("qpid.flow_stop_count", "0");
ProxySessionFixture f;
std::string q("q");
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1099278&r1=1099277&r2=1099278&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Tue May 3 22:04:51 2011
@@ -567,7 +567,7 @@ acl allow all all
s0 = c0.session()
# Declare multiple queues bound to same key on amq.topic
def declare(q,max=0):
- if max: declare = 'x-declare:{arguments:{"qpid.max_count":%d}}'%max
+ if max: declare = 'x-declare:{arguments:{"qpid.max_count":%d,
"qpid.flow_stop_count":0}}'%max
else: declare = 'x-declare:{}'
bind='x-bindings:[{queue:%s,key:key,exchange:"amq.topic"}]'%(q)
s0.sender("%s;{create:always,node:{%s,%s}}" % (q,declare,bind))
Modified: qpid/trunk/qpid/cpp/src/tests/queue_flow_limit_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/queue_flow_limit_tests.py?rev=1099278&r1=1099277&r2=1099278&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/queue_flow_limit_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/queue_flow_limit_tests.py Tue May 3 22:04:51 2011
@@ -129,6 +129,27 @@ class QueueFlowLimitTests(TestBase010):
self.assertEqual(i.name, "test01")
self._delete_queue("test01")
+ # now verify that the default ratios are applied if max sizing is specified:
+ command = tool + \
+ " --broker-addr=%s:%s " % (self.broker.host, self.broker.port)
\
+ + "add queue test02 --max-queue-count=10000
--max-queue-size=1000000"
+ cmd = popen(command)
+ rc = cmd.close()
+ self.assertEqual(rc, None)
+
+ # now verify the settings
+ qs = self.qmf.getObjects(_class="queue")
+ for i in qs:
+ if i.name == "test02":
+ ## @todo KAG: can't get the flow size from qmf! Arrgh!
+ # no way to verify...
+
#self.assertEqual(i.arguments.get("qpid.flow_resume_count"), 55)
+
#self.assertEqual(i.arguments.get("qpid.flow_resume_count"), 55)
+ self.failIf(i.flowStopped)
+ break;
+ self.assertEqual(i.name, "test02")
+ self._delete_queue("test02")
+
def test_flow_count(self):
""" Create a queue with count-based flow limit. Spawn several
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org