Author: dejanb
Date: Thu Jan 22 09:22:12 2009
New Revision: 736720
URL: http://svn.apache.org/viewvc?rev=736720&view=rev
Log:
Recovery dispatch refactoring as part of the solution for
https://issues.apache.org/activemq/browse/AMQ-2016
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=736720&r1=736719&r2=736720&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Jan 22 09:22:12 2009
@@ -233,16 +233,18 @@
}
}
- // any newly paged in messages that are not dispatched are added
to pagedInPending in iterate()
- doPageIn(false);
+ // do recovery dispatch only if it is a browser subscription
+ if(sub instanceof QueueBrowserSubscription ) {
+ // any newly paged in messages that are not dispatched are
added to pagedInPending in iterate()
+ doPageIn(false);
+
+ synchronized (pagedInMessages) {
+ RecoveryDispatch rd = new RecoveryDispatch();
+ rd.messages = new
ArrayList<QueueMessageReference>(pagedInMessages.values());
+ rd.subscription = sub;
+ recoveries.addLast(rd);
+ }
- synchronized (pagedInMessages) {
- RecoveryDispatch rd = new RecoveryDispatch();
- rd.messages = new
ArrayList<QueueMessageReference>(pagedInMessages.values());
- rd.subscription = sub;
- recoveries.addLast(rd);
- }
- if( sub instanceof QueueBrowserSubscription ) {
((QueueBrowserSubscription)sub).incrementQueueRef();
}
if (!(this.optimizedDispatch || isSlave())) {
@@ -303,9 +305,14 @@
doDispatch(list);
}
}
-
- if (consumers.isEmpty()) {
- messages.gc();
+ //if it is a last consumer (and not a browser) dispatch all
pagedIn messages
+ if (consumers.isEmpty() && !(sub instanceof
QueueBrowserSubscription)) {
+ List<QueueMessageReference> list = new
ArrayList<QueueMessageReference>();
+ for (QueueMessageReference ref :
pagedInMessages.values()) {
+ list.add(ref);
+ }
+ pagedInPendingDispatch.clear();
+ doDispatch(list);
}
if (!(this.optimizedDispatch || isSlave())) {
wakeup();
@@ -615,6 +622,7 @@
int count = 0;
List<Message> l = new ArrayList<Message>();
try {
+ pageInMessages(false);
synchronized (this.pagedInPendingDispatch) {
for (Iterator<QueueMessageReference> i =
this.pagedInPendingDispatch
.iterator(); i.hasNext()
@@ -657,7 +665,7 @@
}
}
}
- } catch (IOException e) {
+ } catch (Exception e) {
LOG.error("Problem retrieving message in browse() ", e);
}
return l.toArray(new Message[l.size()]);
@@ -899,7 +907,7 @@
int movedCounter = 0;
Set<MessageReference> set = new
CopyOnWriteArraySet<MessageReference>();
do {
- pageInMessages();
+ doPageIn(true);
synchronized (pagedInMessages) {
set.addAll(pagedInMessages.values());
}
@@ -981,7 +989,7 @@
e.printStackTrace();
}
}
-
+
boolean pageInMoreMessages = false;
synchronized (messages) {
pageInMoreMessages = !messages.isEmpty();
@@ -1230,6 +1238,7 @@
pagedInPendingDispatch.add(qmr);
}
}
+ wakeup();
}
}
}
@@ -1268,11 +1277,16 @@
}
}
interestCount++;
+ } else {
+ // makes sure it gets dispatched again
+ if (!node.isDropped() &&
!((QueueMessageReference)node).isAcked() && (!node.isDropped() ||
s.getConsumerInfo().isBrowser())) {
+ interestCount++;
+ }
}
}
- if (target == null && interestCount>0) {
- // This means all subs were full...
+ if ((target == null && interestCount>0) || consumers.size() == 0) {
+ // This means all subs were full or that there are no
consumers...
rc.add((QueueMessageReference)node);
}
@@ -1288,10 +1302,6 @@
}
}
- //LOG.info(getName()+" Pending messages:");
- //for (MessageReference n : rc) {
- // LOG.info(getName()+" - " + n.getMessageId());
- // }
return rc;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java?rev=736720&r1=736719&r2=736720&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
Thu Jan 22 09:22:12 2009
@@ -70,7 +70,7 @@
if (result) {
result = exclusiveConsumer == null
|| exclusiveConsumer == subscription;
- if (result) {
+ if (result && !subscription.isFull()) {
QueueMessageReference node = (QueueMessageReference) m;
// Keep message groups together.
String groupId = node.getGroupID();