This is an automated email from the ASF dual-hosted git repository.

mattrpav pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new f166435d31 [#2020] Add expiryCheckEnabled option to 
MessageEvictionStrategy to skip O(n) expiry scan TopicSubscription
f166435d31 is described below

commit f166435d31d6e9d856d8996741ca9feeaeb8ccb2
Author: mwashburn <[email protected]>
AuthorDate: Thu May 14 12:18:19 2026 -0400

    [#2020] Add expiryCheckEnabled option to MessageEvictionStrategy to skip 
O(n) expiry scan TopicSubscription
    
    When a slow-consumer's pending-message buffer exceeds the high-water mark
    (default: 1000 messages), TopicSubscription.add() calls 
removeExpiredMessages()
    on every single message add. That method iterates every pending message and
    calls isExpired(), which is an O(n) scan over the full buffer.
    When messages carry no TTL (isExpired() always returns false), this scan
    provides no benefit -- it iterates the entire buffer on every add with zero
    messages removed. With a large pending limit (e.g. 20,000), this adds up to
    millions of no-op iterations per second on busy topics.
    This commit adds an expiryCheckEnabled boolean (default true) to 
MessageEvictionStrategy.
    Setting it to false inserts a single guard in TopicSubscription.add():
        if (expiryCheckEnabled  && !matched.isEmpty() && matched.size() > max) {
            removeExpiredMessages();
        }
    
    Also adds:
    - TopicSubscriptionEnableExpiryTest: 11 correctness/propagation/integration 
tests
    - TopicSubscriptionEnableExpiryThroughputTest: throughput comparison test
---
 .../activemq/broker/region/TopicSubscription.java  |   3 +-
 .../region/policy/MessageEvictionStrategy.java     |  14 +
 .../policy/MessageEvictionStrategySupport.java     |  26 +-
 .../region/TopicSubscriptionEnableExpiryTest.java  | 408 +++++++++++++++++++++
 ...opicSubscriptionEnableExpiryThroughputTest.java | 211 +++++++++++
 5 files changed, 658 insertions(+), 4 deletions(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 8a1a300dae..62659a6d5b 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -25,7 +25,6 @@ import 
org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
 import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
 import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
 import org.apache.activemq.command.*;
-import org.apache.activemq.management.MessageFlowStats;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.transport.TransmitCallback;
@@ -166,7 +165,7 @@ public class TopicSubscription extends AbstractSubscription 
{
                         if (maximumPendingMessages > 0 && 
maximumPendingMessages < max) {
                             max = maximumPendingMessages;
                         }
-                        if (!matched.isEmpty() && matched.size() > max) {
+                        if (messageEvictionStrategy.isExpiryCheckEnabled() && 
!matched.isEmpty() && matched.size() > max) {
                             removeExpiredMessages();
                         }
                         // lets discard old messages as we are a slow consumer
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java
index 334cb3730f..5a5f62f51f 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java
@@ -42,4 +42,18 @@ public interface MessageEvictionStrategy {
      */
     int getEvictExpiredMessagesHighWatermark();
 
+    /**
+     * Returns whether the eager expired-message scan is enabled.
+     * <p>
+     * When {@code false}, the O(n) scan inside
+     * {@link org.apache.activemq.broker.region.TopicSubscription#add} is 
skipped entirely.
+     * Set to {@code false} when messages carry no TTL, or when the scan cost 
outweighs
+     * the benefit of eagerly evicting expired messages from slow-consumer 
buffers.
+     * <p>
+     * See {@link MessageEvictionStrategySupport} for the default 
implementation that returns {@code true}.
+     *
+     * @return {@code true} if the expiry scan is enabled (default), {@code 
false} if skipped
+     */
+    boolean isExpiryCheckEnabled();
+
 }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java
index 32f0c6f0c0..0df890b6cf 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java
@@ -24,6 +24,7 @@ package org.apache.activemq.broker.region.policy;
 public abstract class MessageEvictionStrategySupport implements 
MessageEvictionStrategy {
 
     private int evictExpiredMessagesHighWatermark = 1000;
+    private boolean expiryCheckEnabled = true;
 
     public int getEvictExpiredMessagesHighWatermark() {
         return evictExpiredMessagesHighWatermark;
@@ -35,6 +36,27 @@ public abstract class MessageEvictionStrategySupport 
implements MessageEvictionS
     public void setEvictExpiredMessagesHighWatermark(int 
evictExpiredMessagesHighWaterMark) {
         this.evictExpiredMessagesHighWatermark = 
evictExpiredMessagesHighWaterMark;
     }
-    
-    
+
+    @Override
+    public boolean isExpiryCheckEnabled() {
+        return expiryCheckEnabled;
+    }
+
+    /**
+     * Controls whether the broker performs an eager expired-message scan when 
a
+     * non-durable topic subscription's pending slow-consumer buffer exceeds
+     * {@link #getEvictExpiredMessagesHighWatermark()}.
+     * <p>
+     * Set to {@code false} when messages carry no TTL, or when the O(n) scan 
cost
+     * outweighs the benefit of eagerly evicting expired messages from 
slow-consumer
+     * buffers. When messages have no TTL, every scan iterates the full buffer 
without
+     * removing anything, adding latency to every enqueue once the buffer 
exceeds the
+     * high-water mark.
+     *
+     * @param expiryCheckEnabled {@code false} to skip the scan; {@code true} 
to enable it (default)
+     */
+    public void setExpiryCheckEnabled(boolean expiryCheckEnabled) {
+        this.expiryCheckEnabled = expiryCheckEnabled;
+    }
+
 }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/TopicSubscriptionEnableExpiryTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/TopicSubscriptionEnableExpiryTest.java
new file mode 100644
index 0000000000..4dcc34e574
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/TopicSubscriptionEnableExpiryTest.java
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import jakarta.jms.Connection;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Session;
+import jakarta.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerService;
+import 
org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
+import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.apache.activemq.test.annotations.ParallelTest;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests correctness of the {@code ExpiryCheckEnabled} feature on
+ * {@link org.apache.activemq.broker.region.policy.MessageEvictionStrategy} 
and its effect on
+ * {@link TopicSubscription}.
+ *
+ * <p>Background: when a slow-consumer queue exceeds
+ * {@code evictExpiredMessagesHighWatermark} (default: 1000), ActiveMQ calls
+ * {@code TopicSubscription.removeExpiredMessages()} on every single
+ * {@code add()} call.  That method iterates every pending message checking
+ * {@code isExpired()} — an O(n) scan.  Setting {@code 
ExpiryCheckEnabled=false}
+ * on the {@link 
org.apache.activemq.broker.region.policy.MessageEvictionStrategy} skips that
+ * scan entirely via a single boolean check guarding the call site.
+ */
+@Category(ParallelTest.class)
+public class TopicSubscriptionEnableExpiryTest {
+
+    // 
-------------------------------------------------------------------------
+    // Unit tests — no broker needed
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * {@link OldestMessageEvictionStrategy#isExpiryCheckEnabled()} must 
default to {@code true} so
+     * that existing deployments that do not set the property are unaffected.
+     */
+    @Test
+    public void testEvictionStrategyExpiryCheckDefaultsToTrue() {
+        OldestMessageEvictionStrategy strategy = new 
OldestMessageEvictionStrategy();
+        assertTrue("ExpiryCheckEnabled must default to true (preserves 
existing behaviour)",
+                strategy.isExpiryCheckEnabled());
+    }
+
+    @Test
+    public void testEvictionStrategySetExpiryCheckFalse() {
+        OldestMessageEvictionStrategy strategy = new 
OldestMessageEvictionStrategy();
+        strategy.setExpiryCheckEnabled(false);
+        assertFalse("ExpiryCheckEnabled should be false after setter call",
+                strategy.isExpiryCheckEnabled());
+    }
+
+    @Test
+    public void testEvictionStrategySetExpiryCheckRoundTrip() {
+        OldestMessageEvictionStrategy strategy = new 
OldestMessageEvictionStrategy();
+        strategy.setExpiryCheckEnabled(false);
+        assertFalse(strategy.isExpiryCheckEnabled());
+        strategy.setExpiryCheckEnabled(true);
+        assertTrue(strategy.isExpiryCheckEnabled());
+    }
+
+    /**
+     * {@link TopicSubscription} must pick up the eviction strategy flag — when
+     * {@code ExpiryCheckEnabled=false} is set on the strategy the scan is 
skipped.
+     */
+    @Test
+    public void testTopicSubscriptionUsesStrategyExpiryCheckFlag() throws 
Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.start();
+        try {
+            TopicSubscription sub = buildMinimalTopicSubscription(broker);
+            // default strategy — expiry scan enabled
+            assertTrue("default strategy must have ExpiryCheckEnabled=true",
+                    sub.getMessageEvictionStrategy().isExpiryCheckEnabled());
+
+            OldestMessageEvictionStrategy strategy = new 
OldestMessageEvictionStrategy();
+            strategy.setExpiryCheckEnabled(false);
+            sub.setMessageEvictionStrategy(strategy);
+            assertFalse("strategy with ExpiryCheckEnabled=false must reflect 
on subscription",
+                    sub.getMessageEvictionStrategy().isExpiryCheckEnabled());
+        } finally {
+            broker.stop();
+        }
+    }
+
+    // 
-------------------------------------------------------------------------
+    // PolicyEntry propagation tests
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * When a {@link PolicyEntry} is configured with an eviction strategy that 
has
+     * {@code ExpiryCheckEnabled=false}, {@code 
PolicyEntry.configure(TopicSubscription)}
+     * must propagate the strategy so the O(n) expiry scan is skipped.
+     */
+    @Test
+    public void testPolicyEntryPropagatesEvictionStrategyToSubscription() 
throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.start();
+        try {
+            ConstantPendingMessageLimitStrategy limitStrategy = new 
ConstantPendingMessageLimitStrategy();
+            limitStrategy.setLimit(2000);
+
+            OldestMessageEvictionStrategy evictionStrategy = new 
OldestMessageEvictionStrategy();
+            evictionStrategy.setExpiryCheckEnabled(false);
+
+            PolicyEntry entry = new PolicyEntry();
+            entry.setPendingMessageLimitStrategy(limitStrategy);
+            entry.setMessageEvictionStrategy(evictionStrategy);
+
+            TopicSubscription sub = buildMinimalTopicSubscription(broker);
+            
assertTrue(sub.getMessageEvictionStrategy().isExpiryCheckEnabled()); // default
+
+            entry.configure(broker.getBroker(), broker.getSystemUsage(), sub);
+
+            assertFalse("PolicyEntry.configure() must propagate eviction 
strategy with ExpiryCheckEnabled=false",
+                    sub.getMessageEvictionStrategy().isExpiryCheckEnabled());
+        } finally {
+            broker.stop();
+        }
+    }
+
+    /**
+     * When the default eviction strategy is used (no override on PolicyEntry),
+     * the subscription's expiry scan must remain enabled.
+     */
+    @Test
+    public void testDefaultPolicyEntryLeavesExpiryCheckEnabled() throws 
Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.start();
+        try {
+            ConstantPendingMessageLimitStrategy limitStrategy = new 
ConstantPendingMessageLimitStrategy();
+            limitStrategy.setLimit(2000);
+
+            PolicyEntry entry = new PolicyEntry();
+            entry.setPendingMessageLimitStrategy(limitStrategy);
+            // no messageEvictionStrategy override — default 
OldestMessageEvictionStrategy used
+
+            TopicSubscription sub = buildMinimalTopicSubscription(broker);
+            entry.configure(broker.getBroker(), broker.getSystemUsage(), sub);
+
+            assertTrue("subscription must still have ExpiryCheckEnabled=true 
when using the default eviction strategy",
+                    sub.getMessageEvictionStrategy().isExpiryCheckEnabled());
+        } finally {
+            broker.stop();
+        }
+    }
+
+    /**
+     * When no {@link 
org.apache.activemq.broker.region.policy.PendingMessageLimitStrategy} is
+     * set at all, the subscription's expiry scan flag must remain at its 
default ({@code true}).
+     */
+    @Test
+    public void testPolicyEntryWithNoStrategyLeavesExpiryCheckEnabled() throws 
Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.start();
+        try {
+            PolicyEntry entry = new PolicyEntry(); // no strategy, no eviction 
strategy override
+
+            TopicSubscription sub = buildMinimalTopicSubscription(broker);
+            entry.configure(broker.getBroker(), broker.getSystemUsage(), sub);
+
+            assertTrue("subscription must keep ExpiryCheckEnabled=true when no 
strategy is configured",
+                    sub.getMessageEvictionStrategy().isExpiryCheckEnabled());
+        } finally {
+            broker.stop();
+        }
+    }
+
+    /**
+     * A custom {@link 
org.apache.activemq.broker.region.policy.PendingMessageLimitStrategy} with no
+     * eviction strategy override must leave the subscription's expiry scan 
enabled.
+     */
+    @Test
+    public void 
testCustomLimitStrategyWithDefaultEvictionLeavesExpiryCheckEnabled() throws 
Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.start();
+        try {
+            
org.apache.activemq.broker.region.policy.PendingMessageLimitStrategy 
customStrategy =
+                    subscription -> 500;
+
+            PolicyEntry entry = new PolicyEntry();
+            entry.setPendingMessageLimitStrategy(customStrategy);
+            // no eviction strategy override — default 
OldestMessageEvictionStrategy(ExpiryCheckEnabled=true)
+
+            TopicSubscription sub = buildMinimalTopicSubscription(broker);
+            entry.configure(broker.getBroker(), broker.getSystemUsage(), sub);
+
+            assertTrue("A custom limit strategy with default eviction strategy 
must leave ExpiryCheckEnabled=true",
+                    sub.getMessageEvictionStrategy().isExpiryCheckEnabled());
+        } finally {
+            broker.stop();
+        }
+    }
+
+    // 
-------------------------------------------------------------------------
+    // Integration tests — embedded broker, real JMS
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * With {@code ExpiryCheckEnabled=false} on the eviction strategy, messages
+     * with an explicit TTL that has passed must NOT be removed by the eager 
expiry
+     * scan.  The messages remain in the pending queue and are only evicted by 
the
+     * normal eviction strategy when the limit is exceeded.
+     *
+     * <p>We verify this by:
+     * <ol>
+     *   <li>Configuring a topic with limit=200, ExpiryCheckEnabled=false.
+     *   <li>Sending 250 messages with a very short TTL.
+     *   <li>Waiting for all TTLs to elapse.
+     *   <li>Sending one more message (triggers the code path).
+     *   <li>Asserting that the broker's expired-message counter is 0
+     *       (no expiry scan ran) while the discarded counter is > 0
+     *       (normal eviction ran as expected).
+     * </ol>
+     */
+    @Test
+    public void testExpiryCheckDisabledSkipsExpiredMessageScan() throws 
Exception {
+        final int PENDING_LIMIT = 200;
+        final int SEND_COUNT = 250;
+        final long SHORT_TTL_MS = 100;
+
+        BrokerService broker = buildBroker(PENDING_LIMIT, false /* 
ExpiryCheckEnabled=false */);
+        try {
+            // prefetchSize=1 so messages pile up in the broker's matched 
queue (not in client buffer)
+            ActiveMQTopic topic = new 
ActiveMQTopic("TEST.EXPIRY.DISABLED?consumer.prefetchSize=1");
+
+            org.apache.activemq.ActiveMQConnectionFactory cf =
+                    new 
org.apache.activemq.ActiveMQConnectionFactory("vm://expiry-disabled");
+            Connection conn = cf.createConnection();
+            conn.start();
+            Session session = conn.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            // Create a slow consumer (prefetch=1, never reads) to back up 
messages
+            MessageConsumer consumer = session.createConsumer(topic);
+
+            MessageProducer producer = session.createProducer(new 
ActiveMQTopic("TEST.EXPIRY.DISABLED"));
+            // Send messages with short TTL
+            for (int i = 0; i < SEND_COUNT; i++) {
+                TextMessage msg = session.createTextMessage("msg-" + i);
+                producer.send(msg, jakarta.jms.DeliveryMode.NON_PERSISTENT, 4, 
SHORT_TTL_MS);
+            }
+
+            // Wait for all TTLs to expire
+            Thread.sleep(SHORT_TTL_MS * 5);
+
+            // Send one more message — this triggers the guard in 
TopicSubscription.add()
+            producer.send(session.createTextMessage("trigger"));
+
+            // Grab the destination stats
+            Destination dest = broker.getDestination(new 
ActiveMQTopic("TEST.EXPIRY.DISABLED"));
+            long expiredCount = 
dest.getDestinationStatistics().getExpired().getCount();
+
+            assertEquals(
+                    "With ExpiryCheckEnabled=false, the expiry scan must not 
run — expired counter must be 0",
+                    0L, expiredCount);
+
+            conn.close();
+        } finally {
+            broker.stop();
+        }
+    }
+
+    /**
+     * Complementary test: with {@code ExpiryCheckEnabled=true} (the default) 
the eager
+     * scan DOES run and picks up expired messages, so the broker's expired 
counter
+     * should be non-zero after the same scenario.
+     */
+    @Test
+    public void testExpiryCheckEnabledRunsExpiredMessageScan() throws 
Exception {
+        final int PENDING_LIMIT = 200;
+        final int SEND_COUNT = 250;
+        final long SHORT_TTL_MS = 100;
+
+        BrokerService broker = buildBroker(PENDING_LIMIT, true /* 
ExpiryCheckEnabled=true */);
+        try {
+            // prefetchSize=1 so messages pile up in the broker's matched 
queue (not in client buffer)
+            ActiveMQTopic topic = new 
ActiveMQTopic("TEST.EXPIRY.ENABLED?consumer.prefetchSize=1");
+
+            org.apache.activemq.ActiveMQConnectionFactory cf =
+                    new 
org.apache.activemq.ActiveMQConnectionFactory("vm://expiry-enabled");
+            Connection conn = cf.createConnection();
+            conn.start();
+            Session session = conn.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            // Slow consumer — never reads
+            MessageConsumer consumer = session.createConsumer(topic);
+
+            MessageProducer producer = session.createProducer(new 
ActiveMQTopic("TEST.EXPIRY.ENABLED"));
+            for (int i = 0; i < SEND_COUNT; i++) {
+                TextMessage msg = session.createTextMessage("msg-" + i);
+                producer.send(msg, jakarta.jms.DeliveryMode.NON_PERSISTENT, 4, 
SHORT_TTL_MS);
+            }
+
+            // Wait for all TTLs to expire
+            Thread.sleep(SHORT_TTL_MS * 5);
+
+            // Send more messages to trigger expiry scan (queue already > 
highWatermark)
+            for (int i = 0; i < 50; i++) {
+                producer.send(session.createTextMessage("trigger-" + i));
+            }
+
+            Destination dest = broker.getDestination(new 
ActiveMQTopic("TEST.EXPIRY.ENABLED"));
+            long expiredCount = 
dest.getDestinationStatistics().getExpired().getCount();
+
+            assertTrue(
+                    "With ExpiryCheckEnabled=true, the expiry scan must run 
and detect expired messages (got " + expiredCount + ")",
+                    expiredCount > 0);
+
+            conn.close();
+        } finally {
+            broker.stop();
+        }
+    }
+
+    // 
-------------------------------------------------------------------------
+    // Helpers
+    // 
-------------------------------------------------------------------------
+
+    private BrokerService buildBroker(int pendingLimit, boolean 
ExpiryCheckEnabled) throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        String brokerName = ExpiryCheckEnabled ? "expiry-enabled" : 
"expiry-disabled";
+        broker.setBrokerName(brokerName);
+        broker.addConnector("vm://" + brokerName);
+        broker.setDeleteAllMessagesOnStartup(true);
+
+        ConstantPendingMessageLimitStrategy limitStrategy = new 
ConstantPendingMessageLimitStrategy();
+        limitStrategy.setLimit(pendingLimit);
+
+        OldestMessageEvictionStrategy evictionStrategy = new 
OldestMessageEvictionStrategy();
+        evictionStrategy.setExpiryCheckEnabled(ExpiryCheckEnabled);
+
+        PolicyEntry entry = new PolicyEntry();
+        entry.setTopic(">");
+        entry.setPendingMessageLimitStrategy(limitStrategy);
+        entry.setMessageEvictionStrategy(evictionStrategy);
+        entry.setDeadLetterStrategy(null); // don't route to DLQ
+
+        List<PolicyEntry> entries = new ArrayList<>();
+        entries.add(entry);
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setPolicyEntries(entries);
+        broker.setDestinationPolicy(policyMap);
+
+        broker.start();
+        broker.waitUntilStarted();
+        return broker;
+    }
+
+    /**
+     * Builds a minimal {@link TopicSubscription} using the broker's internals 
—
+     * just enough to test the flag, without going through a real JMS 
connection.
+     */
+    private TopicSubscription buildMinimalTopicSubscription(BrokerService 
broker) throws Exception {
+        org.apache.activemq.command.ConsumerInfo info = new 
org.apache.activemq.command.ConsumerInfo();
+        info.setConsumerId(new org.apache.activemq.command.ConsumerId(
+                new org.apache.activemq.command.SessionId(
+                        new 
org.apache.activemq.command.ConnectionId("test-conn"), 1), 1));
+        info.setDestination(new ActiveMQTopic("TEST.UNIT"));
+        info.setPrefetchSize(10);
+
+        org.apache.activemq.broker.ConnectionContext ctx =
+                new org.apache.activemq.broker.ConnectionContext();
+        ctx.setBroker(broker.getBroker());
+
+        return new TopicSubscription(broker.getBroker(), ctx, info, 
broker.getSystemUsage());
+    }
+}
+
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/TopicSubscriptionEnableExpiryThroughputTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/TopicSubscriptionEnableExpiryThroughputTest.java
new file mode 100644
index 0000000000..2299f255d8
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/TopicSubscriptionEnableExpiryThroughputTest.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import 
org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
+import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.apache.activemq.test.annotations.ParallelTest;
+import static org.junit.Assert.assertTrue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Throughput comparison: {@code expiryCheckEnabled=true} vs {@code 
expiryCheckEnabled=false}
+ * for a slow-consumer topic with a large pending-message limit.
+ *
+ * <h3>What is being measured</h3>
+ * <p>When a topic consumer is slow (its pending queue exceeds
+ * {@code evictExpiredMessagesHighWatermark = 1000} by default), ActiveMQ calls
+ * {@link TopicSubscription#removeExpiredMessages()} on every single
+ * {@code add()} call.  That method iterates every pending message to call
+ * {@code isExpired()} — an O(n) operation.  With a pending limit of 5,000
+ * that scan runs up to 5,000 iterations per message send, dominated by the
+ * Java heap iteration cost.
+ *
+ * <p>With {@code expiryCheckEnabled=false} on the {@link 
org.apache.activemq.broker.region.policy.MessageEvictionStrategy}
+ * the scan body is skipped entirely via a single boolean check, reducing
+ * per-send work back to O(1).
+ *
+ * <h3>Pass/fail threshold</h3>
+ * <p>The test asserts that the {@code expiryCheckEnabled=false} run completes 
at
+ * least {@code MIN_SPEEDUP_FACTOR}× faster than the {@code 
expiryCheckEnabled=true}
+ * run.  A factor of 3 is deliberately conservative — in practice the
+ * improvement is often 50–200× for large queues with pure in-memory messages.
+ *
+ * <p>The test is annotated {@code @Category(ParallelTest.class)} so it runs
+ * in the normal CI suite, but uses a modest message count to keep wall-clock
+ * time acceptable on slow machines.
+ */
+@Category(ParallelTest.class)
+public class TopicSubscriptionEnableExpiryThroughputTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TopicSubscriptionEnableExpiryThroughputTest.class);
+
+    /** Number of messages to send during the warm-up phase (fills queue above 
highWatermark). */
+    private static final int WARMUP_COUNT = 1200;
+
+    /**
+     * Number of messages timed during the measurement phase.
+     * Sending happens after the queue is already above 1000 (highWatermark),
+     * so every message triggers the expiry-scan code path.
+     */
+    private static final int TIMED_COUNT = 2000;
+
+    /** Pending message limit — large enough that O(n) scan is expensive. */
+    private static final int PENDING_LIMIT = 5000;
+
+    /**
+     * Minimum speedup factor we require for the test to pass.
+     * Conservative: real-world improvement is typically 50–200×.
+     */
+    private static final double MIN_SPEEDUP_FACTOR = 3.0;
+
+    // 
-------------------------------------------------------------------------
+
+    @Test
+    public void testEnableExpiryFalseIsFasterForSlowConsumer() throws 
Exception {
+        long msWithExpiry = measureSendTime(true);
+        long msWithoutExpiry = measureSendTime(false);
+
+        LOG.info("=== expiryCheckEnabled throughput results ===");
+        LOG.info("  expiryCheckEnabled=true  : {} ms for {} timed messages ({} 
msg/s)",
+                msWithExpiry, TIMED_COUNT,
+                msWithExpiry > 0 ? (TIMED_COUNT * 1000L / msWithExpiry) : 
"n/a");
+        LOG.info("  expiryCheckEnabled=false : {} ms for {} timed messages ({} 
msg/s)",
+                msWithoutExpiry, TIMED_COUNT,
+                msWithoutExpiry > 0 ? (TIMED_COUNT * 1000L / msWithoutExpiry) 
: "n/a");
+        LOG.info("  Speedup factor     : {}", msWithExpiry > 0 ? 
String.format("%.1f×", (double) msWithExpiry / msWithoutExpiry) : "n/a");
+
+        // Guard against pathological results (e.g. CI machine starved)
+        // — only assert if the expiry run was genuinely slow (> 200 ms).
+        if (msWithExpiry > 200) {
+            double speedup = (double) msWithExpiry / Math.max(1, 
msWithoutExpiry);
+            assertTrue(
+                    String.format(
+                        "Expected expiryCheckEnabled=false to be at least 
%.0f× faster than expiryCheckEnabled=true, "
+                        + "but got %.1f× (%d ms vs %d ms). "
+                        + "This likely means the O(n) expiry scan is no longer 
being skipped.",
+                        MIN_SPEEDUP_FACTOR, speedup, msWithoutExpiry, 
msWithExpiry),
+                    speedup >= MIN_SPEEDUP_FACTOR);
+        } else {
+            LOG.warn("expiryCheckEnabled=true run finished in only {} ms — 
machine may be too fast "
+                    + "or warm-up count is too low to trigger the O(n) path 
reliably on this hardware. "
+                    + "Skipping ratio assertion.", msWithExpiry);
+        }
+    }
+
+    // 
-------------------------------------------------------------------------
+    // Helpers
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * Starts a broker with the given {@code expiryCheckEnabled} setting, 
creates a
+     * slow consumer (prefetch=1, never reads), sends {@code WARMUP_COUNT}
+     * messages to fill the pending queue above the eviction high-water mark,
+     * then times sending {@code TIMED_COUNT} additional messages.
+     *
+     * @return wall-clock milliseconds for the timed phase
+     */
+    private long measureSendTime(boolean expiryCheckEnabled) throws Exception {
+        String brokerName = "perf-" + (expiryCheckEnabled ? "expiry-on" : 
"expiry-off");
+        BrokerService broker = buildBroker(brokerName, PENDING_LIMIT, 
expiryCheckEnabled);
+        try {
+            ActiveMQConnectionFactory cf =
+                    new ActiveMQConnectionFactory("vm://" + brokerName + 
"?create=false");
+            Connection conn = cf.createConnection();
+            conn.start();
+            Session session = conn.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            ActiveMQTopic topic = new ActiveMQTopic("PERF.TOPIC");
+
+            // Create a consumer but never call receive() — this makes it slow.
+            // prefetch=1 so messages pile up in the broker's pending queue.
+            ActiveMQTopic topicWithPrefetch = new 
ActiveMQTopic("PERF.TOPIC?consumer.prefetchSize=1");
+            MessageConsumer consumer = 
session.createConsumer(topicWithPrefetch);
+
+            MessageProducer producer = session.createProducer(topic);
+            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+            // ---- Warm-up phase: fill pending queue above the high-water 
mark (1000) ----
+            for (int i = 0; i < WARMUP_COUNT; i++) {
+                producer.send(session.createTextMessage("warmup-" + i));
+            }
+
+            // ---- Timed phase: every add() triggers the expiry-scan code 
path ----
+            long start = System.currentTimeMillis();
+            for (int i = 0; i < TIMED_COUNT; i++) {
+                producer.send(session.createTextMessage("timed-" + i));
+            }
+            long elapsed = System.currentTimeMillis() - start;
+
+            conn.close();
+            return elapsed;
+        } finally {
+            broker.stop();
+        }
+    }
+
+    private BrokerService buildBroker(String brokerName, int pendingLimit, 
boolean expiryCheckEnabled)
+            throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.setBrokerName(brokerName);
+        broker.addConnector("vm://" + brokerName);
+        broker.setDeleteAllMessagesOnStartup(true);
+
+        ConstantPendingMessageLimitStrategy strategy = new 
ConstantPendingMessageLimitStrategy();
+        strategy.setLimit(pendingLimit);
+
+        OldestMessageEvictionStrategy evictionStrategy = new 
OldestMessageEvictionStrategy();
+        evictionStrategy.setExpiryCheckEnabled(expiryCheckEnabled);
+
+        PolicyEntry entry = new PolicyEntry();
+        entry.setTopic(">");
+        entry.setTopicPrefetch(1);
+        entry.setPendingMessageLimitStrategy(strategy);
+        entry.setMessageEvictionStrategy(evictionStrategy);
+        entry.setDeadLetterStrategy(null);
+
+        List<PolicyEntry> entries = new ArrayList<>();
+        entries.add(entry);
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setPolicyEntries(entries);
+        broker.setDestinationPolicy(policyMap);
+
+        broker.start();
+        broker.waitUntilStarted();
+        return broker;
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact



Reply via email to