ARTEMIS-789 AMQP tests for routing semantics
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3af1e5c7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3af1e5c7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3af1e5c7
Branch: refs/heads/master
Commit: 3af1e5c734df3ac4d19b5e173f55a2a99fa33e8d
Parents: c18ee83
Author: jbertram <[email protected]>
Authored: Wed Dec 14 09:30:43 2016 -0600
Committer: jbertram <[email protected]>
Committed: Wed Dec 14 15:12:57 2016 -0600
----------------------------------------------------------------------
 .../artemis/tests/util/ActiveMQTestBase.java    |  4 +-
 .../integration/amqp/AmqpClientTestSupport.java |  9 +++
 .../integration/amqp/AmqpSendReceiveTest.java   | 73 ++++++++++++++++++--
 3 files changed, 78 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3af1e5c7/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 7f01767..e6d68b1 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -418,10 +418,10 @@ public abstract class ActiveMQTestBase extends Assert {
    }
 
    protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
-      ConfigurationImpl configuration = createBasicConfig(serverID).setJMXManagementEnabled(false).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, generateInVMParams(serverID)));
+      ConfigurationImpl configuration = createBasicConfig(serverID).setJMXManagementEnabled(false).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, generateInVMParams(serverID), "invm"));
 
       if (netty) {
-         configuration.addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY));
+         configuration.addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, new HashMap<String, Object>(), "netty", new HashMap<String, Object>()));
       }
 
       return configuration;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3af1e5c7/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index 0d5c874..fde38fe 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -19,8 +19,10 @@ package org.apache.activemq.artemis.tests.integration.amqp;
 
 import java.net.URI;
 import java.util.LinkedList;
+import java.util.Set;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
 import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
@@ -105,6 +107,13 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
       serverConfig.addAddressConfiguration(address);
       serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")));
       serverConfig.setSecurityEnabled(false);
+      Set<TransportConfiguration> acceptors = serverConfig.getAcceptorConfigurations();
+      for (TransportConfiguration tc : acceptors) {
+         if (tc.getName().equals("netty")) {
+            tc.getExtraParams().put("anycastPrefix", "anycast://");
+            tc.getExtraParams().put("multicastPrefix", "multicast://");
+         }
+      }
       serverManager.start();
       server.start();
       return server;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3af1e5c7/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index b817834..e102c77 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -16,11 +16,7 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
-import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
-import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
-import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
-import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
-
+import javax.jms.JMSException;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
@@ -30,7 +26,10 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
@@ -48,7 +47,10 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jms.JMSException;
+import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
+import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
+import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
+import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
 
 /**
  * Test basic send and receive scenarios using only AMQP sender and receiver links.
@@ -178,6 +180,65 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
    }
 
    @Test(timeout = 60000)
+   public void testAnycastMessageRoutingExclusivity() throws Exception {
+      final String addressA = "addressA";
+      final String queueA = "queueA";
+      final String queueB = "queueB";
+      final String queueC = "queueC";
+
+      ActiveMQServerControl serverControl = server.getActiveMQServerControl();
+      serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
+      serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
+      serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
+      serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
+
+      sendMessages("anycast://" + addressA, 1);
+
+      assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
+      assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount());
+   }
+
+   @Test
+   public void testMulticastMessageRoutingExclusivity() throws Exception {
+      final String addressA = "addressA";
+      final String queueA = "queueA";
+      final String queueB = "queueB";
+      final String queueC = "queueC";
+
+      ActiveMQServerControl serverControl = server.getActiveMQServerControl();
+      serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
+      serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
+      serverControl.createQueue(addressA, queueB, RoutingType.MULTICAST.toString());
+      serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
+
+      sendMessages("multicast://" + addressA, 1);
+
+      assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount());
+      assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
+   }
+
+   @Test
+   public void testAmbiguousMessageRouting() throws Exception {
+      final String addressA = "addressA";
+      final String queueA = "queueA";
+      final String queueB = "queueB";
+      final String queueC = "queueC";
+      final String queueD = "queueD";
+
+      ActiveMQServerControl serverControl = server.getActiveMQServerControl();
+      serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
+      serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
+      serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
+      serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
+      serverControl.createQueue(addressA, queueD, RoutingType.MULTICAST.toString());
+
+      sendMessages(addressA, 1);
+
+      assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
+      assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount());
+   }
+
+   @Test(timeout = 60000)
    public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
       int MSG_COUNT = 4;
       sendMessages(getTestName(), MSG_COUNT);
