Author: kwall
Date: Tue Feb  7 22:25:37 2012
New Revision: 1241670

URL: http://svn.apache.org/viewvc?rev=1241670&view=rev
Log:
QPID-3820: ProducerFlowControlTest#testSendTimeout test fails sporadically on 
0-10 profiles

Modified:
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java

Modified: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java?rev=1241670&r1=1241669&r2=1241670&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
 (original)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
 Tue Feb  7 22:25:37 2012
@@ -20,15 +20,13 @@
 */
 package org.apache.qpid.server.queue;
 
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.management.common.mbeans.ManagedQueue;
-import org.apache.qpid.server.logging.AbstractTestLogging;
-import org.apache.qpid.test.utils.JMXTestUtils;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
@@ -38,28 +36,28 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
-import javax.naming.NamingException;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.server.logging.AbstractTestLogging;
+import org.apache.qpid.test.utils.JMXTestUtils;
 
 public class ProducerFlowControlTest extends AbstractTestLogging
 {
     private static final int TIMEOUT = 10000;
 
-    private static final Logger _logger = 
Logger.getLogger(ProducerFlowControlTest.class);
-
     private Connection producerConnection;
-    private MessageProducer producer;
-    private Session producerSession;
-    private Queue queue;
     private Connection consumerConnection;
+    private Session producerSession;
     private Session consumerSession;
-
+    private MessageProducer producer;
     private MessageConsumer consumer;
-    private final AtomicInteger _sentMessages = new AtomicInteger();
+    private Queue queue;
+
+    private final AtomicInteger _sentMessages = new AtomicInteger(0);
 
     private JMXTestUtils _jmxUtils;
     private boolean _jmxUtilConnected;
@@ -85,37 +83,34 @@ public class ProducerFlowControlTest ext
 
     public void tearDown() throws Exception
     {
-        if(_jmxUtilConnected)
+        try
         {
-            try
+            if(_jmxUtilConnected)
             {
-                _jmxUtils.close();
-            }
-            catch (IOException e)
-            {
-                e.printStackTrace();
+                try
+                {
+                    _jmxUtils.close();
+                }
+                catch (IOException e)
+                {
+                    e.printStackTrace();
+                }
             }
+            producerConnection.close();
+            consumerConnection.close();
+        }
+        finally
+        {
+            super.tearDown();
         }
-        producerConnection.close();
-        consumerConnection.close();
-        super.tearDown();
     }
 
-    public void testCapacityExceededCausesBlock()
-            throws JMSException, NamingException, AMQException, 
InterruptedException
+    public void testCapacityExceededCausesBlock() throws Exception
     {
         String queueName = getTestQueueName();
-        
-        final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put("x-qpid-capacity",1000);
-        arguments.put("x-qpid-flow-resume-capacity",800);
-        ((AMQSession) producerSession).createQueue(new 
AMQShortString(queueName), true, false, false, arguments);
-        queue = 
producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
-        ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
-        producer = producerSession.createProducer(queue);
-
-        _sentMessages.set(0);
 
+        createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 
1000, 800);
+        producer = producerSession.createProducer(queue);
 
         // try to send 5 messages (should block after 4)
         sendMessagesAsync(producer, producerSession, 5, 50L);
@@ -143,22 +138,14 @@ public class ProducerFlowControlTest ext
 
     }
 
-    public void testBrokerLogMessages()
-            throws JMSException, NamingException, AMQException, 
InterruptedException, IOException
+
+    public void testBrokerLogMessages() throws Exception
     {
         String queueName = getTestQueueName();
         
-        final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put("x-qpid-capacity",1000);
-        arguments.put("x-qpid-flow-resume-capacity",800);
-        ((AMQSession) producerSession).createQueue(new 
AMQShortString(queueName), true, false, false, arguments);
-        queue = 
producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
-        ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+        createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 
1000, 800);
         producer = producerSession.createProducer(queue);
 
-        _sentMessages.set(0);
-
-
         // try to send 5 messages (should block after 4)
         sendMessagesAsync(producer, producerSession, 5, 50L);
 
@@ -175,34 +162,21 @@ public class ProducerFlowControlTest ext
         results = waitAndFindMatches("QUE-1004");
 
         assertEquals("Did not find correct number of UNDERFULL queue underfull 
messages", 1, results.size());
-
-
-        
     }
 
 
