// http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java
// ----------------------------------------------------------------------
// diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java
// new file mode 100644
// index 0000000..bd87b3d
// --- /dev/null
// +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTempDestinationTest.java
// @@ -0,0 +1,377 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;

import junit.framework.TestCase;

import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.vm.VMTransport;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for JMS temporary queues, using connections created from an
 * {@link ActiveMQConnectionFactory} on the {@code vm://localhost} transport.
 * Every connection a test opens is registered in {@link #connections} so that
 * {@link #tearDown()} can close it.
 */
public class JmsTempDestinationTest extends TestCase {

    private static final Logger LOG = LoggerFactory.getLogger(JmsTempDestinationTest.class);
    private Connection connection;
    private ActiveMQConnectionFactory factory;
    protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());

    /**
     * Creates the connection factory (non-persistent broker, synchronous
     * sends) and the primary connection used by most tests.
     */
    @Override
    protected void setUp() throws Exception {
        factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
        factory.setAlwaysSyncSend(true);
        connection = factory.createConnection();
        connections.add(connection);
    }

    /**
     * Closes every registered connection. Close failures are logged and
     * otherwise ignored so that one broken connection cannot prevent the
     * remaining ones from being released.
     *
     * @see junit.framework.TestCase#tearDown()
     */
    @Override
    protected void tearDown() throws Exception {
        for (Iterator<Connection> iter = connections.iterator(); iter.hasNext();) {
            Connection conn = iter.next();
            try {
                conn.close();
            } catch (Throwable e) {
                // Best-effort cleanup: a test may already have closed this connection.
                LOG.debug("Ignoring failure while closing connection in tearDown", e);
            }
            iter.remove();
        }
    }

    /**
     * Make sure a temp destination can only be consumed by the connection
     * that created it.
     *
     * @throws JMSException
     */
    public void testTempDestOnlyConsumedByLocalConn() throws JMSException {
        connection.start();

        Session tempSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TemporaryQueue queue = tempSession.createTemporaryQueue();
        MessageProducer producer = tempSession.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        TextMessage message = tempSession.createTextMessage("First");
        producer.send(message);

        // A temp queue created on another connection must not see the message.
        Connection otherConnection = factory.createConnection();
        connections.add(otherConnection);
        Session otherSession = otherConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TemporaryQueue otherQueue = otherSession.createTemporaryQueue();
        MessageConsumer consumer = otherSession.createConsumer(otherQueue);
        Message msg = consumer.receive(3000);
        assertNull(msg);

        // Creating a consumer on a temp destination owned by another
        // connection must be rejected with InvalidDestinationException.
        try {
            consumer = otherSession.createConsumer(queue);
            fail("Creating a consumer should fail since a temp destination cannot be consumed from another connection");
        } catch (InvalidDestinationException expected) {
            // expected: the temp queue belongs to 'connection', not 'otherConnection'
        }

        // The owning connection must still be able to consume from it.
        consumer = tempSession.createConsumer(queue);
        msg = consumer.receive(3000);
        assertNotNull(msg);
    }

    /**
     * Make sure that a temp queue does not drop a message if there is an
     * active consumer.
     *
     * @throws JMSException
     */
    public void testTempQueueHoldsMessagesWithConsumers() throws JMSException {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createTemporaryQueue();
        MessageConsumer consumer = session.createConsumer(queue);
        connection.start();

        MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        TextMessage message = session.createTextMessage("Hello");
        producer.send(message);

        Message message2 = consumer.receive(1000);
        assertNotNull(message2);
        assertTrue("Expected message to be a TextMessage", message2 instanceof TextMessage);
        assertEquals("Unexpected message text", message.getText(), ((TextMessage) message2).getText());
    }

    /**
     * Make sure that a temp queue does not drop a message if there are no
     * active consumers at the time it is sent.
     *
     * @throws JMSException
     */
    public void testTempQueueHoldsMessagesWithoutConsumers() throws JMSException {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createTemporaryQueue();
        MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        TextMessage message = session.createTextMessage("Hello");
        producer.send(message);

        // The consumer is only created after the send has completed.
        connection.start();
        MessageConsumer consumer = session.createConsumer(queue);
        Message message2 = consumer.receive(3000);
        assertNotNull(message2);
        assertTrue("Expected message to be a TextMessage", message2 instanceof TextMessage);
        assertEquals("Unexpected message text", message.getText(), ((TextMessage) message2).getText());
    }

    /**
     * Sends 500 byte messages of 1 KiB each to a temp queue and checks they
     * are received in order.
     *
     * @throws JMSException
     */
    public void testTmpQueueWorksUnderLoad() throws JMSException {
        int count = 500;
        int dataSize = 1024;

        List<BytesMessage> list = new ArrayList<BytesMessage>(count);
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createTemporaryQueue();
        MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        byte[] data = new byte[dataSize];
        for (int i = 0; i < count; i++) {
            BytesMessage message = session.createBytesMessage();
            message.writeBytes(data);
            message.setIntProperty("c", i);
            producer.send(message);
            list.add(message);
        }

        connection.start();
        MessageConsumer consumer = session.createConsumer(queue);
        for (int i = 0; i < count; i++) {
            Message message2 = consumer.receive(2000);
            assertNotNull("No message received for index " + i, message2);
            assertEquals(i, message2.getIntProperty("c"));
            assertTrue("Received message " + i + " does not equal the one sent", message2.equals(list.get(i)));
        }
    }

    /**
     * Make sure you cannot publish to a temp destination once the connection
     * that created it has been closed.
     *
     * @throws Exception
     */
    public void testPublishFailsForClosedConnection() throws Exception {
        Connection tempConnection = factory.createConnection();
        connections.add(tempConnection);
        Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final TemporaryQueue queue = tempSession.createTemporaryQueue();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();

        awaitTempQueueAdvisory(queue);

        // This delivery should work since the temp connection is still open.
        MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        TextMessage message = session.createTextMessage("First");
        producer.send(message);

        // Closing the connection should destroy the temp queue it created.
        tempConnection.close();
        Thread.sleep(5000); // Wait a little bit to let the delete take effect.

        // This delivery should NOT work since the temp connection is now closed.
        try {
            message = session.createTextMessage("Hello");
            producer.send(message);
            fail("Send should fail since temp destination should not exist anymore.");
        } catch (JMSException expected) {
            // expected: the destination went away with its owning connection
        }
    }

    /**
     * Make sure you cannot publish to a temp destination after it has been
     * explicitly deleted.
     *
     * @throws Exception
     */
    public void testPublishFailsForDestroyedTempDestination() throws Exception {
        Connection tempConnection = factory.createConnection();
        connections.add(tempConnection);
        Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final TemporaryQueue queue = tempSession.createTemporaryQueue();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();

        awaitTempQueueAdvisory(queue);

        // This delivery should work since the temp queue still exists.
        MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        TextMessage message = session.createTextMessage("First");
        producer.send(message);

        // Deleting the queue will cause sends to fail.
        queue.delete();
        Thread.sleep(5000); // Wait a little bit to let the delete take effect.

        // This delivery should NOT work since the temp queue has been deleted.
        try {
            message = session.createTextMessage("Hello");
            producer.send(message);
            fail("Send should fail since temp destination should not exist anymore.");
        } catch (JMSException expected) {
            // expected: the destination no longer exists
        }
    }

    /**
     * Test you can't delete a temp destination that still has an active
     * consumer.
     *
     * @throws JMSException
     */
    public void testDeleteDestinationWithSubscribersFails() throws JMSException {
        // Separate local connection; named so it does not shadow the field.
        Connection localConnection = factory.createConnection();
        connections.add(localConnection);
        Session session = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TemporaryQueue queue = session.createTemporaryQueue();

        localConnection.start();

        session.createConsumer(queue);

        // The delete must be rejected while the consumer is still attached.
        try {
            queue.delete();
            fail("Should fail as Subscribers are active");
        } catch (JMSException expected) {
            // expected: the queue is still in use
        }
    }

    /**
     * Stalls the first command dispatched to one connection and checks that a
     * second connection can still create and delete 2500 temp queues before
     * the stalled listener's 35 second wait times out.
     *
     * @throws Exception
     */
    public void testSlowConsumerDoesNotBlockFastTempUsers() throws Exception {
        ActiveMQConnectionFactory advisoryConnFactory = new ActiveMQConnectionFactory("vm://localhost?asyncQueueDepth=20");
        Connection slowConnection = advisoryConnFactory.createConnection();
        connections.add(slowConnection);
        slowConnection.start();

        final CountDownLatch done = new CountDownLatch(1);
        final AtomicBoolean ok = new AtomicBoolean(true);
        final AtomicBoolean first = new AtomicBoolean(true);
        VMTransport t = ((ActiveMQConnection) slowConnection).getTransport().narrow(VMTransport.class);
        t.setTransportListener(new TransportListener() {
            @Override
            public void onCommand(Object command) {
                // block first dispatch for a while so broker backs up, but other connection should be able to proceed
                if (first.compareAndSet(true, false)) {
                    try {
                        ok.set(done.await(35, TimeUnit.SECONDS));
                        LOG.info("Done waiting: {}", ok.get());
                    } catch (InterruptedException e) {
                        // Restore the flag so the owning thread can observe the interrupt.
                        Thread.currentThread().interrupt();
                        LOG.warn("Interrupted while holding back the first dispatch", e);
                    }
                }
            }

            @Override
            public void onException(IOException error) {
            }

            @Override
            public void transportInterupted() {
            }

            @Override
            public void transportResumed() {
            }
        });

        Connection fastConnection = factory.createConnection();
        connections.add(fastConnection);
        ((ActiveMQConnection) fastConnection).setWatchTopicAdvisories(false);
        fastConnection.start();
        Session session = fastConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        try {
            for (int i = 0; i < 2500; i++) {
                TemporaryQueue queue = session.createTemporaryQueue();
                MessageConsumer consumer = session.createConsumer(queue);
                consumer.close();
                queue.delete();
            }
            LOG.info("Done with work: {}", ok.get());
        } finally {
            // Always release the stalled listener, even if the loop above failed;
            // otherwise it stays blocked until its 35 second timeout.
            done.countDown();
        }
        assertTrue("ok", ok.get());
    }

    /**
     * Waits until the primary connection lists {@code queue} among its active
     * temp destinations and fails the test if that does not happen in time.
     */
    private void awaitTempQueueAdvisory(final TemporaryQueue queue) throws Exception {
        final ActiveMQConnection activeMQConnection = (ActiveMQConnection) connection;
        assertTrue("creation advisory received in time with async dispatch", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return activeMQConnection.activeTempDestinations.containsKey(queue);
            }
        }));
    }
}
// http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTestSupport.java
// ----------------------------------------------------------------------
// diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTestSupport.java
// new file mode 100644
// index 0000000..5531410
// --- /dev/null
// +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTestSupport.java
// @@ -0,0 +1,186 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;

