On 12/31/06, Rob Davies <[EMAIL PROTECTED]> wrote:
On 31 Dec 2006, at 07:59, Hiram Chirino wrote: > On 12/31/06, Rob Davies <[EMAIL PROTECTED]> wrote: >> Hey Hiram, >> >> this change breaks org.apache.activemq.broker.RecoveryBrokerTest, >> oorg.apache.activemq.broker.BrokerTest, etc for me. >> > > yeah I think I have fix for that. sorry I broke it. I'm running the > test suite again now. Basically I think I need to default boolean > empty=false; So that an initial recovery of subscription is done. > >> also - I'm not sure I like TopicStorePrefetch possibly returning null >> when a hasNext() has returned true >> > > Yeah me neither :) I did not fully understand why it was returning > null when I expected it to return a value. I was thinking it could be > a timing issue with the MessageStore. > >> What was the problem in CursorDurableTest ? I hadn't seen that one >> > > CursorDurableTest had a test that was failing due to out of > order/duplicates showing up. This was cause sometimes some messages > were direct dispatched and at other times they are dispatched from the > pending list. But since the pending list's .next() was returning the > items that were directly dispatched and not even added to the pending > list. This is when the dups and out of order issues would show up. > > The problem is that TopicStorePrefetch.next() was returning everything > added to the durable subscription since it's backed by the > MessageStore. And that's not what we want. We only want it to return > things that are explicitly added to it since it's the pending list. I wonder if the real problem is then in PrefetchSubscription.add() - because only if pending is empty (nothing in the store) should it dispatch directly
Could be an interaction. I think TopicStorePrefetch still needs a little more work. I think we need to recover the TopicStorePrefetch when the the durable subscription is created so that way we know if it is initially empty or not.
> > >> cheers, >> >> Rob >> >> On 30 Dec 2006, at 23:49, [EMAIL PROTECTED] wrote: >> >> > Author: chirino >> > Date: Sat Dec 30 15:49:03 2006 >> > New Revision: 491346 >> > >> > URL: http://svn.apache.org/viewvc?view=rev&rev=491346 >> > Log: >> > Fix for CursorDurableTest. >> > The TopicStorePrefetch was iterating items that were in the >> > subscription but not added to the pending list. >> > >> > Modified: >> > incubator/activemq/trunk/activemq-core/src/main/java/org/ >> apache/ >> > activemq/broker/region/PrefetchSubscription.java >> > incubator/activemq/trunk/activemq-core/src/main/java/org/ >> apache/ >> > activemq/broker/region/cursors/TopicStorePrefetch.java >> > >> > Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/ >> > apache/activemq/broker/region/PrefetchSubscription.java >> > URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/ >> activemq- >> > core/src/main/java/org/apache/activemq/broker/region/ >> > PrefetchSubscription.java?view=diff&rev=491346&r1=491345&r2=491346 >> > >> ===================================================================== >> = >> > ======== >> > --- incubator/activemq/trunk/activemq-core/src/main/java/org/ >> apache/ >> > activemq/broker/region/PrefetchSubscription.java (original) >> > +++ incubator/activemq/trunk/activemq-core/src/main/java/org/ >> apache/ >> > activemq/broker/region/PrefetchSubscription.java Sat Dec 30 >> > 15:49:03 2006 >> > @@ -406,7 +406,9 @@ >> > pending.reset(); >> > while(pending.hasNext()&&!isFull() >> > &&count<numberToDispatch){ >> > MessageReference >> node=pending.next(); >> > - >> > + if ( node == null ) >> > + break; >> > + >> > if(canDispatch(node)){ >> > pending.remove(); >> > // Message may have been >> > sitting in the pending list a while >> > >> > Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/ >> > apache/activemq/broker/region/cursors/TopicStorePrefetch.java >> > URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/ >> activemq- >> > core/src/main/java/org/apache/activemq/broker/region/cursors/ >> > TopicStorePrefetch.java?view=diff&rev=491346&r1=491345&r2=491346 >> > >> ===================================================================== >> = >> > ======== >> > --- incubator/activemq/trunk/activemq-core/src/main/java/org/ >> apache/ >> > activemq/broker/region/cursors/TopicStorePrefetch.java (original) >> > +++ incubator/activemq/trunk/activemq-core/src/main/java/org/ >> apache/ >> > activemq/broker/region/cursors/TopicStorePrefetch.java Sat Dec 30 >> > 15:49:03 2006 >> > @@ -20,7 +20,7 @@ >> > >> > import java.io.IOException; >> > import java.util.LinkedList; >> > -import javax.jms.JMSException; >> > + >> > import org.apache.activemq.broker.region.Destination; >> > import org.apache.activemq.broker.region.MessageReference; >> > import org.apache.activemq.broker.region.Topic; >> > @@ -48,6 +48,10 @@ >> > private String subscriberName; >> > private Destination regionDestination; >> > >> > + boolean empty=true; >> > + private MessageId firstMessageId; >> > + private MessageId lastMessageId; >> > + >> > /** >> > * @param topic >> > * @param clientId >> > @@ -73,7 +77,7 @@ >> > * @return true if there are no pending messages >> > */ >> > public boolean isEmpty(){ >> > - return batchList.isEmpty(); >> > + return empty; >> > } >> > >> > public synchronized int size(){ >> > @@ -86,27 +90,55 @@ >> > } >> > >> > public synchronized void addMessageLast(MessageReference node) >> > throws Exception{ >> > - if(node!=null){ >> > + if(node!=null){ >> > + if( empty ) { >> > + firstMessageId = node.getMessageId(); >> > + empty=false; >> > + } >> > + lastMessageId = node.getMessageId(); >> > node.decrementReferenceCount(); >> > } >> > } >> > >> > - public synchronized boolean hasNext(){ >> > - if(isEmpty()){ >> > - try{ >> > - fillBatch(); >> > - }catch(Exception e){ >> > - log.error("Failed to fill batch",e); >> > - throw new RuntimeException(e); >> > - } >> > - } >> > + public synchronized boolean hasNext() { >> > return !isEmpty(); >> > } >> > >> > public synchronized MessageReference next(){ >> > - Message result = (Message)batchList.removeFirst(); >> > - result.setRegionDestination(regionDestination); >> > - return result; >> > + >> > + if( empty ) { >> > + return null; >> > + } else { >> > + >> > + // We may need to fill in the batch... >> > + if(batchList.isEmpty()){ >> > + try{ >> > + fillBatch(); >> > + }catch(Exception e){ >> > + log.error("Failed to fill batch",e); >> > + throw new RuntimeException(e); >> > + } >> > + if( batchList.isEmpty()) { >> > + return null; >> > + } >> > + } >> > + >> > + Message result = (Message)batchList.removeFirst(); >> > + >> > + if( firstMessageId != null ) { >> > + // Skip messages until we get to the first message. >> > + if( !result.getMessageId().equals >> (firstMessageId) ) >> > + return null; >> > + firstMessageId = null; >> > + } >> > + if( lastMessageId != null ) { >> > + if( result.getMessageId().equals >> (lastMessageId) ) { >> > + empty=true; >> > + } >> > + } >> > + result.setRegionDestination(regionDestination); >> > + return result; >> > + } >> > } >> > >> > public void reset(){ >> > @@ -130,13 +162,7 @@ >> > >> > // implementation >> > protected void fillBatch() throws Exception{ >> > - store.recoverNextMessages(clientId,subscriberName, >> > - maxBatchSize,this); >> > - // this will add more messages to the batch list >> > - if(!batchList.isEmpty()){ >> > - Message message=(Message)batchList.getLast(); >> > - >> > - } >> > + store.recoverNextMessages >> > (clientId,subscriberName,maxBatchSize,this); >> > } >> > >> > public void gc() { >> > >> > >> >> > > > -- > Regards, > Hiram > > Blog: http://hiramchirino.com
-- Regards, Hiram Blog: http://hiramchirino.com