Author: rajdavies
Date: Thu Sep 20 06:49:32 2007
New Revision: 577746

URL: http://svn.apache.org/viewvc?rev=577746&view=rev
Log:
Make the locking more coarse-grained

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=577746&r1=577745&r2=577746&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 Thu Sep 20 06:49:32 2007
@@ -58,7 +58,6 @@
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
-import org.apache.activemq.thread.Valve;
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
@@ -79,7 +78,6 @@
     private final Log log;
     private final ActiveMQDestination destination;
     private final List<Subscription> consumers = new 
CopyOnWriteArrayList<Subscription>();
-    private final Valve dispatchValve = new Valve(true);
     private final SystemUsage systemUsage;
     private final MemoryUsage memoryUsage;
     private final DestinationStatistics destinationStatistics = new 
DestinationStatistics();
@@ -97,7 +95,6 @@
     private int maximumPagedInMessages = garbageSizeBeforeCollection * 2;
     private final MessageEvaluationContext queueMsgConext = new 
MessageEvaluationContext();
     private final Object exclusiveLockMutex = new Object();
-    private final Object doDispatchMutex = new Object();
     private TaskRunner taskRunner;
     
     private final LinkedList<Runnable> messagesWaitingForSpace = new 
LinkedList<Runnable>();
@@ -203,13 +200,15 @@
         return true;
     }
 
