Author: rajdavies
Date: Mon Feb 18 01:38:10 2008
New Revision: 628667
URL: http://svn.apache.org/viewvc?rev=628667&view=rev
Log:
Change Queue dispatch model to reduce contention for lots of
consumers
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
(with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchSelector.java
(with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
Mon Feb 18 01:38:10 2008
@@ -179,6 +179,17 @@
public ActiveMQDestination getActiveMQDestination() {
return info != null ? info.getDestination() : null;
}
+
+ public boolean isBrowser() {
+ return info != null && info.isBrowser();
+ }
+
+ public int getInFlightUsage() {
+ if (info.getPrefetchSize() > 0) {
+ return (getInFlightSize() * 100)/info.getPrefetchSize();
+ }
+ return Integer.MAX_VALUE;
+ }
protected void doAddRecoveredMessage(MessageReference message) throws
Exception {
add(message);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
Mon Feb 18 01:38:10 2008
@@ -28,7 +28,6 @@
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.SystemUsage;
/**
* @version $Revision: 1.12 $
@@ -44,8 +43,6 @@
void removeProducer(ConnectionContext context, ProducerInfo info) throws
Exception;
void send(ProducerBrokerExchange producerExchange, Message messageSend)
throws Exception;
-
- boolean lock(MessageReference node, LockOwner lockOwner);
void acknowledge(ConnectionContext context, Subscription sub, final
MessageAck ack, final MessageReference node) throws IOException;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
Mon Feb 18 01:38:10 2008
@@ -85,10 +85,6 @@
return next.getMemoryUsage();
}
- public boolean lock(MessageReference node, LockOwner lockOwner) {
- return next.lock(node, lockOwner);
- }
-
public void removeSubscription(ConnectionContext context, Subscription
sub) throws Exception {
next.removeSubscription(context, sub);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
Mon Feb 18 01:38:10 2008
@@ -36,6 +36,7 @@
protected CountStatisticImpl messages;
protected PollCountStatisticImpl messagesCached;
protected CountStatisticImpl dispatched;
+ protected CountStatisticImpl inflight;
protected TimeStatisticImpl processTime;
public DestinationStatistics() {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
Mon Feb 18 01:38:10 2008
@@ -72,7 +72,7 @@
return active;
}
- protected boolean isFull() {
+ public boolean isFull() {
return !active || super.isFull();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
Mon Feb 18 01:38:10 2008
@@ -140,9 +140,6 @@
}
public boolean lock(LockOwner subscription) {
- if (!regionDestination.lock(this, subscription)) {
- return false;
- }
synchronized (this) {
if (dropped || (lockOwner != null && lockOwner != subscription)) {
return false;
@@ -152,8 +149,10 @@
}
}
- public synchronized void unlock() {
+ public synchronized boolean unlock() {
+ boolean result = lockOwner != null;
lockOwner = null;
+ return result;
}
public synchronized LockOwner getLockOwner() {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
Mon Feb 18 01:38:10 2008
@@ -44,7 +44,7 @@
}
public boolean isDropped() {
- throw new RuntimeException("not implemented");
+ return false;
}
public boolean lock(LockOwner subscription) {
@@ -55,7 +55,8 @@
throw new RuntimeException("not implemented");
}
- public void unlock() {
+ public boolean unlock() {
+ return true;
}
public int decrementReferenceCount() {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Mon Feb 18 01:38:10 2008
@@ -360,13 +360,17 @@
protected void sendToDLQ(final ConnectionContext context, final
MessageReference node) throws IOException, Exception {
broker.sendToDeadLetterQueue(context, node);
}
-
+
+ public int getInFlightSize() {
+ return dispatched.size();
+ }
+
/**
* Used to determine if the broker can dispatch to the consumer.
*
* @return
*/
- protected boolean isFull() {
+ public boolean isFull() {
return isSlave() || dispatched.size() - prefetchExtension >=
info.getPrefetchSize();
}
@@ -603,6 +607,16 @@
public void setMaxAuditDepth(int maxAuditDepth) {
this.maxAuditDepth = maxAuditDepth;
+ }
+
+
+ public List<MessageReference> getInFlightMessages(){
+ List<MessageReference> result = new ArrayList<MessageReference>();
+ synchronized(pendingLock) {
+ result.addAll(dispatched);
+ result.addAll(pending.pageInList(1000));
+ }
+ return result;
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Mon Feb 18 01:38:10 2008
@@ -22,6 +22,9 @@
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.ReentrantLock;
import javax.jms.InvalidSelectorException;
@@ -55,6 +58,7 @@
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.thread.DeterministicTaskRunner;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
@@ -75,23 +79,23 @@
private final List<Subscription> consumers = new
ArrayList<Subscription>(50);
private PendingMessageCursor messages;
private final LinkedHashMap<MessageId,MessageReference> pagedInMessages =
new LinkedHashMap<MessageId,MessageReference>();
- private LockOwner exclusiveOwner;
private MessageGroupMap messageGroupOwners;
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
private DeadLetterStrategy deadLetterStrategy = new
SharedDeadLetterStrategy();
private MessageGroupMapFactory messageGroupMapFactory = new
MessageGroupHashBucketFactory();
- private final Object exclusiveLockMutex = new Object();
private final Object sendLock = new Object();
+ private final ExecutorService executor;
private final TaskRunner taskRunner;
private final LinkedList<Runnable> messagesWaitingForSpace = new
LinkedList<Runnable>();
private final ReentrantLock dispatchLock = new ReentrantLock();
+ private QueueDispatchSelector dispatchSelector;
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
public void run() {
wakeup();
}
};
-
- public Queue(Broker broker, ActiveMQDestination destination, final
SystemUsage systemUsage,MessageStore store,DestinationStatistics parentStats,
+
+ public Queue(Broker broker, final ActiveMQDestination destination, final
SystemUsage systemUsage,MessageStore store,DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) throws Exception {
super(broker, store, destination,systemUsage, parentStats);
@@ -100,8 +104,31 @@
} else {
this.messages = new StoreQueueCursor(broker,this);
}
- this.taskRunner = taskFactory.createTaskRunner(this, "Queue " +
destination.getPhysicalName());
+
+ this.executor = Executors.newSingleThreadExecutor(new ThreadFactory()
{
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable,
"QueueThread:"+destination);
+ thread.setDaemon(true);
+ thread.setPriority(Thread.NORM_PRIORITY);
+ return thread;
+ }
+ });
+
+ this.taskRunner = new DeterministicTaskRunner(this.executor,this);
this.log = LogFactory.getLog(getClass().getName() + "." +
destination.getPhysicalName());
+ this.dispatchSelector=new QueueDispatchSelector(destination);
+
+ }
+
+ /**
+ * @param queue
+ * @param string
+ * @param b
+ * @return
+ */
+ private TaskRunner DedicatedTaskRunner(Queue queue, String string, boolean
b) {
+ // TODO Auto-generated method stub
+ return null;
}
public void initialize() throws Exception {
@@ -153,26 +180,7 @@
}
}
- /**
- * Lock a node
- *
- * @param node
- * @param lockOwner
- * @return true if can be locked
- * @see
org.apache.activemq.broker.region.Destination#lock(org.apache.activemq.broker.region.MessageReference,
- * org.apache.activemq.broker.region.LockOwner)
- */
- public boolean lock(MessageReference node, LockOwner lockOwner) {
- synchronized (exclusiveLockMutex) {
- if (exclusiveOwner == lockOwner) {
- return true;
- }
- if (exclusiveOwner != null) {
- return false;
- }
- }
- return true;
- }
+
public void addSubscription(ConnectionContext context, Subscription sub)
throws Exception {
dispatchLock.lock();
@@ -185,54 +193,41 @@
synchronized (consumers) {
consumers.add(sub);
if (sub.getConsumerInfo().isExclusive()) {
- LockOwner owner = (LockOwner) sub;
- if (exclusiveOwner == null) {
- exclusiveOwner = owner;
- } else {
- // switch the owner if the priority is higher.
- if (owner.getLockPriority() > exclusiveOwner
- .getLockPriority()) {
- exclusiveOwner = owner;
- }
+ Subscription exclusiveConsumer =
dispatchSelector.getExclusiveConsumer();
+ if(exclusiveConsumer==null) {
+ exclusiveConsumer=sub;
+ }else if (sub.getConsumerInfo().getPriority() >
exclusiveConsumer.getConsumerInfo().getPriority()){
+ exclusiveConsumer=sub;
}
+ dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
}
}
-
- // we hold the lock on the dispatchValue - so lets build the paged
- // in
- // list directly;
- doPageIn(false);
-
// synchronize with dispatch method so that no new messages are
sent
// while
// setting up a subscription. avoid out of order messages,
// duplicates
// etc.
-
+ doPageIn(false);
msgContext.setDestination(destination);
synchronized (pagedInMessages) {
// Add all the matching messages in the queue to the
// subscription.
+
for (Iterator<MessageReference> i = pagedInMessages.values()
.iterator(); i.hasNext();) {
QueueMessageReference node = (QueueMessageReference) i
.next();
- if (node.isDropped()
- || (!sub.getConsumerInfo().isBrowser() && node
- .getLockOwner() != null)) {
- continue;
- }
- try {
+ if (!node.isDropped() && !node.isAcked() &&
(!node.isDropped() ||sub.getConsumerInfo().isBrowser())) {
msgContext.setMessageReference(node);
if (sub.matches(node, msgContext)) {
sub.add(node);
}
- } catch (IOException e) {
- log.warn("Could not load message: " + e, e);
}
}
+
}
- } finally {
+ wakeup();
+ }finally {
dispatchLock.unlock();
}
}
@@ -240,79 +235,60 @@
public void removeSubscription(ConnectionContext context, Subscription sub)
throws Exception {
destinationStatistics.getConsumers().decrement();
- // synchronize with dispatch method so that no new messages are sent
- // while
- // removing up a subscription.
- synchronized (consumers) {
- consumers.remove(sub);
- if (sub.getConsumerInfo().isExclusive()) {
- LockOwner owner = (LockOwner) sub;
- // Did we loose the exclusive owner??
- if (exclusiveOwner == owner) {
- // Find the exclusive consumer with the higest Lock
- // Priority.
- exclusiveOwner = null;
- for (Iterator<Subscription> iter = consumers.iterator();
iter
- .hasNext();) {
- Subscription s = iter.next();
- LockOwner so = (LockOwner) s;
- if (s.getConsumerInfo().isExclusive()
- && (exclusiveOwner == null || so
- .getLockPriority() > exclusiveOwner
- .getLockPriority())) {
- exclusiveOwner = so;
+ dispatchLock.lock();
+ try {
+ // synchronize with dispatch method so that no new messages are
sent
+ // while
+ // removing a subscription.
+ synchronized (consumers) {
+ consumers.remove(sub);
+ if (sub.getConsumerInfo().isExclusive()) {
+ Subscription exclusiveConsumer = dispatchSelector
+ .getExclusiveConsumer();
+ if (exclusiveConsumer == sub) {
+ exclusiveConsumer = null;
+ for (Subscription s : consumers) {
+ if (s.getConsumerInfo().isExclusive()
+ && (exclusiveConsumer == null
+ || s.getConsumerInfo().getPriority() >
exclusiveConsumer
+ .getConsumerInfo().getPriority()))
{
+ exclusiveConsumer = s;
+
+ }
}
+
dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
}
}
- }
- if (consumers.isEmpty()) {
- messages.gc();
- }
- }
- sub.remove(context, this);
- boolean wasExclusiveOwner = false;
- if (exclusiveOwner == sub) {
- exclusiveOwner = null;
- wasExclusiveOwner = true;
- }
- ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
- MessageGroupSet ownedGroups = getMessageGroupOwners().removeConsumer(
- consumerId);
- if (!sub.getConsumerInfo().isBrowser()) {
- MessageEvaluationContext msgContext = new
MessageEvaluationContext();
+ ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
+ MessageGroupSet ownedGroups = getMessageGroupOwners()
+ .removeConsumer(consumerId);
+ // redeliver inflight messages
+ sub.remove(context, this);
- msgContext.setDestination(destination);
- // lets copy the messages to dispatch to avoid deadlock
- List<QueueMessageReference> messagesToDispatch = new
ArrayList<QueueMessageReference>();
- synchronized (pagedInMessages) {
- for (Iterator<MessageReference> i =
pagedInMessages.values().iterator(); i
- .hasNext();) {
+ List<MessageReference> list = new
ArrayList<MessageReference>();
+ for (Iterator<MessageReference> i = pagedInMessages.values()
+ .iterator(); i.hasNext();) {
QueueMessageReference node = (QueueMessageReference) i
.next();
- if (node.isDropped()) {
- continue;
- }
- String groupID = node.getGroupID();
- // Re-deliver all messages that the sub locked
- if (node.getLockOwner() == sub
- || wasExclusiveOwner
- || (groupID != null && ownedGroups
- .contains(groupID))) {
- messagesToDispatch.add(node);
+ if (!node.isDropped() && !node.isAcked()
+ && node.getLockOwner() == sub) {
+ if (node.unlock()) {
+ node.incrementRedeliveryCounter();
+ list.add(node);
+ }
}
}
- }
- // now lets dispatch from the copy of the collection to
- // avoid deadlocks
- for (Iterator<QueueMessageReference> iter = messagesToDispatch
- .iterator(); iter.hasNext();) {
- QueueMessageReference node = iter.next();
- node.incrementRedeliveryCounter();
- node.unlock();
- msgContext.setMessageReference(node);
- dispatchPolicy.dispatch(node, msgContext, consumers);
+ if (list != null && !consumers.isEmpty()) {
+ doDispatch(list);
+ }
}
+ if (consumers.isEmpty()) {
+ messages.gc();
+ }
+ wakeup();
+ }finally {
+ dispatchLock.unlock();
}
}
@@ -523,6 +499,9 @@
if (taskRunner != null) {
taskRunner.shutdown();
}
+ if (this.executor != null) {
+ this.executor.shutdownNow();
+ }
if (messages != null) {
messages.stop();
}
@@ -677,11 +656,7 @@
for (MessageReference ref : list) {
try {
QueueMessageReference r = (QueueMessageReference) ref;
-
- // We should only delete messages that can be locked.
- if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) {
removeMessage(c,(IndirectMessageReference) r);
- }
} catch (IOException e) {
}
}
@@ -791,19 +766,16 @@
for (MessageReference ref : list) {
IndirectMessageReference r = (IndirectMessageReference) ref;
if (filter.evaluate(context, r)) {
- // We should only copy messages that can be locked.
- if (lockMessage(r)) {
- r.incrementReferenceCount();
- try {
- Message m = r.getMessage();
- BrokerSupport.resend(context, m, dest);
- if (++movedCounter >= maximumMessages
- && maximumMessages > 0) {
- return movedCounter;
- }
- } finally {
- r.decrementReferenceCount();
+ r.incrementReferenceCount();
+ try {
+ Message m = r.getMessage();
+ BrokerSupport.resend(context, m, dest);
+ if (++movedCounter >= maximumMessages
+ && maximumMessages > 0) {
+ return movedCounter;
}
+ } finally {
+ r.decrementReferenceCount();
}
}
count++;
@@ -853,19 +825,17 @@
IndirectMessageReference r = (IndirectMessageReference) ref;
if (filter.evaluate(context, r)) {
// We should only move messages that can be locked.
- if (lockMessage(r)) {
- r.incrementReferenceCount();
- try {
- Message m = r.getMessage();
- BrokerSupport.resend(context, m, dest);
- removeMessage(context, r);
- if (++movedCounter >= maximumMessages
- && maximumMessages > 0) {
- return movedCounter;
- }
- } finally {
- r.decrementReferenceCount();
+ r.incrementReferenceCount();
+ try {
+ Message m = r.getMessage();
+ BrokerSupport.resend(context, m, dest);
+ removeMessage(context, r);
+ if (++movedCounter >= maximumMessages
+ && maximumMessages > 0) {
+ return movedCounter;
}
+ } finally {
+ r.decrementReferenceCount();
}
}
count++;
@@ -885,7 +855,7 @@
}
if (result) {
try {
- pageInMessages(false);
+ pageInMessages(false);
} catch (Throwable e) {
log.error("Failed to page in more queue messages ", e);
@@ -895,7 +865,6 @@
Runnable op = messagesWaitingForSpace.removeFirst();
op.run();
}
- //must return false to prevent spinning
return false;
}
@@ -942,10 +911,7 @@
wakeup();
}
- protected boolean lockMessage(IndirectMessageReference r) {
- return r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER);
- }
-
+
protected ConnectionContext createConnectionContext() {
ConnectionContext answer = new ConnectionContext();
answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
@@ -972,7 +938,8 @@
private List<MessageReference> doPageIn(boolean force) throws Exception {
List<MessageReference> result = null;
dispatchLock.lock();
- try {
+ try{
+
final int toPageIn = getMaxPageSize() - pagedInMessages.size();
if ((force || !consumers.isEmpty()) && toPageIn > 0) {
messages.setMaxBatchSize(toPageIn);
@@ -1009,16 +976,48 @@
}
return result;
}
-
+
private void doDispatch(List<MessageReference> list) throws Exception {
-
- if (list != null && !list.isEmpty()) {
- MessageEvaluationContext msgContext = new
MessageEvaluationContext();
- for (int i = 0; i < list.size(); i++) {
- MessageReference node = list.get(i);
- msgContext.setDestination(destination);
- msgContext.setMessageReference(node);
- dispatchPolicy.dispatch(node, msgContext, consumers);
+
+ if (list != null) {
+ synchronized (consumers) {
+ for (MessageReference node : list) {
+ Subscription target = null;
+ List<Subscription> targets = null;
+ for (Subscription s : consumers) {
+ if (dispatchSelector.canSelect(s, node)) {
+ if (!s.isFull()) {
+ s.add(node);
+ target = s;
+ break;
+ } else {
+ if (targets == null) {
+ targets = new ArrayList<Subscription>();
+ }
+ targets.add(s);
+ }
+ }
+ }
+ if (targets != null) {
+ // pick the least loaded subscription to add the message to
+
+ for (Subscription s : targets) {
+ if (target == null
+ || target.getInFlightUsage() > s
+ .getInFlightUsage()) {
+ target = s;
+ }
+ }
+ if (target != null) {
+ target.add(node);
+ }
+ }
+ if (target != null &&
!dispatchSelector.isExclusiveConsumer(target)) {
+ consumers.remove(target);
+ consumers.add(target);
+ }
+
+ }
}
}
}
@@ -1030,7 +1029,4 @@
private void pageInMessages(boolean force) throws Exception {
doDispatch(doPageIn(force));
}
-
-
-
}
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java?rev=628667&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
(added)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
Mon Feb 18 01:38:10 2008
@@ -0,0 +1,115 @@
+/**
+ *
+ */
+package org.apache.activemq.broker.region;
+
+import java.io.IOException;
+import java.util.List;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.broker.region.group.MessageGroupMap;
+import org.apache.activemq.broker.region.policy.SimpleDispatchSelector;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.Message;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Queue dispatch policy that determines if a message can be sent to a
subscription
+ *
+ * @org.apache.xbean.XBean
+ * @version $Revision$
+ */
+public class QueueDispatchSelector extends SimpleDispatchSelector {
+ private static final Log LOG =
LogFactory.getLog(QueueDispatchSelector.class);
+ private Subscription exclusiveConsumer;
+
+
+ /**
+ * @param destination
+ */
+ public QueueDispatchSelector(ActiveMQDestination destination) {
+ super(destination);
+ }
+
+ public Subscription getExclusiveConsumer() {
+ return exclusiveConsumer;
+ }
+ public void setExclusiveConsumer(Subscription exclusiveConsumer) {
+ this.exclusiveConsumer = exclusiveConsumer;
+ }
+
+ public boolean isExclusiveConsumer(Subscription s) {
+ return s == this.exclusiveConsumer;
+ }
+
+
+ public boolean canSelect(Subscription subscription,
+ MessageReference m) throws Exception {
+ if (subscription.isBrowser() && super.canDispatch(subscription, m)) {
+ return true;
+ }
+
+ boolean result = super.canDispatch(subscription, m) ;
+ if (result) {
+ result = exclusiveConsumer == null
+ || exclusiveConsumer == subscription;
+ if (result) {
+ QueueMessageReference node = (QueueMessageReference) m;
+ // Keep message groups together.
+ String groupId = node.getGroupID();
+ int sequence = node.getGroupSequence();
+ if (groupId != null) {
+ MessageGroupMap messageGroupOwners = ((Queue) node
+ .getRegionDestination()).getMessageGroupOwners();
+
+ // If we can own the first, then no-one else should own the
+ // rest.
+ if (sequence == 1) {
+ assignGroup(subscription, messageGroupOwners,
node,groupId);
+ }else {
+
+ // Make sure that the previous owner is still valid,
we may
+ // need to become the new owner.
+ ConsumerId groupOwner;
+
+ groupOwner = messageGroupOwners.get(groupId);
+ if (groupOwner == null) {
+ assignGroup(subscription, messageGroupOwners,
node,groupId);
+ } else {
+ if
(groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {
+ // A negative group sequence is an end-of-group signal.
+ if (sequence < 0) {
+ messageGroupOwners.removeGroup(groupId);
+ }
+ } else {
+ result = false;
+ }
+ }
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ protected void assignGroup(Subscription subs,MessageGroupMap
messageGroupOwners, MessageReference n, String groupId) throws IOException {
+ messageGroupOwners.put(groupId,
subs.getConsumerInfo().getConsumerId());
+ Message message = n.getMessage();
+ if (message instanceof ActiveMQMessage) {
+ ActiveMQMessage activeMessage = (ActiveMQMessage)message;
+ try {
+ activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer",
true, false);
+ } catch (JMSException e) {
+ LOG.warn("Failed to set boolean header: " + e, e);
+ }
+ }
+ }
+
+
+
+
+}
Propchange:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java
Mon Feb 18 01:38:10 2008
@@ -36,7 +36,7 @@
boolean lock(LockOwner subscription);
- void unlock();
+ boolean unlock();
LockOwner getLockOwner();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
Mon Feb 18 01:38:10 2008
@@ -17,13 +17,14 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
+
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
+
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@@ -67,54 +68,13 @@
}
protected boolean canDispatch(MessageReference n) throws IOException {
+ boolean result = true;
QueueMessageReference node = (QueueMessageReference)n;
- if (node.isAcked()) {
- return false;
- }
- // Keep message groups together.
- String groupId = node.getGroupID();
- int sequence = node.getGroupSequence();
- if (groupId != null) {
- MessageGroupMap messageGroupOwners =
((Queue)node.getRegionDestination()).getMessageGroupOwners();
-
- // If we can own the first, then no-one else should own the rest.
- if (sequence == 1) {
- if (node.lock(this)) {
- assignGroupToMe(messageGroupOwners, n, groupId);
- return true;
- } else {
- return false;
- }
- }
-
- // Make sure that the previous owner is still valid, we may
- // need to become the new owner.
- ConsumerId groupOwner;
- synchronized (node) {
- groupOwner = messageGroupOwners.get(groupId);
- if (groupOwner == null) {
- if (node.lock(this)) {
- assignGroupToMe(messageGroupOwners, n, groupId);
- return true;
- } else {
- return false;
- }
- }
- }
-
- if (groupOwner.equals(info.getConsumerId())) {
- // A group sequence < 1 is an end of group signal.
- if (sequence < 0) {
- messageGroupOwners.removeGroup(groupId);
- }
- return true;
- }
-
- return false;
-
- } else {
- return node.lock(this);
+ if (node.isAcked() || node.isDropped()) {
+ result = false;
}
+ result = result && (isBrowser() || node.lock(this));
+ return result;
}
/**
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
Mon Feb 18 01:38:10 2008
@@ -17,6 +17,7 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
+import java.util.List;
import javax.jms.InvalidSelectorException;
import javax.management.ObjectName;
@@ -38,6 +39,7 @@
/**
* Used to add messages that match the subscription.
* @param node
+ * @throws Exception
* @throws InterruptedException
* @throws IOException
*/
@@ -169,6 +171,11 @@
boolean isHighWaterMark();
/**
+ * @return true if there is no space to dispatch messages
+ */
+ boolean isFull();
+
+ /**
* inform the MessageConsumer on the client to change it's prefetch
* @param newPrefetch
*/
@@ -186,11 +193,33 @@
int getPrefetchSize();
/**
+ * @return the number of messages awaiting acknowledgement
+ */
+ int getInFlightSize();
+
+ /**
+ * @return the in flight messages as a percentage of the prefetch size
+ */
+ int getInFlightUsage();
+
+ /**
* Informs the Broker if the subscription needs to intervention to recover it's state
* e.g. DurableTopicSubscriber may do
* @see org.apache.activemq.region.cursors.PendingMessageCursor
* @return true if recovery required
*/
boolean isRecoveryRequired();
+
+
+ /**
+ * @return true if a browser
+ */
+ boolean isBrowser();
+
+ /**
+ * Get the list of in flight messages
+ * @return list
+ */
+ List<MessageReference> getInFlightMessages();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Feb 18 01:38:10 2008
@@ -33,6 +33,7 @@
import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
+import org.apache.activemq.broker.region.policy.SimpleDispatchSelector;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
@@ -555,8 +556,7 @@
protected void dispatch(final ConnectionContext context, Message message) throws Exception {
destinationStatistics.getMessages().increment();
destinationStatistics.getEnqueues().increment();
- dispatchValve.increment();
- MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
+ dispatchValve.increment();
try {
if (!subscriptionRecoveryPolicy.add(context, message)) {
return;
@@ -567,7 +567,7 @@
return;
}
}
-
+ MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
msgContext.setDestination(destination);
msgContext.setMessageReference(message);
@@ -575,7 +575,6 @@
onMessageWithNoConsumers(context, message);
}
} finally {
- msgContext.clear();
dispatchValve.decrement();
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=628667&r1=628666&r2=628667&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Mon Feb 18 01:38:10 2008
@@ -18,6 +18,8 @@
import java.io.IOException;
import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;
@@ -37,7 +39,6 @@
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
-import org.apache.activemq.kaha.Store;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
@@ -51,8 +52,7 @@
protected PendingMessageCursor matched;
protected final SystemUsage usageManager;
protected AtomicLong dispatchedCounter = new AtomicLong();
- protected AtomicLong prefetchExtension = new AtomicLong();
-
+
boolean singleDestination = true;
Destination destination;
@@ -83,8 +83,7 @@
public void add(MessageReference node) throws Exception {
enqueueCounter.incrementAndGet();
node.incrementReferenceCount();
- if (!isFull() && !isSlave()) {
- optimizePrefetch();
+ if (!isFull() && matched.isEmpty() && !isSlave()) {
// if maximumPendingMessages is set we will only discard messages
// which
// have not been dispatched (i.e. we allow the prefetch buffer to be
@@ -128,6 +127,7 @@
}
}
}
+ dispatchMatched();
}
}
}
@@ -177,20 +177,18 @@
public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
// Handle the standard acknowledgment case.
- boolean wasFull = isFull();
if (ack.isStandardAck() || ack.isPoisonAck()) {
if (context.isInTransaction()) {
- prefetchExtension.addAndGet(ack.getMessageCount());
context.getTransaction().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception {
- synchronized (TopicSubscription.this) {
+ synchronized (TopicSubscription.this) {
if (singleDestination && destination != null) {
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
}
}
dequeueCounter.addAndGet(ack.getMessageCount());
- prefetchExtension.addAndGet(ack.getMessageCount());
+ dispatchMatched();
}
});
} else {
@@ -198,19 +196,14 @@
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
}
dequeueCounter.addAndGet(ack.getMessageCount());
- prefetchExtension.addAndGet(ack.getMessageCount());
- }
- if (wasFull && !isFull()) {
- dispatchMatched();
}
+ dispatchMatched();
return;
} else if (ack.isDeliveredAck()) {
// Message was delivered but not acknowledged: update pre-fetch
// counters.
- prefetchExtension.addAndGet(ack.getMessageCount());
- if (wasFull && !isFull()) {
- dispatchMatched();
- }
+ dequeueCounter.addAndGet(ack.getMessageCount());
+ dispatchMatched();
return;
}
throw new JMSException("Invalid acknowledgment: " + ack);
@@ -287,22 +280,27 @@
// Implementation methods
// -------------------------------------------------------------------------
- private boolean isFull() {
- return getDispatchedQueueSize() - prefetchExtension.get() >= info.getPrefetchSize();
+ public boolean isFull() {
+ return getDispatchedQueueSize() >= info.getPrefetchSize();
}
-
+
+ public int getInFlightSize() {
+ return getDispatchedQueueSize();
+ }
+
+
/**
* @return true when 60% or more room is left for dispatching messages
*/
public boolean isLowWaterMark() {
- return (getDispatchedQueueSize() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4);
+ return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4);
}
/**
* @return true when 10% or less room is left for dispatching messages
*/
public boolean isHighWaterMark() {
- return (getDispatchedQueueSize() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9);
+ return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9);
}
/**
@@ -354,42 +352,30 @@
}
}
- /**
- * optimize message consumer prefetch if the consumer supports it
- */
- public void optimizePrefetch() {
- /*
- * if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null
- * &&context.getConnection().isManageable()){
- * if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() &&
- * isLowWaterMark()){
- * info.setCurrentPrefetchSize(info.getPrefetchSize());
- * updateConsumerPrefetch(info.getPrefetchSize()); }else
- * if(info.getCurrentPrefetchSize()==info.getPrefetchSize() &&
- * isHighWaterMark()){ // want to purge any outstanding acks held by the
- * consumer info.setCurrentPrefetchSize(1); updateConsumerPrefetch(1); } }
- */
- }
-
- private void dispatchMatched() throws IOException {
+ private void dispatchMatched() throws IOException {
synchronized (matchedListMutex) {
- try {
- matched.reset();
- while (matched.hasNext() && !isFull()) {
- MessageReference message = (MessageReference)matched.next();
- matched.remove();
- // Message may have been sitting in the matched list a while
- // waiting for the consumer to ak the message.
- if (broker.isExpired(message)) {
- message.decrementReferenceCount();
- broker.messageExpired(getContext(), message);
- dequeueCounter.incrementAndGet();
- continue; // just drop it.
+ if (!matched.isEmpty() && !isFull()) {
+ try {
+ matched.reset();
+
+ while (matched.hasNext() && !isFull()) {
+ MessageReference message = (MessageReference) matched
+ .next();
+ matched.remove();
+ // Message may have been sitting in the matched list a
+ // while
+ // waiting for the consumer to ak the message.
+ if (broker.isExpired(message)) {
+ message.decrementReferenceCount();
+ broker.messageExpired(getContext(), message);
+ dequeueCounter.incrementAndGet();
+ continue; // just drop it.
+ }
+ dispatch(message);
}
- dispatch(message);
+ } finally {
+ matched.release();
}
- } finally {
- matched.release();
}
}
}
@@ -456,7 +442,15 @@
}
public int getPrefetchSize() {
- return (int)(info.getPrefetchSize() + prefetchExtension.get());
+ return (int)info.getPrefetchSize();
+ }
+
+ /**
+ * Get the list of inflight messages
+ * @return the list
+ */
+ public synchronized List<MessageReference> getInFlightMessages(){
+ return matched.pageInList(1000);
}
}
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchSelector.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchSelector.java?rev=628667&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchSelector.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchSelector.java Mon Feb 18 01:38:10 2008
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
+
+/**
+ * Determines if a subscription can dispatch a message reference
+ *
+ */
+public interface DispatchSelector {
+
+
+ /**
+ * return true if a subscription can dispatch a message reference
+ * @param subscription
+ * @param node
+ * @return true if can dispatch
+ * @throws Exception
+ */
+
+ boolean canDispatch(Subscription subscription, MessageReference node) throws Exception;
+}
Propchange:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DispatchSelector.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java?rev=628667&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java Mon Feb 18 01:38:10 2008
@@ -0,0 +1,34 @@
+/**
+ *
+ */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.filter.MessageEvaluationContext;
+
+/**
+ * Simple dispatch policy that determines if a message can be sent to a subscription
+ *
+ * @org.apache.xbean.XBean
+ * @version $Revision$
+ */
+public class SimpleDispatchSelector implements DispatchSelector {
+
+ private final ActiveMQDestination destination;
+
+ /**
+ * @param destination
+ */
+ public SimpleDispatchSelector(ActiveMQDestination destination) {
+ this.destination = destination;
+ }
+
+ public boolean canDispatch(Subscription subscription, MessageReference node) throws Exception {
+ MessageEvaluationContext msgContext = new MessageEvaluationContext();
+ msgContext.setDestination(this.destination);
+ msgContext.setMessageReference(node);
+ return subscription.matches(node, msgContext);
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java
------------------------------------------------------------------------------
svn:eol-style = native