Author: rajdavies Date: Sun Dec 24 23:19:29 2006 New Revision: 490111 URL: http://svn.apache.org/viewvc?view=rev&rev=490111 Log: it's ensure dispatching happens in order
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=490111&r1=490110&r2=490111 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Sun Dec 24 23:19:29 2006 @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import org.apache.activemq.broker.Broker; @@ -60,6 +61,7 @@ protected long enqueueCounter; protected long dispatchCounter; protected long dequeueCounter; + private AtomicBoolean dispatching = new AtomicBoolean(); public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException{ @@ -389,31 +391,37 @@ protected void dispatchMatched() throws IOException{ - List toDispatch=null; - synchronized(pending){ + if(dispatching.compareAndSet(false,true)){ try{ - pending.reset(); - while(pending.hasNext()&&!isFull()){ - MessageReference node=pending.next(); - pending.remove(); - // Message may have been sitting in the pending list a while - // waiting for the consumer to ak the message. - if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){ - continue; // just drop it. + List toDispatch=null; + synchronized(pending){ + try{ + pending.reset(); + while(pending.hasNext()&&!isFull()){ + MessageReference node=pending.next(); + pending.remove(); + // Message may have been sitting in the pending list a while + // waiting for the consumer to ak the message. + if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){ + continue; // just drop it. + } + if(toDispatch==null){ + toDispatch=new ArrayList(); + } + toDispatch.add(node); + } + }finally{ + pending.release(); } - if(toDispatch==null){ - toDispatch=new ArrayList(); + } + if(toDispatch!=null){ + for(int i=0;i<toDispatch.size();i++){ + MessageReference node=(MessageReference)toDispatch.get(i); + dispatch(node); } - toDispatch.add(node); } }finally{ - pending.release(); - } - } - if(toDispatch!=null){ - for(int i=0;i<toDispatch.size();i++){ - MessageReference node=(MessageReference)toDispatch.get(i); - dispatch(node); + dispatching.set(false); } } }