http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java index a360eb8..216b0ec 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java @@ -30,6 +30,8 @@ import org.junit.After; */ public class AmqpTestSupport extends ActiveMQTestBase { + protected static final int AMQP_PORT = 5672; + protected LinkedList<AmqpConnection> connections = new LinkedList<>(); protected boolean useSSL; @@ -65,7 +67,7 @@ public class AmqpTestSupport extends ActiveMQTestBase { boolean webSocket = false; try { - int port = 61616; + int port = AMQP_PORT; String uri = null;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java index 3b231fa..493079a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.activemq.artemis.tests.integration.amqp; import java.io.IOException; @@ -33,6 +32,7 @@ import javax.jms.Session; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; @@ -45,6 +45,7 @@ import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.transaction.TransactionalState; import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Sender; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; @@ -98,7 +99,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { sender.setStateInspector(new AmqpValidator() { @Override - public void inspectDeliveryUpdate(Delivery delivery) { + public void inspectDeliveryUpdate(Sender sender, Delivery delivery) { if (delivery.remotelySettled()) { DeliveryState state = delivery.getRemoteState(); if (state instanceof TransactionalState) { @@ -161,7 +162,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { session.commit(); - assertEquals(1, queue.getMessageCount()); + assertTrue("Message was not queued", Wait.waitFor(() -> queue.getMessageCount() == 1)); sender.close(); connection.close(); @@ -205,7 +206,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { message.setText("Test-Message"); sender.send(message); - assertEquals(1, queue.getMessageCount()); + assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1)); AmqpReceiver receiver = session.createReceiver(getQueueName()); @@ -237,7 +238,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { message.setText("Test-Message"); sender.send(message); - assertEquals(1, queue.getMessageCount()); + assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1)); AmqpReceiver receiver = session.createReceiver(getQueueName()); @@ -281,7 +282,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { message.setText("Test-Message"); sender.send(message); - assertEquals(1, queue.getMessageCount()); + assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1)); AmqpReceiver receiver = session.createReceiver(getQueueName()); @@ -853,10 +854,10 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { @Test(timeout = 120000) public void testSendPersistentTX() throws Exception { - int MESSAGE_COUNT = 100000; + int MESSAGE_COUNT = 2000; AtomicInteger errors = new AtomicInteger(0); server.createQueue(SimpleString.toSimpleString("q1"), RoutingType.ANYCAST, SimpleString.toSimpleString("q1"), null, true, false, 1, false, true); - ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616"); + ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + AMQP_PORT); Connection sendConnection = factory.createConnection(); Connection consumerConnection = factory.createConnection(); try { @@ -939,7 +940,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { receiver.setStateInspector(new AmqpValidator() { @Override - public void inspectDeliveryUpdate(Delivery delivery) { + public void inspectDeliveryUpdate(Sender sender, Delivery delivery) { if (delivery.remotelySettled()) { LOG.info("Receiver got delivery update for: {}", delivery); if (!(delivery.getRemoteState() instanceof TransactionalState)) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java index a0f0393..e10c73d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -16,8 +16,13 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; -import org.apache.activemq.artemis.api.core.SimpleString; +import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY; +import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY; + +import java.util.concurrent.TimeUnit; + import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -29,12 +34,6 @@ import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.qpid.proton.amqp.messaging.Source; import org.junit.Test; -import java.util.concurrent.TimeUnit; - -import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY; -import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY; - - public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport { SimpleString address = new SimpleString("testAddress"); @@ -46,7 +45,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport { server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); server.createQueue(address, RoutingType.ANYCAST, address, null, true, false); - sendMessages(1, address.toString()); + sendMessages(address.toString(), 1); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); @@ -68,7 +67,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport { server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false); server.createQueue(address, RoutingType.ANYCAST, address, null, true, false); - sendMessages(2, address.toString()); + sendMessages(address.toString(), 2); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); @@ -89,7 +88,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport { server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false); - sendMessages(1, address.toString()); + sendMessages(address.toString(), 1); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); @@ -111,7 +110,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport { server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false); server.createQueue(address, RoutingType.ANYCAST, queue2, null, true, false); - sendMessages(1, address.toString()); + sendMessages(address.toString(), 1); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); @@ -132,7 +131,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport { server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false); - sendMessages(1, address.toString()); + sendMessages(address.toString(), 1); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); @@ -152,7 +151,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport { public void testConsumeWhenOnlyMulticast() throws Exception { server.addAddressInfo(new AddressInfo(address, RoutingType.MULTICAST)); - sendMessages(1, address.toString()); + sendMessages(address.toString(), 1); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); @@ -195,7 +194,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); AmqpReceiver receiver = session.createReceiver(address.toString()); - sendMessages(1, address.toString()); + sendMessages(address.toString(), 1); receiver.flow(1); AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); @@ -223,7 +222,6 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport { connection.close(); } - protected Source createJmsSource(boolean topic) { Source source = new Source(); @@ -236,5 +234,4 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport { return source; } - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java index b413ad8..bdf6258 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -16,8 +16,13 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; -import org.apache.activemq.artemis.api.core.SimpleString; +import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY; +import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY; + +import java.util.concurrent.TimeUnit; + import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.transport.amqp.client.AmqpClient; @@ -28,12 +33,6 @@ import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.qpid.proton.amqp.messaging.Source; import org.junit.Test; -import java.util.concurrent.TimeUnit; - -import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY; -import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY; - - public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport { SimpleString address = new SimpleString("testAddress"); @@ -45,7 +44,7 @@ public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport { server.addAddressInfo(new AddressInfo(address, RoutingType.MULTICAST)); server.createQueue(address, RoutingType.MULTICAST, address, null, true, false); - sendMessages(1, address.toString()); + sendMessages(address.toString(), 1); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); @@ -65,7 +64,7 @@ public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport { public void testConsumeWhenOnlyAnycast() throws Exception { server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); - sendMessages(1, address.toString()); + sendMessages(address.toString(), 1); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); @@ -102,7 +101,6 @@ public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport { connection.close(); } - protected Source createJmsSource(boolean topic) { Source source = new Source(); @@ -115,5 +113,4 @@ public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport { return source; } - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java index 377cf86..3e504d7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import java.util.concurrent.TimeUnit; + import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.transport.amqp.client.AmqpClient; @@ -25,8 +27,6 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSession; import org.junit.Test; -import java.util.concurrent.TimeUnit; - public class ClientDefinedAnycastConsumerTest extends AmqpClientTestSupport { SimpleString address = new SimpleString("testAddress"); @@ -39,7 +39,7 @@ public class ClientDefinedAnycastConsumerTest extends AmqpClientTestSupport { AmqpSession session = connection.createSession(); AmqpReceiver receiver = session.createReceiver(address.toString()); - sendMessages(1, address.toString()); + sendMessages(address.toString(), 1); receiver.flow(1); AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); @@ -48,5 +48,4 @@ public class ClientDefinedAnycastConsumerTest extends AmqpClientTestSupport { receiver.close(); connection.close(); } - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java index 84bdb86..51c70ee 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -16,8 +16,12 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; -import org.apache.activemq.artemis.api.core.SimpleString; +import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY; + +import java.util.concurrent.TimeUnit; + import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.tests.util.Wait; @@ -30,10 +34,6 @@ import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.TerminusDurability; import org.junit.Test; -import java.util.concurrent.TimeUnit; - -import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY; - public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { SimpleString address = new SimpleString("testAddress"); @@ -52,7 +52,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2"); receiver.flow(1); receiver2.flow(1); - sendMessages(2, address.toString()); + sendMessages(address.toString(), 2); AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); @@ -86,7 +86,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|1"); receiver.flow(1); receiver2.flow(1); - sendMessages(2, address.toString()); + sendMessages(address.toString(), 2); AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); @@ -114,7 +114,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2"); receiver.flow(1); receiver2.flow(1); - sendMessages(2, address.toString()); + sendMessages(address.toString(), 2); AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); @@ -145,7 +145,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2"); receiver.flow(1); receiver2.flow(1); - sendMessages(2, address.toString()); + sendMessages(address.toString(), 2); AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); @@ -178,7 +178,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2"); receiver.flow(1); receiver2.flow(1); - sendMessages(2, address.toString()); + sendMessages(address.toString(), 2); AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); @@ -206,7 +206,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2"); receiver.flow(1); receiver2.flow(1); - sendMessages(2, address.toString()); + sendMessages(address.toString(), 2); AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); @@ -244,7 +244,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2"); receiver.flow(1); receiver2.flow(1); - sendMessages(2, address.toString()); + sendMessages(address.toString(), 2); AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); @@ -282,7 +282,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2"); receiver.flow(1); receiver2.flow(1); - sendMessages(2, address.toString()); + sendMessages(address.toString(), 2); AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); @@ -313,7 +313,10 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { fail("Exception expected"); } catch (Exception e) { //expected + } finally { + receiver.close(); } + connection.close(); } @@ -331,7 +334,10 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { fail("Exception expected"); } catch (Exception e) { //expected + } finally { + receiver.close(); } + connection.close(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java new file mode 100644 index 0000000..7de05aa --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.net.URI; +import java.util.LinkedList; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; + +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.After; +import org.junit.Before; + +public abstract class JMSClientTestSupport extends AmqpClientTestSupport { + + protected LinkedList<Connection> jmsConnections = new LinkedList<>(); + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + + // Bug in Qpid JMS not shutting down a connection thread on certain errors + // TODO - Reevaluate after Qpid JMS 0.23.0 is released. + disableCheckThread(); + } + + @After + @Override + public void tearDown() throws Exception { + for (Connection connection : jmsConnections) { + try { + connection.close(); + } catch (Throwable ignored) { + ignored.printStackTrace(); + } + } + jmsConnections.clear(); + + super.tearDown(); + } + + protected Connection trackJMSConnection(Connection connection) { + jmsConnections.add(connection); + + return connection; + } + + protected String getJmsConnectionURIOptions() { + return ""; + } + + protected URI getBrokerQpidJMSConnectionURI() { + boolean webSocket = false; + + try { + int port = AMQP_PORT; + + String uri = null; + + if (isUseSSL()) { + if (webSocket) { + uri = "amqpwss://127.0.0.1:" + port; + } else { + uri = "amqps://127.0.0.1:" + port; + } + } else { + if (webSocket) { + uri = "amqpws://127.0.0.1:" + port; + } else { + uri = "amqp://127.0.0.1:" + port; + } + } + + if (!getJmsConnectionURIOptions().isEmpty()) { + uri = uri + "?" + getJmsConnectionURIOptions(); + } + + return new URI(uri); + } catch (Exception e) { + throw new RuntimeException(); + } + } + + protected Connection createConnection() throws JMSException { + return createConnection(getBrokerQpidJMSConnectionURI(), null, null, null, true); + } + + protected Connection createConnection(boolean start) throws JMSException { + return createConnection(getBrokerQpidJMSConnectionURI(), null, null, null, start); + } + + protected Connection createConnection(String clientId) throws JMSException { + return createConnection(getBrokerQpidJMSConnectionURI(), null, null, clientId, true); + } + + protected Connection createConnection(String clientId, boolean start) throws JMSException { + return createConnection(getBrokerQpidJMSConnectionURI(), null, null, clientId, start); + } + + protected Connection createConnection(String username, String password) throws JMSException { + return createConnection(getBrokerQpidJMSConnectionURI(), username, password, null, true); + } + + protected Connection createConnection(String username, String password, String clientId) throws JMSException { + return createConnection(getBrokerQpidJMSConnectionURI(), username, password, clientId, true); + } + + protected Connection createConnection(String username, String password, String clientId, boolean start) throws JMSException { + return createConnection(getBrokerQpidJMSConnectionURI(), username, password, clientId, start); + } + + private Connection createConnection(URI remoteURI, String username, String password, String clientId, boolean start) throws JMSException { + JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI); + + Connection connection = trackJMSConnection(factory.createConnection(username, password)); + + connection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + exception.printStackTrace(); + } + }); + + if (clientId != null && !clientId.isEmpty()) { + connection.setClientID(clientId); + } + + if (start) { + connection.start(); + } + + return connection; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSConnectionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSConnectionTest.java new file mode 100644 index 0000000..e261468 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSConnectionTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.InvalidClientIDException; +import javax.jms.JMSException; +import javax.jms.Session; + +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Test; + +public class JMSConnectionTest extends JMSClientTestSupport { + + @Test(timeout = 60000) + public void testConnection() throws Exception { + Connection connection = createConnection(); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = session.createQueue(getQueueName()); + session.createConsumer(queue); + + Queue queueView = getProxyToQueue(getQueueName()); + + assertTrue("Connection not counted", Wait.waitFor(() -> server.getConnectionCount() == 1)); + assertTrue("Consumer not counted", Wait.waitFor(() -> queueView.getConsumerCount() == 1)); + + assertEquals(1, queueView.getConsumerCount()); + + connection.close(); + + assertTrue("Consumer not closed", Wait.waitFor(() -> queueView.getConsumerCount() == 0)); + assertTrue("Connection not released", Wait.waitFor(() -> server.getConnectionCount() == 0)); + } finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testClientIDsAreExclusive() throws Exception { + Connection testConn1 = createConnection(false); + Connection testConn2 = createConnection(false); + + try { + testConn1.setClientID("client-id1"); + try { + testConn1.setClientID("client-id2"); + fail("didn't get expected exception"); + } catch (javax.jms.IllegalStateException e) { + // expected + } + + try { + testConn2.setClientID("client-id1"); + fail("didn't get expected exception"); + } catch (InvalidClientIDException e) { + // expected + } + } finally { + testConn1.close(); + testConn2.close(); + } + + try { + testConn1 = createConnection(false); + testConn2 = createConnection(false); + testConn1.setClientID("client-id1"); + testConn2.setClientID("client-id2"); + } finally { + testConn1.close(); + testConn2.close(); + } + } + + @Test(timeout = 60000) + public void testParallelConnections() throws Exception { + final int numThreads = 40; + ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + for (int i = 0; i < numThreads; i++) { + executorService.execute(new Runnable() { + @Override + public void run() { + + try { + Connection connection = createConnection(fullUser, fullPass); + connection.start(); + connection.close(); + } catch (JMSException e) { + e.printStackTrace(); + } + } + }); + } + + executorService.shutdown(); + assertTrue("executor done on time", executorService.awaitTermination(30, TimeUnit.SECONDS)); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSConnectionWithSecurityTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSConnectionWithSecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSConnectionWithSecurityTest.java new file mode 100644 index 0000000..bfd31ac --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSConnectionWithSecurityTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.JMSSecurityException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; +import org.junit.Test; + +public class JMSConnectionWithSecurityTest extends JMSClientTestSupport { + + @Override + protected boolean isSecurityEnabled() { + return true; + } + + @Test(timeout = 10000) + public void testNoUserOrPassword() throws Exception { + try { + Connection connection = createConnection("", "", null, false); + connection.start(); + fail("Expected JMSException"); + } catch (JMSSecurityException ex) { + IntegrationTestLogger.LOGGER.debug("Failed to authenticate connection with no user / password."); + } + } + + @Test(timeout = 10000) + public void testUnknownUser() throws Exception { + try { + Connection connection = createConnection("nosuchuser", "blah", null, false); + connection.start(); + fail("Expected JMSException"); + } catch (JMSSecurityException ex) { + IntegrationTestLogger.LOGGER.debug("Failed to authenticate connection with unknown user ID"); + } + } + + @Test(timeout = 10000) + public void testKnownUserWrongPassword() throws Exception { + try { + Connection connection = createConnection(fullUser, "wrongPassword", null, false); + connection.start(); + fail("Expected JMSException"); + } catch (JMSSecurityException ex) { + IntegrationTestLogger.LOGGER.debug("Failed to authenticate connection with incorrect password."); + } + } + + @Test(timeout = 30000) + public void testRepeatedWrongPasswordAttempts() throws Exception { + for (int i = 0; i < 25; ++i) { + Connection connection = null; + try { + connection = createConnection(fullUser, "wrongPassword", null, false); + connection.start(); + fail("Expected JMSException"); + } catch (JMSSecurityException ex) { + IntegrationTestLogger.LOGGER.debug("Failed to authenticate connection with incorrect password."); + } finally { + if (connection != null) { + connection.close(); + } + } + } + } + + @Test(timeout = 30000) + public void testSendReceive() throws Exception { + Connection connection = createConnection(fullUser, fullPass); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = session.createQueue(getQueueName()); + MessageProducer p = session.createProducer(queue); + TextMessage message = null; + message = session.createTextMessage(); + String messageText = "hello sent at " + new java.util.Date().toString(); + message.setText(messageText); + p.send(message); + + // Get the message we just sent + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + Message msg = consumer.receive(5000); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + TextMessage textMessage = (TextMessage) msg; + assertEquals(messageText, textMessage.getText()); + } finally { + connection.close(); + } + } + + @Test(timeout = 30000) + public void testCreateTemporaryQueueNotAuthorized() throws JMSException { + Connection connection = createConnection(guestUser, guestPass); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + try { + session.createTemporaryQueue(); + } catch (JMSSecurityException jmsse) { + } catch (JMSException jmse) { + IntegrationTestLogger.LOGGER.info("Client should have thrown a JMSSecurityException but only threw JMSException"); + } + + // Should not be fatal + assertNotNull(connection.createSession(false, Session.AUTO_ACKNOWLEDGE)); + } finally { + connection.close(); + } + } + + @Test(timeout = 30000) + public void testCreateTemporaryTopicNotAuthorized() throws JMSException { + Connection connection = createConnection(guestUser, guestPass); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + try { + session.createTemporaryTopic(); + } catch (JMSSecurityException jmsse) { + } catch (JMSException jmse) { + IntegrationTestLogger.LOGGER.info("Client should have thrown a JMSSecurityException but only threw JMSException"); + } + + // Should not be fatal + assertNotNull(connection.createSession(false, Session.AUTO_ACKNOWLEDGE)); + } finally { + connection.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSDurableConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSDurableConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSDurableConsumerTest.java new file mode 100644 index 0000000..26097f6 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSDurableConsumerTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Test; + +public class JMSDurableConsumerTest extends JMSClientTestSupport { + + @Test(timeout = 30000) + public void testDurableConsumerAsync() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference<Message> received = new AtomicReference<>(); + String durableClientId = getTopicName() + "-ClientId"; + + Connection connection = createConnection(durableClientId); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(getTopicName()); + MessageConsumer consumer = session.createDurableSubscriber(topic, "DurbaleTopic"); + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + received.set(message); + latch.countDown(); + } + }); + + MessageProducer producer = session.createProducer(topic); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + connection.start(); + + TextMessage message = session.createTextMessage(); + message.setText("hello"); + producer.send(message); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertNotNull("Should have received a message by now.", received.get()); + assertTrue("Should be an instance of TextMessage", received.get() instanceof TextMessage); + } finally { + connection.close(); + } + } + + @Test(timeout = 30000) + public void testDurableConsumerSync() throws Exception { + String durableClientId = getTopicName() + "-ClientId"; + + Connection connection = createConnection(durableClientId); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(getTopicName()); + final MessageConsumer consumer = session.createDurableSubscriber(topic, "DurbaleTopic"); + MessageProducer producer = session.createProducer(topic); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + connection.start(); + + TextMessage message = session.createTextMessage(); + message.setText("hello"); + producer.send(message); + + final AtomicReference<Message> msg = new AtomicReference<>(); + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisfied() throws Exception { + msg.set(consumer.receiveNoWait()); + return msg.get() != null; + } + }, TimeUnit.SECONDS.toMillis(25), TimeUnit.MILLISECONDS.toMillis(200))); + + assertNotNull("Should have received a message by now.", msg.get()); + assertTrue("Should be an instance of TextMessage", msg.get() instanceof TextMessage); + } finally { + connection.close(); + } + } + + @Test(timeout = 30000) + public void testDurableConsumerUnsubscribe() throws Exception { + String durableClientId = getTopicName() + "-ClientId"; + + Connection connection = createConnection(durableClientId); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(getTopicName()); + MessageConsumer consumer = session.createDurableSubscriber(topic, "DurbaleTopic"); + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisfied() throws Exception { + return server.getTotalConsumerCount() == 1; + } + }, TimeUnit.SECONDS.toMillis(20), TimeUnit.MILLISECONDS.toMillis(250))); + + consumer.close(); + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisfied() throws Exception { + return server.getTotalConsumerCount() == 0; + } + }, TimeUnit.SECONDS.toMillis(20), TimeUnit.MILLISECONDS.toMillis(250))); + + session.unsubscribe("DurbaleTopic"); + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisfied() throws Exception { + return server.getTotalConsumerCount() == 0; + } + }, TimeUnit.SECONDS.toMillis(20), TimeUnit.MILLISECONDS.toMillis(250))); + } finally { + connection.close(); + } + } + + @Test(timeout = 30000) + public void testDurableConsumerUnsubscribeWhileNoSubscription() throws Exception { + Connection connection = createConnection(); + + try { + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisfied() throws Exception { + return server.getTotalConsumerCount() == 0; + } + }, TimeUnit.SECONDS.toMillis(20), TimeUnit.MILLISECONDS.toMillis(250))); + + try { + session.unsubscribe("DurbaleTopic"); + fail("Should have thrown as subscription is in use."); + } catch (JMSException ex) { + } + } finally { + connection.close(); + } + } + + @Test(timeout = 30000) + public void testDurableConsumerUnsubscribeWhileActive() throws Exception { + String durableClientId = getTopicName() + "-ClientId"; + + Connection connection = createConnection(durableClientId); + try { + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(getTopicName()); + MessageConsumer consumer = session.createDurableSubscriber(topic, "DurbaleTopic"); + + assertNotNull(consumer); + assertNull(consumer.receive(10)); + + try { + session.unsubscribe("DurbaleTopic"); + fail("Should have thrown as subscription is in use."); + } catch (JMSException ex) { + } + } finally { + connection.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java new file mode 100644 index 0000000..c5372ac --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java @@ -0,0 +1,500 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JMSMessageConsumerTest extends JMSClientTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger(JMSMessageConsumerTest.class); + + @Test(timeout = 60000) + public void testSelector() throws Exception { + Connection connection = createConnection(); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = session.createQueue(getQueueName()); + MessageProducer producer = session.createProducer(queue); + + TextMessage message = session.createTextMessage(); + message.setText("msg:0"); + producer.send(message); + message = session.createTextMessage(); + message.setText("msg:1"); + message.setStringProperty("color", "RED"); + producer.send(message); + + connection.start(); + + MessageConsumer messageConsumer = session.createConsumer(queue, "color = 'RED'"); + TextMessage m = (TextMessage) messageConsumer.receive(5000); + assertNotNull(m); + assertEquals("msg:1", m.getText()); + assertEquals(m.getStringProperty("color"), "RED"); + } finally { + connection.close(); + } + } + + @SuppressWarnings("rawtypes") + @Test(timeout = 30000) + public void testSelectorsWithJMSType() throws Exception { + Connection connection = createConnection(); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = session.createQueue(getQueueName()); + MessageProducer p = session.createProducer(queue); + + TextMessage message = session.createTextMessage(); + message.setText("text"); + p.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + + TextMessage message2 = session.createTextMessage(); + String type = "myJMSType"; + message2.setJMSType(type); + message2.setText("text + type"); + p.send(message2, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + + QueueBrowser browser = session.createBrowser(queue); + Enumeration enumeration = browser.getEnumeration(); + int count = 0; + while (enumeration.hasMoreElements()) { + Message m = (Message) enumeration.nextElement(); + assertTrue(m instanceof TextMessage); + count++; + } + + assertEquals(2, count); + + MessageConsumer consumer = session.createConsumer(queue, "JMSType = '" + type + "'"); + Message msg = consumer.receive(2000); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + assertEquals("Unexpected JMSType value", type, msg.getJMSType()); + assertEquals("Unexpected message content", "text + type", ((TextMessage) msg).getText()); + } finally { + connection.close(); + } + } + + @SuppressWarnings("rawtypes") + @Test(timeout = 30000) + public void testSelectorsWithJMSPriority() throws Exception { + Connection connection = createConnection(); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = session.createQueue(getQueueName()); + MessageProducer p = session.createProducer(queue); + + TextMessage message = session.createTextMessage(); + message.setText("hello"); + p.send(message, DeliveryMode.PERSISTENT, 5, 0); + + message = session.createTextMessage(); + message.setText("hello + 9"); + p.send(message, DeliveryMode.PERSISTENT, 9, 0); + + QueueBrowser browser = session.createBrowser(queue); + Enumeration enumeration = browser.getEnumeration(); + int count = 0; + while (enumeration.hasMoreElements()) { + Message m = (Message) enumeration.nextElement(); + assertTrue(m instanceof TextMessage); + count++; + } + + assertEquals(2, count); + + MessageConsumer consumer = session.createConsumer(queue, "JMSPriority > 8"); + Message msg = consumer.receive(2000); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + assertEquals("hello + 9", ((TextMessage) msg).getText()); + } finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testJMSSelectorFiltersJMSMessageID() throws Exception { + Connection connection = createConnection(); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = session.createQueue(getQueueName()); + MessageProducer producer = session.createProducer(queue); + + // Send one to receive + TextMessage message = session.createTextMessage(); + producer.send(message); + + // Send another to filter + producer.send(session.createTextMessage()); + + connection.start(); + + // First one should make it through + MessageConsumer messageConsumer = session.createConsumer(queue, "JMSMessageID = '" + message.getJMSMessageID() + "'"); + TextMessage m = (TextMessage) messageConsumer.receive(5000); + assertNotNull(m); + assertEquals(message.getJMSMessageID(), m.getJMSMessageID()); + + // The second one should not be received. + assertNull(messageConsumer.receive(1000)); + } finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testZeroPrefetchWithTwoConsumers() throws Exception { + JmsConnection connection = (JmsConnection) createConnection(); + ((JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy()).setAll(0); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = session.createQueue(getQueueName()); + + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Msg1")); + producer.send(session.createTextMessage("Msg2")); + + // now lets receive it + MessageConsumer consumer1 = session.createConsumer(queue); + MessageConsumer consumer2 = session.createConsumer(queue); + TextMessage answer = (TextMessage) consumer1.receive(5000); + assertNotNull(answer); + assertEquals("Should have received a message!", answer.getText(), "Msg1"); + answer = (TextMessage) consumer2.receive(5000); + assertNotNull(answer); + assertEquals("Should have received a message!", answer.getText(), "Msg2"); + + answer = (TextMessage) consumer2.receiveNoWait(); + assertNull("Should have not received a message!", answer); + } + + @Test(timeout = 30000) + public void testProduceAndConsumeLargeNumbersOfTopicMessagesClientAck() throws Exception { + doTestProduceAndConsumeLargeNumbersOfMessages(true, Session.CLIENT_ACKNOWLEDGE); + } + + @Test(timeout = 30000) + public void testProduceAndConsumeLargeNumbersOfQueueMessagesClientAck() throws Exception { + doTestProduceAndConsumeLargeNumbersOfMessages(false, Session.CLIENT_ACKNOWLEDGE); + } + + @Test(timeout = 30000) + public void testProduceAndConsumeLargeNumbersOfTopicMessagesAutoAck() throws Exception { + doTestProduceAndConsumeLargeNumbersOfMessages(true, Session.AUTO_ACKNOWLEDGE); + } + + @Test(timeout = 30000) + public void testProduceAndConsumeLargeNumbersOfQueueMessagesAutoAck() throws Exception { + doTestProduceAndConsumeLargeNumbersOfMessages(false, Session.AUTO_ACKNOWLEDGE); + } + + public void doTestProduceAndConsumeLargeNumbersOfMessages(boolean topic, int ackMode) throws Exception { + + final int MSG_COUNT = 1000; + final CountDownLatch done = new CountDownLatch(MSG_COUNT); + + JmsConnection connection = (JmsConnection) createConnection(); + connection.setForceAsyncSend(true); + connection.start(); + + Session session = connection.createSession(false, ackMode); + final Destination destination; + if (topic) { + destination = session.createTopic(getTopicName()); + } else { + destination = session.createQueue(getQueueName()); + } + + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + try { + message.acknowledge(); + done.countDown(); + } catch (JMSException ex) { + LOG.info("Caught exception.", ex); + } + } + }); + + MessageProducer producer = session.createProducer(destination); + + TextMessage textMessage = session.createTextMessage(); + textMessage.setText("messageText"); + + for (int i = 0; i < MSG_COUNT; i++) { + producer.send(textMessage); + } + + assertTrue("Did not receive all messages: " + MSG_COUNT, done.await(15, TimeUnit.SECONDS)); + } + + @Test(timeout = 60000) + public void testPrefetchedMessagesAreNotConsumedOnConsumerClose() throws Exception { + final int NUM_MESSAGES = 10; + + Connection connection = createConnection(); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = session.createQueue(getQueueName()); + MessageProducer producer = session.createProducer(queue); + + byte[] bytes = new byte[2048]; + new Random().nextBytes(bytes); + for (int i = 0; i < NUM_MESSAGES; i++) { + TextMessage message = session.createTextMessage(); + message.setText("msg:" + i); + producer.send(message); + } + + connection.close(); + + Queue queueView = getProxyToQueue(getQueueName()); + assertTrue("Not all messages were enqueud", Wait.waitFor(() -> queueView.getMessageCount() == NUM_MESSAGES)); + + // Create a consumer and prefetch the messages + connection = createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queue); + + Thread.sleep(100); + + consumer.close(); + connection.close(); + + assertTrue("Not all messages were enqueud", Wait.waitFor(() -> queueView.getMessageCount() == NUM_MESSAGES)); + } finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testMessagesReceivedInParallel() throws Throwable { + final int numMessages = 50000; + long time = System.currentTimeMillis(); + + final ArrayList<Throwable> exceptions = new ArrayList<>(); + + Thread t = new Thread(new Runnable() { + @Override + public void run() { + Connection connectionConsumer = null; + try { + connectionConsumer = createConnection(); + connectionConsumer.start(); + Session sessionConsumer = connectionConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE); + final javax.jms.Queue queue = sessionConsumer.createQueue(getQueueName()); + final MessageConsumer consumer = sessionConsumer.createConsumer(queue); + + long n = 0; + int count = numMessages; + while (count > 0) { + try { + if (++n % 1000 == 0) { + System.out.println("received " + n + " messages"); + } + + Message m = consumer.receive(5000); + Assert.assertNotNull("Could not receive message count=" + count + " on consumer", m); + count--; + } catch (JMSException e) { + e.printStackTrace(); + break; + } + } + } catch (Throwable e) { + exceptions.add(e); + e.printStackTrace(); + } finally { + try { + connectionConsumer.close(); + } catch (Throwable ignored) { + // NO OP + } + } + } + }); + + Connection connection = createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = session.createQueue(getQueueName()); + + t.start(); + + MessageProducer p = session.createProducer(queue); + p.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + for (int i = 0; i < numMessages; i++) { + BytesMessage message = session.createBytesMessage(); + message.writeUTF("Hello world!!!!" + i); + message.setIntProperty("count", i); + p.send(message); + } + + // Wait for the consumer thread to completely read the Queue + t.join(); + + if (!exceptions.isEmpty()) { + throw exceptions.get(0); + } + + Queue queueView = getProxyToQueue(getQueueName()); + + connection.close(); + assertTrue("Not all messages consumed", Wait.waitFor(() -> queueView.getMessageCount() == 0)); + + long taken = (System.currentTimeMillis() - time); + System.out.println("Microbenchamrk ran in " + taken + " milliseconds, sending/receiving " + numMessages); + + double messagesPerSecond = ((double) numMessages / (double) taken) * 1000; + + System.out.println(((int) messagesPerSecond) + " messages per second"); + } + + @Test(timeout = 60000) + public void testClientAckMessages() throws Exception { + final int numMessages = 10; + + Connection connection = createConnection(); + + try { + long time = System.currentTimeMillis(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = session.createQueue(getQueueName()); + MessageProducer producer = session.createProducer(queue); + + byte[] bytes = new byte[2048]; + new Random().nextBytes(bytes); + for (int i = 0; i < numMessages; i++) { + TextMessage message = session.createTextMessage(); + message.setText("msg:" + i); + producer.send(message); + } + connection.close(); + Queue queueView = getProxyToQueue(getQueueName()); + + assertTrue("Not all messages enqueued", Wait.waitFor(() -> queueView.getMessageCount() == numMessages)); + + // Now create a new connection and receive and acknowledge + connection = createConnection(); + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queue); + + for (int i = 0; i < numMessages; i++) { + Message msg = consumer.receive(5000); + if (msg == null) { + System.out.println("ProtonTest.testManyMessages"); + } + Assert.assertNotNull("" + i, msg); + Assert.assertTrue("" + msg, msg instanceof TextMessage); + String text = ((TextMessage) msg).getText(); + // System.out.println("text = " + text); + Assert.assertEquals(text, "msg:" + i); + msg.acknowledge(); + } + + consumer.close(); + connection.close(); + + // Wait for Acks to be processed and message removed from queue. + Thread.sleep(500); + + assertTrue("Not all messages consumed", Wait.waitFor(() -> queueView.getMessageCount() == 0)); + long taken = (System.currentTimeMillis() - time) / 1000; + System.out.println("taken = " + taken); + } finally { + connection.close(); + } + } + + @Test(timeout = 240000) + public void testTimedOutWaitingForWriteLogOnConsumer() throws Throwable { + String name = "exampleQueue1"; + + final int numMessages = 40; + + Connection connection = createConnection(); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = session.createQueue(name); + MessageProducer producer = session.createProducer(queue); + for (int i = 0; i < numMessages; i++) { + TextMessage message = session.createTextMessage(); + message.setText("Message temporary"); + producer.send(message); + } + producer.close(); + session.close(); + + for (int i = 0; i < numMessages; i++) { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + queue = session.createQueue(name); + MessageConsumer c = session.createConsumer(queue); + c.receive(1000); + producer.close(); + session.close(); + } + + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + queue = session.createQueue(name); + MessageConsumer c = session.createConsumer(queue); + for (int i = 0; i < numMessages; i++) { + c.receive(1000); + } + producer.close(); + session.close(); + } finally { + connection.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java new file mode 100644 index 0000000..628c814 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JMSMessageGroupsTest extends JMSClientTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger(JMSMessageGroupsTest.class); + + private static final int ITERATIONS = 10; + private static final int MESSAGE_COUNT = 10; + private static final int MESSAGE_SIZE = 10 * 1024; + private static final int RECEIVE_TIMEOUT = 3000; + private static final String JMSX_GROUP_ID = "JmsGroupsTest"; + + @Test(timeout = 60000) + public void testGroupSeqIsNeverLost() throws Exception { + AtomicInteger sequenceCounter = new AtomicInteger(); + + for (int i = 0; i < ITERATIONS; ++i) { + Connection connection = createConnection(); + try { + sendMessagesToBroker(connection, MESSAGE_COUNT, sequenceCounter); + readMessagesOnBroker(connection, MESSAGE_COUNT); + } finally { + connection.close(); + } + } + } + + protected void readMessagesOnBroker(Connection connection, int count) throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getQueueName()); + MessageConsumer consumer = session.createConsumer(queue); + + for (int i = 0; i < MESSAGE_COUNT; ++i) { + Message message = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull(message); + LOG.debug("Read message #{}: type = {}", i, message.getClass().getSimpleName()); + String gid = message.getStringProperty("JMSXGroupID"); + String seq = message.getStringProperty("JMSXGroupSeq"); + LOG.debug("Message assigned JMSXGroupID := {}", gid); + LOG.debug("Message assigned JMSXGroupSeq := {}", seq); + } + + session.close(); + } + + protected void sendMessagesToBroker(Connection connection, int count, AtomicInteger sequence) throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getQueueName()); + MessageProducer producer = session.createProducer(queue); + + byte[] buffer = new byte[MESSAGE_SIZE]; + for (count = 0; count < MESSAGE_SIZE; count++) { + String s = String.valueOf(count % 10); + Character c = s.charAt(0); + int value = c.charValue(); + buffer[count] = (byte) value; + } + + LOG.debug("Sending {} messages to destination: {}", MESSAGE_COUNT, queue); + for (int i = 1; i <= MESSAGE_COUNT; i++) { + BytesMessage message = session.createBytesMessage(); + message.setJMSDeliveryMode(DeliveryMode.PERSISTENT); + message.setStringProperty("JMSXGroupID", JMSX_GROUP_ID); + message.setIntProperty("JMSXGroupSeq", sequence.incrementAndGet()); + message.writeBytes(buffer); + producer.send(message); + } + + session.close(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java new file mode 100644 index 0000000..2287238 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.util.Random; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.TextMessage; + +import org.junit.Assert; +import org.junit.Test; + +public class JMSMessageProducerTest extends JMSClientTestSupport { + + @Test(timeout = 30000) + public void testAnonymousProducer() throws Exception { + Connection connection = createConnection(); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue1 = session.createQueue(getQueueName(1)); + Queue queue2 = session.createQueue(getQueueName(2)); + MessageProducer p = session.createProducer(null); + + TextMessage message = session.createTextMessage(); + message.setText("hello"); + p.send(queue1, message); + p.send(queue2, message); + + { + MessageConsumer consumer = session.createConsumer(queue1); + Message msg = consumer.receive(2000); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + consumer.close(); + } + { + MessageConsumer consumer = session.createConsumer(queue2); + Message msg = consumer.receive(2000); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + consumer.close(); + } + } finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testAnonymousProducerAcrossManyDestinations() throws Exception { + Connection connection = createConnection(); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer p = session.createProducer(null); + + for (int i = 1; i <= getPrecreatedQueueSize(); i++) { + javax.jms.Queue target = session.createQueue(getQueueName(i)); + TextMessage message = session.createTextMessage("message for " + target.getQueueName()); + p.send(target, message); + } + + connection.start(); + + MessageConsumer messageConsumer = session.createConsumer(session.createQueue(getQueueName())); + Message m = messageConsumer.receive(200); + Assert.assertNull(m); + + for (int i = 1; i <= getPrecreatedQueueSize(); i++) { + javax.jms.Queue target = session.createQueue(getQueueName(i)); + MessageConsumer consumer = session.createConsumer(target); + TextMessage tm = (TextMessage) consumer.receive(2000); + assertNotNull(tm); + assertEquals("message for " + target.getQueueName(), tm.getText()); + consumer.close(); + } + } finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testSendingBigMessage() throws Exception { + Connection connection = createConnection(); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getQueueName()); + MessageProducer sender = session.createProducer(queue); + + String body = createMessage(10240); + sender.send(session.createTextMessage(body)); + connection.start(); + + MessageConsumer consumer = session.createConsumer(queue); + TextMessage m = (TextMessage) consumer.receive(5000); + + assertEquals(body, m.getText()); + } finally { + if (connection != null) { + connection.close(); + } + } + } + + @Test(timeout = 60000) + public void testSendWithTimeToLiveExpiresToDLQ() throws Exception { + Connection connection = createConnection(); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getQueueName()); + + MessageProducer sender = session.createProducer(queue); + sender.setTimeToLive(1); + + Message message = session.createMessage(); + sender.send(message); + connection.start(); + + MessageConsumer consumer = session.createConsumer(session.createQueue(getDeadLetterAddress())); + Message m = consumer.receive(10000); + assertNotNull(m); + consumer.close(); + + consumer = session.createConsumer(queue); + m = consumer.receiveNoWait(); + assertNull(m); + consumer.close(); + } finally { + if (connection != null) { + connection.close(); + } + } + } + + @Test(timeout = 60000) + public void testReplyToUsingQueue() throws Throwable { + Connection connection = createConnection(); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryQueue queue = session.createTemporaryQueue(); + MessageProducer p = session.createProducer(queue); + + TextMessage message = session.createTextMessage(); + message.setText("Message temporary"); + message.setJMSReplyTo(session.createQueue(getQueueName())); + p.send(message); + + MessageConsumer cons = session.createConsumer(queue); + connection.start(); + + message = (TextMessage) cons.receive(5000); + assertNotNull(message); + Destination jmsReplyTo = message.getJMSReplyTo(); + assertNotNull(jmsReplyTo); + assertNotNull(message); + } finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testReplyToUsingTempQueue() throws Throwable { + Connection connection = createConnection(); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryQueue queue = session.createTemporaryQueue(); + MessageProducer p = session.createProducer(queue); + + TextMessage message = session.createTextMessage(); + message.setText("Message temporary"); + message.setJMSReplyTo(session.createTemporaryQueue()); + p.send(message); + + MessageConsumer cons = session.createConsumer(queue); + connection.start(); + + message = (TextMessage) cons.receive(5000); + Destination jmsReplyTo = message.getJMSReplyTo(); + assertNotNull(jmsReplyTo); + assertNotNull(message); + } finally { + connection.close(); + } + } + + private static String createMessage(int messageSize) { + final String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + Random rnd = new Random(); + StringBuilder sb = new StringBuilder(messageSize); + for (int j = 0; j < messageSize; j++) { + sb.append(AB.charAt(rnd.nextInt(AB.length()))); + } + String body = sb.toString(); + return body; + } +}
