Author: chirino
Date: Tue Mar 21 08:08:40 2006
New Revision: 387562

URL: http://svn.apache.org/viewcvs?rev=387562&view=rev
Log:
removed the preLoadLimit logic since it was causing the 
RoundRobingDispatchPolicyTests to fail intermitently.  The preLoadLimit was an 
additional prefetch limit, and when it kicked in, 
it would cause the round robin distribution to stop sending round robin.

Modified:
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=387562&r1=387561&r2=387562&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 Tue Mar 21 08:08:40 2006
@@ -46,8 +46,6 @@
     final protected LinkedList dispatched=new LinkedList();
     
     protected int prefetchExtension=0;
-    int preLoadLimit=1024*100;
-    int preLoadSize=0;
     boolean dispatching=false;
     
     long enqueueCounter;
@@ -65,6 +63,8 @@
             dispatch(node);
         }else{
             synchronized(pending){
+                if( pending.isEmpty() )
+                    log.info("Prefetch limit.");
                 pending.addLast(node);
             }
         }
@@ -79,7 +79,6 @@
                     try{
                         MessageDispatch 
md=createMessageDispatch(node,node.getMessage());
                         dispatched.addLast(node);
-                        incrementPreloadSize(node.getSize());
                         node.decrementReferenceCount();
                     }catch(Exception e){
                         log.error("Problem processing 
MessageDispatchNotification: "+mdn,e);
@@ -101,6 +100,7 @@
                 final MessageReference node=(MessageReference) iter.next();
                 MessageId messageId=node.getMessageId();
                 
if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
+System.out.println("in range: "+messageId);
                     inAckRange=true;
                 }
                 if(inAckRange){
@@ -202,7 +202,7 @@
     }
 
     protected boolean isFull(){
-        return 
dispatched.size()-prefetchExtension>=info.getPrefetchSize()||preLoadSize>preLoadLimit;
+        return dispatched.size()-prefetchExtension>=info.getPrefetchSize();
     }
     
     synchronized public int getPendingQueueSize(){
@@ -252,7 +252,6 @@
             dispatchCounter++;
             MessageDispatch md=createMessageDispatch(node,message);
             dispatched.addLast(node);            
-            incrementPreloadSize(node.getMessage().getSize());
             if(info.isDispatchAsync()){
                 md.setConsumer(new Runnable(){
                     public void run(){
@@ -275,7 +274,6 @@
 
     synchronized private void onDispatch(final MessageReference node,final 
Message message){
         boolean wasFull=isFull();
-        decrementPreloadSize(message.getSize());
         node.decrementReferenceCount();
         if(node.getRegionDestination()!=null){
             
node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
@@ -288,16 +286,6 @@
                 }
             }
         }
-    }
-
-    private int incrementPreloadSize(int size){
-        preLoadSize+=size;
-        return preLoadSize;
-    }
-
-    private int decrementPreloadSize(int size){
-        preLoadSize-=size;
-        return preLoadSize;
     }
 
     /**


Reply via email to