Repository: activemq-artemis Updated Branches: refs/heads/ARTEMIS-780 82299f4de -> bfc1f0be6
more amqp work plus test fixes and API enhancements Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/bfc1f0be Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/bfc1f0be Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/bfc1f0be Branch: refs/heads/ARTEMIS-780 Commit: bfc1f0be6982b8ecf2c823cb40f1d7a41cd27865 Parents: 82299f4 Author: Andy Taylor <[email protected]> Authored: Tue Dec 6 13:14:51 2016 +0000 Committer: Andy Taylor <[email protected]> Committed: Tue Dec 6 13:15:50 2016 +0000 ---------------------------------------------------------------------- .../activemq/cli/test/FileBrokerTest.java | 24 ---- .../core/management/ActiveMQServerControl.java | 59 +++++++++ .../amqp/broker/AMQPSessionCallback.java | 4 + .../protocol/amqp/proton/AmqpSupport.java | 1 + .../proton/ProtonServerReceiverContext.java | 11 ++ .../amqp/proton/ProtonServerSenderContext.java | 6 + .../amqp/proton/handler/ExtCapability.java | 2 +- .../artemis/rest/test/FindDestinationTest.java | 3 + .../impl/ActiveMQServerControlImpl.java | 120 +++++++++++++------ .../cluster/impl/ClusterConnectionImpl.java | 2 +- .../amqp/AmqpTempDestinationTest.java | 2 - .../management/ActiveMQServerControlTest.java | 87 +++++++------- .../ActiveMQServerControlUsingCoreTest.java | 15 +++ .../management/ManagementControlHelper.java | 8 ++ 14 files changed, 238 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java index a50a49f..b04b540 100644 --- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java +++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/FileBrokerTest.java @@ -45,30 +45,6 @@ import static org.junit.Assert.fail; public class FileBrokerTest { @Test - public void startWithJMS() throws Exception { - ServerDTO serverDTO = new ServerDTO(); - serverDTO.configuration = "broker.xml"; - FileBroker broker = null; - try { - broker = new FileBroker(serverDTO, new ActiveMQJAASSecurityManager()); - broker.start(); - JMSServerManagerImpl jmsServerManager = (JMSServerManagerImpl) broker.getComponents().get("jms"); - Assert.assertNotNull(jmsServerManager); - Assert.assertTrue(jmsServerManager.isStarted()); - //this tells us the jms server is activated - Assert.assertTrue(jmsServerManager.getJMSStorageManager().isStarted()); - ActiveMQServerImpl activeMQServer = (ActiveMQServerImpl) broker.getComponents().get("core"); - Assert.assertNotNull(activeMQServer); - Assert.assertTrue(activeMQServer.isStarted()); - Assert.assertTrue(broker.isStarted()); - } finally { - if (broker != null) { - broker.stop(); - } - } - } - - @Test public void startWithoutJMS() throws Exception { ServerDTO serverDTO = new ServerDTO(); serverDTO.configuration = "broker-nojms.xml"; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java index 1797c9a..abd8e9e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java @@ -451,10 +451,29 @@ public interface ActiveMQServerControl { * @param address address to bind the queue to * @param name name of the queue */ + @Deprecated @Operation(desc = "Create a queue with the specified address", impact = MBeanOperationInfo.ACTION) void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address, @Parameter(name = "name", desc = "Name of the queue") String name) throws Exception; + + /** + * Create a durable queue. + * <br> + * If {@code address} is {@code null} it will be defaulted to {@code name}. + * <br> + * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits. + * + * @param address address to bind the queue to + * @param name name of the queue + * @param routingType The routing type used for this address, MULTICAST or ANYCAST + */ + @Operation(desc = "Create a queue with the specified address", impact = MBeanOperationInfo.ACTION) + void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address, + @Parameter(name = "name", desc = "Name of the queue") String name, + @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType) throws Exception; + + /** * Create a queue. * <br> @@ -466,6 +485,7 @@ public interface ActiveMQServerControl { * @param name name of the queue * @param durable whether the queue is durable */ + @Deprecated @Operation(desc = "Create a queue with the specified address, name and durability", impact = MBeanOperationInfo.ACTION) void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address, @Parameter(name = "name", desc = "Name of the queue") String name, @@ -480,6 +500,24 @@ public interface ActiveMQServerControl { * * @param address address to bind the queue to * @param name name of the queue + * @param durable whether the queue is durable + * @param routingType The routing type used for this address, MULTICAST or ANYCAST + */ + @Operation(desc = "Create a queue with the specified address, name and durability", impact = MBeanOperationInfo.ACTION) + void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address, + @Parameter(name = "name", desc = "Name of the queue") String name, + @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable, + @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType) throws Exception; + + /** + * Create a queue. + * <br> + * If {@code address} is {@code null} it will be defaulted to {@code name}. + * <br> + * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits. + * + * @param address address to bind the queue to + * @param name name of the queue * @param filter of the queue * @param durable whether the queue is durable */ @@ -496,6 +534,27 @@ public interface ActiveMQServerControl { * <br> * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits. * + * @param address address to bind the queue to + * @param name name of the queue + * @param filter of the queue + * @param durable whether the queue is durable + * @param routingType The routing type used for this address, MULTICAST or ANYCAST + */ + @Operation(desc = "Create a queue", impact = MBeanOperationInfo.ACTION) + void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address, + @Parameter(name = "name", desc = "Name of the queue") String name, + @Parameter(name = "filter", desc = "Filter of the queue") String filter, + @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable, + @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType) throws Exception; + + + /** + * Create a queue. + * <br> + * If {@code address} is {@code null} it will be defaulted to {@code name}. + * <br> + * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits. + * * @param address address to bind the queue to * @param routingType the routing type used for this address, {@code MULTICAST} or {@code ANYCAST} * @param name name of the queue http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index a1928be..c870eec 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -563,4 +563,8 @@ public class AMQPSessionCallback implements SessionCallback { public AddressInfo getAddress(SimpleString address) { return serverSession.getAddress(address); } + + public void removeTemporaryQueue(String address) throws Exception { + serverSession.deleteQueue(SimpleString.toSimpleString(address)); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java index 7bdbd2e..ff398dc 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java @@ -55,6 +55,7 @@ public class AmqpSupport { public static final Symbol PLATFORM = Symbol.valueOf("platform"); public static final Symbol RESOURCE_DELETED = Symbol.valueOf("amqp:resource-deleted"); public static final Symbol CONNECTION_FORCED = Symbol.valueOf("amqp:connection:forced"); + public static final Symbol SHARED_SUBS = Symbol.valueOf("SHARED-SUBS"); // Symbols used in configuration of newly opened links. http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 515acc3..446edc0 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMess import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Rejected; +import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; import org.apache.qpid.proton.amqp.transaction.TransactionalState; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Delivery; @@ -56,6 +57,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements // Used by the broker to decide when to refresh clients credit. This is not used when client requests credit. private static int minCreditRefresh = 30; + private TerminusExpiryPolicy expiryPolicy; public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection, @@ -88,6 +90,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements } catch (Exception e) { throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); } + expiryPolicy = target.getExpiryPolicy() != null ? target.getExpiryPolicy() : TerminusExpiryPolicy.LINK_DETACH; target.setAddress(address); } else { //if not dynamic then we use the targets address as the address to forward the messages to, however there has to @@ -165,6 +168,14 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements @Override public void close(boolean remoteLinkClose) throws ActiveMQAMQPException { protonSession.removeReceiver(receiver); + org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget(); + if (target != null && target.getDynamic() && (target.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || target.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) { + try { + sessionSPI.removeTemporaryQueue(target.getAddress()); + } catch (Exception e) { + //ignore on close, its temp anyway and will be removed later + } + } } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index c0b9643..7e9a243 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -446,6 +446,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr sessionSPI.deleteQueue(queue); } } + } else if (source != null && source.getDynamic() && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) { + try { + sessionSPI.removeTemporaryQueue(source.getAddress()); + } catch (Exception e) { + //ignore on close, its temp anyway and will be removed later + } } } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java index 6325ff6..931efa7 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java @@ -22,7 +22,7 @@ import org.apache.qpid.proton.engine.Connection; public class ExtCapability { - public static final Symbol[] capabilities = new Symbol[]{AmqpSupport.SOLE_CONNECTION_CAPABILITY, AmqpSupport.DELAYED_DELIVERY}; + public static final Symbol[] capabilities = new Symbol[]{AmqpSupport.SOLE_CONNECTION_CAPABILITY, AmqpSupport.DELAYED_DELIVERY, AmqpSupport.SHARED_SUBS}; public static Symbol[] getCapabilities() { return capabilities; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java ---------------------------------------------------------------------- diff --git a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java index db23f56..be14056 100644 --- a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java +++ b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/FindDestinationTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.rest.test; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.jboss.resteasy.client.ClientRequest; import org.jboss.resteasy.client.ClientResponse; import org.jboss.resteasy.spi.Link; @@ -30,6 +31,7 @@ public class FindDestinationTest extends MessageTestBase { @Test public void testFindQueue() throws Exception { String testName = "testFindQueue"; + server.getActiveMQServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString(testName), RoutingType.MULTICAST)); server.getActiveMQServer().createQueue(new SimpleString(testName), RoutingType.MULTICAST, new SimpleString(testName), null, false, false); ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/queues/" + testName)); @@ -60,6 +62,7 @@ public class FindDestinationTest extends MessageTestBase { @Test public void testFindTopic() throws Exception { + server.getActiveMQServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString("testTopic"), RoutingType.MULTICAST)); server.getActiveMQServer().createQueue(new SimpleString("testTopic"), RoutingType.MULTICAST, new SimpleString("testTopic"), null, false, false); ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/topics/testTopic")); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 4464062..841aa84 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -619,7 +619,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active } } - @Deprecated @Override public void createQueue(final String address, final String name) throws Exception { checkStarted(); @@ -633,6 +632,18 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active } @Override + public void createQueue(final String address, final String name, final String routingType) throws Exception { + checkStarted(); + + clearIO(); + try { + server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), null, true, false); + } finally { + blockOnIO(); + } + } + + @Override public void createQueue(final String address, final String name, final boolean durable) throws Exception { checkStarted(); @@ -645,35 +656,44 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active } @Override - public void createQueue(String address, - String routingType, - String name, - String filterStr, - boolean durable, - int maxConsumers, - boolean deleteOnNoConsumers, - boolean autoCreateAddress) throws Exception { + public void createQueue(final String address, final String name, final boolean durable, final String routingType) throws Exception { checkStarted(); clearIO(); + try { + server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), null, durable, false); + } finally { + blockOnIO(); + } + } - SimpleString filter = filterStr == null ? null : new SimpleString(filterStr); + @Override + public void createQueue(final String address, + final String name, + final String filterStr, + final boolean durable) throws Exception { + checkStarted(); + + clearIO(); try { + SimpleString filter = null; if (filterStr != null && !filterStr.trim().equals("")) { filter = new SimpleString(filterStr); } - server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress); + server.createQueue(SimpleString.toSimpleString(address), ActiveMQDefaultConfiguration.getDefaultRoutingType(), new SimpleString(name), filter, durable, false); } finally { blockOnIO(); } } + @Override public void createQueue(final String address, final String name, final String filterStr, - final boolean durable) throws Exception { + final boolean durable, + final String routingType) throws Exception { checkStarted(); clearIO(); @@ -683,12 +703,38 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active filter = new SimpleString(filterStr); } - server.createQueue(SimpleString.toSimpleString(address), ActiveMQDefaultConfiguration.getDefaultRoutingType(), new SimpleString(name), filter, durable, false); + server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false); + } finally { + blockOnIO(); + } + } + + @Override + public void createQueue(String address, + String routingType, + String name, + String filterStr, + boolean durable, + int maxConsumers, + boolean deleteOnNoConsumers, + boolean autoCreateAddress) throws Exception { + checkStarted(); + + clearIO(); + + SimpleString filter = filterStr == null ? null : new SimpleString(filterStr); + try { + if (filterStr != null && !filterStr.trim().equals("")) { + filter = new SimpleString(filterStr); + } + + server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress); } finally { blockOnIO(); } } + @Override public String[] getQueueNames() { checkStarted(); @@ -1704,30 +1750,30 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active settings.add("expiryAddress", addressSettings.getExpiryAddress().toString()); } return settings.add("expiryDelay", addressSettings.getExpiryDelay()) - .add("maxDeliveryAttempts", addressSettings.getMaxDeliveryAttempts()) - .add("pageCacheMaxSize", addressSettings.getPageCacheMaxSize()) - .add("maxSizeBytes", addressSettings.getMaxSizeBytes()) - .add("pageSizeBytes", addressSettings.getPageSizeBytes()) - .add("redeliveryDelay", addressSettings.getRedeliveryDelay()) - .add("redeliveryMultiplier", addressSettings.getRedeliveryMultiplier()) - .add("maxRedeliveryDelay", addressSettings.getMaxRedeliveryDelay()) - .add("redistributionDelay", addressSettings.getRedistributionDelay()) - .add("lastValueQueue", addressSettings.isLastValueQueue()) - .add("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute()) - .add("addressFullMessagePolicy", policy) - .add("slowConsumerThreshold", addressSettings.getSlowConsumerThreshold()) - .add("slowConsumerCheckPeriod", addressSettings.getSlowConsumerCheckPeriod()) - .add("slowConsumerPolicy", consumerPolicy) - .add("autoCreateJmsQueues", addressSettings.isAutoCreateJmsQueues()) - .add("autoCreateJmsTopics", addressSettings.isAutoCreateJmsTopics()) - .add("autoDeleteJmsQueues", addressSettings.isAutoDeleteJmsQueues()) - .add("autoDeleteJmsTopics", addressSettings.isAutoDeleteJmsQueues()) - .add("autoCreateQueues", addressSettings.isAutoCreateQueues()) - .add("autoDeleteQueues", addressSettings.isAutoDeleteQueues()) - .add("autoCreateAddress", addressSettings.isAutoCreateAddresses()) - .add("autoDeleteAddress", addressSettings.isAutoDeleteAddresses()) - .build() - .toString(); + .add("maxDeliveryAttempts", addressSettings.getMaxDeliveryAttempts()) + .add("pageCacheMaxSize", addressSettings.getPageCacheMaxSize()) + .add("maxSizeBytes", addressSettings.getMaxSizeBytes()) + .add("pageSizeBytes", addressSettings.getPageSizeBytes()) + .add("redeliveryDelay", addressSettings.getRedeliveryDelay()) + .add("redeliveryMultiplier", addressSettings.getRedeliveryMultiplier()) + .add("maxRedeliveryDelay", addressSettings.getMaxRedeliveryDelay()) + .add("redistributionDelay", addressSettings.getRedistributionDelay()) + .add("lastValueQueue", addressSettings.isLastValueQueue()) + .add("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute()) + .add("addressFullMessagePolicy", policy) + .add("slowConsumerThreshold", addressSettings.getSlowConsumerThreshold()) + .add("slowConsumerCheckPeriod", addressSettings.getSlowConsumerCheckPeriod()) + .add("slowConsumerPolicy", consumerPolicy) + .add("autoCreateJmsQueues", addressSettings.isAutoCreateJmsQueues()) + .add("autoCreateJmsTopics", addressSettings.isAutoCreateJmsTopics()) + .add("autoDeleteJmsQueues", addressSettings.isAutoDeleteJmsQueues()) + .add("autoDeleteJmsTopics", addressSettings.isAutoDeleteJmsQueues()) + .add("autoCreateQueues", addressSettings.isAutoCreateQueues()) + .add("autoDeleteQueues", addressSettings.isAutoDeleteQueues()) + .add("autoCreateAddress", addressSettings.isAutoCreateAddresses()) + .add("autoDeleteAddress", addressSettings.isAutoDeleteAddresses()) + .build() + .toString(); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index 2ae2329..c997dab 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -720,7 +720,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn } else { // Add binding in storage so the queue will get reloaded on startup and we can find it - it's never // actually routed to at that address though - queue = server.createQueue(queueName, RoutingType.MULTICAST, queueName, null, true, false); + queue = server.createQueue(queueName, RoutingType.MULTICAST, queueName, null, true, false, -1, false, true); } // There are a few things that will behave differently when it's an internal queue http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java index d7874e3..4dbe21e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java @@ -111,8 +111,6 @@ public class AmqpTempDestinationTest extends AmqpClientTestSupport { sender.close(); - Thread.sleep(10000); - queueView = getProxyToQueue(remoteTarget.getAddress()); assertNull(queueView); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java index 8de7678..bd2392e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java @@ -54,6 +54,7 @@ import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.jlibaio.LibaioContext; @@ -185,12 +186,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase { ActiveMQServerControl serverControl = createManagementControl(); - checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); - - serverControl.createQueue(address.toString(), name.toString()); + checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); + serverControl.createAddress(address.toString(), "ANYCAST"); + serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, true, -1, false, false); - checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); - QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer); + checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); + QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer); Assert.assertEquals(address.toString(), queueControl.getAddress()); Assert.assertEquals(name.toString(), queueControl.getName()); Assert.assertNull(queueControl.getFilter()); @@ -211,12 +212,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase { ActiveMQServerControl serverControl = createManagementControl(); - checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); - - serverControl.createQueue(address.toString(), name.toString(), filter, durable); + checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); + serverControl.createAddress(address.toString(), "ANYCAST"); + serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), filter, durable, -1, false, false); - checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); - QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer); + checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); + QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer); Assert.assertEquals(address.toString(), queueControl.getAddress()); Assert.assertEquals(name.toString(), queueControl.getName()); Assert.assertEquals(filter, queueControl.getFilter()); @@ -236,12 +237,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase { ActiveMQServerControl serverControl = createManagementControl(); - checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); - - serverControl.createQueue(address.toString(), name.toString(), durable); + checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); + serverControl.createAddress(address.toString(), "ANYCAST"); + serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, durable, -1, false, false); - checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); - QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer); + checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); + QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer); Assert.assertEquals(address.toString(), queueControl.getAddress()); Assert.assertEquals(name.toString(), queueControl.getName()); Assert.assertNull(queueControl.getFilter()); @@ -264,12 +265,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase { ActiveMQServerControl serverControl = createManagementControl(); - checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); - + checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); + serverControl.createAddress(address.toString(), "ANYCAST"); serverControl.createQueue(address.toString(), RoutingType.ANYCAST.toString(), name.toString(), null, durable, maxConsumers, deleteOnNoConsumers, autoCreateAddress); checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); - QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer); + QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer); Assert.assertEquals(address.toString(), queueControl.getAddress()); Assert.assertEquals(name.toString(), queueControl.getName()); Assert.assertNull(queueControl.getFilter()); @@ -297,8 +298,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase { ActiveMQServerControl serverControl = createManagementControl(); checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); - - serverControl.createQueue(address.toString(), name.toString(), durable); + serverControl.createAddress(address.toString(), "ANYCAST"); + serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, durable, -1, false, false); ServerLocator receiveLocator = createInVMNonHALocator(); ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator); @@ -307,7 +308,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase { Assert.assertFalse(consumer.isClosed()); - checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); + checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); serverControl.destroyQueue(name.toString(), true); Wait.waitFor(new Wait.Condition() { @Override @@ -329,12 +330,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase { ActiveMQServerControl serverControl = createManagementControl(); - checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); - - serverControl.createQueue(address.toString(), name.toString(), filter, durable); + checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); + serverControl.createAddress(address.toString(), "ANYCAST"); + serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), filter, durable, -1, false, false); - checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); - QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer); + checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); + QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer); Assert.assertEquals(address.toString(), queueControl.getAddress()); Assert.assertEquals(name.toString(), queueControl.getName()); Assert.assertNull(queueControl.getFilter()); @@ -355,12 +356,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase { ActiveMQServerControl serverControl = createManagementControl(); - checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); + checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); + serverControl.createAddress(address.toString(), "ANYCAST"); + serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), filter, durable, -1, false, false); - serverControl.createQueue(address.toString(), name.toString(), filter, durable); - - checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name)); - QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer); + checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST)); + QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer); Assert.assertEquals(address.toString(), queueControl.getAddress()); Assert.assertEquals(name.toString(), queueControl.getName()); Assert.assertNull(queueControl.getFilter()); @@ -383,8 +384,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase { // management operations Assert.assertFalse(ActiveMQServerControlTest.contains(name.toString(), serverControl.getQueueNames())); - - serverControl.createQueue(address.toString(), name.toString()); + serverControl.createAddress(address.toString(), "ANYCAST"); + serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, true, -1, false, false); Assert.assertTrue(ActiveMQServerControlTest.contains(name.toString(), serverControl.getQueueNames())); serverControl.destroyQueue(name.toString()); @@ -402,8 +403,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase { // management operations Assert.assertFalse(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames())); - - serverControl.createQueue(address.toString(), name.toString()); + serverControl.createAddress(address.toString(), "ANYCAST"); + serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, true, -1, false, false); Assert.assertTrue(ActiveMQServerControlTest.contains(address.toString(), serverControl.getAddressNames())); serverControl.destroyQueue(name.toString()); @@ -1212,7 +1213,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase { ServerLocator locator = createInVMNonHALocator(); ClientSessionFactory factory = createSessionFactory(locator); ClientSession session = addClientSession(factory.createSession()); - server.createQueue(queueName, queueName, null, false, false); + server.createAddressInfo(new AddressInfo(queueName, RoutingType.ANYCAST)); + server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false); addClientConsumer(session.createConsumer(queueName)); addClientConsumer(session.createConsumer(queueName, SimpleString.toSimpleString(filter), true)); @@ -1268,8 +1270,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase { ServerLocator locator2 = createInVMNonHALocator(); ClientSessionFactory factory2 = createSessionFactory(locator2); ClientSession session2 = addClientSession(factory2.createSession()); - - server.createQueue(queueName, queueName, null, false, false); + serverControl.createAddress(queueName.toString(), "ANYCAST"); + server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false); addClientConsumer(session.createConsumer(queueName)); Thread.sleep(200); @@ -1334,7 +1336,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase { @Test public void testListSessionsAsJSON() throws Exception { SimpleString queueName = new SimpleString(UUID.randomUUID().toString()); - server.createQueue(queueName, queueName, null, false, false); + server.createAddressInfo(new AddressInfo(queueName, RoutingType.ANYCAST)); + server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, false, false); ActiveMQServerControl serverControl = createManagementControl(); ServerLocator locator = createInVMNonHALocator(); @@ -1399,8 +1402,10 @@ public class ActiveMQServerControlTest extends ManagementTestBase { this.conf.clearConnectorConfigurations().addConnectorConfiguration("server2-connector", new TransportConfiguration(INVM_CONNECTOR_FACTORY, params)); server2.start(); - server.createQueue(address, address, null, true, false); - server2.createQueue(address, address, null, true, false); + server.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); + server.createQueue(address, RoutingType.ANYCAST, address, null, true, false, -1, false, false); + server2.createAddressInfo(new AddressInfo(address, RoutingType.ANYCAST)); + server2.createQueue(address, RoutingType.ANYCAST, address, null, true, false, -1, false, false); ServerLocator locator = createInVMNonHALocator(); ClientSessionFactory csf = createSessionFactory(locator); ClientSession session = csf.createSession(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java index 280fdc4..2831f79 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java @@ -127,6 +127,21 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes } @Override + public void createQueue(String address, String name, String routingType) throws Exception { + proxy.invokeOperation("createQueue", address, name, routingType); + } + + @Override + public void createQueue(String address, String name, boolean durable, String routingType) throws Exception { + proxy.invokeOperation("createQueue", address, name, durable, routingType); + } + + @Override + public void createQueue(String address,String name, String filter, boolean durable, String routingType) throws Exception { + proxy.invokeOperation("createQueue", address, name, filter, durable, routingType); + } + + @Override public void createQueue(final String address, final String name, final boolean durable) throws Exception { proxy.invokeOperation("createQueue", address, name, durable); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfc1f0be/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java index 6bc8f3d..11785e4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementControlHelper.java @@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl; import org.apache.activemq.artemis.api.core.management.DivertControl; import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; import org.apache.activemq.artemis.api.core.management.QueueControl; +import org.apache.activemq.artemis.core.server.RoutingType; public class ManagementControlHelper { @@ -73,6 +74,13 @@ public class ManagementControlHelper { return (QueueControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, ActiveMQDefaultConfiguration.getDefaultRoutingType()), QueueControl.class, mbeanServer); } + public static QueueControl createQueueControl(final SimpleString address, + final SimpleString name, + final RoutingType routingType, + final MBeanServer mbeanServer) throws Exception { + return (QueueControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, routingType), QueueControl.class, mbeanServer); + } + public static AddressControl createAddressControl(final SimpleString address, final MBeanServer mbeanServer) throws Exception { return (AddressControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getAddressObjectName(address), AddressControl.class, mbeanServer);
