This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch activemq-5.19.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.19.x by this push:
new cf19b850b7 AMQ-9721 - Fix performance issues during non-persistent
cursor removal (#1447)
cf19b850b7 is described below
commit cf19b850b78eaa7b14295df4262bf800fe82f1ad
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Wed Jun 4 08:26:32 2025 -0400
AMQ-9721 - Fix performance issues during non-persistent cursor removal
(#1447)
This fixes the broker so multiple removals are no longer done for the
same message leading to having to search the entire non persistent
pending list. Durable subscriptions now check the persistence type of
the message so the cursor will no longer search everything in a
non-persistent pending list when the message is persistent.
(cherry picked from commit 5a3abbc877bff6d73c2ab6cecec81d50dd18c839)
---
.../org/apache/activemq/broker/region/Topic.java | 18 ++++++--
.../activemq/broker/region/TopicSubscription.java | 5 +-
.../cursors/StoreDurableSubscriberCursor.java | 24 +++++++++-
.../cursors/KahaDBPendingMessageCursorTest.java | 54 ++++++++++++++++++++++
4 files changed, 94 insertions(+), 7 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 27c5cf132c..e921c36dd4 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -17,6 +17,7 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -704,6 +705,11 @@ public class Topic extends BaseDestination implements Task
{
for (DurableTopicSubscription sub :
durableSubscribers.values()) {
if (!sub.isActive() ||
sub.isEnableMessageExpirationOnActiveDurableSubs()) {
message.setRegionDestination(this);
+ // AMQ-9721 - Remove message from the cursor if it
exists after
+ // loading from the store. Store recoverExpired()
does not inc
+ // the ref count so we don't need to decrement
here, but if
+ // the cursor finds its own copy in memory it will
dec that ref.
+ sub.removePending(message);
messageExpired(connectionContext, sub, message);
}
}
@@ -894,6 +900,15 @@ public class Topic extends BaseDestination implements Task
{
if (isEligibleForExpiration(sub)) {
expiredMessages.forEach(message -> {
message.setRegionDestination(Topic.this);
+ try {
+ // AMQ-9721 - Remove message from the
cursor if it exists after
+ // loading from the store. Store
recoverExpired() does not inc
+ // the ref count so we don't need to
decrement here, but if
+ // the cursor finds its own copy in memory
it will dec that ref.
+ sub.removePending(message);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
messageExpired(connectionContext, sub,
message);
});
}
@@ -932,9 +947,6 @@ public class Topic extends BaseDestination implements Task {
ack.setDestination(destination);
ack.setMessageID(reference.getMessageId());
try {
- if (subs instanceof DurableTopicSubscription) {
- ((DurableTopicSubscription)subs).removePending(reference);
- }
acknowledge(context, subs, ack, reference);
} catch (Exception e) {
LOG.error("Failed to remove expired Message from the store ", e);
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 73a5113765..a6cb27f927 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -188,6 +188,9 @@ public class TopicSubscription extends AbstractSubscription
{
messagesToEvict = oldMessages.length;
for (int i = 0; i < messagesToEvict; i++) {
MessageReference oldMessage =
oldMessages[i];
+ // AMQ-9721 - discard no longer removes
from matched so remove here
+ oldMessage.decrementReferenceCount();
+ matched.remove(oldMessage);
//Expired here is false as we are
discarding due to the messageEvictingStrategy
discard(oldMessage, false);
}
@@ -751,8 +754,6 @@ public class TopicSubscription extends AbstractSubscription
{
private void discard(MessageReference message, boolean expired) {
discarding = true;
try {
- message.decrementReferenceCount();
- matched.remove(message);
if (destination != null) {
destination.getDestinationStatistics().getDequeues().increment();
if(destination.isAdvancedNetworkStatisticsEnabled() &&
getContext() != null && getContext().isNetworkConnection()) {
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
index 55a77550e1..510412b794 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
@@ -28,6 +28,7 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.NullMessageReference;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
import org.apache.activemq.usage.SystemUsage;
@@ -274,8 +275,27 @@ public class StoreDurableSubscriberCursor extends
AbstractPendingMessageCursor {
@Override
public synchronized void remove(MessageReference node) {
- for (PendingMessageCursor tsp : storePrefetches) {
- tsp.remove(node);
+ // AMQ-9721 - Check if message is persistent or non-persistent.
+ // Removing from the non-persistent cursor requires searching the
+ // entire list if it's paged onto disk which is quite slow,
+ // so it doesn't make sense to try and remove as it will never
+ // exist if it's persistent.
+
+ // MessageReference can be a null reference if called from
DurableSubscriptionView
+ // so we do not know if it's persistent and just need to search
everything.
+ if (node instanceof NullMessageReference) {
+ for (PendingMessageCursor tsp : storePrefetches) {
+ tsp.remove(node);
+ }
+ } else if (node.isPersistent()) {
+ for (PendingMessageCursor tsp : storePrefetches) {
+ if (tsp.equals(nonPersistent)) {
+ continue;
+ }
+ tsp.remove(node);
+ }
+ } else {
+ nonPersistent.remove(node);
}
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java
index 8fd42e275e..9bba67f723 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java
@@ -35,9 +35,12 @@ import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.MessageAck;
import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
@@ -352,4 +355,55 @@ public class KahaDBPendingMessageCursorTest extends
}
+ // Test for AMQ-9721
+ @Test
+ public void testDurableCursorRemoveRefMessageSize() throws Exception {
+ AtomicLong publishedMessageSize = new AtomicLong();
+
+ Connection connection = new
ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+
+ // send 100 persistent and non persistent
+ Topic topic = publishTestMessagesDurable(connection, new String[]
{"sub1"}, 100,
+ publishedMessageSize, DeliveryMode.NON_PERSISTENT);
+ publishTestMessagesDurable(connection, new String[] {"sub1"}, 100,
+ publishedMessageSize, DeliveryMode.PERSISTENT);
+
+ SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+
+ // verify the count and size
+ verifyPendingStats(topic, subKey, 200, publishedMessageSize.get());
+
+ // Iterate and remove using the pending cursor to test removal
+ final DurableTopicSubscription sub =
topic.getDurableTopicSubs().get(subKey);
+ PendingMessageCursor pending = sub.getPending();
+ try {
+ pending.reset();
+ while (pending.hasNext()) {
+ MessageReference node = pending.next();
+ node.decrementReferenceCount();
+ // test the remove(ref) method which has been updated
+ // to check persistence type
+ pending.remove(node);
+
+ // If persistent remove out of the store
+ if (node.isPersistent()) {
+ MessageAck ack = new MessageAck();
+ ack.setLastMessageId(node.getMessageId());
+ ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+ ack.setDestination(topic.getActiveMQDestination());
+ topic.acknowledge(sub.getContext(), sub, ack, node);
+ }
+ }
+ } finally {
+ pending.release();
+ }
+
+ // verify everything has been cleared correctly, persistent and
+ // non-persistent
+ verifyPendingStats(topic, subKey, 0, 0);
+ // Memory usage should be 0 after removal
+ assertEquals(0, topic.getMemoryUsage().getUsage());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact