Author: rajdavies
Date: Tue Mar 20 13:40:03 2012
New Revision: 1302887

URL: http://svn.apache.org/viewvc?rev=1302887&view=rev
Log:
add property  optimizeMessageStoreInFlightLimit on destinations so can set the 
inflight limit above which optimize store dispatch is no longer applied - for 
https://issues.apache.org/jira/browse/AMQ-3750

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1302887&r1=1302886&r2=1302887&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
 Tue Mar 20 13:40:03 2012
@@ -99,6 +99,10 @@ public abstract class BaseDestination im
     protected final Scheduler scheduler;
     private boolean disposed = false;
     private boolean doOptimzeMessageStorage = true;
+    /*
+     * percentage of in-flight messages above which optimize message store is 
disabled
+     */
+    private int optimizeMessageStoreInFlightLimit = 10;
 
     /**
      * @param brokerService
@@ -723,6 +727,14 @@ public abstract class BaseDestination im
         this.doOptimzeMessageStorage = doOptimzeMessageStorage;
     }
 
+    public int getOptimizeMessageStoreInFlightLimit() {
+        return optimizeMessageStoreInFlightLimit;
+    }
+
+    public void setOptimizeMessageStoreInFlightLimit(int 
optimizeMessageStoreInFlightLimit) {
+        this.optimizeMessageStoreInFlightLimit = 
optimizeMessageStoreInFlightLimit;
+    }
+
 
     public abstract List<Subscription> getConsumers();
 

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1302887&r1=1302886&r2=1302887&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 Tue Mar 20 13:40:03 2012
@@ -2164,7 +2164,7 @@ public class Queue extends BaseDestinati
                             result = false;
                             break;
                         }
-                        if (s.getInFlightUsage() > 10){
+                        if (s.getInFlightUsage() > 
getOptimizeMessageStoreInFlightLimit()){
                             result = false;
                             break;
                         }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1302887&r1=1302886&r2=1302887&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
 Tue Mar 20 13:40:03 2012
@@ -707,7 +707,7 @@ public class Topic extends BaseDestinati
                         result = false;
                         break;
                     }
-                    if (s.getInFlightUsage() > 10){
+                    if (s.getInFlightUsage() > 
getOptimizeMessageStoreInFlightLimit()){
                         result = false;
                         break;
                     }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1302887&r1=1302886&r2=1302887&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
 Tue Mar 20 13:40:03 2012
@@ -97,6 +97,10 @@ public class PolicyEntry extends Destina
     private boolean reduceMemoryFootprint;
     private NetworkBridgeFilterFactory networkBridgeFilterFactory;
     private boolean doOptimzeMessageStorage = true;
+    /*
+     * percentage of in-flight messages above which optimize message store is 
disabled
+     */
+    private int optimizeMessageStoreInFlightLimit = 10;
 
 
     public void configure(Broker broker,Queue queue) {
@@ -173,6 +177,8 @@ public class PolicyEntry extends Destina
         destination.setInactiveTimoutBeforeGC(getInactiveTimoutBeforeGC());
         destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
         destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage());
+        
destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit());
+
     }
 
     public void configure(Broker broker, SystemUsage memoryManager, 
TopicSubscription subscription) {
@@ -842,4 +848,12 @@ public class PolicyEntry extends Destina
     public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
         this.doOptimzeMessageStorage = doOptimzeMessageStorage;
     }
+
+    public int getOptimizeMessageStoreInFlightLimit() {
+        return optimizeMessageStoreInFlightLimit;
+    }
+
+    public void setOptimizeMessageStoreInFlightLimit(int 
optimizeMessageStoreInFlightLimit) {
+        this.optimizeMessageStoreInFlightLimit = 
optimizeMessageStoreInFlightLimit;
+    }
 }


Reply via email to