This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 6dac231f8d AMQ-9698 - Add recovery listener to store recoverExpired() 
method
6dac231f8d is described below

commit 6dac231f8d4da64ac160b6ff790bd543aaa70428
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Thu May 8 08:57:09 2025 -0400

    AMQ-9698 - Add recovery listener to store recoverExpired() method
    
    This allows the store to check if the memory usage is full in the system
    before continuing to try and load messages to expire on durable subs
---
 .../org/apache/activemq/broker/region/Topic.java   |  33 ++++-
 .../activemq/store/ProxyTopicMessageStore.java     |   5 +-
 .../apache/activemq/store/TopicMessageStore.java   |  20 +--
 .../store/memory/MemoryTopicMessageStore.java      |   3 +-
 .../activemq/store/jdbc/JDBCTopicMessageStore.java |   3 +-
 .../apache/activemq/store/kahadb/KahaDBStore.java  |  16 ++-
 .../activemq/store/kahadb/TempKahaDBStore.java     |   3 +-
 .../activemq/bugs/MessageExpirationReaperTest.java |  34 ++++++
 .../store/kahadb/KahaDBRecoverExpiredTest.java     | 136 ++++++++++++++++++---
 9 files changed, 217 insertions(+), 36 deletions(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 441e4c4765..b24c71b62f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -22,7 +22,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -54,7 +53,6 @@ import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
-import org.apache.activemq.management.MessageFlowStats;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore.StoreType;
 import org.apache.activemq.store.NoLocalSubscriptionAware;
@@ -829,6 +827,34 @@ public class Topic extends BaseDestination implements Task 
{
         }
     }
 
+
+    /**
+     * Simple recovery listener that will check if the topic memory usage is 
full
+     * when hasSpace() is called. This could be enhanced in the future if 
needed.
+     */
+    private final MessageRecoveryListener expiryListener =  new 
MessageRecoveryListener() {
+
+        @Override
+        public boolean recoverMessage(Message message) {
+            return true;
+        }
+
+        @Override
+        public boolean recoverMessageReference(MessageId ref)  {
+            return true;
+        }
+
+        @Override
+        public boolean hasSpace() {
+            return !Topic.this.memoryUsage.isFull();
+        }
+
+        @Override
+        public boolean isDuplicate(MessageId ref) {
+            return false;
+        }
+    };
+
     private final AtomicBoolean expiryTaskInProgress = new 
