Author: rajdavies
Date: Fri Nov 17 02:29:23 2006
New Revision: 476100

URL: http://svn.apache.org/viewvc?view=rev&rev=476100
Log:
tightened up synchronization around dispatching

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=476100&r1=476099&r2=476100 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Nov 17 02:29:23 2006 @@ -90,6 +90,7 @@ private int maximumPagedInMessages = garbageSizeBeforeCollection * 2; private final MessageEvaluationContext queueMsgConext = new MessageEvaluationContext(); private final Object exclusiveLockMutex = new Object(); + private final Object doDispatchMutex = new Object(); private TaskRunner taskRunner; public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats, @@ -513,7 +514,7 @@ return rc; } - MessageStore getMessageStore() { + public MessageStore getMessageStore() { return store; } @@ -591,7 +592,7 @@ public void purge() throws Exception { - doDispatch(doPageIn()); + pageInMessages(); synchronized (pagedInMessages) { ConnectionContext c = createConnectionContext(); @@ -652,7 +653,7 @@ * @return the number of messages removed */ public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception { - doDispatch(doPageIn()); + pageInMessages(); int counter = 0; synchronized (pagedInMessages) { ConnectionContext c = createConnectionContext(); @@ -701,7 +702,7 @@ * @return the number of messages copied */ public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws 
Exception { - doDispatch(doPageIn()); + pageInMessages(); int counter = 0; synchronized (pagedInMessages) { for(Iterator i = pagedInMessages.iterator(); i.hasNext();) { @@ -751,7 +752,7 @@ * Moves the messages matching the given filter up to the maximum number of matched messages */ public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception { - doDispatch(doPageIn()); + pageInMessages(); int counter = 0; synchronized (pagedInMessages) { for(Iterator i = pagedInMessages.iterator(); i.hasNext();) { @@ -784,7 +785,7 @@ */ public boolean iterate(){ try{ - doDispatch(doPageIn(false)); + pageInMessages(false); }catch(Exception e){ log.error("Failed to page in more queue messages ",e); } @@ -844,7 +845,7 @@ } destinationStatistics.getEnqueues().increment(); destinationStatistics.getMessages().increment(); - doDispatch(doPageIn(false)); + pageInMessages(false); } private List doPageIn() throws Exception{ @@ -893,6 +894,15 @@ queueMsgConext.clear(); dispatchValve.decrement(); } + } + } + + private void pageInMessages() throws Exception{ + pageInMessages(true); + } + private void pageInMessages(boolean force) throws Exception{ + synchronized(doDispatchMutex) { + doDispatch(doPageIn(force)); } }