This is an automated email from the ASF dual-hosted git repository.

mattrpav pushed a commit to branch activemq-5.18.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.18.x by this push:
     new 972d31f976 [AMQ-9257] Disabled expire message checking when 
pauseDispatch=true (#1005)
972d31f976 is described below

commit 972d31f976133c77ae74aca4e359c5a57a75b4ff
Author: Matt Pavlovich <m...@hyte.io>
AuthorDate: Fri May 19 08:06:53 2023 -0500

    [AMQ-9257] Disabled expire message checking when pauseDispatch=true (#1005)
    
    (cherry picked from commit 9a5b61f6a28184cfe832871302ece16069ebb71d)
---
 .../org/apache/activemq/broker/region/Queue.java   |  5 ++
 .../broker/region/QueueDispatchSelector.java       |  2 +-
 .../ExpiredMessagesWithNoConsumerTest.java         | 68 ++++++++++++++++++++++
 3 files changed, 74 insertions(+), 1 deletion(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 79f897ec12..66fa2cf397 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -957,6 +957,11 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
     }
 
     private void expireMessages() {
+        if(isDispatchPaused()) {
+            LOG.debug("{} dispatchPaused, skipping expire messages check", 
getActiveMQDestination().getQualifiedName());
+            return;
+        }
+
         LOG.debug("{} expiring messages ..", 
getActiveMQDestination().getQualifiedName());
 
         // just track the insertion count
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
index 7d3f69f84e..d57f25ad7f 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
 public class QueueDispatchSelector extends SimpleDispatchSelector {
     private static final Logger LOG = 
LoggerFactory.getLogger(QueueDispatchSelector.class);
     private Subscription exclusiveConsumer;
-    private boolean paused;
+    private volatile boolean paused;
 
     /**
      * @param destination
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
index e2ad7f602a..fdc520771e 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
@@ -37,6 +37,7 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
 import 
org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
 import 
org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
@@ -254,6 +255,73 @@ public class ExpiredMessagesWithNoConsumerTest extends 
CombinationTestSupport {
         assertEquals("memory usage doesn't go to duck egg", 0, 
view.getMemoryPercentUsage());
     }
 
+    public void testExpiredMessagesWithNoConsumerPauseResume() throws 
Exception {
+
+        createBrokerWithMemoryLimit();
+
+        ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(connectionUri);
+        connection = factory.createConnection();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = session.createProducer(destination);
+        producer.setTimeToLive(1000);
+        connection.start();
+        final long sendCount = 2000;
+
+        ObjectName name = new 
ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=test");
+        final QueueViewMBean queueView = (QueueViewMBean) 
broker.getManagementContext().newProxyInstance(name, QueueViewMBean.class, 
true);
+        queueView.pause();
+        assertTrue(queueView.isPaused());
+
+        final Thread producingThread = new Thread("Producing Thread") {
+            @Override
+            public void run() {
+                try {
+                    int i = 0;
+                    long tStamp = System.currentTimeMillis();
+                    while (i++ < sendCount) {
+                        producer.send(session.createTextMessage("test"));
+                        if (i%100 == 0) {
+                            LOG.info("sent: " + i + " @ " + 
((System.currentTimeMillis() - tStamp) / 100)  + "m/ms");
+                            tStamp = System.currentTimeMillis() ;
+                        }
+                    }
+                } catch (Throwable ex) {
+                    ex.printStackTrace();
+                }
+            }
+        };
+
+        producingThread.start();
+
+        assertTrue("producer failed to complete within allocated time", 
Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                producingThread.join(TimeUnit.SECONDS.toMillis(3000));
+                return !producingThread.isAlive();
+            }
+        }));
+
+        assertEquals("No messages should have expired", Long.valueOf(0l), 
Long.valueOf(queueView.getExpiredCount()));
+        queueView.resume();
+        assertFalse(queueView.isPaused());
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("enqueue=" + queueView.getEnqueueCount() + ", 
dequeue=" + queueView.getDequeueCount()
+                        + ", inflight=" + queueView.getInFlightCount() + ", 
expired= " + queueView.getExpiredCount()
+                        + ", size= " + queueView.getQueueSize());
+                return sendCount == queueView.getExpiredCount();
+            }
+        }, Wait.MAX_WAIT_MILLIS * 10);
+        LOG.info("enqueue=" + queueView.getEnqueueCount() + ", dequeue=" + 
queueView.getDequeueCount()
+                + ", inflight=" + queueView.getInFlightCount() + ", expired= " 
+ queueView.getExpiredCount()
+                + ", size= " + queueView.getQueueSize());
+
+        assertEquals("Not all sent messages have expired", sendCount, 
queueView.getExpiredCount());
+        assertEquals("memory usage doesn't go to duck egg", 0, 
queueView.getMemoryPercentUsage());
+    }
+
     // first ack delivered after expiry
     public void testExpiredMessagesWithVerySlowConsumer() throws Exception {
         createBroker();

Reply via email to