Repository: activemq-artemis Updated Branches: refs/heads/ARTEMIS-780 7d84b1e13 -> 83f8e6ecb
added address query and more amqp consumer functionality Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/83f8e6ec Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/83f8e6ec Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/83f8e6ec Branch: refs/heads/ARTEMIS-780 Commit: 83f8e6ecb4dad0939b84f64db9962c1ca4320a74 Parents: 7d84b1e Author: Andy Taylor <[email protected]> Authored: Mon Dec 5 13:16:01 2016 +0000 Committer: Andy Taylor <[email protected]> Committed: Mon Dec 5 13:18:03 2016 +0000 ---------------------------------------------------------------------- .../artemis/core/server/AddressQueryResult.java | 68 ++++ .../amqp/broker/AMQPSessionCallback.java | 37 ++- .../amqp/proton/ProtonServerSenderContext.java | 111 +++++-- .../postoffice/impl/SimpleAddressManager.java | 10 +- .../artemis/core/server/ActiveMQServer.java | 2 + .../artemis/core/server/ServerSession.java | 2 + .../core/server/impl/ActiveMQServerImpl.java | 21 +- .../core/server/impl/ServerSessionImpl.java | 6 + .../transport/amqp/client/AmqpClient.java | 38 +++ .../transport/amqp/client/AmqpConnection.java | 9 +- .../transport/amqp/client/AmqpSession.java | 61 ++++ .../integration/amqp/AmqpClientTestSupport.java | 23 ++ .../amqp/AmqpDurableReceiverTest.java | 5 +- .../amqp/BrokerDefinedAnycastConsumerTest.java | 59 +++- .../BrokerDefinedMulticastConsumerTest.java | 25 +- .../amqp/ClientDefinedAnycastConsumerTest.java | 59 ++++ .../amqp/ClientDefinedMultiConsumerTest.java | 330 +++++++++++++++++++ .../integration/amqp/ProtonPubSubTest.java | 4 + 18 files changed, 814 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/AddressQueryResult.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/AddressQueryResult.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/AddressQueryResult.java new file mode 100644 index 0000000..ce74d3b --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/AddressQueryResult.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server; + +import org.apache.activemq.artemis.api.core.SimpleString; + +import java.util.Set; + +/** + * @author <a href="mailto:[email protected]">Andy Taylor</a> + */ +public class AddressQueryResult { + private final SimpleString name; + private final Set<RoutingType> routingTypes; + private final long id; + private final boolean autoCreated; + private final boolean exists; + private final boolean autoCreateAddresses; + + public AddressQueryResult(SimpleString name, Set<RoutingType> routingTypes, long id, boolean autoCreated, boolean exists, boolean autoCreateAddresses) { + + this.name = name; + this.routingTypes = routingTypes; + this.id = id; + + this.autoCreated = autoCreated; + this.exists = exists; + this.autoCreateAddresses = autoCreateAddresses; + } + + public SimpleString getName() { + return name; + } + + public Set<RoutingType> getRoutingTypes() { + return routingTypes; + } + + public long getId() { + return id; + } + + public boolean isAutoCreated() { + return autoCreated; + } + + public boolean isExists() { + return exists; + } + + public boolean isAutoCreateAddresses() { + return autoCreateAddresses; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 5f5569f..a1928be 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.OperationContext; +import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; @@ -202,20 +203,32 @@ public class AMQPSessionCallback implements SessionCallback { serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), true, false); } - public void createDurableQueue(String address, String queueName, String filter) throws Exception { - serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true); + public void createUnsharedDurableQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception { + serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, 1, false, false); } - public QueueQueryResult queueQuery(String queueName, boolean autoCreate) throws Exception { + public void createSharedDurableQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception { + serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, -1, false, false); + } + + public void createSharedVolatileQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception { + serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, false, -1, true, true); + } + + public QueueQueryResult queueQuery(String queueName, RoutingType routingType, boolean autoCreate) throws Exception { QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName)); if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateQueues() && autoCreate) { try { - serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), null, false, true); + serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), routingType, null, false, true); } catch (ActiveMQQueueExistsException e) { // The queue may have been created by another thread in the mean time. Catch and do nothing. } - queueQueryResult = new QueueQueryResult(queueQueryResult.getName(), queueQueryResult.getAddress(), queueQueryResult.isDurable(), queueQueryResult.isTemporary(), queueQueryResult.getFilterString(), queueQueryResult.getConsumerCount(), queueQueryResult.getMessageCount(), queueQueryResult.isAutoCreateQueues(), true, queueQueryResult.isAutoCreated(), queueQueryResult.isDeleteOnNoConsumers(), queueQueryResult.getRoutingType(), queueQueryResult.getMaxConsumers()); + queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName)); + } + + if (queueQueryResult.getRoutingType() != routingType) { + throw new IllegalStateException("Incorrect Routing Type for queue, expecting: " + routingType); } return queueQueryResult; } @@ -233,6 +246,20 @@ public class AMQPSessionCallback implements SessionCallback { return bindingQueryResult.isExists(); } + public AddressQueryResult addressQuery(String addressName, RoutingType routingType, boolean autoCreate) throws Exception { + AddressQueryResult addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName)); + + if (!addressQueryResult.isExists() && addressQueryResult.isAutoCreateAddresses() && autoCreate) { + try { + serverSession.createAddress(SimpleString.toSimpleString(addressName), routingType, true); + } catch (ActiveMQQueueExistsException e) { + // The queue may have been created by another thread in the mean time. Catch and do nothing. + } + addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName)); + } + return addressQueryResult; + } + public void closeSender(final Object brokerConsumer) throws Exception { final ServerConsumer consumer = ((ServerConsumer) brokerConsumer); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 06a6f9b..7f5f066 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -21,8 +21,10 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress; +import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; @@ -33,6 +35,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPExceptio import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException; +import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; @@ -70,6 +73,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr private static final Symbol COPY = Symbol.valueOf("copy"); private static final Symbol TOPIC = Symbol.valueOf("topic"); private static final Symbol QUEUE = Symbol.valueOf("queue"); + private static final Symbol SHARED = Symbol.valueOf("shared"); + private static final Symbol GLOBAL = Symbol.valueOf("global"); private Object brokerConsumer; @@ -79,7 +84,13 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr protected boolean closed = false; protected final AMQPSessionCallback sessionSPI; private boolean multicast; + //todo get this from somewhere + private RoutingType defaultRoutingType = RoutingType.ANYCAST; protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0); + private RoutingType routingTypeToUse = defaultRoutingType; + private boolean shared = false; + private boolean global = false; + private boolean isVolatile = false; public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) { super(); @@ -159,8 +170,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // subscription queue String clientId = getClientId(); String pubId = sender.getName(); - queue = createQueueName(clientId, pubId); - QueueQueryResult result = sessionSPI.queueQuery(queue, false); + queue = createQueueName(clientId, pubId, true, global, false); + QueueQueryResult result = sessionSPI.queueQuery(queue, RoutingType.MULTICAST, false); + multicast = true; + routingTypeToUse = RoutingType.MULTICAST; // Once confirmed that the address exists we need to return a Source that reflects // the lifetime policy and capabilities of the new subscription. @@ -218,6 +231,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } else { SimpleString addressToUse; SimpleString queueNameToUse = null; + shared = hasCapabilities(SHARED, source); + global = hasCapabilities(GLOBAL, source); + ; //find out if we have an address made up of the address and queue name, if yes then set queue name if (CompositeAddress.isFullyQualified(source.getAddress())) { CompositeAddress compositeAddress = CompositeAddress.getQueueName(source.getAddress()); @@ -240,14 +256,20 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } else { //if not we look up the address - AddressInfo addressInfo = sessionSPI.getAddress(addressToUse); - Set<RoutingType> routingTypes = addressInfo.getRoutingTypes(); + AddressQueryResult addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), defaultRoutingType, true); + if (!addressQueryResult.isExists()) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist(); + } + + Set<RoutingType> routingTypes = addressQueryResult.getRoutingTypes(); if (routingTypes.contains(RoutingType.MULTICAST) && routingTypes.size() == 1) { multicast = true; } else { + //todo add some checks if both routing types are supported multicast = false; } } + routingTypeToUse = multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST; // if not dynamic then we use the target's address as the address to forward the // messages to, however there has to be a queue bound to it so we need to check this. if (multicast) { @@ -278,8 +300,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // id and link name String clientId = getClientId(); String pubId = sender.getName(); - queue = createQueueName(clientId, pubId); - QueueQueryResult result = sessionSPI.queueQuery(queue, false); + queue = createQueueName(clientId, pubId, shared, global, false); + QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false); if (result.isExists()) { // If a client reattaches to a durable subscription with a different no-local @@ -289,30 +311,52 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr if (result.getConsumerCount() == 0) { sessionSPI.deleteQueue(queue); - sessionSPI.createDurableQueue(source.getAddress(), queue, selector); + sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector); } else { throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist"); } } } else { - sessionSPI.createDurableQueue(source.getAddress(), queue, selector); + if (shared) { + sessionSPI.createSharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector); + } else { + sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector); + } } } else { // otherwise we are a volatile subscription - queue = java.util.UUID.randomUUID().toString(); - try { - sessionSPI.createTemporaryQueue(source.getAddress(), queue, RoutingType.MULTICAST, selector); - } catch (Exception e) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); + isVolatile = true; + if (shared && sender.getName() != null) { + queue = createQueueName(getClientId(), sender.getName(), shared, global, isVolatile); + try { + sessionSPI.createSharedVolatileQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector); + } catch (ActiveMQQueueExistsException e) { + //this is ok, just means its shared + } + } else { + queue = java.util.UUID.randomUUID().toString(); + try { + sessionSPI.createTemporaryQueue(source.getAddress(), queue, RoutingType.MULTICAST, selector); + } catch (Exception e) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); + } } } } else { if (queueNameToUse != null) { SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.ANYCAST); - queue = matchingAnycastQueue.toString(); + if (matchingAnycastQueue != null) { + queue = matchingAnycastQueue.toString(); + } else { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist(); + } } else { SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, RoutingType.ANYCAST); - queue = matchingAnycastQueue.toString(); + if (matchingAnycastQueue != null) { + queue = matchingAnycastQueue.toString(); + } else { + queue = addressToUse.toString(); + } } } @@ -322,7 +366,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } try { - if (!sessionSPI.queueQuery(queue, !multicast).isExists()) { + if (!sessionSPI.queueQuery(queue, routingTypeToUse, !multicast).isExists()) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist(); } } catch (ActiveMQAMQPNotFoundException e) { @@ -341,6 +385,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY); try { brokerConsumer = sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly); + } catch (ActiveMQAMQPResourceLimitExceededException e1) { + throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage()); } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage()); } @@ -382,20 +428,21 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // any durable resources for say pub subs if (remoteLinkClose) { Source source = (Source) sender.getSource(); - if (source != null && source.getAddress() != null && (hasCapabilities(TOPIC, source) || multicast)) { + if (source != null && source.getAddress() != null && multicast) { String queueName = source.getAddress(); - QueueQueryResult result = sessionSPI.queueQuery(queueName, false); + QueueQueryResult result = sessionSPI.queueQuery(queueName, routingTypeToUse, false); if (result.isExists() && source.getDynamic()) { sessionSPI.deleteQueue(queueName); } else { String clientId = getClientId(); String pubId = sender.getName(); - String queue = createQueueName(clientId, pubId); - result = sessionSPI.queueQuery(queue, false); - if (result.isExists()) { - if (result.getConsumerCount() > 0) { - System.out.println("error"); - } + if (pubId.contains("|")) { + pubId = pubId.split("\\|")[0]; + } + String queue = createQueueName(clientId, pubId, shared, global, isVolatile); + result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false); + //only delete if it isn't volatile and has no consumers + if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) { sessionSPI.deleteQueue(queue); } } @@ -562,7 +609,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr return false; } - private static String createQueueName(String clientId, String pubId) { - return clientId + "." + pubId; + private static String createQueueName(String clientId, String pubId, boolean shared, boolean global, boolean isVolatile) { + String queue = clientId == null || clientId.isEmpty() ? pubId : clientId + "." + pubId; + if (shared) { + if (queue.contains("|")) { + queue = queue.split("\\|")[0]; + } + if (isVolatile) { + queue += ":shared-volatile"; + } + if (global) { + queue += ":global"; + } + } + return queue; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java index c5f7e21..c0e5b2d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java @@ -126,10 +126,12 @@ public class SimpleAddressManager implements AddressManager { if (binding == null || !(binding instanceof LocalQueueBinding) || !binding.getAddress().equals(address)) { Bindings bindings = mappings.get(address); - for (Binding theBinding : bindings.getBindings()) { - if (theBinding instanceof LocalQueueBinding) { - binding = theBinding; - break; + if (bindings != null) { + for (Binding theBinding : bindings.getBindings()) { + if (theBinding instanceof LocalQueueBinding) { + binding = theBinding; + break; + } } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index 443ced7..44e81a8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -336,6 +336,8 @@ public interface ActiveMQServer extends ActiveMQComponent { QueueQueryResult queueQuery(SimpleString name) throws Exception; + AddressQueryResult addressQuery(SimpleString name) throws Exception; + Queue deployQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filterString, boolean durable, boolean temporary, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index a92786a..e6b5ad4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -163,6 +163,8 @@ public interface ServerSession extends SecurityAuth { QueueQueryResult executeQueueQuery(SimpleString name) throws Exception; + AddressQueryResult executeAddressQuery(SimpleString name) throws Exception; + BindingQueryResult executeBindingQuery(SimpleString address) throws Exception; void closeConsumer(long consumerID) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index aebcb9a..663539d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -107,6 +107,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.Divert; @@ -680,6 +681,24 @@ public class ActiveMQServerImpl implements ActiveMQServer { } @Override + public AddressQueryResult addressQuery(SimpleString name) throws Exception { + if (name == null) { + throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull(); + } + + boolean autoCreateAddresses = getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateAddresses(); + + AddressInfo addressInfo = postOffice.getAddressInfo(name); + AddressQueryResult response; + if (addressInfo != null) { + response = new AddressQueryResult(addressInfo.getName(), addressInfo.getRoutingTypes(), addressInfo.getId(), addressInfo.isAutoCreated(), true, autoCreateAddresses); + } else { + response = new AddressQueryResult(name, null, -1, false, false, autoCreateAddresses); + } + return response; + } + + @Override public void threadDump() { StringWriter str = new StringWriter(); PrintWriter out = new PrintWriter(str); @@ -1398,7 +1417,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { final SimpleString filterString, final boolean durable, final boolean temporary) throws Exception { - return createQueue(address, routingType, queueName, filterString, null, durable, temporary, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), true); + return createQueue(address, routingType, queueName, filterString, null, durable, temporary, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), false); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index bee7d73..1250b23 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -65,6 +65,7 @@ import org.apache.activemq.artemis.core.security.SecurityStore; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.LargeServerMessage; @@ -695,6 +696,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } @Override + public AddressQueryResult executeAddressQuery(SimpleString name) throws Exception { + return server.addressQuery(name); + } + + @Override public BindingQueryResult executeBindingQuery(final SimpleString address) throws Exception { return server.bindingQuery(address); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java index 56353e4..b537639 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java @@ -76,6 +76,44 @@ public class AmqpClient { } /** + * Creates a connection with the broker at the given location, this method initiates a + * connect attempt immediately and will fail if the remote peer cannot be reached. + * + * @throws Exception if an error occurs attempting to connect to the Broker. + * @return a new connection object used to interact with the connected peer. + */ + public AmqpConnection connect(boolean noContainerId) throws Exception { + + AmqpConnection connection = createConnection(); + connection.setNoContainerID(); + + LOG.debug("Attempting to create new connection to peer: {}", remoteURI); + connection.connect(); + + return connection; + } + + + /** + * Creates a connection with the broker at the given location, this method initiates a + * connect attempt immediately and will fail if the remote peer cannot be reached. + * + * @throws Exception if an error occurs attempting to connect to the Broker. + * @return a new connection object used to interact with the connected peer. + */ + public AmqpConnection connect(String containerId) throws Exception { + + AmqpConnection connection = createConnection(); + connection.setContainerId(containerId); + + LOG.debug("Attempting to create new connection to peer: {}", remoteURI); + connection.connect(); + + return connection; + } + + + /** * Creates a connection object using the configured values for user, password, remote URI * etc. This method does not immediately initiate a connection to the remote leaving that * to the caller which provides a connection object that can have additional configuration http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index 01c60bc..723daef 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -104,6 +104,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements private long closeTimeout = DEFAULT_CLOSE_TIMEOUT; private long drainTimeout = DEFAULT_DRAIN_TIMEOUT; private boolean trace; + private boolean noContainerID = false; public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport, String username, @@ -139,7 +140,9 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements serializer.execute(new Runnable() { @Override public void run() { - getEndpoint().setContainer(safeGetContainerId()); + if (!noContainerID) { + getEndpoint().setContainer(safeGetContainerId()); + } getEndpoint().setHostname(remoteURI.getHost()); if (!getOfferedCapabilities().isEmpty()) { getEndpoint().setOfferedCapabilities(getOfferedCapabilities().toArray(new Symbol[0])); @@ -735,4 +738,8 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements public String toString() { return "AmqpConnection { " + connectionId + " }"; } + + public void setNoContainerID() { + noContainerID = true; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index 6ed7ed8..d4b16c1 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -289,6 +289,67 @@ public class AmqpSession extends AmqpAbstractResource<Session> { return receiver; } + + /** + * Create a receiver instance using the given Source + * + * @param source the caller created and configured Source used to create the receiver link. + * @return a newly created receiver that is ready for use. + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpReceiver createMulticastReceiver(Source source, String receiverId, String receiveName) throws Exception { + checkClosed(); + + final ClientFuture request = new ClientFuture(); + final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId); + receiver.setSubscriptionName(receiveName); + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + receiver.setStateInspector(getStateInspector()); + receiver.open(request); + pumpToProtonTransport(request); + } + }); + + request.sync(); + + return receiver; + } + + /** + * Create a receiver instance using the given Source + * + * @param source the caller created and configured Source used to create the receiver link. + * @return a newly created receiver that is ready for use. + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpReceiver createMulticastReceiver(String receiverId, String address, String receiveName) throws Exception { + checkClosed(); + + final ClientFuture request = new ClientFuture(); + final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, receiverId); + receiver.setSubscriptionName(receiveName); + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + receiver.setStateInspector(getStateInspector()); + receiver.open(request); + pumpToProtonTransport(request); + } + }); + + request.sync(); + + return receiver; + } + /** * Create a receiver instance using the given address that creates a durable subscription. * http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java index 6f373e5..0d5c874 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java @@ -33,6 +33,10 @@ import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.Symbol; import org.junit.After; import org.junit.Before; @@ -42,6 +46,10 @@ import org.junit.Before; */ public class AmqpClientTestSupport extends ActiveMQTestBase { + protected static Symbol SHARED = Symbol.getSymbol("shared"); + protected static Symbol GLOBAL = Symbol.getSymbol("global"); + + private boolean useSSL; protected JMSServerManager serverManager; @@ -188,4 +196,19 @@ public class AmqpClientTestSupport extends ActiveMQTestBase { public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception { return new AmqpClient(brokerURI, username, password); } + + + protected void sendMessages(int numMessages, String address) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(address); + for (int i = 0; i < numMessages; i++) { + AmqpMessage message = new AmqpMessage(); + message.setText("message-" + i); + sender.send(message); + } + sender.close(); + connection.connect(); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java index 1ff74ed..e760d77 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java @@ -24,6 +24,8 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpFrameValidator; @@ -54,7 +56,8 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport { @Override public void setUp() throws Exception { super.setUp(); - server.createQueue(new SimpleString(getTopicName()), new SimpleString(getTopicName()), null, true, false); + server.createAddressInfo(new AddressInfo(SimpleString.toSimpleString(getTopicName()), RoutingType.MULTICAST)); + server.createQueue(new SimpleString(getTopicName()), RoutingType.MULTICAST, new SimpleString(getTopicName()), null, true, false); } @Test(timeout = 60000) http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java index 7a4299e..db2f1b4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedAnycastConsumerTest.java @@ -20,11 +20,11 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.QueueImpl; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; -import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.qpid.proton.amqp.messaging.Source; import org.junit.Test; @@ -169,20 +169,61 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport { connection.close(); } - private void sendMessages(int numMessages, String address) throws Exception { + @Test(timeout = 60000) + public void testConsumeWhenNoAddressCreatedNoAutoCreate() throws Exception { + AddressSettings settings = new AddressSettings(); + settings.setAutoCreateAddresses(false); + server.getAddressSettingsRepository().addMatch(address.toString(), settings); + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + try { + session.createReceiver(address.toString()); + fail("should throw exception"); + } catch (Exception e) { + //ignore + } + connection.close(); + } + + @Test(timeout = 60000) + public void testConsumeWhenNoAddressCreatedAutoCreate() throws Exception { + AddressSettings settings = new AddressSettings(); + settings.setAutoCreateAddresses(true); + server.getAddressSettingsRepository().addMatch(address.toString(), settings); + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createReceiver(address.toString()); + sendMessages(1, address.toString()); + receiver.flow(1); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount()); + connection.close(); + } + + @Test(timeout = 60000) + public void testConsumeWhenNoAddressHasBothRoutingTypesButDefaultQueueIsMultiCast() throws Exception { + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.getRoutingTypes().add(RoutingType.ANYCAST); + addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); + server.createAddressInfo(addressInfo); + server.createQueue(address, RoutingType.MULTICAST, address, null, true, false); + AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(address); - for (int i = 0; i < numMessages; i++) { - AmqpMessage message = new AmqpMessage(); - message.setText("message-" + i); - sender.send(message); + try { + session.createReceiver(address.toString()); + fail("expected exception"); + } catch (Exception e) { + //ignore } - sender.close(); - connection.connect(); + connection.close(); } + protected Source createJmsSource(boolean topic) { Source source = new Source(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java index 2b4e2b4..c47207f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BrokerDefinedMulticastConsumerTest.java @@ -63,7 +63,7 @@ public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport { } @Test(timeout = 60000) - public void testConsumeWhenOnlyAnyicast() throws Exception { + public void testConsumeWhenOnlyAnycast() throws Exception { server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); sendMessages(1, address.toString()); @@ -83,20 +83,27 @@ public class BrokerDefinedMulticastConsumerTest extends AmqpClientTestSupport { connection.close(); } - private void sendMessages(int numMessages, String address) throws Exception { + @Test(timeout = 60000) + public void testConsumeWhenNoAddressHasBothRoutingTypesButDefaultQueueIsAnyCast() throws Exception { + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); + addressInfo.getRoutingTypes().add(RoutingType.ANYCAST); + server.createAddressInfo(addressInfo); + server.createQueue(address, RoutingType.MULTICAST, address, null, true, false); + AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(address); - for (int i = 0; i < numMessages; i++) { - AmqpMessage message = new AmqpMessage(); - message.setText("message-" + i); - sender.send(message); + try { + session.createReceiver(address.toString()); + fail("expected exception"); + } catch (Exception e) { + //ignore } - sender.close(); - connection.connect(); + connection.close(); } + protected Source createJmsSource(boolean topic) { Source source = new Source(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java new file mode 100644 index 0000000..a7d2b4e --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedAnycastConsumerTest.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.impl.QueueImpl; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +/** + * @author <a href="mailto:[email protected]">Andy Taylor</a> + */ +public class ClientDefinedAnycastConsumerTest extends AmqpClientTestSupport { + + SimpleString address = new SimpleString("testAddress"); + SimpleString queue1 = new SimpleString("queue1"); + SimpleString queue2 = new SimpleString("queue2"); + + @Test(timeout = 60000) + public void testConsumeFromSingleQueueOnAddressSameName() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(address.toString()); + sendMessages(1, address.toString()); + receiver.flow(1); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(1, ((QueueImpl)server.getPostOffice().getBinding(address).getBindable()).getConsumerCount()); + + receiver.close(); + connection.close(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java new file mode 100644 index 0000000..f5a7808 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java @@ -0,0 +1,330 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.impl.QueueImpl; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.TerminusDurability; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY; + +/** + * @author <a href="mailto:[email protected]">Andy Taylor</a> + */ +public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { + + SimpleString address = new SimpleString("testAddress"); + + @Test(timeout = 60000) + public void test2ConsumersOnSharedVolatileAddress() throws Exception { + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); + server.createAddressInfo(addressInfo); + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = addConnection(client.connect("myClientId")); + AmqpSession session = connection.createSession(); + Source source = createSharedSource(TerminusDurability.NONE); + AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub"); + AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2"); + receiver.flow(1); + receiver2.flow(1); + sendMessages(2, address.toString()); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount()); + receiver.close(); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); + receiver2.close(); + //check its been deleted + assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); + connection.close(); + } + + @Test(timeout = 60000) + public void test2ConsumersOnSharedVolatileAddressBrokerDefined() throws Exception { + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); + server.createAddressInfo(addressInfo); + server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("myClientId.mySub:shared-volatile"), null, true, false, -1, false, false); + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = addConnection(client.connect("myClientId")); + AmqpSession session = connection.createSession(); + Source source = createSharedSource(TerminusDurability.NONE); + AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub"); + AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|1"); + receiver.flow(1); + receiver2.flow(1); + sendMessages(2, address.toString()); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount()); + receiver.close(); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); + receiver2.close(); + //check its **Hasn't** been deleted + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); + connection.close(); + } + + @Test(timeout = 60000) + public void test2ConsumersOnSharedVolatileAddressNoReceiverClose() throws Exception { + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); + server.createAddressInfo(addressInfo); + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = addConnection(client.connect("myClientId")); + AmqpSession session = connection.createSession(); + Source source = createSharedSource(TerminusDurability.NONE); + AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub"); + AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2"); + receiver.flow(1); + receiver2.flow(1); + sendMessages(2, address.toString()); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount()); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); + //check its been deleted + connection.close(); + assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); + } + + @Test(timeout = 60000) + public void test2ConsumersOnSharedVolatileAddressGlobal() throws Exception { + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); + server.createAddressInfo(addressInfo); + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = addConnection(client.connect(false)); + AmqpSession session = connection.createSession(); + Source source = createSharedGlobalSource(TerminusDurability.NONE); + AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub"); + AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2"); + receiver.flow(1); + receiver2.flow(1); + sendMessages(2, address.toString()); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")).getBindable()).getConsumerCount()); + receiver.close(); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global"))); + receiver2.close(); + //check its been deleted + assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global"))); + connection.close(); + } + + @Test(timeout = 60000) + public void test2ConsumersOnSharedDurableAddress() throws Exception { + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); + server.createAddressInfo(addressInfo); + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = addConnection(client.connect("myClientId")); + AmqpSession session = connection.createSession(); + Source source = createSharedSource(TerminusDurability.CONFIGURATION); + AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub"); + AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2"); + receiver.flow(1); + receiver2.flow(1); + sendMessages(2, address.toString()); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")).getBindable()).getConsumerCount()); + receiver.close(); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub"))); + receiver2.close(); + //check its been deleted + assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub"))); + connection.close(); + } + + @Test(timeout = 60000) + public void test2ConsumersOnSharedDurableAddressReconnect() throws Exception { + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); + server.createAddressInfo(addressInfo); + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = addConnection(client.connect("myClientId")); + AmqpSession session = connection.createSession(); + Source source = createSharedSource(TerminusDurability.CONFIGURATION); + AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub"); + AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2"); + receiver.flow(1); + receiver2.flow(1); + sendMessages(2, address.toString()); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")).getBindable()).getConsumerCount()); + + connection.close(); + + connection = addConnection(client.connect("myClientId")); + session = connection.createSession(); + + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub"))); + receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub"); + receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2"); + + receiver.close(); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub"))); + receiver2.close(); + //check its been deleted + assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub"))); + connection.close(); + } + + @Test(timeout = 60000) + public void test2ConsumersOnSharedDurableAddressReconnectwithNull() throws Exception { + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); + server.createAddressInfo(addressInfo); + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = addConnection(client.connect("myClientId")); + AmqpSession session = connection.createSession(); + Source source = createSharedSource(TerminusDurability.CONFIGURATION); + AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub"); + AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2"); + receiver.flow(1); + receiver2.flow(1); + sendMessages(2, address.toString()); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub")).getBindable()).getConsumerCount()); + + connection.close(); + + connection = addConnection(client.connect("myClientId")); + session = connection.createSession(); + + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub"))); + receiver = session.createDurableReceiver(null, "mySub"); + receiver2 = session.createDurableReceiver(null, "mySub|2"); + + receiver.close(); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub"))); + receiver2.close(); + //check its been deleted + assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub"))); + connection.close(); + } + + @Test(timeout = 60000) + public void test2ConsumersOnSharedDurableAddressGlobal() throws Exception { + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); + server.createAddressInfo(addressInfo); + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = addConnection(client.connect(false)); + AmqpSession session = connection.createSession(); + Source source = createSharedGlobalSource(TerminusDurability.CONFIGURATION); + AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub"); + AmqpReceiver receiver2 = session.createMulticastReceiver(source, "myReceiverID", "mySub|2"); + receiver.flow(1); + receiver2.flow(1); + sendMessages(2, address.toString()); + AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull(amqpMessage); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")).getBindable()).getConsumerCount()); + receiver.close(); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global"))); + receiver2.close(); + //check its been deleted + assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global"))); + connection.close(); + } + + @Test(timeout = 60000) + public void test2ConsumersOnNonSharedDurableAddress() throws Exception { + AddressInfo addressInfo = new AddressInfo(address); + addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); + server.createAddressInfo(addressInfo); + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = addConnection(client.connect("myClientId")); + AmqpSession session = connection.createSession(); + Source source = createNonSharedSource(TerminusDurability.CONFIGURATION); + Source source1 = createSharedSource(TerminusDurability.CONFIGURATION); + AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub"); + try { + session.createMulticastReceiver(source1, "myReceiverID", "mySub|2"); + fail("Exception expected"); + } catch (Exception e) { + //expected + } + connection.close(); + } + + private Source createNonSharedSource(TerminusDurability terminusDurability) { + Source source = new Source(); + source.setAddress(address.toString()); + source.setCapabilities(TOPIC_CAPABILITY); + source.setDurable(terminusDurability); + return source; + } + + private Source createSharedSource(TerminusDurability terminusDurability) { + Source source = new Source(); + source.setAddress(address.toString()); + source.setCapabilities(TOPIC_CAPABILITY, SHARED); + source.setDurable(terminusDurability); + return source; + } + + private Source createSharedGlobalSource(TerminusDurability terminusDurability) { + Source source = new Source(); + source.setAddress(address.toString()); + source.setCapabilities(TOPIC_CAPABILITY, SHARED, GLOBAL); + source.setDurable(terminusDurability); + return source; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/83f8e6ec/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java index 39197fd..3965947 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java @@ -30,6 +30,8 @@ import javax.jms.TopicSubscriber; import java.util.Map; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.After; import org.junit.Assert; @@ -55,6 +57,8 @@ public class ProtonPubSubTest extends ProtonTestBase { @Before public void setUp() throws Exception { super.setUp(); + server.createAddressInfo(new AddressInfo(ssPubAddress, RoutingType.MULTICAST)); + server.createAddressInfo(new AddressInfo(ssprefixedPubAddress, RoutingType.MULTICAST)); server.createQueue(ssPubAddress, ssPubAddress, new SimpleString("foo=bar"), false, true); server.createQueue(ssprefixedPubAddress, ssprefixedPubAddress, new SimpleString("foo=bar"), false, true); factory = new JmsConnectionFactory("amqp://localhost:5672");
