Author: jstrachan
Date: Thu Jan 5 07:58:49 2006
New Revision: 366208
URL: http://svn.apache.org/viewcvs?rev=366208&view=rev
Log:
enabled the hash bucket based implementation of MessageGroupMap by default and
made the bucketCount configurable in the destination policy map
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=366208&r1=366207&r2=366208&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Jan 5 07:58:49 2006
@@ -19,9 +19,9 @@
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.group.MessageGroupHashBucket;
import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.broker.region.group.MessageGroupSet;
-import org.apache.activemq.broker.region.group.SimpleMessageGroupMap;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
@@ -45,7 +45,6 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.Set;
/**
* The Queue is a List of MessageEntry objects that are dispatched to matching
@@ -65,7 +64,8 @@
protected final DestinationStatistics destinationStatistics = new
DestinationStatistics();
private Subscription exclusiveOwner;
- private final MessageGroupMap messageGroupOwners = new
SimpleMessageGroupMap();
+ private MessageGroupMap messageGroupOwners;
+ private int messageGroupHashBucketCount = 1024;
protected long garbageSize = 0;
protected long garbageSizeBeforeCollection = 1000;
@@ -186,7 +186,7 @@
}
ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
- MessageGroupSet ownedGroups =
messageGroupOwners.removeConsumer(consumerId);
+ MessageGroupSet ownedGroups =
getMessageGroupOwners().removeConsumer(consumerId);
synchronized (messages) {
if (!sub.getConsumerInfo().isBrowser()) {
@@ -323,6 +323,9 @@
}
public MessageGroupMap getMessageGroupOwners() {
+ if (messageGroupOwners == null) {
+ messageGroupOwners = new
MessageGroupHashBucket(messageGroupHashBucketCount );
+ }
return messageGroupOwners;
}
@@ -341,6 +344,15 @@
public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
this.deadLetterStrategy = deadLetterStrategy;
}
+
+ public int getMessageGroupHashBucketCount() {
+ return messageGroupHashBucketCount;
+ }
+
+ public void setMessageGroupHashBucketCount(int
messageGroupHashBucketCount) {
+ this.messageGroupHashBucketCount = messageGroupHashBucketCount;
+ }
+
// Implementation methods
//
-------------------------------------------------------------------------
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=366208&r1=366207&r2=366208&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Thu Jan 5 07:58:49 2006
@@ -34,6 +34,7 @@
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private boolean sendAdvisoryIfNoConsumers;
private DeadLetterStrategy deadLetterStrategy;
+ private int messageGroupHashBucketCount = 1024;
public void configure(Queue queue) {
if (dispatchPolicy != null) {
@@ -42,6 +43,7 @@
if (deadLetterStrategy != null) {
queue.setDeadLetterStrategy(deadLetterStrategy);
}
+ queue.setMessageGroupHashBucketCount(messageGroupHashBucketCount);
}
public void configure(Topic topic) {
@@ -96,6 +98,20 @@
*/
public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
this.deadLetterStrategy = deadLetterStrategy;
+ }
+
+ public int getMessageGroupHashBucketCount() {
+ return messageGroupHashBucketCount;
+ }
+
+ /**
+ * Sets the number of hash buckets to use for the message group
functionality.
+ * This is only applicable to using message groups to parallelize
processing of a queue
+ * while preserving order across an individual JMSXGroupID header value.
+ * This value sets the number of hash buckets that will be used (i.e. the
maximum possible concurrency).
+ */
+ public void setMessageGroupHashBucketCount(int
messageGroupHashBucketCount) {
+ this.messageGroupHashBucketCount = messageGroupHashBucketCount;
}