Author: chirino Date: Sat Dec 30 15:49:03 2006 New Revision: 491346 URL: http://svn.apache.org/viewvc?view=rev&rev=491346 Log: Fix for CursorDurableTest. The TopicStorePrefetch was iterating items that were in the subscription but not added to the pending list.
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=491346&r1=491345&r2=491346 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Sat Dec 30 15:49:03 2006 @@ -406,7 +406,9 @@ pending.reset(); while(pending.hasNext()&&!isFull()&&count<numberToDispatch){ MessageReference node=pending.next(); - + if ( node == null ) + break; + if(canDispatch(node)){ pending.remove(); // Message may have been sitting in the pending list a while Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?view=diff&rev=491346&r1=491345&r2=491346 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Sat Dec 30 15:49:03 2006 @@ -20,7 +20,7 @@ import java.io.IOException; import java.util.LinkedList; -import javax.jms.JMSException; + import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Topic; @@ -48,6 +48,10 @@ private String subscriberName; private Destination regionDestination; + boolean empty=true; + private MessageId firstMessageId; + private MessageId lastMessageId; + /** * @param topic * @param clientId @@ -73,7 +77,7 @@ * @return true if there are no pending messages */ public boolean isEmpty(){ - return batchList.isEmpty(); + return empty; } public synchronized int size(){ @@ -86,27 +90,55 @@ } public synchronized void addMessageLast(MessageReference node) throws Exception{ - if(node!=null){ + if(node!=null){ + if( empty ) { + firstMessageId = node.getMessageId(); + empty=false; + } + lastMessageId = node.getMessageId(); node.decrementReferenceCount(); } } - public synchronized boolean hasNext(){ - if(isEmpty()){ - try{ - fillBatch(); - }catch(Exception e){ - log.error("Failed to fill batch",e); - throw new RuntimeException(e); - } - } + public synchronized boolean hasNext() { return !isEmpty(); } public synchronized MessageReference next(){ - Message result = (Message)batchList.removeFirst(); - result.setRegionDestination(regionDestination); - return result; + + if( empty ) { + return null; + } else { + + // We may need to fill in the batch... + if(batchList.isEmpty()){ + try{ + fillBatch(); + }catch(Exception e){ + log.error("Failed to fill batch",e); + throw new RuntimeException(e); + } + if( batchList.isEmpty()) { + return null; + } + } + + Message result = (Message)batchList.removeFirst(); + + if( firstMessageId != null ) { + // Skip messages until we get to the first message. + if( !result.getMessageId().equals(firstMessageId) ) + return null; + firstMessageId = null; + } + if( lastMessageId != null ) { + if( result.getMessageId().equals(lastMessageId) ) { + empty=true; + } + } + result.setRegionDestination(regionDestination); + return result; + } } public void reset(){ @@ -130,13 +162,7 @@ // implementation protected void fillBatch() throws Exception{ - store.recoverNextMessages(clientId,subscriberName, - maxBatchSize,this); - // this will add more messages to the batch list - if(!batchList.isEmpty()){ - Message message=(Message)batchList.getLast(); - - } + store.recoverNextMessages(clientId,subscriberName,maxBatchSize,this); } public void gc() {