Author: jstrachan Date: Wed Aug 9 01:44:27 2006 New Revision: 429995 URL: http://svn.apache.org/viewvc?rev=429995&view=rev Log: a fix for AMQ-769 to allow the MessageGroupMap implementation to be specified via a policyEntry
http://incubator.apache.org/activemq/per-destination-policies.html so you can specify something like this... <broker> <destinationPolicy> <policyMap> <policyEntries> <policyEntry queue=">"> <messageGroupMapFactory> <simpleMessageGroupMapFactory/> Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucketFactory.java (with props) incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMapFactory.java (with props) incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMapFactory.java (with props) Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=429995&r1=429994&r2=429995&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Wed Aug 9 01:44:27 2006 @@ -17,15 +17,12 @@ */ package org.apache.activemq.broker.region; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; +import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.broker.region.group.MessageGroupHashBucket; +import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory; import org.apache.activemq.broker.region.group.MessageGroupMap; +import org.apache.activemq.broker.region.group.MessageGroupMapFactory; import org.apache.activemq.broker.region.group.MessageGroupSet; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.DispatchPolicy; @@ -47,7 +44,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; /** * The Queue is a List of MessageEntry objects that are dispatched to matching @@ -68,7 +69,6 @@ private LockOwner exclusiveOwner; private MessageGroupMap messageGroupOwners; - private int messageGroupHashBucketCount = 1024; protected long garbageSize = 0; protected long garbageSizeBeforeCollection = 1000; @@ -76,7 +76,8 @@ protected final MessageStore store; protected int highestSubscriptionPriority; private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy(); - + private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); + public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception { this.destination = destination; @@ -364,7 +365,7 @@ public MessageGroupMap getMessageGroupOwners() { if (messageGroupOwners == null) { - messageGroupOwners = new MessageGroupHashBucket(messageGroupHashBucketCount); + messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap(); } return messageGroupOwners; } @@ -385,14 +386,14 @@ this.deadLetterStrategy = deadLetterStrategy; } - public int getMessageGroupHashBucketCount() { - return messageGroupHashBucketCount; + public MessageGroupMapFactory getMessageGroupMapFactory() { + return messageGroupMapFactory; } - public void setMessageGroupHashBucketCount(int messageGroupHashBucketCount) { - this.messageGroupHashBucketCount = messageGroupHashBucketCount; + public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) { + this.messageGroupMapFactory = messageGroupMapFactory; } - + public void resetStatistics() { getDestinationStatistics().reset(); } Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucketFactory.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucketFactory.java?rev=429995&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucketFactory.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucketFactory.java Wed Aug 9 01:44:27 2006 @@ -0,0 +1,52 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.group; + +/** + * A factory to create instances of [EMAIL PROTECTED] SimpleMessageGroupMap} when + * implementing the <a + * href="http://incubator.apache.org/activemq/message-groups.html">Message + * Groups</a> functionality. + * + * @org.apache.xbean.XBean + * + * @version $Revision$ + */ +public class MessageGroupHashBucketFactory implements MessageGroupMapFactory { + + private int bucketCount = 1024; + + public MessageGroupMap createMessageGroupMap() { + return new MessageGroupHashBucket(bucketCount); + } + + public int getBucketCount() { + return bucketCount; + } + + /** + * Sets the number of hash buckets to use for the message group + * functionality. This is only applicable to using message groups to + * parallelize processing of a queue while preserving order across an + * individual JMSXGroupID header value. This value sets the number of hash + * buckets that will be used (i.e. the maximum possible concurrency). + */ + public void setBucketCount(int bucketCount) { + this.bucketCount = bucketCount; + } + +} Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucketFactory.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucketFactory.java ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucketFactory.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMapFactory.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMapFactory.java?rev=429995&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMapFactory.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMapFactory.java Wed Aug 9 01:44:27 2006 @@ -0,0 +1,29 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.group; + +/** + * Represents a factory used to create new instances of [EMAIL PROTECTED] MessageGroupMap} + * for a destination. + * + * @version $Revision$ + */ +public interface MessageGroupMapFactory { + + public MessageGroupMap createMessageGroupMap(); + +} Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMapFactory.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMapFactory.java ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMapFactory.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMapFactory.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMapFactory.java?rev=429995&view=auto ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMapFactory.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMapFactory.java Wed Aug 9 01:44:27 2006 @@ -0,0 +1,33 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.group; + +/** + * A factory to create instances of [EMAIL PROTECTED] SimpleMessageGroupMap} when implementing the + * <a href="http://incubator.apache.org/activemq/message-groups.html">Message Groups</a> functionality. + * + * @org.apache.xbean.XBean + * + * @version $Revision$ + */ +public class SimpleMessageGroupMapFactory implements MessageGroupMapFactory { + + public MessageGroupMap createMessageGroupMap() { + return new SimpleMessageGroupMap(); + } + +} Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMapFactory.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMapFactory.java ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMapFactory.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=429995&r1=429994&r2=429995&view=diff ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Wed Aug 9 01:44:27 2006 @@ -20,6 +20,8 @@ import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.TopicSubscription; +import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory; +import org.apache.activemq.broker.region.group.MessageGroupMapFactory; import org.apache.activemq.filter.DestinationMapEntry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -40,11 +42,11 @@ private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; private boolean sendAdvisoryIfNoConsumers; private DeadLetterStrategy deadLetterStrategy; - private int messageGroupHashBucketCount = 1024; private PendingMessageLimitStrategy pendingMessageLimitStrategy; private MessageEvictionStrategy messageEvictionStrategy; private long memoryLimit; - + private MessageGroupMapFactory messageGroupMapFactory; + public void configure(Queue queue) { if (dispatchPolicy != null) { queue.setDispatchPolicy(dispatchPolicy); @@ -52,7 +54,7 @@ if (deadLetterStrategy != null) { queue.setDeadLetterStrategy(deadLetterStrategy); } - queue.setMessageGroupHashBucketCount(messageGroupHashBucketCount); + queue.setMessageGroupMapFactory(getMessageGroupMapFactory()); if( memoryLimit>0 ) { queue.getUsageManager().setLimit(memoryLimit); } @@ -137,21 +139,6 @@ this.deadLetterStrategy = deadLetterStrategy; } - public int getMessageGroupHashBucketCount() { - return messageGroupHashBucketCount; - } - - /** - * Sets the number of hash buckets to use for the message group - * functionality. This is only applicable to using message groups to - * parallelize processing of a queue while preserving order across an - * individual JMSXGroupID header value. This value sets the number of hash - * buckets that will be used (i.e. the maximum possible concurrency). - */ - public void setMessageGroupHashBucketCount(int messageGroupHashBucketCount) { - this.messageGroupHashBucketCount = messageGroupHashBucketCount; - } - public PendingMessageLimitStrategy getPendingMessageLimitStrategy() { return pendingMessageLimitStrategy; } @@ -189,4 +176,20 @@ this.memoryLimit = memoryLimit; } + public MessageGroupMapFactory getMessageGroupMapFactory() { + if (messageGroupMapFactory == null) { + messageGroupMapFactory = new MessageGroupHashBucketFactory(); + } + return messageGroupMapFactory; + } + + /** + * Sets the factory used to create new instances of {MessageGroupMap} used to implement the + * <a href="http://incubator.apache.org/activemq/message-groups.html">Message Groups</a> functionality. + */ + public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) { + this.messageGroupMapFactory = messageGroupMapFactory; + } + + }
