This is an automated email from the ASF dual-hosted git repository. vavrtom pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push: new 1a4ac67109 QPID-8590: [Broker-J] Purge on flow to disk queue (#128) 1a4ac67109 is described below commit 1a4ac67109b53ef397024f49ac8594b3f1a17e59 Author: Daniil Kirilyuk <daniel.kiril...@gmail.com> AuthorDate: Thu Jun 9 09:20:54 2022 +0200 QPID-8590: [Broker-J] Purge on flow to disk queue (#128) --- .../queue/FlowToDiskOverflowPolicyHandler.java | 138 ++++++- .../queue/FlowToDiskOverflowPolicyHandlerTest.java | 412 ++++++++++++++++++++- 2 files changed, 516 insertions(+), 34 deletions(-) diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java index 36fcf9e245..06e3d43122 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java +++ b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java @@ -18,54 +18,96 @@ */ package org.apache.qpid.server.queue; +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.MessageDeletedException; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.OverflowPolicy; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.store.StoredMessage; +/** + * If the queue breaches a limit, newly arriving messages are written to disk and the in-memory representation of + * the message is minimised. The Broker will transparently retrieve messages from disk as they are required by a + * consumer or management. The flow to disk policy does not actually restrict the overall size of the queue, + * merely the space occupied in memory. 
+ */ public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler { + /** + * Delegate handler + */ private final Handler _handler; + /** + * Constructor injects queue + * + * @param queue Queue + */ FlowToDiskOverflowPolicyHandler(final Queue<?> queue) { _handler = new Handler(queue); queue.addChangeListener(_handler); } + /** + * Checks queue overflow (is called when adding or deleting queue entry) + * + * @param newlyEnqueued QueueEntry enqueued (is null on deletion) + */ @Override public void checkOverflow(final QueueEntry newlyEnqueued) { - _handler.checkOverflow(newlyEnqueued); - + _handler.checkOverflow(newlyEnqueued, true); } + /** + * Delegate handler + */ private static class Handler extends OverflowPolicyMaximumQueueDepthChangeListener { + /** + * Queue instance + */ private final Queue<?> _queue; + /** + * Constructor injects queue + * + * @param queue Queue + */ private Handler(final Queue<?> queue) { super(OverflowPolicy.FLOW_TO_DISK); _queue = queue; } + /** + * Is called when max queue depth is changed + * + * @param queue Queue + */ @Override void onMaximumQueueDepthChange(final Queue<?> queue) { - checkOverflow(null); + checkOverflow(null, false); } - private void checkOverflow(final QueueEntry newlyEnqueued) + /** + * Either flows messages to the disk or restores them into the memory + * + * @param newlyEnqueued QueueEntry (could be null in case of deletion or limit change) + * @param stopOnFirstMatch Whether flowing to disk / restoring to memory should be stopped after first match + */ + private void checkOverflow(final QueueEntry newlyEnqueued, final boolean stopOnFirstMatch) { - long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes(); - long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages(); + final long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes(); + final long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages(); if (maximumQueueDepthBytes >= 0L || maximumQueueDepthMessages 
>= 0L) { if (newlyEnqueued == null) { - flowTailToDiskIfNecessary(maximumQueueDepthBytes, maximumQueueDepthMessages); + balanceTailIfNecessary(maximumQueueDepthBytes, maximumQueueDepthMessages, stopOnFirstMatch); } else { @@ -74,45 +116,76 @@ public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler } } - private void flowTailToDiskIfNecessary(final long maximumQueueDepthBytes, final long maximumQueueDepthMessages) + /** + * Either flows tail messages to disk or restores them into the memory depending on the overflow limit. + * Boolean flag stopOnFirstMatch is true when enqueueing or deleting messages, is false when overflow limit + * was changed + * + * @param maximumQueueDepthBytes Max queue depth in bytes + * @param maximumQueueDepthMessages Max queue depth in messages + * @param stopOnFirstMatch Whether flowing to disk / restoring to memory should be stopped after fist match + */ + private void balanceTailIfNecessary( + final long maximumQueueDepthBytes, + final long maximumQueueDepthMessages, + final boolean stopOnFirstMatch) { final long queueDepthBytes = _queue.getQueueDepthBytes(); final long queueDepthMessages = _queue.getQueueDepthMessages(); + final boolean isMaximumQueueDepthBytesUnlimited = maximumQueueDepthBytes >= 0L; - if ((maximumQueueDepthBytes >= 0L && queueDepthBytes > maximumQueueDepthBytes) || - (maximumQueueDepthMessages >= 0L && queueDepthMessages > maximumQueueDepthMessages)) + if ((isMaximumQueueDepthBytesUnlimited && queueDepthBytes > maximumQueueDepthBytes) || + (maximumQueueDepthMessages >= 0L && queueDepthMessages >= maximumQueueDepthMessages)) { long cumulativeDepthBytes = 0; long cumulativeDepthMessages = 0; - QueueEntryIterator queueEntryIterator = _queue.queueEntryIterator(); + final QueueEntryIterator queueEntryIterator = _queue.queueEntryIterator(); while (queueEntryIterator.advance()) { - QueueEntry node = queueEntryIterator.getNode(); + final QueueEntry node = queueEntryIterator.getNode(); if (node != null 
&& !node.isDeleted()) { - ServerMessage message = node.getMessage(); + final ServerMessage<?> message = node.getMessage(); if (message != null) { cumulativeDepthMessages++; cumulativeDepthBytes += message.getSizeIncludingHeader(); - if (cumulativeDepthBytes > maximumQueueDepthBytes + final boolean isInMemory = message.getStoredMessage().isInContentInMemory(); + + if ((isMaximumQueueDepthBytesUnlimited && cumulativeDepthBytes > maximumQueueDepthBytes) || cumulativeDepthMessages > maximumQueueDepthMessages) { + if (stopOnFirstMatch || !isInMemory) + { + break; + } flowToDisk(node); } + else if (!isInMemory) + { + restoreInMemory(node); + } } } } } } - private void flowNewEntryToDiskIfNecessary(final QueueEntry newlyEnqueued, - final long maximumQueueDepthBytes, - final long maximumQueueDepthMessages) + /** + * Flows queue entry to the disk (when overflow limit is exceeded) + * + * @param newlyEnqueued QueueEntry + * @param maximumQueueDepthBytes Max queue depth in bytes + * @param maximumQueueDepthMessages Max queue depth in messages + */ + private void flowNewEntryToDiskIfNecessary( + final QueueEntry newlyEnqueued, + final long maximumQueueDepthBytes, + final long maximumQueueDepthMessages) { final long queueDepthBytes = _queue.getQueueDepthBytes(); final long queueDepthMessages = _queue.getQueueDepthMessages(); @@ -124,9 +197,14 @@ public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler } } + /** + * Flows queue entry to the disk + * + * @param node QueueEntry + */ private void flowToDisk(final QueueEntry node) { - try (MessageReference messageReference = node.getMessage().newReference()) + try (final MessageReference<?> messageReference = node.getMessage().newReference()) { if (node.getQueue().checkValid(node)) { @@ -138,5 +216,29 @@ public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler // pass } } + + /** + * Restores queue entry in memory + * + * @param node QueueEntry + */ + private void restoreInMemory(final 
QueueEntry node) + { + try (final MessageReference<?> messageReference = node.getMessage().newReference()) + { + if (node.getQueue().checkValid(node)) + { + final StoredMessage<?> storedMessage = messageReference.getMessage().getStoredMessage(); + try (final QpidByteBuffer qpidByteBuffer = storedMessage.getContent(0, storedMessage.getContentSize())) + { + qpidByteBuffer.dispose(); + } + } + } + catch (MessageDeletedException mde) + { + // pass + } + } } } diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java index 9d8f997055..739968e0a6 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java +++ b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java @@ -20,19 +20,24 @@ package org.apache.qpid.server.queue; +import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.junit.Before; import org.junit.Test; +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.BrokerTestHelper; @@ -43,10 +48,18 @@ import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.test.utils.UnitTestBase; +/** + * Tests for FlowToDiskOverflowPolicyHandler. 
+ * + * Here mockito verify() for flowToDisk() checks whether message was flowed to disk or not, + * and mockito verify() for getContent() checks whether message was restored into memory or not + */ public class FlowToDiskOverflowPolicyHandlerTest extends UnitTestBase { private Queue<?> _queue; + private Map<StoredMessage<?>, Boolean> _state; + @Before public void setUp() throws Exception { @@ -59,28 +72,112 @@ public class FlowToDiskOverflowPolicyHandlerTest extends UnitTestBase attributes.put(Queue.OVERFLOW_POLICY, OverflowPolicy.FLOW_TO_DISK); _queue = (AbstractQueue<?>) virtualHost.createChild(Queue.class, attributes); + _state = new HashMap<>(); } + /** + * Lowers the overflow limit, forcing messages to be flowed to the disk + */ @Test - public void testOverflowAfterLoweringLimit() throws Exception + public void overflowAfterLoweringLimit() { - ServerMessage<?> message = createMessage(10L); - _queue.enqueue(message, null, null); - StoredMessage<?> storedMessage = message.getStoredMessage(); - verify(storedMessage, never()).flowToDisk(); + _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 10)); - ServerMessage<?> message2 = createMessage(10L); - _queue.enqueue(message2, null, null); - StoredMessage<?> storedMessage2 = message2.getStoredMessage(); - verify(storedMessage2, never()).flowToDisk(); + List<ServerMessage<?>> messages = new ArrayList<>(); - _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 10)); + for (int i = 0; i < 15; i ++) + { + messages.add(createMessage(10L)); + _queue.enqueue(messages.get(i), null, null); + } - verify(storedMessage2).flowToDisk(); + for (int i = 0; i < 10; i ++) + { + verify(messages.get(i).getStoredMessage(), never()).flowToDisk(); + verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + } + + for (int i = 10; i < 15; i ++) + { + verify(messages.get(i).getStoredMessage()).flowToDisk(); + verify(messages.get(i).getStoredMessage(), 
never()).getContent(anyInt(), anyInt()); + } + + _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 5)); + + for (int i = 0; i < 5; i ++) + { + verify(messages.get(i).getStoredMessage(), never()).flowToDisk(); + verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + } + + for (int i = 5; i < 10; i ++) + { + verify(messages.get(i).getStoredMessage()).flowToDisk(); + verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + } + + for (int i = 11; i < 15; i ++) + { + verify(messages.get(i).getStoredMessage()).flowToDisk(); + verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + } + } + + /** + * Raises the overflow limit, forcing messages to be restored into memory from the flowed-to-disk state + */ + @Test + public void overflowAfterRisingLimit() + { + _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 5)); + + List<ServerMessage<?>> messages = new ArrayList<>(); + + for (int i = 0; i < 15; i ++) + { + messages.add(createMessage(10L)); + _queue.enqueue(messages.get(i), null, null); + } + + for (int i = 0; i < 5; i ++) + { + verify(messages.get(i).getStoredMessage(), never()).flowToDisk(); + verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + } + + for (int i = 5; i < 15; i ++) + { + verify(messages.get(i).getStoredMessage()).flowToDisk(); + verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + } + + _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 10)); + + // first five messages should neither be flowed to the disk nor restored to memory (nothing changed for them) + for (int i = 0; i < 5; i ++) + { + verify(messages.get(i).getStoredMessage(), never()).flowToDisk(); + verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + } + + // middle five messages should be restored to memory + for 
(int i = 5; i < 10; i ++) + { + verify(messages.get(i).getStoredMessage()).flowToDisk(); + verify(messages.get(i).getStoredMessage()).getContent(anyInt(), anyInt()); + } + + // last five messages should remain flowed to the disk + for (int i = 11; i < 15; i ++) + { + verify(messages.get(i).getStoredMessage()).flowToDisk(); + verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + } } @Test - public void testOverflowOnSecondMessage() throws Exception + public void overflowOnSecondMessage() { _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 10)); ServerMessage<?> message = createMessage(10L); @@ -95,7 +192,7 @@ public class FlowToDiskOverflowPolicyHandlerTest extends UnitTestBase } @Test - public void testBytesOverflow() throws Exception + public void bytesOverflow() { _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 0)); ServerMessage<?> message = createMessage(1L); @@ -105,7 +202,7 @@ public class FlowToDiskOverflowPolicyHandlerTest extends UnitTestBase } @Test - public void testMessagesOverflow() throws Exception + public void messagesOverflow() { _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 0)); ServerMessage<?> message = createMessage(1L); @@ -115,7 +212,7 @@ public class FlowToDiskOverflowPolicyHandlerTest extends UnitTestBase } @Test - public void testNoOverflow() throws Exception + public void noOverflow() { _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 10)); _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 10)); @@ -125,6 +222,276 @@ public class FlowToDiskOverflowPolicyHandlerTest extends UnitTestBase verify(storedMessage, never()).flowToDisk(); } + @Test + public void oneByOneDeletion() + { + _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 5)); + + List<ServerMessage<?>> messages = new ArrayList<>(); + + for (int i = 0; i < 10; i ++) 
+ { + messages.add(createMessage(10L)); + _queue.enqueue(messages.get(i), null, null); + } + + for (int i = 0; i < 5; i ++) + { + verify(messages.get(i).getStoredMessage(), never()).flowToDisk(); + } + + for (int i = 5; i < 10; i ++) + { + verify(messages.get(i).getStoredMessage()).flowToDisk(); + } + + QueueEntryIterator it = _queue.queueEntryIterator(); + it.advance(); + _queue.deleteEntry(it.getNode()); + + for (int i = 0; i < 5; i ++) + { + verify(messages.get(i).getStoredMessage(), never()).flowToDisk(); + } + verify(messages.get(5).getStoredMessage()).getContent(anyInt(), anyInt()); + verify(messages.get(6).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + verify(messages.get(7).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + verify(messages.get(8).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + verify(messages.get(9).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + + it.advance(); + _queue.deleteEntry(it.getNode()); + + for (int i = 0; i < 5; i ++) + { + verify(messages.get(i).getStoredMessage(), never()).flowToDisk(); + } + verify(messages.get(5).getStoredMessage()).getContent(anyInt(), anyInt()); + verify(messages.get(6).getStoredMessage()).getContent(anyInt(), anyInt()); + verify(messages.get(7).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + verify(messages.get(8).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + verify(messages.get(9).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + + it.advance(); + _queue.deleteEntry(it.getNode()); + + for (int i = 0; i < 5; i ++) + { + verify(messages.get(i).getStoredMessage(), never()).flowToDisk(); + } + verify(messages.get(5).getStoredMessage()).getContent(anyInt(), anyInt()); + verify(messages.get(6).getStoredMessage()).getContent(anyInt(), anyInt()); + verify(messages.get(7).getStoredMessage()).getContent(anyInt(), anyInt()); + verify(messages.get(8).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + 
verify(messages.get(9).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + + it.advance(); + _queue.deleteEntry(it.getNode()); + + for (int i = 0; i < 5; i ++) + { + verify(messages.get(i).getStoredMessage(), never()).flowToDisk(); + } + verify(messages.get(5).getStoredMessage()).getContent(anyInt(), anyInt()); + verify(messages.get(6).getStoredMessage()).getContent(anyInt(), anyInt()); + verify(messages.get(7).getStoredMessage()).getContent(anyInt(), anyInt()); + verify(messages.get(8).getStoredMessage()).getContent(anyInt(), anyInt()); + verify(messages.get(9).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + + it.advance(); + _queue.deleteEntry(it.getNode()); + + for (int i = 0; i < 5; i ++) + { + verify(messages.get(i).getStoredMessage(), never()).flowToDisk(); + } + verify(messages.get(5).getStoredMessage()).getContent(anyInt(), anyInt()); + verify(messages.get(6).getStoredMessage()).getContent(anyInt(), anyInt()); + verify(messages.get(7).getStoredMessage()).getContent(anyInt(), anyInt()); + verify(messages.get(8).getStoredMessage()).getContent(anyInt(), anyInt()); + verify(messages.get(9).getStoredMessage()).getContent(anyInt(), anyInt()); + } + + @Test + public void clearQueue() + { + _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 5)); + + List<ServerMessage<?>> messages = new ArrayList<>(); + + for (int i = 0; i < 15; i ++) + { + messages.add(createMessage(10L)); + _queue.enqueue(messages.get(i), null, null); + } + assertEquals(15, _queue.getQueueDepthMessages()); + + for (int i = 0; i < 5; i ++) + { + verify(messages.get(i).getStoredMessage(), never()).flowToDisk(); + verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + } + + for (int i = 5; i < 15; i ++) + { + verify(messages.get(i).getStoredMessage()).flowToDisk(); + verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + } + + long deleted = _queue.clearQueue(); + + assertEquals(15, 
deleted); + assertEquals(0, _queue.getQueueDepthMessages()); + + for (int i = 0; i < 5; i ++) + { + verify(messages.get(i).getStoredMessage(), never()).flowToDisk(); + verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + } + + for (int i = 5; i < 15; i ++) + { + verify(messages.get(i).getStoredMessage()).flowToDisk(); + verify(messages.get(i).getStoredMessage()).getContent(anyInt(), anyInt()); + } + } + + /** + * Deletes messages 5-10 of 15 in the queue with the limit 5: + * + * o o o o o | _ _ _ _ _ | _ _ _ _ _ + * => + * o o o o o | x x x x x | _ _ _ _ _ + * => + * o o o o o | _ _ _ _ _ + */ + @Test + public void deleteMessagesAfterLimit() + { + _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 5)); + + List<ServerMessage<?>> messages = new ArrayList<>(); + List<QueueEntry> entries = new ArrayList<>(); + + for (int i = 0; i < 15; i ++) + { + messages.add(createMessage(10L)); + _queue.enqueue(messages.get(i), null, null); + } + assertEquals(15, _queue.getQueueDepthMessages()); + + for (int i = 0; i < 5; i ++) + { + verify(messages.get(i).getStoredMessage(), never()).flowToDisk(); + verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + } + + for (int i = 5; i < 15; i ++) + { + verify(messages.get(i).getStoredMessage()).flowToDisk(); + verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + } + + QueueEntryIterator it = _queue.queueEntryIterator(); + while (it.advance()) + { + entries.add(it.getNode()); + } + + for (int i = 5; i < 10; i ++) + { + _queue.deleteEntry(entries.get(i)); + } + + assertEquals(10, _queue.getQueueDepthMessages()); + + // first 5 messages shouldn't be either flowed to disk or restored in memory, they remain without changes + for (int i = 0; i < 5; i ++) + { + verify(messages.get(i).getStoredMessage(), never()).flowToDisk(); + verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + } + + 
// last 5 messages should be first flowed to disk but never restored in memory + for (int i = 5; i < 10; i ++) + { + verify(messages.get(i).getStoredMessage()).flowToDisk(); + verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + } + } + + /** + * Deletes messages 3-7 of 15 in the queue with the limit 5: + * + * o o o o o | _ _ _ _ _ | _ _ _ _ _ + * => + * o o o x x | x x x _ _ | _ _ _ _ _ + * => + * o o o o o | _ _ _ _ _ + */ + @Test + public void deleteMessagesAroundLimit() + { + _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 5)); + + List<ServerMessage<?>> messages = new ArrayList<>(); + List<QueueEntry> entries = new ArrayList<>(); + + for (int i = 0; i < 15; i ++) + { + messages.add(createMessage(10L)); + _queue.enqueue(messages.get(i), null, null); + } + assertEquals(15, _queue.getQueueDepthMessages()); + + for (int i = 0; i < 5; i ++) + { + verify(messages.get(i).getStoredMessage(), never()).flowToDisk(); + verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + } + + for (int i = 5; i < 15; i ++) + { + verify(messages.get(i).getStoredMessage()).flowToDisk(); + verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + } + + QueueEntryIterator it = _queue.queueEntryIterator(); + while (it.advance()) + { + entries.add(it.getNode()); + } + + for (int i = 3; i < 8; i ++) + { + _queue.deleteEntry(entries.get(i)); + } + + assertEquals(10, _queue.getQueueDepthMessages()); + + // first 5 messages shouldn't be either flowed to disk or restored in memory + for (int i = 0; i < 5; i ++) + { + verify(messages.get(i).getStoredMessage(), never()).flowToDisk(); + verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + } + + // messages 5-10 should be first flowed to disk and restored in memory + for (int i = 5; i < 10; i ++) + { + verify(messages.get(i).getStoredMessage()).flowToDisk(); + 
+ verify(messages.get(i).getStoredMessage()).getContent(anyInt(), anyInt()); + } + + // messages 11-15 should be flowed to disk and never restored in memory + for (int i = 11; i < 15; i ++) + { + verify(messages.get(i).getStoredMessage()).flowToDisk(); + verify(messages.get(i).getStoredMessage(), never()).getContent(anyInt(), anyInt()); + } + } + + @SuppressWarnings("rawtypes") private ServerMessage createMessage(long size) { ServerMessage message = mock(ServerMessage.class); @@ -133,9 +500,22 @@ public class FlowToDiskOverflowPolicyHandlerTest extends UnitTestBase when(message.getValidationStatus()).thenReturn(ServerMessage.ValidationStatus.VALID); StoredMessage storedMessage = mock(StoredMessage.class); + _state.put(storedMessage, true); when(message.getStoredMessage()).thenReturn(storedMessage); - when(storedMessage.isInContentInMemory()).thenReturn(true); + when(storedMessage.isInContentInMemory()).thenAnswer(invocation -> _state.get(message.getStoredMessage())); when(storedMessage.getInMemorySize()).thenReturn(size); + when(storedMessage.flowToDisk()).thenAnswer(invocation -> + { + StoredMessage sm = (StoredMessage) invocation.getMock(); + _state.put(sm, false); + return true; + }); + when(storedMessage.getContent(anyInt(), anyInt())).thenAnswer(invocation -> + { + StoredMessage sm = (StoredMessage) invocation.getMock(); + _state.put(sm, true); + return QpidByteBuffer.allocate((int)size); + }); MessageReference ref = mock(MessageReference.class); when(ref.getMessage()).thenReturn(message); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org