Repository: activemq-artemis Updated Branches: refs/heads/master f0c13622a -> 756609f6a
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java index c7021fd..6fc5019 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java @@ -170,6 +170,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable private Boolean defaultPurgeOnNoConsumers = null; + private Integer defaultConsumersBeforeDispatch = null; + + private Long defaultDelayBeforeDispatch = null; + private RoutingType defaultQueueRoutingType = null; private RoutingType defaultAddressRoutingType = null; @@ -214,6 +218,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable this.maxSizeBytesRejectThreshold = other.maxSizeBytesRejectThreshold; this.defaultMaxConsumers = other.defaultMaxConsumers; this.defaultPurgeOnNoConsumers = other.defaultPurgeOnNoConsumers; + this.defaultConsumersBeforeDispatch = other.defaultConsumersBeforeDispatch; + this.defaultDelayBeforeDispatch = other.defaultDelayBeforeDispatch; this.defaultQueueRoutingType = other.defaultQueueRoutingType; this.defaultAddressRoutingType = other.defaultAddressRoutingType; } @@ -328,6 +334,24 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable return this; } + public int getDefaultConsumersBeforeDispatch() { + return defaultConsumersBeforeDispatch != null ? defaultConsumersBeforeDispatch : ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(); + } + + public AddressSettings setDefaultConsumersBeforeDispatch(Integer defaultConsumersBeforeDispatch) { + this.defaultConsumersBeforeDispatch = defaultConsumersBeforeDispatch; + return this; + } + + public long getDefaultDelayBeforeDispatch() { + return defaultDelayBeforeDispatch != null ? defaultDelayBeforeDispatch : ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(); + } + + public AddressSettings setDefaultDelayBeforeDispatch(Long defaultDelayBeforeDispatch) { + this.defaultDelayBeforeDispatch = defaultDelayBeforeDispatch; + return this; + } + public boolean isDefaultPurgeOnNoConsumers() { return defaultPurgeOnNoConsumers != null ? defaultPurgeOnNoConsumers : ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(); } @@ -667,6 +691,18 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable if (defaultAddressRoutingType == null) { defaultAddressRoutingType = merged.defaultAddressRoutingType; } + if (defaultExclusiveQueue == null) { + defaultExclusiveQueue = merged.defaultExclusiveQueue; + } + if (defaultLastValueQueue == null) { + defaultLastValueQueue = merged.defaultLastValueQueue; + } + if (defaultConsumersBeforeDispatch == null) { + defaultConsumersBeforeDispatch = merged.defaultConsumersBeforeDispatch; + } + if (defaultDelayBeforeDispatch == null) { + defaultDelayBeforeDispatch = merged.defaultDelayBeforeDispatch; + } } @Override @@ -767,6 +803,14 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable if (buffer.readableBytes() > 0) { defaultExclusiveQueue = BufferHelper.readNullableBoolean(buffer); } + + if (buffer.readableBytes() > 0) { + defaultConsumersBeforeDispatch = BufferHelper.readNullableInteger(buffer); + } + + if (buffer.readableBytes() > 0) { + defaultDelayBeforeDispatch = BufferHelper.readNullableLong(buffer); + } } @Override @@ -805,7 +849,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable BufferHelper.sizeOfNullableBoolean(defaultPurgeOnNoConsumers) + DataConstants.SIZE_BYTE + DataConstants.SIZE_BYTE + - BufferHelper.sizeOfNullableBoolean(defaultExclusiveQueue); + BufferHelper.sizeOfNullableBoolean(defaultExclusiveQueue) + + BufferHelper.sizeOfNullableInteger(defaultConsumersBeforeDispatch) + + BufferHelper.sizeOfNullableLong(defaultDelayBeforeDispatch); } @Override @@ -882,6 +928,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable BufferHelper.writeNullableBoolean(buffer, defaultExclusiveQueue); + BufferHelper.writeNullableInteger(buffer, defaultConsumersBeforeDispatch); + + BufferHelper.writeNullableLong(buffer, defaultDelayBeforeDispatch); + } /* (non-Javadoc) @@ -928,6 +978,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable result = prime * result + ((defaultPurgeOnNoConsumers == null) ? 0 : defaultPurgeOnNoConsumers.hashCode()); result = prime * result + ((defaultQueueRoutingType == null) ? 0 : defaultQueueRoutingType.hashCode()); result = prime * result + ((defaultAddressRoutingType == null) ? 0 : defaultAddressRoutingType.hashCode()); + result = prime * result + ((defaultConsumersBeforeDispatch == null) ? 0 : defaultConsumersBeforeDispatch.hashCode()); + result = prime * result + ((defaultDelayBeforeDispatch == null) ? 0 : defaultDelayBeforeDispatch.hashCode()); return result; } @@ -1133,6 +1185,18 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable return false; } else if (!defaultAddressRoutingType.equals(other.defaultAddressRoutingType)) return false; + + if (defaultConsumersBeforeDispatch == null) { + if (other.defaultConsumersBeforeDispatch != null) + return false; + } else if (!defaultConsumersBeforeDispatch.equals(other.defaultConsumersBeforeDispatch)) + return false; + + if (defaultDelayBeforeDispatch == null) { + if (other.defaultDelayBeforeDispatch != null) + return false; + } else if (!defaultDelayBeforeDispatch.equals(other.defaultDelayBeforeDispatch)) + return false; return true; } @@ -1212,6 +1276,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable defaultQueueRoutingType + ", defaultAddressRoutingType=" + defaultAddressRoutingType + + ", defaultConsumersBeforeDispatch=" + + defaultConsumersBeforeDispatch + + ", defaultDelayBeforeDispatch=" + + defaultDelayBeforeDispatch + "]"; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/main/resources/schema/artemis-configuration.xsd ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 831b4cb..e96923d 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -510,6 +510,8 @@ <xsd:attribute name="purge-on-no-consumers" type="xsd:boolean" use="optional"/> <xsd:attribute name="exclusive" type="xsd:boolean" use="optional"/> <xsd:attribute name="last-value" type="xsd:boolean" use="optional"/> + <xsd:attribute name="consumers-before-dispatch" type="xsd:int" use="optional"/> + <xsd:attribute name="delay-before-dispatch" type="xsd:long" use="optional"/> <xsd:attributeGroup ref="xml:specialAttrs"/> </xsd:complexType> </xsd:element> @@ -2802,6 +2804,22 @@ </xsd:annotation> </xsd:element> + <xsd:element name="default-consumers-before-dispatch" type="xsd:int" default="0" maxOccurs="1" minOccurs="0"> + <xsd:annotation> + <xsd:documentation> + the default number of consumers needed before dispatch can start for queues under the address. + </xsd:documentation> + </xsd:annotation> + </xsd:element> + + <xsd:element name="default-delay-before-dispatch" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0"> + <xsd:annotation> + <xsd:documentation> + the default delay to wait before dispatching if number of consumers before dispatch is not met for queues under the address. + </xsd:documentation> + </xsd:annotation> + </xsd:element> + <xsd:element name="redistribution-delay" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0"> <xsd:annotation> <xsd:documentation> @@ -3119,6 +3137,8 @@ <xsd:attribute name="purge-on-no-consumers" type="xsd:boolean" use="optional"/> <xsd:attribute name="exclusive" type="xsd:boolean" use="optional"/> <xsd:attribute name="last-value" type="xsd:boolean" use="optional"/> + <xsd:attribute name="consumers-before-dispatch" type="xsd:int" use="optional"/> + <xsd:attribute name="delay-before-dispatch" type="xsd:long" use="optional"/> <xsd:attributeGroup ref="xml:specialAttrs"/> </xsd:complexType> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index 4cdd11c..8fcac20 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -423,7 +423,8 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals("color='blue'", queueConfiguration.getFilterString()); assertEquals(ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), queueConfiguration.getPurgeOnNoConsumers()); assertEquals("addr1", queueConfiguration.getAddress()); - assertEquals(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), queueConfiguration.getMaxConsumers()); + // If null, then default will be taken from address-settings (which defaults to ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers()) + assertEquals(null, queueConfiguration.getMaxConsumers()); // Addr 1 Queue 2 queueConfiguration = addressConfiguration.getQueueConfigurations().get(1); @@ -431,7 +432,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals("q2", queueConfiguration.getName()); assertTrue(queueConfiguration.isDurable()); assertEquals("color='green'", queueConfiguration.getFilterString()); - assertEquals(Queue.MAX_CONSUMERS_UNLIMITED, queueConfiguration.getMaxConsumers()); + assertEquals(Queue.MAX_CONSUMERS_UNLIMITED, queueConfiguration.getMaxConsumers().intValue()); assertFalse(queueConfiguration.getPurgeOnNoConsumers()); assertEquals("addr1", queueConfiguration.getAddress()); @@ -449,7 +450,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals("q3", queueConfiguration.getName()); assertTrue(queueConfiguration.isDurable()); assertEquals("color='red'", queueConfiguration.getFilterString()); - assertEquals(10, queueConfiguration.getMaxConsumers()); + assertEquals(10, queueConfiguration.getMaxConsumers().intValue()); assertEquals(ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), queueConfiguration.getPurgeOnNoConsumers()); assertEquals("addr2", queueConfiguration.getAddress()); @@ -459,7 +460,8 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals("q4", queueConfiguration.getName()); assertTrue(queueConfiguration.isDurable()); assertNull(queueConfiguration.getFilterString()); - assertEquals(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), queueConfiguration.getMaxConsumers()); + // If null, then default will be taken from address-settings (which defaults to ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers()) + assertEquals(null, queueConfiguration.getMaxConsumers()); assertTrue(queueConfiguration.getPurgeOnNoConsumers()); assertEquals("addr2", queueConfiguration.getAddress()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index f45a1dd..96de8c7 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -794,6 +794,41 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override + public int getConsumersBeforeDispatch() { + return 0; + } + + @Override + public void setConsumersBeforeDispatch(int consumersBeforeDispatch) { + + } + + @Override + public long getDelayBeforeDispatch() { + return 0; + } + + @Override + public void setDelayBeforeDispatch(long delayBeforeDispatch) { + + } + + @Override + public long getDispatchStartTime() { + return 0; + } + + @Override + public boolean isDispatching() { + return false; + } + + @Override + public void setDispatching(boolean dispatching) { + + } + + @Override public void setMaxConsumer(int maxConsumers) { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-tools/src/test/resources/artemis-configuration.xsd ---------------------------------------------------------------------- diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd index 41c881e..30de90b 100644 --- a/artemis-tools/src/test/resources/artemis-configuration.xsd +++ b/artemis-tools/src/test/resources/artemis-configuration.xsd @@ -491,6 +491,8 @@ <xsd:attribute name="purge-on-no-consumers" type="xsd:boolean" use="optional"/> <xsd:attribute name="exclusive" type="xsd:boolean" use="optional"/> <xsd:attribute name="last-value" type="xsd:boolean" use="optional"/> + <xsd:attribute name="consumers-before-dispatch" type="xsd:int" use="optional"/> + <xsd:attribute name="delay-before-dispatch" type="xsd:long" use="optional"/> <xsd:attributeGroup ref="xml:specialAttrs"/> </xsd:complexType> </xsd:element> @@ -2498,6 +2500,22 @@ </xsd:annotation> </xsd:element> + <xsd:element name="default-consumers-before-dispatch" type="xsd:int" default="0" maxOccurs="1" minOccurs="0"> + <xsd:annotation> + <xsd:documentation> + the default number of consumers needed before dispatch can start for queues under the address. + </xsd:documentation> + </xsd:annotation> + </xsd:element> + + <xsd:element name="default-delay-before-dispatch" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0"> + <xsd:annotation> + <xsd:documentation> + the default delay to wait before dispatching if number of consumers before dispatch is not met for queues under the address. + </xsd:documentation> + </xsd:annotation> + </xsd:element> + <xsd:element name="redistribution-delay" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0"> <xsd:annotation> <xsd:documentation> @@ -2769,6 +2787,8 @@ <xsd:attribute name="purge-on-no-consumers" type="xsd:boolean" use="optional"/> <xsd:attribute name="exclusive" type="xsd:boolean" use="optional"/> <xsd:attribute name="last-value" type="xsd:boolean" use="optional"/> + <xsd:attribute name="consumers-before-dispatch" type="xsd:int" use="optional"/> + <xsd:attribute name="delay-before-dispatch" type="xsd:long" use="optional"/> <xsd:attributeGroup ref="xml:specialAttrs"/> </xsd:complexType> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java new file mode 100644 index 0000000..4d2d195 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.jms.client; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Exclusive Test + */ +public class ConsumerDelayDispatchTest extends JMSTestBase { + + private SimpleString queueName = SimpleString.toSimpleString("jms.consumer.delay.queue"); + private SimpleString normalQueueName = SimpleString.toSimpleString("jms.noraml.queue"); + + private static final long DELAY_BEFORE_DISPATCH = 10000L; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, null, true, false, false, false, false, -1, false, true, false, 2, DELAY_BEFORE_DISPATCH, true); + server.createQueue(normalQueueName, RoutingType.ANYCAST, normalQueueName, null, null, true, false, false, false, false, -1, false, true, false, 0, -1, true); + + } + + + protected ConnectionFactory getCF() throws Exception { + return cf; + } + + @Test + public void testNoDelayOnDefault() throws Exception { + sendMessage(normalQueueName); + + ConnectionFactory fact = getCF(); + Connection connection = fact.createConnection(); + + try { + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + connection.start(); + + Destination queue = session.createQueue(normalQueueName.toString()); + MessageConsumer consumer1 = session.createConsumer(queue); + + Assert.assertNotNull(receive(consumer1)); + } finally { + connection.close(); + } + } + + @Test + public void testDelayBeforeDispatch() throws Exception { + sendMessage(queueName); + + ConnectionFactory fact = getCF(); + Connection connection = fact.createConnection(); + + try { + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + connection.start(); + + Destination queue = session.createQueue(queueName.toString()); + MessageConsumer consumer1 = session.createConsumer(queue); + + Assert.assertNull(receive(consumer1)); + Thread.sleep(DELAY_BEFORE_DISPATCH); + + Assert.assertNotNull(receive(consumer1)); + } finally { + connection.close(); + } + } + + @Test + public void testConsumersBeforeDispatch() throws Exception { + sendMessage(queueName); + + + ConnectionFactory fact = getCF(); + Connection connection = fact.createConnection(); + + try { + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + connection.start(); + Destination queue = session.createQueue(queueName.toString()); + + MessageConsumer consumer1 = session.createConsumer(queue); + + Assert.assertNull(receive(consumer1)); + + MessageConsumer consumer2 = session.createConsumer(queue); + + Assert.assertNotNull(receive(consumer1, consumer2)); + } finally { + connection.close(); + } + } + + + @Test + public void testContinueAndResetConsumer() throws Exception { + sendMessage(queueName); + + ConnectionFactory fact = getCF(); + Connection connection = fact.createConnection(); + + try { + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + connection.start(); + Destination queue = session.createQueue(queueName.toString()); + + MessageConsumer consumer1 = session.createConsumer(queue); + + Assert.assertNull(receive(consumer1)); + + MessageConsumer consumer2 = session.createConsumer(queue); + + Assert.assertNotNull(receive(consumer1, consumer2)); + + consumer2.close(); + + //Ensure that now dispatch is active, if we close a consumer, dispatching continues. + sendMessage(queueName); + + Assert.assertNotNull(receive(consumer1)); + + //Stop all consumers, which should reset dispatch rules. + consumer1.close(); + + //Ensure that once all consumers are stopped, that dispatch rules reset and wait for min consumers. + sendMessage(queueName); + + MessageConsumer consumer3 = session.createConsumer(queue); + + Assert.assertNull(receive(consumer3)); + + MessageConsumer consumer4 = session.createConsumer(queue); + + Assert.assertNotNull(receive(consumer3, consumer4)); + + + //Stop all consumers, which should reset dispatch rules. + consumer3.close(); + consumer4.close(); + + //Ensure that once all consumers are stopped, that dispatch rules reset and wait for delay. + sendMessage(queueName); + + MessageConsumer consumer5 = session.createConsumer(queue); + + Assert.assertNull(receive(consumer5)); + + Thread.sleep(DELAY_BEFORE_DISPATCH); + + Assert.assertNotNull(receive(consumer5)); + + } finally { + connection.close(); + } + } + + private Message receive(MessageConsumer consumer1) throws JMSException { + return consumer1.receive(1000); + } + + private Message receive(MessageConsumer consumer1, MessageConsumer consumer2) throws JMSException { + Message receivedMessage = receive(consumer1); + if (receivedMessage == null) { + receivedMessage = receive(consumer2); + } + return receivedMessage; + } + + public void sendMessage(SimpleString queue) throws Exception { + ConnectionFactory fact = getCF(); + Connection connection = fact.createConnection(); + try { + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + connection.start(); + + Destination destination = session.createQueue(queue.toString()); + MessageProducer producer = session.createProducer(destination); + + TextMessage message = session.createTextMessage(); + message.setText("Message"); + producer.send(message); + } finally { + connection.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java index ae24a45..2d78092 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java @@ -156,6 +156,11 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes } @Override + public String updateQueue(String name, String routingType, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Integer consumersBeforeDispatch, Long delayBeforeDispatch, String user) throws Exception { + return null; + } + + @Override public void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception { proxy.invokeOperation("deleteAddress", name); } @@ -189,6 +194,11 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes } @Override + public String createQueue(String address, String routingType, String name, String filterStr, boolean durable, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue, int consumersBeforeDispatch, long delayBeforeDispatch, boolean autoCreateAddress) throws Exception { + return null; + } + + @Override public void createQueue(final String address, final String name, final boolean durable) throws Exception { proxy.invokeOperation("createQueue", address, name, durable); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java index e3b179b..ac2ed61 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java @@ -105,6 +105,52 @@ public class QueueConfigRestartTest extends ActiveMQTestBase { Assert.assertTrue(queueBinding2.getQueue().isExclusive()); } + @Test + public void testQueueConfigConsumersBeforeDispatchAndRestart() throws Exception { + int consumersBeforeDispatch = 5; + ActiveMQServer server = createServer(true); + + server.start(); + + SimpleString address = new SimpleString("test.address"); + SimpleString queue = new SimpleString("test.queue"); + + server.createQueue(address, RoutingType.MULTICAST, queue, null, null, true, false, false, false,false, 10, true, true, true, consumersBeforeDispatch, -1, true); + + QueueBinding queueBinding1 = (QueueBinding)server.getPostOffice().getBinding(queue); + Assert.assertEquals(consumersBeforeDispatch, queueBinding1.getQueue().getConsumersBeforeDispatch()); + + server.stop(); + + server.start(); + + QueueBinding queueBinding2 = (QueueBinding)server.getPostOffice().getBinding(queue); + Assert.assertEquals(consumersBeforeDispatch, queueBinding1.getQueue().getConsumersBeforeDispatch()); + } + + @Test + public void testQueueConfigDelayBeforeDispatchAndRestart() throws Exception { + long delayBeforeDispatch = 5000L; + ActiveMQServer server = createServer(true); + + server.start(); + + SimpleString address = new SimpleString("test.address"); + SimpleString queue = new SimpleString("test.queue"); + + server.createQueue(address, RoutingType.MULTICAST, queue, null, null, true, false, false, false,false, 10, true, true, true, 0, delayBeforeDispatch, true); + + QueueBinding queueBinding1 = (QueueBinding)server.getPostOffice().getBinding(queue); + Assert.assertEquals(delayBeforeDispatch, queueBinding1.getQueue().getDelayBeforeDispatch()); + + server.stop(); + + server.start(); + + QueueBinding queueBinding2 = (QueueBinding)server.getPostOffice().getBinding(queue); + Assert.assertEquals(delayBeforeDispatch, queueBinding1.getQueue().getDelayBeforeDispatch()); + } + @Test public void testQueueConfigUserAndRestart() throws Exception { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 192d700..71ced7f 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -49,6 +49,41 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } @Override + public int getConsumersBeforeDispatch() { + return 0; + } + + @Override + public void setConsumersBeforeDispatch(int consumersBeforeDispatch) { + + } + + @Override + public long getDelayBeforeDispatch() { + return 0; + } + + @Override + public void setDelayBeforeDispatch(long delayBeforeDispatch) { + + } + + @Override + public long getDispatchStartTime() { + return 0; + } + + @Override + public boolean isDispatching() { + return false; + } + + @Override + public void setDispatching(boolean dispatching) { + + } + + @Override public boolean isExclusive() { // no-op return false; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java index 3f35084..44e5823 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java @@ -49,6 +49,8 @@ public class FakePostOffice implements PostOffice { Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, + Integer consumersBeforeDispatch, + Long delayBeforeDispatch, SimpleString user) throws Exception { return null; }
