This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 5a3abbc877 AMQ-9721 - Fix performance issues during non-persistent cursor removal (#1447)
5a3abbc877 is described below

commit 5a3abbc877bff6d73c2ab6cecec81d50dd18c839
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Wed Jun 4 08:26:32 2025 -0400

    AMQ-9721 - Fix performance issues during non-persistent cursor removal (#1447)
    
    This fixes the broker so that multiple removals are no longer done for
    the same message, which previously meant searching the entire
    non-persistent pending list each time. Durable subscriptions now check
    the persistence type of the message, so the cursor no longer searches
    everything in the non-persistent pending list when the message is
    persistent.
---
 .../org/apache/activemq/broker/region/Topic.java   | 18 ++++++--
 .../activemq/broker/region/TopicSubscription.java  |  5 +-
 .../cursors/StoreDurableSubscriberCursor.java      | 24 +++++++++-
 .../cursors/KahaDBPendingMessageCursorTest.java    | 54 ++++++++++++++++++++++
 4 files changed, 94 insertions(+), 7 deletions(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index b24c71b62f..bb044c797a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
@@ -704,6 +705,11 @@ public class Topic extends BaseDestination implements Task 
{
                     for (DurableTopicSubscription sub : 
durableSubscribers.values()) {
                         if (!sub.isActive() || 
sub.isEnableMessageExpirationOnActiveDurableSubs()) {
                             message.setRegionDestination(this);
+                            // AMQ-9721 - Remove message from the cursor if it 
exists after
+                            // loading from the store.  Store recoverExpired() 
does not inc
+                            // the ref count so we don't need to decrement 
here, but if
+                            // the cursor finds its own copy in memory it will 
dec that ref.
+                            sub.removePending(message);
                             messageExpired(connectionContext, sub, message);
                         }
                     }
@@ -894,6 +900,15 @@ public class Topic extends BaseDestination implements Task 
{
                         if (isEligibleForExpiration(sub)) {
                             expiredMessages.forEach(message -> {
                                 message.setRegionDestination(Topic.this);
+                                try {
+                                    // AMQ-9721 - Remove message from the 
cursor if it exists after
+                                    // loading from the store.  Store 
recoverExpired() does not inc
+                                    // the ref count so we don't need to 
decrement here, but if
+                                    // the cursor finds its own copy in memory 
it will dec that ref.
+                                    sub.removePending(message);
+                                } catch (IOException e) {
+                                    throw new UncheckedIOException(e);
+                                }
                                 messageExpired(connectionContext, sub, 
message);
                             });
                         }
@@ -932,9 +947,6 @@ public class Topic extends BaseDestination implements Task {
         ack.setDestination(destination);
         ack.setMessageID(reference.getMessageId());
         try {
-            if (subs instanceof DurableTopicSubscription) {
-                ((DurableTopicSubscription)subs).removePending(reference);
-            }
             acknowledge(context, subs, ack, reference);
         } catch (Exception e) {
             LOG.error("Failed to remove expired Message from the store ", e);
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 383cfe2833..8a1a300dae 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -188,6 +188,9 @@ public class TopicSubscription extends AbstractSubscription 
{
                                 messagesToEvict = oldMessages.length;
                                 for (int i = 0; i < messagesToEvict; i++) {
                                     MessageReference oldMessage = 
oldMessages[i];
+                                    // AMQ-9721 - discard no longer removes 
from matched so remove here
+                                    oldMessage.decrementReferenceCount();
+                                    matched.remove(oldMessage);
                                     //Expired here is false as we are 
discarding due to the messageEvictingStrategy
                                     discard(oldMessage, false);
                                 }
@@ -751,8 +754,6 @@ public class TopicSubscription extends AbstractSubscription 
{
     private void discard(MessageReference message, boolean expired) {
         discarding = true;
         try {
-            message.decrementReferenceCount();
-            matched.remove(message);
             if (destination != null) {
                 
destination.getDestinationStatistics().getDequeues().increment();
                 if(destination.isAdvancedNetworkStatisticsEnabled() && 
getContext() != null && getContext().isNetworkConnection()) {
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
index 55a77550e1..510412b794 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
@@ -28,6 +28,7 @@ import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.NullMessageReference;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.usage.SystemUsage;
@@ -274,8 +275,27 @@ public class StoreDurableSubscriberCursor extends 
AbstractPendingMessageCursor {
 
     @Override
     public synchronized void remove(MessageReference node) {
-        for (PendingMessageCursor tsp : storePrefetches) {
-            tsp.remove(node);
+        // AMQ-9721 - Check if message is persistent or non-persistent.
+        // Removing from the non-persistent cursor requires searching the
+        // entire list if it's paged onto disk which is quite slow,
+        // so it doesn't make sense to try and remove as it will never
+        // exist if it's persistent.
+
+        // MessageReference can be a null reference if called from 
DurableSubscriptionView
+        // so we do not know if it's persistent and just need to search 
everything.
+        if (node instanceof NullMessageReference) {
+            for (PendingMessageCursor tsp : storePrefetches) {
+                tsp.remove(node);
+            }
+        } else if (node.isPersistent()) {
+            for (PendingMessageCursor tsp : storePrefetches) {
+                if (tsp.equals(nonPersistent)) {
+                    continue;
+                }
+                tsp.remove(node);
+            }
+        } else {
+            nonPersistent.remove(node);
         }
     }
 
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java
index 183162238d..31776ca790 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java
@@ -35,9 +35,12 @@ import jakarta.jms.TopicSubscriber;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.store.MessageStoreSubscriptionStatistics;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
@@ -352,4 +355,55 @@ public class KahaDBPendingMessageCursorTest extends
 
     }
 
+    // Test for AMQ-9721
+    @Test
+    public void testDurableCursorRemoveRefMessageSize() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        Connection connection = new 
ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+
+        // send 100 persistent and non persistent
+        Topic topic = publishTestMessagesDurable(connection, new String[] 
{"sub1"}, 100,
+            publishedMessageSize, DeliveryMode.NON_PERSISTENT);
+        publishTestMessagesDurable(connection, new String[] {"sub1"}, 100,
+            publishedMessageSize, DeliveryMode.PERSISTENT);
+
+        SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+
+        // verify the count and size
+        verifyPendingStats(topic, subKey, 200, publishedMessageSize.get());
+
+        // Iterate and remove using the pending cursor to test removal
+        final DurableTopicSubscription sub = 
topic.getDurableTopicSubs().get(subKey);
+        PendingMessageCursor pending = sub.getPending();
+        try {
+            pending.reset();
+            while (pending.hasNext()) {
+                MessageReference node = pending.next();
+                node.decrementReferenceCount();
+                // test the remove(ref) method which has been updated
+                // to check persistence type
+                pending.remove(node);
+
+                // If persistent remove out of the store
+                if (node.isPersistent()) {
+                    MessageAck ack = new MessageAck();
+                    ack.setLastMessageId(node.getMessageId());
+                    ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+                    ack.setDestination(topic.getActiveMQDestination());
+                    topic.acknowledge(sub.getContext(), sub, ack, node);
+                }
+            }
+        } finally {
+            pending.release();
+        }
+
+        // verify everything has been cleared correctly, persistent and
+        // non-persistent
+        verifyPendingStats(topic, subKey, 0, 0);
+        // Memory usage should be 0 after removal
+        assertEquals(0, topic.getMemoryUsage().getUsage());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to