/**
 * Base class for JMS tests. {@link #setUp()} starts a broker and opens one
 * connection; {@link #tearDown()} closes every connection registered in
 * {@link #connections} and stops the broker. Helper methods create
 * uniquely named destinations, send batches of text messages and close JMS
 * resources without propagating errors.
 */
public class JmsTestSupport extends CombinationTestSupport {

    /** Appended to generated destination names so each one is unique within the JVM. */
    private static final AtomicLong TEST_COUNTER = new AtomicLong();
    public String userName;
    public String password;
    public String messageTextPrefix = "";

    protected ConnectionFactory factory;
    protected ActiveMQConnection connection;
    protected BrokerService broker;

    protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());

    // /////////////////////////////////////////////////////////////////
    //
    // Test support methods.
    //
    // /////////////////////////////////////////////////////////////////

    /**
     * Creates a destination of the requested kind. Queues and topics are
     * named {@code TEST.<class>.<testMethod>.<counter>}; temporary
     * destinations are created by the session and carry no such name.
     *
     * @param session session used to create the destination
     * @param type one of the {@code ActiveMQDestination.*_TYPE} constants
     * @return the newly created destination
     * @throws JMSException if the session fails to create the destination
     * @throws IllegalArgumentException if {@code type} is not a known destination type
     */
    protected ActiveMQDestination createDestination(Session session, byte type) throws JMSException {
        String testMethod = getName();
        // Keep only the part of the test name that precedes the first space.
        int firstSpace = testMethod.indexOf(" ");
        if (firstSpace > 0) {
            testMethod = testMethod.substring(0, firstSpace);
        }
        String name = "TEST." + getClass().getName() + "." + testMethod + "." + TEST_COUNTER.getAndIncrement();
        switch (type) {
            case ActiveMQDestination.QUEUE_TYPE:
                return (ActiveMQDestination) session.createQueue(name);
            case ActiveMQDestination.TOPIC_TYPE:
                return (ActiveMQDestination) session.createTopic(name);
            case ActiveMQDestination.TEMP_QUEUE_TYPE:
                return (ActiveMQDestination) session.createTemporaryQueue();
            case ActiveMQDestination.TEMP_TOPIC_TYPE:
                return (ActiveMQDestination) session.createTemporaryTopic();
            default:
                throw new IllegalArgumentException("type: " + type);
        }
    }

    /**
     * Sends {@code count} text messages over a fresh connection, which is
     * closed afterwards even if sending fails.
     */
    protected void sendMessages(Destination destination, int count) throws Exception {
        ConnectionFactory factory = createConnectionFactory();
        Connection connection = factory.createConnection();
        try {
            connection.start();
            sendMessages(connection, destination, count);
        } finally {
            connection.close();
        }
    }

    /**
     * Sends {@code count} text messages using a temporary session on the
     * given connection; the session is closed afterwards even if sending fails.
     */
    protected void sendMessages(Connection connection, Destination destination, int count) throws JMSException {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            sendMessages(session, destination, count);
        } finally {
            session.close();
        }
    }

    /**
     * Sends {@code count} text messages using a temporary producer on the
     * given session; the producer is closed afterwards even if sending fails.
     */
    protected void sendMessages(Session session, Destination destination, int count) throws JMSException {
        MessageProducer producer = session.createProducer(destination);
        try {
            sendMessages(session, producer, count);
        } finally {
            producer.close();
        }
    }

    /**
     * Sends {@code count} text messages whose bodies are
     * {@link #messageTextPrefix} followed by the message index (0-based).
     */
    protected void sendMessages(Session session, MessageProducer producer, int count) throws JMSException {
        for (int i = 0; i < count; i++) {
            producer.send(session.createTextMessage(messageTextPrefix + i));
        }
    }

    /** Returns the factory used for test connections; defaults to {@code vm://localhost}. */
    protected ConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory("vm://localhost");
    }

    /** Creates (but does not start) the non-persistent broker used by the tests. */
    protected BrokerService createBroker() throws Exception {
        return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false"));
    }

    /**
     * Defaults the {@code basedir} system property to the working directory
     * when it is unset, starts the broker and opens the primary connection
     * with {@link #userName} and {@link #password}.
     */
    @Override
    protected void setUp() throws Exception {
        super.setUp();

        if (System.getProperty("basedir") == null) {
            File file = new File(".");
            System.setProperty("basedir", file.getAbsolutePath());
        }

        broker = createBroker();
        broker.start();
        factory = createConnectionFactory();
        connection = (ActiveMQConnection) factory.createConnection(userName, password);
        connections.add(connection);
    }

    /**
     * Closes all registered connections (ignoring close failures), stops the
     * broker if one was created, and always delegates to the superclass.
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            for (Iterator<Connection> iter = connections.iterator(); iter.hasNext();) {
                safeClose(iter.next());
                iter.remove();
            }
            if (broker != null) {
                broker.stop();
            }
        } finally {
            super.tearDown();
        }
    }

    /** Closes the connection, deliberately ignoring any failure (including a null argument). */
    protected void safeClose(Connection c) {
        try {
            c.close();
        } catch (Throwable ignored) {
            // best-effort cleanup
        }
    }

    /** Closes the session, deliberately ignoring any failure (including a null argument). */
    protected void safeClose(Session s) {
        try {
            s.close();
        } catch (Throwable ignored) {
            // best-effort cleanup
        }
    }

    /** Closes the consumer, deliberately ignoring any failure (including a null argument). */
    protected void safeClose(MessageConsumer c) {
        try {
            c.close();
        } catch (Throwable ignored) {
            // best-effort cleanup
        }
    }

    /** Closes the producer, deliberately ignoring any failure (including a null argument). */
    protected void safeClose(MessageProducer p) {
        try {
            p.close();
        } catch (Throwable ignored) {
            // best-effort cleanup
        }
    }

    /** Calls {@link #pause(String)} only when the {@code profiler} system property is set. */
    protected void profilerPause(String prompt) throws IOException {
        if (System.getProperty("profiler") != null) {
            pause(prompt);
        }
    }

    /**
     * Prints the prompt and blocks until a newline is read from standard
     * input. Also returns when standard input reaches end-of-stream, so the
     * method cannot spin forever on a closed or redirected stdin.
     */
    protected void pause(String prompt) throws IOException {
        System.out.println();
        System.out.println(prompt + "> Press enter to continue: ");
        int ch;
        do {
            ch = System.in.read();
        } while (ch != '\n' && ch != -1); // read() returns -1 at end of stream
    }

}
// http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicCompositeSendReceiveTest.java
// ----------------------------------------------------------------------
// diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicCompositeSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicCompositeSendReceiveTest.java
// new file mode 100644
// index 0000000..a948ffa
// --- /dev/null
// +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicCompositeSendReceiveTest.java
// @@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.Topic; + +import org.apache.activemq.test.JmsTopicSendReceiveTest; + + +/** + * + */ +public class JmsTopicCompositeSendReceiveTest extends JmsTopicSendReceiveTest { + private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory + .getLog(JmsTopicCompositeSendReceiveTest.class); + + Destination consumerDestination2; + MessageConsumer consumer2; + + /** + * Sets a test to have a queue destination and non-persistent delivery mode. + * + * @see junit.framework.TestCase#setUp() + */ + protected void setUp() throws Exception { + deliveryMode = DeliveryMode.NON_PERSISTENT; + super.setUp(); + consumerDestination2 = consumeSession.createTopic("FOO.BAR.HUMBUG2"); + LOG.info("Created consumer destination: " + consumerDestination2 + " of type: " + consumerDestination2.getClass()); + if (durable) { + LOG.info("Creating durable consumer"); + consumer2 = consumeSession.createDurableSubscriber((Topic) consumerDestination2, getName()); + } else { + consumer2 = consumeSession.createConsumer(consumerDestination2); + } + + } + + /** + * Returns the consumer subject. 
+ * + * @return String - consumer subject + * @see org.apache.activemq.test.TestSupport#getConsumerSubject() + */ + protected String getConsumerSubject() { + return "FOO.BAR.HUMBUG"; + } + + /** + * Returns the producer subject. + * + * @return String - producer subject + * @see org.apache.activemq.test.TestSupport#getProducerSubject() + */ + protected String getProducerSubject() { + return "FOO.BAR.HUMBUG,FOO.BAR.HUMBUG2"; + } + + /** + * Test if all the messages sent are being received. + * + * @throws Exception + */ + public void testSendReceive() throws Exception { + super.testSendReceive(); + messages.clear(); + consumer2.setMessageListener(this); + assertMessagesAreReceived(); + LOG.info("" + data.length + " messages(s) received, closing down connections"); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicRedeliverTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicRedeliverTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicRedeliverTest.java new file mode 100644 index 0000000..188e4cd --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicRedeliverTest.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class JmsTopicRedeliverTest extends TestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(JmsTopicRedeliverTest.class); + + protected Connection connection; + protected Session session; + protected Session consumeSession; + protected MessageConsumer consumer; + protected MessageProducer producer; + protected Destination consumerDestination; + protected Destination producerDestination; + protected boolean topic = true; + protected boolean durable; + protected boolean verbose; + protected long initRedeliveryDelay; + + protected void setUp() throws Exception { + super.setUp(); + + connectionFactory = createConnectionFactory(); + connection = createConnection(); + initRedeliveryDelay = ((ActiveMQConnection)connection).getRedeliveryPolicy().getInitialRedeliveryDelay(); + + if (durable) { + connection.setClientID(getClass().getName()); + } + + LOG.info("Created connection: " + connection); + + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + LOG.info("Created session: " + session); + LOG.info("Created consumeSession: " + consumeSession); + 
producer = session.createProducer(null); + // producer.setDeliveryMode(deliveryMode); + + LOG.info("Created producer: " + producer); + + if (topic) { + consumerDestination = session.createTopic(getConsumerSubject()); + producerDestination = session.createTopic(getProducerSubject()); + } else { + consumerDestination = session.createQueue(getConsumerSubject()); + producerDestination = session.createQueue(getProducerSubject()); + } + + LOG.info("Created consumer destination: " + consumerDestination + " of type: " + consumerDestination.getClass()); + LOG.info("Created producer destination: " + producerDestination + " of type: " + producerDestination.getClass()); + consumer = createConsumer(); + connection.start(); + + LOG.info("Created connection: " + connection); + } + + protected void tearDown() throws Exception { + if (connection != null) { + connection.close(); + } + super.tearDown(); + } + + /** + * Returns the consumer subject. + * + * @return String - consumer subject + * @see org.apache.activemq.test.TestSupport#getConsumerSubject() + */ + protected String getConsumerSubject() { + return "TEST"; + } + + /** + * Returns the producer subject. + * + * @return String - producer subject + * @see org.apache.activemq.test.TestSupport#getProducerSubject() + */ + protected String getProducerSubject() { + return "TEST"; + } + + /** + * Sends and consumes the messages. 
+ * + * @throws Exception + */ + public void testRecover() throws Exception { + String text = "TEST"; + Message sendMessage = session.createTextMessage(text); + + if (verbose) { + LOG.info("About to send a message: " + sendMessage + " with text: " + text); + } + producer.send(producerDestination, sendMessage); + + // receive but don't acknowledge + Message unackMessage = consumer.receive(initRedeliveryDelay + 1000); + assertNotNull(unackMessage); + String unackId = unackMessage.getJMSMessageID(); + assertEquals(((TextMessage)unackMessage).getText(), text); + assertFalse(unackMessage.getJMSRedelivered()); + // assertEquals(unackMessage.getIntProperty("JMSXDeliveryCount"),1); + + // receive then acknowledge + consumeSession.recover(); + Message ackMessage = consumer.receive(initRedeliveryDelay + 1000); + assertNotNull(ackMessage); + ackMessage.acknowledge(); + String ackId = ackMessage.getJMSMessageID(); + assertEquals(((TextMessage)ackMessage).getText(), text); + assertTrue(ackMessage.getJMSRedelivered()); + // assertEquals(ackMessage.getIntProperty("JMSXDeliveryCount"),2); + assertEquals(unackId, ackId); + consumeSession.recover(); + assertNull(consumer.receiveNoWait()); + } + + protected MessageConsumer createConsumer() throws JMSException { + if (durable) { + LOG.info("Creating durable consumer"); + return consumeSession.createDurableSubscriber((Topic)consumerDestination, getName()); + } + return consumeSession.createConsumer(consumerDestination); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicRequestReplyTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicRequestReplyTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicRequestReplyTest.java new file mode 100644 index 0000000..f5d1d2c --- /dev/null +++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicRequestReplyTest.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.util.List; +import java.util.Vector; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; + +import org.apache.activemq.test.TestSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class JmsTopicRequestReplyTest extends TestSupport implements MessageListener { + private static final Logger LOG = LoggerFactory.getLogger(JmsTopicRequestReplyTest.class); + + protected boolean useAsyncConsume; + private Connection serverConnection; + private Connection clientConnection; + private MessageProducer replyProducer; + private Session serverSession; + private Destination requestDestination; + private List<JMSException> failures = new Vector<JMSException>(); + private boolean 
dynamicallyCreateProducer; + private String clientSideClientID; + + public void testSendAndReceive() throws Exception { + clientConnection = createConnection(); + clientConnection.setClientID("ClientConnection:" + getSubject()); + + Session session = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + clientConnection.start(); + + Destination replyDestination = createTemporaryDestination(session); + + // lets test the destination + clientSideClientID = clientConnection.getClientID(); + + // TODO + // String value = ActiveMQDestination.getClientId((ActiveMQDestination) + // replyDestination); + // assertEquals("clientID from the temporary destination must be the + // same", clientSideClientID, value); + LOG.info("Both the clientID and destination clientID match properly: " + clientSideClientID); + + /* build queues */ + MessageProducer requestProducer = session.createProducer(requestDestination); + MessageConsumer replyConsumer = session.createConsumer(replyDestination); + + /* build requestmessage */ + TextMessage requestMessage = session.createTextMessage("Olivier"); + requestMessage.setJMSReplyTo(replyDestination); + requestProducer.send(requestMessage); + + LOG.info("Sent request."); + LOG.info(requestMessage.toString()); + + Message msg = replyConsumer.receive(5000); + + if (msg instanceof TextMessage) { + TextMessage replyMessage = (TextMessage)msg; + LOG.info("Received reply."); + LOG.info(replyMessage.toString()); + assertEquals("Wrong message content", "Hello: Olivier", replyMessage.getText()); + } else { + fail("Should have received a reply by now"); + } + replyConsumer.close(); + deleteTemporaryDestination(replyDestination); + + assertEquals("Should not have had any failures: " + failures, 0, failures.size()); + } + + public void testSendAndReceiveWithDynamicallyCreatedProducer() throws Exception { + dynamicallyCreateProducer = true; + testSendAndReceive(); + } + + /** + * Use the asynchronous subscription mechanism + */ + public void 
onMessage(Message message) { + try { + TextMessage requestMessage = (TextMessage)message; + + LOG.info("Received request."); + LOG.info(requestMessage.toString()); + + Destination replyDestination = requestMessage.getJMSReplyTo(); + + // TODO + // String value = + // ActiveMQDestination.getClientId((ActiveMQDestination) + // replyDestination); + // assertEquals("clientID from the temporary destination must be the + // same", clientSideClientID, value); + + TextMessage replyMessage = serverSession.createTextMessage("Hello: " + requestMessage.getText()); + + replyMessage.setJMSCorrelationID(requestMessage.getJMSMessageID()); + + if (dynamicallyCreateProducer) { + replyProducer = serverSession.createProducer(replyDestination); + replyProducer.send(replyMessage); + } else { + replyProducer.send(replyDestination, replyMessage); + } + + LOG.info("Sent reply."); + LOG.info(replyMessage.toString()); + } catch (JMSException e) { + onException(e); + } + } + + /** + * Use the synchronous subscription mechanism + */ + protected void syncConsumeLoop(MessageConsumer requestConsumer) { + try { + Message message = requestConsumer.receive(5000); + if (message != null) { + onMessage(message); + } else { + LOG.error("No message received"); + } + } catch (JMSException e) { + onException(e); + } + } + + protected void setUp() throws Exception { + super.setUp(); + + serverConnection = createConnection(); + serverConnection.setClientID("serverConnection:" + getSubject()); + serverSession = serverConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + replyProducer = serverSession.createProducer(null); + + requestDestination = createDestination(serverSession); + + /* build queues */ + final MessageConsumer requestConsumer = serverSession.createConsumer(requestDestination); + if (useAsyncConsume) { + requestConsumer.setMessageListener(this); + } else { + Thread thread = new Thread(new Runnable() { + public void run() { + syncConsumeLoop(requestConsumer); + } + }); + thread.start(); 
+ } + serverConnection.start(); + } + + /** + * Closes the server and client connections after the superclass tear-down. + * Each step runs in a finally block and the connections are null-checked, so a + * failure in one step (or a test that never created its client connection) does + * not leave the other connection open. + * + * @throws Exception if the superclass tear-down or closing a connection fails + */ + protected void tearDown() throws Exception { + try { + super.tearDown(); + } finally { + try { + if (serverConnection != null) { + serverConnection.close(); + } + } finally { + // clientConnection is only assigned inside the test method, so it may still be null here + if (clientConnection != null) { + try { + clientConnection.stop(); + } finally { + clientConnection.close(); + } + } + } + } + } + + protected void onException(JMSException e) { + LOG.info("Caught: " + e); + e.printStackTrace(); + failures.add(e); + } + + protected Destination createDestination(Session session) throws JMSException { + if (topic) { + return session.createTopic(getSubject()); + } + return session.createQueue(getSubject()); + } + + protected Destination createTemporaryDestination(Session session) throws JMSException { + if (topic) { + return session.createTemporaryTopic(); + } + return session.createTemporaryQueue(); + } + + protected void deleteTemporaryDestination(Destination dest) throws JMSException { + if (topic) { + ((TemporaryTopic)dest).delete(); + } else { + ((TemporaryQueue)dest).delete(); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java new file mode 100644 index 0000000..e6a2503 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class JmsTopicSelectorTest extends TestSupport { + private static final Logger LOG = LoggerFactory.getLogger(JmsTopicSelectorTest.class); + + protected Connection connection; + protected Session session; + protected MessageConsumer consumer; + protected MessageProducer producer; + protected Destination consumerDestination; + protected Destination producerDestination; + protected boolean topic = true; + protected boolean durable; + protected int deliveryMode = DeliveryMode.PERSISTENT; + + public void setUp() throws Exception { + super.setUp(); + + connectionFactory = createConnectionFactory(); + connection = createConnection(); + if (durable) { + connection.setClientID(getClass().getName()); + } + + LOG.info("Created connection: " + connection); + + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + LOG.info("Created session: " + session); + + if (topic) { + consumerDestination = session.createTopic(getConsumerSubject()); + producerDestination = session.createTopic(getProducerSubject()); + } else { + consumerDestination = session.createQueue(getConsumerSubject()); + producerDestination = 
session.createQueue(getProducerSubject()); + } + + LOG.info("Created consumer destination: " + consumerDestination + " of type: " + consumerDestination.getClass()); + LOG.info("Created producer destination: " + producerDestination + " of type: " + producerDestination.getClass()); + producer = session.createProducer(producerDestination); + producer.setDeliveryMode(deliveryMode); + + LOG.info("Created producer: " + producer + " delivery mode = " + (deliveryMode == DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON_PERSISTENT")); + connection.start(); + } + + public void tearDown() throws Exception { + session.close(); + connection.close(); + } + + protected MessageConsumer createConsumer(String selector) throws JMSException { + if (durable) { + LOG.info("Creating durable consumer"); + return session.createDurableSubscriber((Topic)consumerDestination, getName(), selector, false); + } + return session.createConsumer(consumerDestination, selector); + } + + public void sendMessages() throws Exception { + TextMessage message = session.createTextMessage("1"); + message.setIntProperty("id", 1); + message.setJMSType("a"); + message.setStringProperty("stringProperty", "a"); + message.setLongProperty("longProperty", 1); + message.setBooleanProperty("booleanProperty", true); + producer.send(message); + + message = session.createTextMessage("2"); + message.setIntProperty("id", 2); + message.setJMSType("a"); + message.setStringProperty("stringProperty", "a"); + message.setLongProperty("longProperty", 1); + message.setBooleanProperty("booleanProperty", false); + producer.send(message); + + message = session.createTextMessage("3"); + message.setIntProperty("id", 3); + message.setJMSType("a"); + message.setStringProperty("stringProperty", "a"); + message.setLongProperty("longProperty", 1); + message.setBooleanProperty("booleanProperty", true); + producer.send(message); + + message = session.createTextMessage("4"); + message.setIntProperty("id", 4); + message.setJMSType("b"); + 
message.setStringProperty("stringProperty", "b"); + message.setLongProperty("longProperty", 2); + message.setBooleanProperty("booleanProperty", false); + producer.send(message); + + message = session.createTextMessage("5"); + message.setIntProperty("id", 5); + message.setJMSType("c"); + message.setStringProperty("stringProperty", "c"); + message.setLongProperty("longProperty", 3); + message.setBooleanProperty("booleanProperty", true); + producer.send(message); + } + + public void consumeMessages(int remaining) throws Exception { + consumer = createConsumer(null); + for (int i = 0; i < remaining; i++) { + consumer.receive(1000); + } + consumer.close(); + + } + + public void testEmptyPropertySelector() throws Exception { + int remaining = 5; + Message message = null; + consumer = createConsumer(""); + sendMessages(); + while (true) { + message = consumer.receive(1000); + if (message == null) { + break; + } + + remaining--; + } + assertEquals(remaining, 0); + consumer.close(); + consumeMessages(remaining); + } + + public void testPropertySelector() throws Exception { + int remaining = 5; + Message message = null; + consumer = createConsumer("stringProperty = 'a' and longProperty = 1 and booleanProperty = true"); + sendMessages(); + while (true) { + message = consumer.receive(1000); + if (message == null) { + break; + } + String text = ((TextMessage)message).getText(); + if (!text.equals("1") && !text.equals("3")) { + fail("unexpected message: " + text); + } + remaining--; + } + assertEquals(remaining, 3); + consumer.close(); + consumeMessages(remaining); + + } + + public void testJMSPropertySelector() throws Exception { + int remaining = 5; + Message message = null; + consumer = createConsumer("JMSType = 'a' and stringProperty = 'a'"); + sendMessages(); + while (true) { + message = consumer.receive(1000); + if (message == null) { + break; + } + String text = ((TextMessage)message).getText(); + if (!text.equals("1") && !text.equals("2") && !text.equals("3")) { + 
fail("unexpected message: " + text); + } + remaining--; + } + assertEquals(remaining, 2); + consumer.close(); + consumeMessages(remaining); + + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveSubscriberTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveSubscriberTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveSubscriberTest.java new file mode 100644 index 0000000..7cb09ea --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveSubscriberTest.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq; + +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Topic; +import javax.jms.TopicSession; + +/** + * + */ +public class JmsTopicSendReceiveSubscriberTest extends JmsTopicSendReceiveTest { + protected MessageConsumer createConsumer() throws JMSException { + if (durable) { + return super.createConsumer(); + } else { + TopicSession topicSession = (TopicSession)session; + return topicSession.createSubscriber((Topic)consumerDestination, null, false); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveTest.java new file mode 100644 index 0000000..fd91ec4 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveTest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.Topic; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class JmsTopicSendReceiveTest extends JmsSendReceiveTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(JmsTopicSendReceiveTest.class); + + protected Connection connection; + + protected void setUp() throws Exception { + super.setUp(); + + connectionFactory = createConnectionFactory(); + connection = createConnection(); + if (durable) { + connection.setClientID(getClass().getName()); + } + + LOG.info("Created connection: " + connection); + + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + LOG.info("Created session: " + session); + producer = session.createProducer(null); + producer.setDeliveryMode(deliveryMode); + + LOG.info("Created producer: " + producer + " delivery mode = " + (deliveryMode == DeliveryMode.PERSISTENT ? 
"PERSISTENT" : "NON_PERSISTENT")); + + if (topic) { + consumerDestination = session.createTopic(getConsumerSubject()); + producerDestination = session.createTopic(getProducerSubject()); + } else { + consumerDestination = session.createQueue(getConsumerSubject()); + producerDestination = session.createQueue(getProducerSubject()); + } + + LOG.info("Created consumer destination: " + consumerDestination + " of type: " + consumerDestination.getClass()); + LOG.info("Created producer destination: " + producerDestination + " of type: " + producerDestination.getClass()); + consumer = createConsumer(); + consumer.setMessageListener(this); + connection.start(); + + // log.info("Created connection: " + connection); + } + + protected MessageConsumer createConsumer() throws JMSException { + if (durable) { + LOG.info("Creating durable consumer"); + return session.createDurableSubscriber((Topic)consumerDestination, getName()); + } + return session.createConsumer(consumerDestination); + } + + protected void tearDown() throws Exception { + LOG.info("Dumping stats..."); + // connectionFactory.getStats().reset(); + + LOG.info("Closing down connection"); + + /** TODO we should be able to shut down properly */ + session.close(); + connection.close(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java new file mode 100644 index 0000000..b56c15d --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or 
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +/** + * @version + */ +public class JmsTopicSendReceiveWithTwoConnectionsTest extends JmsSendReceiveTestSupport { + + private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory + .getLog(JmsTopicSendReceiveWithTwoConnectionsTest.class); + + protected Connection sendConnection; + protected Connection receiveConnection; + protected Session receiveSession; + + protected void setUp() throws Exception { + super.setUp(); + + connectionFactory = createConnectionFactory(); + + sendConnection = createSendConnection(); + sendConnection.start(); + + receiveConnection = createReceiveConnection(); + receiveConnection.start(); + + LOG.info("Created sendConnection: " + sendConnection); + LOG.info("Created receiveConnection: " + receiveConnection); + + session = createSendSession(sendConnection); + receiveSession = createReceiveSession(receiveConnection); + + LOG.info("Created sendSession: " + session); + LOG.info("Created receiveSession: " + receiveSession); + + producer = 
session.createProducer(null); + producer.setDeliveryMode(deliveryMode); + + LOG.info("Created producer: " + producer + " delivery mode = " + + (deliveryMode == DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON_PERSISTENT")); + + if (topic) { + consumerDestination = session.createTopic(getConsumerSubject()); + producerDestination = session.createTopic(getProducerSubject()); + } else { + consumerDestination = session.createQueue(getConsumerSubject()); + producerDestination = session.createQueue(getProducerSubject()); + } + + LOG.info("Created consumer destination: " + consumerDestination + " of type: " + + consumerDestination.getClass()); + LOG.info("Created producer destination: " + producerDestination + " of type: " + + producerDestination.getClass()); + + consumer = createConsumer(receiveSession, consumerDestination); + consumer.setMessageListener(this); + + LOG.info("Started connections"); + } + + protected Session createReceiveSession(Connection receiveConnection) throws Exception { + return receiveConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + protected Session createSendSession(Connection sendConnection) throws Exception { + return sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + protected Connection createReceiveConnection() throws Exception { + return createConnection(); + } + + protected Connection createSendConnection() throws Exception { + return createConnection(); + } + + protected MessageConsumer createConsumer(Session session, Destination dest) throws JMSException { + return session.createConsumer(dest); + } + + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + } + + protected void tearDown() throws Exception { + session.close(); + receiveSession.close(); + sendConnection.close(); + receiveConnection.close(); + } +} 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsWithJMXTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsWithJMXTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsWithJMXTest.java new file mode 100644 index 0000000..da2c80c --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsWithJMXTest.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq; + +/** + * + * + */ +public class JmsTopicSendReceiveWithTwoConnectionsWithJMXTest extends + JmsTopicSendReceiveWithTwoConnectionsTest { + + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false&broker.useJmx=true"); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendSameMessageTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendSameMessageTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendSameMessageTest.java new file mode 100644 index 0000000..9a92e49 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicSendSameMessageTest.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq; + +import javax.jms.TextMessage; + +/** + * + */ +public class JmsTopicSendSameMessageTest extends JmsTopicSendReceiveWithTwoConnectionsTest { + + private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory + .getLog(JmsTopicSendSameMessageTest.class); + + public void testSendReceive() throws Exception { + messages.clear(); + + TextMessage message = session.createTextMessage(); + + for (int i = 0; i < data.length; i++) { + message.setText(data[i]); + message.setStringProperty("stringProperty", data[i]); + message.setIntProperty("intProperty", i); + + if (verbose) { + LOG.info("About to send a message: " + message + " with text: " + data[i]); + } + + producer.send(producerDestination, message); + } + + assertMessagesAreReceived(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicTransactionTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicTransactionTest.java new file mode 100644 index 0000000..cf5cdc0 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicTransactionTest.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import org.apache.activemq.test.JmsResourceProvider; + + +/** + * + */ +public class JmsTopicTransactionTest extends JmsTransactionTestSupport { + + /** + * @see org.apache.activemq.JmsTransactionTestSupport#getJmsResourceProvider() + */ + protected JmsResourceProvider getJmsResourceProvider() { + JmsResourceProvider p = new JmsResourceProvider(); + p.setTopic(true); + p.setDurableName("testsub"); + p.setClientID("testclient"); + return p; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicWildcardSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicWildcardSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicWildcardSendReceiveTest.java new file mode 100644 index 0000000..eeb5999 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTopicWildcardSendReceiveTest.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.test.JmsTopicSendReceiveTest; + +/** + * + */ +public class JmsTopicWildcardSendReceiveTest extends JmsTopicSendReceiveTest { + + private String destination1String = "TEST.ONE.ONE"; + private String destination2String = "TEST.ONE.ONE.ONE"; + private String destination3String = "TEST.ONE.TWO"; + private String destination4String = "TEST.TWO.ONE"; + + protected void setUp() throws Exception { + topic = true; + durable = false; + deliveryMode = DeliveryMode.NON_PERSISTENT; + super.setUp(); + } + + protected String getConsumerSubject() { + return "FOO.>"; + } + + protected String getProducerSubject() { + return "FOO.BAR.HUMBUG"; + } + + public void testReceiveWildcardTopicEndAsterisk() throws Exception { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQDestination destination1 = (ActiveMQDestination)session.createTopic(destination1String); + ActiveMQDestination destination3 = (ActiveMQDestination)session.createTopic(destination3String); + + Message m = null; + MessageConsumer consumer = null; + String text = null; + + ActiveMQDestination destination6 = (ActiveMQDestination)session.createTopic("TEST.ONE.*"); + consumer = 
session.createConsumer(destination6); + sendMessage(session, destination1, destination1String); + sendMessage(session, destination3, destination3String); + m = consumer.receive(1000); + assertNotNull(m); + text = ((TextMessage)m).getText(); + if (!(text.equals(destination1String) || text.equals(destination3String))) { + fail("unexpected message:" + text); + } + m = consumer.receive(1000); + assertNotNull(m); + text = ((TextMessage)m).getText(); + if (!(text.equals(destination1String) || text.equals(destination3String))) { + fail("unexpected message:" + text); + } + assertNull(consumer.receiveNoWait()); + } + + /** + * Sends one message to each of destination1String, destination2String and + * destination3String and checks that a consumer on "TEST.ONE.>" receives three + * messages, each carrying one of those destination names as its text. + * + * @throws Exception if any JMS operation fails + */ + public void testReceiveWildcardTopicEndGreaterThan() throws Exception { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQDestination destination1 = (ActiveMQDestination)session.createTopic(destination1String); + ActiveMQDestination destination2 = (ActiveMQDestination)session.createTopic(destination2String); + ActiveMQDestination destination3 = (ActiveMQDestination)session.createTopic(destination3String); + + Message m = null; + MessageConsumer consumer = null; + String text = null; + + ActiveMQDestination destination7 = (ActiveMQDestination)session.createTopic("TEST.ONE.>"); + consumer = session.createConsumer(destination7); + sendMessage(session, destination1, destination1String); + sendMessage(session, destination2, destination2String); + sendMessage(session, destination3, destination3String); + m = consumer.receive(1000); + assertNotNull(m); + text = ((TextMessage)m).getText(); + if (!(text.equals(destination1String) || text.equals(destination2String) || text.equals(destination3String))) { + fail("unexpected message:" + text); + } + m = consumer.receive(1000); + assertNotNull(m); + // re-read the text from this message; otherwise the check below re-tests the first message + text = ((TextMessage)m).getText(); + if (!(text.equals(destination1String) || text.equals(destination2String) || text.equals(destination3String))) { + fail("unexpected message:" + text); + } + m = consumer.receive(1000); + assertNotNull(m); + // re-read the text from the third message for the same reason + text = ((TextMessage)m).getText(); + if 
(!(text.equals(destination1String) || text.equals(destination2String) || text.equals(destination3String))) { + fail("unexpected message:" + text); + } + assertNull(consumer.receiveNoWait()); + } + + public void testReceiveWildcardTopicMidAsterisk() throws Exception { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQDestination destination1 = (ActiveMQDestination)session.createTopic(destination1String); + ActiveMQDestination destination4 = (ActiveMQDestination)session.createTopic(destination4String); + + Message m = null; + MessageConsumer consumer = null; + String text = null; + + ActiveMQDestination destination8 = (ActiveMQDestination)session.createTopic("TEST.*.ONE"); + consumer = session.createConsumer(destination8); + sendMessage(session, destination1, destination1String); + sendMessage(session, destination4, destination4String); + m = consumer.receive(1000); + assertNotNull(m); + text = ((TextMessage)m).getText(); + if (!(text.equals(destination1String) || text.equals(destination4String))) { + fail("unexpected message:" + text); + } + m = consumer.receive(1000); + assertNotNull(m); + text = ((TextMessage)m).getText(); + if (!(text.equals(destination1String) || text.equals(destination4String))) { + fail("unexpected message:" + text); + } + assertNull(consumer.receiveNoWait()); + + } + + public void testReceiveWildcardTopicMatchDoubleWildcard() throws Exception { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQDestination destination1 = (ActiveMQDestination)session.createTopic("a.*.>.>"); + ActiveMQDestination destination2 = (ActiveMQDestination)session.createTopic("a.b"); + + Message m = null; + MessageConsumer consumer = null; + String text = null; + + + consumer = session.createConsumer(destination1); + sendMessage(session, destination2, destination3String); + + m = consumer.receive(1000); + assertNotNull(m); + text = 
((TextMessage)m).getText(); + if (!(text.equals(destination1String) || text.equals(destination3String))) { + fail("unexpected message:" + text); + } + + assertNull(consumer.receiveNoWait()); + } + + public void testReceiveWildcardTopicMatchSinglePastTheEndWildcard() throws Exception { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQDestination destination1 = (ActiveMQDestination)session.createTopic("a.>"); + ActiveMQDestination destination2 = (ActiveMQDestination)session.createTopic("a"); + + Message m = null; + MessageConsumer consumer = null; + String text = null; + + + consumer = session.createConsumer(destination1); + sendMessage(session, destination2, destination3String); + + m = consumer.receive(1000); + assertNotNull(m); + text = ((TextMessage)m).getText(); + if (!(text.equals(destination1String) || text.equals(destination3String))) { + fail("unexpected message:" + text); + } + + assertNull(consumer.receiveNoWait()); + } + + + + private void sendMessage(Session session, Destination destination, String text) throws JMSException { + MessageProducer producer = session.createProducer(destination); + producer.send(session.createTextMessage(text)); + producer.close(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java new file mode 100644 index 0000000..fc77218 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.IdGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test support that sends {@link #MESSAGE_COUNT} bytes messages of
 * {@link #LARGE_MESSAGE_SIZE} bytes each and verifies, through an asynchronous
 * {@link MessageListener}, that every message arrives with the expected content.
 * The protected fields let subclasses choose topic or queue, durability, and delivery mode.
 */
public class LargeMessageTestSupport extends ClientTestSupport implements MessageListener {

    protected static final int LARGE_MESSAGE_SIZE = 128 * 1024;
    protected static final int MESSAGE_COUNT = 100;

    private static final Logger LOG = LoggerFactory.getLogger(LargeMessageTestSupport.class);

    protected Connection producerConnection;
    protected Connection consumerConnection;
    protected MessageConsumer consumer;
    protected MessageProducer producer;
    protected Session producerSession;
    protected Session consumerSession;
    protected byte[] largeMessageData;
    protected Destination destination;
    protected boolean isTopic = true;
    protected boolean isDurable = true;
    protected int deliveryMode = DeliveryMode.PERSISTENT;
    protected IdGenerator idGen = new IdGenerator();
    // Cleared by the listener as soon as one received message fails validation.
    protected boolean validMessageConsumption = true;
    // Number of messages accepted by the listener; also used as the wait/notify monitor.
    protected AtomicInteger messageCount = new AtomicInteger(0);

    protected int prefetchValue = 10000000;

    /**
     * Builds the destination under test, named after the concrete test class.
     *
     * @return a topic when {@link #isTopic} is set, otherwise a queue
     */
    protected Destination createDestination() {
        String subject = getClass().getName();
        if (isTopic) {
            return new ActiveMQTopic(subject);
        } else {
            return new ActiveMQQueue(subject);
        }
    }

    /**
     * Creates the consumer on {@link #consumerSession}.
     *
     * @return a durable subscriber with a generated subscription name when both
     *         {@link #isTopic} and {@link #isDurable} are set, otherwise a plain consumer
     * @throws JMSException if the consumer cannot be created
     */
    protected MessageConsumer createConsumer() throws JMSException {
        if (isTopic && isDurable) {
            return consumerSession.createDurableSubscriber((Topic)destination, idGen.generateId());
        } else {
            return consumerSession.createConsumer(destination);
        }
    }

    /**
     * Prepares the payload, opens the producer and consumer connections and registers this
     * instance as the message listener.
     *
     * @throws Exception if the parent set-up or any JMS operation fails
     */
    @Override
    public void setUp() throws Exception {
        super.setUp();
        ClientTestSupport.removeMessageStore();
        LOG.info("Setting up . . . . . ");
        messageCount.set(0);

        destination = createDestination();
        // Payload alternates 'a' and 'z' so misplaced bytes are detectable.
        largeMessageData = new byte[LARGE_MESSAGE_SIZE];
        for (int i = 0; i < LARGE_MESSAGE_SIZE; i++) {
            if (i % 2 == 0) {
                largeMessageData[i] = 'a';
            } else {
                largeMessageData[i] = 'z';
            }
        }

        try {
            // allow the broker to start
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers and preserve the original cause.
            Thread.currentThread().interrupt();
            JMSException failure = new JMSException(e.getMessage());
            failure.setLinkedException(e);
            failure.initCause(e);
            throw failure;
        }

        ActiveMQConnectionFactory fac = getConnectionFactory();
        producerConnection = fac.createConnection();
        setPrefetchPolicy((ActiveMQConnection)producerConnection);
        producerConnection.start();

        consumerConnection = fac.createConnection();
        setPrefetchPolicy((ActiveMQConnection)consumerConnection);
        consumerConnection.setClientID(idGen.generateId());
        consumerConnection.start();
        producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = producerSession.createProducer(createDestination());
        producer.setDeliveryMode(deliveryMode);
        consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        consumer = createConsumer();
        consumer.setMessageListener(this);
        LOG.info("Setup complete");
    }

    /**
     * Applies {@link #prefetchValue} to every prefetch setting of the given connection.
     *
     * @param activeMQConnection the connection whose prefetch policy is adjusted
     */
    protected void setPrefetchPolicy(ActiveMQConnection activeMQConnection) {
        activeMQConnection.getPrefetchPolicy().setTopicPrefetch(prefetchValue);
        activeMQConnection.getPrefetchPolicy().setQueuePrefetch(prefetchValue);
        activeMQConnection.getPrefetchPolicy().setDurableTopicPrefetch(prefetchValue);
        activeMQConnection.getPrefetchPolicy().setQueueBrowserPrefetch(prefetchValue);
        activeMQConnection.getPrefetchPolicy().setOptimizeDurableTopicPrefetch(prefetchValue);
    }

    /**
     * Closes both connections and runs the parent tear-down. Every step is attempted even
     * when an earlier one throws, so a failing close cannot leave the other connection open.
     *
     * @throws Exception the failure raised by the last step that failed, if any
     */
    @Override
    public void tearDown() throws Exception {
        try {
            Thread.sleep(1000);
        } finally {
            try {
                closeIfOpened(producerConnection);
            } finally {
                try {
                    closeIfOpened(consumerConnection);
                } finally {
                    try {
                        super.tearDown();
                    } finally {
                        largeMessageData = null;
                    }
                }
            }
        }
    }

    /** Closes the connection unless it was never created. */
    private static void closeIfOpened(Connection candidate) throws JMSException {
        if (candidate != null) {
            candidate.close();
        }
    }

    /**
     * Compares the first {@link #LARGE_MESSAGE_SIZE} bytes of the message body with the
     * payload that was sent.
     *
     * @param msg1 the received message; its body is switched to read-only before reading
     * @return {@code true} when every compared byte matches
     * @throws Exception if the body cannot be read
     */
    protected boolean isSame(BytesMessage msg1) throws Exception {
        ((ActiveMQMessage)msg1).setReadOnlyBody(true);

        for (int i = 0; i < LARGE_MESSAGE_SIZE; i++) {
            if (msg1.readByte() != largeMessageData[i]) {
                return false;
            }
        }

        return true;
    }

    /**
     * Validates a received message, counts it and wakes the waiting test once
     * {@link #MESSAGE_COUNT} messages have arrived.
     *
     * <p>This runs on the listener thread, so failures are recorded in
     * {@link #validMessageConsumption} for the test method to assert on instead of being
     * raised here, where they would not reach the test runner.
     *
     * @param msg the delivered message, expected to be a {@link BytesMessage}
     */
    @Override
    public void onMessage(Message msg) {
        try {
            BytesMessage ba = (BytesMessage)msg;
            boolean contentMatches = isSame(ba);
            long bodyLength = ba.getBodyLength();
            boolean lengthMatches = bodyLength == LARGE_MESSAGE_SIZE;
            if (!lengthMatches) {
                LOG.error("Unexpected body length {} (expected {})", bodyLength, LARGE_MESSAGE_SIZE);
            }
            validMessageConsumption &= contentMatches && lengthMatches;

            int received = messageCount.incrementAndGet();
            if (received >= MESSAGE_COUNT) {
                synchronized (messageCount) {
                    messageCount.notifyAll();
                }
            }
            LOG.info("got message = {}", received);
            if (received % 50 == 0) {
                LOG.info("count = {}", received);
            }
        } catch (Exception e) {
            // The message is not counted; also flag the run as invalid so the cause is explicit.
            validMessageConsumption = false;
            LOG.error("Failed to process received message", e);
        }
    }

    /**
     * Sends {@link #MESSAGE_COUNT} large messages, waits up to sixty seconds for the listener
     * to receive them all, and asserts on both the count and the content validity.
     *
     * @throws Exception if sending fails or the wait is interrupted
     */
    public void testLargeMessages() throws Exception {
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            LOG.info("Sending message: {}", i);
            BytesMessage msg = producerSession.createBytesMessage();
            msg.writeBytes(largeMessageData);
            producer.send(msg);
        }

        long deadline = System.currentTimeMillis() + 60000;
        // The count is checked while holding the monitor so a notification sent between the
        // check and the wait cannot be missed.
        synchronized (messageCount) {
            while (System.currentTimeMillis() < deadline && messageCount.get() < MESSAGE_COUNT) {
                LOG.info("message count = {}", messageCount);
                messageCount.wait(1000);
            }
        }

        LOG.info("Finished count = {}", messageCount);
        assertTrue("Not enough messages - expected " + MESSAGE_COUNT + " but got " + messageCount, messageCount.get() == MESSAGE_COUNT);
        assertTrue("received messages are not valid", validMessageConsumption);
        Thread.sleep(1000);
        LOG.info("FINAL count = {}", messageCount);
    }
}
