Adds several test cases that demonstrate some JMS client issues; most of them are resolved by moving to v0.26-SNAPSHOT
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/71b55233 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/71b55233 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/71b55233 Branch: refs/heads/activemq-5.9 Commit: 71b55233248b006e51b645f01b25c19e5277fc31 Parents: 0bf2233 Author: Timothy Bish <[email protected]> Authored: Mon Dec 2 15:07:13 2013 -0500 Committer: Hadrian Zbarcea <[email protected]> Committed: Wed Mar 12 13:12:37 2014 -0400 ---------------------------------------------------------------------- .../activemq/transport/amqp/JMSClientTest.java | 135 +++++++++++++++++++ 1 file changed, 135 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/71b55233/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java index 16fde4e..e35127d 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java @@ -18,9 +18,12 @@ package org.apache.activemq.transport.amqp; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.Enumeration; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.DeliveryMode; @@ -229,6 +232,138 @@ public class JMSClientTest extends AmqpTestSupport { connection.close(); } + //should through exception IllegalStateException:The session is closed + @Test(timeout=30000) + public 
void testBrokerRestartPersistentQueueException() throws Exception { + QueueImpl queue = new QueueImpl("queue://" + name); + + Connection connection = createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); + + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + Message m = session.createTextMessage("Sample text"); + producer.send(m); + + restartBroker(); + + try { + session.createConsumer(queue); + fail("Should have thrown an IllegalStateException"); + } catch (Exception ex) { + LOG.info("Caught exception on receive: {}", ex); + } + } + + @Test(timeout=30000) + public void testProducerThrowsWhenBrokerRestarted() throws Exception { + QueueImpl queue = new QueueImpl("queue://" + name); + + Connection connection = createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); + + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + Message m = session.createTextMessage("Sample text"); + + Thread restart = new Thread(new Runnable() { + + @Override + public void run() { + try { + TimeUnit.SECONDS.sleep(5); + restartBroker(); + } catch (Exception ex) {} + } + }); + restart.start(); + + try { + for (int i = 0; i < 10; ++i) { + producer.send(m); + TimeUnit.SECONDS.sleep(1); + } + fail("Should have thrown an IllegalStateException"); + } catch (Exception ex) { + LOG.info("Caught exception on send: {}", ex); + } + } + + @Test(timeout=30000) + public void testBrokerRestartWontHangConnectionClose() throws Exception { + QueueImpl queue = new QueueImpl("queue://" + name); + + Connection connection = createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); + + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + 
Message m = session.createTextMessage("Sample text"); + producer.send(m); + + restartBroker(); + + try { + connection.close(); + } catch (Exception ex) { + LOG.error("Should not thrown on disconnected connection close(): {}", ex); + fail("Should not have thrown an exception."); + } + } + + @Test(timeout=120000) + public void testProduceAndConsumeLargeNumbersOfMessages() throws JMSException { + + int count = 2000; + + QueueImpl queue = new QueueImpl("queue://" + name); + Connection connection = createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); + + MessageProducer producer= session.createProducer(queue); + for (int i = 0; i < count; i++) { + Message m=session.createTextMessage("Test-Message:"+i); + producer.send(m); + } + + MessageConsumer consumer=session.createConsumer(queue); + for(int i = 0; i < count; i++) { + Message message = consumer.receive(5000); + assertNotNull(message); + System.out.println(((TextMessage) message).getText()); + assertEquals("Test-Message:" + i,((TextMessage) message).getText()); + } + + Message message = consumer.receive(5000); + assertNull(message); + } + + @Test(timeout=30000) + public void testTTL() throws Exception { + QueueImpl queue = new QueueImpl("queue://" + name); + Connection connection = createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); + MessageProducer producer = session.createProducer(queue); + producer.setTimeToLive(1000); + Message toSend = session.createTextMessage("Sample text"); + producer.send(toSend); + MessageConsumer consumer = session.createConsumer(queue); + Message received = consumer.receive(5000); + assertNotNull(received); + producer.setTimeToLive(100); + producer.send(toSend); + TimeUnit.SECONDS.sleep(1); + assertNull(consumer.receive(5000)); + } + private Connection createConnection() throws JMSException { final ConnectionFactoryImpl factory = new 
ConnectionFactoryImpl("localhost", port, "admin", "password"); final Connection connection = factory.createConnection();
