http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSMessageTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSMessageTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSMessageTest.java new file mode 100644 index 0000000..5cc279c --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSMessageTest.java @@ -0,0 +1,547 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.net.URISyntaxException; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Vector; + +import javax.jms.BytesMessage; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageEOFException; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; + +import junit.framework.Test; + +import org.apache.activemq.command.ActiveMQDestination; + +/** + * Test cases used to test the JMS message consumer. + * + * + */ +public class JMSMessageTest extends JmsTestSupport { + + public ActiveMQDestination destination; + public int deliveryMode = DeliveryMode.NON_PERSISTENT; + public int prefetch; + public int ackMode; + public byte destinationType = ActiveMQDestination.QUEUE_TYPE; + public boolean durableConsumer; + public String connectURL = "vm://localhost?marshal=false"; + + /** + * Run all these tests in both marshaling and non-marshaling mode. + */ + public void initCombos() { + addCombinationValues("connectURL", new Object[] {"vm://localhost?marshal=false", + "vm://localhost?marshal=true"}); + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), + Integer.valueOf(DeliveryMode.PERSISTENT)}); + addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)}); + } + + public void testTextMessage() throws Exception { + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + // Send the message. + { + TextMessage message = session.createTextMessage(); + message.setText("Hi"); + producer.send(message); + } + + // Check the Message + { + TextMessage message = (TextMessage)consumer.receive(1000); + assertNotNull(message); + assertEquals("Hi", message.getText()); + } + + assertNull(consumer.receiveNoWait()); + } + + public static Test suite() { + return suite(JMSMessageTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + + @Override + protected ConnectionFactory createConnectionFactory() throws URISyntaxException { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectURL); + return factory; + } + + public void testBytesMessageLength() throws Exception { + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + // Send the message + { + BytesMessage message = session.createBytesMessage(); + message.writeInt(1); + message.writeInt(2); + message.writeInt(3); + message.writeInt(4); + producer.send(message); + } + + // Check the message. + { + BytesMessage message = (BytesMessage)consumer.receive(1000); + assertNotNull(message); + assertEquals(16, message.getBodyLength()); + } + + assertNull(consumer.receiveNoWait()); + } + + public void testObjectMessage() throws Exception { + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + // send the message. + { + ObjectMessage message = session.createObjectMessage(); + message.setObject("Hi"); + producer.send(message); + } + + // Check the message + { + ObjectMessage message = (ObjectMessage)consumer.receive(1000); + assertNotNull(message); + assertEquals("Hi", message.getObject()); + } + assertNull(consumer.receiveNoWait()); + } + + public void testBytesMessage() throws Exception { + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + // Send the message + { + BytesMessage message = session.createBytesMessage(); + message.writeBoolean(true); + producer.send(message); + } + + // Check the message + { + BytesMessage message = (BytesMessage)consumer.receive(1000); + assertNotNull(message); + assertTrue(message.readBoolean()); + + try { + message.readByte(); + fail("Expected exception not thrown."); + } catch (MessageEOFException e) { + } + + } + assertNull(consumer.receiveNoWait()); + } + + public void testStreamMessage() throws Exception { + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + // Send the message. + { + StreamMessage message = session.createStreamMessage(); + message.writeString("This is a test to see how it works."); + producer.send(message); + } + + // Check the message. + { + StreamMessage message = (StreamMessage)consumer.receive(1000); + assertNotNull(message); + + // Invalid conversion should throw exception and not move the stream + // position. + try { + message.readByte(); + fail("Should have received NumberFormatException"); + } catch (NumberFormatException e) { + } + + assertEquals("This is a test to see how it works.", message.readString()); + + // Invalid conversion should throw exception and not move the stream + // position. + try { + message.readByte(); + fail("Should have received MessageEOFException"); + } catch (MessageEOFException e) { + } + } + assertNull(consumer.receiveNoWait()); + } + + public void testMapMessage() throws Exception { + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + // send the message. + { + MapMessage message = session.createMapMessage(); + message.setBoolean("boolKey", true); + producer.send(message); + } + + // get the message. + { + MapMessage message = (MapMessage)consumer.receive(1000); + assertNotNull(message); + assertTrue(message.getBoolean("boolKey")); + } + assertNull(consumer.receiveNoWait()); + } + + static class ForeignMessage implements TextMessage { + + public int deliveryMode; + + private String messageId; + private long timestamp; + private String correlationId; + private Destination replyTo; + private Destination destination; + private boolean redelivered; + private String type; + private long expiration; + private int priority; + private String text; + private final HashMap<String, Object> props = new HashMap<String, Object>(); + + @Override + public String getJMSMessageID() throws JMSException { + return messageId; + } + + @Override + public void setJMSMessageID(String arg0) throws JMSException { + messageId = arg0; + } + + @Override + public long getJMSTimestamp() throws JMSException { + return timestamp; + } + + @Override + public void setJMSTimestamp(long arg0) throws JMSException { + timestamp = arg0; + } + + @Override + public byte[] getJMSCorrelationIDAsBytes() throws JMSException { + return null; + } + + @Override + public void setJMSCorrelationIDAsBytes(byte[] arg0) throws JMSException { + } + + @Override + public void setJMSCorrelationID(String arg0) throws JMSException { + correlationId = arg0; + } + + @Override + public String getJMSCorrelationID() throws JMSException { + return correlationId; + } + + @Override + public Destination getJMSReplyTo() throws JMSException { + return replyTo; + } + + @Override + public void setJMSReplyTo(Destination arg0) throws JMSException { + replyTo = arg0; + } + + @Override + public Destination getJMSDestination() throws JMSException { + return destination; + } + + @Override + public void setJMSDestination(Destination arg0) throws JMSException { + destination = arg0; + } + + @Override + public int getJMSDeliveryMode() throws JMSException { + return deliveryMode; + } + + @Override + public void setJMSDeliveryMode(int arg0) throws JMSException { + deliveryMode = arg0; + } + + @Override + public boolean getJMSRedelivered() throws JMSException { + return redelivered; + } + + @Override + public void setJMSRedelivered(boolean arg0) throws JMSException { + redelivered = arg0; + } + + @Override + public String getJMSType() throws JMSException { + return type; + } + + @Override + public void setJMSType(String arg0) throws JMSException { + type = arg0; + } + + @Override + public long getJMSExpiration() throws JMSException { + return expiration; + } + + @Override + public void setJMSExpiration(long arg0) throws JMSException { + expiration = arg0; + } + + @Override + public int getJMSPriority() throws JMSException { + return priority; + } + + @Override + public void setJMSPriority(int arg0) throws JMSException { + priority = arg0; + } + + @Override + public void clearProperties() throws JMSException { + } + + @Override + public boolean propertyExists(String arg0) throws JMSException { + return false; + } + + @Override + public boolean getBooleanProperty(String arg0) throws JMSException { + return false; + } + + @Override + public byte getByteProperty(String arg0) throws JMSException { + return 0; + } + + @Override + public short getShortProperty(String arg0) throws JMSException { + return 0; + } + + @Override + public int getIntProperty(String arg0) throws JMSException { + return 0; + } + + @Override + public long getLongProperty(String arg0) throws JMSException { + return 0; + } + + @Override + public float getFloatProperty(String arg0) throws JMSException { + return 0; + } + + @Override + public double getDoubleProperty(String arg0) throws JMSException { + return 0; + } + + @Override + public String getStringProperty(String arg0) throws JMSException { + return (String)props.get(arg0); + } + + @Override + public Object getObjectProperty(String arg0) throws JMSException { + return props.get(arg0); + } + + @Override + public Enumeration<?> getPropertyNames() throws JMSException { + return new Vector<String>(props.keySet()).elements(); + } + + @Override + public void setBooleanProperty(String arg0, boolean arg1) throws JMSException { + } + + @Override + public void setByteProperty(String arg0, byte arg1) throws JMSException { + } + + @Override + public void setShortProperty(String arg0, short arg1) throws JMSException { + } + + @Override + public void setIntProperty(String arg0, int arg1) throws JMSException { + } + + @Override + public void setLongProperty(String arg0, long arg1) throws JMSException { + } + + @Override + public void setFloatProperty(String arg0, float arg1) throws JMSException { + } + + @Override + public void setDoubleProperty(String arg0, double arg1) throws JMSException { + } + + @Override + public void setStringProperty(String arg0, String arg1) throws JMSException { + props.put(arg0, arg1); + } + + @Override + public void setObjectProperty(String arg0, Object arg1) throws JMSException { + props.put(arg0, arg1); + } + + @Override + public void acknowledge() throws JMSException { + } + + @Override + public void clearBody() throws JMSException { + } + + @Override + public void setText(String arg0) throws JMSException { + text = arg0; + } + + @Override + public String getText() throws JMSException { + return text; + } + } + + public void testForeignMessage() throws Exception { + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + // Send the message. + { + ForeignMessage message = new ForeignMessage(); + message.text = "Hello"; + message.setStringProperty("test", "value"); + long timeToLive = 10000L; + long start = System.currentTimeMillis(); + producer.send(message, Session.AUTO_ACKNOWLEDGE, 7, timeToLive); + long end = System.currentTimeMillis(); + + + //validate jms spec 1.1 section 3.4.11 table 3.1 + // JMSDestination, JMSDeliveryMode, JMSExpiration, JMSPriority, JMSMessageID, and JMSTimestamp + //must be set by sending a message. + + assertNotNull(message.getJMSDestination()); + assertEquals(Session.AUTO_ACKNOWLEDGE, message.getJMSDeliveryMode()); + assertTrue(start + timeToLive <= message.getJMSExpiration()); + assertTrue(end + timeToLive >= message.getJMSExpiration()); + assertEquals(7, message.getJMSPriority()); + assertNotNull(message.getJMSMessageID()); + assertTrue(start <= message.getJMSTimestamp()); + assertTrue(end >= message.getJMSTimestamp()); + } + + // Validate message is OK. + { + TextMessage message = (TextMessage)consumer.receive(1000); + assertNotNull(message); + assertEquals("Hello", message.getText()); + assertEquals("value", message.getStringProperty("test")); + } + + assertNull(consumer.receiveNoWait()); + } + +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSQueueRedeliverTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSQueueRedeliverTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSQueueRedeliverTest.java new file mode 100644 index 0000000..be8a6f2 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSQueueRedeliverTest.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +/** + * + */ +public class JMSQueueRedeliverTest extends JmsTopicRedeliverTest { + protected void setUp() throws Exception { + topic = false; + super.setUp(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSUsecaseTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSUsecaseTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSUsecaseTest.java new file mode 100644 index 0000000..d22907a --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSUsecaseTest.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.util.Enumeration; + +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import javax.jms.TextMessage; + +import junit.framework.Test; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; + +public class JMSUsecaseTest extends JmsTestSupport { + + public ActiveMQDestination destination; + public int deliveryMode; + public int prefetch; + public byte destinationType; + public boolean durableConsumer; + + public static Test suite() { + return suite(JMSUsecaseTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + + public void initCombosForTestQueueBrowser() { + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); + addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)}); + } + + public void testQueueBrowser() throws Exception { + + // Send a message to the broker. + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(this.deliveryMode); + sendMessages(session, producer, 5); + producer.close(); + + QueueBrowser browser = session.createBrowser((Queue)destination); + Enumeration<?> enumeration = browser.getEnumeration(); + for (int i = 0; i < 5; i++) { + Thread.sleep(100); + assertTrue(enumeration.hasMoreElements()); + Message m = (Message)enumeration.nextElement(); + assertNotNull(m); + assertEquals("" + i, ((TextMessage)m).getText()); + } + assertFalse(enumeration.hasMoreElements()); + } + + public void initCombosForTestSendReceive() { + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); + addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE), + Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)}); + } + + public void testSendReceive() throws Exception { + // Send a message to the broker. + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(this.deliveryMode); + MessageConsumer consumer = session.createConsumer(destination); + ActiveMQMessage message = new ActiveMQMessage(); + producer.send(message); + + // Make sure only 1 message was delivered. + assertNotNull(consumer.receive(1000)); + assertNull(consumer.receiveNoWait()); + } + + public void initCombosForTestSendReceiveTransacted() { + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); + addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE), + Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)}); + } + + public void testSendReceiveTransacted() throws Exception { + // Send a message to the broker. + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + destination = createDestination(session, destinationType); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(this.deliveryMode); + MessageConsumer consumer = session.createConsumer(destination); + producer.send(session.createTextMessage("test")); + + // Message should not be delivered until commit. + assertNull(consumer.receiveNoWait()); + session.commit(); + + // Make sure only 1 message was delivered. + Message message = consumer.receive(1000); + assertNotNull(message); + assertFalse(message.getJMSRedelivered()); + assertNull(consumer.receiveNoWait()); + + // Message should be redelivered is rollback is used. + session.rollback(); + + // Make sure only 1 message was delivered. + message = consumer.receive(2000); + assertNotNull(message); + assertTrue(message.getJMSRedelivered()); + assertNull(consumer.receiveNoWait()); + + // If we commit now, the message should not be redelivered. + session.commit(); + assertNull(consumer.receiveNoWait()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java new file mode 100644 index 0000000..7deff27 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.ConnectionFactory; +import junit.framework.Test; + +/* + * allow an XA session to be used as an auto ack session when no XA transaction + * https://issues.apache.org/activemq/browse/AMQ-2659 + */ +public class JMSXAConsumerTest extends JMSConsumerTest { + + public static Test suite() { + return suite(JMSXAConsumerTest.class); + } + + @Override + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQXAConnectionFactory("vm://localhost"); + } + + // some tests use transactions, these will not work unless an XA transaction is in place + // slip these + public void testPrefetch1MessageNotDispatched() throws Exception { + } + + public void testRedispatchOfUncommittedTx() throws Exception { + } + + public void testRedispatchOfRolledbackTx() throws Exception { + } + + public void testMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception { + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsAutoAckListenerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsAutoAckListenerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsAutoAckListenerTest.java new file mode 100644 index 0000000..5f34106 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsAutoAckListenerTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +/** + * + */ +public class JmsAutoAckListenerTest extends TestSupport implements MessageListener { + + private Connection connection; + + protected void setUp() throws Exception { + super.setUp(); + connection = createConnection(); + } + + /** + * @see junit.framework.TestCase#tearDown() + */ + protected void tearDown() throws Exception { + if (connection != null) { + connection.close(); + connection = null; + } + super.tearDown(); + } + + /** + * Tests if acknowleged messages are being consumed. + * + * @throws javax.jms.JMSException + */ + public void testAckedMessageAreConsumed() throws Exception { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("test"); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello")); + + // Consume the message... + MessageConsumer consumer = session.createConsumer(queue); + consumer.setMessageListener(this); + + Thread.sleep(10000); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + // Attempt to Consume the message...check if message was acknowledge + consumer = session.createConsumer(queue); + Message msg = consumer.receive(1000); + assertNull(msg); + + session.close(); + } + + public void onMessage(Message message) { + assertNotNull(message); + + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsAutoAckTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsAutoAckTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsAutoAckTest.java new file mode 100644 index 0000000..90ee032 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsAutoAckTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +/** + * + */ +public class JmsAutoAckTest extends TestSupport { + + private Connection connection; + + protected void setUp() throws Exception { + super.setUp(); + connection = createConnection(); + } + + /** + * @see junit.framework.TestCase#tearDown() + */ + protected void tearDown() throws Exception { + if (connection != null) { + connection.close(); + connection = null; + } + super.tearDown(); + } + + /** + * Tests if acknowleged messages are being consumed. + * + * @throws javax.jms.JMSException + */ + public void testAckedMessageAreConsumed() throws JMSException { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("test"); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello")); + + // Consume the message... + MessageConsumer consumer = session.createConsumer(queue); + Message msg = consumer.receive(1000); + assertNotNull(msg); + + // Reset the session. + session.close(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Attempt to Consume the message... + consumer = session.createConsumer(queue); + msg = consumer.receive(1000); + assertNull(msg); + + session.close(); + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsBenchmark.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsBenchmark.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsBenchmark.java new file mode 100644 index 0000000..52a70a0 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsBenchmark.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import junit.framework.Test; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Benchmarks the broker by starting many consumer and producers against the + * same destination. Make sure you run with jvm option -server (makes a big + * difference). The tests simulate storing 1000 1k jms messages to see the rate + * of processing msg/sec. + * + * + */ +public class JmsBenchmark extends JmsTestSupport { + private static final transient Logger LOG = LoggerFactory.getLogger(JmsBenchmark.class); + + private static final long SAMPLE_DELAY = Integer.parseInt(System.getProperty("SAMPLE_DELAY", "" + 1000 * 5)); + private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", "10")); + private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION", "" + 1000 * 60)); + private static final int PRODUCER_COUNT = Integer.parseInt(System.getProperty("PRODUCER_COUNT", "10")); + private static final int CONSUMER_COUNT = Integer.parseInt(System.getProperty("CONSUMER_COUNT", "10")); + + public ActiveMQDestination destination; + + public static Test suite() { + return suite(JmsBenchmark.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(JmsBenchmark.class); + } + + public void initCombos() { + addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST")}); + } + + @Override + protected BrokerService createBroker() throws Exception { + return BrokerFactory.createBroker(new URI("broker://(tcp://localhost:0)?persistent=false")); + } + + @Override + protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException { + return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getServer().getConnectURI()); + } + + /** + * @throws Throwable + */ + public void testConcurrentSendReceive() throws Throwable { + + final Semaphore connectionsEstablished = new Semaphore(1 - (CONSUMER_COUNT + PRODUCER_COUNT)); + final Semaphore workerDone = new Semaphore(1 - (CONSUMER_COUNT + PRODUCER_COUNT)); + final CountDownLatch sampleTimeDone = new CountDownLatch(1); + + final AtomicInteger producedMessages = new AtomicInteger(0); + final AtomicInteger receivedMessages = new AtomicInteger(0); + + final Callable<Object> producer = new Callable<Object>() { + @Override + public Object call() throws JMSException, InterruptedException { + Connection connection = factory.createConnection(); + connections.add(connection); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + BytesMessage message = session.createBytesMessage(); + message.writeBytes(new byte[1024]); + connection.start(); + connectionsEstablished.release(); + + while (!sampleTimeDone.await(0, TimeUnit.MILLISECONDS)) { + producer.send(message); + producedMessages.incrementAndGet(); + } + + connection.close(); + workerDone.release(); + return null; + } + }; + + final Callable<Object> consumer = new Callable<Object>() { + @Override + public Object call() throws JMSException, InterruptedException { + Connection connection = factory.createConnection(); + connections.add(connection); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(destination); + + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message msg) { + receivedMessages.incrementAndGet(); + } + }); + connection.start(); + + connectionsEstablished.release(); + sampleTimeDone.await(); + + connection.close(); + workerDone.release(); + return null; + } + }; + + final Throwable workerError[] = new Throwable[1]; + for (int i = 0; i < PRODUCER_COUNT; i++) { + new Thread("Producer:" + i) { + @Override + public void run() { + try { + producer.call(); + } catch (Throwable e) { + e.printStackTrace(); + workerError[0] = e; + } + } + }.start(); + } + + for (int i = 0; i < CONSUMER_COUNT; i++) { + new Thread("Consumer:" + i) { + @Override + public void run() { + try { + consumer.call(); + } catch (Throwable e) { + e.printStackTrace(); + workerError[0] = e; + } + } + }.start(); + } + + LOG.info(getName() + ": Waiting for Producers and Consumers to startup."); + connectionsEstablished.acquire(); + LOG.info("Producers and Consumers are now running. Waiting for system to reach steady state: " + (SAMPLE_DELAY / 1000.0f) + " seconds"); + Thread.sleep(1000 * 10); + + LOG.info("Starting sample: " + SAMPLES + " each lasting " + (SAMPLE_DURATION / 1000.0f) + " seconds"); + + for (int i = 0; i < SAMPLES; i++) { + + long start = System.currentTimeMillis(); + producedMessages.set(0); + receivedMessages.set(0); + + Thread.sleep(SAMPLE_DURATION); + + long end = System.currentTimeMillis(); + int r = receivedMessages.get(); + int p = producedMessages.get(); + + LOG.info("published: " + p + " msgs at " + (p * 1000f / (end - start)) + " msgs/sec, " + "consumed: " + r + " msgs at " + (r * 1000f / (end - start)) + " msgs/sec"); + } + + LOG.info("Sample done."); + sampleTimeDone.countDown(); + + workerDone.acquire(); + if (workerError[0] != null) { + throw workerError[0]; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsClientAckListenerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsClientAckListenerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsClientAckListenerTest.java new file mode 100644 index 0000000..c122807 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsClientAckListenerTest.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +/** + * + */ +public class JmsClientAckListenerTest extends TestSupport implements MessageListener { + + private Connection connection; + private boolean dontAck; + + protected void setUp() throws Exception { + super.setUp(); + connection = createConnection(); + } + + /** + * @see junit.framework.TestCase#tearDown() + */ + protected void tearDown() throws Exception { + if (connection != null) { + connection.close(); + connection = null; + } + super.tearDown(); + } + + /** + * Tests if acknowleged messages are being consumed. + * + * @throws javax.jms.JMSException + */ + public void testAckedMessageAreConsumed() throws Exception { + connection.start(); + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = session.createQueue("test"); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello")); + + // Consume the message... + MessageConsumer consumer = session.createConsumer(queue); + consumer.setMessageListener(this); + + Thread.sleep(10000); + + // Reset the session. + session.close(); + + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + // Attempt to Consume the message... + consumer = session.createConsumer(queue); + Message msg = consumer.receive(1000); + assertNull(msg); + + session.close(); + } + + /** + * Tests if unacknowleged messages are being redelivered when the consumer + * connects again. + * + * @throws javax.jms.JMSException + */ + public void testUnAckedMessageAreNotConsumedOnSessionClose() throws Exception { + connection.start(); + // don't aknowledge message on onMessage() call + dontAck = true; + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = session.createQueue("test"); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello")); + + // Consume the message... + MessageConsumer consumer = session.createConsumer(queue); + consumer.setMessageListener(this); + // Don't ack the message. + + // Reset the session. This should cause the Unacked message to be + // redelivered. + session.close(); + + Thread.sleep(10000); + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + // Attempt to Consume the message... + consumer = session.createConsumer(queue); + Message msg = consumer.receive(2000); + assertNotNull(msg); + msg.acknowledge(); + + session.close(); + } + + public void onMessage(Message message) { + + assertNotNull(message); + if (!dontAck) { + try { + message.acknowledge(); + } catch (Exception e) { + e.printStackTrace(); + } + + } + + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsClientAckTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsClientAckTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsClientAckTest.java new file mode 100644 index 0000000..ef33f9a --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsClientAckTest.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +/** + * + */ +public class JmsClientAckTest extends TestSupport { + + private Connection connection; + + protected void setUp() throws Exception { + super.setUp(); + connection = createConnection(); + } + + /** + * @see junit.framework.TestCase#tearDown() + */ + protected void tearDown() throws Exception { + if (connection != null) { + connection.close(); + connection = null; + } + super.tearDown(); + } + + /** + * Tests if acknowledged messages are being consumed. + * + * @throws JMSException + */ + public void testAckedMessageAreConsumed() throws JMSException { + connection.start(); + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = session.createQueue(getQueueName()); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello")); + + // Consume the message... + MessageConsumer consumer = session.createConsumer(queue); + Message msg = consumer.receive(1000); + assertNotNull(msg); + msg.acknowledge(); + + // Reset the session. + session.close(); + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + // Attempt to Consume the message... + consumer = session.createConsumer(queue); + msg = consumer.receive(1000); + assertNull(msg); + + session.close(); + } + + /** + * Tests if acknowledged messages are being consumed. + * + * @throws JMSException + */ + public void testLastMessageAcked() throws JMSException { + connection.start(); + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = session.createQueue(getQueueName()); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello")); + producer.send(session.createTextMessage("Hello2")); + producer.send(session.createTextMessage("Hello3")); + + // Consume the message... + MessageConsumer consumer = session.createConsumer(queue); + Message msg = consumer.receive(1000); + assertNotNull(msg); + msg = consumer.receive(1000); + assertNotNull(msg); + msg = consumer.receive(1000); + assertNotNull(msg); + msg.acknowledge(); + + // Reset the session. + session.close(); + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + // Attempt to Consume the message... + consumer = session.createConsumer(queue); + msg = consumer.receive(1000); + assertNull(msg); + + session.close(); + } + + /** + * Tests if unacknowledged messages are being re-delivered when the consumer connects again. + * + * @throws JMSException + */ + public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException { + connection.start(); + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = session.createQueue(getQueueName()); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello")); + + // Consume the message... + MessageConsumer consumer = session.createConsumer(queue); + Message msg = consumer.receive(1000); + assertNotNull(msg); + // Don't ack the message. + + // Reset the session. This should cause the unacknowledged message to be re-delivered. + session.close(); + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + // Attempt to Consume the message... + consumer = session.createConsumer(queue); + msg = consumer.receive(2000); + assertNotNull(msg); + msg.acknowledge(); + + session.close(); + } + + protected String getQueueName() { + return getClass().getName() + "." + getName(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java new file mode 100644 index 0000000..c972b1e --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.util.Random; +import java.util.Vector; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +/** + * + */ +public class JmsConnectionStartStopTest extends TestSupport { + + private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory + .getLog(JmsConnectionStartStopTest.class); + + private Connection startedConnection; + private Connection stoppedConnection; + + /** + * @see junit.framework.TestCase#setUp() + */ + @Override + protected void setUp() throws Exception { + + LOG.info(getClass().getClassLoader().getResource("log4j.properties")); + + ActiveMQConnectionFactory factory = createConnectionFactory(); + startedConnection = factory.createConnection(); + startedConnection.start(); + stoppedConnection = factory.createConnection(); + } + + /** + * @see junit.framework.TestCase#tearDown() + */ + @Override + protected void tearDown() throws Exception { + stoppedConnection.close(); + startedConnection.close(); + } + + /** + * Tests if the consumer receives the messages that were sent before the + * connection was started. + * + * @throws JMSException + */ + public void testStoppedConsumerHoldsMessagesTillStarted() throws JMSException { + Session startedSession = startedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session stoppedSession = stoppedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Setup the consumers. + Topic topic = startedSession.createTopic("test"); + MessageConsumer startedConsumer = startedSession.createConsumer(topic); + MessageConsumer stoppedConsumer = stoppedSession.createConsumer(topic); + + // Send the message. + MessageProducer producer = startedSession.createProducer(topic); + TextMessage message = startedSession.createTextMessage("Hello"); + producer.send(message); + + // Test the assertions. + Message m = startedConsumer.receive(1000); + assertNotNull(m); + + m = stoppedConsumer.receive(1000); + assertNull(m); + + stoppedConnection.start(); + m = stoppedConsumer.receive(5000); + assertNotNull(m); + + startedSession.close(); + stoppedSession.close(); + } + + /** + * Tests if the consumer is able to receive messages eveb when the + * connecction restarts multiple times. + * + * @throws Exception + */ + public void testMultipleConnectionStops() throws Exception { + testStoppedConsumerHoldsMessagesTillStarted(); + stoppedConnection.stop(); + testStoppedConsumerHoldsMessagesTillStarted(); + stoppedConnection.stop(); + testStoppedConsumerHoldsMessagesTillStarted(); + } + + + public void testConcurrentSessionCreateWithStart() throws Exception { + ThreadPoolExecutor executor = new ThreadPoolExecutor(50, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue<Runnable>()); + final Vector<Throwable> exceptions = new Vector<Throwable>(); + final Random rand = new Random(); + Runnable createSessionTask = new Runnable() { + @Override + public void run() { + try { + TimeUnit.MILLISECONDS.sleep(rand.nextInt(10)); + stoppedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } catch (Exception e) { + exceptions.add(e); + } + } + }; + + Runnable startStopTask = new Runnable() { + @Override + public void run() { + try { + TimeUnit.MILLISECONDS.sleep(rand.nextInt(10)); + stoppedConnection.start(); + stoppedConnection.stop(); + } catch (Exception e) { + exceptions.add(e); + } + } + }; + + for (int i=0; i<1000; i++) { + executor.execute(createSessionTask); + executor.execute(startStopTask); + } + + executor.shutdown(); + assertTrue("executor terminated", executor.awaitTermination(30, TimeUnit.SECONDS)); + assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java new file mode 100644 index 0000000..1a1a958 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.util.Vector; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import junit.framework.TestCase; + + +public class JmsConsumerResetActiveListenerTest extends TestCase { + + private Connection connection; + private ActiveMQConnectionFactory factory; + + protected void setUp() throws Exception { + factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + connection = factory.createConnection(); + } + + protected void tearDown() throws Exception { + if (connection != null) { + connection.close(); + connection = null; + } + } + + /** + * verify the (undefined by spec) behaviour of setting a listener while receiving a message. + * + * @throws Exception + */ + public void testSetListenerFromListener() throws Exception { + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Destination dest = session.createQueue("Queue-" + getName()); + final MessageConsumer consumer = session.createConsumer(dest); + + final CountDownLatch latch = new CountDownLatch(2); + final AtomicBoolean first = new AtomicBoolean(true); + final Vector<Object> results = new Vector<Object>(); + consumer.setMessageListener(new MessageListener() { + + public void onMessage(Message message) { + if (first.compareAndSet(true, false)) { + try { + consumer.setMessageListener(this); + results.add(message); + } catch (JMSException e) { + results.add(e); + } + } else { + results.add(message); + } + latch.countDown(); + } + }); + + connection.start(); + + MessageProducer producer = session.createProducer(dest); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + producer.send(session.createTextMessage("First")); + producer.send(session.createTextMessage("Second")); + + assertTrue("we did not timeout", latch.await(5, TimeUnit.SECONDS)); + + assertEquals("we have a result", 2, results.size()); + Object result = results.get(0); + assertTrue(result instanceof TextMessage); + assertEquals("result is first", "First", ((TextMessage)result).getText()); + result = results.get(1); + assertTrue(result instanceof TextMessage); + assertEquals("result is first", "Second", ((TextMessage)result).getText()); + } + + /** + * and a listener on a new consumer, just in case. + * + * @throws Exception + */ + public void testNewConsumerSetListenerFromListener() throws Exception { + final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final Destination dest = session.createQueue("Queue-" + getName()); + final MessageConsumer consumer = session.createConsumer(dest); + + final CountDownLatch latch = new CountDownLatch(2); + final AtomicBoolean first = new AtomicBoolean(true); + final Vector<Object> results = new Vector<Object>(); + consumer.setMessageListener(new MessageListener() { + + public void onMessage(Message message) { + if (first.compareAndSet(true, false)) { + try { + MessageConsumer anotherConsumer = session.createConsumer(dest); + anotherConsumer.setMessageListener(this); + results.add(message); + } catch (JMSException e) { + results.add(e); + } + } else { + results.add(message); + } + latch.countDown(); + } + }); + + connection.start(); + + MessageProducer producer = session.createProducer(dest); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + producer.send(session.createTextMessage("First")); + producer.send(session.createTextMessage("Second")); + + assertTrue("we did not timeout", latch.await(5, TimeUnit.SECONDS)); + + assertEquals("we have a result", 2, results.size()); + Object result = results.get(0); + assertTrue(result instanceof TextMessage); + assertEquals("result is first", "First", ((TextMessage)result).getText()); + result = results.get(1); + assertTrue(result instanceof TextMessage); + assertEquals("result is first", "Second", ((TextMessage)result).getText()); + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java new file mode 100644 index 0000000..7a219e2 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; + +/** + * + */ +public class JmsCreateConsumerInOnMessageTest extends TestSupport implements MessageListener { + + private Connection connection; + private Session publisherSession; + private Session consumerSession; + private MessageConsumer consumer; + private MessageConsumer testConsumer; + private MessageProducer producer; + private Topic topic; + private Object lock = new Object(); + + /* + * @see junit.framework.TestCase#setUp() + */ + protected void setUp() throws Exception { + super.setUp(); + super.topic = true; + connection = createConnection(); + connection.setClientID("connection:" + getSubject()); + publisherSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + topic = (Topic)super.createDestination("Test.Topic"); + consumer = consumerSession.createConsumer(topic); + consumer.setMessageListener(this); + producer = publisherSession.createProducer(topic); + connection.start(); + } + + /* + * @see junit.framework.TestCase#tearDown() + */ + protected void tearDown() throws Exception { + super.tearDown(); + connection.close(); + } + + /** + * Tests if a consumer can be created asynchronusly + * + * @throws Exception + */ + public void testCreateConsumer() throws Exception { + Message msg = super.createMessage(); + producer.send(msg); + if (testConsumer == null) { + synchronized (lock) { + lock.wait(3000); + } + } + assertTrue(testConsumer != null); + } + + /** + * Use the asynchronous subscription mechanism + * + * @param message + */ + public void onMessage(Message message) { + try { + testConsumer = consumerSession.createConsumer(topic); + consumerSession.createProducer(topic); + synchronized (lock) { + lock.notify(); + } + } catch (Exception ex) { + ex.printStackTrace(); + assertTrue(false); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableQueueWildcardSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableQueueWildcardSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableQueueWildcardSendReceiveTest.java new file mode 100644 index 0000000..72dd8bc --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableQueueWildcardSendReceiveTest.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.DeliveryMode; + +import org.apache.activemq.test.JmsTopicSendReceiveTest; + +/** + * + */ +public class JmsDurableQueueWildcardSendReceiveTest extends JmsTopicSendReceiveTest { + + /** + * Set up the test with a queue and persistent delivery mode. + * + * @see junit.framework.TestCase#setUp() + */ + protected void setUp() throws Exception { + topic = false; + deliveryMode = DeliveryMode.PERSISTENT; + super.setUp(); + } + + /** + * Returns the consumer subject. + */ + protected String getConsumerSubject() { + return "FOO.>"; + } + + /** + * Returns the producer subject. + */ + protected String getProducerSubject() { + return "FOO.BAR.HUMBUG"; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicSelectorTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicSelectorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicSelectorTest.java new file mode 100644 index 0000000..cc212ab --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicSelectorTest.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +/** + * + */ +public class JmsDurableTopicSelectorTest extends JmsTopicSelectorTest { + public void setUp() throws Exception { + durable = true; + super.setUp(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicSendReceiveTest.java new file mode 100644 index 0000000..fa47ea9 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicSendReceiveTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.test.JmsTopicSendReceiveTest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class JmsDurableTopicSendReceiveTest extends JmsTopicSendReceiveTest { + private static final Logger LOG = LoggerFactory.getLogger(JmsDurableTopicSendReceiveTest.class); + + protected Connection connection2; + protected Session session2; + protected Session consumeSession2; + protected MessageConsumer consumer2; + protected MessageProducer producer2; + protected Destination consumerDestination2; + protected Destination producerDestination2; + + /** + * Set up a durable suscriber test. + * + * @see junit.framework.TestCase#setUp() + */ + protected void setUp() throws Exception { + this.durable = true; + super.setUp(); + } + + /** + * Test if all the messages sent are being received. + * + * @throws Exception + */ + public void testSendWhileClosed() throws Exception { + connection2 = createConnection(); + connection2.setClientID("test"); + connection2.start(); + session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer2 = session2.createProducer(null); + producer2.setDeliveryMode(deliveryMode); + producerDestination2 = session2.createTopic(getProducerSubject() + "2"); + Thread.sleep(1000); + + consumeSession2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumerDestination2 = session2.createTopic(getConsumerSubject() + "2"); + consumer2 = consumeSession2.createDurableSubscriber((Topic)consumerDestination2, getName()); + Thread.sleep(1000); + consumer2.close(); + TextMessage message = session2.createTextMessage("test"); + message.setStringProperty("test", "test"); + message.setJMSType("test"); + producer2.send(producerDestination2, message); + LOG.info("Creating durable consumer"); + consumer2 = consumeSession2.createDurableSubscriber((Topic)consumerDestination2, getName()); + Message msg = consumer2.receive(1000); + assertNotNull(msg); + assertEquals(((TextMessage)msg).getText(), "test"); + assertEquals(msg.getJMSType(), "test"); + assertEquals(msg.getStringProperty("test"), "test"); + connection2.stop(); + connection2.close(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicTransactionTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicTransactionTest.java new file mode 100644 index 0000000..c01fb7f --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicTransactionTest.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.DeliveryMode; + +import org.apache.activemq.test.JmsResourceProvider; + +/** + * + */ +public class JmsDurableTopicTransactionTest extends JmsTopicTransactionTest { + + /** + * @see JmsTransactionTestSupport#getJmsResourceProvider() + */ + protected JmsResourceProvider getJmsResourceProvider() { + JmsResourceProvider provider = new JmsResourceProvider(); + provider.setTopic(true); + provider.setDeliveryMode(DeliveryMode.PERSISTENT); + provider.setClientID(getClass().getName()); + provider.setDurableName(getName()); + return provider; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicWildcardSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicWildcardSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicWildcardSendReceiveTest.java new file mode 100644 index 0000000..3058a57 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsDurableTopicWildcardSendReceiveTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.DeliveryMode; + +import org.apache.activemq.test.JmsTopicSendReceiveTest; + +/** + * + */ +public class JmsDurableTopicWildcardSendReceiveTest extends JmsTopicSendReceiveTest { + + /** + * Sets up a test with a topic destination, durable suscriber and persistent + * delivery mode. + * + * @see junit.framework.TestCase#setUp() + */ + protected void setUp() throws Exception { + topic = true; + durable = true; + deliveryMode = DeliveryMode.PERSISTENT; + super.setUp(); + } + + /** + * Returns the consumer subject. + */ + protected String getConsumerSubject() { + return "FOO.>"; + } + + /** + * Returns the producer subject. + */ + protected String getProducerSubject() { + return "FOO.BAR.HUMBUG"; + } +}