-    public void testClientLogMessages()
-            throws JMSException, NamingException, AMQException, 
InterruptedException, IOException
+    public void testClientLogMessages() throws Exception
     {
         String queueName = getTestQueueName();
-        
+
         setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
         
setTestClientSystemProperty("qpid.flow_control_wait_notify_period","1000");
 
         Session session = producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
 
-
-        final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put("x-qpid-capacity",1000);
-        arguments.put("x-qpid-flow-resume-capacity",800);
-        ((AMQSession) session).createQueue(new AMQShortString(queueName), 
true, false, false, arguments);
-        queue = 
producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
-        ((AMQSession) session).declareAndBind((AMQDestination)queue);
+        createAndBindQueueWithFlowControlEnabled(session, queueName, 1000, 
800);
         producer = session.createProducer(queue);
 
-        _sentMessages.set(0);
-
-
         // try to send 5 messages (should block after 4)
         MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 
50L);
 
@@ -213,26 +187,16 @@ public class ProducerFlowControlTest ext
                                                   + " flow control", TIMEOUT);
         assertEquals("Incorrect number of send failure messages logged by 
client (got " + results.size() + " delay "
                      + "messages)",1,failedMessages.size());
-
-
-
     }
 
 
-    public void testFlowControlOnCapacityResumeEqual()
-            throws JMSException, NamingException, AMQException, 
InterruptedException
+    public void testFlowControlOnCapacityResumeEqual() throws Exception
     {
         String queueName = getTestQueueName();
         
-        final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put("x-qpid-capacity",1000);
-        arguments.put("x-qpid-flow-resume-capacity",1000);
-        ((AMQSession) producerSession).createQueue(new 
AMQShortString(queueName), true, false, false, arguments);
-        queue = 
producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
-        ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+        createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 
1000, 1000);
         producer = producerSession.createProducer(queue);
 
-        _sentMessages.set(0);
 
         // try to send 5 messages (should block after 4)
         sendMessagesAsync(producer, producerSession, 5, 50L);
@@ -244,7 +208,6 @@ public class ProducerFlowControlTest ext
         consumer = consumerSession.createConsumer(queue);
         consumerConnection.start();
 
-
         consumer.receive();
 
         Thread.sleep(1000);
@@ -255,23 +218,16 @@ public class ProducerFlowControlTest ext
     }
 
 
-    public void testFlowControlSoak()
-            throws Exception, NamingException, AMQException, 
InterruptedException
+    public void testFlowControlSoak() throws Exception
     {
         String queueName = getTestQueueName();
         
-        _sentMessages.set(0);
+
         final int numProducers = 10;
         final int numMessages = 100;
 
-        final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put("x-qpid-capacity",6000);
-        arguments.put("x-qpid-flow-resume-capacity",3000);
-
-        ((AMQSession) consumerSession).createQueue(new 
AMQShortString(queueName), false, false, false, arguments);
+        createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 
6000, 3000);
 
-        queue = 
producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='false'");
-        ((AMQSession) consumerSession).declareAndBind((AMQDestination)queue);
         consumerConnection.start();
 
         Connection[] producers = new Connection[numProducers];
@@ -311,58 +267,38 @@ public class ProducerFlowControlTest ext
 
     }
 
-
-
-    public void testSendTimeout()
-            throws JMSException, NamingException, AMQException, 
InterruptedException
+    public void testSendTimeout() throws Exception
     {
         String queueName = getTestQueueName();
-        
+        final String expectedMsg = isBroker010() ? "Exception when sending 
message:timed out waiting for message credit"
+                : "Unable to send message for 3 seconds due to broker enforced 
flow control";
+
         setTestClientSystemProperty("qpid.flow_control_wait_failure","3000");
         Session session = producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
 
-
-        final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put("x-qpid-capacity",1000);
-        arguments.put("x-qpid-flow-resume-capacity",800);
-        ((AMQSession) session).createQueue(new AMQShortString(queueName), 
true, false, false, arguments);
-        queue = 
producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
-        ((AMQSession) session).declareAndBind((AMQDestination)queue);
+        createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 
1000, 800);
         producer = session.createProducer(queue);
 
-        _sentMessages.set(0);
-
-
         // try to send 5 messages (should block after 4)
         MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 
100L);
 
-        
-        Thread.sleep(10000);
-
-        Exception e = sender.getException();
+        Exception e = sender.awaitSenderException(10000);
 
         assertNotNull("No timeout exception on sending", e);
 
