http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/49fcde76/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java
index 3a6c404..1f5ef15 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java
@@ -16,11 +16,16 @@
  */
 package org.apache.activemq.artemis.tests.integration.client;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import com.hazelcast.util.UuidUtil;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@@ -30,8 +35,11 @@ import 
org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
+import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@@ -143,4 +151,95 @@ public class CoreClientTest extends ActiveMQTestBase {
 
       sf.close();
    }
+
+   @Test
+   public void testCoreClientPrefixes() throws Exception {
+
+      Configuration configuration = createBasicConfig();
+      configuration.clearAcceptorConfigurations();
+      configuration.addAddressesSetting("#", new 
AddressSettings().setMaxSizeBytes(10 * 1024 * 1024).setPageSizeBytes(1024 * 
1024));
+
+      String baseAddress = "foo";
+
+      List<String> anycastPrefixes = new ArrayList<>();
+      anycastPrefixes.add("anycast://");
+      anycastPrefixes.add("queue://");
+      anycastPrefixes.add("jms.queue.");
+
+      List<String> multicastPrefixes = new ArrayList<>();
+      multicastPrefixes.add("multicast://");
+      multicastPrefixes.add("topic://");
+      multicastPrefixes.add("jms.topic.");
+
+      String locatorString = "tcp://localhost:5445";
+      StringBuilder acceptor = new StringBuilder(locatorString + 
"?PROTOCOLS=CORE;anycastPrefix=");
+      for (String prefix : anycastPrefixes) {
+         acceptor.append(prefix + ",");
+      }
+      acceptor.append(";multicastPrefix=");
+      for (String prefix : multicastPrefixes) {
+         acceptor.append(prefix + ",");
+      }
+
+      configuration.addAcceptorConfiguration("prefix", acceptor.toString());
+
+      ActiveMQServer server = createServer(configuration);
+      server.start();
+
+      ServerLocator locator = ServerLocatorImpl.newLocator(locatorString);
+
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(false, true, true);
+
+      Map<String, ClientConsumer> consumerMap = new HashMap<>();
+
+      for (String prefix : anycastPrefixes) {
+         String queueName = UuidUtil.buildRandomUuidString();
+         String address = prefix + baseAddress;
+
+         session.createQueue(prefix + baseAddress, null, queueName, null, 
false);
+         consumerMap.put(address, session.createConsumer(queueName));
+      }
+
+      for (String prefix : multicastPrefixes) {
+         String queueName = UuidUtil.buildRandomUuidString();
+         String address = prefix + baseAddress;
+
+         session.createQueue(prefix + baseAddress, null, queueName, null, 
false);
+         consumerMap.put(address, session.createConsumer(queueName));
+      }
+
+      session.start();
+
+      final int numMessages = 3;
+
+      for (String prefix : anycastPrefixes) {
+         ClientProducer producer = session.createProducer(prefix + 
baseAddress);
+         for (int i = 0; i < numMessages; i++) {
+            ClientMessage message = 
session.createMessage(ActiveMQTextMessage.TYPE, false, 0, 
System.currentTimeMillis(), (byte) 1);
+            message.getBodyBuffer().writeString("testINVMCoreClient");
+            producer.send(message);
+         }
+
+         // Ensure that messages are load balanced across all queues
+
+         for (String queuePrefix : anycastPrefixes) {
+            ClientConsumer consumer = consumerMap.get(queuePrefix + 
baseAddress);
+            for (int i = 0; i < numMessages / anycastPrefixes.size(); i++) {
+               ClientMessage message = consumer.receive(1000);
+               assertNotNull(message);
+               message.acknowledge();
+            }
+            assertNull(consumer.receive(1000));
+         }
+
+         for (String multicastPrefix : multicastPrefixes) {
+            ClientConsumer consumer = consumerMap.get(multicastPrefix + 
baseAddress);
+            assertNull(consumer.receive(100));
+         }
+      }
+
+      sf.close();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/49fcde76/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 7932dc8..00f296e 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.client;
 import javax.management.MBeanServer;
 import java.lang.management.ManagementFactory;
 import java.util.LinkedList;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
@@ -593,8 +594,9 @@ public class HangConsumerTest extends ActiveMQTestBase {
                                                         String defaultAddress,
                                                         SessionCallback 
callback,
                                                         OperationContext 
context,
-                                                        boolean 
autoCreateQueue) throws Exception {
-         return new ServerSessionImpl(name, username, password, validatedUser, 
minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, 
getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, 
getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), 
getManagementService(), this, getConfiguration().getManagementAddress(), 
defaultAddress == null ? null : new SimpleString(defaultAddress), new 
MyCallback(callback), context, getPagingManager());
+                                                        boolean 
autoCreateQueue,
+                                                        Map<SimpleString, 
RoutingType> prefixes) throws Exception {
+         return new ServerSessionImpl(name, username, password, validatedUser, 
minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, 
getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, 
getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), 
getManagementService(), this, getConfiguration().getManagementAddress(), 
defaultAddress == null ? null : new SimpleString(defaultAddress), new 
MyCallback(callback), context, getPagingManager(), prefixes);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/49fcde76/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
----------------------------------------------------------------------
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index d272c02..918ff41 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -179,14 +179,6 @@ public class FakePostOffice implements PostOffice {
 
    @Override
    public RoutingStatus route(ServerMessage message,
-                              RoutingContext context,
-                              boolean direct) throws Exception {
-      return RoutingStatus.OK;
-
-   }
-
-   @Override
-   public RoutingStatus route(ServerMessage message,
                               Transaction tx,
                               boolean direct) throws Exception {
       return RoutingStatus.OK;
@@ -194,19 +186,23 @@ public class FakePostOffice implements PostOffice {
 
    @Override
    public RoutingStatus route(ServerMessage message,
-                              RoutingContext context,
+                              Transaction tx,
                               boolean direct,
                               boolean rejectDuplicates) throws Exception {
       return RoutingStatus.OK;
+   }
 
+   @Override
+   public RoutingStatus route(ServerMessage message, RoutingContext context, 
boolean direct) throws Exception {
+      return null;
    }
 
    @Override
    public RoutingStatus route(ServerMessage message,
-                              Transaction tx,
+                              RoutingContext context,
                               boolean direct,
                               boolean rejectDuplicates) throws Exception {
-      return RoutingStatus.OK;
+      return null;
    }
 
    @Override

Reply via email to