Author: chirino
Date: Tue Mar 21 08:08:40 2006
New Revision: 387562
URL: http://svn.apache.org/viewcvs?rev=387562&view=rev
Log:
removed the preLoadLimit logic since it was causing the
RoundRobingDispatchPolicyTests to fail intermitently. The preLoadLimit was an
additional prefetch limit, and when it kicked in,
it would cause the round robin distribution to stop sending round robin.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=387562&r1=387561&r2=387562&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Tue Mar 21 08:08:40 2006
@@ -46,8 +46,6 @@
final protected LinkedList dispatched=new LinkedList();
protected int prefetchExtension=0;
- int preLoadLimit=1024*100;
- int preLoadSize=0;
boolean dispatching=false;
long enqueueCounter;
@@ -65,6 +63,8 @@
dispatch(node);
}else{
synchronized(pending){
+ if( pending.isEmpty() )
+ log.info("Prefetch limit.");
pending.addLast(node);
}
}
@@ -79,7 +79,6 @@
try{
MessageDispatch
md=createMessageDispatch(node,node.getMessage());
dispatched.addLast(node);
- incrementPreloadSize(node.getSize());
node.decrementReferenceCount();
}catch(Exception e){
log.error("Problem processing
MessageDispatchNotification: "+mdn,e);
@@ -101,6 +100,7 @@
final MessageReference node=(MessageReference) iter.next();
MessageId messageId=node.getMessageId();
if(ack.getFirstMessageId()==null||ack.getFirstMessageId().equals(messageId)){
+System.out.println("in range: "+messageId);
inAckRange=true;
}
if(inAckRange){
@@ -202,7 +202,7 @@
}
protected boolean isFull(){
- return
dispatched.size()-prefetchExtension>=info.getPrefetchSize()||preLoadSize>preLoadLimit;
+ return dispatched.size()-prefetchExtension>=info.getPrefetchSize();
}
synchronized public int getPendingQueueSize(){
@@ -252,7 +252,6 @@
dispatchCounter++;
MessageDispatch md=createMessageDispatch(node,message);
dispatched.addLast(node);
- incrementPreloadSize(node.getMessage().getSize());
if(info.isDispatchAsync()){
md.setConsumer(new Runnable(){
public void run(){
@@ -275,7 +274,6 @@
synchronized private void onDispatch(final MessageReference node,final
Message message){
boolean wasFull=isFull();
- decrementPreloadSize(message.getSize());
node.decrementReferenceCount();
if(node.getRegionDestination()!=null){
node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
@@ -288,16 +286,6 @@
}
}
}
- }
-
- private int incrementPreloadSize(int size){
- preLoadSize+=size;
- return preLoadSize;
- }
-
- private int decrementPreloadSize(int size){
- preLoadSize-=size;
- return preLoadSize;
}
/**