Repository: activemq-artemis Updated Branches: refs/heads/master 5c2144b78 -> c48d96893
ARTEMIS-1246 Fixing compatibility issue. It fixes compatibility issues with JMS Core clients using the old address model, allowing the client to query JMS temporary queues too. Without this fix, older clients would eventually fail with the error: AMQ119019: Queue already exists Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a8356fb0 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a8356fb0 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a8356fb0 Branch: refs/heads/master Commit: a8356fb057eb45f3a560e3841ce3fec220155153 Parents: 5c2144b Author: Francesco Nigro <[email protected]> Authored: Wed Jun 21 14:42:34 2017 +0200 Committer: Clebert Suconic <[email protected]> Committed: Tue Jun 27 14:23:36 2017 -0400 ---------------------------------------------------------------------- .../impl/wireformat/QueueAbstractPacket.java | 51 ++++++++++++++++++++ .../core/ServerSessionPacketHandler.java | 16 +++--- 2 files changed, 60 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a8356fb0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QueueAbstractPacket.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QueueAbstractPacket.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QueueAbstractPacket.java index 57b72cd..767cd0c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QueueAbstractPacket.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QueueAbstractPacket.java @@ -17,6 +17,10 @@ package 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; @@ -54,6 +58,53 @@ public abstract class QueueAbstractPacket extends PacketImpl { } } + /** + * It converts the given {@code queueNames} using the JMS prefix found on {@link #address} when {@code clientVersion < }{@link #ADDRESSING_CHANGE_VERSION}. + * If no conversion has occurred, it returns {@code queueNames}. + * + * @param clientVersion version of the client + * @param queueNames names of the queues to be converted + * @return the converted queues names or {@code queueNames} when no conversion has occurred + */ + public final List<SimpleString> convertQueueNames(int clientVersion, List<SimpleString> queueNames) { + if (clientVersion < ADDRESSING_CHANGE_VERSION) { + return applyAddressPrefixTo(queueNames); + } else { + return queueNames; + } + } + + private List<SimpleString> applyAddressPrefixTo(List<SimpleString> queueNames) { + final int names = queueNames.size(); + if (names == 0) { + return Collections.emptyList(); + } else { + final SimpleString address = this.address; + final SimpleString prefix = jmsPrefixOf(address); + if (prefix != null) { + final List<SimpleString> prefixedQueueNames = new ArrayList<>(names); + for (int i = 0; i < names; i++) { + final SimpleString oldQueueNames = queueNames.get(i); + final SimpleString prefixedQueueName = prefix.concat(oldQueueNames); + prefixedQueueNames.add(prefixedQueueName); + } + return prefixedQueueNames; + } else { + return queueNames; + } + } + } + + private static SimpleString jmsPrefixOf(SimpleString address) { + if (address.startsWith(OLD_QUEUE_PREFIX)) { + return OLD_QUEUE_PREFIX; + } else if (address.startsWith(OLD_TOPIC_PREFIX)) { + return OLD_TOPIC_PREFIX; + } else { + return null; + } + } + public QueueAbstractPacket(byte 
type) { super(type); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a8356fb0/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 628312a..8e3c3ed 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.protocol.core; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; -import java.util.ArrayList; import java.util.List; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -325,18 +324,21 @@ public class ServerSessionPacketHandler implements ChannelHandler { case SESS_BINDINGQUERY: { requiresResponse = true; SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet; - BindingQueryResult result = session.executeBindingQuery(request.getAddress(remotingConnection.getClientVersion())); + final int clientVersion = remotingConnection.getClientVersion(); + BindingQueryResult result = session.executeBindingQuery(request.getAddress(clientVersion)); /* if the session is JMS and it's from an older client then we need to add the old prefix to the queue * names otherwise the older client won't realize the queue exists and will try to create it and receive * an error */ - if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && remotingConnection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) { - List<SimpleString> queueNames = new ArrayList<>(); - for (SimpleString queueName : 
result.getQueueNames()) { - queueNames.add(PacketImpl.OLD_QUEUE_PREFIX.concat(queueName)); + if (clientVersion < PacketImpl.ADDRESSING_CHANGE_VERSION && session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null) { + final List<SimpleString> queueNames = result.getQueueNames(); + if (!queueNames.isEmpty()) { + final List<SimpleString> convertedQueueNames = request.convertQueueNames(clientVersion, queueNames); + if (convertedQueueNames != queueNames) { + result = new BindingQueryResult(result.isExists(), convertedQueueNames, result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers()); + } } - result = new BindingQueryResult(result.isExists(), queueNames, result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers()); } if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4)) {
