Author: chirino
Date: Wed Mar 19 09:29:26 2008
New Revision: 638924
URL: http://svn.apache.org/viewvc?rev=638924&view=rev
Log:
Do the initial recovery dispatch in the iterate() thread so that the
addSubscription() operation on the Queue executes quickly.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=638924&r1=638923&r2=638924&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
Wed Mar 19 09:29:26 2008
@@ -265,7 +265,7 @@
}
if (info.isBrowser()) {
- ((QueueBrowserSubscription)sub).browseDone();
+ ((QueueBrowserSubscription)sub).destinationsAdded();
}
return sub;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=638924&r1=638923&r2=638924&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Wed Mar 19 09:29:26 2008
@@ -99,6 +99,7 @@
wakeup();
}
};
+
private static final Comparator<Subscription>orderedCompare = new
Comparator<Subscription>() {
public int compare(Subscription s1, Subscription s2) {
@@ -195,14 +196,19 @@
}
}
+ class RecoveryDispatch {
+ public ArrayList<QueueMessageReference> messages;
+ public Subscription subscription;
+ }
+ LinkedList<RecoveryDispatch> recoveries = new
LinkedList<RecoveryDispatch>();
public void addSubscription(ConnectionContext context, Subscription sub)
throws Exception {
dispatchLock.lock();
try {
sub.add(context, this);
destinationStatistics.getConsumers().increment();
- MessageEvaluationContext msgContext = new
NonCachedMessageEvaluationContext();
+// MessageEvaluationContext msgContext = new
NonCachedMessageEvaluationContext();
// needs to be synchronized - so no contention with dispatching
synchronized (consumers) {
@@ -223,21 +229,33 @@
// duplicates
// etc.
doPageIn(false);
- msgContext.setDestination(destination);
+// msgContext.setDestination(destination);
+
synchronized (pagedInMessages) {
- // Add all the matching messages in the queue to the
- // subscription.
-
- for (QueueMessageReference node:pagedInMessages.values()){
- if (!node.isDropped() && !node.isAcked() &&
(!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
- msgContext.setMessageReference(node);
- if (sub.matches(node, msgContext)) {
- sub.add(node);
- }
- }
- }
-
- }
+ RecoveryDispatch rd = new RecoveryDispatch();
+ rd.messages = new
ArrayList<QueueMessageReference>(pagedInMessages.values());
+ rd.subscription = sub;
+ recoveries.addLast(rd);
+ }
+
+ if( sub instanceof QueueBrowserSubscription ) {
+ ((QueueBrowserSubscription)sub).incrementQueueRef();
+ }
+
+// System.out.println(new Date()+": Locked pagedInMessages:
"+sub.getConsumerInfo().getConsumerId());
+// // Add all the matching messages in the queue to the
+// // subscription.
+//
+// for (QueueMessageReference node:pagedInMessages.values()){
+// if (!node.isDropped() && !node.isAcked() &&
(!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
+// msgContext.setMessageReference(node);
+// if (sub.matches(node, msgContext)) {
+// sub.add(node);
+// }
+// }
+// }
+//
+// }
wakeup();
}finally {
dispatchLock.unlock();
@@ -880,16 +898,56 @@
} while (count < this.destinationStatistics.getMessages().getCount());
return movedCounter;
}
+
+ RecoveryDispatch getNextRecoveryDispatch() {
+ synchronized (pagedInMessages) {
+ if( recoveries.isEmpty() ) {
+ return null;
+ }
+ return recoveries.removeFirst();
+ }
+
+ }
+ protected boolean isRecoveryDispatchEmpty() {
+ synchronized (pagedInMessages) {
+ return recoveries.isEmpty();
+ }
+ }
/**
* @return true if we would like to iterate again
* @see org.apache.activemq.thread.Task#iterate()
*/
public boolean iterate() {
+
+ RecoveryDispatch rd;
+ while ((rd = getNextRecoveryDispatch()) != null) {
+ try {
+ MessageEvaluationContext msgContext = new
NonCachedMessageEvaluationContext();
+ msgContext.setDestination(destination);
+
+ for (QueueMessageReference node : rd.messages) {
+ if (!node.isDropped() && !node.isAcked() &&
(!node.isDropped() || rd.subscription.getConsumerInfo().isBrowser())) {
+ msgContext.setMessageReference(node);
+ if (rd.subscription.matches(node, msgContext)) {
+ rd.subscription.add(node);
+ }
+ }
+ }
+
+ if( rd.subscription instanceof QueueBrowserSubscription ) {
+
((QueueBrowserSubscription)rd.subscription).decrementQueueRef();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
boolean result = false;
synchronized (messages) {
result = !messages.isEmpty();
- }
+ }
+
if (result) {
try {
pageInMessages(false);
@@ -1027,46 +1085,53 @@
private void doDispatch(List<QueueMessageReference> list) throws Exception
{
if (list != null) {
- synchronized (consumers) {
- for (MessageReference node : list) {
- Subscription target = null;
- List<Subscription> targets = null;
- for (Subscription s : consumers) {
- if (dispatchSelector.canSelect(s, node)) {
- if (!s.isFull()) {
- s.add(node);
- node.incrementReferenceCount();
- target = s;
- break;
- } else {
- if (targets == null) {
- targets = new ArrayList<Subscription>();
- }
- targets.add(s);
+ List<Subscription> consumers;
+ synchronized (this.consumers) {
+ consumers = new ArrayList<Subscription>(this.consumers);
+ }
+
+ for (MessageReference node : list) {
+ Subscription target = null;
+ List<Subscription> targets = null;
+ for (Subscription s : consumers) {
+ if (dispatchSelector.canSelect(s, node)) {
+ if (!s.isFull()) {
+ s.add(node);
+ node.incrementReferenceCount();
+ target = s;
+ break;
+ } else {
+ if (targets == null) {
+ targets = new ArrayList<Subscription>();
}
+ targets.add(s);
}
}
- if (target == null && targets != null) {
- // pick the least loaded to add the message too
- for (Subscription s : targets) {
- if (target == null
- || target.getInFlightUsage() > s
- .getInFlightUsage()) {
- target = s;
- }
- }
- if (target != null) {
- target.add(node);
- node.incrementReferenceCount();
+ }
+ if (target == null && targets != null) {
+ // pick the least loaded to add the message too
+ for (Subscription s : targets) {
+ if (target == null
+ || target.getInFlightUsage() > s
+ .getInFlightUsage()) {
+ target = s;
}
}
- if (target != null && !strictOrderDispatch &&
consumers.size() > 1 &&
- !dispatchSelector.isExclusiveConsumer(target)) {
- removeFromConsumerList(target);
- addToConsumerList(target);
+ if (target != null) {
+ target.add(node);
+ node.incrementReferenceCount();
}
-
}
+ if (target != null && !strictOrderDispatch && consumers.size()
> 1 &&
+ !dispatchSelector.isExclusiveConsumer(target)) {
+ synchronized (this.consumers) {
+ if( removeFromConsumerList(target) ) {
+ addToConsumerList(target);
+ consumers = new
ArrayList<Subscription>(this.consumers);
+ }
+ }
+ }
+
}
}
}
@@ -1094,8 +1159,8 @@
}
}
- private void removeFromConsumerList(Subscription sub) {
- consumers.remove(sub);
+ private boolean removeFromConsumerList(Subscription sub) {
+ return consumers.remove(sub);
}
private int getConsumerMessageCountBeforeFull() throws Exception {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?rev=638924&r1=638923&r2=638924&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
Wed Mar 19 09:29:26 2008
@@ -29,7 +29,9 @@
public class QueueBrowserSubscription extends QueueSubscription {
+ int queueRefs;
boolean browseDone;
+ boolean destinationsAdded;
public QueueBrowserSubscription(Broker broker,Destination destination,
SystemUsage usageManager, ConnectionContext context, ConsumerInfo info)
throws InvalidSelectorException {
@@ -46,9 +48,16 @@
+ this.prefetchExtension + ", pending=" + getPendingQueueSize();
}
- public void browseDone() throws Exception {
- browseDone = true;
- add(QueueMessageReference.NULL_MESSAGE);
+ synchronized public void destinationsAdded() throws Exception {
+ destinationsAdded = true;
+ checkDone();
+ }
+
+ private void checkDone() throws Exception {
+ if( !browseDone && queueRefs == 0 && destinationsAdded) {
+ browseDone=true;
+ add(QueueMessageReference.NULL_MESSAGE);
+ }
}
public boolean matches(MessageReference node, MessageEvaluationContext
context) throws IOException {
@@ -60,6 +69,15 @@
*/
protected void acknowledge(ConnectionContext context, final MessageAck
ack, final MessageReference n)
throws IOException {
+ }
+
+ synchronized public void incrementQueueRef() {
+ queueRefs++;
+ }
+
+ synchronized public void decrementQueueRef() throws Exception {
+ queueRefs--;
+ checkDone();
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java?rev=638924&r1=638923&r2=638924&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
Wed Mar 19 09:29:26 2008
@@ -87,7 +87,7 @@
log.error("Failed to page in more queue messages ", e);
}
}
- if (!messagesWaitingForSpace.isEmpty()) {
+ if (!messagesWaitingForSpace.isEmpty() || !isRecoveryDispatchEmpty()) {
try {
taskRunner.wakeup();
} catch (InterruptedException e) {