[
https://issues.apache.org/jira/browse/AMQ-3312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Timothy Bish resolved AMQ-3312.
-------------------------------
Resolution: Fixed
Fix Version/s: 5.6.0
Assignee: Timothy Bish
Fixed, patch applied along with fixing the other two methods that had this same
issue. Test updated to cover all three.
> Queue's moveMatchingMessagesTo method is extremely slow to the point of being
> unusable as Queue size increases
> --------------------------------------------------------------------------------------------------------------
>
> Key: AMQ-3312
> URL: https://issues.apache.org/jira/browse/AMQ-3312
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.5.0
> Reporter: Stirling Chow
> Assignee: Timothy Bish
> Fix For: 5.6.0
>
> Attachments: AMQ3312.patch, LargeQueueSparseDeleteTest.java
>
>
> Symptom
> =======
> We have a system based on ActiveMQ that stores messages in a non-peristent
> queue. Frequently, we have to move a specific message from this queue to
> another queue. The message to be moved may be anywhere in the queue, and is
> identified by a selector on a custom JMS integer property.
> To facilitate the selection and move, we use
> org.apache.activemq.broker.region.Queue#moveMatchingMessagesTo(ConnectionContext
> context, String selector, ActiveMQDestination dest)
> We've found that once our queue grows past 10K messages, moving a single
> message takes over 10s. When the queue grows past 20K messages, a move takes
> 70s. It's clear from testing that the time to move a message grows
> exponentially as the queue size increases, to the point that
> moveMatchingMessagesTo becomes unusable.
> Cause
> =====
> AMQ 5.5.0 has this implementation for moveMatchingMessagesTo:
> {code:title=Queue#moveMatchingMessagesTo(ConnectionContext context,
> MessageReferenceFilter filter}
> public int moveMatchingMessagesTo(ConnectionContext context,
> MessageReferenceFilter filter,
> ActiveMQDestination dest, int maximumMessages) throws Exception {
> int movedCounter = 0;
> Set<QueueMessageReference> set = new
> CopyOnWriteArraySet<QueueMessageReference>();
> do {
> doPageIn(true);
> pagedInMessagesLock.readLock().lock();
> try{
> set.addAll(pagedInMessages.values());
> }finally {
> pagedInMessagesLock.readLock().unlock();
> }
> List<QueueMessageReference> list = new
> ArrayList<QueueMessageReference>(set);
> for (QueueMessageReference ref : list) {
> if (filter.evaluate(context, ref)) {
> // We should only move messages that can be locked.
> moveMessageTo(context, ref, dest);
> set.remove(ref);
> if (++movedCounter >= maximumMessages && maximumMessages
> > 0) {
> return movedCounter;
> }
> }
> }
> } while (set.size() <
> this.destinationStatistics.getMessages().getCount() && set.size() <
> maximumMessages);
> return movedCounter;
> }
> {code}
> In the context that we use, maximumMessages is Integer.MAXINT:
> {code:title=moveMatchingMessagesTo(ConnectionContext context, String
> selector, ActiveMQDestination dest)}
> public int moveMatchingMessagesTo(ConnectionContext context, String
> selector, ActiveMQDestination dest)
> throws Exception {
> return moveMatchingMessagesTo(context, selector, dest,
> Integer.MAX_VALUE);
> }
> {code}
> Since moveMatchingMessagesTo instantiates the set variable as a
> CopyOnWriteArraySet, each doPageIn loop creates a new array, copies the
> existing set members, and then linearly scans the array for the insertion.
> The result is that moveMatchingMessagesTo is an O(n^2) algorithm with respect
> to message copying, where n is the size of the queue.
> Solution
> ========
> set is scoped to a single call of moveMatchingMessagesTo, and is only
> accessed by a single thread, so there is no benefit to using
> CopyOnWriteArraySet. Simply changing set to a HashSet prevents the need for
> the doPageIn loop to copy the set on each iteration, and insertion becomes an
> O(1) operation.
> Attached is a unit test that demonstrates how moving the last message from a
> queue of 10K messages takes 8s (on our machine). Included is a patch Queue
> that changes set from a CopyOnWriteArraySet to a HashSet; with this patch,
> the same unit test completes in under 200ms.
--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira