Hey Hiram,
this change breaks org.apache.activemq.broker.RecoveryBrokerTest,
oorg.apache.activemq.broker.BrokerTest, etc for me.
also - I'm not sure I like TopicStorePrefetch possibly returning null
when a hasNext() has returned true
What was the problem in CursorDurableTest ? I hadn't seen that one
cheers,
Rob
On 30 Dec 2006, at 23:49, [EMAIL PROTECTED] wrote:
Author: chirino
Date: Sat Dec 30 15:49:03 2006
New Revision: 491346
URL: http://svn.apache.org/viewvc?view=rev&rev=491346
Log:
Fix for CursorDurableTest.
The TopicStorePrefetch was iterating items that were in the
subscription but not added to the pending list.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/
activemq/broker/region/PrefetchSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/
activemq/broker/region/cursors/TopicStorePrefetch.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/
apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-
core/src/main/java/org/apache/activemq/broker/region/
PrefetchSubscription.java?view=diff&rev=491346&r1=491345&r2=491346
======================================================================
========
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/
activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/
activemq/broker/region/PrefetchSubscription.java Sat Dec 30
15:49:03 2006
@@ -406,7 +406,9 @@
pending.reset();
while(pending.hasNext()&&!isFull()
&&count<numberToDispatch){
MessageReference node=pending.next();
-
+ if ( node == null )
+ break;
+
if(canDispatch(node)){
pending.remove();
// Message may have been
sitting in the pending list a while
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/
apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-
core/src/main/java/org/apache/activemq/broker/region/cursors/
TopicStorePrefetch.java?view=diff&rev=491346&r1=491345&r2=491346
======================================================================
========
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/
activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/
activemq/broker/region/cursors/TopicStorePrefetch.java Sat Dec 30
15:49:03 2006
@@ -20,7 +20,7 @@
import java.io.IOException;
import java.util.LinkedList;
-import javax.jms.JMSException;
+
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Topic;
@@ -48,6 +48,10 @@
private String subscriberName;
private Destination regionDestination;
+ boolean empty=true;
+ private MessageId firstMessageId;
+ private MessageId lastMessageId;
+
/**
* @param topic
* @param clientId
@@ -73,7 +77,7 @@
* @return true if there are no pending messages
*/
public boolean isEmpty(){
- return batchList.isEmpty();
+ return empty;
}
public synchronized int size(){
@@ -86,27 +90,55 @@
}
public synchronized void addMessageLast(MessageReference node)
throws Exception{
- if(node!=null){
+ if(node!=null){
+ if( empty ) {
+ firstMessageId = node.getMessageId();
+ empty=false;
+ }
+ lastMessageId = node.getMessageId();
node.decrementReferenceCount();
}
}
- public synchronized boolean hasNext(){
- if(isEmpty()){
- try{
- fillBatch();
- }catch(Exception e){
- log.error("Failed to fill batch",e);
- throw new RuntimeException(e);
- }
- }
+ public synchronized boolean hasNext() {
return !isEmpty();
}
public synchronized MessageReference next(){
- Message result = (Message)batchList.removeFirst();
- result.setRegionDestination(regionDestination);
- return result;
+
+ if( empty ) {
+ return null;
+ } else {
+
+ // We may need to fill in the batch...
+ if(batchList.isEmpty()){
+ try{
+ fillBatch();
+ }catch(Exception e){
+ log.error("Failed to fill batch",e);
+ throw new RuntimeException(e);
+ }
+ if( batchList.isEmpty()) {
+ return null;
+ }
+ }
+
+ Message result = (Message)batchList.removeFirst();
+
+ if( firstMessageId != null ) {
+ // Skip messages until we get to the first message.
+ if( !result.getMessageId().equals(firstMessageId) )
+ return null;
+ firstMessageId = null;
+ }
+ if( lastMessageId != null ) {
+ if( result.getMessageId().equals(lastMessageId) ) {
+ empty=true;
+ }
+ }
+ result.setRegionDestination(regionDestination);
+ return result;
+ }
}
public void reset(){
@@ -130,13 +162,7 @@
// implementation
protected void fillBatch() throws Exception{
- store.recoverNextMessages(clientId,subscriberName,
- maxBatchSize,this);
- // this will add more messages to the batch list
- if(!batchList.isEmpty()){
- Message message=(Message)batchList.getLast();
-
- }
+ store.recoverNextMessages
(clientId,subscriberName,maxBatchSize,this);
}
public void gc() {