Author: tabish
Date: Wed Jul 27 13:15:50 2011
New Revision: 1151454

URL: http://svn.apache.org/viewvc?rev=1151454&view=rev
Log:
Update the assertion messages to reflect the actual failure condition.

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=1151454&r1=1151453&r2=1151454&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java Wed Jul 27 13:15:50 2011 @@ -46,15 +46,15 @@ public class ExpiredMessagesWithNoConsum private static final Logger LOG = LoggerFactory.getLogger(ExpiredMessagesWithNoConsumerTest.class); - - BrokerService broker; - Connection connection; - Session session; - MessageProducer producer; - public ActiveMQDestination destination = new ActiveMQQueue("test"); + + BrokerService broker; + Connection connection; + Session session; + MessageProducer producer; + public ActiveMQDestination destination = new ActiveMQQueue("test"); public boolean optimizedDispatch = true; public PendingQueueMessageStoragePolicy pendingQueuePolicy; - + public static Test suite() { return suite(ExpiredMessagesWithNoConsumerTest.class); } @@ -62,15 +62,15 @@ public class ExpiredMessagesWithNoConsum public static void main(String[] args) { junit.textui.TestRunner.run(suite()); } - + protected void createBrokerWithMemoryLimit() throws Exception { doCreateBroker(true); } - + protected void createBroker() throws Exception { doCreateBroker(false); } - + private void doCreateBroker(boolean memoryLimit) throws Exception { broker = new BrokerService(); broker.setBrokerName("localhost"); @@ -83,7 +83,7 @@ public class ExpiredMessagesWithNoConsum 
defaultEntry.setOptimizedDispatch(optimizedDispatch ); defaultEntry.setExpireMessagesPeriod(800); defaultEntry.setMaxExpirePageSize(800); - + defaultEntry.setPendingQueuePolicy(pendingQueuePolicy); if (memoryLimit) { @@ -99,51 +99,51 @@ public class ExpiredMessagesWithNoConsum broker.waitUntilStarted(); } - + public void initCombosForTestExpiredMessagesWithNoConsumer() { addCombinationValues("optimizedDispatch", new Object[] {Boolean.TRUE, Boolean.FALSE}); addCombinationValues("pendingQueuePolicy", new Object[] {null, new VMPendingQueueMessageStoragePolicy(), new FilePendingQueueMessageStoragePolicy()}); } - - public void testExpiredMessagesWithNoConsumer() throws Exception { - - createBrokerWithMemoryLimit(); - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); - connection = factory.createConnection(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - producer = session.createProducer(destination); - producer.setTimeToLive(1000); - connection.start(); - final long sendCount = 2000; - - final Thread producingThread = new Thread("Producing Thread") { + + public void testExpiredMessagesWithNoConsumer() throws Exception { + + createBrokerWithMemoryLimit(); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + connection = factory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = session.createProducer(destination); + producer.setTimeToLive(1000); + connection.start(); + final long sendCount = 2000; + + final Thread producingThread = new Thread("Producing Thread") { public void run() { try { - int i = 0; - long tStamp = System.currentTimeMillis(); - while (i++ < sendCount) { - producer.send(session.createTextMessage("test")); - if (i%100 == 0) { - LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100) + "m/ms"); - tStamp = System.currentTimeMillis() ; - } - } + int i = 0; + long 
tStamp = System.currentTimeMillis(); + while (i++ < sendCount) { + producer.send(session.createTextMessage("test")); + if (i%100 == 0) { + LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100) + "m/ms"); + tStamp = System.currentTimeMillis() ; + } + } } catch (Throwable ex) { ex.printStackTrace(); } } - }; - - producingThread.start(); - - assertTrue("producer completed within time", Wait.waitFor(new Wait.Condition() { + }; + + producingThread.start(); + + assertTrue("producer failed to complete within allocated time", Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { producingThread.join(1000); return !producingThread.isAlive(); } - })); - + })); + final DestinationViewMBean view = createView(destination); Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { @@ -156,14 +156,14 @@ public class ExpiredMessagesWithNoConsum LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + ", size= " + view.getQueueSize()); - + assertEquals("All sent have expired", sendCount, view.getExpiredCount()); assertEquals("memory usage goes to duck egg", 0, view.getMemoryPercentUsage()); - } - - // first ack delivered after expiry + } + + // first ack delivered after expiry public void testExpiredMessagesWithVerySlowConsumer() throws Exception { - createBroker(); + createBroker(); final long queuePrefetch = 600; ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch); connection = factory.createConnection(); @@ -171,11 +171,11 @@ public class ExpiredMessagesWithNoConsum producer = session.createProducer(destination); final int ttl = 4000; producer.setTimeToLive(ttl); - - final long sendCount = 1500; + + final long sendCount = 1500; final CountDownLatch receivedOneCondition = new CountDownLatch(1); final 
CountDownLatch waitCondition = new CountDownLatch(1); - + MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @@ -189,13 +189,13 @@ public class ExpiredMessagesWithNoConsum } catch (Exception e) { e.printStackTrace(); fail(e.toString()); - } - } + } + } }); - + connection.start(); - - + + final Thread producingThread = new Thread("Producing Thread") { public void run() { try { @@ -213,19 +213,19 @@ public class ExpiredMessagesWithNoConsum } } }; - + producingThread.start(); assertTrue("got one message", receivedOneCondition.await(20, TimeUnit.SECONDS)); - - assertTrue("producer completed within time ", Wait.waitFor(new Wait.Condition() { + + assertTrue("producer failed to complete within allocated time", Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { producingThread.join(1000); return !producingThread.isAlive(); - } + } }, Wait.MAX_WAIT_MILLIS * 2)); - + final DestinationViewMBean view = createView(destination); - + assertTrue("all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { return queuePrefetch == view.getDispatchCount(); @@ -235,15 +235,15 @@ public class ExpiredMessagesWithNoConsum public boolean isSatisified() throws Exception { return sendCount == view.getExpiredCount(); } - })); - + })); + LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + ", size= " + view.getQueueSize()); - + // let the ack happen waitCondition.countDown(); - + Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { // consumer ackLater(delivery ack for expired messages) is based on half the prefetch value @@ -254,21 +254,21 @@ public class ExpiredMessagesWithNoConsum LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() + ", inflight=" + 
view.getInFlightCount() + ", expired= " + view.getExpiredCount() + ", size= " + view.getQueueSize()); - - + + assertEquals("inflight reduces to half prefetch minus single delivered message", (queuePrefetch/2) -1, view.getInFlightCount()); assertEquals("size gets back to 0 ", 0, view.getQueueSize()); assertEquals("dequeues match sent/expired ", sendCount, view.getDequeueCount()); - + consumer.close(); - + Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { return 0 == view.getInFlightCount(); } }); assertEquals("inflight goes to zeor on close", 0, view.getInFlightCount()); - + LOG.info("done: " + getName()); } @@ -328,7 +328,7 @@ public class ExpiredMessagesWithNoConsum producingThread.start(); assertTrue("got one message", receivedOneCondition.await(20, TimeUnit.SECONDS)); - assertTrue("producer completed within time ", Wait.waitFor(new Wait.Condition() { + assertTrue("producer failed to complete within allocated time", Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { producingThread.join(1000); return !producingThread.isAlive(); @@ -463,7 +463,7 @@ public class ExpiredMessagesWithNoConsum - protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { + protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { String domain = "org.apache.activemq"; ObjectName name; if (destination.isQueue()) { @@ -475,13 +475,13 @@ public class ExpiredMessagesWithNoConsum true); } - protected void tearDown() throws Exception { - connection.stop(); - broker.stop(); - broker.waitUntilStopped(); - } + protected void tearDown() throws Exception { + connection.stop(); + broker.stop(); + broker.waitUntilStopped(); + } + + - - }