This is an automated email from the ASF dual-hosted git repository.
mattrpav pushed a commit to branch activemq-5.18.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.18.x by this push:
new 972d31f976 [AMQ-9257] Disabled expire message checking when pauseDispatch=true (#1005)
972d31f976 is described below
commit 972d31f976133c77ae74aca4e359c5a57a75b4ff
Author: Matt Pavlovich <[email protected]>
AuthorDate: Fri May 19 08:06:53 2023 -0500
[AMQ-9257] Disabled expire message checking when pauseDispatch=true (#1005)
(cherry picked from commit 9a5b61f6a28184cfe832871302ece16069ebb71d)
---
.../org/apache/activemq/broker/region/Queue.java | 5 ++
.../broker/region/QueueDispatchSelector.java | 2 +-
.../ExpiredMessagesWithNoConsumerTest.java | 68 ++++++++++++++++++++++
3 files changed, 74 insertions(+), 1 deletion(-)
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 79f897ec12..66fa2cf397 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -957,6 +957,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
}
private void expireMessages() {
+ if(isDispatchPaused()) {
+ LOG.debug("{} dispatchPaused, skipping expire messages check",
getActiveMQDestination().getQualifiedName());
+ return;
+ }
+
LOG.debug("{} expiring messages ..",
getActiveMQDestination().getQualifiedName());
// just track the insertion count
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
index 7d3f69f84e..d57f25ad7f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
public class QueueDispatchSelector extends SimpleDispatchSelector {
private static final Logger LOG =
LoggerFactory.getLogger(QueueDispatchSelector.class);
private Subscription exclusiveConsumer;
- private boolean paused;
+ private volatile boolean paused;
/**
* @param destination
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
index e2ad7f602a..fdc520771e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
@@ -37,6 +37,7 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
import
org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
import
org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
@@ -254,6 +255,73 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
assertEquals("memory usage doesn't go to duck egg", 0,
view.getMemoryPercentUsage());
}
+ public void testExpiredMessagesWithNoConsumerPauseResume() throws
Exception {
+
+ createBrokerWithMemoryLimit();
+
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(connectionUri);
+ connection = factory.createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producer = session.createProducer(destination);
+ producer.setTimeToLive(1000);
+ connection.start();
+ final long sendCount = 2000;
+
+ ObjectName name = new
ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=test");
+ final QueueViewMBean queueView = (QueueViewMBean)
broker.getManagementContext().newProxyInstance(name, QueueViewMBean.class,
true);
+ queueView.pause();
+ assertTrue(queueView.isPaused());
+
+ final Thread producingThread = new Thread("Producing Thread") {
+ @Override
+ public void run() {
+ try {
+ int i = 0;
+ long tStamp = System.currentTimeMillis();
+ while (i++ < sendCount) {
+ producer.send(session.createTextMessage("test"));
+ if (i%100 == 0) {
+ LOG.info("sent: " + i + " @ " +
((System.currentTimeMillis() - tStamp) / 100) + "m/ms");
+ tStamp = System.currentTimeMillis() ;
+ }
+ }
+ } catch (Throwable ex) {
+ ex.printStackTrace();
+ }
+ }
+ };
+
+ producingThread.start();
+
+ assertTrue("producer failed to complete within allocated time",
Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ producingThread.join(TimeUnit.SECONDS.toMillis(3000));
+ return !producingThread.isAlive();
+ }
+ }));
+
+ assertEquals("No messages should have expired", Long.valueOf(0l),
Long.valueOf(queueView.getExpiredCount()));
+ queueView.resume();
+ assertFalse(queueView.isPaused());
+
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ LOG.info("enqueue=" + queueView.getEnqueueCount() + ",
dequeue=" + queueView.getDequeueCount()
+ + ", inflight=" + queueView.getInFlightCount() + ",
expired= " + queueView.getExpiredCount()
+ + ", size= " + queueView.getQueueSize());
+ return sendCount == queueView.getExpiredCount();
+ }
+ }, Wait.MAX_WAIT_MILLIS * 10);
+ LOG.info("enqueue=" + queueView.getEnqueueCount() + ", dequeue=" +
queueView.getDequeueCount()
+ + ", inflight=" + queueView.getInFlightCount() + ", expired= "
+ queueView.getExpiredCount()
+ + ", size= " + queueView.getQueueSize());
+
+ assertEquals("Not all sent messages have expired", sendCount,
queueView.getExpiredCount());
+ assertEquals("memory usage doesn't go to duck egg", 0,
queueView.getMemoryPercentUsage());
+ }
+
// first ack delivered after expiry
public void testExpiredMessagesWithVerySlowConsumer() throws Exception {
createBroker();