Repository: activemq-artemis
Updated Branches:
  refs/heads/master f0c13622a -> 756609f6a


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index c7021fd..6fc5019 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -170,6 +170,10 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
 
    private Boolean defaultPurgeOnNoConsumers = null;
 
+   private Integer defaultConsumersBeforeDispatch = null;
+
+   private Long defaultDelayBeforeDispatch = null;
+
    private RoutingType defaultQueueRoutingType = null;
 
    private RoutingType defaultAddressRoutingType = null;
@@ -214,6 +218,8 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
       this.maxSizeBytesRejectThreshold = other.maxSizeBytesRejectThreshold;
       this.defaultMaxConsumers = other.defaultMaxConsumers;
       this.defaultPurgeOnNoConsumers = other.defaultPurgeOnNoConsumers;
+      this.defaultConsumersBeforeDispatch = 
other.defaultConsumersBeforeDispatch;
+      this.defaultDelayBeforeDispatch = other.defaultDelayBeforeDispatch;
       this.defaultQueueRoutingType = other.defaultQueueRoutingType;
       this.defaultAddressRoutingType = other.defaultAddressRoutingType;
    }
@@ -328,6 +334,24 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
       return this;
    }
 
+   public int getDefaultConsumersBeforeDispatch() {
+      return defaultConsumersBeforeDispatch != null ? 
defaultConsumersBeforeDispatch : 
ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch();
+   }
+
+   public AddressSettings setDefaultConsumersBeforeDispatch(Integer 
defaultConsumersBeforeDispatch) {
+      this.defaultConsumersBeforeDispatch = defaultConsumersBeforeDispatch;
+      return this;
+   }
+
+   public long getDefaultDelayBeforeDispatch() {
+      return defaultDelayBeforeDispatch != null ? defaultDelayBeforeDispatch : 
ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch();
+   }
+
+   public AddressSettings setDefaultDelayBeforeDispatch(Long 
defaultDelayBeforeDispatch) {
+      this.defaultDelayBeforeDispatch = defaultDelayBeforeDispatch;
+      return this;
+   }
+
    public boolean isDefaultPurgeOnNoConsumers() {
       return defaultPurgeOnNoConsumers != null ? defaultPurgeOnNoConsumers : 
ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers();
    }
@@ -667,6 +691,18 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
       if (defaultAddressRoutingType == null) {
          defaultAddressRoutingType = merged.defaultAddressRoutingType;
       }
+      if (defaultExclusiveQueue == null) {
+         defaultExclusiveQueue = merged.defaultExclusiveQueue;
+      }
+      if (defaultLastValueQueue == null) {
+         defaultLastValueQueue = merged.defaultLastValueQueue;
+      }
+      if (defaultConsumersBeforeDispatch == null) {
+         defaultConsumersBeforeDispatch = 
merged.defaultConsumersBeforeDispatch;
+      }
+      if (defaultDelayBeforeDispatch == null) {
+         defaultDelayBeforeDispatch = merged.defaultDelayBeforeDispatch;
+      }
    }
 
    @Override
@@ -767,6 +803,14 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
       if (buffer.readableBytes() > 0) {
          defaultExclusiveQueue = BufferHelper.readNullableBoolean(buffer);
       }
+
+      if (buffer.readableBytes() > 0) {
+         defaultConsumersBeforeDispatch = 
BufferHelper.readNullableInteger(buffer);
+      }
+
+      if (buffer.readableBytes() > 0) {
+         defaultDelayBeforeDispatch = BufferHelper.readNullableLong(buffer);
+      }
    }
 
    @Override
@@ -805,7 +849,9 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
          BufferHelper.sizeOfNullableBoolean(defaultPurgeOnNoConsumers) +
          DataConstants.SIZE_BYTE +
          DataConstants.SIZE_BYTE +
-         BufferHelper.sizeOfNullableBoolean(defaultExclusiveQueue);
+         BufferHelper.sizeOfNullableBoolean(defaultExclusiveQueue) +
+         BufferHelper.sizeOfNullableInteger(defaultConsumersBeforeDispatch) +
+         BufferHelper.sizeOfNullableLong(defaultDelayBeforeDispatch);
    }
 
    @Override
@@ -882,6 +928,10 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
 
       BufferHelper.writeNullableBoolean(buffer, defaultExclusiveQueue);
 
+      BufferHelper.writeNullableInteger(buffer, 
defaultConsumersBeforeDispatch);
+
+      BufferHelper.writeNullableLong(buffer, defaultDelayBeforeDispatch);
+
    }
 
    /* (non-Javadoc)
@@ -928,6 +978,8 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
       result = prime * result + ((defaultPurgeOnNoConsumers == null) ? 0 : 
defaultPurgeOnNoConsumers.hashCode());
       result = prime * result + ((defaultQueueRoutingType == null) ? 0 : 
defaultQueueRoutingType.hashCode());
       result = prime * result + ((defaultAddressRoutingType == null) ? 0 : 
defaultAddressRoutingType.hashCode());
+      result = prime * result + ((defaultConsumersBeforeDispatch == null) ? 0 
: defaultConsumersBeforeDispatch.hashCode());
+      result = prime * result + ((defaultDelayBeforeDispatch == null) ? 0 : 
defaultDelayBeforeDispatch.hashCode());
       return result;
    }
 
@@ -1133,6 +1185,18 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
             return false;
       } else if 
(!defaultAddressRoutingType.equals(other.defaultAddressRoutingType))
          return false;
+
+      if (defaultConsumersBeforeDispatch == null) {
+         if (other.defaultConsumersBeforeDispatch != null)
+            return false;
+      } else if 
(!defaultConsumersBeforeDispatch.equals(other.defaultConsumersBeforeDispatch))
+         return false;
+
+      if (defaultDelayBeforeDispatch == null) {
+         if (other.defaultDelayBeforeDispatch != null)
+            return false;
+      } else if 
(!defaultDelayBeforeDispatch.equals(other.defaultDelayBeforeDispatch))
+         return false;
       return true;
    }
 
@@ -1212,6 +1276,10 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
          defaultQueueRoutingType +
          ", defaultAddressRoutingType=" +
          defaultAddressRoutingType +
+         ", defaultConsumersBeforeDispatch=" +
+         defaultConsumersBeforeDispatch +
+         ", defaultDelayBeforeDispatch=" +
+         defaultDelayBeforeDispatch +
          "]";
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd 
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 831b4cb..e96923d 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -510,6 +510,8 @@
                         <xsd:attribute name="purge-on-no-consumers" 
type="xsd:boolean" use="optional"/>
                         <xsd:attribute name="exclusive" type="xsd:boolean" 
use="optional"/>
                         <xsd:attribute name="last-value" type="xsd:boolean" 
use="optional"/>
+                        <xsd:attribute name="consumers-before-dispatch" 
type="xsd:int" use="optional"/>
+                        <xsd:attribute name="delay-before-dispatch" 
type="xsd:long" use="optional"/>
                         <xsd:attributeGroup ref="xml:specialAttrs"/>
                      </xsd:complexType>
                   </xsd:element>
@@ -2802,6 +2804,22 @@
                </xsd:annotation>
             </xsd:element>
 
+            <xsd:element name="default-consumers-before-dispatch" 
type="xsd:int" default="0" maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     the default number of consumers needed before dispatch 
can start for queues under the address.
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
+
+            <xsd:element name="default-delay-before-dispatch" type="xsd:long" 
default="-1" maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     the default delay to wait before dispatching if number of 
consumers before dispatch is not met for queues under the address.
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
+
             <xsd:element name="redistribution-delay" type="xsd:long" 
default="-1" maxOccurs="1" minOccurs="0">
                <xsd:annotation>
                   <xsd:documentation>
@@ -3119,6 +3137,8 @@
       <xsd:attribute name="purge-on-no-consumers" type="xsd:boolean" 
use="optional"/>
       <xsd:attribute name="exclusive" type="xsd:boolean" use="optional"/>
       <xsd:attribute name="last-value" type="xsd:boolean" use="optional"/>
+      <xsd:attribute name="consumers-before-dispatch" type="xsd:int" 
use="optional"/>
+      <xsd:attribute name="delay-before-dispatch" type="xsd:long" 
use="optional"/>
       <xsd:attributeGroup ref="xml:specialAttrs"/>
    </xsd:complexType>
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 4cdd11c..8fcac20 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -423,7 +423,8 @@ public class FileConfigurationTest extends 
ConfigurationImplTest {
       assertEquals("color='blue'", queueConfiguration.getFilterString());
       
assertEquals(ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), 
queueConfiguration.getPurgeOnNoConsumers());
       assertEquals("addr1", queueConfiguration.getAddress());
-      assertEquals(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), 
queueConfiguration.getMaxConsumers());
+      // If null, then default will be taken from address-settings (which 
defaults to ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers())
+      assertEquals(null, queueConfiguration.getMaxConsumers());
 
       // Addr 1 Queue 2
       queueConfiguration = 
addressConfiguration.getQueueConfigurations().get(1);
@@ -431,7 +432,7 @@ public class FileConfigurationTest extends 
ConfigurationImplTest {
       assertEquals("q2", queueConfiguration.getName());
       assertTrue(queueConfiguration.isDurable());
       assertEquals("color='green'", queueConfiguration.getFilterString());
-      assertEquals(Queue.MAX_CONSUMERS_UNLIMITED, 
queueConfiguration.getMaxConsumers());
+      assertEquals(Queue.MAX_CONSUMERS_UNLIMITED, 
queueConfiguration.getMaxConsumers().intValue());
       assertFalse(queueConfiguration.getPurgeOnNoConsumers());
       assertEquals("addr1", queueConfiguration.getAddress());
 
@@ -449,7 +450,7 @@ public class FileConfigurationTest extends 
ConfigurationImplTest {
       assertEquals("q3", queueConfiguration.getName());
       assertTrue(queueConfiguration.isDurable());
       assertEquals("color='red'", queueConfiguration.getFilterString());
-      assertEquals(10, queueConfiguration.getMaxConsumers());
+      assertEquals(10, queueConfiguration.getMaxConsumers().intValue());
       
assertEquals(ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), 
queueConfiguration.getPurgeOnNoConsumers());
       assertEquals("addr2", queueConfiguration.getAddress());
 
@@ -459,7 +460,8 @@ public class FileConfigurationTest extends 
ConfigurationImplTest {
       assertEquals("q4", queueConfiguration.getName());
       assertTrue(queueConfiguration.isDurable());
       assertNull(queueConfiguration.getFilterString());
-      assertEquals(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), 
queueConfiguration.getMaxConsumers());
+      // If null, then default will be taken from address-settings (which 
defaults to ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers())
+      assertEquals(null, queueConfiguration.getMaxConsumers());
       assertTrue(queueConfiguration.getPurgeOnNoConsumers());
       assertEquals("addr2", queueConfiguration.getAddress());
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index f45a1dd..96de8c7 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -794,6 +794,41 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public int getConsumersBeforeDispatch() {
+         return 0;
+      }
+
+      @Override
+      public void setConsumersBeforeDispatch(int consumersBeforeDispatch) {
+
+      }
+
+      @Override
+      public long getDelayBeforeDispatch() {
+         return 0;
+      }
+
+      @Override
+      public void setDelayBeforeDispatch(long delayBeforeDispatch) {
+
+      }
+
+      @Override
+      public long getDispatchStartTime() {
+         return 0;
+      }
+
+      @Override
+      public boolean isDispatching() {
+         return false;
+      }
+
+      @Override
+      public void setDispatching(boolean dispatching) {
+
+      }
+
+      @Override
       public void setMaxConsumer(int maxConsumers) {
 
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-tools/src/test/resources/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd 
b/artemis-tools/src/test/resources/artemis-configuration.xsd
index 41c881e..30de90b 100644
--- a/artemis-tools/src/test/resources/artemis-configuration.xsd
+++ b/artemis-tools/src/test/resources/artemis-configuration.xsd
@@ -491,6 +491,8 @@
                         <xsd:attribute name="purge-on-no-consumers" 
type="xsd:boolean" use="optional"/>
                         <xsd:attribute name="exclusive" type="xsd:boolean" 
use="optional"/>
                         <xsd:attribute name="last-value" type="xsd:boolean" 
use="optional"/>
+                        <xsd:attribute name="consumers-before-dispatch" 
type="xsd:int" use="optional"/>
+                        <xsd:attribute name="delay-before-dispatch" 
type="xsd:long" use="optional"/>
                         <xsd:attributeGroup ref="xml:specialAttrs"/>
                      </xsd:complexType>
                   </xsd:element>
@@ -2498,6 +2500,22 @@
                </xsd:annotation>
             </xsd:element>
 
+            <xsd:element name="default-consumers-before-dispatch" 
type="xsd:int" default="0" maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     the default number of consumers needed before dispatch 
can start for queues under the address.
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
+
+            <xsd:element name="default-delay-before-dispatch" type="xsd:long" 
default="-1" maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     the default delay to wait before dispatching if number of 
consumers before dispatch is not met for queues under the address.
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
+
             <xsd:element name="redistribution-delay" type="xsd:long" 
default="-1" maxOccurs="1" minOccurs="0">
                <xsd:annotation>
                   <xsd:documentation>
@@ -2769,6 +2787,8 @@
       <xsd:attribute name="purge-on-no-consumers" type="xsd:boolean" 
use="optional"/>
       <xsd:attribute name="exclusive" type="xsd:boolean" use="optional"/>
       <xsd:attribute name="last-value" type="xsd:boolean" use="optional"/>
+      <xsd:attribute name="consumers-before-dispatch" type="xsd:int" 
use="optional"/>
+      <xsd:attribute name="delay-before-dispatch" type="xsd:long" 
use="optional"/>
       <xsd:attributeGroup ref="xml:specialAttrs"/>
    </xsd:complexType>
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java
new file mode 100644
index 0000000..4d2d195
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
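+ * Tests that dispatch is withheld until the consumers-before-dispatch count is met or the delay-before-dispatch elapses.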
+ */
+public class ConsumerDelayDispatchTest extends JMSTestBase {
+
+   private SimpleString queueName = 
SimpleString.toSimpleString("jms.consumer.delay.queue");
+   private SimpleString normalQueueName = 
SimpleString.toSimpleString("jms.noraml.queue");
+
+   private static final long DELAY_BEFORE_DISPATCH = 10000L;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, 
null, true, false, false, false, false, -1, false, true, false, 2, 
DELAY_BEFORE_DISPATCH, true);
+      server.createQueue(normalQueueName, RoutingType.ANYCAST, 
normalQueueName, null, null, true, false, false, false, false, -1, false, true, 
false, 0, -1, true);
+
+   }
+
+
+   protected ConnectionFactory getCF() throws Exception {
+      return cf;
+   }
+
+   @Test
+   public void testNoDelayOnDefault() throws Exception {
+      sendMessage(normalQueueName);
+
+      ConnectionFactory fact = getCF();
+      Connection connection = fact.createConnection();
+
+      try {
+
+         Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+         connection.start();
+
+         Destination queue = session.createQueue(normalQueueName.toString());
+         MessageConsumer consumer1 = session.createConsumer(queue);
+
+         Assert.assertNotNull(receive(consumer1));
+      } finally {
+         connection.close();
+      }
+   }
+
+   @Test
+   public void testDelayBeforeDispatch() throws Exception {
+      sendMessage(queueName);
+
+      ConnectionFactory fact = getCF();
+      Connection connection = fact.createConnection();
+
+      try {
+
+         Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+         connection.start();
+
+         Destination queue = session.createQueue(queueName.toString());
+         MessageConsumer consumer1 = session.createConsumer(queue);
+
+         Assert.assertNull(receive(consumer1));
+         Thread.sleep(DELAY_BEFORE_DISPATCH);
+
+         Assert.assertNotNull(receive(consumer1));
+      } finally {
+         connection.close();
+      }
+   }
+
+   @Test
+   public void testConsumersBeforeDispatch() throws Exception {
+      sendMessage(queueName);
+
+
+      ConnectionFactory fact = getCF();
+      Connection connection = fact.createConnection();
+
+      try {
+         Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+         connection.start();
+         Destination queue = session.createQueue(queueName.toString());
+
+         MessageConsumer consumer1 = session.createConsumer(queue);
+
+         Assert.assertNull(receive(consumer1));
+
+         MessageConsumer consumer2 = session.createConsumer(queue);
+
+         Assert.assertNotNull(receive(consumer1, consumer2));
+      } finally {
+         connection.close();
+      }
+   }
+
+
+   @Test
+   public void testContinueAndResetConsumer() throws Exception {
+      sendMessage(queueName);
+
+      ConnectionFactory fact = getCF();
+      Connection connection = fact.createConnection();
+
+      try {
+         Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+         connection.start();
+         Destination queue = session.createQueue(queueName.toString());
+
+         MessageConsumer consumer1 = session.createConsumer(queue);
+
+         Assert.assertNull(receive(consumer1));
+
+         MessageConsumer consumer2 = session.createConsumer(queue);
+
+         Assert.assertNotNull(receive(consumer1, consumer2));
+
+         consumer2.close();
+
+         //Ensure that now dispatch is active, if we close a consumer, 
dispatching continues.
+         sendMessage(queueName);
+
+         Assert.assertNotNull(receive(consumer1));
+
+         //Stop all consumers, which should reset dispatch rules.
+         consumer1.close();
+
+         //Ensure that once all consumers are stopped, that dispatch rules 
reset and wait for min consumers.
+         sendMessage(queueName);
+
+         MessageConsumer consumer3 = session.createConsumer(queue);
+
+         Assert.assertNull(receive(consumer3));
+
+         MessageConsumer consumer4 = session.createConsumer(queue);
+
+         Assert.assertNotNull(receive(consumer3, consumer4));
+
+
+         //Stop all consumers, which should reset dispatch rules.
+         consumer3.close();
+         consumer4.close();
+
+         //Ensure that once all consumers are stopped, that dispatch rules 
reset and wait for delay.
+         sendMessage(queueName);
+
+         MessageConsumer consumer5 = session.createConsumer(queue);
+
+         Assert.assertNull(receive(consumer5));
+
+         Thread.sleep(DELAY_BEFORE_DISPATCH);
+
+         Assert.assertNotNull(receive(consumer5));
+
+      } finally {
+         connection.close();
+      }
+   }
+
+   private Message receive(MessageConsumer consumer1) throws JMSException {
+      return consumer1.receive(1000);
+   }
+
+   private Message receive(MessageConsumer consumer1, MessageConsumer 
consumer2) throws JMSException {
+      Message receivedMessage = receive(consumer1);
+      if (receivedMessage == null) {
+         receivedMessage = receive(consumer2);
+      }
+      return receivedMessage;
+   }
+
+   public void sendMessage(SimpleString queue) throws Exception {
+      ConnectionFactory fact = getCF();
+      Connection connection = fact.createConnection();
+      try {
+
+         Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+         connection.start();
+
+         Destination destination = session.createQueue(queue.toString());
+         MessageProducer producer = session.createProducer(destination);
+
+         TextMessage message = session.createTextMessage();
+         message.setText("Message");
+         producer.send(message);
+      } finally {
+         connection.close();
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
index ae24a45..2d78092 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
@@ -156,6 +156,11 @@ public class ActiveMQServerControlUsingCoreTest extends 
ActiveMQServerControlTes
          }
 
          @Override
+         public String updateQueue(String name, String routingType, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Integer consumersBeforeDispatch, Long delayBeforeDispatch, String user) throws Exception {
+            return (String) proxy.invokeOperation("updateQueue", name, routingType, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, user); // delegate through the proxy instead of stubbing null; assumes invokeOperation returns Object - confirm
+         }
+
+         @Override
          public void deleteAddress(@Parameter(name = "name", desc = "The name 
of the address") String name) throws Exception {
             proxy.invokeOperation("deleteAddress", name);
          }
@@ -189,6 +194,11 @@ public class ActiveMQServerControlUsingCoreTest extends 
ActiveMQServerControlTes
          }
 
          @Override
+         public String createQueue(String address, String routingType, String name, String filterStr, boolean durable, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue, int consumersBeforeDispatch, long delayBeforeDispatch, boolean autoCreateAddress) throws Exception {
+            return (String) proxy.invokeOperation("createQueue", address, routingType, name, filterStr, durable, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress); // delegate through the proxy instead of stubbing null; assumes invokeOperation returns Object - confirm
+         }
+
+         @Override
          public void createQueue(final String address, final String name, 
final boolean durable) throws Exception {
             proxy.invokeOperation("createQueue", address, name, durable);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java
index e3b179b..ac2ed61 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java
@@ -105,6 +105,52 @@ public class QueueConfigRestartTest extends 
ActiveMQTestBase {
       Assert.assertTrue(queueBinding2.getQueue().isExclusive());
    }
 
+   @Test
+   public void testQueueConfigConsumersBeforeDispatchAndRestart() throws Exception {
+      int consumersBeforeDispatch = 5;
+      ActiveMQServer server = createServer(true);
+
+      server.start();
+
+      SimpleString address = new SimpleString("test.address");
+      SimpleString queue = new SimpleString("test.queue");
+
+      server.createQueue(address, RoutingType.MULTICAST, queue, null, null, true, false, false, false,false, 10, true, true, true, consumersBeforeDispatch, -1, true);
+      // Before the restart: the freshly created queue must report the configured value.
+      QueueBinding queueBinding1 = (QueueBinding)server.getPostOffice().getBinding(queue);
+      Assert.assertEquals(consumersBeforeDispatch, queueBinding1.getQueue().getConsumersBeforeDispatch());
+
+      server.stop();
+
+      server.start();
+      // After the restart: assert on the binding looked up again, not on the pre-restart queueBinding1.
+      QueueBinding queueBinding2 = (QueueBinding)server.getPostOffice().getBinding(queue);
+      Assert.assertEquals(consumersBeforeDispatch, queueBinding2.getQueue().getConsumersBeforeDispatch());
+   }
+
+   @Test
+   public void testQueueConfigDelayBeforeDispatchAndRestart() throws Exception {
+      long delayBeforeDispatch = 5000L;
+      ActiveMQServer server = createServer(true);
+
+      server.start();
+
+      SimpleString address = new SimpleString("test.address");
+      SimpleString queue = new SimpleString("test.queue");
+
+      server.createQueue(address, RoutingType.MULTICAST, queue, null, null, true, false, false, false,false, 10, true, true, true, 0, delayBeforeDispatch, true);
+      // Before the restart: the freshly created queue must report the configured value.
+      QueueBinding queueBinding1 = (QueueBinding)server.getPostOffice().getBinding(queue);
+      Assert.assertEquals(delayBeforeDispatch, queueBinding1.getQueue().getDelayBeforeDispatch());
+
+      server.stop();
+
+      server.start();
+      // After the restart: assert on the binding looked up again, not on the pre-restart queueBinding1.
+      QueueBinding queueBinding2 = (QueueBinding)server.getPostOffice().getBinding(queue);
+      Assert.assertEquals(delayBeforeDispatch, queueBinding2.getQueue().getDelayBeforeDispatch());
+   }
+
 
    @Test
    public void testQueueConfigUserAndRestart() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 192d700..71ced7f 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -49,6 +49,41 @@ public class FakeQueue extends CriticalComponentImpl 
implements Queue {
    }
 
    @Override
+   public int getConsumersBeforeDispatch() {
+      return 0;
+   }
+
+   @Override
+   public void setConsumersBeforeDispatch(int consumersBeforeDispatch) {
+
+   }
+
+   @Override
+   public long getDelayBeforeDispatch() {
+      return 0;
+   }
+
+   @Override
+   public void setDelayBeforeDispatch(long delayBeforeDispatch) {
+
+   }
+
+   @Override
+   public long getDispatchStartTime() {
+      return 0;
+   }
+
+   @Override
+   public boolean isDispatching() {
+      return false;
+   }
+
+   @Override
+   public void setDispatching(boolean dispatching) {
+
+   }
+
+   @Override
    public boolean isExclusive() {
       // no-op
       return false;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
----------------------------------------------------------------------
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index 3f35084..44e5823 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -49,6 +49,8 @@ public class FakePostOffice implements PostOffice {
                                    Integer maxConsumers,
                                    Boolean purgeOnNoConsumers,
                                    Boolean exclusive,
+                                   Integer consumersBeforeDispatch,
+                                   Long delayBeforeDispatch,
                                    SimpleString user) throws Exception {
       return null;
    }

Reply via email to