Author: orudyy
Date: Mon Jun  1 14:33:06 2015
New Revision: 1682915

URL: http://svn.apache.org/r1682915
Log:
NO-JIRA: Improve reliability of test 
PriorityQueueTest#testReleaseMessageThatBecomesExpiredIsNotRedelivered by 
replacing hard-coded Thread#sleep intervals with conditional waiting logic

Modified:
    
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java

Modified: 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1682915&r1=1682914&r2=1682915&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
 Mon Jun  1 14:33:06 2015
@@ -37,7 +37,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.qpid.server.util.StateChangeListener;
 import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -292,6 +295,22 @@ abstract class AbstractQueueTestBase ext
     public void testReleaseMessageThatBecomesExpiredIsNotRedelivered() throws 
Exception
     {
         ServerMessage messageA = createMessage(new Long(24));
+        final CountDownLatch sendIndicator = new CountDownLatch(1);
+        _consumerTarget = new MockConsumer()
+        {
+            @Override
+            public long send(ConsumerImpl consumer, MessageInstance entry, 
boolean batch)
+            {
+                try
+                {
+                    return super.send(consumer, entry, batch);
+                }
+                finally
+                {
+                    sendIndicator.countDown();
+                }
+            }
+        };
 
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, 
null, messageA.getClass(), "test",
                                            
EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES,
@@ -308,26 +327,46 @@ abstract class AbstractQueueTestBase ext
 
         _queue.enqueue(messageA, postEnqueueAction, null);
 
-        int subFlushWaitTime = 150;
-        Thread.sleep(subFlushWaitTime); // Work done by QueueRunner Thread
+        assertTrue("Message was not sent during expected time interval", 
sendIndicator.await(5000, TimeUnit.MILLISECONDS));
 
-        assertEquals("Unexpected total number of messages sent to consumer",
-                     1,
-                     _consumerTarget.getMessages().size());
-        assertFalse("Redelivery flag should not be set", 
queueEntries.get(0).isRedelivered());
+        assertEquals("Unexpected total number of messages sent to consumer", 
1, _consumerTarget.getMessages().size());
+        QueueEntry queueEntry = queueEntries.get(0);
+
+        final CountDownLatch dequeueIndicator = new CountDownLatch(1);
+        queueEntry.addStateChangeListener(new 
StateChangeListener<MessageInstance, MessageInstance.State>()
+        {
+            @Override
+            public void stateChanged(MessageInstance object, 
MessageInstance.State oldState, MessageInstance.State newState)
+            {
+                if (newState == MessageInstance.State.DEQUEUED)
+                {
+                    dequeueIndicator.countDown();
+                }
+            }
+        });
+        assertFalse("Redelivery flag should not be set", 
queueEntry.isRedelivered());
 
         /* Wait a little more to be sure that message will have expired, then 
release the first message only, causing it to be requeued */
-        Thread.sleep(messageExpirationOffset - subFlushWaitTime + 10);
-        queueEntries.get(0).release();
+        while(!queueEntry.expired() && System.currentTimeMillis() <= 
expiration )
+        {
+            Thread.sleep(10);
+        }
 
-        Thread.sleep(subFlushWaitTime); // Work done by QueueRunner Thread
+        assertTrue("Expecting the queue entry to be now expired", 
queueEntry.expired());
+        queueEntry.release();
 
-        assertTrue("Expecting the queue entry to be now expired", 
queueEntries.get(0).expired());
-        assertEquals("Total number of messages sent should not have changed",
-                     1,
-                     _consumerTarget.getMessages().size());
-        assertFalse("Redelivery flag should not be set", 
queueEntries.get(0).isRedelivered());
-        assertNull("releasedEntry should be cleared after requeue processed",
+        assertTrue("Message was not de-queued due to expiration", 
dequeueIndicator.await(5000, TimeUnit.MILLISECONDS));
+
+        assertEquals("Total number of messages sent should not have changed", 
1, _consumerTarget.getMessages().size());
+        assertFalse("Redelivery flag should not be set", 
queueEntry.isRedelivered());
+
+        // QueueContext#_releasedEntry is updated after notification, thus, we 
need to make sure that it is updated
+        long waitLoopLimit = 10;
+        while(_consumer.getQueueContext().getReleasedEntry() != null && 
waitLoopLimit-- > 0 )
+        {
+            Thread.sleep(10);
+        }
+        assertNull("releasedEntry should be cleared after requeue processed:" 
+  _consumer.getQueueContext().getReleasedEntry(),
                    _consumer.getQueueContext().getReleasedEntry());
 
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to