Added ANYCAST routing to local queues
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3aa84a99 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3aa84a99 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3aa84a99 Branch: refs/heads/ARTEMIS-780 Commit: 3aa84a99adef1088c7a16057380c193f2b0f40ae Parents: bab49b8 Author: Martyn Taylor <[email protected]> Authored: Mon Oct 24 14:27:00 2016 +0100 Committer: Clebert Suconic <[email protected]> Committed: Mon Nov 7 11:28:07 2016 -0500 ---------------------------------------------------------------------- .../artemis/core/postoffice/AddressManager.java | 2 + .../artemis/core/postoffice/PostOffice.java | 2 + .../core/postoffice/impl/BindingsImpl.java | 1 + .../core/postoffice/impl/LocalQueueBinding.java | 9 +- .../core/postoffice/impl/PostOfficeImpl.java | 5 + .../postoffice/impl/SimpleAddressManager.java | 15 ++ .../artemis/core/server/ActiveMQServer.java | 2 +- .../core/server/impl/ActiveMQServerImpl.java | 12 +- .../artemis/core/server/impl/AddressInfo.java | 12 +- .../server/impl/PostOfficeJournalLoader.java | 3 +- .../core/server/impl/QueueFactoryImpl.java | 8 + .../core/config/impl/FileConfigurationTest.java | 4 +- .../integration/addressing/AddressingTest.java | 240 ++++++++++++++++++- .../integration/client/HangConsumerTest.java | 2 +- .../jms/client/TopicCleanupTest.java | 2 +- .../core/server/impl/fakes/FakePostOffice.java | 5 + 16 files changed, 300 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java index 5519822..1cf1a07 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java @@ -54,6 +54,8 @@ public interface AddressManager { AddressInfo addAddressInfo(AddressInfo addressInfo); + AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo); + AddressInfo removeAddressInfo(SimpleString address); AddressInfo getAddressInfo(SimpleString address); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java index f719966..7902352 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java @@ -45,6 +45,8 @@ public interface PostOffice extends ActiveMQComponent { AddressInfo addAddressInfo(AddressInfo addressInfo); + AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo); + AddressInfo removeAddressInfo(SimpleString address); AddressInfo getAddressInfo(SimpleString address); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index e5df737..6be0311 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -262,6 +262,7 @@ public final class BindingsImpl implements Bindings { boolean routed = false; for (Binding binding : exclusiveBindings) { + if (binding.getFilter() == null || binding.getFilter().match(message)) { binding.getBindable().route(message, context); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java index 2a6d9c5..2921388 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java @@ -24,10 +24,11 @@ import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; public class LocalQueueBinding implements QueueBinding { - private final SimpleString address; + private final AddressInfo address; private final Queue queue; @@ -37,7 +38,7 @@ public class LocalQueueBinding implements QueueBinding { private final SimpleString clusterName; - public LocalQueueBinding(final SimpleString address, final Queue queue, final SimpleString nodeID) { + public LocalQueueBinding(final AddressInfo address, final Queue queue, final SimpleString nodeID) { this.address = address; this.queue = queue; @@ -61,7 +62,7 @@ public class LocalQueueBinding implements QueueBinding { @Override public SimpleString getAddress() { - return address; + return address.getName(); } @Override @@ -76,7 +77,7 @@ public class LocalQueueBinding implements QueueBinding { @Override public SimpleString getRoutingName() { - return name; + return (address.getRoutingType() == AddressInfo.RoutingType.MULTICAST) ? name : address.getName(); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 9b7ed0c..6c654bf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -425,6 +425,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } @Override + public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) { + return addressManager.addOrUpdateAddressInfo(addressInfo); + } + + @Override public AddressInfo removeAddressInfo(SimpleString address) { return addressManager.removeAddressInfo(address); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java index 2994f9e..969a1a9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java @@ -188,6 +188,21 @@ public class SimpleAddressManager implements AddressManager { } @Override + public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) { + AddressInfo from = addAddressInfo(addressInfo); + return (from == null) ? addressInfo : updateAddressInfo(from, addressInfo); + } + + private AddressInfo updateAddressInfo(AddressInfo from, AddressInfo to) { + synchronized (from) { + from.setRoutingType(to.getRoutingType()); + from.setDefaultMaxConsumers(to.getDefaultMaxConsumers()); + from.setDefaultDeleteOnNoConsumers(to.isDefaultDeleteOnNoConsumers()); + return from; + } + } + + @Override public AddressInfo removeAddressInfo(SimpleString address) { return addressInfoMap.remove(address); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index 01fa89a..a6256d8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -417,7 +417,7 @@ public interface ActiveMQServer extends ActiveMQComponent { void removeClientConnection(String clientId); - AddressInfo addAddressInfo(AddressInfo addressInfo); + AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo); AddressInfo removeAddressInfo(SimpleString address); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 4c5a0d6..375e678 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2094,7 +2094,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { info.setDefaultDeleteOnNoConsumers(config.getDefaultDeleteOnNoConsumers()); info.setDefaultMaxConsumers(config.getDefaultMaxConsumers()); - addAddressInfo(info); + createOrUpdateAddressInfo(info); deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations()); } } @@ -2198,8 +2198,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { } @Override - public AddressInfo addAddressInfo(AddressInfo addressInfo) { - return postOffice.addAddressInfo(addressInfo); + public AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) { + return postOffice.addOrUpdateAddressInfo(addressInfo); } @Override @@ -2209,7 +2209,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { @Override public AddressInfo getAddressInfo(SimpleString address) { - return postOffice.removeAddressInfo(address); + return postOffice.getAddressInfo(address); } private Queue createQueue(final SimpleString addressName, @@ -2245,15 +2245,13 @@ public class ActiveMQServerImpl implements ActiveMQServer { final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).build(); final Queue queue = queueFactory.createQueueWith(queueConfig); - addAddressInfo(new AddressInfo(queue.getAddress())); - if (transientQueue) { queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName())); } else if (queue.isAutoCreated()) { queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this.getJMSQueueDeleter(), queue.getName())); } - final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId()); + final QueueBinding localQueueBinding = new LocalQueueBinding(getAddressInfo(queue.getAddress()), queue, nodeManager.getNodeId()); if (queue.isDurable()) { storageManager.addQueueBinding(txID, localQueueBinding); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java index 4c6ec1f..1449107 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java @@ -22,7 +22,7 @@ public class AddressInfo { private final SimpleString name; - private RoutingType routingType = RoutingType.Multicast; + private RoutingType routingType = RoutingType.MULTICAST; private boolean defaultDeleteOnNoConsumers; @@ -61,13 +61,13 @@ public class AddressInfo { } public enum RoutingType { - Multicast, Anycast; + MULTICAST, ANYCAST; public byte getType() { switch (this) { - case Multicast: + case MULTICAST: return 0; - case Anycast: + case ANYCAST: return 1; default: return -1; @@ -77,9 +77,9 @@ public class AddressInfo { public static RoutingType getType(byte type) { switch (type) { case 0: - return Multicast; + return MULTICAST; case 1: - return Anycast; + return ANYCAST; default: return null; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index 9a8ae74..71c5b2b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -155,7 +155,8 @@ public class PostOfficeJournalLoader implements JournalLoader { } } - final Binding binding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId()); + final Binding binding = new LocalQueueBinding(postOffice.getAddressInfo(queue.getAddress()), queue, nodeManager.getNodeId()); + queues.put(queue.getID(), queue); postOffice.addBinding(binding); managementService.registerAddress(queue.getAddress()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java index 5686c7b..3678553 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java @@ -68,6 +68,10 @@ public class QueueFactoryImpl implements QueueFactory { @Override public Queue createQueueWith(final QueueConfig config) { + + // Add default address info if one doesn't exist + postOffice.addAddressInfo(new AddressInfo(config.address())); + final AddressSettings addressSettings = addressSettingsRepository.getMatch(config.address().toString()); final Queue queue; if (addressSettings.isLastValueQueue()) { @@ -89,6 +93,10 @@ public class QueueFactoryImpl implements QueueFactory { final boolean durable, final boolean temporary, final boolean autoCreated) { + + // Add default address info if one doesn't exist + postOffice.addAddressInfo(new AddressInfo(address)); + AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString()); Queue queue; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index 214070e..c1639c7 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -369,7 +369,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { // Addr 1 CoreAddressConfiguration addressConfiguration = conf.getAddressConfigurations().get(0); assertEquals("addr1", addressConfiguration.getName()); - assertEquals(AddressInfo.RoutingType.Anycast, addressConfiguration.getRoutingType()); + assertEquals(AddressInfo.RoutingType.ANYCAST, addressConfiguration.getRoutingType()); assertEquals(2, addressConfiguration.getQueueConfigurations().size()); // Addr 1 Queue 1 @@ -395,7 +395,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { // Addr 2 addressConfiguration = conf.getAddressConfigurations().get(1); assertEquals("addr2", addressConfiguration.getName()); - assertEquals(AddressInfo.RoutingType.Multicast, addressConfiguration.getRoutingType()); + assertEquals(AddressInfo.RoutingType.MULTICAST, addressConfiguration.getRoutingType()); assertEquals(2, addressConfiguration.getQueueConfigurations().size()); // Addr 2 Queue 1 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java index 43d6071..2e0fda4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java @@ -16,6 +16,244 @@ */ package org.apache.activemq.artemis.tests.integration.addressing; -public class AddressingTest { +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Before; +import org.junit.Test; + +public class AddressingTest extends ActiveMQTestBase { + + private ActiveMQServer server; + + private ClientSessionFactory sessionFactory; + + @Before + public void setup() throws Exception { + server = createServer(true); + server.start(); + + server.waitForActivation(10, TimeUnit.SECONDS); + + ServerLocator sl = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY)); + sessionFactory = sl.createSessionFactory(); + + addSessionFactory(sessionFactory); + } + + @Test + public void testMulticastRouting() throws Exception { + + SimpleString sendAddress = new SimpleString("test.address"); + + List<String> testAddresses = Arrays.asList("test.address", "test.#", "test.*"); + + for (String consumeAddress : testAddresses) { + + // For each address, create 2 Queues with the same address, assert both queues receive message + + AddressInfo addressInfo = new AddressInfo(new SimpleString(consumeAddress)); + addressInfo.setRoutingType(AddressInfo.RoutingType.MULTICAST); + + server.createOrUpdateAddressInfo(addressInfo); + Queue q1 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".1"), null, true, false); + Queue q2 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".2"), null, true, false); + + ClientSession session = sessionFactory.createSession(); + session.start(); + + ClientConsumer consumer1 = session.createConsumer(q1.getName()); + ClientConsumer consumer2 = session.createConsumer(q2.getName()); + + ClientProducer producer = session.createProducer(sendAddress); + ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true); + m.getBodyBuffer().writeString("TestMessage"); + + producer.send(m); + + assertNotNull(consumer1.receive(2000)); + assertNotNull(consumer2.receive(2000)); + + q1.deleteQueue(); + q2.deleteQueue(); + + System.out.println(consumeAddress); + } + } + + @Test + public void testAnycastRouting() throws Exception { + + SimpleString sendAddress = new SimpleString("test.address"); + + List<String> testAddresses = Arrays.asList("test.address", "test.#", "test.*"); + + for (String consumeAddress : testAddresses) { + + // For each address, create 2 Queues with the same address, assert one queue receive message + + AddressInfo addressInfo = new AddressInfo(new SimpleString(consumeAddress)); + addressInfo.setRoutingType(AddressInfo.RoutingType.ANYCAST); + + server.createOrUpdateAddressInfo(addressInfo); + Queue q1 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".1"), null, true, false); + Queue q2 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".2"), null, true, false); + + ClientSession session = sessionFactory.createSession(); + session.start(); + + ClientConsumer consumer1 = session.createConsumer(q1.getName()); + ClientConsumer consumer2 = session.createConsumer(q2.getName()); + + ClientProducer producer = session.createProducer(sendAddress); + ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true); + + m.getBodyBuffer().writeString("TestMessage"); + + producer.send(m); + + int count = 0; + count = (consumer1.receive(1000) == null) ? count : count + 1; + count = (consumer2.receive(1000) == null) ? count : count + 1; + assertEquals(1, count); + + q1.deleteQueue(); + q2.deleteQueue(); + + System.out.println(consumeAddress); + } + } + + @Test + public void testAnycastRoutingRoundRobin() throws Exception { + + SimpleString address = new SimpleString("test.address"); + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.setRoutingType(AddressInfo.RoutingType.ANYCAST); + + server.createOrUpdateAddressInfo(addressInfo); + Queue q1 = server.createQueue(address, address.concat(".1"), null, true, false); + Queue q2 = server.createQueue(address, address.concat(".2"), null, true, false); + Queue q3 = server.createQueue(address, address.concat(".3"), null, true, false); + + ClientSession session = sessionFactory.createSession(); + session.start(); + + ClientProducer producer = session.createProducer(address); + + ClientConsumer consumer1 = session.createConsumer(q1.getName()); + ClientConsumer consumer2 = session.createConsumer(q2.getName()); + ClientConsumer consumer3 = session.createConsumer(q3.getName()); + List<ClientConsumer> consumers = new ArrayList<>(Arrays.asList(new ClientConsumer[] {consumer1, consumer2, consumer3})); + + List<String> messages = new ArrayList<>(); + messages.add("Message1"); + messages.add("Message2"); + messages.add("Message3"); + + ClientMessage clientMessage; + for (String message : messages) { + clientMessage = session.createMessage(true); + clientMessage.getBodyBuffer().writeString(message); + producer.send(clientMessage); + } + + String m; + for (ClientConsumer consumer : consumers) { + clientMessage = consumer.receive(1000); + m = clientMessage.getBodyBuffer().readString(); + messages.remove(m); + } + + assertTrue(messages.isEmpty()); + + // Check we don't receive more messages + int count = 0; + for (ClientConsumer consumer : consumers) { + count = (consumer.receive(1000) == null) ? count : count + 1; + } + assertEquals(0, count); + } + + + + @Test + public void testMulticastRoutingBackwardsCompat() throws Exception { + + SimpleString sendAddress = new SimpleString("test.address"); + + List<String> testAddresses = Arrays.asList("test.address", "test.#", "test.*"); + + for (String consumeAddress : testAddresses) { + + // For each address, create 2 Queues with the same address, assert both queues receive message + Queue q1 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".1"), null, true, false); + Queue q2 = server.createQueue(new SimpleString(consumeAddress), new SimpleString(consumeAddress + ".2"), null, true, false); + + ClientSession session = sessionFactory.createSession(); + session.start(); + + ClientConsumer consumer1 = session.createConsumer(q1.getName()); + ClientConsumer consumer2 = session.createConsumer(q2.getName()); + + ClientProducer producer = session.createProducer(sendAddress); + ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true); + m.getBodyBuffer().writeString("TestMessage"); + + producer.send(m); + + assertNotNull(consumer1.receive(2000)); + assertNotNull(consumer2.receive(2000)); + + q1.deleteQueue(); + q2.deleteQueue(); + + System.out.println(consumeAddress); + } + } + + @Test + public void testDeleteQueueOnNoConsumersTrue() { + fail("Not Implemented"); + } + + @Test + public void testDeleteQueueOnNoConsumersFalse() { + fail("Not Implemented"); + } + + @Test + public void testLimitOnMaxConsumers() { + fail("Not Implemented"); + } + + @Test + public void testUnlimitedMaxConsumers() { + fail("Not Implemented"); + } + + @Test + public void testDefaultMaxConsumersFromAddress() { + fail("Not Implemented"); + } + + @Test + public void testDefaultDeleteOnNoConsumersFromAddress() { + fail("Not Implemented"); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index 83d28a1..2fd5915 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -353,7 +353,7 @@ public class HangConsumerTest extends ActiveMQTestBase { long txID = server.getStorageManager().generateID(); // Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally - LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE, new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false, false, null, null, null, null, null), server.getNodeID()); + LocalQueueBinding newBinding = new LocalQueueBinding(server.getAddressInfo(QUEUE), new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false, false, null, null, null, null, null), server.getNodeID()); server.getStorageManager().addQueueBinding(txID, newBinding); server.getStorageManager().commitBindings(txID); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java index 280596a..ec279ee 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/TopicCleanupTest.java @@ -83,7 +83,7 @@ public class TopicCleanupTest extends JMSTestBase { final Queue queue = new QueueImpl(storage.generateID(), SimpleString.toSimpleString("jms.topic.topic"), SimpleString.toSimpleString("jms.topic.topic"), FilterImpl.createFilter(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), null, true, false, false, server.getScheduledPool(), server.getPostOffice(), storage, server.getAddressSettingsRepository(), server.getExecutorFactory().getExecutor()); - LocalQueueBinding binding = new LocalQueueBinding(queue.getAddress(), queue, server.getNodeID()); + LocalQueueBinding binding = new LocalQueueBinding(server.getAddressInfo(queue.getAddress()), queue, server.getNodeID()); storage.addQueueBinding(txid, binding); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aa84a99/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java index 9424fc3..512f0f2 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java @@ -65,6 +65,11 @@ public class FakePostOffice implements PostOffice { return null; } + @Override + public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) { + return null; + } + @Override public AddressInfo removeAddressInfo(SimpleString address) {
