Author: gtully
Date: Mon Sep 7 16:08:06 2009
New Revision: 812214
URL: http://svn.apache.org/viewvc?rev=812214&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-1112 - resolve issues with
optimizedDispatch, no need for message expiry to respect optimizeDispatch -
iterate can always use the task runner in the expiry case, similar to usage
change
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=812214&r1=812213&r2=812214&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Mon Sep 7 16:08:06 2009
@@ -1266,7 +1266,7 @@
} catch (IOException e) {
LOG.error("Failed to remove expired Message from the store ",e);
}
- wakeup();
+ asyncWakeup();
}
protected ConnectionContext createConnectionContext() {
@@ -1297,15 +1297,18 @@
public void wakeup() {
if (optimizedDispatch || isSlave()) {
iterate();
- }else {
- try {
- taskRunner.wakeup();
- } catch (InterruptedException e) {
- LOG.warn("Task Runner failed to wakeup ", e);
- }
+ } else {
+ asyncWakeup();
}
}
+ public void asyncWakeup() {
+ try {
+ this.taskRunner.wakeup();
+ } catch (InterruptedException e) {
+ LOG.warn("Async task tunner failed to wakeup ", e);
+ }
+ }
private boolean isSlave() {
return broker.getBrokerService().isSlave();
@@ -1371,6 +1374,7 @@
}
private void doDispatch(List<QueueMessageReference> list) throws Exception
{
+ boolean doWakeUp = false;
synchronized(dispatchMutex) {
synchronized (pagedInPendingDispatch) {
@@ -1391,11 +1395,14 @@
pagedInPendingDispatch.add(qmr);
}
}
- wakeup();
+ doWakeUp = true;
}
}
}
}
+ if (doWakeUp) {
+ wakeup();
+ }
}
/**
@@ -1603,11 +1610,7 @@
public void onUsageChanged(Usage usage, int oldPercentUsage, int
newPercentUsage) {
if (oldPercentUsage > newPercentUsage) {
- try {
- this.taskRunner.wakeup();
- } catch (InterruptedException e) {
- LOG.warn(getName() + " failed to wakeup task runner on
usageChange: " + e);
- }
+ asyncWakeup();
}
}
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=812214&r1=812213&r2=812214&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
Mon Sep 7 16:08:06 2009
@@ -76,7 +76,7 @@
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
// TODO Optimize dispatch makes this test hang
- //defaultEntry.setOptimizedDispatch(true);
+ defaultEntry.setOptimizedDispatch(true);
defaultEntry.setExpireMessagesPeriod(100);
defaultEntry.setMaxExpirePageSize(800);