This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-6.1.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-6.1.x by this push:
     new 9a1f00b0f0 AMQ-9698 - Fix message expiration on durable subs (#1423)
9a1f00b0f0 is described below

commit 9a1f00b0f0b44e8dd781367fdf32d311e4f75fd3
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Wed May 7 10:39:10 2025 -0400

    AMQ-9698 - Fix message expiration on durable subs (#1423)
    
    This commit fixes multiple problems with handling message expiration on
    durable topic subscriptions.
    
    1) Memory usage tracking is fixed on expiration by correctly
    decrementing the counter inside AbstractStoreCursor when calling the
    remove(message) method, which was previously missed.
    2) A new reference type is used to wrap references in TopicStorePrefetch
    so that if multiple subscriptions share a reference in their cursors
    each one can expire the message. Previously only one would expire as the
    message would be marked as expired and skipped.
    3) On client expiration, the references are properly decremented so
    memory tracking is correct.
    4) The expiration thread for Topics has been improved to be much more
    efficient for KahaDB by only scanning for expired messages if there are
    durables eligible for expiration. The thread also now checks the index
    to see whether expired messages are still associated with each sub so we
    don't expire the same message for a sub multiple times. Only messages that
    still need to be processed are returned, which further cuts down memory usage.
    
    (cherry picked from commit 4abcfa1ad6808ba6786cf402b4feedef3659d0a2)
---
 .../broker/region/DurableTopicSubscription.java    |  10 +
 .../broker/region/PrefetchSubscription.java        |   8 +-
 .../org/apache/activemq/broker/region/Topic.java   |  64 ++++-
 .../broker/region/cursors/AbstractStoreCursor.java |  12 +-
 .../broker/region/cursors/TopicStorePrefetch.java  |  27 ++
 .../org/apache/activemq/store/MessageStore.java    |   6 +
 .../apache/activemq/store/ProxyMessageStore.java   |   4 +
 .../activemq/store/ProxyTopicMessageStore.java     |   9 +
 .../apache/activemq/store/TopicMessageStore.java   |  18 ++
 .../activemq/store/memory/MemoryMessageStore.java  |   5 +
 .../store/memory/MemoryTopicMessageStore.java      |   6 +
 .../activemq/store/jdbc/JDBCMessageStore.java      |   6 +
 .../activemq/store/jdbc/JDBCTopicMessageStore.java |   7 +
 .../apache/activemq/store/kahadb/KahaDBStore.java  |  61 ++++-
 .../activemq/store/kahadb/TempKahaDBStore.java     |  11 +
 .../JmsSendReceiveWithMessageExpirationTest.java   |  86 ++++++-
 .../activemq/broker/MessageExpirationTest.java     |   8 +
 .../cursors/AbstractPendingMessageCursorTest.java  |   2 +-
 .../cursors/StoreCursorRemoveFromCacheTest.java    | 143 +++++++++++
 .../region/cursors/StoreQueueCursorOrderTest.java  |   5 +
 .../activemq/bugs/MessageExpirationReaperTest.java |  58 ++++-
 .../store/kahadb/KahaDBRecoverExpiredTest.java     | 286 +++++++++++++++++++++
 22 files changed, 821 insertions(+), 21 deletions(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index e0dc6d0f07..9a7bc9a725 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -297,6 +297,16 @@ public class DurableTopicSubscription extends 
PrefetchSubscription implements Us
         pending.remove(node);
     }
 
+    @Override
+    protected void processExpiredAck(ConnectionContext context, Destination 
dest,
+        MessageReference node) {
+
+        // Each subscription needs to expire both on the store and
+        // decrement the reference count
+        super.processExpiredAck(context, dest, node);
+        node.decrementReferenceCount();
+    }
+
     @Override
     protected void doAddRecoveredMessage(MessageReference message) throws 
