Author: kwall
Date: Fri Feb 10 09:40:48 2012
New Revision: 1242716

URL: http://svn.apache.org/viewvc?rev=1242716&view=rev
Log:
QPID-3803: System tests SortedQueueTest fail occasionally on slow machines

Modified:
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java

Modified: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java?rev=1242716&r1=1242715&r2=1242716&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java
 (original)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SortedQueueTest.java
 Fri Feb 10 09:40:48 2012
@@ -43,6 +43,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class SortedQueueTest extends QpidBrokerTestCase
 {
@@ -139,28 +140,21 @@ public class SortedQueueTest extends Qpi
             _producerSession.commit();
         }
 
-        synchronized(consumerThread)
+        try
         {
-            try
-            {
-                consumerThread.join(getConsumerThreadJoinInterval());
-            }
-            catch(InterruptedException e)
-            {
-                fail("Test failed waiting for consumer to complete");
-            }
+            consumerThread.join(getConsumerThreadJoinInterval());
+        }
+        catch(InterruptedException e)
+        {
+            fail("Test failed waiting for consumer to complete");
         }
+
         assertTrue("Consumer timed out", consumerThread.isStopped());
         assertEquals("Incorrect number of messages received", VALUES.length, 
consumerThread.getConsumed());
 
         producer.close();
     }
 
-    private long getConsumerThreadJoinInterval()
-    {
-        return isBrokerStorePersistent() ? 50000L: 5000L;
-    }
-
     public void testSortedQueueWithAscendingSortedKeys() throws JMSException, 
NamingException, AMQException
     {
         final Queue queue = createQueue();
@@ -178,23 +172,26 @@ public class SortedQueueTest extends Qpi
             _producerSession.commit();
         }
 
-        synchronized(consumerThread)
+        try
         {
-            try
-            {
-                consumerThread.join(getConsumerThreadJoinInterval());
-            }
-            catch(InterruptedException e)
-            {
-                fail("Test failed waiting for consumer to complete");
-            }
+            consumerThread.join(getConsumerThreadJoinInterval());
+        }
+        catch(InterruptedException e)
+        {
+            fail("Test failed waiting for consumer to complete");
         }
+
         assertTrue("Consumer timed out", consumerThread.isStopped());
         assertEquals("Incorrect number of messages received", 200, 
consumerThread.getConsumed());
 
         producer.close();
     }
 
+    private long getConsumerThreadJoinInterval()
+    {
+        return isBrokerStorePersistent() ? 50000L: 5000L;
+    }
+
     public void testSortOrderWithNonUniqueKeys() throws JMSException, 
NamingException, AMQException
     {
         final Queue queue = createQueue();
@@ -376,9 +373,9 @@ public class SortedQueueTest extends Qpi
 
     private class TestConsumerThread extends Thread
     {
-        private boolean _stopped = false;
+        private final AtomicInteger _consumed = new AtomicInteger(0);
+        private volatile boolean _stopped = false;
         private int _count = 0;
-        private int _consumed = 0;
         private int _sessionType = Session.AUTO_ACKNOWLEDGE;
         private Queue _queue;
 
@@ -422,7 +419,7 @@ public class SortedQueueTest extends Qpi
                          {
                              LOGGER.debug("transacted session commit");
                             session.commit();
-                            _consumed++;
+                            _consumed.incrementAndGet();
                          }
                     }
                     else if(_sessionType == Session.CLIENT_ACKNOWLEDGE)
@@ -436,13 +433,13 @@ public class SortedQueueTest extends Qpi
                          {
                              LOGGER.debug("client ack session acknowledge");
                              msg.acknowledge();
-                             _consumed++;
+                             _consumed.incrementAndGet();
                          }
                     }
                     else
                     {
                         LOGGER.debug("auto ack session");
-                        _consumed++;
+                        _consumed.incrementAndGet();
                     }
 
                     _count++;
@@ -460,14 +457,14 @@ public class SortedQueueTest extends Qpi
            }
         }
 
-        public synchronized boolean isStopped()
+        public boolean isStopped()
         {
             return _stopped;
         }
 
-        public synchronized int getConsumed()
+        public int getConsumed()
         {
-            return _consumed;
+            return _consumed.get();
         }
     }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to