Author: rajdavies Date: Fri Nov 24 23:10:17 2006 New Revision: 479094 URL: http://svn.apache.org/viewvc?view=rev&rev=479094 Log: Update to fix http://issues.apache.org/activemq/browse/AMQ-791 Use an in-memory list in FilePendingMessageCursor until the memory limit is reached, then use disk. Use FilePendingMessageCursor in TopicSubscription instead of LinkedList
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=479094&r1=479093&r2=479094 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Fri Nov 24 23:10:17 2006 @@ -26,6 +26,7 @@ import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; import org.apache.activemq.broker.region.policy.MessageEvictionStrategy; import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; import org.apache.activemq.command.ActiveMQDestination; @@ -50,7 +51,7 @@ private static final Log log=LogFactory.getLog(TopicSubscription.class); - final protected LinkedList matched=new LinkedList(); + final protected FilePendingMessageCursor matched; final protected ActiveMQDestination dlqDestination=new ActiveMQQueue("ActiveMQ.DLQ"); final protected UsageManager usageManager; protected AtomicLong dispatched=new AtomicLong(); @@ -69,6 +70,7 @@ throws InvalidSelectorException{ super(broker,context,info); 
this.usageManager=usageManager; + this.matched = new FilePendingMessageCursor(info.getConsumerId().toString(), broker.getTempDataStore()); } public void add(MessageReference node) throws InterruptedException,IOException{ @@ -84,7 +86,7 @@ }else{ if(maximumPendingMessages!=0){ synchronized(matchedListMutex){ - matched.addLast(node); + matched.addMessageLast(node); // NOTE - be careful about the slaveBroker! if (maximumPendingMessages > 0) { @@ -94,15 +96,22 @@ max = maximumPendingMessages; } if (!matched.isEmpty() && matched.size() > max) { - removeExpiredMessages(matched); + removeExpiredMessages(); } // lets discard old messages as we are a slow consumer while (!matched.isEmpty() && matched.size() > maximumPendingMessages) { - MessageReference[] oldMessages = messageEvictionStrategy.evictMessages(matched); + int pageInSize = matched.size() - maximumPendingMessages; + //only page in a 1000 at a time - else we could blow da memory + pageInSize = Math.max(1000,pageInSize); + LinkedList list = matched.pageInList(pageInSize); + MessageReference[] oldMessages = messageEvictionStrategy.evictMessages(list); int messagesToEvict = oldMessages.length; for(int i = 0; i < messagesToEvict; i++) { - oldMessages[i].decrementReferenceCount(); + MessageReference oldMessage = oldMessages[i]; + oldMessage.decrementReferenceCount(); + matched.remove(oldMessage); + discarded++; if (log.isDebugEnabled()) { log.debug("Discarding message " + oldMessages[i]); @@ -126,29 +135,33 @@ * Discard any expired messages from the matched list. Called from a synchronized block. 
* @throws IOException */ - protected void removeExpiredMessages(LinkedList messages) throws IOException { - for(Iterator i=matched.iterator();i.hasNext();){ - MessageReference node=(MessageReference) i.next(); + protected void removeExpiredMessages() throws IOException { + matched.reset(); + while(matched.hasNext()) { + MessageReference node=matched.next(); if (node.isExpired()) { - i.remove(); + matched.remove(); dispatched.incrementAndGet(); node.decrementReferenceCount(); break; } } + matched.release(); } public void processMessageDispatchNotification(MessageDispatchNotification mdn){ synchronized(matchedListMutex){ - for(Iterator i=matched.iterator();i.hasNext();){ - MessageReference node=(MessageReference) i.next(); + matched.reset(); + while(matched.hasNext()) { + MessageReference node=matched.next(); if(node.getMessageId().equals(mdn.getMessageId())){ - i.remove(); + matched.remove(); dispatched.incrementAndGet(); node.decrementReferenceCount(); break; } } + matched.release(); } } @@ -322,9 +335,10 @@ private void dispatchMatched() throws IOException{ synchronized(matchedListMutex){ - for(Iterator iter=matched.iterator();iter.hasNext()&&!isFull();){ - MessageReference message=(MessageReference) iter.next(); - iter.remove(); + matched.reset(); + while(matched.hasNext()) { + MessageReference message=(MessageReference) matched.next(); + matched.remove(); // Message may have been sitting in the matched list a while // waiting for the consumer to ak the message. 
@@ -335,6 +349,7 @@ dispatch(message); } + matched.release(); } } @@ -380,11 +395,7 @@ public void destroy() { synchronized(matchedListMutex){ - for (Iterator iter = matched.iterator(); iter.hasNext();) { - MessageReference node = (MessageReference) iter.next(); - node.decrementReferenceCount(); - } - matched.clear(); + matched.destroy(); } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=diff&rev=479094&r1=479093&r2=479094 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Fri Nov 24 23:10:17 2006 @@ -106,4 +106,8 @@ public boolean hasSpace() { return usageManager != null ? !usageManager.isFull() : true; } + + public boolean isFull() { + return usageManager != null ? 
usageManager.isFull() : false; + } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?view=diff&rev=479094&r1=479093&r2=479094 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Fri Nov 24 23:10:17 2006 @@ -11,47 +11,59 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ + package org.apache.activemq.broker.region.cursors; import java.io.IOException; import java.util.Iterator; +import java.util.LinkedList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.command.Message; import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.Store; +import org.apache.activemq.memory.UsageListener; +import org.apache.activemq.memory.UsageManager; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.store.kahadaptor.CommandMarshaller; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + /** * perist pending messages pending message (messages awaiting disptach to a consumer) cursor - * + * * @version $Revision$ */ -public class FilePendingMessageCursor extends AbstractPendingMessageCursor{ - private 
ListContainer list; +public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener{ + static private final Log log=LogFactory.getLog(FilePendingMessageCursor.class); + private Store store; + private String name; + private LinkedList memoryList=new LinkedList(); + private ListContainer diskList; private Iterator iter=null; private Destination regionDestination; + private Lock iterLock=new ReentrantLock(); + private Object mutex=new Object(); /** * @param name * @param store - * @throws IOException */ public FilePendingMessageCursor(String name,Store store){ - try{ - list=store.getListContainer(name); - list.setMarshaller(new CommandMarshaller(new OpenWireFormat())); - list.setMaximumCacheSize(0); - }catch(IOException e){ - throw new RuntimeException(e); - } + this.name=name; + this.store=store; } /** * @return true if there are no pending messages */ public boolean isEmpty(){ - return list.isEmpty(); + synchronized(mutex){ + return memoryList.isEmpty()&&isDiskListEmpty(); + } } /** @@ -59,7 +71,46 @@ * */ public void reset(){ - iter=list.listIterator(); + iterLock.lock(); + synchronized(mutex){ + iter=isSpaceInMemoryList()?memoryList.iterator():diskList.listIterator(); + } + } + + public void release(){ + iterLock.unlock(); + } + + public void destroy(){ + for(Iterator i=memoryList.iterator();i.hasNext();){ + Message node=(Message)i.next(); + node.decrementReferenceCount(); + } + memoryList.clear(); + if(!isDiskListEmpty()){ + getDiskList().clear(); + } + } + + public LinkedList pageInList(int maxItems){ + LinkedList result=new LinkedList(); + synchronized(mutex){ + int count=0; + for(Iterator i=memoryList.iterator();i.hasNext()&&count<maxItems;){ + result.add(i.next()); + count++; + } + if(count<maxItems&&!isDiskListEmpty()){ + for(Iterator i=getDiskList().iterator();i.hasNext()&&count<maxItems;){ + Message message=(Message)i.next(); + message.setRegionDestination(regionDestination); + message.incrementReferenceCount(); + 
result.add(message); + count++; + } + } + } + return result; } /** @@ -68,46 +119,66 @@ * @param node */ public void addMessageLast(MessageReference node){ - try{ - regionDestination=node.getMessage().getRegionDestination(); - node.decrementReferenceCount(); - }catch(IOException e){ - throw new RuntimeException(e); + synchronized(mutex){ + try{ + regionDestination=node.getMessage().getRegionDestination(); + if(isSpaceInMemoryList()){ + memoryList.add(node); + }else{ + flushToDisk(); + node.decrementReferenceCount(); + getDiskList().addLast(node); + } + }catch(IOException e){ + throw new RuntimeException(e); + } } - list.addLast(node); } /** * add message to await dispatch * - * @param position * @param node */ public void addMessageFirst(MessageReference node){ - try{ - regionDestination=node.getMessage().getRegionDestination(); - node.decrementReferenceCount(); - }catch(IOException e){ - throw new RuntimeException(e); + synchronized(mutex){ + try{ + regionDestination=node.getMessage().getRegionDestination(); + if(isSpaceInMemoryList()){ + memoryList.addFirst(node); + }else{ + flushToDisk(); + node.decrementReferenceCount(); + getDiskList().addFirst(node); + } + }catch(IOException e){ + throw new RuntimeException(e); + } } - list.addFirst(node); } /** * @return true if there pending messages to dispatch */ public boolean hasNext(){ - return iter.hasNext(); + synchronized(mutex){ + return iter.hasNext(); + } } /** * @return the next pending message */ public MessageReference next(){ - Message message=(Message) iter.next(); - message.setRegionDestination(regionDestination); - message.incrementReferenceCount(); - return message; + synchronized(mutex){ + Message message=(Message)iter.next(); + if(!isDiskListEmpty()){ + // got from disk + message.setRegionDestination(regionDestination); + message.incrementReferenceCount(); + } + return message; + } } /** @@ -115,17 +186,31 @@ * */ public void remove(){ - iter.remove(); + synchronized(mutex){ + iter.remove(); + } } - + + 
/** + * @param node + * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference) + */ public void remove(MessageReference node){ - list.remove(node); + synchronized(mutex){ + memoryList.remove(node); + if(!isDiskListEmpty()){ + getDiskList().remove(node); + } + } } + /** * @return the number of pending messages */ public int size(){ - return list.size(); + synchronized(mutex){ + return memoryList.size()+(isDiskListEmpty()?0:getDiskList().size()); + } } /** @@ -133,6 +218,66 @@ * */ public void clear(){ - list.clear(); + synchronized(mutex){ + memoryList.clear(); + if(!isDiskListEmpty()){ + getDiskList().clear(); + } + } + } + + public boolean isFull(){ + // we always have space - as we can persist to disk + return false; + } + + public void setUsageManager(UsageManager usageManager){ + super.setUsageManager(usageManager); + usageManager.addUsageListener(this); + } + + public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){ + if(newPercentUsage>=100){ + try{ + if(iterLock.tryLock(500,TimeUnit.MILLISECONDS)){ + flushToDisk(); + iterLock.unlock(); + } + }catch(InterruptedException e){ + log.warn("caught an exception aquiring lock",e); + } + } + } + + protected boolean isSpaceInMemoryList(){ + return hasSpace()&&isDiskListEmpty(); + } + + protected void flushToDisk(){ + synchronized(mutex){ + for(Iterator i=memoryList.iterator();i.hasNext();){ + MessageReference node=(MessageReference)i.next(); + node.decrementReferenceCount(); + getDiskList().addLast(node); + } + memoryList.clear(); + } + } + + protected boolean isDiskListEmpty(){ + return diskList==null||diskList.isEmpty(); + } + + protected ListContainer getDiskList(){ + if(diskList==null){ + try{ + diskList=store.getListContainer(name); + diskList.setMarshaller(new CommandMarshaller(new OpenWireFormat())); + diskList.setMaximumCacheSize(0); + }catch(IOException e){ + throw new 
RuntimeException(e); + } + } + return diskList; } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=479094&r1=479093&r2=479094 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Fri Nov 24 23:10:17 2006 @@ -140,4 +140,9 @@ * @see org.apache.activemq.memory.UsageManager */ public void setUsageManager(UsageManager usageManager); + + /** + * @return true if the cursor is full + */ + public boolean isFull(); }