This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new b1e8441ca9 AMQ-9726 - Fix FilePendingMessageCursor clear() method
(#1452)
b1e8441ca9 is described below
commit b1e8441ca93f0aefa8817c6923cc4741b757da3a
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Thu Jun 5 07:18:53 2025 -0400
AMQ-9726 - Fix FilePendingMessageCursor clear() method (#1452)
This fixes the clear() method so that when clearing the memory map it
will decrement memory usage, and when clearing the disk list it
will destroy and reset the list for future writes.
---
.../region/cursors/FilePendingMessageCursor.java | 15 ++--
.../plist/KahaDBFilePendingMessageCursorTest.java | 83 ++++++++++++++++++++++
2 files changed, 94 insertions(+), 4 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
index 801208c798..ed51ce8881 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
@@ -162,8 +162,7 @@ public class FilePendingMessageCursor extends
AbstractPendingMessageCursor imple
@Override
public synchronized void destroy() throws Exception {
stop();
- for (Iterator<MessageReference> i = memoryList.iterator();
i.hasNext();) {
- MessageReference node = i.next();
+ for (MessageReference node : memoryList) {
node.decrementReferenceCount();
}
memoryList.clear();
@@ -365,11 +364,19 @@ public class FilePendingMessageCursor extends
AbstractPendingMessageCursor imple
*/
@Override
public synchronized void clear() {
+ // AMQ-9726 - Iterate over all nodes to decrement the ref count
+ // to decrement the memory usage tracker
+ for (MessageReference node : memoryList) {
+ node.decrementReferenceCount();
+ }
memoryList.clear();
if (!isDiskListEmpty()) {
try {
- getDiskList().destroy();
- } catch (IOException e) {
+ // AMQ-9726 - This method will destroy the list and
+ // set the reference to null so it will be reset
+ // for future writes
+ destroyDiskList();
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java
index ce4a8ed0e1..481950e4d0 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java
@@ -29,6 +29,8 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -94,5 +96,86 @@ public class KahaDBFilePendingMessageCursorTest extends
FilePendingMessageCursor
assertEquals("expected page usage", initialPageCount -1,
pageFile.getPageCount() - pageFile.getFreePageCount() );
}
+ // Test for AMQ-9726
+ @Test
+ public void testClearCursor() throws Exception {
+ brokerService = new BrokerService();
+ brokerService.setUseJmx(false);
+ SystemUsage usage = brokerService.getSystemUsage();
+ usage.getMemoryUsage().setLimit(1024*150);
+ Destination dest = new Queue(brokerService, new ActiveMQQueue("Q"),
null, new DestinationStatistics(), null);
+ dest.setMemoryUsage(usage.getMemoryUsage());
+ brokerService.start();
+
+ underTest = new FilePendingMessageCursor(brokerService.getBroker(),
"test", false);
+ underTest.setSystemUsage(usage);
+
+ // Add 10 messages to the cursor in memory
+ addTestMessages(dest);
+
+ // Verify memory usage was increased and cache is enabled
+ assertTrue(dest.getMemoryUsage().getUsage() > 0);
+ assertEquals(10, underTest.size());
+ assertTrue(underTest.isCacheEnabled());
+ assertEquals(0, dest.getTempUsage().getUsage());
+
+ // Clear, this will verify memory usage is correctly decremented
+ // and the memory map is cleared as well. Memory was previously
+ // incorrectly not being cleared.
+ underTest.clear();
+ assertEquals(0, underTest.size());
+ assertEquals(0, dest.getMemoryUsage().getUsage());
+
+ // Now test the disk cursor
+ // set the memory usage limit very small so messages will go to
+ // the disk list and not memory and send 10 more messages
+ usage.getMemoryUsage().setLimit(1);
+ addTestMessages(dest);
+
+ // confirm the cache is false and the memory is 0 because
+ // the messages exist on disk and not in the memory map
+ // also very temp usage is greater than 0 now
+ assertFalse(underTest.isCacheEnabled());
+ assertEquals(0, dest.getMemoryUsage().getUsage());
+ assertTrue(dest.getTempUsage().getUsage() > 0);
+ assertEquals(10, underTest.size());
+
+ // Test clearing the disk list shows a size of 0
+ underTest.clear();
+ assertEquals(0, underTest.size());
+
+ // Send 10 more messages to verify that we can send again
+ // to the disk list after clear. Previously clear did not
+ // correctly destroy/reset the disk cursor so an exception
+ // was thrown when adding messages again after calling clear()
+ addTestMessages(dest);
+ assertFalse(underTest.isCacheEnabled());
+ assertEquals(0, dest.getMemoryUsage().getUsage());
+ assertTrue(dest.getTempUsage().getUsage() > 0);
+ assertEquals(10, underTest.size());
+
+ // one final clear() and reset limit to make sure we can send to
+ // memory again
+ underTest.clear();
+ usage.getMemoryUsage().setLimit(1024*150);
+ assertEquals(0, underTest.size());
+ assertEquals(0, dest.getMemoryUsage().getUsage());
+
+ // Verify memory usage was increased and cache is enabled
+ addTestMessages(dest);
+ assertTrue(dest.getMemoryUsage().getUsage() > 0);
+ assertEquals(10, underTest.size());
+ assertTrue(underTest.isCacheEnabled());
+ }
+
+ private void addTestMessages(Destination dest) throws Exception {
+ for (int i = 0; i< 10; i++) {
+ ActiveMQMessage mqMessage = new ActiveMQMessage();
+ mqMessage.setMessageId(new MessageId("1:2:3:" + i));
+ mqMessage.setMemoryUsage(dest.getMemoryUsage());
+ mqMessage.setRegionDestination(dest);
+ underTest.addMessageLast(new IndirectMessageReference(mqMessage));
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact