Updated Branches: refs/heads/trunk 6044c4c05 -> 3d63ca75f
Use the AMQP JMS client's setTopicPrefix and setQueuePrefix on the ConnectionFactory to make the JMS client usage simpler. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3d63ca75 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3d63ca75 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3d63ca75 Branch: refs/heads/trunk Commit: 3d63ca75fbd5251328323d49987eba8da5d7f6e8 Parents: 6044c4c Author: Timothy Bish <[email protected]> Authored: Wed Jan 15 09:46:43 2014 -0500 Committer: Timothy Bish <[email protected]> Committed: Wed Jan 15 09:46:43 2014 -0500 ---------------------------------------------------------------------- .../activemq/transport/amqp/JMSClientTest.java | 41 +++++++++++--------- 1 file changed, 22 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/3d63ca75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java index 4002ef2..878ebc3 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java @@ -36,17 +36,17 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; +import javax.jms.Queue; import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Topic; import org.apache.activemq.broker.jmx.ConnectorViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin; import org.apache.activemq.util.Wait; import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; -import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl; -import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -60,11 +60,11 @@ public class JMSClientTest extends AmqpTestSupport { @Test public void testProducerConsume() throws Exception { ActiveMQAdmin.enableJMSFrameTracing(); - QueueImpl queue = new QueueImpl("queue://" + name); Connection connection = createConnection(); { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.toString()); MessageProducer p = session.createProducer(queue); TextMessage message = session.createTextMessage(); @@ -89,17 +89,17 @@ public class JMSClientTest extends AmqpTestSupport { @Test public void testTransactedConsumer() throws Exception { ActiveMQAdmin.enableJMSFrameTracing(); - QueueImpl queue = new QueueImpl("queue://" + name); final int msgCount = 1; Connection connection = createConnection(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.toString()); sendMessages(connection, queue, msgCount); QueueViewMBean queueView = getProxyToQueue(name.toString()); LOG.info("Queue size after produce is: {}", queueView.getQueueSize()); assertEquals(msgCount, queueView.getQueueSize()); - Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(queue); Message msg = consumer.receive(TestConfig.TIMEOUT); @@ -121,17 +121,17 @@ public class JMSClientTest extends AmqpTestSupport { public void testRollbackRececeivedMessage() throws Exception { ActiveMQAdmin.enableJMSFrameTracing(); - QueueImpl queue = new QueueImpl("queue://" + name); final int msgCount = 1; Connection connection = createConnection(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.toString()); sendMessages(connection, queue, msgCount); QueueViewMBean queueView = getProxyToQueue(name.toString()); LOG.info("Queue size after produce is: {}", queueView.getQueueSize()); assertEquals(msgCount, queueView.getQueueSize()); - Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(queue); // Receive and roll back, first receive should not show redelivered. @@ -165,10 +165,11 @@ public class JMSClientTest extends AmqpTestSupport { public void testTXConsumerAndLargeNumberOfMessages() throws Exception { ActiveMQAdmin.enableJMSFrameTracing(); - QueueImpl queue = new QueueImpl("queue://" + name); final int msgCount = 500; Connection connection = createConnection(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.toString()); sendMessages(connection, queue, msgCount); QueueViewMBean queueView = getProxyToQueue(name.toString()); @@ -177,7 +178,6 @@ public class JMSClientTest extends AmqpTestSupport { // Consumer all in TX and commit. { - Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(queue); for (int i = 0; i < msgCount; ++i) { @@ -204,11 +204,11 @@ public class JMSClientTest extends AmqpTestSupport { @Test public void testSelectors() throws Exception{ ActiveMQAdmin.enableJMSFrameTracing(); - QueueImpl queue = new QueueImpl("queue://" + name); Connection connection = createConnection(); { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.toString()); MessageProducer p = session.createProducer(queue); TextMessage message = session.createTextMessage(); @@ -242,10 +242,10 @@ public class JMSClientTest extends AmqpTestSupport { //should through exception IllegalStateException:The session is closed @Test(timeout=30000) public void testBrokerRestartPersistentQueueException() throws Exception { - QueueImpl queue = new QueueImpl("queue://" + name); Connection connection = createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.toString()); connection.start(); MessageProducer producer = session.createProducer(queue); @@ -266,10 +266,10 @@ public class JMSClientTest extends AmqpTestSupport { @Test(timeout=30000) public void testProducerThrowsWhenBrokerRestarted() throws Exception { - QueueImpl queue = new QueueImpl("queue://" + name); Connection connection = createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.toString()); connection.start(); MessageProducer producer = session.createProducer(queue); @@ -302,10 +302,10 @@ public class JMSClientTest extends AmqpTestSupport { @Test(timeout=30000) public void testBrokerRestartWontHangConnectionClose() throws Exception { - QueueImpl queue = new QueueImpl("queue://" + name); Connection connection = createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.toString()); connection.start(); MessageProducer producer = session.createProducer(queue); @@ -329,9 +329,9 @@ public class JMSClientTest extends AmqpTestSupport { int count = 2000; - QueueImpl queue = new QueueImpl("queue://" + name); Connection connection = createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.toString()); connection.start(); MessageProducer producer= session.createProducer(queue); @@ -357,9 +357,9 @@ public class JMSClientTest extends AmqpTestSupport { ActiveMQAdmin.enableJMSFrameTracing(); Connection connection = null; try { - QueueImpl queue = new QueueImpl("queue://" + name); connection = createConnection(true); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.toString()); connection.start(); MessageProducer producer = session.createProducer(queue); producer.setDeliveryMode(DeliveryMode.PERSISTENT); @@ -376,13 +376,13 @@ public class JMSClientTest extends AmqpTestSupport { @Test(timeout=30000) public void testDurableConsumerAsync() throws Exception { ActiveMQAdmin.enableJMSFrameTracing(); - TopicImpl topic = new TopicImpl("topic://"+name); final CountDownLatch latch = new CountDownLatch(1); final AtomicReference<Message> received = new AtomicReference<Message>(); Connection connection = createConnection(); { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(name.toString()); MessageConsumer consumer = session.createDurableSubscriber(topic, "DurbaleTopic"); consumer.setMessageListener(new MessageListener() { @@ -411,11 +411,11 @@ public class JMSClientTest extends AmqpTestSupport { @Test(timeout=30000) public void testDurableConsumerSync() throws Exception { ActiveMQAdmin.enableJMSFrameTracing(); - TopicImpl topic = new TopicImpl("topic://"+name); Connection connection = createConnection(); { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(name.toString()); final MessageConsumer consumer = session.createDurableSubscriber(topic, "DurbaleTopic"); MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.PERSISTENT); @@ -444,13 +444,13 @@ public class JMSClientTest extends AmqpTestSupport { @Test(timeout=30000) public void testTopicConsumerAsync() throws Exception { ActiveMQAdmin.enableJMSFrameTracing(); - TopicImpl topic = new TopicImpl("topic://"+name); final CountDownLatch latch = new CountDownLatch(1); final AtomicReference<Message> received = new AtomicReference<Message>(); Connection connection = createConnection(); { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(name.toString()); MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener() { @@ -479,11 +479,11 @@ public class JMSClientTest extends AmqpTestSupport { @Test(timeout=45000) public void testTopicConsumerSync() throws Exception { ActiveMQAdmin.enableJMSFrameTracing(); - TopicImpl topic = new TopicImpl("topic://"+name); Connection connection = createConnection(); { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(name.toString()); final MessageConsumer consumer = session.createConsumer(topic); MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.PERSISTENT); @@ -553,7 +553,10 @@ public class JMSClientTest extends AmqpTestSupport { private Connection createConnection(String clientId, boolean syncPublish) throws JMSException { final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password"); + factory.setSyncPublish(syncPublish); + factory.setTopicPrefix("topic://"); + factory.setQueuePrefix("queue://"); final Connection connection = factory.createConnection(); if (clientId != null && !clientId.isEmpty()) {
