http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0260a304/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java index 689c23c..748f10a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java @@ -36,101 +36,170 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport { public void testSendWithDeliveryTimeIsScheduled() throws Exception { AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); - AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + try { + AmqpSession session = connection.createSession(); - // Get the Queue View early to avoid racing the delivery. - final Queue queueView = getProxyToQueue(getTestName()); - assertNotNull(queueView); + AmqpSender sender = session.createSender(getQueueName()); - AmqpMessage message = new AmqpMessage(); - long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2); - message.setMessageAnnotation("x-opt-delivery-time", deliveryTime); - message.setText("Test-Message"); - sender.send(message); - sender.close(); + // Get the Queue View early to avoid racing the delivery. + final Queue queueView = getProxyToQueue(getQueueName()); + assertNotNull(queueView); - assertEquals(1, queueView.getScheduledCount()); + AmqpMessage message = new AmqpMessage(); + long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2); + message.setMessageAnnotation("x-opt-delivery-time", deliveryTime); + message.setText("Test-Message"); + sender.send(message); + sender.close(); - // Now try and get the message - AmqpReceiver receiver = session.createReceiver(getTestName()); - receiver.flow(1); - AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); - assertNull(received); + assertEquals(1, queueView.getScheduledCount()); - connection.close(); + // Now try and get the message + AmqpReceiver receiver = session.createReceiver(getQueueName()); + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNull(received); + } finally { + connection.close(); + } } @Test(timeout = 60000) public void testSendRecvWithDeliveryTime() throws Exception { AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); - AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + try { + AmqpSession session = connection.createSession(); - // Get the Queue View early to avoid racing the delivery. - final Queue queueView = getProxyToQueue(getTestName()); - assertNotNull(queueView); + AmqpSender sender = session.createSender(getQueueName()); - AmqpMessage message = new AmqpMessage(); - long deliveryTime = System.currentTimeMillis() + 6000; - message.setMessageAnnotation("x-opt-delivery-time", deliveryTime); - message.setText("Test-Message"); - sender.send(message); - sender.close(); + // Get the Queue View early to avoid racing the delivery. + final Queue queueView = getProxyToQueue(getQueueName()); + assertNotNull(queueView); - assertEquals(1, queueView.getScheduledCount()); + AmqpMessage message = new AmqpMessage(); + long deliveryTime = System.currentTimeMillis() + 6000; + message.setMessageAnnotation("x-opt-delivery-time", deliveryTime); + message.setText("Test-Message"); + sender.send(message); + sender.close(); - AmqpReceiver receiver = session.createReceiver(getTestName()); - receiver.flow(1); + assertEquals(1, queueView.getScheduledCount()); - // Now try and get the message, should not due to being scheduled. - AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS); - assertNull(received); + AmqpReceiver receiver = session.createReceiver(getQueueName()); + receiver.flow(1); - // Now try and get the message, should get it now - received = receiver.receive(10, TimeUnit.SECONDS); - assertNotNull(received); - received.accept(); + // Now try and get the message, should not due to being scheduled. + AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS); + assertNull(received); - connection.close(); + // Now try and get the message, should get it now + received = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + } finally { + connection.close(); + } } @Test public void testScheduleWithDelay() throws Exception { AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); - AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + try { + AmqpSession session = connection.createSession(); - // Get the Queue View early to avoid racing the delivery. - final Queue queueView = getProxyToQueue(getTestName()); - assertNotNull(queueView); + AmqpSender sender = session.createSender(getQueueName()); - AmqpMessage message = new AmqpMessage(); - long delay = 6000; - message.setMessageAnnotation("x-opt-delivery-delay", delay); - message.setText("Test-Message"); - sender.send(message); - sender.close(); + // Get the Queue View early to avoid racing the delivery. + final Queue queueView = getProxyToQueue(getQueueName()); + assertNotNull(queueView); - assertEquals(1, queueView.getScheduledCount()); + AmqpMessage message = new AmqpMessage(); + long delay = 6000; + message.setMessageAnnotation("x-opt-delivery-delay", delay); + message.setText("Test-Message"); + sender.send(message); + sender.close(); - AmqpReceiver receiver = session.createReceiver(getTestName()); - receiver.flow(1); + assertEquals(1, queueView.getScheduledCount()); - // Now try and get the message, should not due to being scheduled. - AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS); - assertNull(received); + AmqpReceiver receiver = session.createReceiver(getQueueName()); + receiver.flow(1); - // Now try and get the message, should get it now - received = receiver.receive(10, TimeUnit.SECONDS); - assertNotNull(received); - received.accept(); + // Now try and get the message, should not due to being scheduled. + AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS); + assertNull(received); - connection.close(); + // Now try and get the message, should get it now + received = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + } finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testSendWithDeliveryTimeHoldsMessage() throws Exception { + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + AmqpConnection connection = addConnection(client.connect()); + try { + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getQueueName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); + + AmqpMessage message = new AmqpMessage(); + long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5); + message.setMessageAnnotation("x-opt-delivery-time", deliveryTime); + message.setText("Test-Message"); + sender.send(message); + + // Now try and get the message + receiver.flow(1); + + // Shouldn't get this since we delayed the message. + assertNull(receiver.receive(1, TimeUnit.SECONDS)); + } finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testSendWithDeliveryTimeDeliversMessageAfterDelay() throws Exception { + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + AmqpConnection connection = addConnection(client.connect()); + try { + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getQueueName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); + + AmqpMessage message = new AmqpMessage(); + long deliveryTime = System.currentTimeMillis() + 2000; + message.setMessageAnnotation("x-opt-delivery-time", deliveryTime); + message.setText("Test-Message"); + sender.send(message); + + // Now try and get the message + receiver.flow(1); + + AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + Long msgDeliveryTime = (Long) received.getMessageAnnotation("x-opt-delivery-time"); + assertNotNull(msgDeliveryTime); + assertEquals(deliveryTime, msgDeliveryTime.longValue()); + } finally { + connection.close(); + } } }
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0260a304/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java index e2b80f5..8e41d71 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -22,11 +22,11 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -57,7 +57,7 @@ public class AmqpSecurityTest extends AmqpClientTestSupport { HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository(); HashSet<Role> value = new HashSet<>(); value.add(new Role("none", false, true, true, true, true, true, true, true)); - securityRepository.addMatch(getTestName(), value); + securityRepository.addMatch(getQueueName(), value); serverManager = new JMSServerManagerImpl(server); Configuration serverConfig = server.getConfiguration(); @@ -135,7 +135,7 @@ public class AmqpSecurityTest extends AmqpClientTestSupport { connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setMessageId("msg" + 1); @@ -154,8 +154,8 @@ public class AmqpSecurityTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testSendMessageFailsOnAnonymousRelayWhenNotAuthorizedToSendToAddress() throws Exception { - server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getTestName()), RoutingType.ANYCAST)); - server.createQueue(new SimpleString(getTestName()), RoutingType.ANYCAST, new SimpleString(getTestName()), null, true, false); + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getQueueName()), RoutingType.ANYCAST)); + server.createQueue(new SimpleString(getQueueName()), RoutingType.ANYCAST, new SimpleString(getQueueName()), null, true, false); AmqpClient client = createAmqpClient(user1, password1); AmqpConnection connection = client.connect(); @@ -165,7 +165,7 @@ public class AmqpSecurityTest extends AmqpClientTestSupport { AmqpSender sender = session.createAnonymousSender(); AmqpMessage message = new AmqpMessage(); - message.setAddress(getTestName()); + message.setAddress(getQueueName()); message.setMessageId("msg" + 1); message.setText("Test-Message"); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0260a304/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java index 54b361c..9cf256a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java @@ -69,9 +69,9 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); - Queue queue = getProxyToQueue(getTestName()); + Queue queue = getProxyToQueue(getQueueName()); assertNotNull(queue); receiver.close(); @@ -84,9 +84,9 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); - sendMessages(getTestName(), 10); + sendMessages(getQueueName(), 10); for (int i = 0; i < 10; i++) { receiver.flow(1); @@ -98,7 +98,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { receiver.close(); connection.close(); - Queue queue = getProxyToQueue(getTestName()); + Queue queue = getProxyToQueue(getQueueName()); assertNotNull(queue); assertEquals(0, queue.getMessageCount()); } @@ -130,7 +130,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - session.createReceiver(getTestName(), "JMSPriority > 8"); + session.createReceiver(getQueueName(), "JMSPriority > 8"); connection.getStateInspector().assertValid(); connection.close(); @@ -163,7 +163,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - session.createReceiver(getTestName(), null, true); + session.createReceiver(getQueueName(), null, true); connection.getStateInspector().assertValid(); connection.close(); @@ -177,7 +177,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpSession session = connection.createSession(); try { - session.createReceiver(getTestName(), "null = 'f''", true); + session.createReceiver(getQueueName(), "null = 'f''", true); fail("should throw exception"); } catch (Exception e) { assertTrue(e.getCause() instanceof JMSException); @@ -189,15 +189,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testQueueReceiverReadMessage() throws Exception { - sendMessages(getTestName(), 1); + sendMessages(getQueueName(), 1); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); - Queue queueView = getProxyToQueue(getTestName()); + Queue queueView = getProxyToQueue(getQueueName()); assertEquals(1, queueView.getMessageCount()); receiver.flow(1); @@ -211,11 +211,11 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testQueueReceiverReadMessageWithDivert() throws Exception { - final String forwardingAddress = getTestName() + "Divert"; + final String forwardingAddress = getQueueName() + "Divert"; final SimpleString simpleForwardingAddress = SimpleString.toSimpleString(forwardingAddress); server.createQueue(simpleForwardingAddress, RoutingType.ANYCAST, simpleForwardingAddress, null, true, false); - server.getActiveMQServerControl().createDivert("name", "routingName", getTestName(), forwardingAddress, true, null, null, DivertConfigurationRoutingType.ANYCAST.toString()); - sendMessages(getTestName(), 1); + server.getActiveMQServerControl().createDivert("name", "routingName", getQueueName(), forwardingAddress, true, null, null, DivertConfigurationRoutingType.ANYCAST.toString()); + sendMessages(getQueueName(), 1); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); @@ -313,15 +313,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testMessageDurableFalse() throws Exception { - sendMessages(getTestName(), 1, false); + sendMessages(getQueueName(), 1, false); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); - Queue queueView = getProxyToQueue(getTestName()); + Queue queueView = getProxyToQueue(getQueueName()); assertEquals(1, queueView.getMessageCount()); receiver.flow(1); @@ -337,15 +337,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testMessageDurableTrue() throws Exception { - sendMessages(getTestName(), 1, true); + sendMessages(getQueueName(), 1, true); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); - Queue queueView = getProxyToQueue(getTestName()); + Queue queueView = getProxyToQueue(getQueueName()); assertEquals(1, queueView.getMessageCount()); receiver.flow(1); @@ -362,22 +362,22 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception { int MSG_COUNT = 4; - sendMessages(getTestName(), MSG_COUNT); + sendMessages(getQueueName(), MSG_COUNT); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver1 = session.createReceiver(getTestName()); + AmqpReceiver receiver1 = session.createReceiver(getQueueName()); - Queue queueView = getProxyToQueue(getTestName()); + Queue queueView = getProxyToQueue(getQueueName()); assertEquals(MSG_COUNT, queueView.getMessageCount()); receiver1.flow(2); assertNotNull(receiver1.receive(5, TimeUnit.SECONDS)); assertNotNull(receiver1.receive(5, TimeUnit.SECONDS)); - AmqpReceiver receiver2 = session.createReceiver(getTestName()); + AmqpReceiver receiver2 = session.createReceiver(getQueueName()); assertEquals(2, server.getTotalConsumerCount()); @@ -398,15 +398,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testTwoQueueReceiversOnSameConnectionReadMessagesAcceptOnEach() throws Exception { int MSG_COUNT = 4; - sendMessages(getTestName(), MSG_COUNT); + sendMessages(getQueueName(), MSG_COUNT); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver1 = session.createReceiver(getTestName()); + AmqpReceiver receiver1 = session.createReceiver(getQueueName()); - final Queue queueView = getProxyToQueue(getTestName()); + final Queue queueView = getProxyToQueue(getQueueName()); assertEquals(MSG_COUNT, queueView.getMessageCount()); receiver1.flow(2); @@ -425,7 +425,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { } }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50))); - AmqpReceiver receiver2 = session.createReceiver(getTestName()); + AmqpReceiver receiver2 = session.createReceiver(getQueueName()); assertEquals(2, server.getTotalConsumerCount()); @@ -456,15 +456,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testSecondReceiverOnQueueGetsAllUnconsumedMessages() throws Exception { int MSG_COUNT = 20; - sendMessages(getTestName(), MSG_COUNT); + sendMessages(getQueueName(), MSG_COUNT); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver1 = session.createReceiver(getTestName()); + AmqpReceiver receiver1 = session.createReceiver(getQueueName()); - final Queue queueView = getProxyToQueue(getTestName()); + final Queue queueView = getProxyToQueue(getQueueName()); assertEquals(MSG_COUNT, queueView.getMessageCount()); receiver1.flow(20); @@ -479,7 +479,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { receiver1.close(); - AmqpReceiver receiver2 = session.createReceiver(getTestName()); + AmqpReceiver receiver2 = session.createReceiver(getQueueName()); assertEquals(1, server.getTotalConsumerCount()); @@ -513,7 +513,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); AmqpMessage message = new AmqpMessage(); @@ -525,7 +525,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { sender.close(); LOG.info("Attempting to read message with receiver"); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.flow(2); AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS); assertNotNull("Should have read message", received); @@ -544,7 +544,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); for (int i = 0; i < MSG_COUNT; i++) { AmqpMessage message = new AmqpMessage(); @@ -560,17 +560,17 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { sender.close(); - Queue queue = getProxyToQueue(getTestName()); + Queue queue = getProxyToQueue(getQueueName()); assertEquals(MSG_COUNT, queue.getMessageCount()); - AmqpReceiver receiver1 = session.createReceiver(getTestName()); + AmqpReceiver receiver1 = session.createReceiver(getQueueName()); receiver1.flow(MSG_COUNT); AmqpMessage received = receiver1.receive(5, TimeUnit.SECONDS); assertNotNull("Should have got a message", received); assertEquals("msg0", received.getMessageId()); receiver1.close(); - AmqpReceiver receiver2 = session.createReceiver(getTestName()); + AmqpReceiver receiver2 = session.createReceiver(getQueueName()); receiver2.flow(200); for (int i = 0; i < MSG_COUNT; ++i) { received = receiver2.receive(5, TimeUnit.SECONDS); @@ -597,12 +597,12 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { message2.setGroupId("hijklm"); message2.setApplicationProperty("sn", 200); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); sender.send(message1); sender.send(message2); sender.close(); - AmqpReceiver receiver = session.createReceiver(getTestName(), "sn = 100"); + AmqpReceiver receiver = session.createReceiver(getQueueName(), "sn = 100"); receiver.flow(2); AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); assertNotNull("Should have read a message", received); @@ -624,7 +624,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); for (int i = 0; i < MSG_COUNT; i++) { AmqpMessage message = new AmqpMessage(); @@ -639,7 +639,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { sender.close(); LOG.info("Attempting to read first two messages with receiver #1"); - AmqpReceiver receiver1 = session.createReceiver(getTestName()); + AmqpReceiver receiver1 = session.createReceiver(getQueueName()); receiver1.flow(2); AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS); AmqpMessage message2 = receiver1.receive(10, TimeUnit.SECONDS); @@ -651,7 +651,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { message2.accept(); LOG.info("Attempting to read next two messages with receiver #2"); - AmqpReceiver receiver2 = session.createReceiver(getTestName()); + AmqpReceiver receiver2 = session.createReceiver(getQueueName()); receiver2.flow(2); AmqpMessage message3 = receiver2.receive(10, TimeUnit.SECONDS); AmqpMessage message4 = receiver2.receive(10, TimeUnit.SECONDS); @@ -685,7 +685,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); for (int i = 0; i < MSG_COUNT; i++) { AmqpMessage message = new AmqpMessage(); @@ -699,10 +699,10 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { sender.close(); - AmqpReceiver receiver1 = session.createReceiver(getTestName()); + AmqpReceiver receiver1 = session.createReceiver(getQueueName()); receiver1.flow(1); - AmqpReceiver receiver2 = session.createReceiver(getTestName()); + AmqpReceiver receiver2 = session.createReceiver(getQueueName()); receiver2.flow(1); AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS); @@ -759,7 +759,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - final String address = getTestName(); + final String address = getQueueName(); AmqpReceiver receiver = session.createReceiver(address); AmqpSender sender = session.createSender(address); @@ -793,7 +793,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { final CountDownLatch receiverReady = new CountDownLatch(1); ExecutorService executorService = Executors.newCachedThreadPool(); - final String address = getTestName(); + final String address = getQueueName(); executorService.submit(new Runnable() { @Override @@ -858,10 +858,10 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); - AmqpReceiver receiver1 = session.createReceiver(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); + AmqpReceiver receiver1 = session.createReceiver(getQueueName()); - Queue queue = getProxyToQueue(getTestName()); + Queue queue = getProxyToQueue(getQueueName()); // Create default message that should be sent as non-durable AmqpMessage message1 = new AmqpMessage(); @@ -904,7 +904,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - final String address = getTestName(); + final String address = getQueueName(); AmqpReceiver receiver = session.createReceiver(address); AmqpSender sender = session.createSender(address); @@ -957,7 +957,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - final String address = getTestName(); + final String address = getQueueName(); AmqpSender sender = session.createSender(address); AmqpReceiver receiver1 = session.createReceiver(address, null, false, true); @@ -1036,7 +1036,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender("queue://" + getTestName(), new Symbol[] {AmqpSupport.DELAYED_DELIVERY}); + AmqpSender sender = session.createSender("queue://" + getQueueName(), new Symbol[] {AmqpSupport.DELAYED_DELIVERY}); assertNotNull(sender); connection.getStateInspector().assertValid(); @@ -1047,7 +1047,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testMessageWithToFieldSetToSenderAddress() throws Exception { - doTestMessageWithToFieldSet(false, getTestName()); + doTestMessageWithToFieldSet(false, getQueueName()); } @Test(timeout = 60000) @@ -1067,7 +1067,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testMessageWithToFieldSetWithAnonymousSender() throws Exception { - doTestMessageWithToFieldSet(true, getTestName()); + doTestMessageWithToFieldSet(true, getQueueName()); } private void doTestMessageWithToFieldSet(boolean anonymous, String expected) throws Exception { @@ -1075,7 +1075,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - final String address = getTestName(); + final String address = getQueueName(); AmqpSender sender = session.createSender(anonymous ? null : address); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0260a304/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java index 8f89452..a360eb8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.activemq.artemis.tests.integration.amqp; import java.net.URI; @@ -25,8 +24,12 @@ import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.junit.After; -/** This will only add methods to support AMQP Testing without creating servers or anything */ +/** + * Base test support class providing client support methods to aid in + * creating and configuration the AMQP test client. + */ public class AmqpTestSupport extends ActiveMQTestBase { + protected LinkedList<AmqpConnection> connections = new LinkedList<>(); protected boolean useSSL; @@ -121,7 +124,4 @@ public class AmqpTestSupport extends ActiveMQTestBase { public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception { return new AmqpClient(brokerURI, username, password); } - - - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0260a304/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java index 3a9d498..3b231fa 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java @@ -94,7 +94,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { AmqpSession session = connection.createSession(); assertNotNull(session); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); sender.setStateInspector(new AmqpValidator() { @Override @@ -148,8 +148,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); - final Queue queue = getProxyToQueue(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); + final Queue queue = getProxyToQueue(getQueueName()); session.begin(); @@ -173,8 +173,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); - final Queue queue = getProxyToQueue(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); + final Queue queue = getProxyToQueue(getQueueName()); session.begin(); @@ -198,8 +198,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); - final Queue queue = getProxyToQueue(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); + final Queue queue = getProxyToQueue(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setText("Test-Message"); @@ -207,7 +207,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { assertEquals(1, queue.getMessageCount()); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); session.begin(); @@ -230,8 +230,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); - final Queue queue = getProxyToQueue(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); + final Queue queue = getProxyToQueue(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setText("Test-Message"); @@ -239,7 +239,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { assertEquals(1, queue.getMessageCount()); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); session.begin(); @@ -253,7 +253,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { connection = addConnection(client.connect()); session = connection.createSession(); - receiver = session.createReceiver(getTestName()); + receiver = session.createReceiver(getQueueName()); session.begin(); receiver.flow(1); @@ -274,8 +274,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); - final Queue queue = getProxyToQueue(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); + final Queue queue = getProxyToQueue(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setText("Test-Message"); @@ -283,7 +283,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { assertEquals(1, queue.getMessageCount()); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); session.begin(); @@ -308,7 +308,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { // Load up the Queue with some messages { AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setText("Test-Message"); sender.send(message); @@ -326,11 +326,11 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { AmqpSession session3 = connection.createSession(); // Sender linked to each session - AmqpReceiver receiver1 = session1.createReceiver(getTestName()); - AmqpReceiver receiver2 = session2.createReceiver(getTestName()); - AmqpReceiver receiver3 = session3.createReceiver(getTestName()); + AmqpReceiver receiver1 = session1.createReceiver(getQueueName()); + AmqpReceiver receiver2 = session2.createReceiver(getQueueName()); + AmqpReceiver receiver3 = session3.createReceiver(getQueueName()); - final Queue queue = getProxyToQueue(getTestName()); + final Queue queue = getProxyToQueue(getQueueName()); assertEquals(3, queue.getMessageCount()); // Begin the transaction that all senders will operate in. @@ -365,7 +365,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { // Load up the Queue with some messages { AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setText("Test-Message"); sender.send(message); @@ -383,11 +383,11 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { AmqpSession session3 = connection.createSession(); // Sender linked to each session - AmqpReceiver receiver1 = session1.createReceiver(getTestName()); - AmqpReceiver receiver2 = session2.createReceiver(getTestName()); - AmqpReceiver receiver3 = session3.createReceiver(getTestName()); + AmqpReceiver receiver1 = session1.createReceiver(getQueueName()); + AmqpReceiver receiver2 = session2.createReceiver(getQueueName()); + AmqpReceiver receiver3 = session3.createReceiver(getQueueName()); - final Queue queue = getProxyToQueue(getTestName()); + final Queue queue = getProxyToQueue(getQueueName()); assertEquals(3, queue.getMessageCount()); // Begin the transaction that all senders will operate in. @@ -428,11 +428,11 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { AmqpSession session3 = connection.createSession(); // Sender linked to each session - AmqpSender sender1 = session1.createSender(getTestName()); - AmqpSender sender2 = session2.createSender(getTestName()); - AmqpSender sender3 = session3.createSender(getTestName()); + AmqpSender sender1 = session1.createSender(getQueueName()); + AmqpSender sender2 = session2.createSender(getQueueName()); + AmqpSender sender3 = session3.createSender(getQueueName()); - final Queue queue = getProxyToQueue(getTestName()); + final Queue queue = getProxyToQueue(getQueueName()); assertEquals(0, queue.getMessageCount()); // Begin the transaction that all senders will operate in. @@ -468,11 +468,11 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { AmqpSession session3 = connection.createSession(); // Sender linked to each session - AmqpSender sender1 = session1.createSender(getTestName()); - AmqpSender sender2 = session2.createSender(getTestName()); - AmqpSender sender3 = session3.createSender(getTestName()); + AmqpSender sender1 = session1.createSender(getQueueName()); + AmqpSender sender2 = session2.createSender(getQueueName()); + AmqpSender sender3 = session3.createSender(getQueueName()); - final Queue queue = getProxyToQueue(getTestName()); + final Queue queue = getProxyToQueue(getQueueName()); assertEquals(0, queue.getMessageCount()); // Begin the transaction that all senders will operate in. @@ -509,7 +509,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { // Normal Session which won't create an TXN itself AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); // Commit TXN work from a sender. txnSession.begin(); @@ -538,7 +538,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { } txnSession.commit(); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.flow(NUM_MESSAGES * 2); for (int i = 0; i < NUM_MESSAGES * 2; ++i) { AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); @@ -563,7 +563,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { // Normal Session which won't create an TXN itself AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); for (int i = 0; i < NUM_MESSAGES + 1; ++i) { AmqpMessage message = new AmqpMessage(); @@ -573,7 +573,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { } // Read all messages from the Queue, do not accept them yet. - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES); receiver.flow((NUM_MESSAGES + 2) * 2); for (int i = 0; i < NUM_MESSAGES; ++i) { @@ -629,7 +629,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { // Normal Session which won't create an TXN itself AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); for (int i = 0; i < NUM_MESSAGES; ++i) { AmqpMessage message = new AmqpMessage(); @@ -639,7 +639,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { } // Read all messages from the Queue, do not accept them yet. - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.flow(2); AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS); @@ -700,7 +700,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { // Normal Session which won't create an TXN itself AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); for (int i = 0; i < NUM_MESSAGES + 1; ++i) { AmqpMessage message = new AmqpMessage(); @@ -710,7 +710,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { } // Read all messages from the Queue, do not accept them yet. - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES); receiver.flow((NUM_MESSAGES + 2) * 2); for (int i = 0; i < NUM_MESSAGES; ++i) { @@ -787,7 +787,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { // Normal Session which won't create an TXN itself AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); for (int i = 0; i < NUM_MESSAGES; ++i) { AmqpMessage message = new AmqpMessage(); @@ -797,7 +797,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { } // Read all messages from the Queue, do not accept them yet. - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.flow(2); AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS); AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS); @@ -930,12 +930,12 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { AmqpSession session = connection.createSession(); assertNotNull(session); - AmqpSender sender = session.createSender(getTestName()); + AmqpSender sender = session.createSender(getQueueName()); AmqpMessage message = new AmqpMessage(); message.setText("Test-Message"); sender.send(message); - AmqpReceiver receiver = session.createReceiver(getTestName()); + AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.setStateInspector(new AmqpValidator() { @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0260a304/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java index 4ee94c2..7933fec 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java @@ -16,6 +16,22 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import java.io.IOException; +import java.io.Serializable; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Enumeration; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -41,26 +57,10 @@ import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import javax.management.MBeanServer; import javax.management.MBeanServerFactory; -import java.io.IOException; -import java.io.Serializable; -import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Enumeration; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Optional; -import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.AddressControl; -import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.remoting.CloseListener; @@ -84,17 +84,14 @@ import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.TimeUtils; import org.apache.activemq.artemis.utils.UUIDGenerator; -import org.apache.activemq.artemis.utils.VersionLoader; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; -import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.TerminusDurability; import org.junit.After; @@ -104,18 +101,12 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains; - @RunWith(Parameterized.class) public class ProtonTest extends ProtonTestBase { private static final String amqpConnectionUri = "amqp://localhost:5672"; private static final String tcpAmqpConnectionUri = "tcp://localhost:5672"; - private static final String brokerName = "localhost"; private static final long maxSizeBytes = 1 * 1024 * 1024; @@ -371,132 +362,6 @@ public class ProtonTest extends ProtonTestBase { } @Test - public void testBrokerContainerId() throws Exception { - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - AmqpConnection amqpConnection = client.connect(); - try { - assertTrue(brokerName.equals(amqpConnection.getEndpoint().getRemoteContainer())); - } finally { - amqpConnection.close(); - } - } - - @Test - public void testBrokerConnectionProperties() throws Exception { - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - AmqpConnection amqpConnection = client.connect(); - try { - Map<Symbol, Object> properties = amqpConnection.getEndpoint().getRemoteProperties(); - assertTrue(properties != null); - if (properties != null) { - assertTrue("apache-activemq-artemis".equals(properties.get(Symbol.valueOf("product")))); - assertTrue(VersionLoader.getVersion().getFullVersion().equals(properties.get(Symbol.valueOf("version")))); - } - } finally { - amqpConnection.close(); - } - } - - @Test(timeout = 60000) - public void testConnectionCarriesExpectedCapabilities() throws Exception { - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - assertNotNull(client); - - client.setValidator(new AmqpValidator() { - - @Override - public void inspectOpenedResource(org.apache.qpid.proton.engine.Connection connection) { - - Symbol[] offered = connection.getRemoteOfferedCapabilities(); - - if (!contains(offered, DELAYED_DELIVERY)) { - markAsInvalid("Broker did not indicate it support delayed message delivery"); - return; - } - - Map<Symbol, Object> properties = connection.getRemoteProperties(); - if (!properties.containsKey(PRODUCT)) { - markAsInvalid("Broker did not send a queue product name value"); - return; - } - - if (!properties.containsKey(VERSION)) { - markAsInvalid("Broker did not send a queue version value"); - return; - } - } - }); - - AmqpConnection connection = client.connect(); - try { - assertNotNull(connection); - connection.getStateInspector().assertValid(); - } finally { - connection.close(); - } - } - - @Test(timeout = 60000) - public void testSendWithDeliveryTimeHoldsMessage() throws Exception { - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - assertNotNull(client); - - AmqpConnection connection = client.connect(); - try { - AmqpSession session = connection.createSession(); - - AmqpSender sender = session.createSender(address); - AmqpReceiver receiver = session.createReceiver(address); - - AmqpMessage message = new AmqpMessage(); - long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5); - message.setMessageAnnotation("x-opt-delivery-time", deliveryTime); - message.setText("Test-Message"); - sender.send(message); - - // Now try and get the message - receiver.flow(1); - - // Shouldn't get this since we delayed the message. - assertNull(receiver.receive(1, TimeUnit.SECONDS)); - } finally { - connection.close(); - } - } - - @Test(timeout = 60000) - public void testSendWithDeliveryTimeDeliversMessageAfterDelay() throws Exception { - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - assertNotNull(client); - - AmqpConnection connection = client.connect(); - try { - AmqpSession session = connection.createSession(); - - AmqpSender sender = session.createSender(address); - AmqpReceiver receiver = session.createReceiver(address); - - AmqpMessage message = new AmqpMessage(); - long deliveryTime = System.currentTimeMillis() + 2000; - message.setMessageAnnotation("x-opt-delivery-time", deliveryTime); - message.setText("Test-Message"); - sender.send(message); - - // Now try and get the message - receiver.flow(1); - - AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS); - assertNotNull(received); - received.accept(); - Long msgDeliveryTime = (Long) received.getMessageAnnotation("x-opt-delivery-time"); - assertNotNull(msgDeliveryTime); - assertEquals(deliveryTime, msgDeliveryTime.longValue()); - } finally { - connection.close(); - } - } - - @Test public void testCreditsAreAllocatedOnlyOnceOnLinkCreate() throws Exception { String destinationAddress = address + 1; @@ -984,41 +849,6 @@ public class ProtonTest extends ProtonTestBase { } @Test - public void testManagementQueryOverAMQP() throws Throwable { - - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - AmqpConnection amqpConnection = client.connect(); - try { - String destinationAddress = address + 1; - AmqpSession session = amqpConnection.createSession(); - AmqpSender sender = session.createSender("activemq.management"); - AmqpReceiver receiver = session.createReceiver(destinationAddress); - receiver.flow(10); - - //create request message for getQueueNames query - AmqpMessage request = new AmqpMessage(); - request.setApplicationProperty("_AMQ_ResourceName", ResourceNames.BROKER); - request.setApplicationProperty("_AMQ_OperationName", "getQueueNames"); - request.setReplyToAddress(destinationAddress); - request.setText("[]"); - - sender.send(request); - AmqpMessage response = receiver.receive(5, TimeUnit.SECONDS); - Assert.assertNotNull(response); - assertNotNull(response); - Object section = response.getWrappedMessage().getBody(); - assertTrue(section instanceof AmqpValue); - Object value = ((AmqpValue) section).getValue(); - assertTrue(value instanceof String); - assertTrue(((String) value).length() > 0); - assertTrue(((String) value).contains(destinationAddress)); - response.accept(); - } finally { - amqpConnection.close(); - } - } - - @Test public void testReplyTo() throws Throwable { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); TemporaryQueue queue = session.createTemporaryQueue(); @@ -1792,93 +1622,6 @@ public class ProtonTest extends ProtonTestBase { } } - @Test(timeout = 60000) - public void testSendMessageOnAnonymousRelayLinkUsingMessageTo() throws Exception { - - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - AmqpConnection connection = client.connect(); - - try { - AmqpSession session = connection.createSession(); - - AmqpSender sender = session.createAnonymousSender(); - AmqpMessage message = new AmqpMessage(); - - message.setAddress(address); - message.setMessageId("msg" + 1); - message.setText("Test-Message"); - - sender.send(message); - sender.close(); - - AmqpReceiver receiver = session.createReceiver(address); - receiver.flow(1); - AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS); - assertNotNull("Should have read message", received); - assertEquals("msg1", received.getMessageId()); - received.accept(); - - receiver.close(); - } finally { - connection.close(); - } - } - - @Test(timeout = 60000) - public void testSendMessageFailsOnAnonymousRelayLinkWhenNoToValueSet() throws Exception { - - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - AmqpConnection connection = client.connect(); - try { - AmqpSession session = connection.createSession(); - - AmqpSender sender = session.createAnonymousSender(); - AmqpMessage message = new AmqpMessage(); - - message.setMessageId("msg" + 1); - message.setText("Test-Message"); - - try { - sender.send(message); - fail("Should not be able to send, message should be rejected"); - } catch (Exception ex) { - ex.printStackTrace(); - } finally { - sender.close(); - } - } finally { - connection.close(); - } - } - - @Test(timeout = 60000) - public void testSendMessageFailsOnAnonymousRelayWhenToFieldHasNonExistingAddress() throws Exception { - - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - AmqpConnection connection = client.connect(); - try { - AmqpSession session = connection.createSession(); - - AmqpSender sender = session.createAnonymousSender(); - AmqpMessage message = new AmqpMessage(); - - message.setAddress(address + "-not-in-service"); - message.setMessageId("msg" + 1); - message.setText("Test-Message"); - - try { - sender.send(message); - fail("Should not be able to send, message should be rejected"); - } catch (Exception ex) { - ex.printStackTrace(); - } finally { - sender.close(); - } - } finally { - connection.close(); - } - } - private javax.jms.Queue createQueue(String address) throws Exception { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); try {
