ARTEMIS-880 Add support for address prefixing
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8f532cc2 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8f532cc2 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8f532cc2 Branch: refs/heads/master Commit: 8f532cc25d4179b4014f2136e2861bd73f6b5d61 Parents: a5031b5 Author: Martyn Taylor <[email protected]> Authored: Fri Dec 9 18:04:54 2016 +0000 Committer: Martyn Taylor <[email protected]> Committed: Fri Dec 9 18:43:15 2016 +0000 ---------------------------------------------------------------------- .../activemq/artemis/api/core/SimpleString.java | 6 +- .../impl/wireformat/CreateQueueMessage_V2.java | 2 +- .../activemq/artemis/utils/PrefixUtil.java | 65 +++++++++ .../amqp/broker/AMQPSessionCallback.java | 2 +- .../amqp/broker/ProtonProtocolManager.java | 25 ++++ .../protocol/mqtt/MQTTConnectionManager.java | 2 +- .../core/protocol/mqtt/MQTTProtocolHandler.java | 10 +- .../mqtt/MQTTProtocolManagerFactory.java | 5 +- .../artemis/core/protocol/mqtt/MQTTSession.java | 12 +- .../protocol/openwire/OpenWireConnection.java | 4 +- .../openwire/OpenWireProtocolManager.java | 23 ++++ .../core/protocol/openwire/amq/AMQSession.java | 12 +- .../protocol/stomp/StompProtocolManager.java | 4 +- .../core/postoffice/impl/LocalQueueBinding.java | 12 +- .../core/postoffice/impl/PostOfficeImpl.java | 6 +- .../core/impl/ActiveMQPacketHandler.java | 2 +- .../protocol/core/impl/CoreProtocolManager.java | 24 ++++ .../artemis/core/server/ActiveMQServer.java | 4 +- .../artemis/core/server/RoutingContext.java | 8 ++ .../artemis/core/server/ServerSession.java | 11 +- .../core/server/impl/ActiveMQServerImpl.java | 18 +-- .../artemis/core/server/impl/DivertImpl.java | 2 +- .../core/server/impl/RoutingContextImpl.java | 25 ++++ .../core/server/impl/ServerSessionImpl.java | 134 ++++++++++++------- .../core/protocol/AbstractProtocolManager.java | 25 ++++ .../spi/core/protocol/ProtocolManager.java | 9 ++ .../integration/client/CoreClientTest.java | 99 ++++++++++++++ .../integration/client/HangConsumerTest.java | 6 +- .../core/server/impl/fakes/FakePostOffice.java | 18 +-- 29 files changed, 469 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java index b4a02ea..decd189 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java @@ -118,6 +118,11 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl @Override public CharSequence subSequence(final int start, final int end) { + return subSeq(start, end); + } + + + public SimpleString subSeq(final int start, final int end) { int len = data.length >> 1; if (end < start || start < 0 || end > len) { @@ -399,5 +404,4 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl dst[d++] = (char) (low | high); } } - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java index e2867ab..d9def3c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java @@ -107,7 +107,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage { public void encodeRest(final ActiveMQBuffer buffer) { super.encodeRest(buffer); buffer.writeBoolean(autoCreated); - buffer.writeByte(routingType.getType()); + buffer.writeByte(routingType == null ? -1 : routingType.getType()); buffer.writeInt(maxConsumers); buffer.writeBoolean(deleteOnNoConsumers); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java new file mode 100644 index 0000000..cc3bf1c --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/PrefixUtil.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.RoutingType; + +public class PrefixUtil { + + public static Pair<SimpleString, RoutingType> getAddressAndRoutingType(SimpleString address, + RoutingType defaultRoutingType, + Map<SimpleString, RoutingType> prefixes) { + for (Map.Entry<SimpleString, RoutingType> entry : prefixes.entrySet()) { + if (address.startsWith(entry.getKey())) { + return new Pair<>(removePrefix(address, entry.getKey()), entry.getValue()); + } + } + return new Pair<>(address, defaultRoutingType); + } + + public static Pair<SimpleString, Set<RoutingType>> getAddressAndRoutingTypes(SimpleString address, + Set<RoutingType> defaultRoutingTypes, + Map<SimpleString, RoutingType> prefixes) { + for (Map.Entry<SimpleString, RoutingType> entry : prefixes.entrySet()) { + if (address.startsWith(entry.getKey())) { + Set routingTypes = new HashSet<>(); + routingTypes.add(entry.getValue()); + return new Pair<>(removePrefix(address, entry.getKey()), routingTypes); + } + } + return new Pair<>(address, defaultRoutingTypes); + } + + public static SimpleString getAddress(SimpleString address, Map<SimpleString, RoutingType> prefixes) { + for (Map.Entry<SimpleString, RoutingType> entry : prefixes.entrySet()) { + if (address.startsWith(entry.getKey())) { + return removePrefix(address, entry.getKey()); + } + } + return address; + } + + private static SimpleString removePrefix(SimpleString string, SimpleString prefix) { + return string.subSeq(prefix.length(), string.length()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 9d69b00..c0dc34e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -159,7 +159,7 @@ public class AMQPSessionCallback implements SessionCallback { false, // boolean autoCommitAcks, false, // boolean preAcknowledge, true, //boolean xa, - (String) null, this, true, operationContext); + (String) null, this, true, operationContext, manager.getPrefixes()); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java index 9b84dc1..0e8ab37 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java @@ -17,16 +17,20 @@ package org.apache.activemq.artemis.protocol.amqp.broker; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.Executor; import io.netty.channel.ChannelPipeline; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationListener; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; @@ -54,6 +58,8 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti private final ProtonProtocolManagerFactory factory; + private final Map<SimpleString, RoutingType> prefixes = new HashMap<>(); + /* * used when you want to treat senders as a subscription on an address rather than consuming from the actual queue for * the address. This can be changed on the acceptor. @@ -168,4 +174,23 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti public void setMaxFrameSize(int maxFrameSize) { this.maxFrameSize = maxFrameSize; } + + @Override + public void setAnycastPrefix(String anycastPrefix) { + for (String prefix : anycastPrefix.split(",")) { + prefixes.put(SimpleString.toSimpleString(prefix), RoutingType.ANYCAST); + } + } + + @Override + public void setMulticastPrefix(String multicastPrefix) { + for (String prefix : multicastPrefix.split(",")) { + prefixes.put(SimpleString.toSimpleString(prefix), RoutingType.MULTICAST); + } + } + + @Override + public Map<SimpleString, RoutingType> getPrefixes() { + return prefixes; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java index ce65648..a4690e7 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java @@ -97,7 +97,7 @@ public class MQTTConnectionManager { ActiveMQServer server = session.getServer(); ServerSession serverSession = server.createSession(id, username, password, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, session.getConnection(), MQTTUtil.SESSION_AUTO_COMMIT_SENDS, MQTTUtil.SESSION_AUTO_COMMIT_ACKS, MQTTUtil.SESSION_PREACKNOWLEDGE, MQTTUtil.SESSION_XA, null, - session.getSessionCallback(), MQTTUtil.SESSION_AUTO_CREATE_QUEUE, server.newOperationContext()); + session.getSessionCallback(), MQTTUtil.SESSION_AUTO_CREATE_QUEUE, server.newOperationContext(), session.getProtocolManager().getPrefixes()); return (ServerSessionImpl) serverSession; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java index 80923e9..0149f46 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java @@ -17,6 +17,8 @@ package org.apache.activemq.artemis.core.protocol.mqtt; +import java.util.Map; + import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -37,7 +39,9 @@ import io.netty.handler.codec.mqtt.MqttSubAckPayload; import io.netty.handler.codec.mqtt.MqttSubscribeMessage; import io.netty.handler.codec.mqtt.MqttUnsubAckMessage; import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; /** @@ -53,6 +57,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { private MQTTSession session; private ActiveMQServer server; + private MQTTProtocolManager protocolManager; // This Channel Handler is not sharable, therefore it can only ever be associated with a single ctx. @@ -62,15 +67,18 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { private boolean stopped = false; + private Map<SimpleString, RoutingType> prefixes; + public MQTTProtocolHandler(ActiveMQServer server, MQTTProtocolManager protocolManager) { this.server = server; this.protocolManager = protocolManager; + this.prefixes = protocolManager.getPrefixes(); } void setConnection(MQTTConnection connection, ConnectionEntry entry) throws Exception { this.connectionEntry = entry; this.connection = connection; - this.session = new MQTTSession(this, connection); + this.session = new MQTTSession(this, connection, protocolManager); } void stop(boolean error) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java index 553f9ad..453b267 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; +import org.apache.activemq.artemis.utils.uri.BeanSupport; import org.osgi.service.component.annotations.Component; @Component(service = ProtocolManagerFactory.class) @@ -40,8 +41,8 @@ public class MQTTProtocolManagerFactory extends AbstractProtocolManagerFactory<M public ProtocolManager createProtocolManager(ActiveMQServer server, final Map<String, Object> parameters, List<BaseInterceptor> incomingInterceptors, - List<BaseInterceptor> outgoingInterceptors) { - return new MQTTProtocolManager(server, incomingInterceptors, outgoingInterceptors); + List<BaseInterceptor> outgoingInterceptors) throws Exception { + return BeanSupport.setData(new MQTTProtocolManager(server, incomingInterceptors, outgoingInterceptors), parameters); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java index 059948f..d4fd7bf 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java @@ -53,8 +53,14 @@ public class MQTTSession { private MQTTLogger log = MQTTLogger.LOGGER; - public MQTTSession(MQTTProtocolHandler protocolHandler, MQTTConnection connection) throws Exception { + private MQTTProtocolManager protocolManager; + + public MQTTSession(MQTTProtocolHandler protocolHandler, + MQTTConnection connection, + MQTTProtocolManager protocolManager) throws Exception { this.protocolHandler = protocolHandler; + this.protocolManager = protocolManager; + this.connection = connection; mqttConnectionManager = new MQTTConnectionManager(this); @@ -149,4 +155,8 @@ public class MQTTSession { MQTTConnection getConnection() { return connection; } + + MQTTProtocolManager getProtocolManager() { + return protocolManager; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index d6add20..9e65f17 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -660,7 +660,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } private void createInternalSession(ConnectionInfo info) throws Exception { - internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext); + internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext, protocolManager.getPrefixes()); } //raise the refCount of context @@ -847,7 +847,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } public AMQSession addSession(SessionInfo ss, boolean internal) { - AMQSession amqSession = new AMQSession(getState().getInfo(), ss, server, this, protocolManager.getScheduledPool()); + AMQSession amqSession = new AMQSession(getState().getInfo(), ss, server, this, protocolManager); amqSession.initialize(); if (internal) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 0ee1711..cf673f8 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -34,6 +34,7 @@ import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener; import org.apache.activemq.artemis.api.core.client.TopologyMember; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; @@ -42,6 +43,7 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterManager; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; @@ -118,6 +120,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl private final OpenWireMessageConverter messageConverter; + private final Map<SimpleString, RoutingType> prefixes = new HashMap<>(); + public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) { this.factory = factory; this.server = server; @@ -558,4 +562,23 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) { this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay; } + + @Override + public void setAnycastPrefix(String anycastPrefix) { + for (String prefix : anycastPrefix.split(",")) { + prefixes.put(SimpleString.toSimpleString(prefix), RoutingType.ANYCAST); + } + } + + @Override + public void setMulticastPrefix(String multicastPrefix) { + for (String prefix : multicastPrefix.split(",")) { + prefixes.put(SimpleString.toSimpleString(prefix), RoutingType.MULTICAST); + } + } + + @Override + public Map<SimpleString, RoutingType> getPrefixes() { + return prefixes; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index c64374a..a0826a7 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; +import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.BindingQueryResult; @@ -75,17 +76,20 @@ public class AMQSession implements SessionCallback { // so we make a new one per AMQSession private final OpenWireMessageConverter converter; + private final OpenWireProtocolManager protocolManager; + public AMQSession(ConnectionInfo connInfo, SessionInfo sessInfo, ActiveMQServer server, OpenWireConnection connection, - ScheduledExecutorService scheduledPool) { + OpenWireProtocolManager protocolManager) { this.connInfo = connInfo; this.sessInfo = sessInfo; this.server = server; this.connection = connection; - this.scheduledPool = scheduledPool; + this.protocolManager = protocolManager; + this.scheduledPool = protocolManager.getScheduledPool(); OpenWireFormat marshaller = (OpenWireFormat) connection.getMarshaller(); this.converter = new OpenWireMessageConverter(marshaller.copy()); @@ -109,7 +113,7 @@ public class AMQSession implements SessionCallback { // now try { - coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true, connection.getOperationContext()); + coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true, connection.getOperationContext(), protocolManager.getPrefixes()); long sessionId = sessInfo.getSessionId().getValue(); if (sessionId == -1) { @@ -169,7 +173,7 @@ public class AMQSession implements SessionCallback { BindingQueryResult bindingQuery = server.bindingQuery(queueName); QueueQueryResult queueBinding = server.queueQuery(queueName); - boolean isAutoCreate = bindingQuery.isExists() ? true : bindingQuery.isAutoCreateJmsQueues(); + boolean isAutoCreate = bindingQuery.isExists() ? true : bindingQuery.isAutoCreateJmsQueues(); if (!queueBinding.isExists()) { if (isAutoCreate) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index 0c1f7dd..aba3634 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -230,7 +230,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St if (stompSession == null) { stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor())); String name = UUIDGenerator.getInstance().generateStringUUID(); - ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, true, false, false, false, null, stompSession, true, server.newOperationContext()); + ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, true, false, false, false, null, stompSession, true, server.newOperationContext(), getPrefixes()); stompSession.setServerSession(session); sessions.put(connection.getID(), stompSession); } @@ -243,7 +243,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St if (stompSession == null) { stompSession = new StompSession(connection, this, server.getStorageManager().newContext(executor)); String name = UUIDGenerator.getInstance().generateStringUUID(); - ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, false, false, false, false, null, stompSession, true, server.newOperationContext()); + ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, false, false, false, false, null, stompSession, true, server.newOperationContext(), getPrefixes()); stompSession.setServerSession(session); transactedSessions.put(txID, stompSession); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java index e09d108..18c87cf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.core.postoffice.impl; -import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.postoffice.BindingType; @@ -118,23 +117,20 @@ public class LocalQueueBinding implements QueueBinding { @Override public void route(final ServerMessage message, final RoutingContext context) throws Exception { - if (isMatchRoutingType(message)) { + if (isMatchRoutingType(context)) { queue.route(message, context); } } @Override public void routeWithAck(ServerMessage message, RoutingContext context) throws Exception { - if (isMatchRoutingType(message)) { + if (isMatchRoutingType(context)) { queue.routeWithAck(message, context); } } - private boolean isMatchRoutingType(ServerMessage message) { - if (message.containsProperty(Message.HDR_ROUTING_TYPE)) { - return message.getByteProperty(Message.HDR_ROUTING_TYPE).equals(queue.getRoutingType().getType()); - } - return true; + private boolean isMatchRoutingType(RoutingContext context) { + return (context.getRoutingType() == null || context.getRoutingType() == queue.getRoutingType()); } public boolean isQueueBinding() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 34e966c..16c3021 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -664,12 +664,12 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding throw new IllegalStateException("Message cannot be routed more than once"); } - SimpleString address = message.getAddress(); - setPagingStore(message); AtomicBoolean startedTX = new AtomicBoolean(false); + final SimpleString address = message.getAddress(); + applyExpiryDelay(message, address); if (!checkDuplicateID(message, context, rejectDuplicates, startedTX)) { @@ -682,7 +682,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding cleanupInternalPropertiesBeforeRouting(message); } - Bindings bindings = addressManager.getBindingsForRoutingAddress(address); + Bindings bindings = addressManager.getBindingsForRoutingAddress(context.getAddress() == null ? message.getAddress() : context.getAddress()); // TODO auto-create queues here? // first check for the auto-queue creation thing http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java index 39a6ac7..1034919 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java @@ -153,7 +153,7 @@ public class ActiveMQPacketHandler implements ChannelHandler { OperationContext sessionOperationContext = server.newOperationContext(); - ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext); + ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext, protocolManager.getPrefixes()); ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel); channel.setHandler(handler); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java index 3fb642a..62b10c4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -29,6 +30,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener; @@ -51,6 +53,7 @@ import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQFrameDecoder2; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; @@ -74,6 +77,8 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> { private final CoreProtocolManagerFactory protocolManagerFactory; + private final Map<SimpleString, RoutingType> prefixes = new HashMap<>(); + public CoreProtocolManager(final CoreProtocolManagerFactory factory, final ActiveMQServer server, final List<Interceptor> incomingInterceptors, @@ -189,6 +194,25 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> { return websocketRegistryNames; } + @Override + public void setAnycastPrefix(String anycastPrefix) { + for (String prefix : anycastPrefix.split(",")) { + prefixes.put(SimpleString.toSimpleString(prefix), RoutingType.ANYCAST); + } + } + + @Override + public void setMulticastPrefix(String multicastPrefix) { + for (String prefix : multicastPrefix.split(",")) { + prefixes.put(SimpleString.toSimpleString(prefix), RoutingType.MULTICAST); + } + } + + @Override + public Map<SimpleString, RoutingType> getPrefixes() { + return prefixes; + } + private boolean isArtemis(ActiveMQBuffer buffer) { return buffer.getByte(0) == 'A' && buffer.getByte(1) == 'R' && http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index 5fe71bb..cc5e51e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server; import javax.management.MBeanServer; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -197,7 +198,8 @@ public interface ActiveMQServer extends ActiveMQComponent { String defaultAddress, SessionCallback callback, boolean autoCreateQueues, - OperationContext context) throws Exception; + OperationContext context, + Map<SimpleString, RoutingType> prefixes) throws Exception; SecurityStore getSecurityStore(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java index 32baee1..9f2cf4f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java @@ -45,4 +45,12 @@ public interface RoutingContext { void addQueueWithAck(SimpleString address, Queue queue); boolean isAlreadyAcked(SimpleString address, Queue queue); + + void setAddress(SimpleString address); + + void setRoutingType(RoutingType routingType); + + SimpleString getAddress(); + + RoutingType getRoutingType(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index e6b5ad4..9559d74 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -109,7 +109,8 @@ public interface ServerSession extends SecurityAuth { boolean temporary, boolean durable) throws Exception; - /** Create queue with default delivery mode + /** + * Create queue with default delivery mode * * @param address * @param name @@ -143,9 +144,13 @@ public interface ServerSession extends SecurityAuth { boolean durable, boolean autoCreated) throws Exception; - AddressInfo createAddress(final SimpleString address, Set<RoutingType> routingTypes, final boolean autoCreated) throws Exception; + AddressInfo createAddress(final SimpleString address, + Set<RoutingType> routingTypes, + final boolean autoCreated) throws Exception; - AddressInfo createAddress(final SimpleString address, RoutingType routingType, final boolean autoCreated) throws Exception; + AddressInfo createAddress(final SimpleString address, + RoutingType routingType, + final boolean autoCreated) throws Exception; void deleteQueue(SimpleString name) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 742de33..dcb4d45 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -1256,7 +1256,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { final String defaultAddress, final SessionCallback callback, final boolean autoCreateQueues, - final OperationContext context) throws Exception { + final OperationContext context, + final Map<SimpleString, RoutingType> prefixes) throws Exception { String validatedUser = ""; if (securityStore != null) { @@ -1269,7 +1270,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { checkSessionLimit(validatedUser); - final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues); + final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues, prefixes); sessions.put(name, session); @@ -1341,8 +1342,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { String defaultAddress, SessionCallback callback, OperationContext context, - boolean autoCreateJMSQueues) throws Exception { - return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, pagingManager); + boolean autoCreateJMSQueues, + Map<SimpleString, RoutingType> prefixes) throws Exception { + return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, pagingManager, prefixes); } @Override @@ -2495,12 +2497,10 @@ public class ActiveMQServerImpl implements ActiveMQServer { boolean addressAlreadyExists = true; - if (info == null) { - if (autoCreateAddress) { - createAddressInfo(defaultAddressInfo.setAutoCreated(true)); + if (autoCreateAddress) { + if (info == null || !info.getRoutingTypes().contains(routingType)) { + createOrUpdateAddressInfo(defaultAddressInfo.setAutoCreated(true)); addressAlreadyExists = false; - } else { - throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java index fd55521..6fe885b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java @@ -93,7 +93,7 @@ public class DivertImpl implements Divert { ServerMessage copy = null; // Shouldn't copy if it's not routed anywhere else - if (!forwardAddress.equals(message.getAddress())) { + if (!forwardAddress.equals(context.getAddress())) { long id = storageManager.generateID(); copy = message.copy(id); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java index 6e0aa95..0cbb0e2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RouteContextList; import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.transaction.Transaction; public final class RoutingContextImpl implements RoutingContext { @@ -36,6 +37,10 @@ public final class RoutingContextImpl implements RoutingContext { private int queueCount; + private SimpleString address; + + private RoutingType routingType; + public RoutingContextImpl(final Transaction transaction) { this.transaction = transaction; } @@ -77,6 +82,26 @@ public final class RoutingContextImpl implements RoutingContext { } @Override + public void setAddress(SimpleString address) { + this.address = address; + } + + @Override + public void setRoutingType(RoutingType routingType) { + this.routingType = routingType; + } + + @Override + public SimpleString getAddress() { + return address; + } + + @Override + public RoutingType getRoutingType() { + return routingType; + } + + @Override public RouteContextList getContextListing(SimpleString address) { RouteContextList listing = map.get(address); if (listing == null) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 347874b..dec6f65 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -67,12 +67,12 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.BindingQueryResult; -import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; @@ -88,6 +88,7 @@ import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.utils.JsonLoader; +import org.apache.activemq.artemis.utils.PrefixUtil; import org.apache.activemq.artemis.utils.TypedProperties; import org.apache.activemq.artemis.utils.UUID; import org.jboss.logging.Logger; @@ -182,6 +183,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener { // concurrently. private volatile boolean closed = false; + private boolean prefixEnabled = false; + + private Map<SimpleString, RoutingType> prefixes; + public ServerSessionImpl(final String name, final String username, final String password, @@ -203,7 +208,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final SimpleString defaultAddress, final SessionCallback callback, final OperationContext context, - final PagingManager pagingManager) throws Exception { + final PagingManager pagingManager, + final Map<SimpleString, RoutingType> prefixes) throws Exception { this.username = username; this.password = password; @@ -241,6 +247,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener { this.server = server; + this.prefixes = prefixes; + if (this.prefixes != null && !this.prefixes.isEmpty()) { + prefixEnabled = true; + } + this.managementAddress = managementAddress; this.callback = callback; @@ -255,8 +266,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } } - // ServerSession implementation ---------------------------------------------------------------------------- - + // ServerSession implementation --------------------------------------------------------------------------- @Override public void enableSecurity() { this.securityEnabled = true; @@ -387,7 +397,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } } - protected void securityCheck(SimpleString address, CheckType checkType, SecurityAuth auth) throws Exception { + private void securityCheck(SimpleString address, CheckType checkType, SecurityAuth auth) throws Exception { if (securityEnabled) { securityStore.check(address, checkType, auth); } @@ -414,17 +424,18 @@ public class ServerSessionImpl implements ServerSession, FailureListener { throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName); } + SimpleString address = removePrefix(binding.getAddress()); if (browseOnly) { try { - securityCheck(binding.getAddress(), CheckType.BROWSE, this); + securityCheck(address, CheckType.BROWSE, this); } catch (Exception e) { - securityCheck(binding.getAddress().concat(".").concat(queueName), CheckType.BROWSE, this); + securityCheck(address.concat(".").concat(queueName), CheckType.BROWSE, this); } } else { try { - securityCheck(binding.getAddress(), CheckType.CONSUME, this); + securityCheck(address, CheckType.CONSUME, this); } catch (Exception e) { - securityCheck(binding.getAddress().concat(".").concat(queueName), CheckType.CONSUME, this); + securityCheck(address.concat(".").concat(queueName), CheckType.CONSUME, this); } } @@ -436,7 +447,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { if (!browseOnly) { TypedProperties props = new TypedProperties(); - props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress()); + props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, address); props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName()); @@ -492,15 +503,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final SimpleString filterString, final boolean temporary, final boolean durable) throws Exception { - return createQueue(address, - name, - ActiveMQDefaultConfiguration.getDefaultRoutingType(), - filterString, - temporary, - durable, - ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), - ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), - false); + return createQueue(address, name, ActiveMQDefaultConfiguration.getDefaultRoutingType(), filterString, temporary, durable, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), false); } @Override @@ -510,14 +513,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final SimpleString filterString, final boolean temporary, final boolean durable) throws Exception { - return createQueue(address, - name, routingType, - filterString, - temporary, - durable, - ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), - ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), - false); + return createQueue(address, name, routingType, filterString, temporary, durable, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), false); } @Override @@ -530,6 +526,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final int maxConsumers, final boolean deleteOnNoConsumers, final boolean autoCreated) throws Exception { + + Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(address, routingType); + if (durable) { // make sure the user has privileges to create this queue securityCheck(address, CheckType.CREATE_DURABLE_QUEUE, this); @@ -539,7 +538,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { server.checkQueueCreationLimit(getUsername()); - Queue queue = server.createQueue(address, routingType, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, true); + Queue queue = server.createQueue(art.getA(), art.getB(), name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, true); if (temporary) { // Temporary queue in core simply means the queue will be deleted if @@ -577,34 +576,36 @@ public class ServerSessionImpl implements ServerSession, FailureListener { boolean temporary, boolean durable, boolean autoCreated) throws Exception { - return createQueue(address, - name, routingType, - filterString, - temporary, - durable, - ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), - ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), - autoCreated); + return createQueue(address, name, routingType, filterString, temporary, durable, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), autoCreated); } @Override - public AddressInfo createAddress(final SimpleString address, Set<RoutingType> routingTypes, final boolean autoCreated) throws Exception { - securityCheck(address, CheckType.CREATE_ADDRESS, this); - return server.createOrUpdateAddressInfo(new AddressInfo(address, routingTypes).setAutoCreated(autoCreated)); + public AddressInfo createAddress(final SimpleString address, + Set<RoutingType> routingTypes, + final boolean autoCreated) throws Exception { + Pair<SimpleString, Set<RoutingType>> art = getAddressAndRoutingTypes(address, routingTypes); + securityCheck(art.getA(), CheckType.CREATE_ADDRESS, this); + return server.createOrUpdateAddressInfo(new AddressInfo(art.getA(), art.getB()).setAutoCreated(autoCreated)); } @Override - public AddressInfo createAddress(final SimpleString address, RoutingType routingType, final boolean autoCreated) throws Exception { - securityCheck(address, CheckType.CREATE_ADDRESS, this); - return server.createOrUpdateAddressInfo(new AddressInfo(address, routingType).setAutoCreated(autoCreated)); + public AddressInfo createAddress(final SimpleString address, + RoutingType routingType, + final boolean autoCreated) throws Exception { + Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(address, routingType); + securityCheck(art.getA(), CheckType.CREATE_ADDRESS, this); + return server.createOrUpdateAddressInfo(new AddressInfo(art.getA(), art.getB()).setAutoCreated(autoCreated)); } @Override - public void createSharedQueue(final SimpleString address, + public void createSharedQueue(SimpleString address, final SimpleString name, final RoutingType routingType, boolean durable, final SimpleString filterString) throws Exception { + + address = removePrefix(address); + securityCheck(address, CheckType.CREATE_NON_DURABLE_QUEUE, this); server.checkQueueCreationLimit(getUsername()); @@ -715,7 +716,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public BindingQueryResult executeBindingQuery(final SimpleString address) throws Exception { - return server.bindingQuery(address); + return server.bindingQuery(removePrefix(address)); } @Override @@ -1317,7 +1318,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { message.putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUser)); } - SimpleString address = message.getAddress(); + SimpleString address = removePrefix(message.getAddress()); if (defaultAddress == null && address != null) { defaultAddress = address; @@ -1338,12 +1339,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener { logger.trace("send(message=" + message + ", direct=" + direct + ") being called"); } - if (message.getAddress() == null) { + if (address == null) { // This could happen with some tests that are ignoring messages throw ActiveMQMessageBundle.BUNDLE.noAddress(); } - if (message.getAddress().equals(managementAddress)) { + if (address.equals(managementAddress)) { // It's a management message handleManagementMessage(tx, message, direct); @@ -1381,8 +1382,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } @Override - public void requestProducerCredits(final SimpleString address, final int credits) throws Exception { - PagingStore store = server.getPagingManager().getPageStore(address); + public void requestProducerCredits(SimpleString address, final int credits) throws Exception { + final SimpleString addr = removePrefix(address); + PagingStore store = server.getPagingManager().getPageStore(addr); if (!store.checkMemory(new Runnable() { @Override @@ -1572,7 +1574,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final ServerMessage message, final boolean direct) throws Exception { try { - securityCheck(message.getAddress(), CheckType.MANAGE, this); + securityCheck(removePrefix(message.getAddress()), CheckType.MANAGE, this); } catch (ActiveMQException e) { if (!autoCommitSends) { tx.markAsRollbackOnly(e); @@ -1655,9 +1657,13 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final boolean direct, final boolean noAutoCreateQueue) throws Exception { RoutingStatus result = RoutingStatus.OK; + + Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(msg.getAddress(), null); + + // Consumer // check the user has write access to this address. try { - securityCheck(msg.getAddress(), CheckType.SEND, this); + securityCheck(art.getA(), CheckType.SEND, this); } catch (ActiveMQException e) { if (!autoCommitSends && tx != null) { tx.markAsRollbackOnly(e); @@ -1671,6 +1677,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } try { + routingContext.setAddress(art.getA()); + routingContext.setRoutingType(art.getB()); + result = postOffice.route(msg, routingContext, direct); Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddress()); @@ -1701,4 +1710,27 @@ public class ServerSessionImpl implements ServerSession, FailureListener { return Collections.emptyList(); } } + + private SimpleString removePrefix(SimpleString address) { + if (prefixEnabled) { + return PrefixUtil.getAddress(address, prefixes); + } + return address; + } + + private Pair<SimpleString, RoutingType> getAddressAndRoutingType(SimpleString address, + RoutingType defaultRoutingType) { + if (prefixEnabled) { + return PrefixUtil.getAddressAndRoutingType(address, defaultRoutingType, prefixes); + } + return new Pair<>(address, defaultRoutingType); + } + + private Pair<SimpleString, Set<RoutingType>> getAddressAndRoutingTypes(SimpleString address, + Set<RoutingType> defaultRoutingTypes) { + if (prefixEnabled) { + return PrefixUtil.getAddressAndRoutingTypes(address, defaultRoutingTypes, prefixes); + } + return new Pair<>(address, defaultRoutingTypes); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java index 6ce2518..baf021e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java @@ -19,13 +19,19 @@ package org.apache.activemq.artemis.spi.core.protocol; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.activemq.artemis.api.core.BaseInterceptor; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.RoutingType; public abstract class AbstractProtocolManager<P, I extends BaseInterceptor<P>, C extends RemotingConnection> implements ProtocolManager<I> { + private final Map<SimpleString, RoutingType> prefixes = new HashMap<>(); + protected void invokeInterceptors(final List<I> interceptors, final P message, final C connection) { if (interceptors != null && !interceptors.isEmpty()) { for (I interceptor : interceptors) { @@ -39,4 +45,23 @@ public abstract class AbstractProtocolManager<P, I extends BaseInterceptor<P>, C } } } + + @Override + public void setAnycastPrefix(String anycastPrefix) { + for (String prefix : anycastPrefix.split(",")) { + prefixes.put(SimpleString.toSimpleString(prefix), RoutingType.ANYCAST); + } + } + + @Override + public void setMulticastPrefix(String multicastPrefix) { + for (String prefix : multicastPrefix.split(",")) { + prefixes.put(SimpleString.toSimpleString(prefix), RoutingType.MULTICAST); + } + } + + @Override + public Map<SimpleString, RoutingType> getPrefixes() { + return prefixes; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java index a7e50d5..b2a0265 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java @@ -17,11 +17,14 @@ package org.apache.activemq.artemis.spi.core.protocol; import java.util.List; +import java.util.Map; import io.netty.channel.ChannelPipeline; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.BaseInterceptor; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; @@ -71,4 +74,10 @@ public interface ProtocolManager<P extends BaseInterceptor> { * @return A list of subprotocol ids */ List<String> websocketSubprotocolIdentifiers(); + + void setAnycastPrefix(String anycastPrefix); + + void setMulticastPrefix(String multicastPrefix); + + Map<SimpleString, RoutingType> getPrefixes(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java index 3a6c404..1f5ef15 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/CoreClientTest.java @@ -16,11 +16,16 @@ */ package org.apache.activemq.artemis.tests.integration.client; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import com.hazelcast.util.UuidUtil; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; @@ -30,8 +35,11 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; +import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; @@ -143,4 +151,95 @@ public class CoreClientTest extends ActiveMQTestBase { sf.close(); } + + @Test + public void testCoreClientPrefixes() throws Exception { + + Configuration configuration = createBasicConfig(); + configuration.clearAcceptorConfigurations(); + configuration.addAddressesSetting("#", new AddressSettings().setMaxSizeBytes(10 * 1024 * 1024).setPageSizeBytes(1024 * 1024)); + + String baseAddress = "foo"; + + List<String> anycastPrefixes = new ArrayList<>(); + anycastPrefixes.add("anycast://"); + anycastPrefixes.add("queue://"); + anycastPrefixes.add("jms.queue."); + + List<String> multicastPrefixes = new ArrayList<>(); + multicastPrefixes.add("multicast://"); + multicastPrefixes.add("topic://"); + multicastPrefixes.add("jms.topic."); + + String locatorString = "tcp://localhost:5445"; + StringBuilder acceptor = new StringBuilder(locatorString + "?PROTOCOLS=CORE;anycastPrefix="); + for (String prefix : anycastPrefixes) { + acceptor.append(prefix + ","); + } + acceptor.append(";multicastPrefix="); + for (String prefix : multicastPrefixes) { + acceptor.append(prefix + ","); + } + + configuration.addAcceptorConfiguration("prefix", acceptor.toString()); + + ActiveMQServer server = createServer(configuration); + server.start(); + + ServerLocator locator = ServerLocatorImpl.newLocator(locatorString); + + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(false, true, true); + + Map<String, ClientConsumer> consumerMap = new HashMap<>(); + + for (String prefix : anycastPrefixes) { + String queueName = UuidUtil.buildRandomUuidString(); + String address = prefix + baseAddress; + + session.createQueue(prefix + baseAddress, null, queueName, null, false); + consumerMap.put(address, session.createConsumer(queueName)); + } + + for (String prefix : multicastPrefixes) { + String queueName = UuidUtil.buildRandomUuidString(); + String address = prefix + baseAddress; + + session.createQueue(prefix + baseAddress, null, queueName, null, false); + consumerMap.put(address, session.createConsumer(queueName)); + } + + session.start(); + + final int numMessages = 3; + + for (String prefix : anycastPrefixes) { + ClientProducer producer = session.createProducer(prefix + baseAddress); + for (int i = 0; i < numMessages; i++) { + ClientMessage message = session.createMessage(ActiveMQTextMessage.TYPE, false, 0, System.currentTimeMillis(), (byte) 1); + message.getBodyBuffer().writeString("testINVMCoreClient"); + producer.send(message); + } + + // Ensure that messages are load balanced across all queues + + for (String queuePrefix : anycastPrefixes) { + ClientConsumer consumer = consumerMap.get(queuePrefix + baseAddress); + for (int i = 0; i < numMessages / anycastPrefixes.size(); i++) { + ClientMessage message = consumer.receive(1000); + assertNotNull(message); + message.acknowledge(); + } + assertNull(consumer.receive(1000)); + } + + for (String multicastPrefix : multicastPrefixes) { + ClientConsumer consumer = consumerMap.get(multicastPrefix + baseAddress); + assertNull(consumer.receive(100)); + } + } + + sf.close(); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index 7932dc8..00f296e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.client; import javax.management.MBeanServer; import java.lang.management.ManagementFactory; import java.util.LinkedList; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; @@ -593,8 +594,9 @@ public class HangConsumerTest extends ActiveMQTestBase { String defaultAddress, SessionCallback callback, OperationContext context, - boolean autoCreateQueue) throws Exception { - return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, getPagingManager()); + boolean autoCreateQueue, + Map<SimpleString, RoutingType> prefixes) throws Exception { + return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, getPagingManager(), prefixes); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8f532cc2/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java index d272c02..918ff41 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java @@ -179,14 +179,6 @@ public class FakePostOffice implements PostOffice { @Override public RoutingStatus route(ServerMessage message, - RoutingContext context, - boolean direct) throws Exception { - return RoutingStatus.OK; - - } - - @Override - public RoutingStatus route(ServerMessage message, Transaction tx, boolean direct) throws Exception { return RoutingStatus.OK; @@ -194,19 +186,23 @@ public class FakePostOffice implements PostOffice { @Override public RoutingStatus route(ServerMessage message, - RoutingContext context, + Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception { return RoutingStatus.OK; + } + @Override + public RoutingStatus route(ServerMessage message, RoutingContext context, boolean direct) throws Exception { + return null; } @Override public RoutingStatus route(ServerMessage message, - Transaction tx, + RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception { - return RoutingStatus.OK; + return null; } @Override
