This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 6ba54964bd ARTEMIS-4258 delayBeforeDispatch not working with OpenWire
6ba54964bd is described below

commit 6ba54964bd447e9f536d905af27de6504e79f066
Author: Justin Bertram <[email protected]>
AuthorDate: Tue Apr 25 14:12:25 2023 -0500

    ARTEMIS-4258 delayBeforeDispatch not working with OpenWire
---
 .../core/protocol/openwire/amq/AMQConsumer.java    |  12 +++
 .../JMSConsumerDelayDispatchTest.java}             | 117 ++++++++++++++-------
 2 files changed, 91 insertions(+), 38 deletions(-)

diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index c6ca2310ad..fee1f6c849 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -80,6 +80,7 @@ public class AMQConsumer {
    //it's address/queue to management service
    private boolean internalAddress = false;
    private volatile Set<MessageReference> rolledbackMessageRefs;
+   private ScheduledFuture<?> delayedDispatchPrompter;
 
    public AMQConsumer(AMQSession amqSession,
                       org.apache.activemq.command.ActiveMQDestination d,
@@ -179,6 +180,14 @@ public class AMQConsumer {
          }
       }
 
+      if (serverConsumer != null && serverConsumer.getQueue() != null && serverConsumer.getQueue().getQueueConfiguration() != null) {
+         Long delayBeforeDispatch = serverConsumer.getQueue().getQueueConfiguration().getDelayBeforeDispatch();
+         if (delayBeforeDispatch != null && delayBeforeDispatch > 0) {
+            Long schedule = delayBeforeDispatch / 2;
+            delayedDispatchPrompter = scheduledPool.scheduleAtFixedRate(() -> serverConsumer.promptDelivery(), schedule, schedule, TimeUnit.MILLISECONDS);
+         }
+      }
+
       serverConsumer.setProtocolData(this);
    }
 
