Author: robbie
Date: Tue Dec 8 04:05:04 2009
New Revision: 888250
URL: http://svn.apache.org/viewvc?rev=888250&view=rev
Log:
QPID-2177: unit and system tests for the new flow-control-related
attributes of the Queue MBean
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
qpid/trunk/qpid/java/test-profiles/08InVMExcludes
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=888250&r1=888249&r2=888250&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
Tue Dec 8 04:05:04 2009
@@ -318,6 +318,67 @@
}
}
+
+ public void testFlowControlProperties() throws Exception
+ {
+ assertTrue(_queueMBean.getCapacity() == 0);
+ assertTrue(_queueMBean.getFlowResumeCapacity() == 0);
+ assertFalse(_queueMBean.isFlowOverfull());
+
+ //capacity currently 0, try setting FlowResumeCapacity above this
+ try
+ {
+ _queueMBean.setFlowResumeCapacity(1L);
+ fail("Should have failed to allow setting FlowResumeCapacity above
Capacity");
+ }
+ catch (IllegalArgumentException ex)
+ {
+ //expected exception
+ assertTrue(_queueMBean.getFlowResumeCapacity() == 0);
+ }
+
+ //(FlowResume)Capacity currently 0, set both to 2 then try setting
Capacity below this
+ _queueMBean.setCapacity(2L);
+ assertTrue(_queueMBean.getCapacity() == 2L);
+ _queueMBean.setFlowResumeCapacity(2L);
+ assertTrue(_queueMBean.getFlowResumeCapacity() == 2L);
+
+ try
+ {
+ _queueMBean.setCapacity(1L);
+ fail("Should have failed to allow setting Capacity below
FlowResumeCapacity");
+ }
+ catch (IllegalArgumentException ex)
+ {
+ //expected exception
+ assertTrue(_queueMBean.getCapacity() == 2);
+ }
+
+ //set (FlowResume)Capacity to MESSAGE_SIZE +1 then add a message to
the queue
+ _queueMBean.setCapacity(MESSAGE_SIZE + 1);
+ _queueMBean.setFlowResumeCapacity(MESSAGE_SIZE + 1);
+
+ AMQChannel channel = new AMQChannel(_protocolSession, 1,
_messageStore);
+ sendMessages(1, true);
+ _queue.checkCapacity(channel);
+
+ assertFalse(_queueMBean.isFlowOverfull());
+ assertFalse(channel.getBlocking());
+
+ //add another message then check queue is now overfull and channel
blocked
+ sendMessages(1, true);
+ _queue.checkCapacity(channel);
+
+ assertTrue(_queueMBean.isFlowOverfull());
+ assertTrue(channel.getBlocking());
+
+ //set FlowResumeCapacity to 2x MESSAGE_SIZE and check queue is now
underfull and channel unblocked
+ _queueMBean.setCapacity(2 * MESSAGE_SIZE);//must increase capacity too
+ _queueMBean.setFlowResumeCapacity(2 * MESSAGE_SIZE);
+
+ assertFalse(_queueMBean.isFlowOverfull());
+ assertFalse(channel.getBlocking());
+ }
private IncomingMessage message(final boolean immediate, boolean
persistent) throws AMQException
{
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java?rev=888250&r1=888249&r2=888250&view=diff
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
(original)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
Tue Dec 8 04:05:04 2009
@@ -25,7 +25,9 @@
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.AMQException;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.logging.AbstractTestLogging;
+import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.framing.AMQShortString;
import javax.jms.*;
@@ -43,8 +45,6 @@
private static final Logger _logger =
Logger.getLogger(ProducerFlowControlTest.class);
- protected final String QUEUE = "ProducerFlowControl";
-
private static final int MSG_COUNT = 50;
private Connection producerConnection;
@@ -54,12 +54,18 @@
private Connection consumerConnection;
private Session consumerSession;
-
private MessageConsumer consumer;
private final AtomicInteger _sentMessages = new AtomicInteger();
+ private JMXTestUtils _jmxUtils;
+ private boolean _jmxUtilConnected;
+ private static final String USER = "admin";
+
public void setUp() throws Exception
{
+ _jmxUtils = new JMXTestUtils(this, USER , USER);
+ _jmxUtils.setUp();
+ _jmxUtilConnected=false;
super.setUp();
_monitor.reset();
@@ -76,6 +82,17 @@
public void tearDown() throws Exception
{
+ if(_jmxUtilConnected)
+ {
+ try
+ {
+ _jmxUtils.close();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
producerConnection.close();
consumerConnection.close();
super.tearDown();
@@ -84,11 +101,13 @@
public void testCapacityExceededCausesBlock()
throws JMSException, NamingException, AMQException,
InterruptedException
{
+ String queueName = getTestQueueName();
+
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",800);
- ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE),
true, false, false, arguments);
- queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) producerSession).createQueue(new
AMQShortString(queueName), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",queueName);
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
@@ -124,11 +143,13 @@
public void testBrokerLogMessages()
throws JMSException, NamingException, AMQException,
InterruptedException, IOException
{
+ String queueName = getTestQueueName();
+
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",800);
- ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE),
true, false, false, arguments);
- queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) producerSession).createQueue(new
AMQShortString(queueName), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",queueName);
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
@@ -161,6 +182,8 @@
public void testClientLogMessages()
throws JMSException, NamingException, AMQException,
InterruptedException, IOException
{
+ String queueName = getTestQueueName();
+
setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
setTestClientSystemProperty("qpid.flow_control_wait_notify_period","1000");
@@ -170,8 +193,8 @@
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",800);
- ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true,
false, false, arguments);
- queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) session).createQueue(new AMQShortString(queueName),
true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",queueName);
((AMQSession) session).declareAndBind((AMQDestination)queue);
producer = session.createProducer(queue);
@@ -195,11 +218,13 @@
public void testFlowControlOnCapacityResumeEqual()
throws JMSException, NamingException, AMQException,
InterruptedException
{
+ String queueName = getTestQueueName();
+
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",1000);
- ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE),
true, false, false, arguments);
- queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) producerSession).createQueue(new
AMQShortString(queueName), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",queueName);
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
@@ -229,6 +254,8 @@
public void testFlowControlSoak()
throws Exception, NamingException, AMQException,
InterruptedException
{
+ String queueName = getTestQueueName();
+
_sentMessages.set(0);
final int numProducers = 10;
final int numMessages = 100;
@@ -237,9 +264,9 @@
arguments.put("x-qpid-capacity",6000);
arguments.put("x-qpid-flow-resume-capacity",3000);
- ((AMQSession) consumerSession).createQueue(new AMQShortString(QUEUE),
false, false, false, arguments);
+ ((AMQSession) consumerSession).createQueue(new
AMQShortString(queueName), false, false, false, arguments);
- queue = new AMQQueue("amq.direct",QUEUE);
+ queue = new AMQQueue("amq.direct",queueName);
((AMQSession) consumerSession).declareAndBind((AMQDestination)queue);
consumerConnection.start();
@@ -285,6 +312,8 @@
public void testSendTimeout()
throws JMSException, NamingException, AMQException,
InterruptedException
{
+ String queueName = getTestQueueName();
+
setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
Session session = producerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
@@ -292,8 +321,8 @@
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("x-qpid-capacity",1000);
arguments.put("x-qpid-flow-resume-capacity",800);
- ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true,
false, false, arguments);
- queue = new AMQQueue("amq.direct",QUEUE);
+ ((AMQSession) session).createQueue(new AMQShortString(queueName),
true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",queueName);
((AMQSession) session).declareAndBind((AMQDestination)queue);
producer = session.createProducer(queue);
@@ -310,6 +339,76 @@
assertNotNull("No timeout exception on sending", e);
}
+
+
+ public void testFlowControlAttributeModificationViaJMX()
+ throws JMSException, NamingException, AMQException, InterruptedException,
Exception
+ {
+ _jmxUtils.open();
+ _jmxUtilConnected = true;
+
+ String queueName = getTestQueueName();
+
+ //create queue
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-capacity",0);
+ arguments.put("x-qpid-flow-resume-capacity",0);
+ ((AMQSession) producerSession).createQueue(new
AMQShortString(queueName), true, false, false, arguments);
+ queue = new AMQQueue("amq.direct",queueName);
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ Thread.sleep(1000);
+
+ //Create a JMX MBean proxy for the queue
+ ManagedQueue queueMBean =
_jmxUtils.getManagedObject(ManagedQueue.class,
_jmxUtils.getQueueObjectName("test", queueName));
+ assertNotNull(queueMBean);
+
+ //check current attribute values are 0 as expected
+ assertTrue("Capacity was not the expected value",
queueMBean.getCapacity() == 0L);
+ assertTrue("FlowResumeCapacity was not the expected value",
queueMBean.getFlowResumeCapacity() == 0L);
+
+ //set new values that will cause flow control to be active, and the
queue to become overfull after 1 message is sent
+ queueMBean.setCapacity(250L);
+ queueMBean.setFlowResumeCapacity(250L);
+ assertTrue("Capacity was not the expected value",
queueMBean.getCapacity() == 250L);
+ assertTrue("FlowResumeCapacity was not the expected value",
queueMBean.getFlowResumeCapacity() == 250L);
+ assertFalse("Queue should not be overfull",
queueMBean.isFlowOverfull());
+
+ // try to send 2 messages (should block after 1)
+ _sentMessages.set(0);
+ sendMessagesAsync(producer, producerSession, 2, 50L);
+
+ Thread.sleep(2000);
+
+ //check only 1 message was sent, and queue is overfull
+ assertEquals("Incorrect number of message sent before blocking", 1,
_sentMessages.get());
+ assertTrue("Queue should be overfull", queueMBean.isFlowOverfull());
+
+ //raise the attribute values, causing the queue to become underfull
and allow the second message to be sent.
+ queueMBean.setCapacity(300L);
+ queueMBean.setFlowResumeCapacity(300L);
+
+ Thread.sleep(2000);
+
+ //check second message was sent, and caused the queue to become
overfull again
+ assertEquals("Second message was not sent after lifting
FlowResumeCapacity", 2, _sentMessages.get());
+ assertTrue("Queue should be overfull", queueMBean.isFlowOverfull());
+
+ //raise capacity above queue depth, check queue remains overfull as
FlowResumeCapacity still exceeded
+ queueMBean.setCapacity(700L);
+ assertTrue("Queue should be overfull", queueMBean.isFlowOverfull());
+
+ //receive a message, check queue becomes underfull
+
+ consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+ consumer.receive();
+ assertFalse("Queue should not be overfull",
queueMBean.isFlowOverfull());
+
+ consumer.receive();
+ }
private MessageSender sendMessagesAsync(final MessageProducer producer,
final Session producerSession,
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java?rev=888250&r1=888249&r2=888250&view=diff
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
(original)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
Tue Dec 8 04:05:04 2009
@@ -119,6 +119,7 @@
Set<ObjectName> objectNames = allObject.returnObjects();
+ _test.assertNotNull("Null ObjectName Set returned", objectNames);
_test.assertEquals("Incorrect number test vhosts returned", 1,
objectNames.size());
// We have verified we have only one value in objectNames so return it
@@ -142,6 +143,7 @@
Set<ObjectName> objectNames = allObject.returnObjects();
+ _test.assertNotNull("Null ObjectName Set returned", objectNames);
_test.assertEquals("Incorrect number of queues with name '" +
allObject.querystring +
"' returned", 1, objectNames.size());
@@ -167,6 +169,7 @@
Set<ObjectName> objectNames = allObject.returnObjects();
+ _test.assertNotNull("Null ObjectName Set returned", objectNames);
_test.assertEquals("Incorrect number of exchange with name '" +
exchange +
"' returned", 1, objectNames.size());
@@ -181,6 +184,7 @@
Set<ObjectName> objectNames = allObject.returnObjects();
+ _test.assertNotNull("Null ObjectName Set returned", objectNames);
_test.assertEquals("More than one " + managedClass + " returned", 1,
objectNames.size());
ObjectName objectName = objectNames.iterator().next();
Modified: qpid/trunk/qpid/java/test-profiles/08InVMExcludes
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/08InVMExcludes?rev=888250&r1=888249&r2=888250&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/08InVMExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/08InVMExcludes Tue Dec 8 04:05:04 2009
@@ -1,3 +1,6 @@
//======================================================================
//Exclude the following tests when running the InVM default test profile
//======================================================================
+
+// QPID-2097 exclude until InVM JMX test runs are reliable
+org.apache.qpid.server.queue.ProducerFlowControlTest#testFlowControlAttributeModificationViaJMX
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]