Author: orudyy
Date: Mon Jun 1 14:33:06 2015
New Revision: 1682915
URL: http://svn.apache.org/r1682915
Log:
NO-JIRA: Improve reliability of test
PriorityQueueTest#testReleaseMessageThatBecomesExpiredIsNotRedelivered by
replacing hard-coded Thread#sleep intervals with conditional waiting logic
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1682915&r1=1682914&r2=1682915&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
(original)
+++
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
Mon Jun 1 14:33:06 2015
@@ -37,7 +37,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.qpid.server.util.StateChangeListener;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -292,6 +295,22 @@ abstract class AbstractQueueTestBase ext
public void testReleaseMessageThatBecomesExpiredIsNotRedelivered() throws
Exception
{
ServerMessage messageA = createMessage(new Long(24));
+ final CountDownLatch sendIndicator = new CountDownLatch(1);
+ _consumerTarget = new MockConsumer()
+ {
+ @Override
+ public long send(ConsumerImpl consumer, MessageInstance entry,
boolean batch)
+ {
+ try
+ {
+ return super.send(consumer, entry, batch);
+ }
+ finally
+ {
+ sendIndicator.countDown();
+ }
+ }
+ };
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget,
null, messageA.getClass(), "test",
EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES,
@@ -308,26 +327,46 @@ abstract class AbstractQueueTestBase ext
_queue.enqueue(messageA, postEnqueueAction, null);
- int subFlushWaitTime = 150;
- Thread.sleep(subFlushWaitTime); // Work done by QueueRunner Thread
+ assertTrue("Message was not sent during expected time interval",
sendIndicator.await(5000, TimeUnit.MILLISECONDS));
- assertEquals("Unexpected total number of messages sent to consumer",
- 1,
- _consumerTarget.getMessages().size());
- assertFalse("Redelivery flag should not be set",
queueEntries.get(0).isRedelivered());
+ assertEquals("Unexpected total number of messages sent to consumer",
1, _consumerTarget.getMessages().size());
+ QueueEntry queueEntry = queueEntries.get(0);
+
+ final CountDownLatch dequeueIndicator = new CountDownLatch(1);
+ queueEntry.addStateChangeListener(new
StateChangeListener<MessageInstance, MessageInstance.State>()
+ {
+ @Override
+ public void stateChanged(MessageInstance object,
MessageInstance.State oldState, MessageInstance.State newState)
+ {
+ if (newState == MessageInstance.State.DEQUEUED)
+ {
+ dequeueIndicator.countDown();
+ }
+ }
+ });
+ assertFalse("Redelivery flag should not be set",
queueEntry.isRedelivered());
/* Wait a little more to be sure that message will have expired, then
release the first message only, causing it to be requeued */
- Thread.sleep(messageExpirationOffset - subFlushWaitTime + 10);
- queueEntries.get(0).release();
+ while(!queueEntry.expired() && System.currentTimeMillis() <=
expiration )
+ {
+ Thread.sleep(10);
+ }
- Thread.sleep(subFlushWaitTime); // Work done by QueueRunner Thread
+ assertTrue("Expecting the queue entry to be now expired",
queueEntry.expired());
+ queueEntry.release();
- assertTrue("Expecting the queue entry to be now expired",
queueEntries.get(0).expired());
- assertEquals("Total number of messages sent should not have changed",
- 1,
- _consumerTarget.getMessages().size());
- assertFalse("Redelivery flag should not be set",
queueEntries.get(0).isRedelivered());
- assertNull("releasedEntry should be cleared after requeue processed",
+ assertTrue("Message was not de-queued due to expiration",
dequeueIndicator.await(5000, TimeUnit.MILLISECONDS));
+
+ assertEquals("Total number of messages sent should not have changed",
1, _consumerTarget.getMessages().size());
+ assertFalse("Redelivery flag should not be set",
queueEntry.isRedelivered());
+
+ // QueueContext#_releasedEntry is updated after notification, thus, we
need to make sure that it is updated
+ long waitLoopLimit = 10;
+ while(_consumer.getQueueContext().getReleasedEntry() != null &&
waitLoopLimit-- > 0 )
+ {
+ Thread.sleep(10);
+ }
+ assertNull("releasedEntry should be cleared after requeue processed:"
+ _consumer.getQueueContext().getReleasedEntry(),
_consumer.getQueueContext().getReleasedEntry());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]