On 12/31/06, Rob Davies <[EMAIL PROTECTED]> wrote:

On 31 Dec 2006, at 07:59, Hiram Chirino wrote:

> On 12/31/06, Rob Davies <[EMAIL PROTECTED]> wrote:
>> Hey Hiram,
>>
>> this change breaks org.apache.activemq.broker.RecoveryBrokerTest,
>> org.apache.activemq.broker.BrokerTest, etc. for me.
>>
>
> Yeah, I think I have a fix for that. Sorry I broke it.  I'm running the
> test suite again now.  Basically, I think I need to default to boolean
> empty=false; so that an initial recovery of the subscription is done.
>
>> also - I'm not sure I like TopicStorePrefetch possibly returning null
>> after hasNext() has returned true
>>
>
> Yeah me neither :)  I did not fully understand why it was returning
> null when I expected it to return a value.  I was thinking it could be
> a timing issue with the MessageStore.
>
>> What was the problem in CursorDurableTest ? I hadn't seen that one
>>
>
> CursorDurableTest had a test that was failing due to out-of-order
> messages and duplicates showing up.  This was because sometimes messages
> were dispatched directly and at other times they were dispatched from
> the pending list.  But the pending list's .next() was also returning the
> items that had been dispatched directly and were never added to the
> pending list.  That is when the dups and out-of-order issues would show up.
>
> The problem is that TopicStorePrefetch.next() was returning everything
> added to the durable subscription since it's backed by the
> MessageStore.  And that's not what we want.  We only want it to return
> things that are explicitly added to it since it's the pending list.

I wonder if the real problem is then in PrefetchSubscription.add() -
because only if pending is empty (nothing in the store) should it
dispatch directly
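
The rule suggested here can be sketched in isolation. This is only an
illustration, not the real PrefetchSubscription: the class
SketchSubscription, its String messages, and the inFlight/ack bookkeeping
are all hypothetical stand-ins. It shows that if add() dispatches directly
only when nothing is pending (and the consumer is not full), delivery
order matches arrival order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a subscription; NOT the real PrefetchSubscription.
class SketchSubscription {
    private final Deque<String> pending = new ArrayDeque<>();
    private final List<String> delivered = new ArrayList<>();
    private final int prefetchLimit;
    private int inFlight; // delivered but not yet acknowledged

    SketchSubscription(int prefetchLimit) {
        this.prefetchLimit = prefetchLimit;
    }

    // Dispatch directly only when nothing is waiting and there is room;
    // otherwise queue behind the messages that are already pending.
    synchronized void add(String message) {
        if (pending.isEmpty() && inFlight < prefetchLimit) {
            deliver(message);
        } else {
            pending.addLast(message);
        }
    }

    // An acknowledgement frees prefetch slots, so drain pending in order.
    synchronized void ack(int count) {
        inFlight = Math.max(0, inFlight - count);
        while (!pending.isEmpty() && inFlight < prefetchLimit) {
            deliver(pending.removeFirst());
        }
    }

    private void deliver(String message) {
        delivered.add(message);
        inFlight++;
    }

    synchronized List<String> delivered() {
        return new ArrayList<>(delivered);
    }

    synchronized int pendingSize() {
        return pending.size();
    }
}
```

With a prefetch limit of 2, adding three messages leaves the third in
pending until an ack arrives, and later messages never overtake it.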

Could be an interaction.  I think TopicStorePrefetch still needs a
little more work.  I think we need to recover the TopicStorePrefetch
when the durable subscription is created, so that we know whether it
is initially empty or not.
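
To make that idea concrete, here is a rough sketch under stated
assumptions; it is not the actual TopicStorePrefetch. The class
SketchStoreCursor is hypothetical, the MessageStore is modeled as a plain
in-memory list of message IDs, and "recovery" is reduced to looking at
that list in the constructor. It keeps the firstMessageId/lastMessageId
window from the commit, but next() loops past store entries outside the
window instead of returning null, so a true hasNext() always yields a
value.

```java
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for TopicStorePrefetch; the store is just a list.
class SketchStoreCursor {
    private final List<String> store; // every stored message ID, in order
    private int position;             // index of the next store entry to read
    private String firstId;           // start of the pending window, null once reached
    private String lastId;            // end of the pending window
    private boolean empty;

    // "Recover" at creation: any backlog already in the store counts as pending,
    // so the cursor knows from the start whether it is empty.
    SketchStoreCursor(List<String> store) {
        this.store = store;
        this.empty = store.isEmpty();
        if (!empty) {
            firstId = store.get(0);
            lastId = store.get(store.size() - 1);
        }
    }

    // Called only for messages explicitly added to the pending list.
    synchronized void addMessageLast(String id) {
        if (empty) {
            firstId = id;
            empty = false;
        }
        lastId = id;
    }

    synchronized boolean hasNext() {
        return !empty;
    }

    synchronized String next() {
        if (empty) {
            throw new NoSuchElementException("cursor is empty");
        }
        while (position < store.size()) {
            String id = store.get(position++);
            if (firstId != null) {
                if (!id.equals(firstId)) {
                    continue; // dispatched directly, never added to pending: skip it
                }
                firstId = null;
            }
            if (id.equals(lastId)) {
                empty = true; // reached the end of the pending window
            }
            return id;
        }
        throw new IllegalStateException("pending message not found in store");
    }
}
```

The design choice here is that skipping happens inside next(), so callers
never need a null check after hasNext().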

>
>
>> cheers,
>>
>> Rob
>>
>> On 30 Dec 2006, at 23:49, [EMAIL PROTECTED] wrote:
>>
>> > Author: chirino
>> > Date: Sat Dec 30 15:49:03 2006
>> > New Revision: 491346
>> >
>> > URL: http://svn.apache.org/viewvc?view=rev&rev=491346
>> > Log:
>> > Fix for CursorDurableTest.
>> > The TopicStorePrefetch was iterating items that were in the
>> > subscription but not added to the pending list.
>> >
>> > Modified:
>> >     incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
>> >     incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
>> >
>> > Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
>> > URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=491346&r1=491345&r2=491346
>> > ==============================================================================
>> > --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
>> > +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Sat Dec 30 15:49:03 2006
>> > @@ -406,7 +406,9 @@
>> >                              pending.reset();
>> >                              while(pending.hasNext()&&!isFull()&&count<numberToDispatch){
>> >                                  MessageReference node=pending.next();
>> > -
>> > +                                if ( node == null )
>> > +                                     break;
>> > +
>> >                                  if(canDispatch(node)){
>> >                                      pending.remove();
>> >                                      // Message may have been sitting in the pending list a while
>> >
>> > Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
>> > URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?view=diff&rev=491346&r1=491345&r2=491346
>> > ==============================================================================
>> > --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
>> > +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Sat Dec 30 15:49:03 2006
>> > @@ -20,7 +20,7 @@
>> >
>> >  import java.io.IOException;
>> >  import java.util.LinkedList;
>> > -import javax.jms.JMSException;
>> > +
>> >  import org.apache.activemq.broker.region.Destination;
>> >  import org.apache.activemq.broker.region.MessageReference;
>> >  import org.apache.activemq.broker.region.Topic;
>> > @@ -48,6 +48,10 @@
>> >      private String subscriberName;
>> >      private Destination regionDestination;
>> >
>> > +    boolean empty=true;
>> > +     private MessageId firstMessageId;
>> > +     private MessageId lastMessageId;
>> > +
>> >      /**
>> >       * @param topic
>> >       * @param clientId
>> > @@ -73,7 +77,7 @@
>> >       * @return true if there are no pending messages
>> >       */
>> >      public boolean isEmpty(){
>> > -        return batchList.isEmpty();
>> > +        return empty;
>> >      }
>> >
>> >      public synchronized int size(){
>> > @@ -86,27 +90,55 @@
>> >      }
>> >
>> >      public synchronized void addMessageLast(MessageReference node) throws Exception{
>> > -        if(node!=null){
>> > +             if(node!=null){
>> > +                     if( empty ) {
>> > +                             firstMessageId = node.getMessageId();
>> > +                             empty=false;
>> > +                     }
>> > +             lastMessageId = node.getMessageId();
>> >              node.decrementReferenceCount();
>> >          }
>> >      }
>> >
>> > -    public synchronized boolean hasNext(){
>> > -        if(isEmpty()){
>> > -            try{
>> > -                fillBatch();
>> > -            }catch(Exception e){
>> > -                log.error("Failed to fill batch",e);
>> > -                throw new RuntimeException(e);
>> > -            }
>> > -        }
>> > +    public synchronized boolean hasNext() {
>> >          return !isEmpty();
>> >      }
>> >
>> >      public synchronized MessageReference next(){
>> > -        Message result = (Message)batchList.removeFirst();
>> > -        result.setRegionDestination(regionDestination);
>> > -        return result;
>> > +
>> > +        if( empty ) {
>> > +             return null;
>> > +        } else {
>> > +
>> > +             // We may need to fill in the batch...
>> > +            if(batchList.isEmpty()){
>> > +                try{
>> > +                    fillBatch();
>> > +                }catch(Exception e){
>> > +                    log.error("Failed to fill batch",e);
>> > +                    throw new RuntimeException(e);
>> > +                }
>> > +                if( batchList.isEmpty()) {
>> > +                     return null;
>> > +                }
>> > +            }
>> > +
>> > +            Message result = (Message)batchList.removeFirst();
>> > +
>> > +             if( firstMessageId != null ) {
>> > +             // Skip messages until we get to the first message.
>> > +                     if( !result.getMessageId().equals(firstMessageId) )
>> > +                             return null;
>> > +                     firstMessageId = null;
>> > +             }
>> > +             if( lastMessageId != null ) {
>> > +                     if( result.getMessageId().equals(lastMessageId) ) {
>> > +                             empty=true;
>> > +                     }
>> > +             }
>> > +            result.setRegionDestination(regionDestination);
>> > +            return result;
>> > +        }
>> >      }
>> >
>> >      public void reset(){
>> > @@ -130,13 +162,7 @@
>> >
>> >      // implementation
>> >      protected void fillBatch() throws Exception{
>> > -        store.recoverNextMessages(clientId,subscriberName,
>> > -                maxBatchSize,this);
>> > -        // this will add more messages to the batch list
>> > -        if(!batchList.isEmpty()){
>> > -            Message message=(Message)batchList.getLast();
>> > -
>> > -        }
>> > +        store.recoverNextMessages(clientId,subscriberName,maxBatchSize,this);
>> >      }
>> >
>> >      public void gc() {
>> >
>> >
>>
>>
>
>
> --
> Regards,
> Hiram
>
> Blog: http://hiramchirino.com




--
Regards,
Hiram

Blog: http://hiramchirino.com