-    public void addSubscription(ConnectionContext context, Subscription sub) 
throws Exception {
+    public synchronized  void addSubscription(ConnectionContext context, 
Subscription sub) throws Exception {
         sub.add(context, this);
         destinationStatistics.getConsumers().increment();
         maximumPagedInMessages += sub.getConsumerInfo().getPrefetchSize();
 
         MessageEvaluationContext msgContext = 
context.getMessageEvaluationContext();
         try {
+               
+               //needs to be synchronized - so no contention with dispatching
             synchronized (consumers) {
                 consumers.add(sub);
                 if (sub.getConsumerInfo().isExclusive()) {
@@ -224,22 +223,26 @@
                     }
                 }
             }
-            // page in messages
-            doPageIn();
+            
+            // We already hold this queue's lock (addSubscription is synchronized),
+            // so build the paged-in list directly.
+            buildList(false);
+           
             // synchronize with dispatch method so that no new messages are 
sent
             // while
             // setting up a subscription. avoid out of order messages,
             // duplicates
-            // etc.
-            dispatchValve.turnOff();
-            try {
+            // etc.  
+            
+         
+            
                 msgContext.setDestination(destination);
                 synchronized (pagedInMessages) {
                     // Add all the matching messages in the queue to the
                     // subscription.
                     for (Iterator<MessageReference> i = 
pagedInMessages.iterator(); i.hasNext();) {
                         QueueMessageReference node = 
(QueueMessageReference)i.next();
-                        if (node.isDropped()) {
+                        if (node.isDropped() ||  
(!sub.getConsumerInfo().isBrowser() && node.getLockOwner()!=null)) {
                             continue;
                         }
                         try {
@@ -252,101 +255,94 @@
                         }
                     }
                 }
-            } finally {
-                dispatchValve.turnOn();
-            }
+          
+            
+            
         } finally {
             msgContext.clear();
         }
     }
 
-    public void removeSubscription(ConnectionContext context, Subscription 
sub) throws Exception {
-
-        destinationStatistics.getConsumers().decrement();
-        maximumPagedInMessages -= sub.getConsumerInfo().getPrefetchSize();
-
-        // synchronize with dispatch method so that no new messages are sent
-        // while
-        // removing up a subscription.
-        dispatchValve.turnOff();
-        try {
-
-            synchronized (consumers) {
-                consumers.remove(sub);
-                if (sub.getConsumerInfo().isExclusive()) {
-                    LockOwner owner = (LockOwner)sub;
-                    // Did we loose the exclusive owner??
-                    if (exclusiveOwner == owner) {
-
-                        // Find the exclusive consumer with the higest Lock
-                        // Priority.
-                        exclusiveOwner = null;
-                        for (Iterator<Subscription> iter = 
consumers.iterator(); iter.hasNext();) {
-                            Subscription s = iter.next();
-                            LockOwner so = (LockOwner)s;
-                            if (s.getConsumerInfo().isExclusive() && 
(exclusiveOwner == null || so.getLockPriority() > 
exclusiveOwner.getLockPriority())) {
-                                exclusiveOwner = so;
-                            }
-                        }
-                    }
-                }
-                if (consumers.isEmpty()) {
-                    messages.gc();
-                }
-
-            }
-            sub.remove(context, this);
-
-            boolean wasExclusiveOwner = false;
-            if (exclusiveOwner == sub) {
-                exclusiveOwner = null;
-                wasExclusiveOwner = true;
-            }
-
-            ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
-            MessageGroupSet ownedGroups = 
getMessageGroupOwners().removeConsumer(consumerId);
-
-            if (!sub.getConsumerInfo().isBrowser()) {
-                MessageEvaluationContext msgContext = 
context.getMessageEvaluationContext();
-                try {
-                    msgContext.setDestination(destination);
-
-                    // lets copy the messages to dispatch to avoid deadlock
-                    List<QueueMessageReference> messagesToDispatch = new 
ArrayList<QueueMessageReference>();
-                    synchronized (pagedInMessages) {
-                        for (Iterator<MessageReference> i = 
pagedInMessages.iterator(); i.hasNext();) {
-                            QueueMessageReference node = 
(QueueMessageReference)i.next();
-                            if (node.isDropped()) {
-                                continue;
-                            }
-
-                            String groupID = node.getGroupID();
-
-                            // Re-deliver all messages that the sub locked
-                            if (node.getLockOwner() == sub || 
wasExclusiveOwner || (groupID != null && ownedGroups.contains(groupID))) {
-                                messagesToDispatch.add(node);
-                            }
-                        }
-                    }
-
-                    // now lets dispatch from the copy of the collection to
-                    // avoid deadlocks
-                    for (Iterator<QueueMessageReference> iter = 
messagesToDispatch.iterator(); iter.hasNext();) {
-                        QueueMessageReference node = iter.next();
-                        node.incrementRedeliveryCounter();
-                        node.unlock();
-                        msgContext.setMessageReference(node);
-                        dispatchPolicy.dispatch(node, msgContext, consumers);
-                    }
-                } finally {
-                    msgContext.clear();
-                }
-            }
-        } finally {
-            dispatchValve.turnOn();
-        }
-
-    }
+    public synchronized void removeSubscription(ConnectionContext context,
+               Subscription sub) throws Exception{
+               destinationStatistics.getConsumers().decrement();
+               maximumPagedInMessages-=sub.getConsumerInfo().getPrefetchSize();
+               // synchronize with the dispatch method so that no new messages
+               // are sent while
+               // removing a subscription.
+               synchronized(consumers){
+                       consumers.remove(sub);
+                       if(sub.getConsumerInfo().isExclusive()){
+                               LockOwner owner=(LockOwner)sub;
+                               // Did we lose the exclusive owner??
+                               if(exclusiveOwner==owner){
+                                       // Find the exclusive consumer with the highest Lock
+                                       // Priority.
+                                       exclusiveOwner=null;
+                                       for(Iterator<Subscription> 
iter=consumers.iterator();iter
+                                               .hasNext();){
+                                               Subscription s=iter.next();
+                                               LockOwner so=(LockOwner)s;
+                                               
if(s.getConsumerInfo().isExclusive()
+                                                       
&&(exclusiveOwner==null||so.getLockPriority()>exclusiveOwner
+                                                               
.getLockPriority())){
+                                                       exclusiveOwner=so;
+                                               }
+                                       }
+                               }
+                       }
+                       if(consumers.isEmpty()){
+                               messages.gc();
+                       }
+               }
+               sub.remove(context,this);
+               boolean wasExclusiveOwner=false;
+               if(exclusiveOwner==sub){
+                       exclusiveOwner=null;
+                       wasExclusiveOwner=true;
+               }
+               ConsumerId consumerId=sub.getConsumerInfo().getConsumerId();
+               MessageGroupSet 
ownedGroups=getMessageGroupOwners().removeConsumer(
+                       consumerId);
+               if(!sub.getConsumerInfo().isBrowser()){
+                       MessageEvaluationContext msgContext=context
+                               .getMessageEvaluationContext();
+                       try{
+                               msgContext.setDestination(destination);
+                               // lets copy the messages to dispatch to avoid 
deadlock
+                               List<QueueMessageReference> 
messagesToDispatch=new ArrayList<QueueMessageReference>();
+                               synchronized(pagedInMessages){
+                                       for(Iterator<MessageReference> 
i=pagedInMessages.iterator();i
+                                               .hasNext();){
+                                               QueueMessageReference 
node=(QueueMessageReference)i
+                                                       .next();
+                                               if(node.isDropped()){
+                                                       continue;
+                                               }
+                                               String 
groupID=node.getGroupID();
+                                               // Re-deliver all messages that 
the sub locked
+                                               if(node.getLockOwner()==sub
+                                                       ||wasExclusiveOwner
+                                                       
||(groupID!=null&&ownedGroups.contains(groupID))){
+                                                       
messagesToDispatch.add(node);
+                                               }
+                                       }
+                               }
+                               // now lets dispatch from the copy of the 
collection to
+                               // avoid deadlocks
+                               for(Iterator<QueueMessageReference> 
iter=messagesToDispatch
+                                       .iterator();iter.hasNext();){
+                                       QueueMessageReference node=iter.next();
+                                       node.incrementRedeliveryCounter();
+                                       node.unlock();
+                                       msgContext.setMessageReference(node);
+                                       
dispatchPolicy.dispatch(node,msgContext,consumers);
+                               }
+                       }finally{
+                               msgContext.clear();
+                       }
+               }
+       }
 
     public void send(final ProducerBrokerExchange producerExchange, final 
Message message) throws Exception {
         final ConnectionContext context = 
producerExchange.getConnectionContext();
@@ -998,18 +994,20 @@
         pageInMessages(false);
     }
 
-    private List<MessageReference> doPageIn() throws Exception {
-        return doPageIn(true);
+    
+    private   List<MessageReference> doPageIn(boolean force) throws Exception {
+        List<MessageReference> result  = null;
+               result  = buildList(force);
+       return  result;
     }
 
-    private List<MessageReference> doPageIn(boolean force) throws Exception {
+    private   synchronized List<MessageReference> buildList(boolean force) 
throws Exception {
 
         final int toPageIn = maximumPagedInMessages - pagedInMessages.size();
         List<MessageReference> result = null;
         if ((force || !consumers.isEmpty()) && toPageIn > 0) {
             messages.setMaxBatchSize(toPageIn);
             try {
-                dispatchValve.increment();
                 int count = 0;
                 result = new ArrayList<MessageReference>(toPageIn);
                 synchronized (messages) {
@@ -1037,16 +1035,14 @@
                 }
             } finally {
                 queueMsgConext.clear();
-                dispatchValve.decrement();
             }
         }
         return result;
     }
 
-    private void doDispatch(List<MessageReference> list) throws Exception {
+    private  synchronized void doDispatch(List<MessageReference> list) throws 
Exception {
         if (list != null && !list.isEmpty()) {
             try {
-                dispatchValve.increment();
                 for (int i = 0; i < list.size(); i++) {
                     MessageReference node = list.get(i);
                     queueMsgConext.setDestination(destination);
@@ -1055,7 +1051,6 @@
                 }
             } finally {
                 queueMsgConext.clear();
-                dispatchValve.decrement();
             }
         }
     }
@@ -1065,9 +1060,7 @@
     }
 
     private void pageInMessages(boolean force) throws Exception {
-        synchronized (doDispatchMutex) {
             doDispatch(doPageIn(force));
-        }
     }
 
 }


Reply via email to