Exception {
         synchronized (pending) {
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index d59717444c..b2adf56a67 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -298,9 +298,8 @@ public abstract class PrefetchSubscription extends 
AbstractSubscription {
                         inAckRange = true;
                     }
                     if (inAckRange) {
-                        Destination regionDestination = nodeDest;
                         if (broker.isExpired(node)) {
-                            regionDestination.messageExpired(context, this, 
node);
+                            processExpiredAck(context, nodeDest, node);
                         }
                         iter.remove();
                         decrementPrefetchCounter(node);
@@ -396,6 +395,11 @@ public abstract class PrefetchSubscription extends 
AbstractSubscription {
         }
     }
 
+    protected void processExpiredAck(final ConnectionContext context, final 
Destination dest,
+        final MessageReference node) {
+        dest.messageExpired(context, this, node);
+    }
+
     private void registerRemoveSync(ConnectionContext context, final 
MessageReference node) {
         // setup a Synchronization to remove nodes from the
         // dispatched list.
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 61ecb7ade9..4a4b9c79b7 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -21,6 +21,8 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -29,6 +31,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import java.util.stream.Collectors;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
@@ -52,6 +55,7 @@ import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStore.StoreType;
 import org.apache.activemq.store.NoLocalSubscriptionAware;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
@@ -805,14 +809,60 @@ public class Topic extends BaseDestination implements 
Task {
     }
 
     private final AtomicBoolean expiryTaskInProgress = new 
AtomicBoolean(false);
-    private final Runnable expireMessagesWork = new Runnable() {
-        @Override
-        public void run() {
-            List<Message> browsedMessages = new InsertionCountList<Message>();
-            doBrowse(browsedMessages, getMaxExpirePageSize());
+    private final Runnable expireMessagesWork = () -> {
+        try {
+            final TopicMessageStore store = Topic.this.topicStore;
+            if (store != null && store.getType() == StoreType.KAHADB) {
+                if (store.getMessageCount() == 0) {
+                    LOG.debug("Skipping topic expiration check for {}, store 
size is 0", destination);
+                    return;
+                }
+
+                // get the sub keys that should be checked for expired messages
+                final var subs = durableSubscribers.entrySet().stream()
+                    .filter(entry -> isEligibleForExpiration(entry.getValue()))
+                    .map(Entry::getKey).collect(Collectors.toSet());
+
+                if (subs.isEmpty()) {
+                    LOG.debug("Skipping topic expiration check for {}, no 
eligible subscriptions to check", destination);
+                    return;
+                }
+
+                // For each eligible subscription, return the messages in the 
store that are expired
+                // The same message refs are shared between subs if duplicated 
so this is efficient
+                var expired = store.recoverExpired(subs, 
getMaxExpirePageSize());
+
+                final ConnectionContext connectionContext = 
createConnectionContext();
+                // Go through any expired messages and remove for each sub
+                for (Entry<SubscriptionKey, List<Message>> entry : 
expired.entrySet()) {
+                    DurableTopicSubscription sub = 
durableSubscribers.get(entry.getKey());
+                    List<Message> expiredMessages = entry.getValue();
+
+                    // If the sub still exists and there are expired messages 
then process
+                    if (sub != null && !expiredMessages.isEmpty()) {
+                        // There's a small race condition here if the sub 
comes online,
+                        // but it's not a big deal as at worst there may be 
duplicate acks for
+                        // the expired message but the store can handle it
+                        if (isEligibleForExpiration(sub)) {
+                            expiredMessages.forEach(message -> {
+                                message.setRegionDestination(Topic.this);
+                                messageExpired(connectionContext, sub, 
message);
+                            });
+                        }
+                    }
+                }
+            } else {
+                // If not KahaDB, fall back to the legacy browse method because
+                // the recoverExpired() method is not supported
+                doBrowse(new InsertionCountList<>(), getMaxExpirePageSize());
+            }
+        } catch (Throwable e) {
+            LOG.warn("Failed to expire messages on Topic: {}", 
getActiveMQDestination().getPhysicalName(), e);
+        } finally {
             expiryTaskInProgress.set(false);
         }
     };
+
     private final Runnable expireMessagesTask = new Runnable() {
         @Override
         public void run() {
@@ -906,6 +956,10 @@ public class Topic extends BaseDestination implements Task 
{
         }
     }
 
+    private static boolean isEligibleForExpiration(DurableTopicSubscription 
sub) {
+        return sub.isEnableMessageExpirationOnActiveDurableSubs() || 
!sub.isActive();
+    }
+
     public Map<SubscriptionKey, DurableTopicSubscription> 
getDurableTopicSubs() {
         return durableSubscribers;
     }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index 30089e3551..8eec87e09f 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -114,7 +114,7 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
                 }
             }
             message.incrementReferenceCount();
-            batchList.addMessageLast(message);
+            batchList.addMessageLast(createBatchListRef(message));
             clearIterator(true);
             recovered = true;
         } else if (!cached) {
@@ -136,6 +136,10 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
         return recovered;
     }
 
+    protected MessageReference createBatchListRef(Message message) {
+        return message;
+    }
+
     protected boolean duplicateFromStoreExcepted(Message message) {
         // expected for messages pending acks with 
kahadb.concurrentStoreAndDispatchQueues=true for
         // which this existing unused flag has been repurposed
@@ -448,13 +452,15 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
 
     @Override
     public final synchronized void remove(MessageReference node) {
-        if (batchList.remove(node) != null) {
+        final PendingNode message = batchList.remove(node);
+        if (message != null) {
             size--;
             setCacheEnabled(false);
+            // decrement reference count if removed from batchList
+            message.getMessage().decrementReferenceCount();
         }
     }
 
-
     @Override
     public final synchronized void clear() {
         gc();
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
index a97b1e8b50..6b75806cc1 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.broker.region.cursors;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activemq.broker.region.IndirectMessageReference;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.Topic;
@@ -151,6 +153,11 @@ class TopicStorePrefetch extends AbstractStoreCursor {
         }
     }
 
+    @Override
+    protected MessageReference createBatchListRef(Message message) {
+        return new TopicStoreMessageReference(message);
+    }
+
     public byte getLastRecoveredPriority() {
         return lastRecoveredPriority;
     }
@@ -168,4 +175,24 @@ class TopicStorePrefetch extends AbstractStoreCursor {
     public String toString() {
         return "TopicStorePrefetch(" + clientId + "," + subscriberName + 
",storeHasMessages=" + this.storeHasMessages +") " + 
this.subscription.getConsumerInfo().getConsumerId() + " - " + super.toString();
     }
+
+    // This extends IndirectMessageReference to allow expiring messages for 
multiple
+    // durable subscriptions. Each durable subscription needs to ack the 
message in the store so
+    // each durable sub will now get their own reference so that the 
subscription can expire
+    // correctly and not just the first subscription.
+    static class TopicStoreMessageReference extends IndirectMessageReference {
+        private final AtomicBoolean processAsExpired = new 
AtomicBoolean(false);
+
+        public TopicStoreMessageReference(Message message) {
+            super(message);
+        }
+
+        @Override
+        public boolean canProcessAsExpired() {
+            // mark original message ref as expired, this won't be used
+            // by this class but someone may get the original message and 
check it
+            super.canProcessAsExpired();
+            return processAsExpired.compareAndSet(false, true);
+        }
+    }
 }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java 
b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
index aee619a27a..83ac4bf384 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
@@ -209,4 +209,10 @@ public interface MessageStore extends Service {
     void updateMessage(Message message) throws IOException;
 
     void registerIndexListener(IndexListener indexListener);
+    StoreType getType();
+
+    enum StoreType {
+        MEMORY, JDBC, KAHADB, TEMP_KAHADB
+    }
+
 }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
index cd319a65d3..6cd466d142 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
@@ -180,4 +180,8 @@ public class ProxyMessageStore implements MessageStore {
         return delegate.getMessageStoreStatistics();
     }
 
+    @Override
+    public StoreType getType() {
+        return delegate.getType();
+    }
 }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
index 09b6529fa9..23c9e81809 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
@@ -18,6 +18,9 @@ package org.apache.activemq.store;
 
 import java.io.IOException;
 
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -25,6 +28,7 @@ import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.util.SubscriptionKey;
 
 /**
  * A simple proxy that delegates to another MessageStore.
@@ -235,4 +239,9 @@ public class ProxyTopicMessageStore extends 
ProxyMessageStore implements TopicMe
     public MessageStoreSubscriptionStatistics getMessageStoreSubStatistics() {
         return ((TopicMessageStore)delegate).getMessageStoreSubStatistics();
     }
+
+    @Override
+    public Map<SubscriptionKey, List<Message>> 
recoverExpired(Set<SubscriptionKey> subs, int max) throws Exception {
+        return ((TopicMessageStore)delegate).recoverExpired(subs, max);
+    }
 }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
index 395f9f0f22..6111236149 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
@@ -20,10 +20,15 @@ import java.io.IOException;
 
 import jakarta.jms.JMSException;
 
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.util.SubscriptionKey;
 
 /**
  * A MessageStore for durable topic subscriptions
@@ -154,4 +159,17 @@ public interface TopicMessageStore extends MessageStore {
      * @throws IOException
      */
     void addSubscription(SubscriptionInfo subscriptionInfo, boolean 
retroactive) throws IOException;
+
+    /**
+     * Iterates over the pending messages in a topic and recovers any expired 
messages found for
+     * each of the subscriptions up to the maximum number of messages to 
search. Only subscriptions
+     * that have at least 1 expired message will be returned in the map.
+     *
+     * @param subs
+     * @param max
+     * @return Expired messages for each subscription
+     * @throws Exception
+     */
+    Map<SubscriptionKey,List<Message>> recoverExpired(Set<SubscriptionKey> 
subs, int max) throws Exception;
+
 }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
index 953c83e19a..ebeb6e0298 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
@@ -177,6 +177,11 @@ public class MemoryMessageStore extends 
AbstractMessageStore {
         }
     }
 
+    @Override
+    public StoreType getType() {
+        return StoreType.MEMORY;
+    }
+
     protected static final void incMessageStoreStatistics(final 
MessageStoreStatistics stats, final Message message) {
         if (stats != null && message != null) {
             stats.getMessageCount().increment();
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
index dd8be2be8e..13a13c0759 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import java.util.Set;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -180,6 +181,11 @@ public class MemoryTopicMessageStore extends 
MemoryMessageStore implements Topic
         }
     }
 
+    @Override
+    public Map<SubscriptionKey, List<Message>> 
recoverExpired(Set<SubscriptionKey> subs, int max) {
+        throw new UnsupportedOperationException("recoverExpired not 
supported");
+    }
+
     // Disabled for the memory store, can be enabled later if necessary
     private final MessageStoreSubscriptionStatistics stats = new 
MessageStoreSubscriptionStatistics(false);
 
diff --git 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
index 8adc2f78ee..78c27b7183 100644
--- 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
+++ 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
@@ -482,4 +482,10 @@ public class JDBCMessageStore extends AbstractMessageStore 
{
         return destination.getPhysicalName() + ",pendingSize:" + 
pendingAdditions.size();
     }
 
+
+    @Override
+    public StoreType getType() {
+        return StoreType.JDBC;
+    }
+
 }
diff --git 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
index a857bcf4ef..7f29a46a64 100644
--- 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
+++ 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -40,6 +41,7 @@ import 
org.apache.activemq.store.MessageStoreSubscriptionStatistics;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.SubscriptionKey;
 import org.apache.activemq.wireformat.WireFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -360,6 +362,11 @@ public class JDBCTopicMessageStore extends 
JDBCMessageStore implements TopicMess
         }
     }
 
+    @Override
+    public Map<SubscriptionKey, List<Message>> 
recoverExpired(Set<SubscriptionKey> subs, int max) {
+        throw new UnsupportedOperationException("recoverExpired not 
supported");
+    }
+
     /**
      * @see 
org.apache.activemq.store.TopicMessageStore#lookupSubscription(String,
      *      String)
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 27dfa12718..e354fd09bd 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -81,12 +81,14 @@ import 
org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
 import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
 import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.store.kahadb.disk.page.Transaction.CallableClosure;
 import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
 import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.SubscriptionKey;
 import org.apache.activemq.util.ThreadPoolUtils;
 import org.apache.activemq.wireformat.WireFormat;
 import org.slf4j.Logger;
@@ -888,9 +890,14 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
                 unlockAsyncJobQueue();
             }
         }
+
+        @Override
+        public StoreType getType() {
+            return StoreType.KAHADB;
+        }
     }
 
-    class KahaDBTopicMessageStore extends KahaDBMessageStore implements 
TopicMessageStore {
+    class KahaDBTopicMessageStore extends KahaDBMessageStore implements 
TopicMessageStore{
         private final AtomicInteger subscriptionCount = new AtomicInteger();
         protected final MessageStoreSubscriptionStatistics 
messageStoreSubStats =
                 new 
MessageStoreSubscriptionStatistics(isEnableSubscriptionStatistics());
@@ -1253,6 +1260,58 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
             }
         }
 
+        @Override
+        public Map<SubscriptionKey,List<Message>> 
recoverExpired(Set<SubscriptionKey> subscriptions, int max) throws Exception {
+            indexLock.writeLock().lock();
+            try {
+                return pageFile.tx().execute(
+                    (CallableClosure<Map<SubscriptionKey,List<Message>>, 
Exception>) tx -> {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        sd.orderIndex.resetCursorPosition();
+                        int count = 0;
+                        final Map<SubscriptionKey, List<Message>> expired = 
new HashMap<>();
+                        final Map<String, SubscriptionKey> subKeys = new 
HashMap<>();
+
+                        // Check each subscription and track the ones that 
exist
+                        for (SubscriptionKey sub : subscriptions) {
+                            final String subKeyString = 
subscriptionKey(sub.getClientId(), sub.getSubscriptionName());
+                            if (sd.subscriptionCache.contains(subKeyString)) {
+                                subKeys.put(subKeyString, sub);
+                            }
+                        }
+
+                        // Iterate one time through the topic and check each 
message, stopping if we run out
+                        // or reach the max
+                        for (Iterator<Entry<Long, MessageKeys>> iterator =
+                            sd.orderIndex.iterator(tx, new 
MessageOrderCursor()); count < max && iterator.hasNext(); ) {
+                            count++;
+                            Entry<Long, MessageKeys> entry = iterator.next();
+                            Set<String> ackedAndPrepared = 
ackedAndPreparedMap.get(destination.getPhysicalName());
+                            if (ackedAndPrepared != null && 
ackedAndPrepared.contains(entry.getValue().messageId)) {
+                                continue;
+                            }
+
+                            final Message msg = 
loadMessage(entry.getValue().location);
+                            if (msg.isExpired()) {
+                                // For every message that is expired, go 
through and check each subscription to see
+                                // if the message has already been acked. We 
don't want to return subs that have already
+                                // acked the message.
+                                for(Entry<String, SubscriptionKey> subKeyEntry 
: subKeys.entrySet()) {
+                                    SequenceSet sequence = 
sd.ackPositions.get(tx, subKeyEntry.getKey());
+                                    if (sequence != null && 
sequence.contains(entry.getKey())) {
+                                        List<Message> expMessages = 
expired.computeIfAbsent(subKeyEntry.getValue(), m -> new ArrayList<>());
+                                        expMessages.add(msg);
+                                    }
+                                }
+                            }
+                        }
+                        return expired;
+                    });
+            } finally {
+                indexLock.writeLock().unlock();
+            }
+        }
+
         @Override
         public void resetBatching(String clientId, String subscriptionName) {
             try {
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
index 7048b09a44..298f4bf8a5 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -63,6 +64,7 @@ import org.apache.activemq.store.kahadb.disk.page.Transaction;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.SubscriptionKey;
 import org.apache.activemq.wireformat.WireFormat;
 
 public class TempKahaDBStore extends TempMessageDatabase implements 
PersistenceAdapter, BrokerServiceAware {
@@ -300,6 +302,10 @@ public class TempKahaDBStore extends TempMessageDatabase 
implements PersistenceA
             getMessageStoreStatistics().getMessageCount().setCount(count);
         }
 
+        @Override
+        public StoreType getType() {
+            return StoreType.TEMP_KAHADB;
+        }
     }
 
     class KahaDBTopicMessageStore extends KahaDBMessageStore implements 
TopicMessageStore {
@@ -333,6 +339,11 @@ public class TempKahaDBStore extends TempMessageDatabase 
implements PersistenceA
             process(command);
         }
 
+        @Override
+        public Map<SubscriptionKey, List<Message>> 
recoverExpired(Set<SubscriptionKey> subs, int max) {
+            throw new UnsupportedOperationException("recoverExpired not 
supported");
+        }
+
         @Override
         public void deleteSubscription(String clientId, String 
subscriptionName) throws IOException {
             KahaSubscriptionCommand command = new KahaSubscriptionCommand();
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
index db565dd31f..28c3af358d 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq;
 
+import static org.junit.Assert.assertEquals;
+
 import java.util.Date;
 import java.util.Vector;
 import java.util.concurrent.TimeUnit;
@@ -31,8 +33,12 @@ import jakarta.jms.Session;
 import jakarta.jms.Topic;
 
 import org.apache.activemq.broker.BrokerRegistry;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,15 +57,20 @@ public class JmsSendReceiveWithMessageExpirationTest extends TestSupport {
     protected Destination producerDestination;
     protected boolean durable;
     protected int deliveryMode = DeliveryMode.PERSISTENT;
-    protected long timeToLive = 5000;
+    protected long timeToLive = 3000;
     protected boolean verbose;
 
     protected Connection connection;
+    protected BrokerService brokerService;
 
     protected void setUp() throws Exception {
 
         super.setUp();
 
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.start();
+
         data = new String[messageCount];
 
         for (int i = 0; i < messageCount; i++) {
@@ -225,7 +236,8 @@ public class JmsSendReceiveWithMessageExpirationTest extends TestSupport {
         consumerDestination = session.createTopic(getConsumerSubject());
         producerDestination = session.createTopic(getProducerSubject());
 
-        MessageConsumer consumer = createConsumer();
+        MessageConsumer consumer1 = createConsumer();
+        MessageConsumer consumer2 =  session.createConsumer(consumerDestination);
         connection.start();
 
         for (int i = 0; i < data.length; i++) {
@@ -247,7 +259,64 @@ public class JmsSendReceiveWithMessageExpirationTest extends TestSupport {
         Thread.sleep(timeToLive + 1000);
 
         // message should have expired.
-        assertNull(consumer.receive(1000));
+        assertNull(consumer1.receive(1000));
+        assertNull(consumer2.receive(100));
+
+        for (Subscription consumer : brokerService.getDestination(
+                (ActiveMQDestination) consumerDestination)
+            .getConsumers()) {
+            assertEquals(0, consumer.getPendingQueueSize());
+        }
+
+        // Memory usage should be 0 after expiration
+        assertEquals(0, brokerService.getDestination((ActiveMQDestination) consumerDestination)
+            .getMemoryUsage().getUsage());
+    }
+
+    public void testConsumeExpiredTopicDurable() throws Exception {
+        brokerService.stop();
+
+        // Use persistent broker and durables so restart
+        brokerService = new BrokerService();
+        brokerService.setPersistent(true);
+        brokerService.start();
+        connection.close();
+        connection = createConnection();
+        connection.setClientID(getClass().getName());
+        connection.start();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = createProducer(timeToLive);
+        Topic topic = session.createTopic("test.expiration.topic");
+        MessageConsumer consumer1 = session.createDurableSubscriber(topic, "sub1");
+        MessageConsumer consumer2 = session.createDurableSubscriber(topic, "sub2");
+
+        for (int i = 0; i < data.length; i++) {
+            Message message = session.createTextMessage(data[i]);
+            message.setStringProperty("stringProperty", data[i]);
+            message.setIntProperty("intProperty", i);
+            producer.send(topic, message);
+        }
+
+        // sleeps a second longer than the expiration time.
+        // Basically waits till topic expires.
+        Thread.sleep(timeToLive + 1000);
+
+        // message should have expired for both clients
+        assertNull(consumer1.receive(1000));
+        assertNull(consumer2.receive(100));
+
+        TopicMessageStore store = (TopicMessageStore) brokerService.getDestination((ActiveMQDestination) topic).getMessageStore();
+        assertEquals(0, store.getMessageCount(getClass().getName(), "sub1"));
+        assertEquals(0, store.getMessageCount(getClass().getName(), "sub2"));
+
+        for (Subscription consumer : brokerService.getDestination((ActiveMQDestination) topic)
+            .getConsumers()) {
+            assertEquals(0, consumer.getPendingQueueSize());
+        }
+        // Memory usage should be 0 after expiration
+        assertEquals(0, brokerService.getDestination((ActiveMQDestination) topic)
+            .getMemoryUsage().getUsage());
     }
 
     /**
@@ -303,8 +372,15 @@ public class JmsSendReceiveWithMessageExpirationTest extends TestSupport {
         LOG.info("Dumping stats...");
         LOG.info("Closing down connection");
 
-        session.close();
-        connection.close();
+        try {
+            session.close();
+            connection.close();
+        } catch (Exception e) {
+            // ignore
+        }
+        if (brokerService != null) {
+            brokerService.stop();
+        }
     }
 
 }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
index 422bfd8663..281f10de74 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.broker;
 
+import static org.junit.Assert.assertEquals;
+
 import jakarta.jms.DeliveryMode;
 
 import junit.framework.Test;
@@ -30,6 +32,7 @@ import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.util.Wait;
 
 public class MessageExpirationTest extends BrokerTestSupport {
 
@@ -261,6 +264,11 @@ public class MessageExpirationTest extends BrokerTestSupport {
         assertNoMessagesLeft(connection);
 
         connection.send(closeConnectionInfo(connectionInfo));
+
+        if (!destination.isTemporary()) {
+            assertTrue(Wait.waitFor(
+                () -> broker.getDestination(destination).getMemoryUsage().getUsage() == 0, 1000, 100));
+        }
     }
 
     public static Test suite() {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java
index 0ddea1ed73..6e3ca90717 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java
@@ -76,7 +76,7 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat
     protected boolean enableSubscriptionStatistics;
 
     @Rule
-    public Timeout globalTimeout= new Timeout(60, TimeUnit.SECONDS);
+    public Timeout globalTimeout = new Timeout(60, TimeUnit.SECONDS);
 
     /**
      * @param prioritizedMessages
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreCursorRemoveFromCacheTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreCursorRemoveFromCacheTest.java
new file mode 100644
index 0000000000..4dd93646ed
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreCursorRemoveFromCacheTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.function.BiConsumer;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.usage.SystemUsage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class StoreCursorRemoveFromCacheTest {
+
+    @Rule
+    public TemporaryFolder dataFileDir = new TemporaryFolder();
+
+    private final ActiveMQQueue destination = new ActiveMQQueue("queue");
+    private BrokerService broker;
+    private SystemUsage systemUsage;
+    private KahaDBStore store;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setUseJmx(false);
+        broker.setPersistent(true);
+        KahaDBStore store = new KahaDBStore();
+        store.setDirectory(dataFileDir.getRoot());
+        broker.setPersistenceAdapter(store);
+        broker.start();
+        systemUsage = broker.getSystemUsage();
+        this.store = (KahaDBStore) broker.getPersistenceAdapter();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        broker.stop();
+    }
+
+    @Test(timeout = 10000)
+    public void testRemoveFromCacheIterator() throws Exception {
+        testRemoveFromCache((cursor, ref) -> {
+            // test removing using the iterator
+            cursor.remove();
+        });
+    }
+
+    @Test(timeout = 10000)
+    public void testRemoveFromCacheRemoveMethod() throws Exception {
+        testRemoveFromCache((cursor, ref) -> {
+            // test using the remove method directly
+            // remove should also decrement after AMQ-9698, previously it did not
+            cursor.remove(ref);
+            assertEquals(0, ref.getReferenceCount());
+
+            // Call a second time to make sure we don't go negative
+            // and it will skip
+            cursor.remove(ref);
+            assertEquals(0, ref.getReferenceCount());
+        });
+    }
+
+    private void testRemoveFromCache(BiConsumer<QueueStorePrefetch, MessageReference> remove) throws Exception {
+        var systemUsage = broker.getSystemUsage();
+        final KahaDBStore store = (KahaDBStore) broker.getPersistenceAdapter();
+        final MessageStore messageStore = store.createQueueMessageStore(destination);
+        final Queue queue = new Queue(broker, destination, messageStore, new DestinationStatistics(), null);
+        var memoryUsage = queue.getMemoryUsage();
+
+        // create cursor and make sure cache is enabled
+        QueueStorePrefetch cursor = new QueueStorePrefetch(queue, broker.getBroker());
+        cursor.setSystemUsage(systemUsage);
+        cursor.start();
+        assertTrue("cache enabled", cursor.isUseCache() && cursor.isCacheEnabled());
+
+        for (int i = 0; i < 10; i++) {
+            ActiveMQTextMessage msg = getMessage(i);
+            msg.setMemoryUsage(memoryUsage);
+            cursor.addMessageLast(msg);
+            // reference count of 1 for the cache
+            assertEquals(1, msg.getReferenceCount());
+        }
+
+        assertTrue(memoryUsage.getUsage() > 0);
+
+        cursor.reset();
+        while (cursor.hasNext()) {
+            // next will increment again so need to decrement the reference
+            // next is required to be called for remove() to work
+            var ref = cursor.next();
+            assertEquals(2, ref.getReferenceCount());
+            ref.decrementReferenceCount();
+            remove.accept(cursor, ref);
+            assertEquals(0, ref.getReferenceCount());
+        }
+
+        assertEquals(0, memoryUsage.getUsage());
+        assertEquals(0, cursor.size());
+    }
+
+    private ActiveMQTextMessage getMessage(int i) throws Exception {
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        MessageId id = new MessageId("11111:22222:" + i);
+        id.setBrokerSequenceId(i);
+        id.setProducerSequenceId(i);
+        message.setMessageId(id);
+        message.setDestination(destination);
+        message.setPersistent(true);
+        message.setResponseRequired(true);
+        message.setText("Msg:" + i + " " + "test");
+        assertEquals(message.getMessageId().getProducerSequenceId(), i);
+        return message;
+    }
+
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
index 5a1ab90d58..771b884d80 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
@@ -516,6 +516,11 @@ public class StoreQueueCursorOrderTest {
             batch.incrementAndGet();
         }
 
+        @Override
+        public StoreType getType() {
+            return StoreType.MEMORY;
+        }
+
         @Override
         public void recoverMessageStoreStatistics() throws IOException {
             this.getMessageStoreStatistics().reset();
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java
index a9028e42f6..035ba0e29f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.bugs;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import jakarta.jms.*;
 import javax.management.ObjectName;
@@ -30,6 +31,7 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -164,11 +166,59 @@ public class MessageExpirationReaperTest {
         consumer.close();
 
         // Let the messages reach an expiry time
-        Thread.sleep(2000);
+        assertTrue("Incorrect queue size count", Wait.waitFor(() -> view.getQueueSize() == 0,
+            3000, 100));
+        assertTrue("Incorrect inflight count: " + view.getInFlightCount(),
+            Wait.waitFor(() -> view.getInFlightCount() == 0, 3000, 100));
+        assertTrue("Incorrect expired size count",  Wait.waitFor(() -> view.getEnqueueCount() == view.getExpiredCount(),
+            3000, 100));
+
+        // Memory usage should be 0 after expiration
+        assertEquals(0, broker.getDestination(destination).getMemoryUsage().getUsage());
+    }
 
-        assertEquals("Incorrect inflight count: " + view.getInFlightCount(), 0, view.getInFlightCount());
-        assertEquals("Incorrect queue size count", 0, view.getQueueSize());
-        assertEquals("Incorrect expired size count", view.getEnqueueCount(), view.getExpiredCount());
+    @Test
+    public void testExpiredMessagesOnTopic2Durables() throws Exception{
+        Session session = createSession();
+
+        // use a zero prefetch so messages don't go inflight
+        ActiveMQTopic destination = new ActiveMQTopic(destinationName + "?consumer.prefetchSize=0");
+
+        MessageProducer producer = session.createProducer(destination);
+        MessageConsumer consumer = session.createDurableSubscriber(destination, "test-durable");
+        MessageConsumer consumer2 = session.createDurableSubscriber(destination, "test-durable-2");
+
+        producer.setTimeToLive(500);
+
+        final int count = 3;
+        // Send some messages with an expiration
+        for (int i = 0; i < count; i++) {
+            TextMessage message = session.createTextMessage("" + i);
+            producer.send(message);
+        }
+
+        DestinationViewMBean view = createView(destination);
+        // not expired yet...
+        assertEquals("Incorrect enqueue count", 3, view.getEnqueueCount() );
+
+        // close consumer so topic thinks consumer is inactive
+        consumer.close();
+        consumer2.close();
+
+        // Let the messages reach an expiry time
+        assertTrue("Incorrect queue size count", Wait.waitFor(() -> view.getQueueSize() == 0,
+            3000, 100));
+        assertTrue("Incorrect inflight count: " + view.getInFlightCount(),
+            Wait.waitFor(() -> view.getInFlightCount() == 0, 3000, 100));
+
+        // should be 2x enqueued for 2 subs
+        assertTrue("Incorrect expired size count",  Wait.waitFor(() -> view.getEnqueueCount() * 2 == view.getExpiredCount(),
+            3000, 100));
+
+        // check store and memory usage
+        org.apache.activemq.broker.region.Destination brokerDest = broker.getDestination(destination);
+        assertEquals("Incorrect queue size count", 0, brokerDest.getMessageStore().getMessageCount());
+        assertEquals(0, brokerDest.getMemoryUsage().getUsage());
     }
 
     protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBRecoverExpiredTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBRecoverExpiredTest.java
new file mode 100644
index 0000000000..a153a00676
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBRecoverExpiredTest.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import jakarta.jms.Connection;
+import jakarta.jms.JMSException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Session;
+import jakarta.jms.TopicSession;
+import java.io.File;
+import java.net.URI;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.util.SubscriptionKey;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
+
+/**
+ * Test for {@link TopicMessageStore#recoverExpired(Set, int)}
+ */
+public class KahaDBRecoverExpiredTest {
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(60, TimeUnit.SECONDS);
+
+  @Rule
+  public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
+
+  private BrokerService broker;
+  private URI brokerConnectURI;
+  private final ActiveMQTopic topic = new ActiveMQTopic("test.topic");
+  private final SubscriptionKey subKey1 = new SubscriptionKey("clientId", "sub1");
+  private final SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub2");
+
+  @Before
+  public void startBroker() throws Exception {
+    broker = new BrokerService();
+    broker.setPersistent(true);
+    KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
+    persistenceAdapter.setDirectory(dataFileDir.getRoot());
+    broker.setPersistenceAdapter(persistenceAdapter);
+    //set up a transport
+    TransportConnector connector = broker
+        .addConnector(new TransportConnector());
+    connector.setUri(new URI("tcp://0.0.0.0:0"));
+    connector.setName("tcp");
+    broker.start();
+    broker.waitUntilStarted();
+    brokerConnectURI = broker.getConnectorByName("tcp").getConnectUri();
+  }
+
+  @After
+  public void stopBroker() throws Exception {
+    broker.stop();
+    broker.waitUntilStopped();
+  }
+
+  private Session initializeSubs() throws JMSException {
+    Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+    connection.setClientID("clientId");
+    connection.start();
+
+    Session session = connection.createSession(false, TopicSession.AUTO_ACKNOWLEDGE);
+    session.createDurableSubscriber(topic, "sub1");
+    session.createDurableSubscriber(topic, "sub2");
+
+    return session;
+  }
+
+  // test recover expired works in general, verify does not return
+  // expired if subs have already acked
+  @Test
+  public void testRecoverExpired() throws Exception {
+    try (Session session = initializeSubs()) {
+      MessageProducer prod = session.createProducer(topic);
+
+      Destination dest = broker.getDestination(topic);
+      TopicMessageStore store = (TopicMessageStore) dest.getMessageStore();
+
+      // nothing should be expired yet, no messags
+      var expired = store.recoverExpired(Set.of(subKey1, subKey2), 100);
+      assertTrue(expired.isEmpty());
+
+      // Sent 10 messages, alternating no expiration and 1 second ttl
+      for (int i = 0; i < 10; i++) {
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        message.setText("message" + i);
+        var ttl = i % 2 == 0 ? 1000 : 0;
+        prod.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, ttl);
+      }
+
+      // wait for the time to pass the point of needing expiration
+      Thread.sleep(1500);
+      // We should now find both durables have 5 expired messages
+      expired = store.recoverExpired(Set.of(subKey1, subKey2), 100);
+      assertEquals(2, expired.size());
+      assertEquals(5, expired.get(subKey1).size());
+      assertEquals(5, expired.get(subKey2).size());
+
+      // Acknowledge the first 2 messages of only the first sub
+      for (int i = 0; i < 2; i++) {
+        MessageAck ack = new MessageAck();
+        ack.setLastMessageId(expired.get(subKey1).get(i).getMessageId());
+        ack.setAckType(MessageAck.EXPIRED_ACK_TYPE);
+        ack.setDestination(topic);
+        store.acknowledge(broker.getAdminConnectionContext(),"clientId", "sub1",
+            ack.getLastMessageId(), ack);
+      }
+
+      // Now the first sub should only have 3 expired, but still 5 on the second
+      expired = store.recoverExpired(Set.of(subKey1, subKey2), 100);
+      assertEquals(3, expired.get(subKey1).size());
+      assertEquals(5, expired.get(subKey2).size());
+
+      // ack all remaining
+      for (Entry<SubscriptionKey, List<org.apache.activemq.command.Message>> entry : expired.entrySet()) {
+        for (org.apache.activemq.command.Message message : entry.getValue()) {
+          MessageAck ack = new MessageAck();
+          ack.setLastMessageId(message.getMessageId());
+          ack.setAckType(MessageAck.EXPIRED_ACK_TYPE);
+          ack.setDestination(topic);
+          store.acknowledge(broker.getAdminConnectionContext(),entry.getKey().getClientId(),
+              entry.getKey().getSubscriptionName(), ack.getLastMessageId(), ack);
+        }
+      }
+
+      // should be empty again
+      expired = store.recoverExpired(Set.of(subKey1, subKey2), 100);
+      assertTrue(expired.isEmpty());
+    }
+
+  }
+
+  // test max number of messages to check works
+  @Test
+  public void testRecoverExpiredMax() throws Exception {
+    try (Session session = initializeSubs()) {
+      MessageProducer prod = session.createProducer(topic);
+
+      Destination dest = broker.getDestination(topic);
+      TopicMessageStore store = (TopicMessageStore) dest.getMessageStore();
+
+      // nothing should be expired yet, no messags
+      var expired = store.recoverExpired(Set.of(subKey1, subKey2), 100);
+      assertTrue(expired.isEmpty());
+
+      // Sent 50 messages with no ttl followed by 50 with ttl
+      ActiveMQTextMessage message = new ActiveMQTextMessage();
+      for (int i = 0; i < 100; i++) {
+        message.setText("message" + i);
+        var ttl = i >= 50 ? 1000 : 0;
+        prod.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, ttl);
+      }
+
+      // wait for the time to pass the point of needing expiration
+      Thread.sleep(1500);
+
+      // We should now find both durables have 50 expired messages
+      expired = store.recoverExpired(Set.of(subKey1, subKey2), 100);
+      assertEquals(2, expired.size());
+      assertEquals(50, expired.get(subKey1).size());
+      assertEquals(50, expired.get(subKey2).size());
+
+      // Max is 50, should find none expired
+      expired = store.recoverExpired(Set.of(subKey1, subKey2), 50);
+      assertTrue(expired.isEmpty());
+
+      // We should now find both durables have 25 expired messages with
+      // max at 75
+      expired = store.recoverExpired(Set.of(subKey1, subKey2), 75);
+      assertEquals(2, expired.size());
+      assertEquals(25, expired.get(subKey1).size());
+      assertEquals(25, expired.get(subKey2).size());
+
+      // Acknowledge the first 25 messages of only the first sub
+      for (int i = 0; i < 25; i++) {
+        MessageAck ack = new MessageAck();
+        ack.setLastMessageId(expired.get(subKey1).get(i).getMessageId());
+        ack.setAckType(MessageAck.EXPIRED_ACK_TYPE);
+        ack.setDestination(topic);
+        store.acknowledge(broker.getAdminConnectionContext(),"clientId", "sub1",
+            ack.getLastMessageId(), ack);
+      }
+
+      // We should now find 25 on sub1 and 50 on sub2 with a max of 100
+      expired = store.recoverExpired(Set.of(subKey1, subKey2), 100);
+      assertEquals(2, expired.size());
+      assertEquals(25, expired.get(subKey1).size());
+      assertEquals(50, expired.get(subKey2).size());
+
+    }
+
+  }
+
+  // Test that filtering works by the set of subscriptions
+  @Test
+  public void testRecoverExpiredSubSet() throws Exception {
+    try (Session session = initializeSubs()) {
+      MessageProducer prod = session.createProducer(topic);
+
+      Destination dest = broker.getDestination(topic);
+      TopicMessageStore store = (TopicMessageStore) dest.getMessageStore();
+
+      // nothing should be expired yet, no messags
+      var expired = store.recoverExpired(Set.of(subKey1, subKey2), 100);
+      assertTrue(expired.isEmpty());
+
+      // Send 10 expired
+      for (int i = 0; i < 10; i++) {
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        message.setText("message" + i);
+        prod.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, 1000);
+      }
+
+      // wait for the time to pass the point of needing expiration
+      Thread.sleep(1500);
+
+      // Test getting each sub individually, get sub2 first
+      expired = store.recoverExpired(Set.of(subKey2), 100);
+      assertEquals(1, expired.size());
+      assertEquals(10, expired.get(subKey2).size());
+
+      // ack the first message of sub2
+      MessageAck ack = new MessageAck();
+      ack.setLastMessageId(expired.get(subKey2).get(0).getMessageId());
+      ack.setAckType(MessageAck.EXPIRED_ACK_TYPE);
+      ack.setDestination(topic);
+      store.acknowledge(broker.getAdminConnectionContext(),"clientId", "sub2",
+          ack.getLastMessageId(), ack);
+
+      // check only sub2 has 9
+      expired = store.recoverExpired(Set.of(subKey2), 100);
+      assertEquals(1, expired.size());
+      assertEquals(9, expired.get(subKey2).size());
+
+      // check only sub1 still has 10
+      expired = store.recoverExpired(Set.of(subKey1), 100);
+      assertEquals(1, expired.size());
+      assertEquals(10, expired.get(subKey1).size());
+
+      // verify passing in unmatched sub leaves it out of the result set
+      var unmatched = new SubscriptionKey("clientId", "sub3");
+      expired = store.recoverExpired(Set.of(unmatched), 100);
+      assertTrue(expired.isEmpty());
+
+      // try 2 that exist and 1 that doesn't
+      expired = store.recoverExpired(Set.of(subKey1, subKey2, unmatched), 100);
+      assertEquals(2, expired.size());
+
+    }
+
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to