@@ -404,6 +413,9 @@ public class AMQConsumer {
 
    public void removeConsumer() throws Exception {
       serverConsumer.close(false);
+      if (delayedDispatchPrompter != null) {
+         delayedDispatchPrompter.cancel(false);
+      }
    }
 
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSConsumerDelayDispatchTest.java
similarity index 64%
rename from tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java
rename to tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSConsumerDelayDispatchTest.java
index 3240911484..dd4e265de4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSConsumerDelayDispatchTest.java
@@ -14,10 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.artemis.tests.integration.jms.client;
+package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
 
 import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -29,43 +28,45 @@ import javax.jms.TextMessage;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
-/**
- * Exclusive Test
- */
-public class ConsumerDelayDispatchTest extends JMSTestBase {
+public class JMSConsumerDelayDispatchTest extends MultiprotocolJMSClientTestSupport {
 
    private SimpleString queueName = SimpleString.toSimpleString("jms.consumer.delay.queue");
-   private SimpleString normalQueueName = SimpleString.toSimpleString("jms.noraml.queue");
+   private SimpleString normalQueueName = SimpleString.toSimpleString("jms.normal.queue");
 
    private static final long DELAY_BEFORE_DISPATCH = 10000L;
 
    @Override
-   @Before
-   public void setUp() throws Exception {
-      super.setUp();
+   protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
+      super.createAddressAndQueues(server);
       server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setExclusive(true).setConsumersBeforeDispatch(2).setDelayBeforeDispatch(DELAY_BEFORE_DISPATCH));
       server.createQueue(new QueueConfiguration(normalQueueName).setRoutingType(RoutingType.ANYCAST).setExclusive(true));
    }
 
+   @Test
+   public void testNoDelayOnDefaultAMQP() throws Exception {
+      testNoDelayOnDefault(AMQPConnection);
+   }
 
-   protected ConnectionFactory getCF() throws Exception {
-      return cf;
+   @Test
+   public void testNoDelayOnDefaultOpenWire() throws Exception {
+      testNoDelayOnDefault(OpenWireConnection);
    }
 
    @Test
-   public void testNoDelayOnDefault() throws Exception {
-      sendMessage(normalQueueName);
+   public void testNoDelayOnDefaultCore() throws Exception {
+      testNoDelayOnDefault(CoreConnection);
+   }
 
-      ConnectionFactory fact = getCF();
-      Connection connection = fact.createConnection();
+   private void testNoDelayOnDefault(ConnectionSupplier supplier) throws Exception {
+      sendMessage(normalQueueName, supplier);
 
-      try {
+      Connection connection = supplier.createConnection();
 
+      try {
          Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
          connection.start();
 
@@ -79,14 +80,26 @@ public class ConsumerDelayDispatchTest extends JMSTestBase {
    }
 
    @Test
-   public void testDelayBeforeDispatch() throws Exception {
-      sendMessage(queueName);
+   public void testDelayBeforeDispatchAMQP() throws Exception {
+      testDelayBeforeDispatch(AMQPConnection);
+   }
 
-      ConnectionFactory fact = getCF();
-      Connection connection = fact.createConnection();
+   @Test
+   public void testDelayBeforeDispatchOpenWire() throws Exception {
+      testDelayBeforeDispatch(OpenWireConnection);
+   }
 
-      try {
+   @Test
+   public void testDelayBeforeDispatchCore() throws Exception {
+      testDelayBeforeDispatch(CoreConnection);
+   }
+
+   private void testDelayBeforeDispatch(ConnectionSupplier supplier) throws Exception {
+      sendMessage(queueName, supplier);
+
+      Connection connection = supplier.createConnection();
 
+      try {
          Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
          connection.start();
 
@@ -103,12 +116,24 @@ public class ConsumerDelayDispatchTest extends JMSTestBase {
    }
 
    @Test
-   public void testConsumersBeforeDispatch() throws Exception {
-      sendMessage(queueName);
+   public void testConsumersBeforeDispatchAMQP() throws Exception {
+      testConsumersBeforeDispatch(AMQPConnection);
+   }
+
+   @Test
+   public void testConsumersBeforeDispatchOpenWire() throws Exception {
+      testConsumersBeforeDispatch(OpenWireConnection);
+   }
+
+   @Test
+   public void testConsumersBeforeDispatchCore() throws Exception {
+      testConsumersBeforeDispatch(CoreConnection);
+   }
 
+   private void testConsumersBeforeDispatch(ConnectionSupplier supplier) throws Exception {
+      sendMessage(queueName, supplier);
 
-      ConnectionFactory fact = getCF();
-      Connection connection = fact.createConnection();
+      Connection connection = supplier.createConnection();
 
       try {
          Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -127,13 +152,25 @@ public class ConsumerDelayDispatchTest extends JMSTestBase {
       }
    }
 
+   @Test
+   public void testContinueAndResetConsumerAMQP() throws Exception {
+      testContinueAndResetConsumer(AMQPConnection);
+   }
 
    @Test
-   public void testContinueAndResetConsumer() throws Exception {
-      sendMessage(queueName);
+   public void testContinueAndResetConsumerOpenWire() throws Exception {
+      testContinueAndResetConsumer(OpenWireConnection);
+   }
+
+   @Test
+   public void testContinueAndResetConsumerCore() throws Exception {
+      testContinueAndResetConsumer(CoreConnection);
+   }
+
+   private void testContinueAndResetConsumer(ConnectionSupplier supplier) throws Exception {
+      sendMessage(queueName, supplier);
 
-      ConnectionFactory fact = getCF();
-      Connection connection = fact.createConnection();
+      Connection connection = supplier.createConnection();
 
       try {
          Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -151,15 +188,17 @@ public class ConsumerDelayDispatchTest extends JMSTestBase {
          consumer2.close();
 
          //Ensure that now dispatch is active, if we close a consumer, dispatching continues.
-         sendMessage(queueName);
+         sendMessage(queueName, supplier);
 
          Assert.assertNotNull(receive(consumer1));
 
          //Stop all consumers, which should reset dispatch rules.
          consumer1.close();
+         session.close();
+         session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 
          //Ensure that once all consumers are stopped, that dispatch rules reset and wait for min consumers.
-         sendMessage(queueName);
+         sendMessage(queueName, supplier);
 
          MessageConsumer consumer3 = session.createConsumer(queue);
 
@@ -173,9 +212,11 @@ public class ConsumerDelayDispatchTest extends JMSTestBase {
          //Stop all consumers, which should reset dispatch rules.
          consumer3.close();
          consumer4.close();
+         session.close();
+         session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 
          //Ensure that once all consumers are stopped, that dispatch rules reset and wait for delay.
-         sendMessage(queueName);
+         sendMessage(queueName, supplier);
 
          MessageConsumer consumer5 = session.createConsumer(queue);
 
@@ -191,6 +232,7 @@ public class ConsumerDelayDispatchTest extends JMSTestBase {
    }
 
    private Message receive(MessageConsumer consumer1) throws JMSException {
+      System.out.println("receiving...");
       return consumer1.receive(1000);
    }
 
@@ -202,9 +244,8 @@ public class ConsumerDelayDispatchTest extends JMSTestBase {
       return receivedMessage;
    }
 
-   public void sendMessage(SimpleString queue) throws Exception {
-      ConnectionFactory fact = getCF();
-      Connection connection = fact.createConnection();
+   public void sendMessage(SimpleString queue, ConnectionSupplier supplier) throws Exception {
+      Connection connection = supplier.createConnection();
       try {
 
          Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

Reply via email to