Author: rajdavies
Date: Tue Mar 20 13:40:03 2012
New Revision: 1302887
URL: http://svn.apache.org/viewvc?rev=1302887&view=rev
Log:
add property optimizeMessageStoreInFlightLimit on destinations so can set the
inflight limit above which optimize store dispatch is no longer applied - for
https://issues.apache.org/jira/browse/AMQ-3750
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1302887&r1=1302886&r2=1302887&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Tue Mar 20 13:40:03 2012
@@ -99,6 +99,10 @@ public abstract class BaseDestination im
protected final Scheduler scheduler;
private boolean disposed = false;
private boolean doOptimzeMessageStorage = true;
+ /*
+ * percentage of in-flight messages above which optimize message store is
disabled
+ */
+ private int optimizeMessageStoreInFlightLimit = 10;
/**
* @param brokerService
@@ -723,6 +727,14 @@ public abstract class BaseDestination im
this.doOptimzeMessageStorage = doOptimzeMessageStorage;
}
+ public int getOptimizeMessageStoreInFlightLimit() {
+ return optimizeMessageStoreInFlightLimit;
+ }
+
+ public void setOptimizeMessageStoreInFlightLimit(int
optimizeMessageStoreInFlightLimit) {
+ this.optimizeMessageStoreInFlightLimit =
optimizeMessageStoreInFlightLimit;
+ }
+
public abstract List<Subscription> getConsumers();
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1302887&r1=1302886&r2=1302887&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Tue Mar 20 13:40:03 2012
@@ -2164,7 +2164,7 @@ public class Queue extends BaseDestinati
result = false;
break;
}
- if (s.getInFlightUsage() > 10){
+ if (s.getInFlightUsage() >
getOptimizeMessageStoreInFlightLimit()){
result = false;
break;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1302887&r1=1302886&r2=1302887&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Tue Mar 20 13:40:03 2012
@@ -707,7 +707,7 @@ public class Topic extends BaseDestinati
result = false;
break;
}
- if (s.getInFlightUsage() > 10){
+ if (s.getInFlightUsage() >
getOptimizeMessageStoreInFlightLimit()){
result = false;
break;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1302887&r1=1302886&r2=1302887&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Tue Mar 20 13:40:03 2012
@@ -97,6 +97,10 @@ public class PolicyEntry extends Destina
private boolean reduceMemoryFootprint;
private NetworkBridgeFilterFactory networkBridgeFilterFactory;
private boolean doOptimzeMessageStorage = true;
+ /*
+ * percentage of in-flight messages above which optimize message store is
disabled
+ */
+ private int optimizeMessageStoreInFlightLimit = 10;
public void configure(Broker broker,Queue queue) {
@@ -173,6 +177,8 @@ public class PolicyEntry extends Destina
destination.setInactiveTimoutBeforeGC(getInactiveTimoutBeforeGC());
destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage());
+
destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit());
+
}
public void configure(Broker broker, SystemUsage memoryManager,
TopicSubscription subscription) {
@@ -842,4 +848,12 @@ public class PolicyEntry extends Destina
public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
this.doOptimzeMessageStorage = doOptimzeMessageStorage;
}
+
+ public int getOptimizeMessageStoreInFlightLimit() {
+ return optimizeMessageStoreInFlightLimit;
+ }
+
+ public void setOptimizeMessageStoreInFlightLimit(int
optimizeMessageStoreInFlightLimit) {
+ this.optimizeMessageStoreInFlightLimit =
optimizeMessageStoreInFlightLimit;
+ }
}