AtomicBoolean(false);
     private final Runnable expireMessagesWork = () -> {
         try {
@@ -851,7 +877,8 @@ public class Topic extends BaseDestination implements Task {
 
                 // For each eligible subscription, return the messages in the 
store that are expired
                 // The same message refs are shared between subs if duplicated 
so this is efficient
-                var expired = store.recoverExpired(subs, 
getMaxExpirePageSize());
+                var expired = store.recoverExpired(subs, 
getMaxExpirePageSize(),
+                    expiryListener);
 
                 final ConnectionContext connectionContext = 
createConnectionContext();
                 // Go through any expired messages and remove for each sub
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
index 23c9e81809..d9b92500c0 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
@@ -241,7 +241,8 @@ public class ProxyTopicMessageStore extends 
ProxyMessageStore implements TopicMe
     }
 
     @Override
-    public Map<SubscriptionKey, List<Message>> 
recoverExpired(Set<SubscriptionKey> subs, int max) throws Exception {
-        return ((TopicMessageStore)delegate).recoverExpired(subs, max);
+    public Map<SubscriptionKey, List<Message>> 
recoverExpired(Set<SubscriptionKey> subs, int max,
+        MessageRecoveryListener listener) throws Exception {
+        return ((TopicMessageStore)delegate).recoverExpired(subs, max, 
listener);
     }
 }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
index 6111236149..11df40ff68 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
@@ -161,15 +161,21 @@ public interface TopicMessageStore extends MessageStore {
     void addSubscription(SubscriptionInfo subscriptionInfo, boolean 
retroactive) throws IOException;
 
     /**
-     * Iterates over the pending messages in a topic and recovers any expired 
messages found for
-     * each of the subscriptions up to the maximum number of messages to 
search. Only subscriptions
-     * that have at least 1 expired message will be returned in the map.
+     * Iterates over the pending messages in a topic and recovers any expired 
messages found for each
+     * of the subscriptions up to the maximum number of messages to search. 
Only subscriptions that
+     * have at least 1 expired message will be returned in the map.
+     * <br>
+     * The expiry listener is only used to verify if there is space. Messages 
that are expired
+     * and will be added to 1 or more subscription in the returned map will be 
passed to
+     * the callback. The callback will only be called once per each unique 
message.
+     *
+     * @param subs The subscription keys to check for expired messages
+     * @param maxBrowse The maximum number of messages to check
+     * @param listener
      *
-     * @param subs
-     * @param max
      * @return Expired messages for each subscription
-     * @throws Exception
      */
-    Map<SubscriptionKey,List<Message>> recoverExpired(Set<SubscriptionKey> 
subs, int max) throws Exception;
+    Map<SubscriptionKey,List<Message>> recoverExpired(Set<SubscriptionKey> 
subs, int maxBrowse,
+        MessageRecoveryListener listener) throws Exception;
 
 }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
index 13a13c0759..0aa6dc6ce0 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
@@ -182,7 +182,8 @@ public class MemoryTopicMessageStore extends 
MemoryMessageStore implements Topic
     }
 
     @Override
-    public Map<SubscriptionKey, List<Message>> 
recoverExpired(Set<SubscriptionKey> subs, int max) {
+    public Map<SubscriptionKey, List<Message>> 
recoverExpired(Set<SubscriptionKey> subs, int max,
+        MessageRecoveryListener listener) {
         throw new UnsupportedOperationException("recoverExpired not 
supported");
     }
 
diff --git 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
index 7f29a46a64..26390ba308 100644
--- 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
+++ 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
@@ -363,7 +363,8 @@ public class JDBCTopicMessageStore extends JDBCMessageStore 
implements TopicMess
     }
 
     @Override
-    public Map<SubscriptionKey, List<Message>> 
recoverExpired(Set<SubscriptionKey> subs, int max) {
+    public Map<SubscriptionKey, List<Message>> 
recoverExpired(Set<SubscriptionKey> subs, int max,
+        MessageRecoveryListener listener) {
         throw new UnsupportedOperationException("recoverExpired not 
supported");
     }
 
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 99df6bb03f..f52c49421e 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -1331,11 +1331,11 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
         }
 
         @Override
-        public Map<SubscriptionKey,List<Message>> 
recoverExpired(Set<SubscriptionKey> subscriptions, int max) throws Exception {
+        public Map<SubscriptionKey,List<Message>> 
recoverExpired(Set<SubscriptionKey> subscriptions, int maxBrowse,
+            MessageRecoveryListener listener) throws Exception {
             indexLock.writeLock().lock();
             try {
-                return pageFile.tx().execute(
-                    (CallableClosure<Map<SubscriptionKey,List<Message>>, 
Exception>) tx -> {
+                return pageFile.tx().execute(tx -> {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         sd.orderIndex.resetCursorPosition();
                         int count = 0;
@@ -1351,9 +1351,10 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
                         }
 
                         // Iterate one time through the topic and check each 
message, stopping if we run out
-                        // or reach the max
+                        // hit the max browse limit, or if the listener 
returns false for hasSpace()
+                        final Set<Long> uniqueExpired = new HashSet<>();
                         for (Iterator<Entry<Long, MessageKeys>> iterator =
-                            sd.orderIndex.iterator(tx, new 
MessageOrderCursor()); count < max && iterator.hasNext(); ) {
+                            sd.orderIndex.iterator(tx, new 
MessageOrderCursor()); count < maxBrowse && iterator.hasNext() && 
listener.hasSpace(); ) {
                             count++;
                             Entry<Long, MessageKeys> entry = iterator.next();
                             Set<String> ackedAndPrepared = 
ackedAndPreparedMap.get(destination.getPhysicalName());
@@ -1371,6 +1372,11 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
                                     if (sequence != null && 
sequence.contains(entry.getKey())) {
                                         List<Message> expMessages = 
expired.computeIfAbsent(subKeyEntry.getValue(), m -> new ArrayList<>());
                                         expMessages.add(msg);
+                                        // pass unique messages to the 
listener so it can do any custom
+                                        // handling for the next call to 
listener.hasSpace()
+                                        if (uniqueExpired.add(entry.getKey())) 
{
+                                            listener.recoverMessage(msg);
+                                        }
                                     }
                                 }
                             }
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
index 942076b2f1..6bbf9534ee 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
@@ -339,7 +339,8 @@ public class TempKahaDBStore extends TempMessageDatabase 
implements PersistenceA
         }
 
         @Override
-        public Map<SubscriptionKey, List<Message>> 
recoverExpired(Set<SubscriptionKey> subs, int max) {
+        public Map<SubscriptionKey, List<Message>> 
recoverExpired(Set<SubscriptionKey> subs, int max,
+            MessageRecoveryListener listener) {
             throw new UnsupportedOperationException("recoverExpired not 
supported");
         }
 
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java
index 035ba0e29f..d133b22c2e 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java
@@ -221,6 +221,40 @@ public class MessageExpirationReaperTest {
         assertEquals(0, brokerDest.getMemoryUsage().getUsage());
     }
 
+    @Test
+    public void testExpiredMessagesOnTopicRecoveryListener() throws Exception{
+        Session session = createSession();
+
+        // use a zero prefetch so messages don't go inflight
+        ActiveMQTopic destination = new ActiveMQTopic(destinationName + 
"?consumer.prefetchSize=0");
+
+        MessageProducer producer = session.createProducer(destination);
+        MessageConsumer consumer = 
session.createDurableSubscriber(destination, "test-durable");
+        producer.setTimeToLive(1000);
+
+        final int count = 3;
+        // Send some messages with an expiration
+        for (int i = 0; i < count; i++) {
+            TextMessage message = session.createTextMessage("" + i);
+            producer.send(message);
+        }
+
+        // Set a very low memory usage so we will be > 100% which should 
prevent the expiry
+        // thread from continuing
+        broker.getSystemUsage().getMemoryUsage().setLimit(1024);
+        DestinationViewMBean view = createView(destination);
+
+        // not expired yet...
+        assertEquals("Incorrect enqueue count", 3, view.getEnqueueCount() );
+
+        // close consumer so topic thinks consumer is inactive
+        consumer.close();
+
+        // Memory is > 100% so we shouldn't expire
+        Thread.sleep(3000);
+        assertEquals(0,  view.getExpiredCount());
+    }
+
     protected DestinationViewMBean createView(ActiveMQDestination destination) 
throws Exception {
         String domain = "org.apache.activemq";
         ObjectName name;
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBRecoverExpiredTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBRecoverExpiredTest.java
index a153a00676..3aaef774ba 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBRecoverExpiredTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBRecoverExpiredTest.java
@@ -27,17 +27,22 @@ import jakarta.jms.Session;
 import jakarta.jms.TopicSession;
 import java.io.File;
 import java.net.URI;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.util.SubscriptionKey;
 import org.junit.After;
@@ -48,7 +53,7 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.rules.Timeout;
 
 /**
- * Test for {@link TopicMessageStore#recoverExpired(Set, int)}
+ * Test for {@link TopicMessageStore#recoverExpired(Set, int, 
org.apache.activemq.store.MessageRecoveryListener)}
  */
 public class KahaDBRecoverExpiredTest {
 
@@ -110,7 +115,7 @@ public class KahaDBRecoverExpiredTest {
       TopicMessageStore store = (TopicMessageStore) dest.getMessageStore();
 
       // nothing should be expired yet, no messags
-      var expired = store.recoverExpired(Set.of(subKey1, subKey2), 100);
+      var expired = store.recoverExpired(Set.of(subKey1, subKey2), 100, 
listener);
       assertTrue(expired.isEmpty());
 
       // Sent 10 messages, alternating no expiration and 1 second ttl
@@ -124,7 +129,7 @@ public class KahaDBRecoverExpiredTest {
       // wait for the time to pass the point of needing expiration
       Thread.sleep(1500);
       // We should now find both durables have 5 expired messages
-      expired = store.recoverExpired(Set.of(subKey1, subKey2), 100);
+      expired = store.recoverExpired(Set.of(subKey1, subKey2), 100, listener);
       assertEquals(2, expired.size());
       assertEquals(5, expired.get(subKey1).size());
       assertEquals(5, expired.get(subKey2).size());
@@ -140,7 +145,7 @@ public class KahaDBRecoverExpiredTest {
       }
 
       // Now the first sub should only have 3 expired, but still 5 on the 
second
-      expired = store.recoverExpired(Set.of(subKey1, subKey2), 100);
+      expired = store.recoverExpired(Set.of(subKey1, subKey2), 100, listener);
       assertEquals(3, expired.get(subKey1).size());
       assertEquals(5, expired.get(subKey2).size());
 
@@ -157,7 +162,7 @@ public class KahaDBRecoverExpiredTest {
       }
 
       // should be empty again
-      expired = store.recoverExpired(Set.of(subKey1, subKey2), 100);
+      expired = store.recoverExpired(Set.of(subKey1, subKey2), 100, listener);
       assertTrue(expired.isEmpty());
     }
 
@@ -173,7 +178,7 @@ public class KahaDBRecoverExpiredTest {
       TopicMessageStore store = (TopicMessageStore) dest.getMessageStore();
 
       // nothing should be expired yet, no messags
-      var expired = store.recoverExpired(Set.of(subKey1, subKey2), 100);
+      var expired = store.recoverExpired(Set.of(subKey1, subKey2), 100, 
listener);
       assertTrue(expired.isEmpty());
 
       // Sent 50 messages with no ttl followed by 50 with ttl
@@ -188,18 +193,18 @@ public class KahaDBRecoverExpiredTest {
       Thread.sleep(1500);
 
       // We should now find both durables have 50 expired messages
-      expired = store.recoverExpired(Set.of(subKey1, subKey2), 100);
+      expired = store.recoverExpired(Set.of(subKey1, subKey2), 100, listener);
       assertEquals(2, expired.size());
       assertEquals(50, expired.get(subKey1).size());
       assertEquals(50, expired.get(subKey2).size());
 
       // Max is 50, should find none expired
-      expired = store.recoverExpired(Set.of(subKey1, subKey2), 50);
+      expired = store.recoverExpired(Set.of(subKey1, subKey2), 50, listener);
       assertTrue(expired.isEmpty());
 
       // We should now find both durables have 25 expired messages with
       // max at 75
-      expired = store.recoverExpired(Set.of(subKey1, subKey2), 75);
+      expired = store.recoverExpired(Set.of(subKey1, subKey2), 75, listener);
       assertEquals(2, expired.size());
       assertEquals(25, expired.get(subKey1).size());
       assertEquals(25, expired.get(subKey2).size());
@@ -215,7 +220,7 @@ public class KahaDBRecoverExpiredTest {
       }
 
       // We should now find 25 on sub1 and 50 on sub2 with a max of 100
-      expired = store.recoverExpired(Set.of(subKey1, subKey2), 100);
+      expired = store.recoverExpired(Set.of(subKey1, subKey2), 100, listener);
       assertEquals(2, expired.size());
       assertEquals(25, expired.get(subKey1).size());
       assertEquals(50, expired.get(subKey2).size());
@@ -234,7 +239,7 @@ public class KahaDBRecoverExpiredTest {
       TopicMessageStore store = (TopicMessageStore) dest.getMessageStore();
 
       // nothing should be expired yet, no messags
-      var expired = store.recoverExpired(Set.of(subKey1, subKey2), 100);
+      var expired = store.recoverExpired(Set.of(subKey1, subKey2), 100, 
listener);
       assertTrue(expired.isEmpty());
 
       // Send 10 expired
@@ -248,7 +253,7 @@ public class KahaDBRecoverExpiredTest {
       Thread.sleep(1500);
 
       // Test getting each sub individually, get sub2 first
-      expired = store.recoverExpired(Set.of(subKey2), 100);
+      expired = store.recoverExpired(Set.of(subKey2), 100, listener);
       assertEquals(1, expired.size());
       assertEquals(10, expired.get(subKey2).size());
 
@@ -261,26 +266,125 @@ public class KahaDBRecoverExpiredTest {
           ack.getLastMessageId(), ack);
 
       // check only sub2 has 9
-      expired = store.recoverExpired(Set.of(subKey2), 100);
+      expired = store.recoverExpired(Set.of(subKey2), 100, listener);
       assertEquals(1, expired.size());
       assertEquals(9, expired.get(subKey2).size());
 
       // check only sub1 still has 10
-      expired = store.recoverExpired(Set.of(subKey1), 100);
+      expired = store.recoverExpired(Set.of(subKey1), 100, listener);
       assertEquals(1, expired.size());
       assertEquals(10, expired.get(subKey1).size());
 
       // verify passing in unmatched sub leaves it out of the result set
       var unmatched = new SubscriptionKey("clientId", "sub3");
-      expired = store.recoverExpired(Set.of(unmatched), 100);
+      expired = store.recoverExpired(Set.of(unmatched), 100, listener);
       assertTrue(expired.isEmpty());
 
       // try 2 that exist and 1 that doesn't
-      expired = store.recoverExpired(Set.of(subKey1, subKey2, unmatched), 100);
+      expired = store.recoverExpired(Set.of(subKey1, subKey2, unmatched), 100, 
listener);
       assertEquals(2, expired.size());
 
     }
 
   }
 
+  // test recovery listener works with hasSpace()
+  @Test
+  public void testRecoverExpiredRecoveryListener() throws Exception {
+    try (Session session = initializeSubs()) {
+      MessageProducer prod = session.createProducer(topic);
+
+      Destination dest = broker.getDestination(topic);
+      TopicMessageStore store = (TopicMessageStore) dest.getMessageStore();
+
+      // Sent 50 messages with no ttl followed by 50 with ttl
+      ActiveMQTextMessage message = new ActiveMQTextMessage();
+      for (int i = 0; i < 10; i++) {
+        message.setText("message" + i);
+        prod.send(message, Message.DEFAULT_DELIVERY_MODE, 
Message.DEFAULT_PRIORITY, 1000);
+      }
+
+      // wait for the time to pass the point of needing expiration
+      Thread.sleep(1500);
+
+      // don't return any, has space is false
+      final AtomicBoolean hasSpaceCalled = new AtomicBoolean();
+      var expired = store.recoverExpired(Set.of(subKey1, subKey2), 100, new 
MessageRecoveryListener() {
+        @Override
+        public boolean recoverMessage(org.apache.activemq.command.Message 
message) {
+          return true;
+        }
+
+        @Override
+        public boolean recoverMessageReference(MessageId ref) {
+          return false;
+        }
+
+        @Override
+        public boolean hasSpace() {
+          hasSpaceCalled.set(true);
+          return false;
+        }
+
+        @Override
+        public boolean isDuplicate(MessageId ref) {
+          return false;
+        }
+      });
+
+      assertTrue(expired.isEmpty());
+      assertTrue(hasSpaceCalled.get());
+
+      // check we only call recoverMessage() once for each unique id\
+      Set<MessageId> ids = new HashSet<>();
+      store.recoverExpired(Set.of(subKey1, subKey2), 100, new 
MessageRecoveryListener() {
+        @Override
+        public boolean recoverMessage(org.apache.activemq.command.Message 
message) {
+          // assert unique
+          assertTrue("duplicate message passed to listener", 
ids.add(message.getMessageId()));
+          return true;
+        }
+
+        @Override
+        public boolean recoverMessageReference(MessageId ref) {
+          return false;
+        }
+
+        @Override
+        public boolean hasSpace() {
+          return true;
+        }
+
+        @Override
+        public boolean isDuplicate(MessageId ref) {
+          return false;
+        }
+      });
+    }
+
+  }
+
+  private final MessageRecoveryListener listener =  new 
MessageRecoveryListener() {
+
+    @Override
+    public boolean recoverMessage(org.apache.activemq.command.Message message) 
{
+      return true;
+    }
+
+    @Override
+    public boolean recoverMessageReference(MessageId ref)  {
+      return true;
+    }
+
+    @Override
+    public boolean hasSpace() {
+      return true;
+    }
+
+    @Override
+    public boolean isDuplicate(MessageId ref) {
+      return false;
+    }
+  };
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to