Repository: activemq-artemis Updated Branches: refs/heads/ARTEMIS-780 96b939b3b -> b59ddaa61
add consuming support to AMQP for new addressing schema Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b59ddaa6 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b59ddaa6 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b59ddaa6 Branch: refs/heads/ARTEMIS-780 Commit: b59ddaa61ef8cff891bf4032e2cf89f4a3cdadc8 Parents: 96b939b Author: Andy Taylor <[email protected]> Authored: Sat Dec 3 09:03:43 2016 +0000 Committer: Andy Taylor <[email protected]> Committed: Sat Dec 3 09:06:46 2016 +0000 ---------------------------------------------------------------------- .../amqp/broker/AMQPSessionCallback.java | 23 ++- .../proton/ProtonServerReceiverContext.java | 3 +- .../amqp/proton/ProtonServerSenderContext.java | 115 ++++++++--- .../artemis/core/postoffice/AddressManager.java | 6 + .../artemis/core/postoffice/PostOffice.java | 7 +- .../core/postoffice/impl/CompositeAddress.java | 53 +++++ .../core/postoffice/impl/PostOfficeImpl.java | 11 + .../postoffice/impl/SimpleAddressManager.java | 27 +++ .../artemis/core/server/ServerSession.java | 6 + .../core/server/impl/ServerSessionImpl.java | 15 ++ .../transport/amqp/client/AmqpSession.java | 2 +- .../integration/amqp/AmqpClientTestSupport.java | 9 + .../amqp/AmqpDurableReceiverTest.java | 2 +- .../amqp/AmqpTempDestinationTest.java | 2 +- .../integration/amqp/AmqpTransactionTest.java | 5 - .../amqp/BrokerDefinedAnycastConsumerTest.java | 201 +++++++++++++++++++ .../BrokerDefinedMulticastConsumerTest.java | 112 +++++++++++ .../tests/integration/amqp/ProtonTest.java | 58 ++++-- .../amqp/SendingAndReceivingTest.java | 4 + .../core/server/impl/fakes/FakePostOffice.java | 12 ++ 20 files changed, 607 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 6382cb2..5f5569f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -31,9 +31,11 @@ import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.jms.client.ActiveMQConnection; @@ -192,12 +194,12 @@ public class AMQPSessionCallback implements SessionCallback { serverConsumer.receiveCredits(-1); } - public void createTemporaryQueue(String queueName) throws Exception { - serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), null, true, false); + public void createTemporaryQueue(String queueName, RoutingType routingType) throws Exception { + serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), routingType, null, true, false); } - public void createTemporaryQueue(String address, String queueName, String filter) throws Exception { - serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), true, false); + public void createTemporaryQueue(String address, String queueName, RoutingType routingType, String filter) throws Exception { + serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), true, false); } public void createDurableQueue(String address, String queueName, String filter) throws Exception { @@ -521,4 +523,17 @@ public class AMQPSessionCallback implements SessionCallback { protonSPI.removeTransaction(txid); } + + public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception { + return serverSession.getMatchingQueue(address, routingType); + } + + + public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception { + return serverSession.getMatchingQueue(address, queueName, routingType); + } + + public AddressInfo getAddress(SimpleString address) { + return serverSession.getAddress(address); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 41caea9..515acc3 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; @@ -83,7 +84,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements address = sessionSPI.tempQueueName(); try { - sessionSPI.createTemporaryQueue(address); + sessionSPI.createTemporaryQueue(address, RoutingType.ANYCAST); } catch (Exception e) { throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index ef075fc..b998b25 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -19,9 +19,13 @@ package org.apache.activemq.artemis.protocol.amqp.proton; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Set; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress; import org.apache.activemq.artemis.core.server.QueueQueryResult; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.jms.client.ActiveMQConnection; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; @@ -65,6 +69,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr private static final Symbol COPY = Symbol.valueOf("copy"); private static final Symbol TOPIC = Symbol.valueOf("topic"); + private static final Symbol QUEUE = Symbol.valueOf("queue"); private Object brokerConsumer; @@ -73,6 +78,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr protected final AMQPConnectionContext connection; protected boolean closed = false; protected final AMQPSessionCallback sessionSPI; + private boolean multicast; protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0); public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) { @@ -126,7 +132,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr super.initialise(); Source source = (Source) sender.getRemoteSource(); - String queue; + String queue = null; String selector = null; final Map<Symbol, Object> supportedFilters = new HashMap<>(); @@ -148,25 +154,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } - // if we have a capability for a Topic (AMQP -> JMS Mapping) or we are configured on this - // address to act like a topic then act like a subscription. - boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source); - - if (isPubSub) { - Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS); - if (filter != null) { - String remoteContainerId = sender.getSession().getConnection().getRemoteContainer(); - String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'"; - if (selector != null) { - selector += " AND " + noLocalFilter; - } else { - selector = noLocalFilter; - } - - supportedFilters.put(filter.getKey(), filter.getValue()); - } - } - if (source == null) { // Attempt to recover a previous subscription happens when a link reattach happens on a // subscription queue @@ -222,19 +209,77 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // node is temporary and will be deleted on closing of the session queue = java.util.UUID.randomUUID().toString(); try { - sessionSPI.createTemporaryQueue(queue); + sessionSPI.createTemporaryQueue(queue, RoutingType.ANYCAST); // protonSession.getServerSession().createQueue(queue, queue, null, true, false); } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); } source.setAddress(queue); } else { + SimpleString addressToUse; + SimpleString queueNameToUse = null; + //find out if we have an address made up of the address and queue name, if yes then set queue name + if (CompositeAddress.isFullyQualified(source.getAddress())) { + CompositeAddress compositeAddress = CompositeAddress.getQueueName(source.getAddress()); + addressToUse = new SimpleString(compositeAddress.getAddress()); + queueNameToUse = new SimpleString(compositeAddress.getQueueName()); + } + else { + addressToUse = new SimpleString(source.getAddress()); + } + //check to see if the client has defined how we act + boolean clientDefined = hasCapabilities(TOPIC, source) || hasCapabilities(QUEUE, source); + if (clientDefined) { + multicast = hasCapabilities(TOPIC, source); + AddressInfo addressInfo = sessionSPI.getAddress(addressToUse); + Set<RoutingType> routingTypes = addressInfo.getRoutingTypes(); + //if the client defines 1 routing type and the broker another then throw an exception + if (multicast && !routingTypes.contains(RoutingType.MULTICAST)) { + throw new ActiveMQAMQPIllegalStateException("Address is not configured for topic support"); + } + else if (!multicast && !routingTypes.contains(RoutingType.ANYCAST)) { + throw new ActiveMQAMQPIllegalStateException("Address is not configured for queue support"); + } + } + else { + //if not we look up the address + AddressInfo addressInfo = sessionSPI.getAddress(addressToUse); + Set<RoutingType> routingTypes = addressInfo.getRoutingTypes(); + if (routingTypes.contains(RoutingType.MULTICAST) && routingTypes.size() == 1) { + multicast = true; + } + else { + multicast = false; + } + } // if not dynamic then we use the target's address as the address to forward the // messages to, however there has to be a queue bound to it so we need to check this. - if (isPubSub) { + if (multicast) { + Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS); + if (filter != null) { + String remoteContainerId = sender.getSession().getConnection().getRemoteContainer(); + String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'"; + if (selector != null) { + selector += " AND " + noLocalFilter; + } else { + selector = noLocalFilter; + } + + supportedFilters.put(filter.getKey(), filter.getValue()); + } + + + if (queueNameToUse != null) { + SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.MULTICAST ); + queue = matchingAnycastQueue.toString(); + } + //if the address specifies a broker configured queue then we always use this, treat it as a queue + if (queue != null) { + multicast = false; + } // if we are a subscription and durable create a durable queue using the container // id and link name - if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) { + else if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) { String clientId = getClientId(); String pubId = sender.getName(); queue = createQueueName(clientId, pubId); @@ -260,13 +305,21 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // otherwise we are a volatile subscription queue = java.util.UUID.randomUUID().toString(); try { - sessionSPI.createTemporaryQueue(source.getAddress(), queue, selector); + sessionSPI.createTemporaryQueue(source.getAddress(), queue, RoutingType.MULTICAST, selector); } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); } } } else { - queue = source.getAddress(); + if (queueNameToUse != null) { + SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.ANYCAST); + queue = matchingAnycastQueue.toString(); + } + else { + SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, RoutingType.ANYCAST); + queue = matchingAnycastQueue.toString(); + } + } if (queue == null) { @@ -274,7 +327,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } try { - if (!sessionSPI.queueQuery(queue, !isPubSub).isExists()) { + if (!sessionSPI.queueQuery(queue, !multicast).isExists()) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist(); } } catch (ActiveMQAMQPNotFoundException e) { @@ -290,9 +343,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // have not honored what it asked for. source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters); - boolean browseOnly = !isPubSub && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY); + boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY); try { - brokerConsumer = sessionSPI.createSender(this, queue, isPubSub ? null : selector, browseOnly); + brokerConsumer = sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly); } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage()); } @@ -302,10 +355,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr return connection.getRemoteContainer(); } - private boolean isPubSub(Source source) { - String pubSubPrefix = sessionSPI.getPubSubPrefix(); - return source != null && pubSubPrefix != null && source.getAddress() != null && source.getAddress().startsWith(pubSubPrefix); - } /* * close the session @@ -338,7 +387,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // any durable resources for say pub subs if (remoteLinkClose) { Source source = (Source) sender.getSource(); - if (source != null && source.getAddress() != null && (hasCapabilities(TOPIC, source) || isPubSub(source))) { + if (source != null && source.getAddress() != null && (hasCapabilities(TOPIC, source) || multicast)) { String queueName = source.getAddress(); QueueQueryResult result = sessionSPI.queueQuery(queueName, false); if (result.isExists() && source.getDynamic()) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java index 1cf1a07..ada1d77 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.Set; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -44,6 +45,10 @@ public interface AddressManager { Bindings getMatchingBindings(SimpleString address) throws Exception; + SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception; + + SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception; + void clear(); Binding getBinding(SimpleString queueName); @@ -59,4 +64,5 @@ public interface AddressManager { AddressInfo removeAddressInfo(SimpleString address); AddressInfo getAddressInfo(SimpleString address); + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java index 48ec7db..dc5f4b4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -79,6 +80,10 @@ public interface PostOffice extends ActiveMQComponent { Map<SimpleString, Binding> getAllBindings(); + SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception; + + SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception; + RoutingStatus route(ServerMessage message, boolean direct) throws Exception; RoutingStatus route(ServerMessage message, @@ -119,6 +124,4 @@ public interface PostOffice extends ActiveMQComponent { boolean isAddressBound(final SimpleString address) throws Exception; Set<SimpleString> getAddresses(); - - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CompositeAddress.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CompositeAddress.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CompositeAddress.java new file mode 100644 index 0000000..bc12fb7 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CompositeAddress.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.postoffice.impl; + + +import org.apache.activemq.artemis.api.core.SimpleString; + +public class CompositeAddress { + + public static String SEPARATOR ="::"; + private final String address; + private final String queueName; + + public String getAddress() { + return address; + } + + public String getQueueName() { + return queueName; + } + + public CompositeAddress(String address, String queueName) { + + this.address = address; + this.queueName = queueName; + } + + public static boolean isFullyQualified(String address) { + return address.toString().contains(SEPARATOR); + } + + public static CompositeAddress getQueueName(String address) { + String[] split = address.split(SEPARATOR); + if(split.length <= 0) { + throw new IllegalStateException("Nott A Fully Qualified Name"); + } + return new CompositeAddress(split[0], split[1]); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 9bd69d1..34e966c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -67,6 +67,7 @@ import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.RouteContextList; import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.group.GroupingHandler; import org.apache.activemq.artemis.core.server.impl.AddressInfo; @@ -866,6 +867,16 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } @Override + public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception { + return addressManager.getMatchingQueue(address, routingType); + } + + @Override + public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception { + return addressManager.getMatchingQueue(address, queueName, routingType); + } + + @Override public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception { // We send direct to the queue so we can send it to the same queue that is bound to the notifications address - // this is crucial for ensuring http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java index 8db4f6f..e39626f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java @@ -118,6 +118,33 @@ public class SimpleAddressManager implements AddressManager { return bindings; } + public SimpleString getMatchingQueue(final SimpleString address, RoutingType routingType) throws Exception { + + Binding binding = nameMap.get(address); + + if (binding == null || !(binding instanceof LocalQueueBinding) + || !binding.getAddress().equals(address)) { + Bindings bindings = mappings.get(address); + for (Binding theBinding : bindings.getBindings()) { + if (theBinding instanceof LocalQueueBinding) { + binding = theBinding; + break; + } + } + } + + return binding != null ? binding.getUniqueName() : null; + } + + public SimpleString getMatchingQueue(final SimpleString address, final SimpleString queueName, RoutingType routingType) throws Exception { + Binding binding = nameMap.get(queueName); + + if (binding != null && !binding.getAddress().equals(address)) { + throw new IllegalStateException("queue belongs to address" + binding.getAddress()); + } + return binding != null ? binding.getUniqueName() : null; + } + @Override public void clear() { nameMap.clear(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index badadf4..a92786a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -237,4 +237,10 @@ public interface ServerSession extends SecurityAuth { List<MessageReference> getInTXMessagesForConsumer(long consumerId); String getValidatedUser(); + + SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception; + + SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception; + + AddressInfo getAddress(SimpleString address); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 89d110e..bee7d73 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1471,6 +1471,21 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } @Override + public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception { + return server.getPostOffice().getMatchingQueue(address, routingType); + } + + @Override + public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception { + return server.getPostOffice().getMatchingQueue(address, queueName, routingType); + } + + @Override + public AddressInfo getAddress(SimpleString address) { + return server.getPostOffice().getAddressInfo(address); + } + + @Override public String toString() { StringBuffer buffer = new StringBuffer(); if (this.metaData != null) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index fc3fdf7..6ed7ed8 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -271,7 +271,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> { checkClosed(); final ClientFuture request = new ClientFuture(); - final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, getNextReceiverId()); + final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId); connection.getScheduler().execute(new Runnable() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java index 1e12d4c..6f373e5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java @@ -22,8 +22,11 @@ import java.util.LinkedList; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; +import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.server.JMSServerManager; import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; @@ -86,6 +89,12 @@ public class AmqpClientTestSupport extends ActiveMQTestBase { ActiveMQServer server = createServer(true, true); serverManager = new JMSServerManagerImpl(server); Configuration serverConfig = server.getConfiguration(); + CoreAddressConfiguration address = new CoreAddressConfiguration(); + address.setName(getTestName()).getRoutingTypes().add(RoutingType.ANYCAST); + CoreQueueConfiguration queueConfig = new CoreQueueConfiguration(); + queueConfig.setName(getTestName()).setAddress(getTestName()).setRoutingType(RoutingType.ANYCAST); + address.getQueueConfigurations().add(queueConfig); + serverConfig.addAddressConfiguration(address); serverConfig.getAddressesSettings().put("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ"))); serverConfig.setSecurityEnabled(false); serverManager.start(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java index abc422b..1ff74ed 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java @@ -371,6 +371,6 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport { } public String getTopicName() { - return "topic://myTopic"; + return "myTopic"; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java index c599f38..d7874e3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java @@ -111,7 +111,7 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport { sender.close(); - Thread.sleep(200); + Thread.sleep(10000); queueView = getProxyToQueue(remoteTarget.getAddress()); assertNull(queueView); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java index e42a718..627a3e4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java @@ -36,11 +36,6 @@ import org.junit.Test; */ public class AmqpTransactionTest extends AmqpClientTestSupport { - @Before - public void createQueue() throws Exception { - server.createQueue(SimpleString.toSimpleString(getTestName()), SimpleString.toSimpleString(getTestName()), null, true, false); - } - @Test(timeout = 30000) public void testBeginAndCommitTransaction() throws Exception { AmqpClient client = createAmqpClient(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java new file mode 100644 index 0000000..38bb97a --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.impl.QueueImpl; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY; +import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY; + + +public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport { + + SimpleString address = new SimpleString("testAddress"); + SimpleString queue1 = new SimpleString("queue1"); + SimpleString queue2 = new SimpleString("queue2"); + + @Test(timeout = 60000) + public void testConsumeFromSingleQueueOnAddressSameName() throws Exception { + server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); + server.createQueue(address, RoutingType.ANYCAST, address, null, true, false); + + sendMessages(1, address.toString()); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(address.toString()); + receiver.flow(1); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount()); + + receiver.close(); + connection.close(); + } + + @Test(timeout = 60000) + public void testConsumeFromSingleQueueOnAddressSameNameMultipleQueues() throws Exception { + server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); + server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false); + server.createQueue(address, RoutingType.ANYCAST, address, null, true, false); + + sendMessages(2, address.toString()); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(address.toString()); + receiver.flow(1); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount()); + assertEquals(0, ((QueueImpl)server.getPostOffice().getBinding(queue1).getBindable()).getConsumerCount()); + receiver.close(); + connection.close(); + } + + @Test(timeout = 60000) + public void testConsumeFromSingleQueueOnAddressDifferentName() throws Exception { + server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); + server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false); + + sendMessages(1, address.toString()); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(address.toString()); + receiver.flow(1); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(queue1).getBindable()).getConsumerCount()); + + receiver.close(); + connection.close(); + } + + @Test(timeout = 60000) + public void testConsumeFromSingleQueueOnAddressDifferentNameMultipleQueues() throws Exception { + server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); + server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false); + server.createQueue(address, RoutingType.ANYCAST, queue2, null, true, false); + + sendMessages(1, address.toString()); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(address.toString()); + receiver.flow(1); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(queue1).getBindable()).getConsumerCount()); + assertEquals(0, ((QueueImpl)server.getPostOffice().getBinding(queue2).getBindable()).getConsumerCount()); + receiver.close(); + connection.close(); + } + + @Test(timeout = 60000) + public void testConsumeFromSingleQualifiedQueueOnAddressSameName() throws Exception { + server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); + server.createQueue(address, RoutingType.ANYCAST, queue1, null, true, false); + + sendMessages(1, address.toString()); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(address.toString() + "::" + queue1.toString()); + receiver.flow(1); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(queue1).getBindable()).getConsumerCount()); + + receiver.close(); + connection.close(); + } + + @Test(timeout = 60000) + public void testConsumeWhenOnlyMulticast() throws Exception { + server.createAddressInfo(new AddressInfo(address, RoutingType.MULTICAST)); + + sendMessages(1, address.toString()); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + + AmqpSession session = connection.createSession(); + Source jmsSource = createJmsSource(false); + jmsSource.setAddress(address.toString()); + try { + session.createReceiver(jmsSource); + fail("should throw exception"); + } catch (Exception e) {//ignore + } + connection.close(); + } + + private void sendMessages(int numMessages, String address) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(address); + for (int i = 0; i < numMessages; i++) { + AmqpMessage message = new AmqpMessage(); + message.setText("message-" + i); + sender.send(message); + } + sender.close(); + connection.connect(); + } + + protected Source createJmsSource(boolean topic) { + + Source source = new Source(); + // Set the capability to indicate the node type being created + if (!topic) { + source.setCapabilities(QUEUE_CAPABILITY); + } else { + source.setCapabilities(TOPIC_CAPABILITY); + } + + return source; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java new file mode 100644 index 0000000..d5be5f2 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.impl.QueueImpl; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY; +import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY; + + +public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport { + + SimpleString address = new SimpleString("testAddress"); + SimpleString queue1 = new SimpleString("queue1"); + SimpleString queue2 = new SimpleString("queue2"); + + @Test(timeout = 60000) + public void testConsumeFromSingleQueueOnAddressSameName() throws Exception { + server.createAddressInfo(new AddressInfo(address, RoutingType.MULTICAST)); + server.createQueue(address, RoutingType.MULTICAST, address, null, true, false); + + sendMessages(1, address.toString()); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(address.toString() + "::" + address.toString()); + receiver.flow(1); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount()); + + receiver.close(); + connection.close(); + } + + @Test(timeout = 60000) + public void testConsumeWhenOnlyAnyicast() throws Exception { + server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); + + sendMessages(1, address.toString()); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + + AmqpSession session = connection.createSession(); + Source jmsSource = createJmsSource(true); + jmsSource.setAddress(address.toString()); + try { + session.createReceiver(jmsSource); + fail("should throw exception"); + } catch (Exception e) {//ignore + } + connection.close(); + } + + private void sendMessages(int numMessages, String address) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(address); + for (int i = 0; i < numMessages; i++) { + AmqpMessage message = new AmqpMessage(); + message.setText("message-" + i); + sender.send(message); + } + sender.close(); + connection.connect(); + } + + protected Source createJmsSource(boolean topic) { + + Source source = new Source(); + // Set the capability to indicate the node type being created + if (!topic) { + source.setCapabilities(QUEUE_CAPABILITY); + } else { + source.setCapabilities(TOPIC_CAPABILITY); + } + + return source; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java index b017c31..3103af0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java @@ -70,6 +70,8 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFactory; @@ -151,20 +153,31 @@ public class ProtonTest extends ProtonTestBase { @Before public void setUp() throws Exception { super.setUp(); - - server.createQueue(new SimpleString(coreAddress), new SimpleString(coreAddress), null, true, false); - server.createQueue(new SimpleString(coreAddress + "1"), new SimpleString(coreAddress + "1"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "2"), new SimpleString(coreAddress + "2"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "3"), new SimpleString(coreAddress + "3"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "4"), new SimpleString(coreAddress + "4"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "5"), new SimpleString(coreAddress + "5"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "6"), new SimpleString(coreAddress + "6"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "7"), new SimpleString(coreAddress + "7"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "8"), new SimpleString(coreAddress + "8"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "9"), new SimpleString(coreAddress + "9"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "10"), new SimpleString(coreAddress + "10"), null, true, false); - server.createQueue(new SimpleString("amqp_testtopic"), new SimpleString("amqp_testtopic"), null, true, false); - server.createQueue(new SimpleString("amqp_testtopic" + "1"), new SimpleString("amqp_testtopic" + "1"), null, true, false); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress), RoutingType.ANYCAST)); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "1"), RoutingType.ANYCAST)); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "2"), RoutingType.ANYCAST)); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "3"), RoutingType.ANYCAST)); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "4"), RoutingType.ANYCAST)); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "5"), RoutingType.ANYCAST)); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "6"), RoutingType.ANYCAST)); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "7"), RoutingType.ANYCAST)); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "8"), RoutingType.ANYCAST)); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "9"), RoutingType.ANYCAST)); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "10"), RoutingType.ANYCAST)); + server.createQueue(new SimpleString(coreAddress), RoutingType.ANYCAST, new SimpleString(coreAddress), null, true, false); + server.createQueue(new SimpleString(coreAddress + "1"), RoutingType.ANYCAST, new SimpleString(coreAddress + "1"), null, true, false); + server.createQueue(new SimpleString(coreAddress + "2"), RoutingType.ANYCAST, new SimpleString(coreAddress + "2"), null, true, false); + server.createQueue(new SimpleString(coreAddress + "3"), RoutingType.ANYCAST, new SimpleString(coreAddress + "3"), null, true, false); + server.createQueue(new SimpleString(coreAddress + "4"), RoutingType.ANYCAST, new SimpleString(coreAddress + "4"), null, true, false); + server.createQueue(new SimpleString(coreAddress + "5"), RoutingType.ANYCAST, new SimpleString(coreAddress + "5"), null, true, false); + server.createQueue(new SimpleString(coreAddress + "6"), RoutingType.ANYCAST, new SimpleString(coreAddress + "6"), null, true, false); + server.createQueue(new SimpleString(coreAddress + "7"), RoutingType.ANYCAST, new SimpleString(coreAddress + "7"), null, true, false); + server.createQueue(new SimpleString(coreAddress + "8"), RoutingType.ANYCAST, new SimpleString(coreAddress + "8"), null, true, false); + server.createQueue(new SimpleString(coreAddress + "9"), RoutingType.ANYCAST, new SimpleString(coreAddress + "9"), null, true, false); + server.createQueue(new SimpleString(coreAddress + "10"), RoutingType.ANYCAST, new SimpleString(coreAddress + "10"), null, true, false); + server.createAddressInfo(new AddressInfo(new SimpleString("amqp_testtopic"), RoutingType.MULTICAST)); + server.createQueue(new SimpleString("amqp_testtopic"), RoutingType.MULTICAST, new SimpleString("amqp_testtopic"), null, true, false); + /* server.createQueue(new SimpleString("amqp_testtopic" + "1"), new SimpleString("amqp_testtopic" + "1"), null, true, false); server.createQueue(new SimpleString("amqp_testtopic" + "2"), new SimpleString("amqp_testtopic" + "2"), null, true, false); server.createQueue(new SimpleString("amqp_testtopic" + "3"), new SimpleString("amqp_testtopic" + "3"), null, true, false); server.createQueue(new SimpleString("amqp_testtopic" + "4"), new SimpleString("amqp_testtopic" + "4"), null, true, false); @@ -173,7 +186,7 @@ public class ProtonTest extends ProtonTestBase { server.createQueue(new SimpleString("amqp_testtopic" + "7"), new SimpleString("amqp_testtopic" + "7"), null, true, false); server.createQueue(new SimpleString("amqp_testtopic" + "8"), new SimpleString("amqp_testtopic" + "8"), null, true, false); server.createQueue(new SimpleString("amqp_testtopic" + "9"), new SimpleString("amqp_testtopic" + "9"), null, true, false); - server.createQueue(new SimpleString("amqp_testtopic" + "10"), new SimpleString("amqp_testtopic" + "10"), null, true, false); + server.createQueue(new SimpleString("amqp_testtopic" + "10"), new SimpleString("amqp_testtopic" + "10"), null, true, false);*/ connection = createConnection(); @@ -769,6 +782,12 @@ public class ProtonTest extends ProtonTestBase { @Test public void testLinkDetatchErrorIsCorrectWhenQueueDoesNotExists() throws Exception { + AddressSettings value = new AddressSettings(); + value.setAutoCreateJmsQueues(false); + value.setAutoCreateQueues(false); + value.setAutoCreateAddresses(false); + value.setAutoCreateJmsTopics(false); + server.getAddressSettingsRepository().addMatch("AnAddressThatDoesNotExist", value); AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); AmqpConnection amqpConnection = client.connect(); AmqpSession session = amqpConnection.createSession(); @@ -784,6 +803,7 @@ public class ProtonTest extends ProtonTestBase { assertNotNull(expectedException); assertTrue(expectedException.getMessage().contains("amqp:not-found")); assertTrue(expectedException.getMessage().contains("target address does not exist")); + amqpConnection.close(); } @Test @@ -838,6 +858,7 @@ public class ProtonTest extends ProtonTestBase { @Test public void testClientIdIsSetInSubscriptionList() throws Exception { AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString("mytopic"), RoutingType.ANYCAST)); AmqpConnection amqpConnection = client.createConnection(); amqpConnection.setContainerId("testClient"); amqpConnection.setOfferedCapabilities(Arrays.asList(Symbol.getSymbol("topic"))); @@ -866,14 +887,14 @@ public class ProtonTest extends ProtonTestBase { String queueName = "TestQueueName"; String address = "TestAddress"; - - server.createQueue(new SimpleString(address), new SimpleString(queueName), null, true, false); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(address), RoutingType.ANYCAST)); + server.createQueue(new SimpleString(address), RoutingType.ANYCAST, new SimpleString(queueName), null, true, false); AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); AmqpConnection amqpConnection = client.connect(); AmqpSession session = amqpConnection.createSession(); AmqpSender sender = session.createSender(address); - AmqpReceiver receiver = session.createReceiver(queueName); + AmqpReceiver receiver = session.createReceiver(address); receiver.flow(1); AmqpMessage message = new AmqpMessage(); @@ -882,6 +903,7 @@ public class ProtonTest extends ProtonTestBase { AmqpMessage receivedMessage = receiver.receive(5000, TimeUnit.MILLISECONDS); assertNotNull(receivedMessage); + amqpConnection.close(); } @Test http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java index f19b0a4..f424ea2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java @@ -25,7 +25,10 @@ import javax.jms.Session; import javax.jms.TextMessage; import java.util.Random; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.After; @@ -42,6 +45,7 @@ public class SendingAndReceivingTest extends ActiveMQTestBase { public void setUp() throws Exception { super.setUp(); server = createServer(true, true); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString("exampleQueue"), RoutingType.ANYCAST)); server.start(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b59ddaa6/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java index f2c844e..d272c02 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.DuplicateIDCacheImpl; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl; @@ -51,6 +52,17 @@ public class FakePostOffice implements PostOffice { } @Override + public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) { + + return null; + } + + @Override + public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) { + return null; + } + + @Override public void start() throws Exception { }
