This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch activemq-5.19.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.19.x by this push:
new ea2b81808f AMQ-9698 - Add recovery listener to store recoverExpired()
method
ea2b81808f is described below
commit ea2b81808f00dfaad4b774c3edcd00db0a22208d
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Thu May 8 08:57:09 2025 -0400
AMQ-9698 - Add recovery listener to store recoverExpired() method
This allows the store to check if the memory usage is full in the system
before continuing to try and load messages to expire on durable subs
(cherry picked from commit 6dac231f8d4da64ac160b6ff790bd543aaa70428)
---
.../org/apache/activemq/broker/region/Topic.java | 33 ++++-
.../activemq/store/ProxyTopicMessageStore.java | 5 +-
.../apache/activemq/store/TopicMessageStore.java | 20 +--
.../store/memory/MemoryTopicMessageStore.java | 3 +-
.../activemq/store/jdbc/JDBCTopicMessageStore.java | 3 +-
.../store/journal/JournalTopicMessageStore.java | 3 +-
.../apache/activemq/store/kahadb/KahaDBStore.java | 16 ++-
.../activemq/store/kahadb/TempKahaDBStore.java | 3 +-
.../activemq/bugs/MessageExpirationReaperTest.java | 34 ++++++
.../store/kahadb/KahaDBRecoverExpiredTest.java | 136 ++++++++++++++++++---
10 files changed, 219 insertions(+), 37 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 4fcd508cf9..27c5cf132c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -22,7 +22,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -54,7 +53,6 @@ import org.apache.activemq.command.Response;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
-import org.apache.activemq.management.MessageFlowStats;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore.StoreType;
import org.apache.activemq.store.NoLocalSubscriptionAware;
@@ -829,6 +827,34 @@ public class Topic extends BaseDestination implements Task
{
}
}
+
+ /**
+ * Simple recovery listener that will check if the topic memory usage is
full
+ * when hasSpace() is called. This could be enhanced in the future if
needed.
+ */
+ private final MessageRecoveryListener expiryListener = new
MessageRecoveryListener() {
+
+ @Override
+ public boolean recoverMessage(Message message) {
+ return true;
+ }
+
+ @Override
+ public boolean recoverMessageReference(MessageId ref) {
+ return true;
+ }
+
+ @Override
+ public boolean hasSpace() {
+ return !Topic.this.memoryUsage.isFull();
+ }
+
+ @Override
+ public boolean isDuplicate(MessageId ref) {
+ return false;
+ }
+ };
+
private final AtomicBoolean expiryTaskInProgress = new
AtomicBoolean(false);
private final Runnable expireMessagesWork = () -> {
try {
@@ -851,7 +877,8 @@ public class Topic extends BaseDestination implements Task {
// For each eligible subscription, return the messages in the
store that are expired
// The same message refs are shared between subs if duplicated
so this is efficient
- var expired = store.recoverExpired(subs,
getMaxExpirePageSize());
+ var expired = store.recoverExpired(subs,
getMaxExpirePageSize(),
+ expiryListener);
final ConnectionContext connectionContext =
createConnectionContext();
// Go through any expired messages and remove for each sub
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
index 23c9e81809..d9b92500c0 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
@@ -241,7 +241,8 @@ public class ProxyTopicMessageStore extends
ProxyMessageStore implements TopicMe
}
@Override
- public Map<SubscriptionKey, List<Message>>
recoverExpired(Set<SubscriptionKey> subs, int max) throws Exception {
- return ((TopicMessageStore)delegate).recoverExpired(subs, max);
+ public Map<SubscriptionKey, List<Message>>
recoverExpired(Set<SubscriptionKey> subs, int max,
+ MessageRecoveryListener listener) throws Exception {
+ return ((TopicMessageStore)delegate).recoverExpired(subs, max,
listener);
}
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
b/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
index 6ee046cff6..05f48539dc 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
@@ -161,15 +161,21 @@ public interface TopicMessageStore extends MessageStore {
void addSubscription(SubscriptionInfo subscriptionInfo, boolean
retroactive) throws IOException;
/**
- * Iterates over the pending messages in a topic and recovers any expired
messages found for
- * each of the subscriptions up to the maximum number of messages to
search. Only subscriptions
- * that have at least 1 expired message will be returned in the map.
+ * Iterates over the pending messages in a topic and recovers any expired
messages found for each
+ * of the subscriptions up to the maximum number of messages to search.
Only subscriptions that
+ * have at least 1 expired message will be returned in the map.
+ * <br>
+ * The listener's hasSpace() is checked before each message is examined and the
+ * scan stops once it returns false. Each expired message added to 1 or more
+ * subscriptions in the returned map is passed to recoverMessage(), once per unique message.
+ *
+ * @param subs The subscription keys to check for expired messages
+ * @param maxBrowse The maximum number of messages to check
+ * @param listener Checked via hasSpace() to decide whether to continue; receives each unique expired message
*
- * @param subs
- * @param max
* @return Expired messages for each subscription
- * @throws Exception
*/
- Map<SubscriptionKey,List<Message>> recoverExpired(Set<SubscriptionKey>
subs, int max) throws Exception;
+ Map<SubscriptionKey,List<Message>> recoverExpired(Set<SubscriptionKey>
subs, int maxBrowse,
+ MessageRecoveryListener listener) throws Exception;
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
index 13a13c0759..0aa6dc6ce0 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
@@ -182,7 +182,8 @@ public class MemoryTopicMessageStore extends
MemoryMessageStore implements Topic
}
@Override
- public Map<SubscriptionKey, List<Message>>
recoverExpired(Set<SubscriptionKey> subs, int max) {
+ public Map<SubscriptionKey, List<Message>>
recoverExpired(Set<SubscriptionKey> subs, int max,
+ MessageRecoveryListener listener) {
throw new UnsupportedOperationException("recoverExpired not
supported");
}
diff --git
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
index 7f29a46a64..26390ba308 100644
---
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
+++
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
@@ -363,7 +363,8 @@ public class JDBCTopicMessageStore extends JDBCMessageStore
implements TopicMess
}
@Override
- public Map<SubscriptionKey, List<Message>>
recoverExpired(Set<SubscriptionKey> subs, int max) {
+ public Map<SubscriptionKey, List<Message>>
recoverExpired(Set<SubscriptionKey> subs, int max,
+ MessageRecoveryListener listener) {
throw new UnsupportedOperationException("recoverExpired not
supported");
}
diff --git
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
index f4971f6737..1613caab50 100644
---
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
+++
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
@@ -85,7 +85,8 @@ public class JournalTopicMessageStore extends
JournalMessageStore implements Top
}
@Override
- public Map<SubscriptionKey, List<Message>>
recoverExpired(Set<SubscriptionKey> subs, int max) {
+ public Map<SubscriptionKey, List<Message>>
recoverExpired(Set<SubscriptionKey> subs, int max,
+ MessageRecoveryListener listener) {
throw new UnsupportedOperationException("recoverExpired not
supported");
}
diff --git
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 0f5406f7a2..b6f77c3165 100644
---
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -1331,11 +1331,11 @@ public class KahaDBStore extends MessageDatabase
implements PersistenceAdapter,
}
@Override
- public Map<SubscriptionKey,List<Message>>
recoverExpired(Set<SubscriptionKey> subscriptions, int max) throws Exception {
+ public Map<SubscriptionKey,List<Message>>
recoverExpired(Set<SubscriptionKey> subscriptions, int maxBrowse,
+ MessageRecoveryListener listener) throws Exception {
indexLock.writeLock().lock();
try {
- return pageFile.tx().execute(
- (CallableClosure<Map<SubscriptionKey,List<Message>>,
Exception>) tx -> {
+ return pageFile.tx().execute(tx -> {
StoredDestination sd = getStoredDestination(dest, tx);
sd.orderIndex.resetCursorPosition();
int count = 0;
@@ -1351,9 +1351,10 @@ public class KahaDBStore extends MessageDatabase
implements PersistenceAdapter,
}
// Iterate one time through the topic and check each
message, stopping if we run out
- // or reach the max
+ // of messages, hit the max browse limit, or the listener
returns false for hasSpace()
+ final Set<Long> uniqueExpired = new HashSet<>();
for (Iterator<Entry<Long, MessageKeys>> iterator =
- sd.orderIndex.iterator(tx, new
MessageOrderCursor()); count < max && iterator.hasNext(); ) {
+ sd.orderIndex.iterator(tx, new
MessageOrderCursor()); count < maxBrowse && iterator.hasNext() &&
listener.hasSpace(); ) {
count++;
Entry<Long, MessageKeys> entry = iterator.next();
Set<String> ackedAndPrepared =
ackedAndPreparedMap.get(destination.getPhysicalName());
@@ -1371,6 +1372,11 @@ public class KahaDBStore extends MessageDatabase
implements PersistenceAdapter,
if (sequence != null &&
sequence.contains(entry.getKey())) {
List<Message> expMessages =
expired.computeIfAbsent(subKeyEntry.getValue(), m -> new ArrayList<>());
expMessages.add(msg);
+ // pass unique messages to the
listener so it can do any custom
+ // handling for the next call to
listener.hasSpace()
+ if (uniqueExpired.add(entry.getKey()))
{
+ listener.recoverMessage(msg);
+ }
}
}
}
diff --git
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
index 942076b2f1..6bbf9534ee 100644
---
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
+++
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
@@ -339,7 +339,8 @@ public class TempKahaDBStore extends TempMessageDatabase
implements PersistenceA
}
@Override
- public Map<SubscriptionKey, List<Message>>
recoverExpired(Set<SubscriptionKey> subs, int max) {
+ public Map<SubscriptionKey, List<Message>>
recoverExpired(Set<SubscriptionKey> subs, int max,
+ MessageRecoveryListener listener) {
throw new UnsupportedOperationException("recoverExpired not
supported");
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java
index 65f3f1802c..4e5d98684b 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java
@@ -221,6 +221,40 @@ public class MessageExpirationReaperTest {
assertEquals(0, brokerDest.getMemoryUsage().getUsage());
}
+ @Test
+ public void testExpiredMessagesOnTopicRecoveryListener() throws Exception{
+ Session session = createSession();
+
+ // use a zero prefetch so messages don't go inflight
+ ActiveMQTopic destination = new ActiveMQTopic(destinationName +
"?consumer.prefetchSize=0");
+
+ MessageProducer producer = session.createProducer(destination);
+ MessageConsumer consumer =
session.createDurableSubscriber(destination, "test-durable");
+ producer.setTimeToLive(1000);
+
+ final int count = 3;
+ // Send some messages with an expiration
+ for (int i = 0; i < count; i++) {
+ TextMessage message = session.createTextMessage("" + i);
+ producer.send(message);
+ }
+
+ // Set a very low memory limit so usage will be > 100%, which should
prevent the expiry
+ // thread from continuing
+ broker.getSystemUsage().getMemoryUsage().setLimit(1024);
+ DestinationViewMBean view = createView(destination);
+
+ // not expired yet...
+ assertEquals("Incorrect enqueue count", 3, view.getEnqueueCount() );
+
+ // close consumer so topic thinks consumer is inactive
+ consumer.close();
+
+ // Memory is > 100% so we shouldn't expire
+ Thread.sleep(3000);
+ assertEquals(0, view.getExpiredCount());
+ }
+
protected DestinationViewMBean createView(ActiveMQDestination destination)
throws Exception {
String domain = "org.apache.activemq";
ObjectName name;
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBRecoverExpiredTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBRecoverExpiredTest.java
index 3e9bfe0cf9..5491b2755a 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBRecoverExpiredTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBRecoverExpiredTest.java
@@ -27,17 +27,22 @@ import javax.jms.Session;
import javax.jms.TopicSession;
import java.io.File;
import java.net.URI;
+import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.util.SubscriptionKey;
import org.junit.After;
@@ -48,7 +53,7 @@ import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
/**
- * Test for {@link TopicMessageStore#recoverExpired(Set, int)}
+ * Test for {@link TopicMessageStore#recoverExpired(Set, int,
org.apache.activemq.store.MessageRecoveryListener)}
*/
public class KahaDBRecoverExpiredTest {
@@ -110,7 +115,7 @@ public class KahaDBRecoverExpiredTest {
TopicMessageStore store = (TopicMessageStore) dest.getMessageStore();
// nothing should be expired yet, no messags
- var expired = store.recoverExpired(Set.of(subKey1, subKey2), 100);
+ var expired = store.recoverExpired(Set.of(subKey1, subKey2), 100,
listener);
assertTrue(expired.isEmpty());
// Sent 10 messages, alternating no expiration and 1 second ttl
@@ -124,7 +129,7 @@ public class KahaDBRecoverExpiredTest {
// wait for the time to pass the point of needing expiration
Thread.sleep(1500);
// We should now find both durables have 5 expired messages
- expired = store.recoverExpired(Set.of(subKey1, subKey2), 100);
+ expired = store.recoverExpired(Set.of(subKey1, subKey2), 100, listener);
assertEquals(2, expired.size());
assertEquals(5, expired.get(subKey1).size());
assertEquals(5, expired.get(subKey2).size());
@@ -140,7 +145,7 @@ public class KahaDBRecoverExpiredTest {
}
// Now the first sub should only have 3 expired, but still 5 on the
second
- expired = store.recoverExpired(Set.of(subKey1, subKey2), 100);
+ expired = store.recoverExpired(Set.of(subKey1, subKey2), 100, listener);
assertEquals(3, expired.get(subKey1).size());
assertEquals(5, expired.get(subKey2).size());
@@ -157,7 +162,7 @@ public class KahaDBRecoverExpiredTest {
}
// should be empty again
- expired = store.recoverExpired(Set.of(subKey1, subKey2), 100);
+ expired = store.recoverExpired(Set.of(subKey1, subKey2), 100, listener);
assertTrue(expired.isEmpty());
}
@@ -173,7 +178,7 @@ public class KahaDBRecoverExpiredTest {
TopicMessageStore store = (TopicMessageStore) dest.getMessageStore();
// nothing should be expired yet, no messags
- var expired = store.recoverExpired(Set.of(subKey1, subKey2), 100);
+ var expired = store.recoverExpired(Set.of(subKey1, subKey2), 100,
listener);
assertTrue(expired.isEmpty());
// Sent 50 messages with no ttl followed by 50 with ttl
@@ -188,18 +193,18 @@ public class KahaDBRecoverExpiredTest {
Thread.sleep(1500);
// We should now find both durables have 50 expired messages
- expired = store.recoverExpired(Set.of(subKey1, subKey2), 100);
+ expired = store.recoverExpired(Set.of(subKey1, subKey2), 100, listener);
assertEquals(2, expired.size());
assertEquals(50, expired.get(subKey1).size());
assertEquals(50, expired.get(subKey2).size());
// Max is 50, should find none expired
- expired = store.recoverExpired(Set.of(subKey1, subKey2), 50);
+ expired = store.recoverExpired(Set.of(subKey1, subKey2), 50, listener);
assertTrue(expired.isEmpty());
// We should now find both durables have 25 expired messages with
// max at 75
- expired = store.recoverExpired(Set.of(subKey1, subKey2), 75);
+ expired = store.recoverExpired(Set.of(subKey1, subKey2), 75, listener);
assertEquals(2, expired.size());
assertEquals(25, expired.get(subKey1).size());
assertEquals(25, expired.get(subKey2).size());
@@ -215,7 +220,7 @@ public class KahaDBRecoverExpiredTest {
}
// We should now find 25 on sub1 and 50 on sub2 with a max of 100
- expired = store.recoverExpired(Set.of(subKey1, subKey2), 100);
+ expired = store.recoverExpired(Set.of(subKey1, subKey2), 100, listener);
assertEquals(2, expired.size());
assertEquals(25, expired.get(subKey1).size());
assertEquals(50, expired.get(subKey2).size());
@@ -234,7 +239,7 @@ public class KahaDBRecoverExpiredTest {
TopicMessageStore store = (TopicMessageStore) dest.getMessageStore();
// nothing should be expired yet, no messags
- var expired = store.recoverExpired(Set.of(subKey1, subKey2), 100);
+ var expired = store.recoverExpired(Set.of(subKey1, subKey2), 100,
listener);
assertTrue(expired.isEmpty());
// Send 10 expired
@@ -248,7 +253,7 @@ public class KahaDBRecoverExpiredTest {
Thread.sleep(1500);
// Test getting each sub individually, get sub2 first
- expired = store.recoverExpired(Set.of(subKey2), 100);
+ expired = store.recoverExpired(Set.of(subKey2), 100, listener);
assertEquals(1, expired.size());
assertEquals(10, expired.get(subKey2).size());
@@ -261,26 +266,125 @@ public class KahaDBRecoverExpiredTest {
ack.getLastMessageId(), ack);
// check only sub2 has 9
- expired = store.recoverExpired(Set.of(subKey2), 100);
+ expired = store.recoverExpired(Set.of(subKey2), 100, listener);
assertEquals(1, expired.size());
assertEquals(9, expired.get(subKey2).size());
// check only sub1 still has 10
- expired = store.recoverExpired(Set.of(subKey1), 100);
+ expired = store.recoverExpired(Set.of(subKey1), 100, listener);
assertEquals(1, expired.size());
assertEquals(10, expired.get(subKey1).size());
// verify passing in unmatched sub leaves it out of the result set
var unmatched = new SubscriptionKey("clientId", "sub3");
- expired = store.recoverExpired(Set.of(unmatched), 100);
+ expired = store.recoverExpired(Set.of(unmatched), 100, listener);
assertTrue(expired.isEmpty());
// try 2 that exist and 1 that doesn't
- expired = store.recoverExpired(Set.of(subKey1, subKey2, unmatched), 100);
+ expired = store.recoverExpired(Set.of(subKey1, subKey2, unmatched), 100,
listener);
assertEquals(2, expired.size());
}
}
+ // test recovery listener works with hasSpace()
+ @Test
+ public void testRecoverExpiredRecoveryListener() throws Exception {
+ try (Session session = initializeSubs()) {
+ MessageProducer prod = session.createProducer(topic);
+
+ Destination dest = broker.getDestination(topic);
+ TopicMessageStore store = (TopicMessageStore) dest.getMessageStore();
+
+ // Send 10 messages with a 1 second ttl
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ for (int i = 0; i < 10; i++) {
+ message.setText("message" + i);
+ prod.send(message, Message.DEFAULT_DELIVERY_MODE,
Message.DEFAULT_PRIORITY, 1000);
+ }
+
+ // wait for the time to pass the point of needing expiration
+ Thread.sleep(1500);
+
+ // nothing should be returned because hasSpace() is false
+ final AtomicBoolean hasSpaceCalled = new AtomicBoolean();
+ var expired = store.recoverExpired(Set.of(subKey1, subKey2), 100, new
MessageRecoveryListener() {
+ @Override
+ public boolean recoverMessage(org.apache.activemq.command.Message
message) {
+ return true;
+ }
+
+ @Override
+ public boolean recoverMessageReference(MessageId ref) {
+ return false;
+ }
+
+ @Override
+ public boolean hasSpace() {
+ hasSpaceCalled.set(true);
+ return false;
+ }
+
+ @Override
+ public boolean isDuplicate(MessageId ref) {
+ return false;
+ }
+ });
+
+ assertTrue(expired.isEmpty());
+ assertTrue(hasSpaceCalled.get());
+
+ // check we only call recoverMessage() once for each unique id
+ Set<MessageId> ids = new HashSet<>();
+ store.recoverExpired(Set.of(subKey1, subKey2), 100, new
MessageRecoveryListener() {
+ @Override
+ public boolean recoverMessage(org.apache.activemq.command.Message
message) {
+ // assert unique
+ assertTrue("duplicate message passed to listener",
ids.add(message.getMessageId()));
+ return true;
+ }
+
+ @Override
+ public boolean recoverMessageReference(MessageId ref) {
+ return false;
+ }
+
+ @Override
+ public boolean hasSpace() {
+ return true;
+ }
+
+ @Override
+ public boolean isDuplicate(MessageId ref) {
+ return false;
+ }
+ });
+ }
+
+ }
+
+ private final MessageRecoveryListener listener = new
MessageRecoveryListener() {
+
+ @Override
+ public boolean recoverMessage(org.apache.activemq.command.Message message)
{
+ return true;
+ }
+
+ @Override
+ public boolean recoverMessageReference(MessageId ref) {
+ return true;
+ }
+
+ @Override
+ public boolean hasSpace() {
+ return true;
+ }
+
+ @Override
+ public boolean isDuplicate(MessageId ref) {
+ return false;
+ }
+ };
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact