http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/49fcde76/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java
index 3a6c404..1f5ef15 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java
@@ -16,11 +16,16 @@
  */
 package org.apache.activemq.artemis.tests.integration.client;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@@ -30,8 +35,11 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
+import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@@ -143,4 +151,106 @@ public class CoreClientTest extends ActiveMQTestBase {
 
       sf.close();
    }
+
+   /**
+    * Starts a server whose CORE acceptor declares several anycast and multicast
+    * address prefixes, binds one randomly named queue per prefixed form of the
+    * same base address, and then sends to every anycast-prefixed form. Each
+    * anycast consumer is expected to receive an equal share of the messages
+    * while the multicast consumers receive none.
+    */
+   @Test
+   public void testCoreClientPrefixes() throws Exception {
+
+      Configuration configuration = createBasicConfig();
+      configuration.clearAcceptorConfigurations();
+      configuration.addAddressesSetting("#", new AddressSettings().setMaxSizeBytes(10 * 1024 * 1024).setPageSizeBytes(1024 * 1024));
+
+      String baseAddress = "foo";
+
+      List<String> anycastPrefixes = new ArrayList<>();
+      anycastPrefixes.add("anycast://");
+      anycastPrefixes.add("queue://");
+      anycastPrefixes.add("jms.queue.");
+
+      List<String> multicastPrefixes = new ArrayList<>();
+      multicastPrefixes.add("multicast://");
+      multicastPrefixes.add("topic://");
+      multicastPrefixes.add("jms.topic.");
+
+      String locatorString = "tcp://localhost:5445";
+      StringBuilder acceptor = new StringBuilder(locatorString + "?PROTOCOLS=CORE;anycastPrefix=");
+      for (String prefix : anycastPrefixes) {
+         acceptor.append(prefix).append(',');
+      }
+      acceptor.append(";multicastPrefix=");
+      for (String prefix : multicastPrefixes) {
+         acceptor.append(prefix).append(',');
+      }
+
+      configuration.addAcceptorConfiguration("prefix", acceptor.toString());
+
+      ActiveMQServer server = createServer(configuration);
+      server.start();
+
+      ServerLocator locator = ServerLocatorImpl.newLocator(locatorString);
+      try {
+         ClientSessionFactory sf = createSessionFactory(locator);
+
+         ClientSession session = sf.createSession(false, true, true);
+
+         Map<String, ClientConsumer> consumerMap = new HashMap<>();
+
+         for (String prefix : anycastPrefixes) {
+            String queueName = UUID.randomUUID().toString();
+            String address = prefix + baseAddress;
+
+            session.createQueue(address, null, queueName, null, false);
+            consumerMap.put(address, session.createConsumer(queueName));
+         }
+
+         for (String prefix : multicastPrefixes) {
+            String queueName = UUID.randomUUID().toString();
+            String address = prefix + baseAddress;
+
+            session.createQueue(address, null, queueName, null, false);
+            consumerMap.put(address, session.createConsumer(queueName));
+         }
+
+         session.start();
+
+         final int numMessages = 3;
+
+         for (String prefix : anycastPrefixes) {
+            ClientProducer producer = session.createProducer(prefix + baseAddress);
+            for (int i = 0; i < numMessages; i++) {
+               ClientMessage message = session.createMessage(ActiveMQTextMessage.TYPE, false, 0, System.currentTimeMillis(), (byte) 1);
+               message.getBodyBuffer().writeString("testINVMCoreClient");
+               producer.send(message);
+            }
+
+            // Ensure that messages are load balanced across all queues
+
+            for (String queuePrefix : anycastPrefixes) {
+               ClientConsumer consumer = consumerMap.get(queuePrefix + baseAddress);
+               for (int i = 0; i < numMessages / anycastPrefixes.size(); i++) {
+                  ClientMessage message = consumer.receive(1000);
+                  assertNotNull(message);
+                  message.acknowledge();
+               }
+               assertNull(consumer.receive(1000));
+            }
+
+            for (String multicastPrefix : multicastPrefixes) {
+               ClientConsumer consumer = consumerMap.get(multicastPrefix + baseAddress);
+               assertNull(consumer.receive(100));
+            }
+         }
+
+         sf.close();
+      } finally {
+         // Created directly via ServerLocatorImpl, so close it explicitly even when an assertion above fails.
+         locator.close();
+      }
+   }
 }
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/49fcde76/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 7932dc8..00f296e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.client;
 import javax.management.MBeanServer;
 import java.lang.management.ManagementFactory;
 import java.util.LinkedList;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
@@ -593,8 +594,9 @@ public class HangConsumerTest extends ActiveMQTestBase {
                                                         String defaultAddress,
                                                         SessionCallback callback,
                                                         OperationContext context,
-                                                        boolean autoCreateQueue) throws Exception {
-         return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, getPagingManager());
+                                                        boolean autoCreateQueue,
+                                                        Map<SimpleString, RoutingType> prefixes) throws Exception {
+         return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, getPagingManager(), prefixes);
       }
    }
 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/49fcde76/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index d272c02..918ff41 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -179,14 +179,6 @@ public class FakePostOffice implements PostOffice {
 
    @Override
    public RoutingStatus route(ServerMessage message,
-                              RoutingContext context,
-                              boolean direct) throws Exception {
-      return RoutingStatus.OK;
-
-   }
-
-   @Override
-   public RoutingStatus route(ServerMessage message,
                               Transaction tx,
                               boolean direct) throws Exception {
       return RoutingStatus.OK;
@@ -194,19 +186,24 @@ public class FakePostOffice implements PostOffice {
 
    @Override
    public RoutingStatus route(ServerMessage message,
-                              RoutingContext context,
+                              Transaction tx,
                               boolean direct,
                               boolean rejectDuplicates) throws Exception {
       return RoutingStatus.OK;
+   }
 
+   @Override
+   public RoutingStatus route(ServerMessage message, RoutingContext context, boolean direct) throws Exception {
+      // Fake: nothing is routed; report success like the other route() overloads instead of returning null.
+      return RoutingStatus.OK;
    }
 
    @Override
    public RoutingStatus route(ServerMessage message,
-                              Transaction tx,
+                              RoutingContext context,
                               boolean direct,
                               boolean rejectDuplicates) throws Exception {
       return RoutingStatus.OK;
    }
 
    @Override