+
+        assertEquals("Unexpected exception reason", expectedMsg, 
e.getMessage());
+
     }
-    
-    
-    public void testFlowControlAttributeModificationViaJMX()
-    throws JMSException, NamingException, AMQException, InterruptedException, 
Exception
+
+    public void testFlowControlAttributeModificationViaJMX() throws Exception
     {
         _jmxUtils.open();
         _jmxUtilConnected = true;
         
         String queueName = getTestQueueName();
-        
-        //create queue
-        final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put("x-qpid-capacity",0);
-        arguments.put("x-qpid-flow-resume-capacity",0);
-        ((AMQSession) producerSession).createQueue(new 
AMQShortString(queueName), true, false, false, arguments);
 
-        queue = 
producerSession.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
-
-        ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+        createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 
0, 0);
         producer = producerSession.createProducer(queue);
         
         Thread.sleep(1000);
@@ -383,7 +319,7 @@ public class ProducerFlowControlTest ext
         assertFalse("Queue should not be overfull", 
queueMBean.isFlowOverfull());
         
         // try to send 2 messages (should block after 1)
-        _sentMessages.set(0);
+
         sendMessagesAsync(producer, producerSession, 2, 50L);
 
         Thread.sleep(2000);
@@ -414,13 +350,23 @@ public class ProducerFlowControlTest ext
         consumer.receive();
         
         //perform a synchronous op on the connection
-        ((AMQSession) consumerSession).sync();
+        ((AMQSession<?,?>) consumerSession).sync();
         
         assertFalse("Queue should not be overfull", 
queueMBean.isFlowOverfull());
         
         consumer.receive();
     }
 
+    private void createAndBindQueueWithFlowControlEnabled(Session session, 
String queueName, int capacity, int resumeCapacity) throws Exception
+    {
+        final Map<String,Object> arguments = new HashMap<String, Object>();
+        arguments.put("x-qpid-capacity",capacity);
+        arguments.put("x-qpid-flow-resume-capacity",resumeCapacity);
+        ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), 
true, false, false, arguments);
+        queue = 
session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'");
+        ((AMQSession<?,?>) session).declareAndBind((AMQDestination)queue);
+    }
+
     private MessageSender sendMessagesAsync(final MessageProducer producer,
                                             final Session producerSession,
                                             final int numMessages,
@@ -443,7 +389,7 @@ public class ProducerFlowControlTest ext
 
             try
             {
-                ((AMQSession)producerSession).sync();
+                ((AMQSession<?,?>)producerSession).sync();
             }
             catch (AMQException e)
             {
@@ -464,7 +410,6 @@ public class ProducerFlowControlTest ext
 
     private static final byte[] BYTE_300 = new byte[300];
 
-
     private Message nextMessage(int msg, Session producerSession) throws 
JMSException
     {
         BytesMessage send = producerSession.createBytesMessage();
@@ -474,22 +419,19 @@ public class ProducerFlowControlTest ext
         return send;
     }
 
-
     private class MessageSender implements Runnable
     {
-        private final MessageProducer _producer;
-        private final Session _producerSession;
+        private final MessageProducer _senderProducer;
+        private final Session _senderSession;
         private final int _numMessages;
-
-
-
-        private JMSException _exception;
+        private volatile JMSException _exception;
+        private CountDownLatch _exceptionThrownLatch = new CountDownLatch(1);
         private long _sleepPeriod;
 
         public MessageSender(MessageProducer producer, Session 
producerSession, int numMessages, long sleepPeriod)
         {
-            _producer = producer;
-            _producerSession = producerSession;
+            _senderProducer = producer;
+            _senderSession = producerSession;
             _numMessages = numMessages;
             _sleepPeriod = sleepPeriod;
         }
@@ -498,16 +440,18 @@ public class ProducerFlowControlTest ext
         {
             try
             {
-                sendMessages(_producer, _producerSession, _numMessages, 
_sleepPeriod);
+                sendMessages(_senderProducer, _senderSession, _numMessages, 
_sleepPeriod);
             }
             catch (JMSException e)
             {
                 _exception = e;
+                _exceptionThrownLatch.countDown();
             }
         }
 
-        public JMSException getException()
+        public Exception awaitSenderException(long timeout) throws 
InterruptedException
         {
+            _exceptionThrownLatch.await(timeout, TimeUnit.MILLISECONDS);
             return _exception;
         }
     }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to