Hi Rob,
I like the new changes, but with the changes as they are, for
my application for one of my benchmarks, it takes twice as
long to complete.
I believe the culprit for this is that when the new code can't
find a consumer which is not full, the broker chooses the
consumer with the lowest dispatch queue size.
In my application, since I have a prefetch size of 1, and have
longish-running transactions, the dispatch queue size is not
indicative of the current load for that consumer. As a
result, I think this is what is responsible for poor load-
balancing in my case.
For applications which commit() after each processed message,
I am sure this wouldn't be the case. In some ways, reverting
to the old behaviour of adding the pending message to all
consumers might lead to better load balancing with this code.
However - I think it is better if the consumers can decide
when they want more messages rather than the broker pushing
messages at them? I've attached a patch which demonstrates
this. When LAZY_DISPATCH is set to true (set via a system
property for now for testing purposes) this changes the
behaviour slightly.
The basic idea is pageInMessages() only pages in the minimum
number of messages that can be dispatched immediately to non-
full consumers. Whenever a consumer acks a message, which
updates its prefetch size, we make sure Queue.wakeup() is
called so that the consumer will receive new messages.
With this change in effect - I see slightly faster or almost
the same times with the previous benchmark. However memory
usage on the broker is far better, as the pending queues for
each consumer is either 0 or very small.
What do you think? I guess there are better ways of doing this.
I am doing a large overnight run with 16 consumers, so we'll
see how the performance goes.
You'll also notice in the patch, that in
Queue.addSubscriber(), I thought there didn't seem to be any
need for adding a message to a new consumer if the message has
already been locked by another consumer?
Cheers,
David
Rob Davies wrote:
Hi David,
please let us know if these changes helps/hinders your app!
cheers,
Rob
On 19 Feb 2008, at 08:32, David Sitsky wrote:
If what I said above is true, then the immediately above
if statement needs to be moved outside its enclosing if -
otherwise it only gets executed when targets != null.
We'd want this to execute if we found a matching target
wouldn't we?
Don't think so? We only want the message going to one
subscription? I may have misunderstood what you mean!
Yes - ignore what I said, I had my wires crossed.
Cheers,
David
--
Cheers,
David
Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2
9280 0699
Web: http://www.nuix.com Fax: +61 2
9212 6902
Index: activemq-core/src/main/java/org/apache/activemq/broker/
region/PrefetchSubscription.java
=
=
=
================================================================
--- activemq-core/src/main/java/org/apache/activemq/broker/
region/PrefetchSubscription.java (revision 628917)
+++ activemq-core/src/main/java/org/apache/activemq/broker/
region/PrefetchSubscription.java (working copy)
@@ -160,6 +160,8 @@
public void acknowledge(final ConnectionContext
context,final MessageAck ack) throws Exception {
// Handle the standard acknowledgment case.
boolean callDispatchMatched = false;
+ Queue queue = null;
+ synchronized(dispatchLock) {
if (ack.isStandardAck()) {
// Acknowledge all dispatched messages up till
the message id of
@@ -223,8 +225,12 @@
prefetchExtension = Math.max(0,
prefetchExtension -
(index + 1));
}
+ if (queue == null)
+ {
+ queue = (Queue)node.getRegionDestination();
+ }
callDispatchMatched = true;
- break;
+ break;
}
}
}
@@ -253,6 +259,10 @@
if
(ack.getLastMessageId().equals(node.getMessageId())) {
prefetchExtension =
Math.max(prefetchExtension,
index + 1);
+ if (queue == null)
+ {
+ queue =
(Queue)node.getRegionDestination();
+ }
callDispatchMatched = true;
break;
}
@@ -279,6 +289,10 @@
if (inAckRange) {
node.incrementRedeliveryCounter();
if
(ack.getLastMessageId().equals(messageId)) {
+ if (queue == null)
+ {
+ queue = (Queue)node.getRegionDestination();
+ }
callDispatchMatched = true;
break;
}
@@ -320,6 +334,10 @@
if
(ack.getLastMessageId().equals(messageId)) {
prefetchExtension = Math.max(0,
prefetchExtension
- (index + 1));
+ if (queue == null)
+ {
+ queue = (Queue)node.getRegionDestination();
+ }
callDispatchMatched = true;
break;
}
@@ -336,6 +354,9 @@
}
}
if (callDispatchMatched) {
+ if (Queue.LAZY_DISPATCH) {
+ queue.wakeup();
+ }
dispatchPending();
} else {
if (isSlave()) {
Index: activemq-core/src/main/java/org/apache/activemq/broker/
region/Queue.java
=
=
=
================================================================
--- activemq-core/src/main/java/org/apache/activemq/broker/
region/Queue.java (revision 628917)
+++ activemq-core/src/main/java/org/apache/activemq/broker/
region/Queue.java (working copy)
@@ -75,6 +75,8 @@
* @version $Revision: 1.28 $
*/
public class Queue extends BaseDestination implements Task {
+ public static final boolean LAZY_DISPATCH =
+
Boolean
.parseBoolean(System.getProperty("activemq.lazy.dispatch",
"true"));
private final Log log;
private final List<Subscription> consumers = new
ArrayList<Subscription>(50);
private PendingMessageCursor messages;
@@ -212,12 +214,12 @@
synchronized (pagedInMessages) {
// Add all the matching messages in the queue to
the
// subscription.
-
for (Iterator<MessageReference> i =
pagedInMessages.values()
.iterator(); i.hasNext();) {
QueueMessageReference node =
(QueueMessageReference) i
.next();
- if (!node.isDropped() && !node.isAcked()
&& (!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
+ if ((!node.isDropped() ||
sub.getConsumerInfo().isBrowser()) && !node.isAcked() &&
+ node.getLockOwner() == null) {
msgContext.setMessageReference(node);
if (sub.matches(node, msgContext)) {
sub.add(node);
@@ -940,7 +945,11 @@
dispatchLock.lock();
try{
- final int toPageIn = getMaxPageSize() -
pagedInMessages.size();
+ int toPageIn = getMaxPageSize() -
pagedInMessages.size();
+ if (LAZY_DISPATCH) {
+ // Only page in the minimum number of messages which
can be dispatched immediately.
+ toPageIn =
Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
+ }
if ((force || !consumers.isEmpty()) && toPageIn > 0) {
messages.setMaxBatchSize(toPageIn);
int count = 0;
@@ -976,12 +985,25 @@
}
return result;
}
+
+ private int getConsumerMessageCountBeforeFull() throws
Exception {
+ int total = 0;
+ synchronized (consumers) {
+ for (Subscription s : consumers) {
+ if (s instanceof PrefetchSubscription) {
+ total +=
((PrefetchSubscription)s).countBeforeFull();
+ }
+ }
+ }
+ return total;
+ }
private void doDispatch(List<MessageReference> list) throws
Exception {
if (list != null) {
synchronized (consumers) {
for (MessageReference node : list) {
+
Subscription target = null;
List<Subscription> targets = null;
for (Subscription s : consumers) {