Hey Hiram,

this change breaks org.apache.activemq.broker.RecoveryBrokerTest, oorg.apache.activemq.broker.BrokerTest, etc for me.

also - I'm not sure I like TopicStorePrefetch possibly returning null when a hasNext() has returned true

What was the problem in CursorDurableTest ? I hadn't seen that one

cheers,

Rob

On 30 Dec 2006, at 23:49, [EMAIL PROTECTED] wrote:

Author: chirino
Date: Sat Dec 30 15:49:03 2006
New Revision: 491346

URL: http://svn.apache.org/viewvc?view=rev&rev=491346
Log:
Fix for CursorDurableTest.
The TopicStorePrefetch was iterating items that were in the subscription but not added to the pending list.

Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/ activemq/broker/region/PrefetchSubscription.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/ activemq/broker/region/cursors/TopicStorePrefetch.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/ apache/activemq/broker/region/PrefetchSubscription.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq- core/src/main/java/org/apache/activemq/broker/region/ PrefetchSubscription.java?view=diff&rev=491346&r1=491345&r2=491346 ====================================================================== ======== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/ activemq/broker/region/PrefetchSubscription.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/ activemq/broker/region/PrefetchSubscription.java Sat Dec 30 15:49:03 2006
@@ -406,7 +406,9 @@
                             pending.reset();
while(pending.hasNext()&&!isFull() &&count<numberToDispatch){
                                 MessageReference node=pending.next();
-
+                                if ( node == null )
+                                       break;
+
                                 if(canDispatch(node)){
                                     pending.remove();
// Message may have been sitting in the pending list a while

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/ apache/activemq/broker/region/cursors/TopicStorePrefetch.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq- core/src/main/java/org/apache/activemq/broker/region/cursors/ TopicStorePrefetch.java?view=diff&rev=491346&r1=491345&r2=491346 ====================================================================== ======== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/ activemq/broker/region/cursors/TopicStorePrefetch.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/ activemq/broker/region/cursors/TopicStorePrefetch.java Sat Dec 30 15:49:03 2006
@@ -20,7 +20,7 @@

 import java.io.IOException;
 import java.util.LinkedList;
-import javax.jms.JMSException;
+
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Topic;
@@ -48,6 +48,10 @@
     private String subscriberName;
     private Destination regionDestination;

+    boolean empty=true;
+       private MessageId firstMessageId;
+       private MessageId lastMessageId;
+
     /**
      * @param topic
      * @param clientId
@@ -73,7 +77,7 @@
      * @return true if there are no pending messages
      */
     public boolean isEmpty(){
-        return batchList.isEmpty();
+        return empty;
     }

     public synchronized int size(){
@@ -86,27 +90,55 @@
     }

public synchronized void addMessageLast(MessageReference node) throws Exception{
-        if(node!=null){
+               if(node!=null){
+                       if( empty ) {
+                               firstMessageId = node.getMessageId();
+                               empty=false;
+                       }
+               lastMessageId = node.getMessageId();
             node.decrementReferenceCount();
         }
     }

-    public synchronized boolean hasNext(){
-        if(isEmpty()){
-            try{
-                fillBatch();
-            }catch(Exception e){
-                log.error("Failed to fill batch",e);
-                throw new RuntimeException(e);
-            }
-        }
+    public synchronized boolean hasNext() {
         return !isEmpty();
     }

     public synchronized MessageReference next(){
-        Message result = (Message)batchList.removeFirst();
-        result.setRegionDestination(regionDestination);
-        return result;
+               
+        if( empty ) {
+               return null;
+        } else {
+
+               // We may need to fill in the batch...
+            if(batchList.isEmpty()){
+                try{
+                    fillBatch();
+                }catch(Exception e){
+                    log.error("Failed to fill batch",e);
+                    throw new RuntimeException(e);
+                }
+                if( batchList.isEmpty()) {
+                       return null;
+                }
+            }
+
+            Message result = (Message)batchList.removeFirst();
+               
+               if( firstMessageId != null ) {
+               // Skip messages until we get to the first message.
+                       if( !result.getMessageId().equals(firstMessageId) )
+                               return null;
+                       firstMessageId = null;
+               }
+               if( lastMessageId != null ) {
+                       if( result.getMessageId().equals(lastMessageId) ) {
+                               empty=true;
+                       }
+               }               
+            result.setRegionDestination(regionDestination);
+            return result;
+        }
     }

     public void reset(){
@@ -130,13 +162,7 @@

     // implementation
     protected void fillBatch() throws Exception{
-        store.recoverNextMessages(clientId,subscriberName,
-                maxBatchSize,this);
-        // this will add more messages to the batch list
-        if(!batchList.isEmpty()){
-            Message message=(Message)batchList.getLast();
-
-        }
+ store.recoverNextMessages (clientId,subscriberName,maxBatchSize,this);
     }

     public void gc() {



Reply via email to