Fix OpenWire queue auto-creation failure
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/fccea4aa Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/fccea4aa Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/fccea4aa Branch: refs/heads/ARTEMIS-780 Commit: fccea4aa45e5f32f9baca1126fa492ed73c4967d Parents: 493e999 Author: Howard Gao <[email protected]> Authored: Fri Nov 11 19:40:36 2016 +0800 Committer: jbertram <[email protected]> Committed: Wed Nov 23 09:04:34 2016 -0600 ---------------------------------------------------------------------- .../protocol/openwire/OpenWireConnection.java | 19 ++++++++-- .../core/protocol/openwire/amq/AMQConsumer.java | 2 +- .../core/protocol/openwire/amq/AMQSession.java | 40 ++++++++++++++++---- .../protocol/openwire/util/OpenWireUtil.java | 17 --------- 4 files changed, 48 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fccea4aa/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 8dc0b34..cdc62fd 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -73,6 +73,7 @@ import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; @@ -177,6 +178,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se private boolean useKeepAlive; private long maxInactivityDuration; + private final Set<SimpleString> knownDestinations = new ConcurrentHashSet<>(); + // TODO-NOW: check on why there are two connections created for every createConnection on the client. public OpenWireConnection(Connection connection, ActiveMQServer server, @@ -707,7 +710,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public void addDestination(DestinationInfo info) throws Exception { ActiveMQDestination dest = info.getDestination(); if (dest.isQueue()) { - SimpleString qName = OpenWireUtil.toCoreAddress(dest); + SimpleString qName = new SimpleString(dest.getPhysicalName()); QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(qName); if (binding == null) { if (dest.isTemporary()) { @@ -789,6 +792,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se checkInactivity(); } + public void addKnownDestination(final SimpleString address) { + knownDestinations.add(address); + } + + public boolean containsKnownDestination(final SimpleString address) { + return knownDestinations.contains(address); + } + class SlowConsumerDetection implements SlowConsumerDetectionListener { @Override @@ -845,7 +856,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public void removeDestination(ActiveMQDestination dest) throws Exception { if (dest.isQueue()) { try { - server.destroyQueue(OpenWireUtil.toCoreAddress(dest)); + server.destroyQueue(new SimpleString(dest.getPhysicalName())); } catch (ActiveMQNonExistentQueueException neq) { //this is ok, ActiveMQ 5 allows this and will actually do it quite often ActiveMQServerLogger.LOGGER.debug("queue never existed"); @@ -853,7 +864,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } else { - Bindings bindings = server.getPostOffice().getBindingsForAddress(OpenWireUtil.toCoreAddress(dest)); + Bindings bindings = server.getPostOffice().getBindingsForAddress(new SimpleString(dest.getPhysicalName())); for (Binding binding : bindings.getBindings()) { Queue b = (Queue) binding.getBindable(); @@ -883,7 +894,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se */ private void validateDestination(ActiveMQDestination destination) throws Exception { if (destination.isQueue()) { - SimpleString physicalName = OpenWireUtil.toCoreAddress(destination); + SimpleString physicalName = new SimpleString(destination.getPhysicalName()); BindingQueryResult result = server.bindingQuery(physicalName); if (!result.isExists() && !result.isAutoCreateJmsQueues()) { throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fccea4aa/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 5603cb8..2f05f45 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -95,7 +95,7 @@ public class AMQConsumer { serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1); serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener); } else { - SimpleString queueName = OpenWireUtil.toCoreAddress(openwireDestination); + SimpleString queueName = new SimpleString(openwireDestination.getPhysicalName()); try { session.getCoreServer().createQueue(queueName, queueName, null, true, false); } catch (ActiveMQQueueExistsException e) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fccea4aa/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 5cab686..35fd733 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -23,16 +23,16 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; -import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; @@ -145,11 +145,10 @@ public class AMQSession implements SessionCallback { for (ActiveMQDestination openWireDest : dests) { if (openWireDest.isQueue()) { - SimpleString queueName = OpenWireUtil.toCoreAddress(openWireDest); - try { - getCoreServer().createQueue(queueName, queueName, null, true, false); - } catch (ActiveMQQueueExistsException e) { - // ignore + SimpleString queueName = new SimpleString(openWireDest.getPhysicalName()); + + if (!checkAutoCreateQueue(queueName, openWireDest.isTemporary())) { + throw new InvalidDestinationException("Destination doesn't exist: " + queueName); } } AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, scheduledPool); @@ -162,6 +161,27 @@ public class AMQSession implements SessionCallback { return consumersList; } + private boolean checkAutoCreateQueue(SimpleString queueName, boolean isTemporary) throws Exception { + boolean hasQueue = true; + if (!connection.containsKnownDestination(queueName)) { + + BindingQueryResult bindingQuery = server.bindingQuery(queueName); + QueueQueryResult queueBinding = server.queueQuery(queueName); + + boolean isAutoCreate = bindingQuery.isExists() ? bindingQuery.isAutoCreateJmsQueues() : true; + + if (!queueBinding.isExists()) { + if (isAutoCreate) { + server.createQueue(queueName, queueName, null, true, isTemporary); + connection.addKnownDestination(queueName); + } else { + hasQueue = false; + } + } + } + return hasQueue; + } + public void start() { coreSession.start(); @@ -338,7 +358,7 @@ public class AMQSession implements SessionCallback { // We fillup addresses, pagingStores and we will throw failure if that's the case for (int i = 0; i < actualDestinations.length; i++) { ActiveMQDestination dest = actualDestinations[i]; - addresses[i] = OpenWireUtil.toCoreAddress(dest); + addresses[i] = new SimpleString(dest.getPhysicalName()); pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]); if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) { throw new ResourceAllocationException("Queue is full"); @@ -357,6 +377,10 @@ public class AMQSession implements SessionCallback { connection.getTransportConnection().setAutoRead(false); } + if (actualDestinations[i].isQueue()) { + checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary()); + } + RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary()); if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fccea4aa/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java index a6e7292..04bd6a3 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.protocol.openwire.util; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; -import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.command.ActiveMQDestination; @@ -37,22 +36,6 @@ public class OpenWireUtil { return buffer; } - public static SimpleString toCoreAddress(ActiveMQDestination dest) { - if (dest.isQueue()) { - if (dest.isTemporary()) { - return new SimpleString(dest.getPhysicalName()); - } else { - return new SimpleString(dest.getPhysicalName()); - } - } else { - if (dest.isTemporary()) { - return new SimpleString(dest.getPhysicalName()); - } else { - return new SimpleString(dest.getPhysicalName()); - } - } - } - /** * We convert the core address to an ActiveMQ Destination. We use the actual address on the message rather than the * destination set on the consumer because it maybe different and the JMS spec says that it should be what ever was
