Author: rajdavies Date: Thu Dec 28 12:52:41 2006 New Revision: 490796 URL: http://svn.apache.org/viewvc?view=rev&rev=490796 Log: Added support for hasMessagesBufferedToDeliver() method
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=diff&rev=490796&r1=490795&r2=490796 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Thu Dec 28 12:52:41 2006 @@ -122,4 +122,8 @@ public void release(){ } + + public boolean hasMessagesBufferedToDeliver() { + return false; + } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?view=diff&rev=490796&r1=490795&r2=490796 ============================================================================== --- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Thu Dec 28 12:52:41 2006 @@ -213,6 +213,10 @@ // we always have space - as we can persist to disk return false; } + + public boolean hasMessagesBufferedToDeliver() { + return !isEmpty(); + } public void setUsageManager(UsageManager usageManager){ super.setUsageManager(usageManager); Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=490796&r1=490795&r2=490796 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Thu Dec 28 12:52:41 2006 @@ -166,4 +166,9 @@ * @return true if the cursor is full */ public boolean isFull(); + + /** + * @return true if the cursor has buffered messages ready to deliver + */ + public boolean hasMessagesBufferedToDeliver(); } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?view=diff&rev=490796&r1=490795&r2=490796 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (original) +++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Thu Dec 28 12:52:41 2006 @@ -24,6 +24,7 @@ import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageId; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; import org.apache.commons.logging.Log; @@ -43,6 +44,7 @@ private MessageStore store; private final LinkedList batchList=new LinkedList(); private Destination regionDestination; + private int size = 0; /** * @param topic @@ -68,26 +70,48 @@ * @return true if there are no pending messages */ public boolean isEmpty(){ - return batchList.isEmpty(); + return size <= 0; + } + + public boolean hasMessagesBufferedToDeliver() { + return !batchList.isEmpty(); } public synchronized int size(){ try { - return store.getMessageCount(); + size = store.getMessageCount(); }catch(IOException e) { log.error("Failed to get message count",e); throw new RuntimeException(e); } + return size; } public synchronized void addMessageLast(MessageReference node) throws Exception{ if(node!=null){ node.decrementReferenceCount(); } + size++; + } + + public void addMessageFirst(MessageReference node) throws Exception{ + if(node!=null){ + node.decrementReferenceCount(); + } + size++; } + + public void remove(){ + size--; + } + + public void remove(MessageReference node){ + size--; + } + public synchronized boolean hasNext(){ - if(isEmpty()){ + if(batchList.isEmpty()){ try{ fillBatch(); }catch(Exception e){ @@ -95,7 +119,7 @@ throw new RuntimeException(e); } } - return !isEmpty(); + return !batchList.isEmpty(); } public synchronized MessageReference next(){ @@ -117,10 +141,15 @@ batchList.addLast(message); } - public void recoverMessageReference(String messageReference) - throws Exception{ - // shouldn't get called - throw new 
RuntimeException("Not supported"); + public void recoverMessageReference(String messageReference) throws Exception{ + Message msg=store.getMessage(new MessageId(messageReference)); + if(msg!=null){ + recoverMessage(msg); + }else{ + String err = "Failed to retrieve message for id: "+messageReference; + log.error(err); + throw new IOException(err); + } } public void gc() { Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?view=diff&rev=490796&r1=490795&r2=490796 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Thu Dec 28 12:52:41 2006 @@ -37,6 +37,7 @@ private QueueStorePrefetch persistent; private boolean started; private PendingMessageCursor currentCursor; + /** * Construct @@ -48,6 +49,7 @@ this.queue=queue; this.tmpStore=tmpStore; this.persistent=new QueueStorePrefetch(queue); + currentCursor = persistent; } public synchronized void start() throws Exception{ @@ -134,7 +136,7 @@ pendingCount--; } - public void remove(MessageReference node){ + public synchronized void remove(MessageReference node){ if (!node.isPersistent()) { nonPersistent.remove(node); }else { @@ -145,6 +147,7 @@ public synchronized void reset(){ nonPersistent.reset(); + persistent.reset(); } public int size(){ @@ -208,8 +211,12 @@ } protected synchronized PendingMessageCursor getNextCursor() throws Exception{ - if(currentCursor==null||currentCursor.isEmpty()){ + if(currentCursor == null || !currentCursor.hasMessagesBufferedToDeliver()){ 
currentCursor=currentCursor==persistent?nonPersistent:persistent; + //sanity check + if (currentCursor.isEmpty()) { + currentCursor=currentCursor==persistent?nonPersistent:persistent; + } } return currentCursor; }