Author: rajdavies
Date: Thu Sep 20 06:49:32 2007
New Revision: 577746
URL: http://svn.apache.org/viewvc?rev=577746&view=rev
Log:
Make the locking more coarse-grained.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=577746&r1=577745&r2=577746&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Sep 20 06:49:32 2007
@@ -58,7 +58,6 @@
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
-import org.apache.activemq.thread.Valve;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
@@ -79,7 +78,6 @@
private final Log log;
private final ActiveMQDestination destination;
private final List<Subscription> consumers = new
CopyOnWriteArrayList<Subscription>();
- private final Valve dispatchValve = new Valve(true);
private final SystemUsage systemUsage;
private final MemoryUsage memoryUsage;
private final DestinationStatistics destinationStatistics = new
DestinationStatistics();
@@ -97,7 +95,6 @@
private int maximumPagedInMessages = garbageSizeBeforeCollection * 2;
private final MessageEvaluationContext queueMsgConext = new
MessageEvaluationContext();
private final Object exclusiveLockMutex = new Object();
- private final Object doDispatchMutex = new Object();
private TaskRunner taskRunner;
private final LinkedList<Runnable> messagesWaitingForSpace = new
LinkedList<Runnable>();
@@ -203,13 +200,15 @@
return true;
}
- public void addSubscription(ConnectionContext context, Subscription sub)
throws Exception {
+ public synchronized void addSubscription(ConnectionContext context,
Subscription sub) throws Exception {
sub.add(context, this);
destinationStatistics.getConsumers().increment();
maximumPagedInMessages += sub.getConsumerInfo().getPrefetchSize();
MessageEvaluationContext msgContext =
context.getMessageEvaluationContext();
try {
+
+ //needs to be synchronized - so no contention with dispatching
synchronized (consumers) {
consumers.add(sub);
if (sub.getConsumerInfo().isExclusive()) {
@@ -224,22 +223,26 @@
}
}
}
- // page in messages
- doPageIn();
+
+ //we hold the lock on the dispatchValue - so lets build the paged in
+ //list directly;
+ buildList(false);
+
// synchronize with dispatch method so that no new messages are
sent
// while
// setting up a subscription. avoid out of order messages,
// duplicates
- // etc.
- dispatchValve.turnOff();
- try {
+ // etc.
+
+
+
msgContext.setDestination(destination);
synchronized (pagedInMessages) {
// Add all the matching messages in the queue to the
// subscription.
for (Iterator<MessageReference> i =
pagedInMessages.iterator(); i.hasNext();) {
QueueMessageReference node =
(QueueMessageReference)i.next();
- if (node.isDropped()) {
+ if (node.isDropped() ||
(!sub.getConsumerInfo().isBrowser() && node.getLockOwner()!=null)) {
continue;
}
try {
@@ -252,101 +255,94 @@
}
}
}
- } finally {
- dispatchValve.turnOn();
- }
+
+
+
} finally {
msgContext.clear();
}
}
- public void removeSubscription(ConnectionContext context, Subscription
sub) throws Exception {
-
- destinationStatistics.getConsumers().decrement();
- maximumPagedInMessages -= sub.getConsumerInfo().getPrefetchSize();
-
- // synchronize with dispatch method so that no new messages are sent
- // while
- // removing up a subscription.
- dispatchValve.turnOff();
- try {
-
- synchronized (consumers) {
- consumers.remove(sub);
- if (sub.getConsumerInfo().isExclusive()) {
- LockOwner owner = (LockOwner)sub;
- // Did we loose the exclusive owner??
- if (exclusiveOwner == owner) {
-
- // Find the exclusive consumer with the higest Lock
- // Priority.
- exclusiveOwner = null;
- for (Iterator<Subscription> iter =
consumers.iterator(); iter.hasNext();) {
- Subscription s = iter.next();
- LockOwner so = (LockOwner)s;
- if (s.getConsumerInfo().isExclusive() &&
(exclusiveOwner == null || so.getLockPriority() >
exclusiveOwner.getLockPriority())) {
- exclusiveOwner = so;
- }
- }
- }
- }
- if (consumers.isEmpty()) {
- messages.gc();
- }
-
- }
- sub.remove(context, this);
-
- boolean wasExclusiveOwner = false;
- if (exclusiveOwner == sub) {
- exclusiveOwner = null;
- wasExclusiveOwner = true;
- }
-
- ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
- MessageGroupSet ownedGroups =
getMessageGroupOwners().removeConsumer(consumerId);
-
- if (!sub.getConsumerInfo().isBrowser()) {
- MessageEvaluationContext msgContext =
context.getMessageEvaluationContext();
- try {
- msgContext.setDestination(destination);
-
- // lets copy the messages to dispatch to avoid deadlock
- List<QueueMessageReference> messagesToDispatch = new
ArrayList<QueueMessageReference>();
- synchronized (pagedInMessages) {
- for (Iterator<MessageReference> i =
pagedInMessages.iterator(); i.hasNext();) {
- QueueMessageReference node =
(QueueMessageReference)i.next();
- if (node.isDropped()) {
- continue;
- }
-
- String groupID = node.getGroupID();
-
- // Re-deliver all messages that the sub locked
- if (node.getLockOwner() == sub ||
wasExclusiveOwner || (groupID != null && ownedGroups.contains(groupID))) {
- messagesToDispatch.add(node);
- }
- }
- }
-
- // now lets dispatch from the copy of the collection to
- // avoid deadlocks
- for (Iterator<QueueMessageReference> iter =
messagesToDispatch.iterator(); iter.hasNext();) {
- QueueMessageReference node = iter.next();
- node.incrementRedeliveryCounter();
- node.unlock();
- msgContext.setMessageReference(node);
- dispatchPolicy.dispatch(node, msgContext, consumers);
- }
- } finally {
- msgContext.clear();
- }
- }
- } finally {
- dispatchValve.turnOn();
- }
-
- }
+ public synchronized void removeSubscription(ConnectionContext context,
+ Subscription sub) throws Exception{
+ destinationStatistics.getConsumers().decrement();
+ maximumPagedInMessages-=sub.getConsumerInfo().getPrefetchSize();
+ // synchronize with dispatch method so that no new messages are
sent
+ // while
+ // removing up a subscription.
+ synchronized(consumers){
+ consumers.remove(sub);
+ if(sub.getConsumerInfo().isExclusive()){
+ LockOwner owner=(LockOwner)sub;
+ // Did we loose the exclusive owner??
+ if(exclusiveOwner==owner){
+ // Find the exclusive consumer with the
higest Lock
+ // Priority.
+ exclusiveOwner=null;
+ for(Iterator<Subscription>
iter=consumers.iterator();iter
+ .hasNext();){
+ Subscription s=iter.next();
+ LockOwner so=(LockOwner)s;
+
if(s.getConsumerInfo().isExclusive()
+
&&(exclusiveOwner==null||so.getLockPriority()>exclusiveOwner
+
.getLockPriority())){
+ exclusiveOwner=so;
+ }
+ }
+ }
+ }
+ if(consumers.isEmpty()){
+ messages.gc();
+ }
+ }
+ sub.remove(context,this);
+ boolean wasExclusiveOwner=false;
+ if(exclusiveOwner==sub){
+ exclusiveOwner=null;
+ wasExclusiveOwner=true;
+ }
+ ConsumerId consumerId=sub.getConsumerInfo().getConsumerId();
+ MessageGroupSet
ownedGroups=getMessageGroupOwners().removeConsumer(
+ consumerId);
+ if(!sub.getConsumerInfo().isBrowser()){
+ MessageEvaluationContext msgContext=context
+ .getMessageEvaluationContext();
+ try{
+ msgContext.setDestination(destination);
+ // lets copy the messages to dispatch to avoid
deadlock
+ List<QueueMessageReference>
messagesToDispatch=new ArrayList<QueueMessageReference>();
+ synchronized(pagedInMessages){
+ for(Iterator<MessageReference>
i=pagedInMessages.iterator();i
+ .hasNext();){
+ QueueMessageReference
node=(QueueMessageReference)i
+ .next();
+ if(node.isDropped()){
+ continue;
+ }
+ String
groupID=node.getGroupID();
+ // Re-deliver all messages that
the sub locked
+ if(node.getLockOwner()==sub
+ ||wasExclusiveOwner
+
||(groupID!=null&&ownedGroups.contains(groupID))){
+
messagesToDispatch.add(node);
+ }
+ }
+ }
+ // now lets dispatch from the copy of the
collection to
+ // avoid deadlocks
+ for(Iterator<QueueMessageReference>
iter=messagesToDispatch
+ .iterator();iter.hasNext();){
+ QueueMessageReference node=iter.next();
+ node.incrementRedeliveryCounter();
+ node.unlock();
+ msgContext.setMessageReference(node);
+
dispatchPolicy.dispatch(node,msgContext,consumers);
+ }
+ }finally{
+ msgContext.clear();
+ }
+ }
+ }
public void send(final ProducerBrokerExchange producerExchange, final
Message message) throws Exception {
final ConnectionContext context =
producerExchange.getConnectionContext();
@@ -998,18 +994,20 @@
pageInMessages(false);
}
- private List<MessageReference> doPageIn() throws Exception {
- return doPageIn(true);
+
+ private List<MessageReference> doPageIn(boolean force) throws Exception {
+ List<MessageReference> result = null;
+ result = buildList(force);
+ return result;
}
- private List<MessageReference> doPageIn(boolean force) throws Exception {
+ private synchronized List<MessageReference> buildList(boolean force)
throws Exception {
final int toPageIn = maximumPagedInMessages - pagedInMessages.size();
List<MessageReference> result = null;
if ((force || !consumers.isEmpty()) && toPageIn > 0) {
messages.setMaxBatchSize(toPageIn);
try {
- dispatchValve.increment();
int count = 0;
result = new ArrayList<MessageReference>(toPageIn);
synchronized (messages) {
@@ -1037,16 +1035,14 @@
}
} finally {
queueMsgConext.clear();
- dispatchValve.decrement();
}
}
return result;
}
- private void doDispatch(List<MessageReference> list) throws Exception {
+ private synchronized void doDispatch(List<MessageReference> list) throws
Exception {
if (list != null && !list.isEmpty()) {
try {
- dispatchValve.increment();
for (int i = 0; i < list.size(); i++) {
MessageReference node = list.get(i);
queueMsgConext.setDestination(destination);
@@ -1055,7 +1051,6 @@
}
} finally {
queueMsgConext.clear();
- dispatchValve.decrement();
}
}
}
@@ -1065,9 +1060,7 @@
}
private void pageInMessages(boolean force) throws Exception {
- synchronized (doDispatchMutex) {
doDispatch(doPageIn(force));
- }
}
}