Queue's moveMatchingMessagesTo method is extremely slow to the point of being
unusable as Queue size increases
---------------------------------------------------------------------------------------------------------------
Key: AMQ-3312
URL: https://issues.apache.org/jira/browse/AMQ-3312
Project: ActiveMQ
Issue Type: Bug
Components: Broker
Affects Versions: 5.5.0
Reporter: Stirling Chow
Symptom
=======
We have a system based on ActiveMQ that stores messages in a non-persistent
queue. Frequently, we have to move a specific message from this queue to
another queue. The message to be moved may be anywhere in the queue, and is
identified by a selector on a custom JMS integer property.
To perform the selection and move, we use:
org.apache.activemq.broker.region.Queue#moveMatchingMessagesTo(ConnectionContext
context, String selector, ActiveMQDestination dest)
We've found that once our queue grows past 10K messages, moving a single
message takes over 10s. When the queue grows past 20K messages, a move takes
70s. Testing shows that the time to move a message grows far faster than
linearly as the queue size increases, to the point that
moveMatchingMessagesTo becomes unusable.
Cause
=====
AMQ 5.5.0 has this implementation for moveMatchingMessagesTo:
{code:title=Queue#moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages)}
public int moveMatchingMessagesTo(ConnectionContext context,
        MessageReferenceFilter filter,
        ActiveMQDestination dest, int maximumMessages) throws Exception {
    int movedCounter = 0;
    Set<QueueMessageReference> set = new CopyOnWriteArraySet<QueueMessageReference>();
    do {
        doPageIn(true);
        pagedInMessagesLock.readLock().lock();
        try {
            set.addAll(pagedInMessages.values());
        } finally {
            pagedInMessagesLock.readLock().unlock();
        }
        List<QueueMessageReference> list = new ArrayList<QueueMessageReference>(set);
        for (QueueMessageReference ref : list) {
            if (filter.evaluate(context, ref)) {
                // We should only move messages that can be locked.
                moveMessageTo(context, ref, dest);
                set.remove(ref);
                if (++movedCounter >= maximumMessages && maximumMessages > 0) {
                    return movedCounter;
                }
            }
        }
    } while (set.size() < this.destinationStatistics.getMessages().getCount()
            && set.size() < maximumMessages);
    return movedCounter;
}
{code}
In our usage, maximumMessages is Integer.MAX_VALUE:
{code:title=moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)}
public int moveMatchingMessagesTo(ConnectionContext context, String selector,
        ActiveMQDestination dest) throws Exception {
    return moveMatchingMessagesTo(context, selector, dest, Integer.MAX_VALUE);
}
{code}
Because moveMatchingMessagesTo declares set as a CopyOnWriteArraySet, every
pass through the doPageIn loop allocates a new backing array, copies the
existing members into it, and checks each candidate for duplicates with a
linear scan of that array. As a result, moveMatchingMessagesTo is an O(n^2)
algorithm with respect to message copying, where n is the size of the queue.
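The quadratic growth described above can be reproduced outside the broker. The
following is a minimal sketch, not ActiveMQ code: SetInsertCost and Ref are
hypothetical names, Ref is a stand-in for QueueMessageReference that counts
equals() calls, and elements are added one at a time rather than page by page,
which simplifies the real loop.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;

class SetInsertCost {
    /** Hypothetical stand-in for a message reference; counts equals() calls. */
    static final class Ref {
        static final AtomicLong EQUALS_CALLS = new AtomicLong();
        final int id;

        Ref(int id) {
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            EQUALS_CALLS.incrementAndGet();
            return o instanceof Ref && ((Ref) o).id == id;
        }

        @Override
        public int hashCode() {
            return id;
        }
    }

    /** Adds n distinct refs to the set and returns how many equals() calls that took. */
    static long comparisonsFor(Set<Ref> set, int n) {
        Ref.EQUALS_CALLS.set(0);
        for (int i = 0; i < n; i++) {
            set.add(new Ref(i));
        }
        return Ref.EQUALS_CALLS.get();
    }

    public static void main(String[] args) {
        int n = 2_000;
        System.out.println("CopyOnWriteArraySet comparisons: "
                + comparisonsFor(new CopyOnWriteArraySet<>(), n));
        System.out.println("HashSet comparisons:             "
                + comparisonsFor(new HashSet<>(), n));
    }
}
```

For n distinct elements, CopyOnWriteArraySet compares each new element against
every element already present, so the comparison count grows roughly as n^2/2.
HashSet only calls equals() when two elements land on the same hash.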
Solution
========
The set variable is local to a single call of moveMatchingMessagesTo and is
only accessed by a single thread, so CopyOnWriteArraySet provides no benefit.
Changing set to a HashSet removes the need for the doPageIn loop to copy the
set on each iteration, and insertion becomes an expected O(1) operation.
Attached is a unit test that demonstrates how moving the last message from a
queue of 10K messages takes 8s (on our machine). Also included is a patch to
Queue that changes set from a CopyOnWriteArraySet to a HashSet; with this
patch, the same unit test completes in under 200ms.
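To show where the one-line change sits in the paging loop, here is a
simplified, self-contained model. It is a sketch under stated assumptions, not
the attached patch: PagedMoveSketch, notYetPaged, pageIn and moveMatching are
hypothetical names, integers stand in for message references, and locking is
omitted.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.IntPredicate;

// Simplified, hypothetical model of a paged queue; not ActiveMQ code.
class PagedMoveSketch {
    private final List<Integer> notYetPaged = new ArrayList<>(); // messages not yet in memory
    private final Set<Integer> pagedIn = new LinkedHashSet<>();  // stands in for pagedInMessages
    private final int pageSize;

    PagedMoveSketch(int messageCount, int pageSize) {
        for (int i = 0; i < messageCount; i++) {
            notYetPaged.add(i);
        }
        this.pageSize = pageSize;
    }

    int size() {
        return notYetPaged.size() + pagedIn.size();
    }

    // Stands in for doPageIn(true): brings at most one page into memory.
    private void pageIn() {
        int count = Math.min(pageSize, notYetPaged.size());
        List<Integer> page = notYetPaged.subList(0, count);
        pagedIn.addAll(page);
        page.clear(); // clearing the subList view removes the page from notYetPaged
    }

    int moveMatching(IntPredicate filter, List<Integer> dest, int maximumMessages) {
        int movedCounter = 0;
        Set<Integer> set = new HashSet<>(); // the one-line change: was a CopyOnWriteArraySet
        do {
            pageIn();
            set.addAll(pagedIn);
            for (Integer ref : new ArrayList<>(set)) {
                if (filter.test(ref)) {
                    pagedIn.remove(ref); // model of moveMessageTo: leave this queue...
                    dest.add(ref);       // ...and arrive at the destination
                    set.remove(ref);
                    if (++movedCounter >= maximumMessages && maximumMessages > 0) {
                        return movedCounter;
                    }
                }
            }
        } while (set.size() < size() && set.size() < maximumMessages);
        return movedCounter;
    }

    public static void main(String[] args) {
        PagedMoveSketch queue = new PagedMoveSketch(10_000, 200);
        List<Integer> dest = new ArrayList<>();
        int moved = queue.moveMatching(id -> id == 9_999, dest, Integer.MAX_VALUE);
        System.out.println("moved=" + moved + ", remaining=" + queue.size());
    }
}
```

Because set never leaves the method and no other thread touches it, swapping
the implementation does not change which messages are moved; only the cost of
building the set changes.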