This is an automated email from the ASF dual-hosted git repository.
mattrpav pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new f166435d31 [#2020] Add expiryCheckEnabled option to
MessageEvictionStrategy to skip O(n) expiry scan TopicSubscription
f166435d31 is described below
commit f166435d31d6e9d856d8996741ca9feeaeb8ccb2
Author: mwashburn <[email protected]>
AuthorDate: Thu May 14 12:18:19 2026 -0400
[#2020] Add expiryCheckEnabled option to MessageEvictionStrategy to skip
O(n) expiry scan TopicSubscription
When a slow consumer's pending-message buffer exceeds the high-water mark
(evictExpiredMessagesHighWatermark, default: 1000 messages),
TopicSubscription.add() calls removeExpiredMessages() on every single message
add. That method iterates every pending message and calls isExpired(), which
is an O(n) scan over the full buffer.
When messages carry no TTL (isExpired() always returns false), this scan
provides no benefit: it iterates the entire buffer on every add and removes
nothing. With a large pending limit (e.g. 20,000), this adds up to millions of
no-op iterations per second on busy topics; at 20,000 pending messages, just
100 adds per second already cost 2,000,000 isExpired() calls per second.
This commit adds an expiryCheckEnabled boolean (default true) to
MessageEvictionStrategy.
The flag is checked by a single guard in TopicSubscription.add(), so setting
it to false skips the scan:
    if (messageEvictionStrategy.isExpiryCheckEnabled()
            && !matched.isEmpty() && matched.size() > max) {
        removeExpiredMessages();
    }
Also adds:
- TopicSubscriptionEnableExpiryTest: 10 correctness/propagation/integration
tests
- TopicSubscriptionEnableExpiryThroughputTest: throughput comparison test
---
.../activemq/broker/region/TopicSubscription.java | 3 +-
.../region/policy/MessageEvictionStrategy.java | 14 +
.../policy/MessageEvictionStrategySupport.java | 26 +-
.../region/TopicSubscriptionEnableExpiryTest.java | 408 +++++++++++++++++++++
...opicSubscriptionEnableExpiryThroughputTest.java | 211 +++++++++++
5 files changed, 658 insertions(+), 4 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 8a1a300dae..62659a6d5b 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -25,7 +25,6 @@ import
org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.command.*;
-import org.apache.activemq.management.MessageFlowStats;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.transport.TransmitCallback;
@@ -166,7 +165,7 @@ public class TopicSubscription extends AbstractSubscription
{
if (maximumPendingMessages > 0 &&
maximumPendingMessages < max) {
max = maximumPendingMessages;
}
- if (!matched.isEmpty() && matched.size() > max) {
+ if (messageEvictionStrategy.isExpiryCheckEnabled() &&
!matched.isEmpty() && matched.size() > max) {
removeExpiredMessages();
}
// lets discard old messages as we are a slow consumer
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java
index 334cb3730f..5a5f62f51f 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategy.java
@@ -42,4 +42,18 @@ public interface MessageEvictionStrategy {
*/
int getEvictExpiredMessagesHighWatermark();
+ /**
+ * Returns whether the eager expired-message scan is enabled.
+ * <p>
+ * When {@code false}, the O(n) scan inside
+ * {@link org.apache.activemq.broker.region.TopicSubscription#add} is
skipped entirely.
+ * Set to {@code false} when messages carry no TTL, or when the scan cost
outweighs
+ * the benefit of eagerly evicting expired messages from slow-consumer
buffers.
+ * <p>
+ * See {@link MessageEvictionStrategySupport} for the default
implementation that returns {@code true}.
+ *
+ * @return {@code true} if the expiry scan is enabled (default), {@code
false} if skipped
+ */
+ boolean isExpiryCheckEnabled();
+
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java
index 32f0c6f0c0..0df890b6cf 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageEvictionStrategySupport.java
@@ -24,6 +24,7 @@ package org.apache.activemq.broker.region.policy;
public abstract class MessageEvictionStrategySupport implements
MessageEvictionStrategy {
private int evictExpiredMessagesHighWatermark = 1000;
+ private boolean expiryCheckEnabled = true;
public int getEvictExpiredMessagesHighWatermark() {
return evictExpiredMessagesHighWatermark;
@@ -35,6 +36,27 @@ public abstract class MessageEvictionStrategySupport
implements MessageEvictionS
public void setEvictExpiredMessagesHighWatermark(int
evictExpiredMessagesHighWaterMark) {
this.evictExpiredMessagesHighWatermark =
evictExpiredMessagesHighWaterMark;
}
-
-
+
+ @Override
+ public boolean isExpiryCheckEnabled() {
+ return expiryCheckEnabled;
+ }
+
+ /**
+ * Controls whether the broker performs an eager expired-message scan when
a
+ * non-durable topic subscription's pending slow-consumer buffer exceeds
+ * {@link #getEvictExpiredMessagesHighWatermark()}.
+ * <p>
+ * Set to {@code false} when messages carry no TTL, or when the O(n) scan
cost
+ * outweighs the benefit of eagerly evicting expired messages from
slow-consumer
+ * buffers. When messages have no TTL, every scan iterates the full buffer
without
+ * removing anything, adding latency to every enqueue once the buffer
exceeds the
+ * high-water mark.
+ *
+ * @param expiryCheckEnabled {@code false} to skip the scan; {@code true}
to enable it (default)
+ */
+ public void setExpiryCheckEnabled(boolean expiryCheckEnabled) {
+ this.expiryCheckEnabled = expiryCheckEnabled;
+ }
+
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/TopicSubscriptionEnableExpiryTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/TopicSubscriptionEnableExpiryTest.java
new file mode 100644
index 0000000000..4dcc34e574
--- /dev/null
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/TopicSubscriptionEnableExpiryTest.java
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import jakarta.jms.Connection;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Session;
+import jakarta.jms.TextMessage;
+
+import org.apache.activemq.broker.BrokerService;
+import
org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
+import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.apache.activemq.test.annotations.ParallelTest;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests correctness of the {@code ExpiryCheckEnabled} feature on
+ * {@link org.apache.activemq.broker.region.policy.MessageEvictionStrategy}
and its effect on
+ * {@link TopicSubscription}.
+ *
+ * <p>Background: when a slow-consumer queue exceeds
+ * {@code evictExpiredMessagesHighWatermark} (default: 1000), ActiveMQ calls
+ * {@code TopicSubscription.removeExpiredMessages()} on every single
+ * {@code add()} call. That method iterates every pending message checking
+ * {@code isExpired()} — an O(n) scan. Setting {@code
ExpiryCheckEnabled=false}
+ * on the {@link
org.apache.activemq.broker.region.policy.MessageEvictionStrategy} skips that
+ * scan entirely via a single boolean check guarding the call site.
+ */
+@Category(ParallelTest.class)
+public class TopicSubscriptionEnableExpiryTest {
+
+ //
-------------------------------------------------------------------------
+ // Unit tests — no broker needed
+ //
-------------------------------------------------------------------------
+
+ /**
+ * {@link OldestMessageEvictionStrategy#isExpiryCheckEnabled()} must
default to {@code true} so
+ * that existing deployments that do not set the property are unaffected.
+ */
+ @Test
+ public void testEvictionStrategyExpiryCheckDefaultsToTrue() {
+ OldestMessageEvictionStrategy strategy = new
OldestMessageEvictionStrategy();
+ assertTrue("ExpiryCheckEnabled must default to true (preserves
existing behaviour)",
+ strategy.isExpiryCheckEnabled());
+ }
+
+ @Test
+ public void testEvictionStrategySetExpiryCheckFalse() {
+ OldestMessageEvictionStrategy strategy = new
OldestMessageEvictionStrategy();
+ strategy.setExpiryCheckEnabled(false);
+ assertFalse("ExpiryCheckEnabled should be false after setter call",
+ strategy.isExpiryCheckEnabled());
+ }
+
+ @Test
+ public void testEvictionStrategySetExpiryCheckRoundTrip() {
+ OldestMessageEvictionStrategy strategy = new
OldestMessageEvictionStrategy();
+ strategy.setExpiryCheckEnabled(false);
+ assertFalse(strategy.isExpiryCheckEnabled());
+ strategy.setExpiryCheckEnabled(true);
+ assertTrue(strategy.isExpiryCheckEnabled());
+ }
+
+ /**
+ * {@link TopicSubscription} must pick up the eviction strategy flag — when
+ * {@code ExpiryCheckEnabled=false} is set on the strategy the scan is
skipped.
+ */
+ @Test
+ public void testTopicSubscriptionUsesStrategyExpiryCheckFlag() throws
Exception {
+ BrokerService broker = new BrokerService();
+ broker.setPersistent(false);
+ broker.setUseJmx(false);
+ broker.start();
+ try {
+ TopicSubscription sub = buildMinimalTopicSubscription(broker);
+ // default strategy — expiry scan enabled
+ assertTrue("default strategy must have ExpiryCheckEnabled=true",
+ sub.getMessageEvictionStrategy().isExpiryCheckEnabled());
+
+ OldestMessageEvictionStrategy strategy = new
OldestMessageEvictionStrategy();
+ strategy.setExpiryCheckEnabled(false);
+ sub.setMessageEvictionStrategy(strategy);
+ assertFalse("strategy with ExpiryCheckEnabled=false must reflect
on subscription",
+ sub.getMessageEvictionStrategy().isExpiryCheckEnabled());
+ } finally {
+ broker.stop();
+ }
+ }
+
+ //
-------------------------------------------------------------------------
+ // PolicyEntry propagation tests
+ //
-------------------------------------------------------------------------
+
+ /**
+ * When a {@link PolicyEntry} is configured with an eviction strategy that
has
+ * {@code ExpiryCheckEnabled=false}, {@code
PolicyEntry.configure(TopicSubscription)}
+ * must propagate the strategy so the O(n) expiry scan is skipped.
+ */
+ @Test
+ public void testPolicyEntryPropagatesEvictionStrategyToSubscription()
throws Exception {
+ BrokerService broker = new BrokerService();
+ broker.setPersistent(false);
+ broker.setUseJmx(false);
+ broker.start();
+ try {
+ ConstantPendingMessageLimitStrategy limitStrategy = new
ConstantPendingMessageLimitStrategy();
+ limitStrategy.setLimit(2000);
+
+ OldestMessageEvictionStrategy evictionStrategy = new
OldestMessageEvictionStrategy();
+ evictionStrategy.setExpiryCheckEnabled(false);
+
+ PolicyEntry entry = new PolicyEntry();
+ entry.setPendingMessageLimitStrategy(limitStrategy);
+ entry.setMessageEvictionStrategy(evictionStrategy);
+
+ TopicSubscription sub = buildMinimalTopicSubscription(broker);
+
assertTrue(sub.getMessageEvictionStrategy().isExpiryCheckEnabled()); // default
+
+ entry.configure(broker.getBroker(), broker.getSystemUsage(), sub);
+
+ assertFalse("PolicyEntry.configure() must propagate eviction
strategy with ExpiryCheckEnabled=false",
+ sub.getMessageEvictionStrategy().isExpiryCheckEnabled());
+ } finally {
+ broker.stop();
+ }
+ }
+
+ /**
+ * When the default eviction strategy is used (no override on PolicyEntry),
+ * the subscription's expiry scan must remain enabled.
+ */
+ @Test
+ public void testDefaultPolicyEntryLeavesExpiryCheckEnabled() throws
Exception {
+ BrokerService broker = new BrokerService();
+ broker.setPersistent(false);
+ broker.setUseJmx(false);
+ broker.start();
+ try {
+ ConstantPendingMessageLimitStrategy limitStrategy = new
ConstantPendingMessageLimitStrategy();
+ limitStrategy.setLimit(2000);
+
+ PolicyEntry entry = new PolicyEntry();
+ entry.setPendingMessageLimitStrategy(limitStrategy);
+ // no messageEvictionStrategy override — default
OldestMessageEvictionStrategy used
+
+ TopicSubscription sub = buildMinimalTopicSubscription(broker);
+ entry.configure(broker.getBroker(), broker.getSystemUsage(), sub);
+
+ assertTrue("subscription must still have ExpiryCheckEnabled=true
when using the default eviction strategy",
+ sub.getMessageEvictionStrategy().isExpiryCheckEnabled());
+ } finally {
+ broker.stop();
+ }
+ }
+
+ /**
+ * When no {@link
org.apache.activemq.broker.region.policy.PendingMessageLimitStrategy} is
+ * set at all, the subscription's expiry scan flag must remain at its
default ({@code true}).
+ */
+ @Test
+ public void testPolicyEntryWithNoStrategyLeavesExpiryCheckEnabled() throws
Exception {
+ BrokerService broker = new BrokerService();
+ broker.setPersistent(false);
+ broker.setUseJmx(false);
+ broker.start();
+ try {
+ PolicyEntry entry = new PolicyEntry(); // no strategy, no eviction
strategy override
+
+ TopicSubscription sub = buildMinimalTopicSubscription(broker);
+ entry.configure(broker.getBroker(), broker.getSystemUsage(), sub);
+
+ assertTrue("subscription must keep ExpiryCheckEnabled=true when no
strategy is configured",
+ sub.getMessageEvictionStrategy().isExpiryCheckEnabled());
+ } finally {
+ broker.stop();
+ }
+ }
+
+ /**
+ * A custom {@link
org.apache.activemq.broker.region.policy.PendingMessageLimitStrategy} with no
+ * eviction strategy override must leave the subscription's expiry scan
enabled.
+ */
+ @Test
+ public void
testCustomLimitStrategyWithDefaultEvictionLeavesExpiryCheckEnabled() throws
Exception {
+ BrokerService broker = new BrokerService();
+ broker.setPersistent(false);
+ broker.setUseJmx(false);
+ broker.start();
+ try {
+
org.apache.activemq.broker.region.policy.PendingMessageLimitStrategy
customStrategy =
+ subscription -> 500;
+
+ PolicyEntry entry = new PolicyEntry();
+ entry.setPendingMessageLimitStrategy(customStrategy);
+ // no eviction strategy override — default
OldestMessageEvictionStrategy(ExpiryCheckEnabled=true)
+
+ TopicSubscription sub = buildMinimalTopicSubscription(broker);
+ entry.configure(broker.getBroker(), broker.getSystemUsage(), sub);
+
+ assertTrue("A custom limit strategy with default eviction strategy
must leave ExpiryCheckEnabled=true",
+ sub.getMessageEvictionStrategy().isExpiryCheckEnabled());
+ } finally {
+ broker.stop();
+ }
+ }
+
+ //
-------------------------------------------------------------------------
+ // Integration tests — embedded broker, real JMS
+ //
-------------------------------------------------------------------------
+
+ /**
+ * With {@code ExpiryCheckEnabled=false} on the eviction strategy, messages
+ * with an explicit TTL that has passed must NOT be removed by the eager
expiry
+ * scan. The messages remain in the pending queue and are only evicted by
the
+ * normal eviction strategy when the limit is exceeded.
+ *
+ * <p>We verify this by:
+ * <ol>
+ * <li>Configuring a topic with limit=200, ExpiryCheckEnabled=false.
+ * <li>Sending 250 messages with a very short TTL.
+ * <li>Waiting for all TTLs to elapse.
+ * <li>Sending one more message (triggers the code path).
+ * <li>Asserting that the broker's expired-message counter is 0
+ * (no expiry scan ran) while the discarded counter is > 0
+ * (normal eviction ran as expected).
+ * </ol>
+ */
+ @Test
+ public void testExpiryCheckDisabledSkipsExpiredMessageScan() throws
Exception {
+ final int PENDING_LIMIT = 200;
+ final int SEND_COUNT = 250;
+ final long SHORT_TTL_MS = 100;
+
+ BrokerService broker = buildBroker(PENDING_LIMIT, false /*
ExpiryCheckEnabled=false */);
+ try {
+ // prefetchSize=1 so messages pile up in the broker's matched
queue (not in client buffer)
+ ActiveMQTopic topic = new
ActiveMQTopic("TEST.EXPIRY.DISABLED?consumer.prefetchSize=1");
+
+ org.apache.activemq.ActiveMQConnectionFactory cf =
+ new
org.apache.activemq.ActiveMQConnectionFactory("vm://expiry-disabled");
+ Connection conn = cf.createConnection();
+ conn.start();
+ Session session = conn.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ // Create a slow consumer (prefetch=1, never reads) to back up
messages
+ MessageConsumer consumer = session.createConsumer(topic);
+
+ MessageProducer producer = session.createProducer(new
ActiveMQTopic("TEST.EXPIRY.DISABLED"));
+ // Send messages with short TTL
+ for (int i = 0; i < SEND_COUNT; i++) {
+ TextMessage msg = session.createTextMessage("msg-" + i);
+ producer.send(msg, jakarta.jms.DeliveryMode.NON_PERSISTENT, 4,
SHORT_TTL_MS);
+ }
+
+ // Wait for all TTLs to expire
+ Thread.sleep(SHORT_TTL_MS * 5);
+
+ // Send one more message — this triggers the guard in
TopicSubscription.add()
+ producer.send(session.createTextMessage("trigger"));
+
+ // Grab the destination stats
+ Destination dest = broker.getDestination(new
ActiveMQTopic("TEST.EXPIRY.DISABLED"));
+ long expiredCount =
dest.getDestinationStatistics().getExpired().getCount();
+
+ assertEquals(
+ "With ExpiryCheckEnabled=false, the expiry scan must not
run — expired counter must be 0",
+ 0L, expiredCount);
+
+ conn.close();
+ } finally {
+ broker.stop();
+ }
+ }
+
+ /**
+ * Complementary test: with {@code ExpiryCheckEnabled=true} (the default)
the eager
+ * scan DOES run and picks up expired messages, so the broker's expired
counter
+ * should be non-zero after the same scenario.
+ */
+ @Test
+ public void testExpiryCheckEnabledRunsExpiredMessageScan() throws
Exception {
+ final int PENDING_LIMIT = 200;
+ final int SEND_COUNT = 250;
+ final long SHORT_TTL_MS = 100;
+
+ BrokerService broker = buildBroker(PENDING_LIMIT, true /*
ExpiryCheckEnabled=true */);
+ try {
+ // prefetchSize=1 so messages pile up in the broker's matched
queue (not in client buffer)
+ ActiveMQTopic topic = new
ActiveMQTopic("TEST.EXPIRY.ENABLED?consumer.prefetchSize=1");
+
+ org.apache.activemq.ActiveMQConnectionFactory cf =
+ new
org.apache.activemq.ActiveMQConnectionFactory("vm://expiry-enabled");
+ Connection conn = cf.createConnection();
+ conn.start();
+ Session session = conn.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ // Slow consumer — never reads
+ MessageConsumer consumer = session.createConsumer(topic);
+
+ MessageProducer producer = session.createProducer(new
ActiveMQTopic("TEST.EXPIRY.ENABLED"));
+ for (int i = 0; i < SEND_COUNT; i++) {
+ TextMessage msg = session.createTextMessage("msg-" + i);
+ producer.send(msg, jakarta.jms.DeliveryMode.NON_PERSISTENT, 4,
SHORT_TTL_MS);
+ }
+
+ // Wait for all TTLs to expire
+ Thread.sleep(SHORT_TTL_MS * 5);
+
+ // Send more messages to trigger expiry scan (queue already >
highWatermark)
+ for (int i = 0; i < 50; i++) {
+ producer.send(session.createTextMessage("trigger-" + i));
+ }
+
+ Destination dest = broker.getDestination(new
ActiveMQTopic("TEST.EXPIRY.ENABLED"));
+ long expiredCount =
dest.getDestinationStatistics().getExpired().getCount();
+
+ assertTrue(
+ "With ExpiryCheckEnabled=true, the expiry scan must run
and detect expired messages (got " + expiredCount + ")",
+ expiredCount > 0);
+
+ conn.close();
+ } finally {
+ broker.stop();
+ }
+ }
+
+ //
-------------------------------------------------------------------------
+ // Helpers
+ //
-------------------------------------------------------------------------
+
+ private BrokerService buildBroker(int pendingLimit, boolean
ExpiryCheckEnabled) throws Exception {
+ BrokerService broker = new BrokerService();
+ broker.setPersistent(false);
+ broker.setUseJmx(false);
+ String brokerName = ExpiryCheckEnabled ? "expiry-enabled" :
"expiry-disabled";
+ broker.setBrokerName(brokerName);
+ broker.addConnector("vm://" + brokerName);
+ broker.setDeleteAllMessagesOnStartup(true);
+
+ ConstantPendingMessageLimitStrategy limitStrategy = new
ConstantPendingMessageLimitStrategy();
+ limitStrategy.setLimit(pendingLimit);
+
+ OldestMessageEvictionStrategy evictionStrategy = new
OldestMessageEvictionStrategy();
+ evictionStrategy.setExpiryCheckEnabled(ExpiryCheckEnabled);
+
+ PolicyEntry entry = new PolicyEntry();
+ entry.setTopic(">");
+ entry.setPendingMessageLimitStrategy(limitStrategy);
+ entry.setMessageEvictionStrategy(evictionStrategy);
+ entry.setDeadLetterStrategy(null); // don't route to DLQ
+
+ List<PolicyEntry> entries = new ArrayList<>();
+ entries.add(entry);
+ PolicyMap policyMap = new PolicyMap();
+ policyMap.setPolicyEntries(entries);
+ broker.setDestinationPolicy(policyMap);
+
+ broker.start();
+ broker.waitUntilStarted();
+ return broker;
+ }
+
+ /**
+ * Builds a minimal {@link TopicSubscription} using the broker's internals
—
+ * just enough to test the flag, without going through a real JMS
connection.
+ */
+ private TopicSubscription buildMinimalTopicSubscription(BrokerService
broker) throws Exception {
+ org.apache.activemq.command.ConsumerInfo info = new
org.apache.activemq.command.ConsumerInfo();
+ info.setConsumerId(new org.apache.activemq.command.ConsumerId(
+ new org.apache.activemq.command.SessionId(
+ new
org.apache.activemq.command.ConnectionId("test-conn"), 1), 1));
+ info.setDestination(new ActiveMQTopic("TEST.UNIT"));
+ info.setPrefetchSize(10);
+
+ org.apache.activemq.broker.ConnectionContext ctx =
+ new org.apache.activemq.broker.ConnectionContext();
+ ctx.setBroker(broker.getBroker());
+
+ return new TopicSubscription(broker.getBroker(), ctx, info,
broker.getSystemUsage());
+ }
+}
+
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/TopicSubscriptionEnableExpiryThroughputTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/TopicSubscriptionEnableExpiryThroughputTest.java
new file mode 100644
index 0000000000..2299f255d8
--- /dev/null
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/TopicSubscriptionEnableExpiryThroughputTest.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import
org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
+import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.apache.activemq.test.annotations.ParallelTest;
+import static org.junit.Assert.assertTrue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Throughput comparison: {@code expiryCheckEnabled=true} vs {@code
expiryCheckEnabled=false}
+ * for a slow-consumer topic with a large pending-message limit.
+ *
+ * <h3>What is being measured</h3>
+ * <p>When a topic consumer is slow (its pending queue exceeds
+ * {@code evictExpiredMessagesHighWatermark = 1000} by default), ActiveMQ calls
+ * {@link TopicSubscription#removeExpiredMessages()} on every single
+ * {@code add()} call. That method iterates every pending message to call
+ * {@code isExpired()} — an O(n) operation. With a pending limit of 5,000
+ * that scan runs up to 5,000 iterations per message send, dominated by the
+ * Java heap iteration cost.
+ *
+ * <p>With {@code expiryCheckEnabled=false} on the {@link
org.apache.activemq.broker.region.policy.MessageEvictionStrategy}
+ * the scan body is skipped entirely via a single boolean check, reducing
+ * per-send work back to O(1).
+ *
+ * <h3>Pass/fail threshold</h3>
+ * <p>The test asserts that the {@code expiryCheckEnabled=false} run completes
at
+ * least {@code MIN_SPEEDUP_FACTOR}× faster than the {@code
expiryCheckEnabled=true}
+ * run. A factor of 3 is deliberately conservative — in practice the
+ * improvement is often 50–200× for large queues with pure in-memory messages.
+ *
+ * <p>The test is annotated {@code @Category(ParallelTest.class)} so it runs
+ * in the normal CI suite, but uses a modest message count to keep wall-clock
+ * time acceptable on slow machines.
+ */
+@Category(ParallelTest.class)
+public class TopicSubscriptionEnableExpiryThroughputTest {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TopicSubscriptionEnableExpiryThroughputTest.class);
+
+ /** Number of messages to send during the warm-up phase (fills queue above
highWatermark). */
+ private static final int WARMUP_COUNT = 1200;
+
+ /**
+ * Number of messages timed during the measurement phase.
+ * Sending happens after the queue is already above 1000 (highWatermark),
+ * so every message triggers the expiry-scan code path.
+ */
+ private static final int TIMED_COUNT = 2000;
+
+ /** Pending message limit — large enough that O(n) scan is expensive. */
+ private static final int PENDING_LIMIT = 5000;
+
+ /**
+ * Minimum speedup factor we require for the test to pass.
+ * Conservative: real-world improvement is typically 50–200×.
+ */
+ private static final double MIN_SPEEDUP_FACTOR = 3.0;
+
+ //
-------------------------------------------------------------------------
+
+ @Test
+ public void testEnableExpiryFalseIsFasterForSlowConsumer() throws
Exception {
+ long msWithExpiry = measureSendTime(true);
+ long msWithoutExpiry = measureSendTime(false);
+
+ LOG.info("=== expiryCheckEnabled throughput results ===");
+ LOG.info(" expiryCheckEnabled=true : {} ms for {} timed messages ({}
msg/s)",
+ msWithExpiry, TIMED_COUNT,
+ msWithExpiry > 0 ? (TIMED_COUNT * 1000L / msWithExpiry) :
"n/a");
+ LOG.info(" expiryCheckEnabled=false : {} ms for {} timed messages ({}
msg/s)",
+ msWithoutExpiry, TIMED_COUNT,
+ msWithoutExpiry > 0 ? (TIMED_COUNT * 1000L / msWithoutExpiry)
: "n/a");
+ LOG.info(" Speedup factor : {}", msWithExpiry > 0 ?
String.format("%.1f×", (double) msWithExpiry / msWithoutExpiry) : "n/a");
+
+ // Guard against pathological results (e.g. CI machine starved)
+ // — only assert if the expiry run was genuinely slow (> 200 ms).
+ if (msWithExpiry > 200) {
+ double speedup = (double) msWithExpiry / Math.max(1,
msWithoutExpiry);
+ assertTrue(
+ String.format(
+ "Expected expiryCheckEnabled=false to be at least
%.0f× faster than expiryCheckEnabled=true, "
+ + "but got %.1f× (%d ms vs %d ms). "
+ + "This likely means the O(n) expiry scan is no longer
being skipped.",
+ MIN_SPEEDUP_FACTOR, speedup, msWithoutExpiry,
msWithExpiry),
+ speedup >= MIN_SPEEDUP_FACTOR);
+ } else {
+ LOG.warn("expiryCheckEnabled=true run finished in only {} ms —
machine may be too fast "
+ + "or warm-up count is too low to trigger the O(n) path
reliably on this hardware. "
+ + "Skipping ratio assertion.", msWithExpiry);
+ }
+ }
+
+ //
-------------------------------------------------------------------------
+ // Helpers
+ //
-------------------------------------------------------------------------
+
+ /**
+ * Starts a broker with the given {@code expiryCheckEnabled} setting,
creates a
+ * slow consumer (prefetch=1, never reads), sends {@code WARMUP_COUNT}
+ * messages to fill the pending queue above the eviction high-water mark,
+ * then times sending {@code TIMED_COUNT} additional messages.
+ *
+ * @return wall-clock milliseconds for the timed phase
+ */
+ private long measureSendTime(boolean expiryCheckEnabled) throws Exception {
+ String brokerName = "perf-" + (expiryCheckEnabled ? "expiry-on" :
"expiry-off");
+ BrokerService broker = buildBroker(brokerName, PENDING_LIMIT,
expiryCheckEnabled);
+ try {
+ ActiveMQConnectionFactory cf =
+ new ActiveMQConnectionFactory("vm://" + brokerName +
"?create=false");
+ Connection conn = cf.createConnection();
+ conn.start();
+ Session session = conn.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ ActiveMQTopic topic = new ActiveMQTopic("PERF.TOPIC");
+
+ // Create a consumer but never call receive() — this makes it slow.
+ // prefetch=1 so messages pile up in the broker's pending queue.
+ ActiveMQTopic topicWithPrefetch = new
ActiveMQTopic("PERF.TOPIC?consumer.prefetchSize=1");
+ MessageConsumer consumer =
session.createConsumer(topicWithPrefetch);
+
+ MessageProducer producer = session.createProducer(topic);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ // ---- Warm-up phase: fill pending queue above the high-water
mark (1000) ----
+ for (int i = 0; i < WARMUP_COUNT; i++) {
+ producer.send(session.createTextMessage("warmup-" + i));
+ }
+
+ // ---- Timed phase: every add() triggers the expiry-scan code
path ----
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < TIMED_COUNT; i++) {
+ producer.send(session.createTextMessage("timed-" + i));
+ }
+ long elapsed = System.currentTimeMillis() - start;
+
+ conn.close();
+ return elapsed;
+ } finally {
+ broker.stop();
+ }
+ }
+
+ private BrokerService buildBroker(String brokerName, int pendingLimit,
boolean expiryCheckEnabled)
+ throws Exception {
+ BrokerService broker = new BrokerService();
+ broker.setPersistent(false);
+ broker.setUseJmx(false);
+ broker.setBrokerName(brokerName);
+ broker.addConnector("vm://" + brokerName);
+ broker.setDeleteAllMessagesOnStartup(true);
+
+ ConstantPendingMessageLimitStrategy strategy = new
ConstantPendingMessageLimitStrategy();
+ strategy.setLimit(pendingLimit);
+
+ OldestMessageEvictionStrategy evictionStrategy = new
OldestMessageEvictionStrategy();
+ evictionStrategy.setExpiryCheckEnabled(expiryCheckEnabled);
+
+ PolicyEntry entry = new PolicyEntry();
+ entry.setTopic(">");
+ entry.setTopicPrefetch(1);
+ entry.setPendingMessageLimitStrategy(strategy);
+ entry.setMessageEvictionStrategy(evictionStrategy);
+ entry.setDeadLetterStrategy(null);
+
+ List<PolicyEntry> entries = new ArrayList<>();
+ entries.add(entry);
+ PolicyMap policyMap = new PolicyMap();
+ policyMap.setPolicyEntries(entries);
+ broker.setDestinationPolicy(policyMap);
+
+ broker.start();
+ broker.waitUntilStarted();
+ return broker;
+ }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact