This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 6ba54964bd ARTEMIS-4258 delayBeforeDispatch not working with OpenWire
6ba54964bd is described below
commit 6ba54964bd447e9f536d905af27de6504e79f066
Author: Justin Bertram <[email protected]>
AuthorDate: Tue Apr 25 14:12:25 2023 -0500
ARTEMIS-4258 delayBeforeDispatch not working with OpenWire
---
.../core/protocol/openwire/amq/AMQConsumer.java | 12 +++
.../JMSConsumerDelayDispatchTest.java} | 117 ++++++++++++++-------
2 files changed, 91 insertions(+), 38 deletions(-)
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index c6ca2310ad..fee1f6c849 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -80,6 +80,7 @@ public class AMQConsumer {
//it's address/queue to management service
private boolean internalAddress = false;
private volatile Set<MessageReference> rolledbackMessageRefs;
+ private ScheduledFuture<?> delayedDispatchPrompter;
public AMQConsumer(AMQSession amqSession,
org.apache.activemq.command.ActiveMQDestination d,
@@ -179,6 +180,14 @@ public class AMQConsumer {
}
}
+ if (serverConsumer != null && serverConsumer.getQueue() != null && serverConsumer.getQueue().getQueueConfiguration() != null) {
+ Long delayBeforeDispatch = serverConsumer.getQueue().getQueueConfiguration().getDelayBeforeDispatch();
+ if (delayBeforeDispatch != null && delayBeforeDispatch > 0) {
+ Long schedule = delayBeforeDispatch / 2;
+ delayedDispatchPrompter = scheduledPool.scheduleAtFixedRate(() -> serverConsumer.promptDelivery(), schedule, schedule, TimeUnit.MILLISECONDS);
+ }
+ }
+
serverConsumer.setProtocolData(this);
}
@@ -404,6 +413,9 @@ public class AMQConsumer {
public void removeConsumer() throws Exception {
serverConsumer.close(false);
+ if (delayedDispatchPrompter != null) {
+ delayedDispatchPrompter.cancel(false);
+ }
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSConsumerDelayDispatchTest.java
similarity index 64%
rename from tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java
rename to tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSConsumerDelayDispatchTest.java
index 3240911484..dd4e265de4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSConsumerDelayDispatchTest.java
@@ -14,10 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.artemis.tests.integration.jms.client;
+package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -29,43 +28,45 @@ import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
-/**
- * Exclusive Test
- */
-public class ConsumerDelayDispatchTest extends JMSTestBase {
+public class JMSConsumerDelayDispatchTest extends MultiprotocolJMSClientTestSupport {
private SimpleString queueName = SimpleString.toSimpleString("jms.consumer.delay.queue");
- private SimpleString normalQueueName = SimpleString.toSimpleString("jms.noraml.queue");
+ private SimpleString normalQueueName = SimpleString.toSimpleString("jms.normal.queue");
private static final long DELAY_BEFORE_DISPATCH = 10000L;
@Override
- @Before
- public void setUp() throws Exception {
- super.setUp();
+ protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
+ super.createAddressAndQueues(server);
server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setExclusive(true).setConsumersBeforeDispatch(2).setDelayBeforeDispatch(DELAY_BEFORE_DISPATCH));
server.createQueue(new QueueConfiguration(normalQueueName).setRoutingType(RoutingType.ANYCAST).setExclusive(true));
}
+ @Test
+ public void testNoDelayOnDefaultAMQP() throws Exception {
+ testNoDelayOnDefault(AMQPConnection);
+ }
- protected ConnectionFactory getCF() throws Exception {
- return cf;
+ @Test
+ public void testNoDelayOnDefaultOpenWire() throws Exception {
+ testNoDelayOnDefault(OpenWireConnection);
}
@Test
- public void testNoDelayOnDefault() throws Exception {
- sendMessage(normalQueueName);
+ public void testNoDelayOnDefaultCore() throws Exception {
+ testNoDelayOnDefault(CoreConnection);
+ }
- ConnectionFactory fact = getCF();
- Connection connection = fact.createConnection();
+ private void testNoDelayOnDefault(ConnectionSupplier supplier) throws Exception {
+ sendMessage(normalQueueName, supplier);
- try {
+ Connection connection = supplier.createConnection();
+ try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
connection.start();
@@ -79,14 +80,26 @@ public class ConsumerDelayDispatchTest extends JMSTestBase {
}
@Test
- public void testDelayBeforeDispatch() throws Exception {
- sendMessage(queueName);
+ public void testDelayBeforeDispatchAMQP() throws Exception {
+ testDelayBeforeDispatch(AMQPConnection);
+ }
- ConnectionFactory fact = getCF();
- Connection connection = fact.createConnection();
+ @Test
+ public void testDelayBeforeDispatchOpenWire() throws Exception {
+ testDelayBeforeDispatch(OpenWireConnection);
+ }
- try {
+ @Test
+ public void testDelayBeforeDispatchCore() throws Exception {
+ testDelayBeforeDispatch(CoreConnection);
+ }
+
+ private void testDelayBeforeDispatch(ConnectionSupplier supplier) throws Exception {
+ sendMessage(queueName, supplier);
+
+ Connection connection = supplier.createConnection();
+ try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
connection.start();
@@ -103,12 +116,24 @@ public class ConsumerDelayDispatchTest extends JMSTestBase {
}
@Test
- public void testConsumersBeforeDispatch() throws Exception {
- sendMessage(queueName);
+ public void testConsumersBeforeDispatchAMQP() throws Exception {
+ testConsumersBeforeDispatch(AMQPConnection);
+ }
+
+ @Test
+ public void testConsumersBeforeDispatchOpenWire() throws Exception {
+ testConsumersBeforeDispatch(OpenWireConnection);
+ }
+
+ @Test
+ public void testConsumersBeforeDispatchCore() throws Exception {
+ testConsumersBeforeDispatch(CoreConnection);
+ }
+ private void testConsumersBeforeDispatch(ConnectionSupplier supplier) throws Exception {
+ sendMessage(queueName, supplier);
- ConnectionFactory fact = getCF();
- Connection connection = fact.createConnection();
+ Connection connection = supplier.createConnection();
try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -127,13 +152,25 @@ public class ConsumerDelayDispatchTest extends JMSTestBase {
}
}
+ @Test
+ public void testContinueAndResetConsumerAMQP() throws Exception {
+ testContinueAndResetConsumer(AMQPConnection);
+ }
@Test
- public void testContinueAndResetConsumer() throws Exception {
- sendMessage(queueName);
+ public void testContinueAndResetConsumerOpenWire() throws Exception {
+ testContinueAndResetConsumer(OpenWireConnection);
+ }
+
+ @Test
+ public void testContinueAndResetConsumerCore() throws Exception {
+ testContinueAndResetConsumer(CoreConnection);
+ }
+
+ private void testContinueAndResetConsumer(ConnectionSupplier supplier) throws Exception {
+ sendMessage(queueName, supplier);
- ConnectionFactory fact = getCF();
- Connection connection = fact.createConnection();
+ Connection connection = supplier.createConnection();
try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -151,15 +188,17 @@ public class ConsumerDelayDispatchTest extends JMSTestBase {
consumer2.close();
//Ensure that now dispatch is active, if we close a consumer, dispatching continues.
- sendMessage(queueName);
+ sendMessage(queueName, supplier);
Assert.assertNotNull(receive(consumer1));
//Stop all consumers, which should reset dispatch rules.
consumer1.close();
+ session.close();
+ session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
//Ensure that once all consumers are stopped, that dispatch rules reset and wait for min consumers.
- sendMessage(queueName);
+ sendMessage(queueName, supplier);
MessageConsumer consumer3 = session.createConsumer(queue);
@@ -173,9 +212,11 @@ public class ConsumerDelayDispatchTest extends JMSTestBase {
//Stop all consumers, which should reset dispatch rules.
consumer3.close();
consumer4.close();
+ session.close();
+ session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
//Ensure that once all consumers are stopped, that dispatch rules reset and wait for delay.
- sendMessage(queueName);
+ sendMessage(queueName, supplier);
MessageConsumer consumer5 = session.createConsumer(queue);
@@ -191,6 +232,7 @@ public class ConsumerDelayDispatchTest extends JMSTestBase {
}
private Message receive(MessageConsumer consumer1) throws JMSException {
+ System.out.println("receiving...");
return consumer1.receive(1000);
}
@@ -202,9 +244,8 @@ public class ConsumerDelayDispatchTest extends JMSTestBase {
return receivedMessage;
}
- public void sendMessage(SimpleString queue) throws Exception {
- ConnectionFactory fact = getCF();
- Connection connection = fact.createConnection();
+ public void sendMessage(SimpleString queue, ConnectionSupplier supplier) throws Exception {
+ Connection connection = supplier.createConnection();
try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);