Hi Rob,
I changed the condition for when to check the "trash" list to:
if (count > 0 || trash.size() > 1000)
this gave much better performance for a range of application data. I've
re-attached the patch again to avoid confusion. As I said before - the
broker consumes far less CPU now than before, so I am able to add a lot
more consumers now.
Any thoughts? Are there better ways of implementing this?
Cheers,
David
David Sitsky wrote:
Hi Rob,
I was using a version that did have you most recent changes.
To give you a better idea of what I meant, I hacked up some changes
which you can see from the attached patch.
The idea is instead of going through the pending list performing the
same computations over and over again on messages which have been
already been handled by other subscriptions, to move them to another list.
For a particular run, this reduced my application run-time from 47
minutes to 38 minutes.
I'm sure there are better ways of implementing this - but do you see
what I mean?
Cheers,
David
Rob Davies wrote:
David,
which release are you working on ? There was a change last night in
Queue's that might affect the cpu usage.
On Feb 8, 2008, at 5:11 PM, David Sitsky wrote:
In my application, I have noticed with 20 consumers, the broker's CPU
is going through the roof, with many threads in
PrefetchSubscription.dispatchPending(). With my consumers, it might
be 500-1000 messages dispatched before a commit() can be called.
With 20 consumers, this means there can be a build-up of 20,000
uncommited messages lying around the system, let-alone the new
messages which are being pumped into the system at a furious rate.
Not nice I know, but I don't have much choice about it at the moment,
for application-specific reasons.
As you can imagine, I can have some very big pending queue sizes -
sometimes 100,000 in size.
I am experimenting with different prefetch sizes which may help, but
I suspect every time a prefetch thread is trying to dispatch a
message, it might have to iterate through very large numbers of
deleted messages or messages which have been claimed by other
subscribers before it finds a matching message. Multiply this by 20,
and there is a lot of CPU being consumed. This worries me for
scalability reasons - if I want to keep bumping up the number of
consumers.
I'm not sure what the best way of improving this is... is it possible
when we call dispatchPending() to not call
pendingMessageCursor.reset() perhaps?
reset() is a nop for the QueueStoreCursor :(
I'm trying to understand why we need to reset the cursor, when
presumably all off the messages we have gone over before in a
previous dispatchPending() call are either deleted, dispatched or
locked by another node, and therefore don't need to be checked again
(or we check if we reach the end of the cursor list)?
I
I realise if a transaction is rolled back, that a message that was
previously locked by another consumer may be freed. There are
probably message ordering isues too.
Is it possible when we are iterating through the cursor if we find a
node locked by another consumer to perhaps move it to the end of the
cursor (or another list) and check it only if we found no matches?
I'm sure there are a lot of complexities here I am not aware of - but
I am curious what others think.
Doing this sort of chance should reduce the latencies and CPU usage
of the broker significantly.
Cheers,
David
--
Cheers,
David
Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280 0699
Web: http://www.nuix.com Fax: +61 2 9212 6902
Index:
activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
===================================================================
---
activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(revision 619666)
+++
activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(working copy)
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -54,6 +55,7 @@
private static final Log LOG =
LogFactory.getLog(PrefetchSubscription.class);
protected PendingMessageCursor pending;
+ protected List<MessageReference> trash = new
LinkedList<MessageReference>();
protected final List<MessageReference> dispatched = new
CopyOnWriteArrayList<MessageReference>();
protected int prefetchExtension;
protected long enqueueCounter;
@@ -439,22 +449,41 @@
protected void dispatchPending() throws IOException {
if (!isSlave()) {
+ int count = 0;
synchronized(pendingLock) {
try {
int numberToDispatch = countBeforeFull();
if (numberToDispatch > 0) {
pending.setMaxBatchSize(numberToDispatch);
- int count = 0;
pending.reset();
while (pending.hasNext() && !isFull()
&& count < numberToDispatch) {
MessageReference node = pending.next();
+ LockOwner lockOwner;
if (node == null) {
break;
}
if(isDropped(node)) {
pending.remove();
}
+ else if (node instanceof QueueMessageReference &&
+ (((QueueMessageReference)node).isAcked()))
+ {
+ // Message has been acked. Move it to the
trash, since it
+ // is unlikely to be dispatched to this
subscription.
+ pending.remove();
+ trash.add(node);
+ }
+ else if (node instanceof IndirectMessageReference
&&
+ (lockOwner =
((IndirectMessageReference)node).getLockOwner()) != null &&
+ lockOwner != this)
+ {
+ // Message which has been locked by another
subscription.
+ // Move it to the trash, since it is unlikely
to be
+ // dispatched to this subscription.
+ pending.remove();
+ trash.add(node);
+ }
else if (canDispatch(node)) {
pending.remove();
// Message may have been sitting in the pending
@@ -475,7 +504,40 @@
} finally {
pending.release();
}
- }
+
+ // Check if any trash can be cleaned up or some messages need
to be placed
+ // back into the pending list.
+ if (count > 0 || trash.size() > 1000)
+ {
+ for (Iterator<MessageReference> iter = trash.iterator();
iter.hasNext();)
+ {
+ MessageReference node = iter.next();
+ if (isDropped(node))
+ {
+ // Message has been deleted, so it can be removed.
+ iter.remove();
+ }
+ else if ((node instanceof QueueMessageReference &&
+ !((QueueMessageReference) node).isAcked()) ||
+ (node instanceof IndirectMessageReference &&
+ ((IndirectMessageReference)
node).getLockOwner() == null))
+ {
+ // Message is no longer acked or it is not locked
by anyone
+ // probably due to a rolledback transaction.
Re-inject it into
+ // the pending list again. This shouldn't be very
common.
+ try
+ {
+ pending.addMessageLast(node);
+ iter.remove();
+ }
+ catch (Exception e)
+ {
+ throw new IOException("Unable to add message to
pending list", e);
+ }
+ }
+ }
+ }
+ }
}
}