Author: aco
Date: Thu Dec 29 02:07:55 2005
New Revision: 359769
URL: http://svn.apache.org/viewcvs?rev=359769&view=rev
Log:
- Added test support for multiple consumers and producers
- Added test cases for queue and topic subscriptions
- Added test cases for the different dispatch policies
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java?rev=359769&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
Thu Dec 29 02:07:55 2005
@@ -0,0 +1,273 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Arrays;
+import java.util.Collections;
+import java.net.URI;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.*;
+
+/**
+ * Test case support used to test multiple message consumers and message
+ * producers connecting to a single broker.
+ *
+ * @version $Revision$
+ */
+public class JmsMultipleClientsTestSupport extends CombinationTestSupport {
+ private AtomicInteger producerLock;
+
+ protected Map consumers = new HashMap(); // Map of consumer with messages
received
+ protected int consumerCount = 1;
+ protected int producerCount = 1;
+
+ protected int messageSize = 1024;
+
+ protected boolean useConcurrentSend = true;
+ protected boolean durable = false;
+ protected boolean topic = false;
+
+ protected BrokerService broker;
+ protected Destination destination;
+ protected List connections = Collections.synchronizedList(new ArrayList());
+
+ protected void startProducers(Destination dest, int msgCount) throws
Exception {
+ startProducers(createConnectionFactory(), dest, msgCount);
+ }
+
+ protected void startProducers(final ConnectionFactory factory, final
Destination dest, final int msgCount) throws Exception {
+ // Use concurrent send
+ if (useConcurrentSend) {
+ producerLock = new AtomicInteger(producerCount);
+
+ for (int i=0; i<producerCount; i++) {
+ Thread t = new Thread(new Runnable() {
+ public void run() {
+ try {
+ sendMessages(factory.createConnection(), dest,
msgCount);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ synchronized (producerLock) {
+ producerLock.decrementAndGet();
+ producerLock.notifyAll();
+ }
+ }
+ });
+
+ t.start();
+ }
+
+ // Wait for all producers to finish sending
+ synchronized (producerLock) {
+ while (producerLock.get() != 0) {
+ producerLock.wait();
+ }
+ }
+
+
+ // Use serialized send
+ } else {
+ for (int i=0; i<producerCount; i++) {
+ sendMessages(factory.createConnection(), dest, msgCount);
+ }
+ }
+ }
+
+ protected void sendMessages(Connection connection, Destination
destination, int count) throws Exception {
+ connection.start();
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(destination);
+
+ for (int i = 0; i < count; i++) {
+ TextMessage msg = createTextMessage(session, "" + i);
+ producer.send(msg);
+ }
+
+ producer.close();
+ session.close();
+ connection.close();
+ }
+
+ protected TextMessage createTextMessage(Session session, String initText)
throws Exception {
+ TextMessage msg = session.createTextMessage();
+
+ // Pad message text
+ if (initText.length() < messageSize) {
+ char[] data = new char[messageSize - initText.length()];
+ Arrays.fill(data, '*');
+ String str = new String(data);
+ msg.setText(initText + str);
+
+ // Do not pad message text
+ } else {
+ msg.setText(initText);
+ }
+
+ return msg;
+ }
+
+ protected void startConsumers(Destination dest) throws Exception {
+ startConsumers(createConnectionFactory(), dest);
+ }
+
+ protected void startConsumers(ConnectionFactory factory, Destination dest)
throws Exception {
+ MessageConsumer consumer;
+ for (int i=0; i<consumerCount; i++) {
+ if (durable && topic) {
+ consumer = createDurableSubscriber(factory.createConnection(),
dest, "consumer" + (i+1));
+ } else {
+ consumer = createMessageConsumer(factory.createConnection(),
dest);
+ }
+ // Add consumer object and message list
+ consumers.put(consumer, new ArrayList());
+ }
+ }
+
+ protected MessageConsumer createMessageConsumer(Connection conn,
Destination dest) throws Exception {
+ connections.add(conn);
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer = sess.createConsumer(dest);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ List messageList = (List)consumers.get(consumer);
+ messageList.add(message);
+ }
+ });
+ conn.start();
+
+ return consumer;
+ }
+
+ protected TopicSubscriber createDurableSubscriber(Connection conn,
Destination dest, String name) throws Exception {
+ conn.setClientID(name);
+ connections.add(conn);
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final TopicSubscriber consumer =
sess.createDurableSubscriber((javax.jms.Topic)dest, name);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ List messageList = (List)consumers.get(consumer);
+ messageList.add(message);
+ }
+ });
+ conn.start();
+
+ return consumer;
+ }
+
+ protected void waitForAllMessagesToBeReceived(int timeout) throws
Exception {
+ Thread.sleep(timeout);
+ }
+
+ protected ActiveMQDestination createDestination() throws JMSException {
+ if (topic) {
+ destination = new ActiveMQTopic("Topic");
+ return (ActiveMQDestination)destination;
+ } else {
+ destination = new ActiveMQQueue("Queue");
+ return (ActiveMQDestination)destination;
+ }
+ }
+
+ protected ConnectionFactory createConnectionFactory() throws Exception {
+ return new ActiveMQConnectionFactory("vm://localhost");
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ return BrokerFactory.createBroker(new
URI("broker://()/localhost?persistent=false"));
+ }
+
+ protected void setUp() throws Exception {
+ super.setAutoFail(true);
+ super.setUp();
+ broker = createBroker();
+ broker.start();
+ }
+
+ protected void tearDown() throws Exception {
+ for (Iterator iter = connections.iterator(); iter.hasNext();) {
+ Connection conn= (Connection) iter.next();
+ try {
+ conn.close();
+ } catch (Throwable e) {
+ }
+ }
+ broker.stop();
+ super.tearDown();
+ }
+
+ /*
+ * Some helpful assertions for multiple consumers.
+ */
+ protected void assertConsumerReceivedAtLeastXMessages(MessageConsumer
consumer, int msgCount) {
+ List messageList = (List)consumers.get(consumer);
+ assertTrue("Consumer received less than " + msgCount + " messages.
Actual messages received is " + messageList.size(), (messageList.size() >=
msgCount));
+ }
+
+ protected void assertConsumerReceivedAtMostXMessages(MessageConsumer
consumer, int msgCount) {
+ List messageList = (List)consumers.get(consumer);
+ assertTrue("Consumer received more than " + msgCount + " messages.
Actual messages received is " + messageList.size(), (messageList.size() <=
msgCount));
+ }
+
+ protected void assertConsumerReceivedXMessages(MessageConsumer consumer,
int msgCount) {
+ List messageList = (List)consumers.get(consumer);
+ assertTrue("Consumer should have received exactly " + msgCount + "
messages. Actual messages received is " + messageList.size(),
(messageList.size() == msgCount));
+ }
+
+ protected void assertEachConsumerReceivedAtLeastXMessages(int msgCount) {
+ for (Iterator i=consumers.keySet().iterator();i.hasNext();) {
+ assertConsumerReceivedAtLeastXMessages((MessageConsumer)i.next(),
msgCount);
+ }
+ }
+
+ protected void assertEachConsumerReceivedAtMostXMessages(int msgCount) {
+ for (Iterator i=consumers.keySet().iterator();i.hasNext();) {
+ assertConsumerReceivedAtMostXMessages((MessageConsumer)i.next(),
msgCount);
+ }
+ }
+
+ protected void assertEachConsumerReceivedXMessages(int msgCount) {
+ for (Iterator i=consumers.keySet().iterator();i.hasNext();) {
+ assertConsumerReceivedXMessages((MessageConsumer)i.next(),
msgCount);
+ }
+ }
+
+ protected void assertTotalMessagesReceived(int msgCount) {
+ int totalMsg = 0;
+ for (Iterator i=consumers.keySet().iterator(); i.hasNext();) {
+ totalMsg += ((List)consumers.get(i.next())).size();
+ }
+
+ assertTrue("Total messages received should have been " + msgCount + ".
Actual messages received is " + totalMsg, (totalMsg == msgCount));
+ }
+}
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java?rev=359769&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
Thu Dec 29 02:07:55 2005
@@ -0,0 +1,147 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import org.apache.activemq.JmsMultipleClientsTestSupport;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+
+public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
+ protected int messageCount = 1000; // 1000 Messages per producer
+ protected int prefetchCount = 10;
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ durable = false;
+ topic = false;
+ }
+
+ public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws
Exception {
+ consumerCount = 2;
+ producerCount = 1;
+ messageCount = 1000;
+ prefetchCount = 1;
+ messageSize = 1024; // 1 Kb
+
+ doMultipleClientsTest();
+
+ assertTotalMessagesReceived(messageCount * producerCount);
+ }
+
+ public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws
Exception {
+ consumerCount = 2;
+ producerCount = 1;
+ messageCount = 1000;
+ prefetchCount = messageCount * 2;
+ messageSize = 1024; // 1 Kb
+
+ doMultipleClientsTest();
+
+ assertTotalMessagesReceived(messageCount * producerCount);
+ }
+
+ public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws
Exception {
+ consumerCount = 2;
+ producerCount = 1;
+ messageCount = 10;
+ prefetchCount = 1;
+ messageSize = 1024 * 1024 * 1; // 2 MB
+
+ doMultipleClientsTest();
+
+ assertTotalMessagesReceived(messageCount * producerCount);
+ }
+
+ public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws
Exception {
+ consumerCount = 2;
+ producerCount = 1;
+ messageCount = 10;
+ prefetchCount = messageCount * 2;
+ messageSize = 1024 * 1024 * 1; // 2 MB
+
+ doMultipleClientsTest();
+
+ assertTotalMessagesReceived(messageCount * producerCount);
+ }
+
+ public void testOneProducerManyConsumersFewMessages() throws Exception {
+ consumerCount = 50;
+ producerCount = 1;
+ messageCount = 10;
+ messageSize = 1; // 1 byte
+ prefetchCount = 10;
+
+ doMultipleClientsTest();
+
+ assertTotalMessagesReceived(messageCount * producerCount);
+ }
+
+ public void testOneProducerManyConsumersManyMessages() throws Exception {
+ consumerCount = 50;
+ producerCount = 1;
+ messageCount = 1000;
+ messageSize = 1; // 1 byte
+ prefetchCount = 10;
+
+ doMultipleClientsTest();
+
+ assertTotalMessagesReceived(messageCount * producerCount);
+ }
+
+ public void testManyProducersOneConsumer() throws Exception {
+ consumerCount = 1;
+ producerCount = 50;
+ messageCount = 100;
+ messageSize = 1; // 1 byte
+ prefetchCount = 10;
+
+ doMultipleClientsTest();
+
+ assertTotalMessagesReceived(messageCount * producerCount);
+ }
+
+ public void testManyProducersManyConsumers() throws Exception {
+ consumerCount = 50;
+ producerCount = 50;
+ messageCount = 100;
+ messageSize = 1; // 1 byte
+ prefetchCount = 10;
+
+ doMultipleClientsTest();
+
+ assertTotalMessagesReceived(messageCount * producerCount);
+ }
+
+ public void doMultipleClientsTest() throws Exception {
+ // Create destination
+ final ActiveMQDestination dest = createDestination();
+
+ // Create consumers
+ ActiveMQConnectionFactory consumerFactory =
(ActiveMQConnectionFactory)createConnectionFactory();
+ consumerFactory.getPrefetchPolicy().setAll(prefetchCount);
+
+ startConsumers(consumerFactory, dest);
+
+ // Wait for consumers to setup
+ Thread.sleep(500);
+
+ startProducers(dest, messageCount);
+
+ // Wait for messages to be received. Make it proportional to the
messages delivered.
+ waitForAllMessagesToBeReceived((producerCount * messageCount) / 2000);
+ }
+}
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java?rev=359769&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
Thu Dec 29 02:07:55 2005
@@ -0,0 +1,123 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+/**
+ * Runs the subscription scenarios against a topic with durable subscribers. Each scenario
+ * expects the consumers together to have received
+ * {@code messageCount * consumerCount * producerCount} messages.
+ */
+public class TopicSubscriptionTest extends QueueSubscriptionTest {
+
+    /** Runs the inherited set-up, then switches to durable topic mode. */
+    protected void setUp() throws Exception {
+        super.setUp();
+        durable = true;
+        topic = true;
+    }
+
+    /**
+     * Configures the scenario, runs it, and asserts the total number of messages received
+     * across all consumers.
+     */
+    private void runTopicScenario(int numConsumers, int numProducers, int messagesPerProducer, int bodySize, int prefetch) throws Exception {
+        consumerCount = numConsumers;
+        producerCount = numProducers;
+        messageCount = messagesPerProducer;
+        messageSize = bodySize;
+        prefetchCount = prefetch;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * consumerCount * producerCount);
+    }
+
+    public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
+        // 1 KB bodies, prefetch of one
+        runTopicScenario(2, 1, 1000, 1024, 1);
+    }
+
+    public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
+        // 1 KB bodies, prefetch of twice the message count
+        runTopicScenario(2, 1, 1000, 1024, 1000 * 2);
+    }
+
+    public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
+        // 1 MB bodies, prefetch of one
+        runTopicScenario(2, 1, 10, 1024 * 1024, 1);
+    }
+
+    public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
+        // 1 MB bodies, prefetch of twice the message count
+        runTopicScenario(2, 1, 10, 1024 * 1024, 10 * 2);
+    }
+
+    public void testOneProducerManyConsumersFewMessages() throws Exception {
+        // Single-character bodies
+        runTopicScenario(50, 1, 10, 1, 10);
+    }
+
+    public void testOneProducerManyConsumersManyMessages() throws Exception {
+        // Single-character bodies
+        runTopicScenario(50, 1, 100, 1, 10);
+    }
+
+    public void testManyProducersOneConsumer() throws Exception {
+        // Single-character bodies
+        runTopicScenario(1, 20, 100, 1, 10);
+    }
+
+    public void testManyProducersManyConsumers() throws Exception {
+        // Single-character bodies
+        runTopicScenario(20, 20, 20, 1, 10);
+    }
+}
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java?rev=359769&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java
Thu Dec 29 02:07:55 2005
@@ -0,0 +1,88 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.broker.QueueSubscriptionTest;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+
+public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest {
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService broker = super.createBroker();
+
+ PolicyEntry policy = new PolicyEntry();
+ policy.setDispatchPolicy(new RoundRobinDispatchPolicy());
+
+ PolicyMap pMap = new PolicyMap();
+ pMap.setDefaultEntry(policy);
+
+ broker.setDestinationPolicy(pMap);
+
+ return broker;
+ }
+
+ public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws
Exception {
+ super.testOneProducerTwoConsumersSmallMessagesOnePrefetch();
+
+ // Ensure that each consumer should have received at least one message
+ // We cannot guarantee that messages will be equally divided, since
prefetch is one
+ assertEachConsumerReceivedAtLeastXMessages(1);
+ }
+
+ public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws
Exception {
+ super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
+ assertMessagesDividedAmongConsumers();
+ }
+
+ public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws
Exception {
+ super.testOneProducerTwoConsumersLargeMessagesOnePrefetch();
+
+ // Ensure that each consumer should have received at least one message
+ // We cannot guarantee that messages will be equally divided, since
prefetch is one
+ assertEachConsumerReceivedAtLeastXMessages(1);
+ }
+
+ public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws
Exception {
+ super.testOneProducerTwoConsumersLargeMessagesLargePrefetch();
+ assertMessagesDividedAmongConsumers();
+ }
+
+ public void testOneProducerManyConsumersFewMessages() throws Exception {
+ super.testOneProducerManyConsumersFewMessages();
+
+ // Since there are more consumers, each consumer should have received
at most one message only
+ assertMessagesDividedAmongConsumers();
+ }
+
+ public void testOneProducerManyConsumersManyMessages() throws Exception {
+ super.testOneProducerManyConsumersManyMessages();
+ assertMessagesDividedAmongConsumers();
+ }
+
+ public void testManyProducersManyConsumers() throws Exception {
+ super.testManyProducersManyConsumers();
+ assertMessagesDividedAmongConsumers();
+ }
+
+ public void assertMessagesDividedAmongConsumers() {
+ assertEachConsumerReceivedAtLeastXMessages((messageCount *
producerCount) / consumerCount);
+ assertEachConsumerReceivedAtMostXMessages(((messageCount *
producerCount) / consumerCount) + 1);
+ }
+}
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java?rev=359769&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
Thu Dec 29 02:07:55 2005
@@ -0,0 +1,76 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.broker.QueueSubscriptionTest;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Repeats the large-prefetch queue scenarios on a broker whose default destination policy
+ * uses {@link SimpleDispatchPolicy}, and checks that a single consumer received everything.
+ */
+public class SimpleDispatchPolicyTest extends QueueSubscriptionTest {
+
+    /** Creates the inherited broker and installs a simple dispatch policy as its default entry. */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService service = super.createBroker();
+
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setDispatchPolicy(new SimpleDispatchPolicy());
+
+        PolicyMap policies = new PolicyMap();
+        policies.setDefaultEntry(defaultEntry);
+        service.setDestinationPolicy(policies);
+
+        return service;
+    }
+
+    public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
+        super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
+
+        // Exactly one consumer is expected to hold every message; the others none
+        assertOneConsumerReceivedAllMessages(messageCount);
+    }
+
+    public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
+        super.testOneProducerTwoConsumersLargeMessagesLargePrefetch();
+
+        // Exactly one consumer is expected to hold every message; the others none
+        assertOneConsumerReceivedAllMessages(messageCount);
+    }
+
+    /**
+     * Asserts that exactly one consumer received messages, and that it received
+     * {@code messageCount} of them.
+     *
+     * @param messageCount number of messages the single receiving consumer must hold
+     */
+    public void assertOneConsumerReceivedAllMessages(int messageCount) throws Exception {
+        boolean receiverSeen = false;
+        for (Iterator lists = consumers.values().iterator(); lists.hasNext();) {
+            List received = (List) lists.next();
+            if (received.isEmpty()) {
+                continue;
+            }
+
+            // A second non-empty list means the messages were shared
+            if (receiverSeen) {
+                fail("No other consumers should have received any messages");
+            }
+
+            assertTrue("Consumer should have received all " + messageCount + " messages. Actual messages received is " + received.size(), received.size() == messageCount);
+            receiverSeen = true;
+        }
+
+        if (!receiverSeen) {
+            fail("At least one consumer should have received all messages");
+        }
+    }
+}
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java?rev=359769&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
Thu Dec 29 02:07:55 2005
@@ -0,0 +1,107 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TopicSubscriptionTest;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy;
+
+import java.util.List;
+import java.util.Iterator;
+
+/**
+ * Repeats the topic subscription scenarios on a broker whose default destination policy
+ * uses {@link StrictOrderDispatchPolicy}, and checks that every consumer received the
+ * same list of messages.
+ */
+public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
+
+    /** Creates the inherited broker and installs a strict-order dispatch policy as its default entry. */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService service = super.createBroker();
+
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setDispatchPolicy(new StrictOrderDispatchPolicy());
+
+        PolicyMap policies = new PolicyMap();
+        policies.setDefaultEntry(defaultEntry);
+        service.setDestinationPolicy(policies);
+
+        return service;
+    }
+
+    public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
+        super.testOneProducerTwoConsumersSmallMessagesOnePrefetch();
+        assertReceivedMessagesAreOrdered();
+    }
+
+    public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
+        super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
+        assertReceivedMessagesAreOrdered();
+    }
+
+    public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
+        super.testOneProducerTwoConsumersLargeMessagesOnePrefetch();
+        assertReceivedMessagesAreOrdered();
+    }
+
+    public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
+        super.testOneProducerTwoConsumersLargeMessagesLargePrefetch();
+        assertReceivedMessagesAreOrdered();
+    }
+
+    public void testOneProducerManyConsumersFewMessages() throws Exception {
+        super.testOneProducerManyConsumersFewMessages();
+        assertReceivedMessagesAreOrdered();
+    }
+
+    public void testOneProducerManyConsumersManyMessages() throws Exception {
+        super.testOneProducerManyConsumersManyMessages();
+        assertReceivedMessagesAreOrdered();
+    }
+
+    public void testManyProducersOneConsumer() throws Exception {
+        super.testManyProducersOneConsumer();
+        assertReceivedMessagesAreOrdered();
+    }
+
+    public void testManyProducersManyConsumers() throws Exception {
+        super.testManyProducersManyConsumers();
+        assertReceivedMessagesAreOrdered();
+    }
+
+    /**
+     * Asserts that all consumers received equal message lists, using the first consumer's
+     * list as the reference. Nothing is checked when there is at most one consumer.
+     */
+    public void assertReceivedMessagesAreOrdered() throws Exception {
+        // Only compare when there are at least two lists
+        if (consumers.size() > 1) {
+            Iterator lists = consumers.values().iterator();
+
+            // The first list is the reference order for all the others
+            List reference = (List) lists.next();
+
+            while (lists.hasNext()) {
+                List candidate = (List) lists.next();
+                assertTrue("Messages are not ordered.", reference.equals(candidate));
+            }
+        }
+    }
+}