http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java new file mode 100644 index 0000000..9c7488b --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageTypesTest.java @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.AddressControl;
+import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.Base64;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test that various message types are handled as expected with an AMQP JMS client.
+ * <p>
+ * Every test closes the connection it opens in a {@code finally} block so that a
+ * failed assertion does not leave the connection open.
+ */
+public class JMSMessageTypesTest extends JMSClientTestSupport {
+
+   /** Number of messages each send/receive round trip pushes through the queue. */
+   final int NUM_MESSAGES = 10;
+
+   /**
+    * Sends a Base64 encoded bytes message through the address management control
+    * and verifies that a JMS consumer reads the same payload back.
+    */
+   @Test(timeout = 60000)
+   public void testAddressControlSendMessage() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      server.createQueue(address, RoutingType.ANYCAST, address, null, true, false);
+
+      AddressControl addressControl = ManagementControlHelper.createAddressControl(address, mBeanServer);
+      Assert.assertEquals(1, addressControl.getQueueNames().length);
+      // Encode and decode with an explicit charset so the test does not depend on the platform default.
+      addressControl.sendMessage(null, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE, Base64.encodeBytes("test".getBytes(StandardCharsets.UTF_8)), false, fullUser, fullPass);
+
+      Wait.waitFor(() -> addressControl.getMessageCount() == 1);
+
+      Assert.assertEquals(1, addressControl.getMessageCount());
+
+      Connection connection = createConnection("myClientId");
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(address.toString());
+         MessageConsumer consumer = session.createConsumer(queue);
+         Message message = consumer.receive(500);
+         assertNotNull(message);
+         BytesMessage bytesMessage = (BytesMessage) message;
+         byte[] buffer = new byte[(int) bytesMessage.getBodyLength()];
+         bytesMessage.readBytes(buffer);
+         assertEquals("test", new String(buffer, StandardCharsets.UTF_8));
+         session.close();
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Sends a text message through the address management control and verifies
+    * that a JMS consumer receives it as a {@link TextMessage} with the same body.
+    */
+   @Test(timeout = 60000)
+   public void testAddressControlSendMessageWithText() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      server.createQueue(address, RoutingType.ANYCAST, address, null, true, false);
+
+      AddressControl addressControl = ManagementControlHelper.createAddressControl(address, mBeanServer);
+      Assert.assertEquals(1, addressControl.getQueueNames().length);
+      addressControl.sendMessage(null, org.apache.activemq.artemis.api.core.Message.TEXT_TYPE, "test", false, fullUser, fullPass);
+
+      Wait.waitFor(() -> addressControl.getMessageCount() == 1);
+
+      Assert.assertEquals(1, addressControl.getMessageCount());
+
+      Connection connection = createConnection("myClientId");
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(address.toString());
+         MessageConsumer consumer = session.createConsumer(queue);
+         Message message = consumer.receive(500);
+         assertNotNull(message);
+         String text = ((TextMessage) message).getText();
+         assertEquals("test", text);
+         session.close();
+      } finally {
+         connection.close();
+      }
+   }
+
+   /** Round-trips {@link BytesMessage}s carrying a 16 byte body and checks the body on receipt. */
+   @Test(timeout = 60000)
+   public void testBytesMessageSendReceive() throws Throwable {
+      long time = System.currentTimeMillis();
+
+      Connection connection = createConnection();
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(getQueueName());
+
+         // Payload is the byte values 0x00..0x0f.
+         byte[] bytes = new byte[0xf + 1];
+         for (int i = 0; i <= 0xf; i++) {
+            bytes[i] = (byte) i;
+         }
+
+         MessageProducer producer = session.createProducer(queue);
+         for (int i = 0; i < NUM_MESSAGES; i++) {
+            System.out.println("Sending " + i);
+            BytesMessage message = session.createBytesMessage();
+
+            message.writeBytes(bytes);
+            message.setIntProperty("count", i);
+            producer.send(message);
+         }
+
+         Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
+
+         for (int i = 0; i < NUM_MESSAGES; i++) {
+            BytesMessage m = (BytesMessage) consumer.receive(5000);
+            Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m);
+
+            m.reset();
+
+            long size = m.getBodyLength();
+            byte[] bytesReceived = new byte[(int) size];
+            m.readBytes(bytesReceived);
+
+            System.out.println("Received " + ByteUtil.bytesToHex(bytesReceived, 1) + " count - " + m.getIntProperty("count"));
+
+            Assert.assertArrayEquals(bytes, bytesReceived);
+         }
+      } finally {
+         connection.close();
+      }
+
+      long taken = (System.currentTimeMillis() - time) / 1000;
+      System.out.println("taken = " + taken);
+   }
+
+   /** Round-trips body-less {@link Message}s and checks that each one arrives. */
+   @Test(timeout = 60000)
+   public void testMessageSendReceive() throws Throwable {
+      long time = System.currentTimeMillis();
+
+      Connection connection = createConnection();
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(getQueueName());
+
+         MessageProducer producer = session.createProducer(queue);
+         for (int i = 0; i < NUM_MESSAGES; i++) {
+            System.out.println("Sending " + i);
+            Message message = session.createMessage();
+
+            message.setIntProperty("count", i);
+            producer.send(message);
+         }
+
+         Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
+
+         for (int i = 0; i < NUM_MESSAGES; i++) {
+            Message m = consumer.receive(5000);
+            Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m);
+         }
+      } finally {
+         connection.close();
+      }
+
+      long taken = (System.currentTimeMillis() - time) / 1000;
+      System.out.println("taken = " + taken);
+   }
+
+   /** Round-trips {@link MapMessage}s and checks both the map entry and the message property. */
+   @Test(timeout = 60000)
+   public void testMapMessageSendReceive() throws Throwable {
+      long time = System.currentTimeMillis();
+
+      Connection connection = createConnection();
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(getQueueName());
+
+         MessageProducer producer = session.createProducer(queue);
+         for (int i = 0; i < NUM_MESSAGES; i++) {
+            System.out.println("Sending " + i);
+            MapMessage message = session.createMapMessage();
+
+            message.setInt("i", i);
+            message.setIntProperty("count", i);
+            producer.send(message);
+         }
+
+         Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
+
+         for (int i = 0; i < NUM_MESSAGES; i++) {
+            MapMessage m = (MapMessage) consumer.receive(5000);
+            Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m);
+
+            Assert.assertEquals(i, m.getInt("i"));
+            Assert.assertEquals(i, m.getIntProperty("count"));
+         }
+      } finally {
+         connection.close();
+      }
+
+      long taken = (System.currentTimeMillis() - time) / 1000;
+      System.out.println("taken = " + taken);
+   }
+
+   /** Round-trips {@link TextMessage}s and checks the text body of each received message. */
+   @Test(timeout = 60000)
+   public void testTextMessageSendReceive() throws Throwable {
+      long time = System.currentTimeMillis();
+
+      Connection connection = createConnection();
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(getQueueName());
+
+         MessageProducer producer = session.createProducer(queue);
+         for (int i = 0; i < NUM_MESSAGES; i++) {
+            System.out.println("Sending " + i);
+            TextMessage message = session.createTextMessage("text" + i);
+            message.setStringProperty("text", "text" + i);
+            producer.send(message);
+         }
+
+         Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
+
+         for (int i = 0; i < NUM_MESSAGES; i++) {
+            TextMessage m = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m);
+            Assert.assertEquals("text" + i, m.getText());
+         }
+      } finally {
+         connection.close();
+      }
+
+      long taken = (System.currentTimeMillis() - time) / 1000;
+      System.out.println("taken = " + taken);
+   }
+
+   /** Round-trips {@link StreamMessage}s and reads the int, boolean and String entries back in order. */
+   @Test(timeout = 60000)
+   public void testStreamMessageSendReceive() throws Throwable {
+      Connection connection = createConnection();
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(getQueueName());
+
+         MessageProducer producer = session.createProducer(queue);
+         for (int i = 0; i < NUM_MESSAGES; i++) {
+            StreamMessage message = session.createStreamMessage();
+            message.writeInt(i);
+            message.writeBoolean(true);
+            message.writeString("test");
+            producer.send(message);
+         }
+
+         Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
+
+         for (int i = 0; i < NUM_MESSAGES; i++) {
+            StreamMessage m = (StreamMessage) consumer.receive(5000);
+            Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m);
+
+            Assert.assertEquals(i, m.readInt());
+            Assert.assertTrue(m.readBoolean());
+            Assert.assertEquals("test", m.readString());
+         }
+      } finally {
+         connection.close();
+      }
+   }
+
+   /** Sends an {@link ObjectMessage} holding an {@link ArrayList} and checks the deserialized content. */
+   @Test(timeout = 60000)
+   public void testObjectMessageWithArrayListPayload() throws Throwable {
+      ArrayList<String> payload = new ArrayList<>();
+      payload.add("aString");
+
+      Connection connection = createConnection();
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(getQueueName());
+         MessageProducer producer = session.createProducer(queue);
+         ObjectMessage objectMessage = session.createObjectMessage(payload);
+         producer.send(objectMessage);
+         session.close();
+
+         // Consume on a fresh session, after the producing session has been closed.
+         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer cons = session.createConsumer(queue);
+         connection.start();
+
+         objectMessage = (ObjectMessage) cons.receive(5000);
+         assertNotNull(objectMessage);
+         @SuppressWarnings("unchecked")
+         ArrayList<String> received = (ArrayList<String>) objectMessage.getObject();
+         assertEquals("aString", received.get(0));
+      } finally {
+         connection.close();
+      }
+   }
+
+   /** Round-trips {@link ObjectMessage}s holding a custom {@link Serializable} type. */
+   @Test(timeout = 60000)
+   public void testObjectMessageUsingCustomType() throws Throwable {
+      long time = System.currentTimeMillis();
+
+      Connection connection = createConnection();
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(getQueueName());
+
+         MessageProducer producer = session.createProducer(queue);
+         for (int i = 0; i < NUM_MESSAGES; i++) {
+            System.out.println("Sending " + i);
+            ObjectMessage message = session.createObjectMessage(new AnythingSerializable(i));
+            producer.send(message);
+         }
+
+         Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
+
+         for (int i = 0; i < NUM_MESSAGES; i++) {
+            ObjectMessage msg = (ObjectMessage) consumer.receive(5000);
+            Assert.assertNotNull("Could not receive message count=" + i + " on consumer", msg);
+
+            AnythingSerializable someSerialThing = (AnythingSerializable) msg.getObject();
+            Assert.assertEquals(i, someSerialThing.getCount());
+         }
+      } finally {
+         connection.close();
+      }
+
+      long taken = (System.currentTimeMillis() - time) / 1000;
+      System.out.println("taken = " + taken);
+   }
+
+   /** Minimal serializable payload used by {@link #testObjectMessageUsingCustomType()}. */
+   public static class AnythingSerializable implements Serializable {
+      private static final long serialVersionUID = 5972085029690947807L;
+
+      private final int count;
+
+      public AnythingSerializable(int count) {
+         this.count = count;
+      }
+
+      public int getCount() {
+         return count;
+      }
+   }
+
+   /**
+    * Sends the same message twice with a set of typed properties and checks that
+    * every property value is intact on the first received copy.
+    */
+   @Test(timeout = 60000)
+   public void testPropertiesArePreserved() throws Exception {
+      Connection connection = createConnection();
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         Queue queue = session.createQueue(getQueueName());
+         MessageProducer producer = session.createProducer(queue);
+         TextMessage message = session.createTextMessage();
+         message.setText("msg:0");
+         message.setBooleanProperty("true", true);
+         message.setBooleanProperty("false", false);
+         message.setStringProperty("foo", "bar");
+         message.setDoubleProperty("double", 66.6);
+         message.setFloatProperty("float", 56.789f);
+         message.setIntProperty("int", 8);
+         message.setByteProperty("byte", (byte) 10);
+
+         producer.send(message);
+         producer.send(message);
+
+         connection.start();
+
+         MessageConsumer messageConsumer = session.createConsumer(queue);
+         TextMessage received = (TextMessage) messageConsumer.receive(5000);
+         Assert.assertNotNull(received);
+         Assert.assertEquals("msg:0", received.getText());
+         // Expected value goes first so that failure messages read correctly.
+         Assert.assertTrue(received.getBooleanProperty("true"));
+         Assert.assertFalse(received.getBooleanProperty("false"));
+         Assert.assertEquals("bar", received.getStringProperty("foo"));
+         Assert.assertEquals(66.6, received.getDoubleProperty("double"), 0.0001);
+         Assert.assertEquals(56.789f, received.getFloatProperty("float"), 0.0001);
+         Assert.assertEquals(8, received.getIntProperty("int"));
+         Assert.assertEquals((byte) 10, received.getByteProperty("byte"));
+
+         received = (TextMessage) messageConsumer.receive(5000);
+         Assert.assertNotNull(received);
+      } finally {
+         connection.close();
+      }
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSQueueBrowserTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSQueueBrowserTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSQueueBrowserTest.java new file mode 100644 index 0000000..45bec32 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSQueueBrowserTest.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.Enumeration;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for various QueueBrowser scenarios with an AMQP JMS client.
+ * <p>
+ * Every test closes the connection it opens in a {@code finally} block so that a
+ * failed assertion does not leave the connection (and its browsers) open.
+ */
+public class JMSQueueBrowserTest extends JMSClientTestSupport {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(JMSQueueBrowserTest.class);
+
+   /** Browses a queue with every prefetch value set to zero and expects to see all messages. */
+   @Test(timeout = 60000)
+   public void testBrowseAllInQueueZeroPrefetch() throws Exception {
+
+      final int MSG_COUNT = 5;
+
+      JmsConnection connection = (JmsConnection) createConnection();
+      try {
+         ((JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy()).setAll(0);
+
+         connection.start();
+
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         assertNotNull(session);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+         sendMessages(name.getMethodName(), MSG_COUNT, false);
+
+         Queue queueView = getProxyToQueue(getQueueName());
+         assertTrue("Messages did not arrive", Wait.waitFor(() -> queueView.getMessageCount() == MSG_COUNT));
+
+         QueueBrowser browser = session.createBrowser(queue);
+         assertNotNull(browser);
+         Enumeration<?> enumeration = browser.getEnumeration();
+         int count = 0;
+         // Stop asking once the expected number has been seen; the final
+         // hasMoreElements() check is made separately below.
+         while (count < MSG_COUNT && enumeration.hasMoreElements()) {
+            Message msg = (Message) enumeration.nextElement();
+            assertNotNull(msg);
+            LOG.debug("Recv: {}", msg);
+            count++;
+         }
+
+         LOG.debug("Received all expected message, checking that hasMoreElements returns false");
+         assertFalse(enumeration.hasMoreElements());
+         assertEquals(MSG_COUNT, count);
+      } finally {
+         connection.close();
+      }
+   }
+
+   /** Creating a browser on an empty queue must succeed and leave the queue empty. */
+   @Test(timeout = 40000)
+   public void testCreateQueueBrowser() throws Exception {
+      Connection connection = createConnection();
+      try {
+         connection.start();
+
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         assertNotNull(session);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+         // Open and immediately close a consumer before browsing.
+         session.createConsumer(queue).close();
+
+         QueueBrowser browser = session.createBrowser(queue);
+         assertNotNull(browser);
+
+         Queue queueView = getProxyToQueue(getQueueName());
+         assertEquals(0, queueView.getMessageCount());
+      } finally {
+         connection.close();
+      }
+   }
+
+   /** A browser on an empty queue must report no elements. */
+   @Test(timeout = 40000)
+   public void testNoMessagesBrowserHasNoElements() throws Exception {
+      Connection connection = createConnection();
+      try {
+         connection.start();
+
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         assertNotNull(session);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+         session.createConsumer(queue).close();
+
+         QueueBrowser browser = session.createBrowser(queue);
+         assertNotNull(browser);
+
+         Queue queueView = getProxyToQueue(getQueueName());
+         assertEquals(0, queueView.getMessageCount());
+
+         Enumeration<?> enumeration = browser.getEnumeration();
+         assertFalse(enumeration.hasMoreElements());
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Browses a queue holding a single message and then consumes that message,
+    * proving that browsing did not remove it.
+    */
+   // NOTE(review): the method name looks like a typo for "testBrowseOneInQueue";
+   // it is kept unchanged so existing test selectors keep working.
+   @Test(timeout = 30000)
+   public void testBroseOneInQueue() throws Exception {
+      Connection connection = createConnection();
+      try {
+         connection.start();
+
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+         MessageProducer producer = session.createProducer(queue);
+         producer.send(session.createTextMessage("hello"));
+         producer.close();
+
+         QueueBrowser browser = session.createBrowser(queue);
+         Enumeration<?> enumeration = browser.getEnumeration();
+         while (enumeration.hasMoreElements()) {
+            Message m = (Message) enumeration.nextElement();
+            assertTrue(m instanceof TextMessage);
+            LOG.debug("Browsed message {} from Queue {}", m, queue);
+         }
+
+         browser.close();
+
+         MessageConsumer consumer = session.createConsumer(queue);
+         Message msg = consumer.receive(5000);
+         assertNotNull(msg);
+         assertTrue(msg instanceof TextMessage);
+      } finally {
+         connection.close();
+      }
+   }
+
+   /** Browses five messages, pausing briefly between elements, and expects to see all of them. */
+   @Test(timeout = 60000)
+   public void testBrowseAllInQueue() throws Exception {
+      Connection connection = createConnection();
+      try {
+         connection.start();
+
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         assertNotNull(session);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+         sendMessages(name.getMethodName(), 5, false);
+
+         Queue queueView = getProxyToQueue(getQueueName());
+         assertTrue("Messages did not arrive", Wait.waitFor(() -> queueView.getMessageCount() == 5));
+
+         QueueBrowser browser = session.createBrowser(queue);
+         assertNotNull(browser);
+         Enumeration<?> enumeration = browser.getEnumeration();
+         int count = 0;
+         while (enumeration.hasMoreElements()) {
+            Message msg = (Message) enumeration.nextElement();
+            assertNotNull(msg);
+            LOG.debug("Recv: {}", msg);
+            count++;
+            TimeUnit.MILLISECONDS.sleep(50);
+         }
+         assertFalse(enumeration.hasMoreElements());
+         assertEquals(5, count);
+      } finally {
+         connection.close();
+      }
+   }
+
+   /** Browses five messages and expects to see all of them. */
+   // NOTE(review): despite the name, no prefetch value is configured here, so the
+   // client default applies. Confirm whether a prefetch of one was intended.
+   @Test(timeout = 60000)
+   public void testBrowseAllInQueuePrefetchOne() throws Exception {
+      Connection connection = createConnection();
+      try {
+         connection.start();
+
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         assertNotNull(session);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+         sendMessages(name.getMethodName(), 5, false);
+
+         Queue queueView = getProxyToQueue(getQueueName());
+         assertTrue("Messages did not arrive", Wait.waitFor(() -> queueView.getMessageCount() == 5));
+
+         QueueBrowser browser = session.createBrowser(queue);
+         assertNotNull(browser);
+         Enumeration<?> enumeration = browser.getEnumeration();
+         int count = 0;
+         while (enumeration.hasMoreElements()) {
+            Message msg = (Message) enumeration.nextElement();
+            assertNotNull(msg);
+            LOG.debug("Recv: {}", msg);
+            count++;
+         }
+         assertFalse(enumeration.hasMoreElements());
+         assertEquals(5, count);
+      } finally {
+         connection.close();
+      }
+   }
+
+   /** Browses five messages from within a transacted session and expects to see all of them. */
+   @Test(timeout = 40000)
+   public void testBrowseAllInQueueTxSession() throws Exception {
+      Connection connection = createConnection();
+      try {
+         connection.start();
+
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         assertNotNull(session);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+         sendMessages(name.getMethodName(), 5, false);
+
+         Queue queueView = getProxyToQueue(getQueueName());
+         assertTrue("Messages did not arrive", Wait.waitFor(() -> queueView.getMessageCount() == 5));
+
+         QueueBrowser browser = session.createBrowser(queue);
+         assertNotNull(browser);
+         Enumeration<?> enumeration = browser.getEnumeration();
+         int count = 0;
+         while (enumeration.hasMoreElements()) {
+            Message msg = (Message) enumeration.nextElement();
+            assertNotNull(msg);
+            LOG.debug("Recv: {}", msg);
+            count++;
+         }
+         assertFalse(enumeration.hasMoreElements());
+         assertEquals(5, count);
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Verifies that browsing inside a transacted session neither exposes the
+    * session's uncommitted sends nor disturbs them: the queue still reports five
+    * messages after browsing and ten once the transaction is committed.
+    */
+   @Test(timeout = 40000)
+   public void testQueueBrowserInTxSessionLeavesOtherWorkUnaffected() throws Exception {
+      Connection connection = createConnection();
+      try {
+         connection.start();
+
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         assertNotNull(session);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+         sendMessages(name.getMethodName(), 5, false);
+
+         Queue queueView = getProxyToQueue(getQueueName());
+         assertTrue("Messages did not arrive", Wait.waitFor(() -> queueView.getMessageCount() == 5));
+
+         // Send some TX work but don't commit.
+         MessageProducer txProducer = session.createProducer(queue);
+         for (int i = 0; i < 5; ++i) {
+            txProducer.send(session.createMessage());
+         }
+
+         assertEquals(5, queueView.getMessageCount());
+
+         QueueBrowser browser = session.createBrowser(queue);
+         assertNotNull(browser);
+         Enumeration<?> enumeration = browser.getEnumeration();
+         int count = 0;
+         while (enumeration.hasMoreElements()) {
+            Message msg = (Message) enumeration.nextElement();
+            assertNotNull(msg);
+            LOG.debug("Recv: {}", msg);
+            count++;
+         }
+
+         assertFalse(enumeration.hasMoreElements());
+         assertEquals(5, count);
+
+         browser.close();
+
+         // Now check that all browser work did not affect the session transaction.
+         assertEquals(5, queueView.getMessageCount());
+         session.commit();
+         assertEquals(10, queueView.getMessageCount());
+      } finally {
+         connection.close();
+      }
+   }
+
+   /** Browses thirty messages and expects to see all of them. */
+   // NOTE(review): despite the name, no prefetch value is configured here, so the
+   // client default applies. Confirm whether a small prefetch was intended.
+   @Test(timeout = 60000)
+   public void testBrowseAllInQueueSmallPrefetch() throws Exception {
+      final int MSG_COUNT = 30;
+
+      Connection connection = createConnection();
+      try {
+         connection.start();
+
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         assertNotNull(session);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+         sendMessages(name.getMethodName(), MSG_COUNT, false);
+
+         Queue queueView = getProxyToQueue(getQueueName());
+         assertTrue("Messages did not arrive", Wait.waitFor(() -> queueView.getMessageCount() == MSG_COUNT));
+
+         QueueBrowser browser = session.createBrowser(queue);
+         assertNotNull(browser);
+         Enumeration<?> enumeration = browser.getEnumeration();
+         int count = 0;
+         while (enumeration.hasMoreElements()) {
+            Message msg = (Message) enumeration.nextElement();
+            assertNotNull(msg);
+            LOG.debug("Recv: {}", msg);
+            count++;
+         }
+         assertFalse(enumeration.hasMoreElements());
+         assertEquals(MSG_COUNT, count);
+      } finally {
+         connection.close();
+      }
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTemporaryDestinationTest.java
---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTemporaryDestinationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTemporaryDestinationTest.java new file mode 100644 index 0000000..776d553 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTemporaryDestinationTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Test;
+
+/**
+ * Tests creation and deletion of temporary queues and topics with an AMQP JMS client.
+ */
+public class JMSTemporaryDestinationTest extends JMSClientTestSupport {
+
+   /** Sends one message to a freshly created temporary queue and receives it back. */
+   @Test(timeout = 60000)
+   public void testCreateTemporaryQueue() throws Throwable {
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         TemporaryQueue queue = session.createTemporaryQueue();
+         System.out.println("queue:" + queue.getQueueName());
+         MessageProducer producer = session.createProducer(queue);
+
+         TextMessage message = session.createTextMessage();
+         message.setText("Message temporary");
+         producer.send(message);
+
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+
+         message = (TextMessage) consumer.receive(5000);
+
+         assertNotNull(message);
+      } finally {
+         connection.close();
+      }
+   }
+
+   /** Deletes a temporary queue and waits for the broker-side queue to disappear. */
+   @Test(timeout = 30000)
+   public void testDeleteTemporaryQueue() throws Exception {
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         final javax.jms.Queue queue = session.createTemporaryQueue();
+         assertNotNull(queue);
+         assertTrue(queue instanceof TemporaryQueue);
+
+         Queue queueView = getProxyToQueue(queue.getQueueName());
+         assertNotNull(queueView);
+
+         TemporaryQueue tempQueue = (TemporaryQueue) queue;
+         tempQueue.delete();
+
+         // Poll every 50 ms for up to 30 s until the broker no longer has the queue.
+         assertTrue("Temp Queue should be deleted.",
+                    Wait.waitFor(() -> getProxyToQueue(queue.getQueueName()) == null,
+                                 TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(50)));
+      } finally {
+         connection.close();
+      }
+   }
+
+   /** Publishes one message to a freshly created temporary topic and receives it back. */
+   @Test(timeout = 60000)
+   public void testCreateTemporaryTopic() throws Throwable {
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         TemporaryTopic topic = session.createTemporaryTopic();
+
+         System.out.println("topic:" + topic.getTopicName());
+         // The consumer is created before the message is sent.
+         MessageConsumer consumer = session.createConsumer(topic);
+         MessageProducer producer = session.createProducer(topic);
+
+         TextMessage message = session.createTextMessage();
+         message.setText("Message temporary");
+         producer.send(message);
+
+         connection.start();
+
+         message = (TextMessage) consumer.receive(5000);
+
+         assertNotNull(message);
+      } finally {
+         connection.close();
+      }
+   }
+
+   /** Deletes a temporary topic and waits for the broker-side queue of that name to disappear. */
+   @Test(timeout = 30000)
+   public void testDeleteTemporaryTopic() throws Exception {
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         final javax.jms.Topic topic = session.createTemporaryTopic();
+         assertNotNull(topic);
+         assertTrue(topic instanceof TemporaryTopic);
+
+         Queue queueView = getProxyToQueue(topic.getTopicName());
+         assertNotNull(queueView);
+
+         TemporaryTopic tempTopic = (TemporaryTopic) topic;
+         tempTopic.delete();
+
+         // Poll every 50 ms for up to 30 s until the broker no longer has the queue.
+         assertTrue("Temp Topic should be deleted.",
+                    Wait.waitFor(() -> getProxyToQueue(topic.getTopicName()) == null,
+                                 TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(50)));
+      } finally {
+         connection.close();
+      }
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTopicConsumerTest.java
----------------------------------------------------------------------
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTopicConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTopicConsumerTest.java new file mode 100644 index 0000000..52bd247 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTopicConsumerTest.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
+import org.apache.activemq.artemis.core.remoting.CloseListener;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests JMS topic subscribers (non-durable and durable) against the broker: delivery to one and
+ * to several subscribers, unsubscribing a durable subscription, clean-up of a non-durable
+ * subscription when its connection closes, and re-attaching to a durable subscription after a
+ * reconnect.
+ * <p>
+ * Every test closes its connection in a {@code finally} block.
+ */
+public class JMSTopicConsumerTest extends JMSClientTestSupport {
+
+   /**
+    * Publishes one text message to a topic that already has a subscriber and expects that
+    * subscriber to receive {@code "test-message"} within one second.
+    */
+   @Test(timeout = 60000)
+   public void testSendAndReceiveOnTopic() throws Exception {
+      Connection connection = createConnection("myClientId");
+
+      try {
+         TopicSession session = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(getTopicName());
+         TopicSubscriber consumer = session.createSubscriber(topic);
+         TopicPublisher producer = session.createPublisher(topic);
+
+         TextMessage message = session.createTextMessage("test-message");
+         producer.send(message);
+
+         producer.close();
+         connection.start();
+
+         message = (TextMessage) consumer.receive(1000);
+
+         assertNotNull(message);
+         assertNotNull(message.getText());
+         assertEquals("test-message", message.getText());
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Publishes one text message to a topic with two subscribers on the same session and expects
+    * each of them to receive its own copy of {@code "test-message"}.
+    */
+   @Test(timeout = 60000)
+   public void testSendWithMultipleReceiversOnTopic() throws Exception {
+      Connection connection = createConnection();
+
+      try {
+         TopicSession session = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(getTopicName());
+         TopicSubscriber consumer1 = session.createSubscriber(topic);
+         TopicSubscriber consumer2 = session.createSubscriber(topic);
+         TopicPublisher producer = session.createPublisher(topic);
+
+         TextMessage message = session.createTextMessage("test-message");
+         producer.send(message);
+
+         producer.close();
+         connection.start();
+
+         message = (TextMessage) consumer1.receive(1000);
+
+         assertNotNull(message);
+         assertNotNull(message.getText());
+         assertEquals("test-message", message.getText());
+
+         message = (TextMessage) consumer2.receive(1000);
+
+         assertNotNull(message);
+         assertNotNull(message.getText());
+         assertEquals("test-message", message.getText());
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Creates the durable subscription {@code "myDurSub"} under client id {@code "myClientId"},
+    * disconnects, reconnects with the same client id and re-opens the subscription. The binding
+    * {@code "myClientId.myDurSub"} must exist while the subscription is merely closed and must be
+    * gone after {@link Session#unsubscribe(String)}.
+    */
+   @Test(timeout = 60000)
+   public void testDurableSubscriptionUnsubscribe() throws Exception {
+      Connection connection = createConnection("myClientId");
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(getTopicName());
+         TopicSubscriber myDurSub = session.createDurableSubscriber(topic, "myDurSub");
+         session.close();
+         connection.close();
+
+         connection = createConnection("myClientId");
+         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         myDurSub = session.createDurableSubscriber(topic, "myDurSub");
+         myDurSub.close();
+
+         Assert.assertNotNull(server.getPostOffice().getBinding(new SimpleString("myClientId.myDurSub")));
+         session.unsubscribe("myDurSub");
+         Assert.assertNull(server.getPostOffice().getBinding(new SimpleString("myClientId.myDurSub")));
+         session.close();
+         // No explicit connection.close() here: the finally block closes whichever connection is current.
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Attaches a non-durable subscriber and expects two bindings on the topic address; after the
+    * connection has been closed, and the server has signalled that through a
+    * {@link CloseListener}, only one binding may remain.
+    */
+   @Test(timeout = 60000)
+   public void testTemporarySubscriptionDeleted() throws Exception {
+      Connection connection = createConnection();
+
+      try {
+         TopicSession session = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(getTopicName());
+         TopicSubscriber myNonDurSub = session.createSubscriber(topic);
+         assertNotNull(myNonDurSub);
+
+         Bindings bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString(getTopicName()));
+         Assert.assertEquals(2, bindingsForAddress.getBindings().size());
+         session.close();
+
+         final CountDownLatch latch = new CountDownLatch(1);
+         server.getRemotingService().getConnections().iterator().next().addCloseListener(new CloseListener() {
+            @Override
+            public void connectionClosed() {
+               latch.countDown();
+            }
+         });
+
+         connection.close();
+         // Fail here, with a clear message, if the close notification never arrives; previously
+         // the result of await() was ignored and the binding check ran regardless.
+         Assert.assertTrue("Server did not report the connection as closed within 5 seconds", latch.await(5, TimeUnit.SECONDS));
+         bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString(getTopicName()));
+         Assert.assertEquals(1, bindingsForAddress.getBindings().size());
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Sends 100 text messages to a topic with three durable subscribers and expects every
+    * subscriber to receive all of them in send order.
+    */
+   @Test(timeout = 60000)
+   public void testMultipleDurableConsumersSendAndReceive() throws Exception {
+      Connection connection = createConnection("myClientId");
+
+      try {
+         TopicSession session = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(getTopicName());
+
+         int numMessages = 100;
+         TopicSubscriber sub1 = session.createDurableSubscriber(topic, "myPubId1");
+         TopicSubscriber sub2 = session.createDurableSubscriber(topic, "myPubId2");
+         TopicSubscriber sub3 = session.createDurableSubscriber(topic, "myPubId3");
+
+         Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = sendSession.createProducer(topic);
+         connection.start();
+         for (int i = 0; i < numMessages; i++) {
+            producer.send(sendSession.createTextMessage("message:" + i));
+         }
+
+         // assertEquals takes (expected, actual); in that order a failure report reads correctly.
+         for (int i = 0; i < numMessages; i++) {
+            TextMessage receive = (TextMessage) sub1.receive(5000);
+            Assert.assertNotNull(receive);
+            Assert.assertEquals("message:" + i, receive.getText());
+            receive = (TextMessage) sub2.receive(5000);
+            Assert.assertNotNull(receive);
+            Assert.assertEquals("message:" + i, receive.getText());
+            receive = (TextMessage) sub3.receive(5000);
+            Assert.assertNotNull(receive);
+            Assert.assertEquals("message:" + i, receive.getText());
+         }
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Sends and receives 100 messages through the durable subscription {@code "myPubId"}, closes
+    * the connection, reconnects with the same client id, re-opens the subscription and repeats
+    * the same send/receive round.
+    */
+   @Test(timeout = 60000)
+   public void testDurableSubscriptionReconnection() throws Exception {
+      Connection connection = createConnection("myClientId");
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(getTopicName());
+
+         int numMessages = 100;
+         TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId");
+
+         Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = sendSession.createProducer(topic);
+         connection.start();
+
+         for (int i = 0; i < numMessages; i++) {
+            producer.send(sendSession.createTextMessage("message:" + i));
+         }
+
+         for (int i = 0; i < numMessages; i++) {
+            TextMessage receive = (TextMessage) sub.receive(5000);
+            Assert.assertNotNull(receive);
+            Assert.assertEquals("message:" + i, receive.getText());
+         }
+
+         // Drop the first connection and attach to the same durable subscription again.
+         connection.close();
+         connection = createConnection("myClientId");
+         connection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+               exception.printStackTrace();
+            }
+         });
+         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         sub = session.createDurableSubscriber(topic, "myPubId");
+
+         sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         producer = sendSession.createProducer(topic);
+         connection.start();
+         for (int i = 0; i < numMessages; i++) {
+            producer.send(sendSession.createTextMessage("message:" + i));
+         }
+         for (int i = 0; i < numMessages; i++) {
+            TextMessage receive = (TextMessage) sub.receive(5000);
+            Assert.assertNotNull(receive);
+            Assert.assertEquals("message:" + i, receive.getText());
+         }
+      } finally {
+         connection.close();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTransactionTest.java
---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTransactionTest.java new file mode 100644 index 0000000..c7f73c1 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSTransactionTest.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests transacted JMS sessions: commit and rollback on the producing side, commit and rollback
+ * on the consuming side, and rollback followed by re-delivery.
+ * <p>
+ * Each test checks the outcome through the server-side {@link Queue} returned by
+ * {@code getProxyToQueue} and closes its connection in a {@code finally} block.
+ */
+public class JMSTransactionTest extends JMSClientTestSupport {
+
+   /**
+    * Sends ten messages in a transacted session, commits, and expects the queue to hold ten.
+    */
+   @Test(timeout = 60000)
+   public void testProduceMessageAndCommit() throws Throwable {
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+
+         System.out.println("queue:" + queue.getQueueName());
+         MessageProducer p = session.createProducer(queue);
+         for (int i = 0; i < 10; i++) {
+            TextMessage message = session.createTextMessage();
+            message.setText("Message:" + i);
+            p.send(message);
+         }
+
+         session.commit();
+         session.close();
+
+         Queue queueView = getProxyToQueue(getQueueName());
+
+         assertTrue("Message didn't arrive on queue", Wait.waitFor(() -> queueView.getMessageCount() == 10));
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Sends ten messages in a transacted session, rolls back, and expects the queue to stay empty.
+    */
+   @Test(timeout = 60000)
+   public void testProduceMessageAndRollback() throws Throwable {
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+
+         System.out.println("queue:" + queue.getQueueName());
+         MessageProducer p = session.createProducer(queue);
+         for (int i = 0; i < 10; i++) {
+            TextMessage message = session.createTextMessage();
+            message.setText("Message:" + i);
+            p.send(message);
+         }
+
+         session.rollback();
+         session.close();
+
+         Queue queueView = getProxyToQueue(getQueueName());
+         assertTrue("Messages arrived on queue", Wait.waitFor(() -> queueView.getMessageCount() == 0));
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Sends ten messages in a transacted session and closes the session without committing;
+    * the queue is expected to stay empty.
+    */
+   @Test(timeout = 60000)
+   public void testProducedMessageAreRolledBackOnSessionClose() throws Exception {
+      int numMessages = 10;
+
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+
+         MessageProducer p = session.createProducer(queue);
+         for (int i = 0; i < numMessages; i++) {
+            TextMessage message = session.createTextMessage();
+            message.setText("msg:" + i);
+            p.send(message);
+         }
+
+         session.close();
+
+         Queue queueView = getProxyToQueue(getQueueName());
+         assertTrue("Messages arrived on queue", Wait.waitFor(() -> queueView.getMessageCount() == 0));
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Puts ten messages on the queue with an auto-acknowledge session, consumes them in order in
+    * a transacted session, commits, and expects the queue to end up empty.
+    */
+   @Test(timeout = 60000)
+   public void testConsumeMessagesAndCommit() throws Throwable {
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+
+         System.out.println("queue:" + queue.getQueueName());
+         MessageProducer p = session.createProducer(queue);
+         for (int i = 0; i < 10; i++) {
+            TextMessage message = session.createTextMessage();
+            message.setText("Message:" + i);
+            p.send(message);
+         }
+         session.close();
+
+         session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         MessageConsumer cons = session.createConsumer(queue);
+         connection.start();
+
+         for (int i = 0; i < 10; i++) {
+            TextMessage message = (TextMessage) cons.receive(5000);
+            Assert.assertNotNull(message);
+            Assert.assertEquals("Message:" + i, message.getText());
+         }
+         session.commit();
+         session.close();
+
+         Queue queueView = getProxyToQueue(getQueueName());
+         assertTrue("Messages not consumed", Wait.waitFor(() -> queueView.getMessageCount() == 0));
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Puts ten messages on the queue, consumes them in order in a transacted session, rolls back,
+    * and expects the queue to report ten messages again.
+    */
+   @Test(timeout = 60000)
+   public void testConsumeMessagesAndRollback() throws Throwable {
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+
+         MessageProducer p = session.createProducer(queue);
+         for (int i = 0; i < 10; i++) {
+            TextMessage message = session.createTextMessage();
+            message.setText("Message:" + i);
+            p.send(message);
+         }
+         session.close();
+
+         session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         MessageConsumer cons = session.createConsumer(queue);
+         connection.start();
+
+         for (int i = 0; i < 10; i++) {
+            TextMessage message = (TextMessage) cons.receive(5000);
+            Assert.assertNotNull(message);
+            Assert.assertEquals("Message:" + i, message.getText());
+         }
+
+         session.rollback();
+
+         Queue queueView = getProxyToQueue(getQueueName());
+         assertTrue("Messages consumed", Wait.waitFor(() -> queueView.getMessageCount() == 10));
+      } finally {
+         connection.close();
+      }
+   }
+
+   /**
+    * Commits five numbered messages, consumes the first two and rolls back, then consumes five
+    * messages and commits. Every message number from 1 to 5 must have been seen in the second
+    * pass and the queue must be empty afterwards.
+    */
+   @Test(timeout = 60000)
+   public void testRollbackSomeThenReceiveAndCommit() throws Exception {
+      final int MSG_COUNT = 5;
+      final int consumeBeforeRollback = 2;
+
+      Connection connection = createConnection();
+
+      try {
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         javax.jms.Queue queue = session.createQueue(getQueueName());
+
+         MessageProducer p = session.createProducer(queue);
+         for (int i = 0; i < MSG_COUNT; i++) {
+            TextMessage message = session.createTextMessage();
+            message.setText("Message:" + i);
+            message.setIntProperty("MESSAGE_NUMBER", i + 1);
+            p.send(message);
+         }
+
+         session.commit();
+
+         Queue queueView = getProxyToQueue(getQueueName());
+         assertTrue("Messages not enqueued", Wait.waitFor(() -> queueView.getMessageCount() == MSG_COUNT));
+
+         MessageConsumer consumer = session.createConsumer(queue);
+
+         for (int i = 1; i <= consumeBeforeRollback; i++) {
+            Message message = consumer.receive(1000);
+            assertNotNull(message);
+            assertEquals("Unexpected message number", i, message.getIntProperty("MESSAGE_NUMBER"));
+         }
+
+         session.rollback();
+
+         assertEquals(MSG_COUNT, queueView.getMessageCount());
+
+         // Consume again..check we receive all the messages.
+         Set<Integer> messageNumbers = new HashSet<>();
+         for (int i = 1; i <= MSG_COUNT; i++) {
+            messageNumbers.add(i);
+         }
+
+         // Order is not checked here; each received number is simply crossed off the set.
+         for (int i = 1; i <= MSG_COUNT; i++) {
+            Message message = consumer.receive(1000);
+            assertNotNull(message);
+            int msgNum = message.getIntProperty("MESSAGE_NUMBER");
+            messageNumbers.remove(msgNum);
+         }
+
+         session.commit();
+
+         assertTrue("Did not consume all expected messages, missing messages: " + messageNumbers, messageNumbers.isEmpty());
+         assertEquals("Queue should have no messages left after commit", 0, queueView.getMessageCount());
+      } finally {
+         connection.close();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonFullQualifiedNameTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonFullQualifiedNameTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonFullQualifiedNameTest.java
deleted file mode 100644
index 22ba64d..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonFullQualifiedNameTest.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.integration.amqp; - -import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.api.core.TransportConfiguration; -import org.apache.activemq.artemis.api.core.client.ClientProducer; -import org.apache.activemq.artemis.api.core.client.ClientSession; -import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; -import org.apache.activemq.artemis.api.core.client.ServerLocator; -import org.apache.activemq.artemis.core.config.Configuration; -import org.apache.activemq.artemis.core.postoffice.Binding; -import org.apache.activemq.artemis.core.postoffice.Bindings; -import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; -import org.apache.activemq.artemis.core.server.QueueQueryResult; -import org.apache.activemq.artemis.core.settings.impl.AddressSettings; -import org.apache.activemq.artemis.utils.CompositeAddress; -import org.apache.qpid.jms.JmsConnectionFactory; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import javax.jms.Connection; -import javax.jms.InvalidDestinationException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.Topic; -import java.util.HashMap; -import java.util.Map; - -public class ProtonFullQualifiedNameTest extends ProtonTestBase { - - private static final String amqpConnectionUri = "amqp://localhost:5672"; - - private 
SimpleString anycastAddress = new SimpleString("address.anycast"); - private SimpleString multicastAddress = new SimpleString("address.multicast"); - - private SimpleString anycastQ1 = new SimpleString("q1"); - private SimpleString anycastQ2 = new SimpleString("q2"); - private SimpleString anycastQ3 = new SimpleString("q3"); - - JmsConnectionFactory factory = new JmsConnectionFactory(amqpConnectionUri); - private ServerLocator locator; - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - - Configuration serverConfig = server.getConfiguration(); - - Map<String, AddressSettings> settings = serverConfig.getAddressesSettings(); - assertNotNull(settings); - AddressSettings addressSetting = settings.get("#"); - if (addressSetting == null) { - addressSetting = new AddressSettings(); - settings.put("#", addressSetting); - } - addressSetting.setAutoCreateQueues(true); - locator = createNettyNonHALocator(); - } - - @Override - @After - public void tearDown() throws Exception { - super.tearDown(); - } - - @Override - protected void configureServer(Configuration serverConfig) { - serverConfig.addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, new HashMap<String, Object>(), "netty", new HashMap<String, Object>())); - } - - @Test - //there isn't much use of FQQN for topics - //however we can test query functionality - public void testTopic() throws Exception { - - Connection connection = factory.createConnection(); - try { - connection.setClientID("FQQNconn"); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = session.createTopic(multicastAddress.toString()); - - MessageConsumer consumer1 = session.createConsumer(topic); - MessageConsumer consumer2 = session.createConsumer(topic); - MessageConsumer consumer3 = session.createConsumer(topic); - - MessageProducer producer = session.createProducer(topic); - - producer.send(session.createMessage()); - - //each 
consumer receives one - Message m = consumer1.receive(2000); - assertNotNull(m); - m = consumer2.receive(2000); - assertNotNull(m); - m = consumer3.receive(2000); - assertNotNull(m); - - Bindings bindings = server.getPostOffice().getBindingsForAddress(multicastAddress); - for (Binding b : bindings.getBindings()) { - System.out.println("checking binidng " + b.getUniqueName() + " " + ((LocalQueueBinding)b).getQueue().getDeliveringMessages()); - SimpleString qName = b.getUniqueName(); - //do FQQN query - QueueQueryResult result = server.queueQuery(CompositeAddress.toFullQN(multicastAddress, qName)); - assertTrue(result.isExists()); - assertEquals(result.getName(), CompositeAddress.toFullQN(multicastAddress, qName)); - //do qname query - result = server.queueQuery(qName); - assertTrue(result.isExists()); - assertEquals(result.getName(), qName); - } - } finally { - connection.close(); - } - } - - @Test - public void testQueue() throws Exception { - server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ1, null, true, false, -1, false, true); - server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ2, null, true, false, -1, false, true); - server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ3, null, true, false, -1, false, true); - - Connection connection = factory.createConnection(); - try { - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Queue q1 = session.createQueue(CompositeAddress.toFullQN(anycastAddress, anycastQ1).toString()); - Queue q2 = session.createQueue(CompositeAddress.toFullQN(anycastAddress, anycastQ2).toString()); - Queue q3 = session.createQueue(CompositeAddress.toFullQN(anycastAddress, anycastQ3).toString()); - - //send 3 messages to anycastAddress - ClientSessionFactory cf = createSessionFactory(locator); - ClientSession coreSession = cf.createSession(); - - //send 3 messages - ClientProducer coreProducer = coreSession.createProducer(anycastAddress); - 
sendMessages(coreSession, coreProducer, 3); - - MessageConsumer consumer1 = session.createConsumer(q1); - MessageConsumer consumer2 = session.createConsumer(q2); - MessageConsumer consumer3 = session.createConsumer(q3); - - //each consumer receives one - assertNotNull(consumer1.receive(2000)); - assertNotNull(consumer2.receive(2000)); - assertNotNull(consumer3.receive(2000)); - - connection.close(); - //queues are empty now - for (SimpleString q : new SimpleString[]{anycastQ1, anycastQ2, anycastQ3}) { - //FQQN query - QueueQueryResult query = server.queueQuery(CompositeAddress.toFullQN(anycastAddress, q)); - assertTrue(query.isExists()); - assertEquals(anycastAddress, query.getAddress()); - assertEquals(CompositeAddress.toFullQN(anycastAddress, q), query.getName()); - assertEquals(0, query.getMessageCount()); - //try query again using qName - query = server.queueQuery(q); - assertEquals(q, query.getName()); - } - } finally { - connection.close(); - } - } - - @Test - public void testQueueSpecial() throws Exception { - server.createQueue(anycastAddress, RoutingType.ANYCAST, anycastQ1, null, true, false, -1, false, true); - - Connection connection = factory.createConnection(); - try { - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - //::queue ok! 
- String specialName = CompositeAddress.toFullQN(new SimpleString(""), anycastQ1).toString(); - Queue q1 = session.createQueue(specialName); - - ClientSessionFactory cf = createSessionFactory(locator); - ClientSession coreSession = cf.createSession(); - - ClientProducer coreProducer = coreSession.createProducer(anycastAddress); - sendMessages(coreSession, coreProducer, 1); - - System.out.println("create consumer: " + q1); - MessageConsumer consumer1 = session.createConsumer(q1); - - assertNotNull(consumer1.receive(2000)); - - //queue:: - specialName = CompositeAddress.toFullQN(anycastQ1, new SimpleString("")).toString(); - q1 = session.createQueue(specialName); - try { - session.createConsumer(q1); - fail("should get exception"); - } catch (InvalidDestinationException e) { - //expected - } - - //:: - specialName = CompositeAddress.toFullQN(new SimpleString(""), new SimpleString("")).toString(); - q1 = session.createQueue(specialName); - try { - session.createConsumer(q1); - fail("should get exception"); - } catch (InvalidDestinationException e) { - //expected - } - - } finally { - connection.close(); - } - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonMaxFrameSizeTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonMaxFrameSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonMaxFrameSizeTest.java deleted file mode 100644 index 851ee2f..0000000 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonMaxFrameSizeTest.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.integration.amqp; - -import java.net.URI; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import org.apache.activemq.transport.amqp.client.AmqpClient; -import org.apache.activemq.transport.amqp.client.AmqpConnection; -import org.apache.activemq.transport.amqp.client.AmqpMessage; -import org.apache.activemq.transport.amqp.client.AmqpReceiver; -import org.apache.activemq.transport.amqp.client.AmqpSender; -import org.apache.activemq.transport.amqp.client.AmqpSession; -import org.apache.qpid.proton.amqp.messaging.Data; -import org.apache.qpid.proton.message.impl.MessageImpl; -import org.junit.Test; - -public class ProtonMaxFrameSizeTest extends ProtonTestBase { - - private static final int FRAME_SIZE = 512; - - @Override - protected void configureAmqp(Map<String, Object> params) { - params.put("maxFrameSize", FRAME_SIZE); - } - - @Test - public void testMultipleTransfers() throws Exception { - - String testQueueName = "ConnectionFrameSize"; - int nMsgs = 200; - - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - - AmqpConnection amqpConnection = client.createConnection(); - - try { - amqpConnection.connect(); - - AmqpSession session = amqpConnection.createSession(); - AmqpSender 
sender = session.createSender(testQueueName); - - final int payload = FRAME_SIZE * 16; - - for (int i = 0; i < nMsgs; ++i) { - AmqpMessage message = createAmqpMessage((byte) 'A', payload); - sender.send(message); - } - - int count = getMessageCount(server.getPostOffice(), testQueueName); - assertEquals(nMsgs, count); - - AmqpReceiver receiver = session.createReceiver(testQueueName); - receiver.flow(nMsgs); - - for (int i = 0; i < nMsgs; ++i) { - AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); - assertNotNull("failed at " + i, message); - MessageImpl wrapped = (MessageImpl) message.getWrappedMessage(); - Data data = (Data) wrapped.getBody(); - System.out.println("received : message: " + data.getValue().getLength()); - assertEquals(payload, data.getValue().getLength()); - message.accept(); - } - - } finally { - amqpConnection.close(); - } - } - - private AmqpMessage createAmqpMessage(byte value, int payloadSize) { - AmqpMessage message = new AmqpMessage(); - byte[] payload = new byte[payloadSize]; - for (int i = 0; i < payload.length; i++) { - payload[i] = value; - } - message.setBytes(payload); - return message; - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java deleted file mode 100644 index 42f30ac..0000000 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java +++ /dev/null @@ -1,271 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.integration.amqp; - -import javax.jms.Connection; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; -import java.util.Map; - -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.postoffice.Bindings; -import org.apache.activemq.artemis.core.server.impl.AddressInfo; -import org.apache.qpid.jms.JmsConnectionFactory; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class ProtonPubSubTest extends ProtonTestBase { - - private final String prefix = "foo.bar."; - private final String pubAddress = "pubAddress"; - private final String prefixedPubAddress = prefix + "pubAddress"; - private final SimpleString ssPubAddress = new SimpleString(pubAddress); - private final SimpleString ssprefixedPubAddress = new SimpleString(prefixedPubAddress); - private Connection connection; - private JmsConnectionFactory 
factory; - - @Override - protected void configureAmqp(Map<String, Object> params) { - params.put("pubSubPrefix", prefix); - } - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - server.addAddressInfo(new AddressInfo(ssPubAddress, RoutingType.MULTICAST)); - server.addAddressInfo(new AddressInfo(ssprefixedPubAddress, RoutingType.MULTICAST)); - server.createQueue(ssPubAddress, RoutingType.MULTICAST, ssPubAddress, new SimpleString("foo=bar"), false, true); - server.createQueue(ssprefixedPubAddress, RoutingType.MULTICAST, ssprefixedPubAddress, new SimpleString("foo=bar"), false, true); - factory = new JmsConnectionFactory("amqp://localhost:5672"); - factory.setClientID("myClientID"); - connection = factory.createConnection(); - connection.setExceptionListener(new ExceptionListener() { - @Override - public void onException(JMSException exception) { - exception.printStackTrace(); - } - }); - - } - - @Override - @After - public void tearDown() throws Exception { - try { - Thread.sleep(250); - if (connection != null) { - connection.close(); - } - } finally { - super.tearDown(); - } - } - - @Test - public void testNonDurablePubSub() throws Exception { - int numMessages = 100; - Topic topic = createTopic(pubAddress); - TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer sub = session.createSubscriber(topic); - - Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = sendSession.createProducer(topic); - connection.start(); - for (int i = 0; i < numMessages; i++) { - producer.send(sendSession.createTextMessage("message:" + i)); - } - for (int i = 0; i < numMessages; i++) { - TextMessage receive = (TextMessage) sub.receive(5000); - Assert.assertNotNull(receive); - Assert.assertEquals(receive.getText(), "message:" + i); - } - } - - @Test - public void testNonDurablePubSubQueueDeleted() throws Exception { - int numMessages 
= 100; - Topic topic = createTopic(pubAddress); - TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer sub = session.createSubscriber(topic); - Bindings bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString(pubAddress)); - assertEquals(2, bindingsForAddress.getBindings().size()); - sub.close(); - Thread.sleep(1000); - assertEquals(1, bindingsForAddress.getBindings().size()); - } - - @Test - public void testNonDurableMultiplePubSub() throws Exception { - int numMessages = 100; - Topic topic = createTopic(pubAddress); - TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer sub = session.createSubscriber(topic); - MessageConsumer sub2 = session.createSubscriber(topic); - MessageConsumer sub3 = session.createSubscriber(topic); - - Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = sendSession.createProducer(topic); - connection.start(); - for (int i = 0; i < numMessages; i++) { - producer.send(sendSession.createTextMessage("message:" + i)); - } - for (int i = 0; i < numMessages; i++) { - TextMessage receive = (TextMessage) sub.receive(5000); - Assert.assertNotNull(receive); - Assert.assertEquals(receive.getText(), "message:" + i); - receive = (TextMessage) sub2.receive(5000); - Assert.assertNotNull(receive); - Assert.assertEquals(receive.getText(), "message:" + i); - receive = (TextMessage) sub3.receive(5000); - Assert.assertNotNull(receive); - Assert.assertEquals(receive.getText(), "message:" + i); - } - } - - @Test - public void testDurablePubSub() throws Exception { - int numMessages = 100; - Topic topic = createTopic(pubAddress); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId"); - - Session sendSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = sendSession.createProducer(topic); - connection.start(); - for (int i = 0; i < numMessages; i++) { - producer.send(sendSession.createTextMessage("message:" + i)); - } - for (int i = 0; i < numMessages; i++) { - TextMessage receive = (TextMessage) sub.receive(5000); - Assert.assertNotNull(receive); - Assert.assertEquals(receive.getText(), "message:" + i); - } - } - - @Test - public void testDurableMultiplePubSub() throws Exception { - int numMessages = 100; - Topic topic = createTopic(pubAddress); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId"); - TopicSubscriber sub2 = session.createDurableSubscriber(topic, "myPubId2"); - TopicSubscriber sub3 = session.createDurableSubscriber(topic, "myPubId3"); - - Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = sendSession.createProducer(topic); - connection.start(); - for (int i = 0; i < numMessages; i++) { - producer.send(sendSession.createTextMessage("message:" + i)); - } - for (int i = 0; i < numMessages; i++) { - TextMessage receive = (TextMessage) sub.receive(5000); - Assert.assertNotNull(receive); - Assert.assertEquals(receive.getText(), "message:" + i); - receive = (TextMessage) sub2.receive(5000); - Assert.assertNotNull(receive); - Assert.assertEquals(receive.getText(), "message:" + i); - receive = (TextMessage) sub3.receive(5000); - Assert.assertNotNull(receive); - Assert.assertEquals(receive.getText(), "message:" + i); - } - } - - @Test - public void testDurablePubSubReconnect() throws Exception { - int numMessages = 100; - Topic topic = createTopic(pubAddress); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId"); - - Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - 
MessageProducer producer = sendSession.createProducer(topic); - connection.start(); - for (int i = 0; i < numMessages; i++) { - producer.send(sendSession.createTextMessage("message:" + i)); - } - for (int i = 0; i < numMessages; i++) { - TextMessage receive = (TextMessage) sub.receive(5000); - Assert.assertNotNull(receive); - Assert.assertEquals(receive.getText(), "message:" + i); - } - connection.close(); - connection = factory.createConnection(); - connection.setExceptionListener(new ExceptionListener() { - @Override - public void onException(JMSException exception) { - exception.printStackTrace(); - } - }); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - sub = session.createDurableSubscriber(topic, "myPubId"); - - sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - producer = sendSession.createProducer(topic); - connection.start(); - for (int i = 0; i < numMessages; i++) { - producer.send(sendSession.createTextMessage("message:" + i)); - } - for (int i = 0; i < numMessages; i++) { - TextMessage receive = (TextMessage) sub.receive(5000); - Assert.assertNotNull(receive); - Assert.assertEquals(receive.getText(), "message:" + i); - } - } - - @Test - public void testDurablePubSubUnsubscribe() throws Exception { - int numMessages = 100; - Topic topic = createTopic(pubAddress); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - TopicSubscriber sub = session.createDurableSubscriber(topic, "myPubId"); - - Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = sendSession.createProducer(topic); - connection.start(); - for (int i = 0; i < numMessages; i++) { - producer.send(sendSession.createTextMessage("message:" + i)); - } - for (int i = 0; i < numMessages; i++) { - TextMessage receive = (TextMessage) sub.receive(5000); - Assert.assertNotNull(receive); - Assert.assertEquals(receive.getText(), "message:" + i); - } - sub.close(); - 
session.unsubscribe("myPubId"); - } - - private javax.jms.Topic createTopic(String address) throws Exception { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - try { - return session.createTopic(address); - } finally { - session.close(); - } - } -}
