ARTEMIS-877 Add Consumer support for AMQP for new addressing schema
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/224f62b2 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/224f62b2 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/224f62b2 Branch: refs/heads/master Commit: 224f62b295db26eaa0bca0f7798178f025d737c0 Parents: a182a13 Author: Andy Taylor <[email protected]> Authored: Sat Dec 3 09:03:43 2016 +0000 Committer: Martyn Taylor <[email protected]> Committed: Fri Dec 9 18:43:15 2016 +0000 ---------------------------------------------------------------------- .../activemq/cli/test/FileBrokerTest.java | 24 -- .../core/management/ActiveMQServerControl.java | 59 ++++ .../artemis/core/server/AddressQueryResult.java | 66 ++++ .../amqp/broker/AMQPSessionCallback.java | 64 +++- .../protocol/amqp/proton/AmqpSupport.java | 1 + .../proton/ProtonServerReceiverContext.java | 14 +- .../amqp/proton/ProtonServerSenderContext.java | 217 +++++++++--- .../amqp/proton/handler/ExtCapability.java | 2 +- .../artemis/rest/test/FindDestinationTest.java | 3 + .../impl/ActiveMQServerControlImpl.java | 120 ++++--- .../artemis/core/postoffice/AddressManager.java | 6 + .../artemis/core/postoffice/PostOffice.java | 7 +- .../core/postoffice/impl/CompositeAddress.java | 50 +++ .../core/postoffice/impl/PostOfficeImpl.java | 11 + .../postoffice/impl/SimpleAddressManager.java | 31 ++ .../artemis/core/server/ActiveMQServer.java | 2 + .../artemis/core/server/ServerSession.java | 8 + .../cluster/impl/ClusterConnectionImpl.java | 2 +- .../core/server/impl/ActiveMQServerImpl.java | 21 +- .../core/server/impl/ServerSessionImpl.java | 21 ++ .../transport/amqp/client/AmqpClient.java | 38 +++ .../transport/amqp/client/AmqpConnection.java | 9 +- .../transport/amqp/client/AmqpSession.java | 63 +++- .../integration/amqp/AmqpClientTestSupport.java | 32 ++ .../amqp/AmqpDurableReceiverTest.java | 7 +- .../amqp/AmqpTempDestinationTest.java | 2 - .../integration/amqp/AmqpTransactionTest.java | 7 - .../amqp/BrokerDefinedAnycastConsumerTest.java | 240 ++++++++++++++ .../BrokerDefinedMulticastConsumerTest.java | 119 +++++++ .../amqp/ClientDefinedAnycastConsumerTest.java | 52 +++ .../amqp/ClientDefinedMultiConsumerTest.java | 327 +++++++++++++++++++ .../integration/amqp/ProtonPubSubTest.java | 4 + .../tests/integration/amqp/ProtonTest.java | 58 +++- .../amqp/SendingAndReceivingTest.java | 4 + .../management/ActiveMQServerControlTest.java | 87 ++--- .../ActiveMQServerControlUsingCoreTest.java | 15 + .../management/ManagementControlHelper.java | 8 + .../integration/openwire/BasicOpenWireTest.java | 1 - .../core/server/impl/fakes/FakePostOffice.java | 12 + 39 files changed, 1611 insertions(+), 203 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java index a50a49f..b04b540 100644 --- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java +++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java @@ -45,30 +45,6 @@ import static org.junit.Assert.fail; public class FileBrokerTest { @Test - public void startWithJMS() throws Exception { - ServerDTO serverDTO = new ServerDTO(); - serverDTO.configuration = "broker.xml"; - FileBroker broker = null; - try { - broker = new FileBroker(serverDTO, new ActiveMQJAASSecurityManager()); - broker.start(); - JMSServerManagerImpl jmsServerManager = (JMSServerManagerImpl) broker.getComponents().get("jms"); - Assert.assertNotNull(jmsServerManager); - Assert.assertTrue(jmsServerManager.isStarted()); - //this tells us the jms server is activated - Assert.assertTrue(jmsServerManager.getJMSStorageManager().isStarted()); - ActiveMQServerImpl activeMQServer = (ActiveMQServerImpl) broker.getComponents().get("core"); - Assert.assertNotNull(activeMQServer); - Assert.assertTrue(activeMQServer.isStarted()); - Assert.assertTrue(broker.isStarted()); - } finally { - if (broker != null) { - broker.stop(); - } - } - } - - @Test public void startWithoutJMS() throws Exception { ServerDTO serverDTO = new ServerDTO(); serverDTO.configuration = "broker-nojms.xml"; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java index 1797c9a..abd8e9e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java @@ -451,10 +451,29 @@ public interface ActiveMQServerControl { * @param address address to bind the queue to * @param name name of the queue */ + @Deprecated @Operation(desc = "Create a queue with the specified address", impact = MBeanOperationInfo.ACTION) void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address, @Parameter(name = "name", desc = "Name of the queue") String name) throws Exception; + + /** + * Create a durable queue. + * <br> + * If {@code address} is {@code null} it will be defaulted to {@code name}. + * <br> + * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits. + * + * @param address address to bind the queue to + * @param name name of the queue + * @param routingType The routing type used for this address, MULTICAST or ANYCAST + */ + @Operation(desc = "Create a queue with the specified address", impact = MBeanOperationInfo.ACTION) + void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address, + @Parameter(name = "name", desc = "Name of the queue") String name, + @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType) throws Exception; + + /** * Create a queue. * <br> @@ -466,6 +485,7 @@ public interface ActiveMQServerControl { * @param name name of the queue * @param durable whether the queue is durable */ + @Deprecated @Operation(desc = "Create a queue with the specified address, name and durability", impact = MBeanOperationInfo.ACTION) void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address, @Parameter(name = "name", desc = "Name of the queue") String name, @@ -480,6 +500,24 @@ public interface ActiveMQServerControl { * * @param address address to bind the queue to * @param name name of the queue + * @param durable whether the queue is durable + * @param routingType The routing type used for this address, MULTICAST or ANYCAST + */ + @Operation(desc = "Create a queue with the specified address, name and durability", impact = MBeanOperationInfo.ACTION) + void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address, + @Parameter(name = "name", desc = "Name of the queue") String name, + @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable, + @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType) throws Exception; + + /** + * Create a queue. + * <br> + * If {@code address} is {@code null} it will be defaulted to {@code name}. + * <br> + * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits. + * + * @param address address to bind the queue to + * @param name name of the queue * @param filter of the queue * @param durable whether the queue is durable */ @@ -496,6 +534,27 @@ public interface ActiveMQServerControl { * <br> * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits. * + * @param address address to bind the queue to + * @param name name of the queue + * @param filter of the queue + * @param durable whether the queue is durable + * @param routingType The routing type used for this address, MULTICAST or ANYCAST + */ + @Operation(desc = "Create a queue", impact = MBeanOperationInfo.ACTION) + void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address, + @Parameter(name = "name", desc = "Name of the queue") String name, + @Parameter(name = "filter", desc = "Filter of the queue") String filter, + @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable, + @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType) throws Exception; + + + /** + * Create a queue. + * <br> + * If {@code address} is {@code null} it will be defaulted to {@code name}. + * <br> + * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits. + * * @param address address to bind the queue to * @param routingType the routing type used for this address, {@code MULTICAST} or {@code ANYCAST} * @param name name of the queue http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/AddressQueryResult.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/AddressQueryResult.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/AddressQueryResult.java new file mode 100644 index 0000000..07d7406 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/AddressQueryResult.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server; + +import org.apache.activemq.artemis.api.core.SimpleString; + +import java.util.Set; + + +public class AddressQueryResult { + private final SimpleString name; + private final Set<RoutingType> routingTypes; + private final long id; + private final boolean autoCreated; + private final boolean exists; + private final boolean autoCreateAddresses; + + public AddressQueryResult(SimpleString name, Set<RoutingType> routingTypes, long id, boolean autoCreated, boolean exists, boolean autoCreateAddresses) { + + this.name = name; + this.routingTypes = routingTypes; + this.id = id; + + this.autoCreated = autoCreated; + this.exists = exists; + this.autoCreateAddresses = autoCreateAddresses; + } + + public SimpleString getName() { + return name; + } + + public Set<RoutingType> getRoutingTypes() { + return routingTypes; + } + + public long getId() { + return id; + } + + public boolean isAutoCreated() { + return autoCreated; + } + + public boolean isExists() { + return exists; + } + + public boolean isAutoCreateAddresses() { + return autoCreateAddresses; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 6d4abc4..9d69b00 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -28,12 +28,15 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.OperationContext; +import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.jms.client.ActiveMQConnection; @@ -192,28 +195,40 @@ public class AMQPSessionCallback implements SessionCallback { serverConsumer.receiveCredits(-1); } - public void createTemporaryQueue(String queueName) throws Exception { - serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), null, true, false); + public void createTemporaryQueue(String queueName, RoutingType routingType) throws Exception { + serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), routingType, null, true, false); } - public void createTemporaryQueue(String address, String queueName, String filter) throws Exception { - serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), true, false); + public void createTemporaryQueue(String address, String queueName, RoutingType routingType, String filter) throws Exception { + serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), true, false); } - public void createDurableQueue(String address, String queueName, String filter) throws Exception { - serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true); + public void createUnsharedDurableQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception { + serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, 1, false, false); } - public QueueQueryResult queueQuery(String queueName, boolean autoCreate) throws Exception { + public void createSharedDurableQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception { + serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, true, -1, false, false); + } + + public void createSharedVolatileQueue(String address, RoutingType routingType, String queueName, String filter) throws Exception { + serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), routingType, SimpleString.toSimpleString(filter), false, false, -1, true, true); + } + + public QueueQueryResult queueQuery(String queueName, RoutingType routingType, boolean autoCreate) throws Exception { QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName)); if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateQueues() && autoCreate) { try { - serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), null, false, true); + serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName), routingType, null, false, true); } catch (ActiveMQQueueExistsException e) { // The queue may have been created by another thread in the mean time. Catch and do nothing. } - queueQueryResult = new QueueQueryResult(queueQueryResult.getName(), queueQueryResult.getAddress(), queueQueryResult.isDurable(), queueQueryResult.isTemporary(), queueQueryResult.getFilterString(), queueQueryResult.getConsumerCount(), queueQueryResult.getMessageCount(), queueQueryResult.isAutoCreateQueues(), true, queueQueryResult.isAutoCreated(), queueQueryResult.isDeleteOnNoConsumers(), queueQueryResult.getRoutingType(), queueQueryResult.getMaxConsumers()); + queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName)); + } + + if (queueQueryResult.getRoutingType() != routingType) { + throw new IllegalStateException("Incorrect Routing Type for queue, expecting: " + routingType); } return queueQueryResult; } @@ -231,6 +246,20 @@ public class AMQPSessionCallback implements SessionCallback { return bindingQueryResult.isExists(); } + public AddressQueryResult addressQuery(String addressName, RoutingType routingType, boolean autoCreate) throws Exception { + AddressQueryResult addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName)); + + if (!addressQueryResult.isExists() && addressQueryResult.isAutoCreateAddresses() && autoCreate) { + try { + serverSession.createAddress(SimpleString.toSimpleString(addressName), routingType, true); + } catch (ActiveMQQueueExistsException e) { + // The queue may have been created by another thread in the mean time. Catch and do nothing. + } + addressQueryResult = serverSession.executeAddressQuery(SimpleString.toSimpleString(addressName)); + } + return addressQueryResult; + } + public void closeSender(final Object brokerConsumer) throws Exception { final ServerConsumer consumer = ((ServerConsumer) brokerConsumer); @@ -522,4 +551,21 @@ public class AMQPSessionCallback implements SessionCallback { protonSPI.removeTransaction(txid); } + + public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception { + return serverSession.getMatchingQueue(address, routingType); + } + + + public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception { + return serverSession.getMatchingQueue(address, queueName, routingType); + } + + public AddressInfo getAddress(SimpleString address) { + return serverSession.getAddress(address); + } + + public void removeTemporaryQueue(String address) throws Exception { + serverSession.deleteQueue(SimpleString.toSimpleString(address)); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java index 7bdbd2e..ff398dc 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java @@ -55,6 +55,7 @@ public class AmqpSupport { public static final Symbol PLATFORM = Symbol.valueOf("platform"); public static final Symbol RESOURCE_DELETED = Symbol.valueOf("amqp:resource-deleted"); public static final Symbol CONNECTION_FORCED = Symbol.valueOf("amqp:connection:forced"); + public static final Symbol SHARED_SUBS = Symbol.valueOf("SHARED-SUBS"); // Symbols used in configuration of newly opened links. http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 0cc293a..a265836 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; @@ -27,6 +28,7 @@ import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMess import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Rejected; +import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; import org.apache.qpid.proton.amqp.transaction.TransactionalState; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Delivery; @@ -55,6 +57,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements // Used by the broker to decide when to refresh clients credit. This is not used when client requests credit. private static int minCreditRefresh = 30; + private TerminusExpiryPolicy expiryPolicy; public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection, @@ -83,10 +86,11 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements address = sessionSPI.tempQueueName(); try { - sessionSPI.createTemporaryQueue(address); + sessionSPI.createTemporaryQueue(address, RoutingType.ANYCAST); } catch (Exception e) { throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); } + expiryPolicy = target.getExpiryPolicy() != null ? target.getExpiryPolicy() : TerminusExpiryPolicy.LINK_DETACH; target.setAddress(address); } else { //if not dynamic then we use the targets address as the address to forward the messages to, however there has to @@ -165,6 +169,14 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements @Override public void close(boolean remoteLinkClose) throws ActiveMQAMQPException { protonSession.removeReceiver(receiver); + org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget(); + if (target != null && target.getDynamic() && (target.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || target.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) { + try { + sessionSPI.removeTemporaryQueue(target.getAddress()); + } catch (Exception e) { + //ignore on close, its temp anyway and will be removed later + } + } } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 960942d..43de7c4 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -19,10 +19,16 @@ package org.apache.activemq.artemis.protocol.amqp.proton; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Set; +import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress; +import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.QueueQueryResult; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.jms.client.ActiveMQConnection; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; @@ -30,6 +36,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPExceptio import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException; +import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; @@ -66,6 +73,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr private static final Symbol COPY = Symbol.valueOf("copy"); private static final Symbol TOPIC = Symbol.valueOf("topic"); + private static final Symbol QUEUE = Symbol.valueOf("queue"); + private static final Symbol SHARED = Symbol.valueOf("shared"); + private static final Symbol GLOBAL = Symbol.valueOf("global"); private Object brokerConsumer; @@ -74,7 +84,14 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr protected final AMQPConnectionContext connection; protected boolean closed = false; protected final AMQPSessionCallback sessionSPI; + private boolean multicast; + //todo get this from somewhere + private RoutingType defaultRoutingType = RoutingType.ANYCAST; protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0); + private RoutingType routingTypeToUse = defaultRoutingType; + private boolean shared = false; + private boolean global = false; + private boolean isVolatile = false; public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) { super(); @@ -127,7 +144,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr super.initialise(); Source source = (Source) sender.getRemoteSource(); - String queue; + String queue = null; String selector = null; final Map<Symbol, Object> supportedFilters = new HashMap<>(); @@ -148,32 +165,15 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } - // if we have a capability for a Topic (AMQP -> JMS Mapping) or we are configured on this - // address to act like a topic then act like a subscription. - boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source); - - if (isPubSub) { - Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS); - if (filter != null) { - String remoteContainerId = sender.getSession().getConnection().getRemoteContainer(); - String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'"; - if (selector != null) { - selector += " AND " + noLocalFilter; - } else { - selector = noLocalFilter; - } - - supportedFilters.put(filter.getKey(), filter.getValue()); - } - } - if (source == null) { // Attempt to recover a previous subscription happens when a link reattach happens on a // subscription queue String clientId = getClientId(); String pubId = sender.getName(); - queue = createQueueName(clientId, pubId); - QueueQueryResult result = sessionSPI.queueQuery(queue, false); + queue = createQueueName(clientId, pubId, true, global, false); + QueueQueryResult result = sessionSPI.queueQuery(queue, RoutingType.MULTICAST, false); + multicast = true; + routingTypeToUse = RoutingType.MULTICAST; // Once confirmed that the address exists we need to return a Source that reflects // the lifetime policy and capabilities of the new subscription. @@ -222,23 +222,86 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // node is temporary and will be deleted on closing of the session queue = java.util.UUID.randomUUID().toString(); try { - sessionSPI.createTemporaryQueue(queue); + sessionSPI.createTemporaryQueue(queue, RoutingType.ANYCAST); // protonSession.getServerSession().createQueue(queue, queue, null, true, false); } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); } source.setAddress(queue); } else { + SimpleString addressToUse; + SimpleString queueNameToUse = null; + shared = hasCapabilities(SHARED, source); + global = hasCapabilities(GLOBAL, source); + + //find out if we have an address made up of the address and queue name, if yes then set queue name + if (CompositeAddress.isFullyQualified(source.getAddress())) { + CompositeAddress compositeAddress = CompositeAddress.getQueueName(source.getAddress()); + addressToUse = new SimpleString(compositeAddress.getAddress()); + queueNameToUse = new SimpleString(compositeAddress.getQueueName()); + } else { + addressToUse = new SimpleString(source.getAddress()); + } + //check to see if the client has defined how we act + boolean clientDefined = hasCapabilities(TOPIC, source) || hasCapabilities(QUEUE, source); + if (clientDefined) { + multicast = hasCapabilities(TOPIC, source); + AddressInfo addressInfo = sessionSPI.getAddress(addressToUse); + Set<RoutingType> routingTypes = addressInfo.getRoutingTypes(); + //if the client defines 1 routing type and the broker another then throw an exception + if (multicast && !routingTypes.contains(RoutingType.MULTICAST)) { + throw new ActiveMQAMQPIllegalStateException("Address is not configured for topic support"); + } else if (!multicast && !routingTypes.contains(RoutingType.ANYCAST)) { + throw new ActiveMQAMQPIllegalStateException("Address is not configured for queue support"); + } + } else { + //if not we look up the address + AddressQueryResult addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), defaultRoutingType, true); + if (!addressQueryResult.isExists()) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist(); + } + + Set<RoutingType> routingTypes = addressQueryResult.getRoutingTypes(); + if (routingTypes.contains(RoutingType.MULTICAST) && routingTypes.size() == 1) { + multicast = true; + } else { + //todo add some checks if both routing types are supported + multicast = false; + } + } + routingTypeToUse = multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST; // if not dynamic then we use the target's address as the address to forward the // messages to, however there has to be a queue bound to it so we need to check this. - if (isPubSub) { - // if we are a subscription and durable create a durable queue using the container - // id and link name - if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) { + if (multicast) { + Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS); + if (filter != null) { + String remoteContainerId = sender.getSession().getConnection().getRemoteContainer(); + String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'"; + if (selector != null) { + selector += " AND " + noLocalFilter; + } else { + selector = noLocalFilter; + } + + supportedFilters.put(filter.getKey(), filter.getValue()); + } + + + if (queueNameToUse != null) { + SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.MULTICAST ); + queue = matchingAnycastQueue.toString(); + } + //if the address specifies a broker configured queue then we always use this, treat it as a queue + if (queue != null) { + multicast = false; + } else if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) { + + // if we are a subscription and durable create a durable queue using the container + // id and link name String clientId = getClientId(); String pubId = sender.getName(); - queue = createQueueName(clientId, pubId); - QueueQueryResult result = sessionSPI.queueQuery(queue, false); + queue = createQueueName(clientId, pubId, shared, global, false); + QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false); if (result.isExists()) { // If a client reattaches to a durable subscription with a different no-local @@ -248,25 +311,54 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr if (result.getConsumerCount() == 0) { sessionSPI.deleteQueue(queue); - sessionSPI.createDurableQueue(source.getAddress(), queue, selector); + sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector); } else { throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist"); } } } else { - sessionSPI.createDurableQueue(source.getAddress(), queue, selector); + if (shared) { + sessionSPI.createSharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector); + } else { + sessionSPI.createUnsharedDurableQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector); + } } } else { // otherwise we are a volatile subscription - queue = java.util.UUID.randomUUID().toString(); - try { - sessionSPI.createTemporaryQueue(source.getAddress(), queue, selector); - } catch (Exception e) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); + isVolatile = true; + if (shared && sender.getName() != null) { + queue = createQueueName(getClientId(), sender.getName(), shared, global, isVolatile); + try { + sessionSPI.createSharedVolatileQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector); + } catch (ActiveMQQueueExistsException e) { + //this is ok, just means its shared + } + } else { + queue = java.util.UUID.randomUUID().toString(); + try { + sessionSPI.createTemporaryQueue(source.getAddress(), queue, RoutingType.MULTICAST, selector); + } catch (Exception e) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); + } } } } else { - queue = source.getAddress(); + if (queueNameToUse != null) { + SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, queueNameToUse, RoutingType.ANYCAST); + if (matchingAnycastQueue != null) { + queue = matchingAnycastQueue.toString(); + } else { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist(); + } + } else { + SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, RoutingType.ANYCAST); + if (matchingAnycastQueue != null) { + queue = matchingAnycastQueue.toString(); + } else { + queue = addressToUse.toString(); + } + } + } if (queue == null) { @@ -274,7 +366,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } try { - if (!sessionSPI.queueQuery(queue, !isPubSub).isExists()) { + if (!sessionSPI.queueQuery(queue, routingTypeToUse, !multicast).isExists()) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist(); } } catch (ActiveMQAMQPNotFoundException e) { @@ -290,9 +382,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // have not honored what it asked for. source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters); - boolean browseOnly = !isPubSub && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY); + boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY); try { - brokerConsumer = sessionSPI.createSender(this, queue, isPubSub ? null : selector, browseOnly); + brokerConsumer = sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly); + } catch (ActiveMQAMQPResourceLimitExceededException e1) { + throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage()); } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage()); } @@ -302,10 +396,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr return connection.getRemoteContainer(); } - private boolean isPubSub(Source source) { - String pubSubPrefix = sessionSPI.getPubSubPrefix(); - return source != null && pubSubPrefix != null && source.getAddress() != null && source.getAddress().startsWith(pubSubPrefix); - } /* * close the session @@ -341,23 +431,30 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // any durable resources for say pub subs if (remoteLinkClose) { Source source = (Source) sender.getSource(); - if (source != null && source.getAddress() != null && (hasCapabilities(TOPIC, source) || isPubSub(source))) { + if (source != null && source.getAddress() != null && multicast) { String queueName = source.getAddress(); - QueueQueryResult result = sessionSPI.queueQuery(queueName, false); + QueueQueryResult result = sessionSPI.queueQuery(queueName, routingTypeToUse, false); if (result.isExists() && source.getDynamic()) { sessionSPI.deleteQueue(queueName); } else { String clientId = getClientId(); String pubId = sender.getName(); - String queue = createQueueName(clientId, pubId); - result = sessionSPI.queueQuery(queue, false); - if (result.isExists()) { - if (result.getConsumerCount() > 0) { - System.out.println("error"); - } + if (pubId.contains("|")) { + pubId = pubId.split("\\|")[0]; + } + String queue = createQueueName(clientId, pubId, shared, global, isVolatile); + result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false); + //only delete if it isn't volatile and has no consumers + if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) { sessionSPI.deleteQueue(queue); } } + } else if (source != null && source.getDynamic() && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) { + try { + sessionSPI.removeTemporaryQueue(source.getAddress()); + } catch (Exception e) { + //ignore on close, its temp anyway and will be removed later + } } } } catch (Exception e) { @@ -521,7 +618,19 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr return false; } - private static String createQueueName(String clientId, String pubId) { - return clientId + "." + pubId; + private static String createQueueName(String clientId, String pubId, boolean shared, boolean global, boolean isVolatile) { + String queue = clientId == null || clientId.isEmpty() ? pubId : clientId + "." + pubId; + if (shared) { + if (queue.contains("|")) { + queue = queue.split("\\|")[0]; + } + if (isVolatile) { + queue += ":shared-volatile"; + } + if (global) { + queue += ":global"; + } + } + return queue; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java index 6325ff6..931efa7 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java @@ -22,7 +22,7 @@ import org.apache.qpid.proton.engine.Connection; public class ExtCapability { - public static final Symbol[] capabilities = new Symbol[]{AmqpSupport.SOLE_CONNECTION_CAPABILITY, AmqpSupport.DELAYED_DELIVERY}; + public static final Symbol[] capabilities = new Symbol[]{AmqpSupport.SOLE_CONNECTION_CAPABILITY, AmqpSupport.DELAYED_DELIVERY, AmqpSupport.SHARED_SUBS}; public static Symbol[] getCapabilities() { return capabilities; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java ---------------------------------------------------------------------- diff --git a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java index db23f56..be14056 100644 --- a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java +++ b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.rest.test; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.jboss.resteasy.client.ClientRequest; import org.jboss.resteasy.client.ClientResponse; import org.jboss.resteasy.spi.Link; @@ -30,6 +31,7 @@ public class FindDestinationTest extends MessageTestBase { @Test public void testFindQueue() throws Exception { String testName = "testFindQueue"; + server.getActiveMQServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString(testName), RoutingType.MULTICAST)); server.getActiveMQServer().createQueue(new SimpleString(testName), RoutingType.MULTICAST, new SimpleString(testName), null, false, false); ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/queues/" + testName)); @@ -60,6 +62,7 @@ public class FindDestinationTest extends MessageTestBase { @Test public void testFindTopic() throws Exception { + server.getActiveMQServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString("testTopic"), RoutingType.MULTICAST)); server.getActiveMQServer().createQueue(new SimpleString("testTopic"), RoutingType.MULTICAST, new SimpleString("testTopic"), null, false, false); ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/topics/testTopic")); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 4464062..841aa84 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -619,7 +619,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active } } - @Deprecated @Override public void createQueue(final String address, final String name) throws Exception { checkStarted(); @@ -633,6 +632,18 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active } @Override + public void createQueue(final String address, final String name, final String routingType) throws Exception { + checkStarted(); + + clearIO(); + try { + server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), null, true, false); + } finally { + blockOnIO(); + } + } + + @Override public void createQueue(final String address, final String name, final boolean durable) throws Exception { checkStarted(); @@ -645,35 +656,44 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active } @Override - public void createQueue(String address, - String routingType, - String name, - String filterStr, - boolean durable, - int maxConsumers, - boolean deleteOnNoConsumers, - boolean autoCreateAddress) throws Exception { + public void createQueue(final String address, final String name, final boolean durable, final String routingType) throws Exception { checkStarted(); clearIO(); + try { + server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), null, durable, false); + } finally { + blockOnIO(); + } + } - SimpleString filter = filterStr == null ? null : new SimpleString(filterStr); + @Override + public void createQueue(final String address, + final String name, + final String filterStr, + final boolean durable) throws Exception { + checkStarted(); + + clearIO(); try { + SimpleString filter = null; if (filterStr != null && !filterStr.trim().equals("")) { filter = new SimpleString(filterStr); } - server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress); + server.createQueue(SimpleString.toSimpleString(address), ActiveMQDefaultConfiguration.getDefaultRoutingType(), new SimpleString(name), filter, durable, false); } finally { blockOnIO(); } } + @Override public void createQueue(final String address, final String name, final String filterStr, - final boolean durable) throws Exception { + final boolean durable, + final String routingType) throws Exception { checkStarted(); clearIO(); @@ -683,12 +703,38 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active filter = new SimpleString(filterStr); } - server.createQueue(SimpleString.toSimpleString(address), ActiveMQDefaultConfiguration.getDefaultRoutingType(), new SimpleString(name), filter, durable, false); + server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false); + } finally { + blockOnIO(); + } + } + + @Override + public void createQueue(String address, + String routingType, + String name, + String filterStr, + boolean durable, + int maxConsumers, + boolean deleteOnNoConsumers, + boolean autoCreateAddress) throws Exception { + checkStarted(); + + clearIO(); + + SimpleString filter = filterStr == null ? null : new SimpleString(filterStr); + try { + if (filterStr != null && !filterStr.trim().equals("")) { + filter = new SimpleString(filterStr); + } + + server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress); } finally { blockOnIO(); } } + @Override public String[] getQueueNames() { checkStarted(); @@ -1704,30 +1750,30 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active settings.add("expiryAddress", addressSettings.getExpiryAddress().toString()); } return settings.add("expiryDelay", addressSettings.getExpiryDelay()) - .add("maxDeliveryAttempts", addressSettings.getMaxDeliveryAttempts()) - .add("pageCacheMaxSize", addressSettings.getPageCacheMaxSize()) - .add("maxSizeBytes", addressSettings.getMaxSizeBytes()) - .add("pageSizeBytes", addressSettings.getPageSizeBytes()) - .add("redeliveryDelay", addressSettings.getRedeliveryDelay()) - .add("redeliveryMultiplier", addressSettings.getRedeliveryMultiplier()) - .add("maxRedeliveryDelay", addressSettings.getMaxRedeliveryDelay()) - .add("redistributionDelay", addressSettings.getRedistributionDelay()) - .add("lastValueQueue", addressSettings.isLastValueQueue()) - .add("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute()) - .add("addressFullMessagePolicy", policy) - .add("slowConsumerThreshold", addressSettings.getSlowConsumerThreshold()) - .add("slowConsumerCheckPeriod", addressSettings.getSlowConsumerCheckPeriod()) - .add("slowConsumerPolicy", consumerPolicy) - .add("autoCreateJmsQueues", addressSettings.isAutoCreateJmsQueues()) - .add("autoCreateJmsTopics", addressSettings.isAutoCreateJmsTopics()) - .add("autoDeleteJmsQueues", addressSettings.isAutoDeleteJmsQueues()) - .add("autoDeleteJmsTopics", addressSettings.isAutoDeleteJmsQueues()) - .add("autoCreateQueues", addressSettings.isAutoCreateQueues()) - .add("autoDeleteQueues", addressSettings.isAutoDeleteQueues()) - .add("autoCreateAddress", addressSettings.isAutoCreateAddresses()) - .add("autoDeleteAddress", addressSettings.isAutoDeleteAddresses()) - .build() - .toString(); + .add("maxDeliveryAttempts", addressSettings.getMaxDeliveryAttempts()) + .add("pageCacheMaxSize", addressSettings.getPageCacheMaxSize()) + .add("maxSizeBytes", addressSettings.getMaxSizeBytes()) + .add("pageSizeBytes", addressSettings.getPageSizeBytes()) + .add("redeliveryDelay", addressSettings.getRedeliveryDelay()) + .add("redeliveryMultiplier", addressSettings.getRedeliveryMultiplier()) + .add("maxRedeliveryDelay", addressSettings.getMaxRedeliveryDelay()) + .add("redistributionDelay", addressSettings.getRedistributionDelay()) + .add("lastValueQueue", addressSettings.isLastValueQueue()) + .add("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute()) + .add("addressFullMessagePolicy", policy) + .add("slowConsumerThreshold", addressSettings.getSlowConsumerThreshold()) + .add("slowConsumerCheckPeriod", addressSettings.getSlowConsumerCheckPeriod()) + .add("slowConsumerPolicy", consumerPolicy) + .add("autoCreateJmsQueues", addressSettings.isAutoCreateJmsQueues()) + .add("autoCreateJmsTopics", addressSettings.isAutoCreateJmsTopics()) + .add("autoDeleteJmsQueues", addressSettings.isAutoDeleteJmsQueues()) + .add("autoDeleteJmsTopics", addressSettings.isAutoDeleteJmsQueues()) + .add("autoCreateQueues", addressSettings.isAutoCreateQueues()) + .add("autoDeleteQueues", addressSettings.isAutoDeleteQueues()) + .add("autoCreateAddress", addressSettings.isAutoCreateAddresses()) + .add("autoDeleteAddress", addressSettings.isAutoDeleteAddresses()) + .build() + .toString(); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java index 1cf1a07..ada1d77 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/AddressManager.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.Set; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -44,6 +45,10 @@ public interface AddressManager { Bindings getMatchingBindings(SimpleString address) throws Exception; + SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception; + + SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception; + void clear(); Binding getBinding(SimpleString queueName); @@ -59,4 +64,5 @@ public interface AddressManager { AddressInfo removeAddressInfo(SimpleString address); AddressInfo getAddressInfo(SimpleString address); + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java index 48ec7db..dc5f4b4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -79,6 +80,10 @@ public interface PostOffice extends ActiveMQComponent { Map<SimpleString, Binding> getAllBindings(); + SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception; + + SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception; + RoutingStatus route(ServerMessage message, boolean direct) throws Exception; RoutingStatus route(ServerMessage message, @@ -119,6 +124,4 @@ public interface PostOffice extends ActiveMQComponent { boolean isAddressBound(final SimpleString address) throws Exception; Set<SimpleString> getAddresses(); - - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CompositeAddress.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CompositeAddress.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CompositeAddress.java new file mode 100644 index 0000000..32083a5 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/CompositeAddress.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.postoffice.impl; + +public class CompositeAddress { + + public static String SEPARATOR = "::"; + private final String address; + private final String queueName; + + public String getAddress() { + return address; + } + + public String getQueueName() { + return queueName; + } + + public CompositeAddress(String address, String queueName) { + + this.address = address; + this.queueName = queueName; + } + + public static boolean isFullyQualified(String address) { + return address.toString().contains(SEPARATOR); + } + + public static CompositeAddress getQueueName(String address) { + String[] split = address.split(SEPARATOR); + if (split.length <= 0) { + throw new IllegalStateException("Nott A Fully Qualified Name"); + } + return new CompositeAddress(split[0], split[1]); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 9bd69d1..34e966c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -67,6 +67,7 @@ import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.RouteContextList; import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.group.GroupingHandler; import org.apache.activemq.artemis.core.server.impl.AddressInfo; @@ -866,6 +867,16 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } @Override + public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception { + return addressManager.getMatchingQueue(address, routingType); + } + + @Override + public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception { + return addressManager.getMatchingQueue(address, queueName, routingType); + } + + @Override public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception { // We send direct to the queue so we can send it to the same queue that is bound to the notifications address - // this is crucial for ensuring http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java index 8db4f6f..c0e5b2d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java @@ -119,6 +119,37 @@ public class SimpleAddressManager implements AddressManager { } @Override + public SimpleString getMatchingQueue(final SimpleString address, RoutingType routingType) throws Exception { + + Binding binding = nameMap.get(address); + + if (binding == null || !(binding instanceof LocalQueueBinding) + || !binding.getAddress().equals(address)) { + Bindings bindings = mappings.get(address); + if (bindings != null) { + for (Binding theBinding : bindings.getBindings()) { + if (theBinding instanceof LocalQueueBinding) { + binding = theBinding; + break; + } + } + } + } + + return binding != null ? binding.getUniqueName() : null; + } + + @Override + public SimpleString getMatchingQueue(final SimpleString address, final SimpleString queueName, RoutingType routingType) throws Exception { + Binding binding = nameMap.get(queueName); + + if (binding != null && !binding.getAddress().equals(address)) { + throw new IllegalStateException("queue belongs to address" + binding.getAddress()); + } + return binding != null ? binding.getUniqueName() : null; + } + + @Override public void clear() { nameMap.clear(); mappings.clear(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index 0aeaf6b..5fe71bb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -344,6 +344,8 @@ public interface ActiveMQServer extends ActiveMQComponent { QueueQueryResult queueQuery(SimpleString name) throws Exception; + AddressQueryResult addressQuery(SimpleString name) throws Exception; + Queue deployQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filterString, boolean durable, boolean temporary, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index badadf4..e6b5ad4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -163,6 +163,8 @@ public interface ServerSession extends SecurityAuth { QueueQueryResult executeQueueQuery(SimpleString name) throws Exception; + AddressQueryResult executeAddressQuery(SimpleString name) throws Exception; + BindingQueryResult executeBindingQuery(SimpleString address) throws Exception; void closeConsumer(long consumerID) throws Exception; @@ -237,4 +239,10 @@ public interface ServerSession extends SecurityAuth { List<MessageReference> getInTXMessagesForConsumer(long consumerId); String getValidatedUser(); + + SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception; + + SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception; + + AddressInfo getAddress(SimpleString address); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index 2ae2329..c997dab 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -720,7 +720,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn } else { // Add binding in storage so the queue will get reloaded on startup and we can find it - it's never // actually routed to at that address though - queue = server.createQueue(queueName, RoutingType.MULTICAST, queueName, null, true, false); + queue = server.createQueue(queueName, RoutingType.MULTICAST, queueName, null, true, false, -1, false, true); } // There are a few things that will behave differently when it's an internal queue http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 425eee5..742de33 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -108,6 +108,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.Divert; @@ -748,6 +749,24 @@ public class ActiveMQServerImpl implements ActiveMQServer { } @Override + public AddressQueryResult addressQuery(SimpleString name) throws Exception { + if (name == null) { + throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull(); + } + + boolean autoCreateAddresses = getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateAddresses(); + + AddressInfo addressInfo = postOffice.getAddressInfo(name); + AddressQueryResult response; + if (addressInfo != null) { + response = new AddressQueryResult(addressInfo.getName(), addressInfo.getRoutingTypes(), addressInfo.getId(), addressInfo.isAutoCreated(), true, autoCreateAddresses); + } else { + response = new AddressQueryResult(name, null, -1, false, false, autoCreateAddresses); + } + return response; + } + + @Override public void threadDump() { StringWriter str = new StringWriter(); PrintWriter out = new PrintWriter(str); @@ -1468,7 +1487,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { final SimpleString filterString, final boolean durable, final boolean temporary) throws Exception { - return createQueue(address, routingType, queueName, filterString, null, durable, temporary, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), true); + return createQueue(address, routingType, queueName, filterString, null, durable, temporary, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), false); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 79600b9..347874b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -65,6 +65,7 @@ import org.apache.activemq.artemis.core.security.SecurityStore; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.LargeServerMessage; @@ -708,6 +709,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } @Override + public AddressQueryResult executeAddressQuery(SimpleString name) throws Exception { + return server.addressQuery(name); + } + + @Override public BindingQueryResult executeBindingQuery(final SimpleString address) throws Exception { return server.bindingQuery(address); } @@ -1484,6 +1490,21 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } @Override + public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception { + return server.getPostOffice().getMatchingQueue(address, routingType); + } + + @Override + public SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception { + return server.getPostOffice().getMatchingQueue(address, queueName, routingType); + } + + @Override + public AddressInfo getAddress(SimpleString address) { + return server.getPostOffice().getAddressInfo(address); + } + + @Override public String toString() { StringBuffer buffer = new StringBuffer(); if (this.metaData != null) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java index 56353e4..fddaf9d 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java @@ -76,6 +76,44 @@ public class AmqpClient { } /** + * Creates a connection with the broker at the given location, this method initiates a + * connect attempt immediately and will fail if the remote peer cannot be reached. + * + * @throws Exception if an error occurs attempting to connect to the Broker. + * @return a new connection object used to interact with the connected peer. + */ + public AmqpConnection connect(boolean noContainerId) throws Exception { + + AmqpConnection connection = createConnection(); + connection.setNoContainerID(); + + LOG.debug("Attempting to create new connection to peer: {}", remoteURI); + connection.connect(); + + return connection; + } + + + /** + * Creates a connection with the broker at the given location, this method initiates a + * connect attempt immediately and will fail if the remote peer cannot be reached. + * + * @throws Exception if an error occurs attempting to connect to the Broker. + * @return a new connection object used to interact with the connected peer. + */ + public AmqpConnection connect(String containerId) throws Exception { + + AmqpConnection connection = createConnection(); + connection.setContainerId(containerId); + + LOG.debug("Attempting to create new connection to peer: {}", remoteURI); + connection.connect(); + + return connection; + } + + + /** * Creates a connection object using the configured values for user, password, remote URI * etc. This method does not immediately initiate a connection to the remote leaving that * to the caller which provides a connection object that can have additional configuration http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index 01c60bc..723daef 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -104,6 +104,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements private long closeTimeout = DEFAULT_CLOSE_TIMEOUT; private long drainTimeout = DEFAULT_DRAIN_TIMEOUT; private boolean trace; + private boolean noContainerID = false; public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport, String username, @@ -139,7 +140,9 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements serializer.execute(new Runnable() { @Override public void run() { - getEndpoint().setContainer(safeGetContainerId()); + if (!noContainerID) { + getEndpoint().setContainer(safeGetContainerId()); + } getEndpoint().setHostname(remoteURI.getHost()); if (!getOfferedCapabilities().isEmpty()) { getEndpoint().setOfferedCapabilities(getOfferedCapabilities().toArray(new Symbol[0])); @@ -735,4 +738,8 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements public String toString() { return "AmqpConnection { " + connectionId + " }"; } + + public void setNoContainerID() { + noContainerID = true; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/224f62b2/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index fc3fdf7..d4b16c1 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -271,7 +271,68 @@ public class AmqpSession extends AmqpAbstractResource<Session> { checkClosed(); final ClientFuture request = new ClientFuture(); - final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, getNextReceiverId()); + final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId); + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + receiver.setStateInspector(getStateInspector()); + receiver.open(request); + pumpToProtonTransport(request); + } + }); + + request.sync(); + + return receiver; + } + + + /** + * Create a receiver instance using the given Source + * + * @param source the caller created and configured Source used to create the receiver link. + * @return a newly created receiver that is ready for use. + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpReceiver createMulticastReceiver(Source source, String receiverId, String receiveName) throws Exception { + checkClosed(); + + final ClientFuture request = new ClientFuture(); + final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId); + receiver.setSubscriptionName(receiveName); + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + receiver.setStateInspector(getStateInspector()); + receiver.open(request); + pumpToProtonTransport(request); + } + }); + + request.sync(); + + return receiver; + } + + /** + * Create a receiver instance using the given Source + * + * @param source the caller created and configured Source used to create the receiver link. + * @return a newly created receiver that is ready for use. + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpReceiver createMulticastReceiver(String receiverId, String address, String receiveName) throws Exception { + checkClosed(); + + final ClientFuture request = new ClientFuture(); + final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, receiverId); + receiver.setSubscriptionName(receiveName); connection.getScheduler().execute(new Runnable() {
