Author: chirino
Date: Fri Dec 30 15:25:03 2005
New Revision: 360195
URL: http://svn.apache.org/viewcvs?rev=360195&view=rev
Log:
Depending on the test configuration parameters, it was possible to get an
OutOfMemory error. The causes were:
- The MessageList was holding on to all the messages being consumed, changed
this so that it only holds on to the messageIds
- Was using a non persistent broker, but was sending it persistent messages,
in the topic case, he holds on to the messages in a memory based message store.
By default we now send non persistent messages.
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageIdList.java
- copied, changed from r360183,
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageList.java
Removed:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageList.java
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerTestWithSimpleMessageListTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/peer/PeerTransportTest.java
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java?rev=360195&r1=360194&r2=360195&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
Fri Dec 30 15:25:03 2005
@@ -19,7 +19,7 @@
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.util.MessageList;
+import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerFactory;
@@ -53,11 +53,12 @@
protected boolean useConcurrentSend = true;
protected boolean durable = false;
protected boolean topic = false;
+ protected boolean persistent = false;
protected BrokerService broker;
protected Destination destination;
protected List connections = Collections.synchronizedList(new ArrayList());
- protected MessageList allMessagesList = new MessageList();
+ protected MessageIdList allMessagesList = new MessageIdList();
protected void startProducers(Destination dest, int msgCount) throws
Exception {
startProducers(createConnectionFactory(), dest, msgCount);
@@ -108,7 +109,8 @@
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
-
+ producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT :
DeliveryMode.NON_PERSISTENT);
+
for (int i = 0; i < count; i++) {
TextMessage msg = createTextMessage(session, "" + i);
producer.send(msg);
@@ -149,7 +151,7 @@
} else {
consumer = createMessageConsumer(factory.createConnection(),
dest);
}
- MessageList list = new MessageList();
+ MessageIdList list = new MessageIdList();
list.setParent(allMessagesList);
consumer.setMessageListener(list);
consumers.put(consumer, list);
@@ -222,18 +224,18 @@
* Some helpful assertions for multiple consumers.
*/
protected void assertConsumerReceivedAtLeastXMessages(MessageConsumer
consumer, int msgCount) {
- MessageList messageList = (MessageList)consumers.get(consumer);
- messageList.assertAtLeastMessagesReceived(msgCount);
+ MessageIdList messageIdList = (MessageIdList)consumers.get(consumer);
+ messageIdList.assertAtLeastMessagesReceived(msgCount);
}
protected void assertConsumerReceivedAtMostXMessages(MessageConsumer
consumer, int msgCount) {
- MessageList messageList = (MessageList)consumers.get(consumer);
- messageList.assertAtMostMessagesReceived(msgCount);
+ MessageIdList messageIdList = (MessageIdList)consumers.get(consumer);
+ messageIdList.assertAtMostMessagesReceived(msgCount);
}
protected void assertConsumerReceivedXMessages(MessageConsumer consumer,
int msgCount) {
- MessageList messageList = (MessageList)consumers.get(consumer);
- messageList.assertMessagesReceivedNoWait(msgCount);
+ MessageIdList messageIdList = (MessageIdList)consumers.get(consumer);
+ messageIdList.assertMessagesReceivedNoWait(msgCount);
}
protected void assertEachConsumerReceivedAtLeastXMessages(int msgCount) {
@@ -260,8 +262,8 @@
// now lets count the individual messages received
int totalMsg = 0;
for (Iterator i=consumers.keySet().iterator(); i.hasNext();) {
- MessageList messageList = (MessageList)consumers.get(i.next());
- totalMsg += messageList.getMessageCount();
+ MessageIdList messageIdList =
(MessageIdList)consumers.get(i.next());
+ totalMsg += messageIdList.getMessageCount();
}
assertEquals("Total of consumers message count", msgCount, totalMsg);
}
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java?rev=360195&r1=360194&r2=360195&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
Fri Dec 30 15:25:03 2005
@@ -24,36 +24,36 @@
topic = true;
}
- public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws
Exception {
+ public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws
Exception {
consumerCount = 2;
producerCount = 1;
+ messageCount = 100;
+ messageSize = 1024 * 1024 * 1; // 1 MB
prefetchCount = 1;
- messageSize = 1024;
- messageCount = 1000;
doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * consumerCount *
producerCount);
}
- public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws
Exception {
+ public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws
Exception {
consumerCount = 2;
producerCount = 1;
- messageCount = 1000;
+ prefetchCount = 1;
messageSize = 1024;
- prefetchCount = messageCount * 2;
+ messageCount = 1000;
doMultipleClientsTest();
assertTotalMessagesReceived(messageCount * consumerCount *
producerCount);
}
- public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws
Exception {
+ public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws
Exception {
consumerCount = 2;
producerCount = 1;
- messageCount = 10;
- messageSize = 1024 * 1024 * 1; // 1 MB
- prefetchCount = 1;
+ messageCount = 1000;
+ messageSize = 1024;
+ prefetchCount = messageCount * 2;
doMultipleClientsTest();
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java?rev=360195&r1=360194&r2=360195&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
Fri Dec 30 15:25:03 2005
@@ -21,7 +21,7 @@
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
-import org.apache.activemq.util.MessageList;
+import org.apache.activemq.util.MessageIdList;
import java.util.Iterator;
import java.util.List;
@@ -59,8 +59,8 @@
public void assertOneConsumerReceivedAllMessages(int messageCount) throws
Exception {
boolean found = false;
for (Iterator i=consumers.keySet().iterator(); i.hasNext();) {
- MessageList messageList = (MessageList)consumers.get(i.next());
- int count = messageList.getMessageCount();
+ MessageIdList messageIdList =
(MessageIdList)consumers.get(i.next());
+ int count = messageIdList.getMessageCount();
if (count > 0) {
if (found) {
fail("No other consumers should have received any
messages");
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java?rev=360195&r1=360194&r2=360195&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
Fri Dec 30 15:25:03 2005
@@ -21,7 +21,7 @@
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy;
-import org.apache.activemq.util.MessageList;
+import org.apache.activemq.util.MessageIdList;
import java.util.List;
import java.util.Iterator;
@@ -42,20 +42,20 @@
return broker;
}
- public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws
Exception {
- super.testOneProducerTwoConsumersSmallMessagesOnePrefetch();
+ public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws
Exception {
+ super.testOneProducerTwoConsumersLargeMessagesOnePrefetch();
assertReceivedMessagesAreOrdered();
}
- public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws
Exception {
- super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
+ public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws
Exception {
+ super.testOneProducerTwoConsumersSmallMessagesOnePrefetch();
assertReceivedMessagesAreOrdered();
}
- public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws
Exception {
- super.testOneProducerTwoConsumersLargeMessagesOnePrefetch();
+ public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws
Exception {
+ super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
assertReceivedMessagesAreOrdered();
}
@@ -98,11 +98,11 @@
// Get basis of order
Iterator i = consumers.keySet().iterator();
- MessageList messageOrder = (MessageList)consumers.get(i.next());
+ MessageIdList messageOrder = (MessageIdList)consumers.get(i.next());
for (;i.hasNext();) {
- MessageList messageList = (MessageList)consumers.get(i.next());
- assertTrue("Messages are not ordered.",
messageOrder.equals(messageList));
+ MessageIdList messageIdList =
(MessageIdList)consumers.get(i.next());
+ assertTrue("Messages are not ordered.",
messageOrder.equals(messageIdList));
}
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerTestWithSimpleMessageListTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerTestWithSimpleMessageListTest.java?rev=360195&r1=360194&r2=360195&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerTestWithSimpleMessageListTest.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerTestWithSimpleMessageListTest.java
Fri Dec 30 15:25:03 2005
@@ -19,7 +19,7 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.util.MessageList;
+import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.springframework.core.io.ClassPathResource;
@@ -60,7 +60,7 @@
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
- MessageList listener = new MessageList();
+ MessageIdList listener = new MessageIdList();
consumer.setMessageListener(listener);
listener.waitForMessagesToArrive(messageCount);
listener.assertMessagesReceived(messageCount);
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java?rev=360195&r1=360194&r2=360195&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
Fri Dec 30 15:25:03 2005
@@ -19,7 +19,7 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.util.MessageList;
+import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.springframework.core.io.ClassPathResource;
@@ -49,7 +49,7 @@
connection.start();
MessageConsumer consumer = session.createConsumer(destination);
- MessageList listener = new MessageList();
+ MessageIdList listener = new MessageIdList();
listener.setVerbose(true);
consumer.setMessageListener(listener);
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/peer/PeerTransportTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/peer/PeerTransportTest.java?rev=360195&r1=360194&r2=360195&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/peer/PeerTransportTest.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/peer/PeerTransportTest.java
Fri Dec 30 15:25:03 2005
@@ -20,7 +20,7 @@
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.util.MessageList;
+import org.apache.activemq.util.MessageIdList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -47,13 +47,13 @@
protected int deliveryMode = DeliveryMode.NON_PERSISTENT;
protected MessageProducer[] producers;
protected Connection[] connections;
- protected MessageList messageList[];
+ protected MessageIdList messageIdList[];
protected void setUp() throws Exception {
connections = new Connection[NUMBER_IN_CLUSTER];
producers = new MessageProducer[NUMBER_IN_CLUSTER];
- messageList = new MessageList[NUMBER_IN_CLUSTER];
+ messageIdList = new MessageIdList[NUMBER_IN_CLUSTER];
Destination destination = createDestination();
String root = System.getProperty("activemq.store.dir");
@@ -67,8 +67,8 @@
producers[i] = session.createProducer(destination);
producers[i].setDeliveryMode(deliveryMode);
MessageConsumer consumer = createMessageConsumer(session,
destination);
- messageList[i] = new MessageList();
- consumer.setMessageListener(messageList[i]);
+ messageIdList[i] = new MessageIdList();
+ consumer.setMessageListener(messageIdList[i]);
}
System.out.println("Sleeping to ensure cluster is fully connected");
Thread.sleep(10000);
@@ -120,7 +120,7 @@
}
for (int i = 0;i < NUMBER_IN_CLUSTER;i++) {
- messageList[i].assertMessagesReceived(expectedReceiveCount());
+ messageIdList[i].assertMessagesReceived(expectedReceiveCount());
}
}
Copied:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageIdList.java
(from r360183,
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageList.java)
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageIdList.java?p2=incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageIdList.java&p1=incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageList.java&r1=360183&r2=360195&rev=360195&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageList.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/MessageIdList.java
Fri Dec 30 15:25:03 2005
@@ -16,14 +16,13 @@
*/
package org.apache.activemq.util;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.TextMessage;
-
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
import junit.framework.Assert;
/**
@@ -38,38 +37,38 @@
*
* @version $Revision: 1.6 $
*/
-public class MessageList extends Assert implements MessageListener {
- private List messages = new ArrayList();
+public class MessageIdList extends Assert implements MessageListener {
+ private List messageIds = new ArrayList();
private Object semaphore;
private boolean verbose;
private MessageListener parent;
private long maximumDuration = 15000L;
- public MessageList() {
+ public MessageIdList() {
this(new Object());
}
- public MessageList(Object semaphore) {
+ public MessageIdList(Object semaphore) {
this.semaphore = semaphore;
}
public boolean equals(Object that) {
- if (that instanceof MessageList) {
- MessageList thatList = (MessageList) that;
- return getMessages().equals(thatList.getMessages());
+ if (that instanceof MessageIdList) {
+ MessageIdList thatList = (MessageIdList) that;
+ return getMessageIds().equals(thatList.getMessageIds());
}
return false;
}
public int hashCode() {
synchronized (semaphore) {
- return messages.hashCode() + 1;
+ return messageIds.hashCode() + 1;
}
}
public String toString() {
synchronized (semaphore) {
- return messages.toString();
+ return messageIds.toString();
}
}
@@ -78,31 +77,15 @@
*/
public List flushMessages() {
synchronized (semaphore) {
- List answer = new ArrayList(messages);
- messages.clear();
+ List answer = new ArrayList(messageIds);
+ messageIds.clear();
return answer;
}
}
- public synchronized List getMessages() {
+ public synchronized List getMessageIds() {
synchronized (semaphore) {
- return new ArrayList(messages);
- }
- }
-
- public synchronized List getTextMessages() {
- synchronized (semaphore) {
- ArrayList l = new ArrayList();
- for (Iterator iter = messages.iterator(); iter.hasNext();) {
- try {
- TextMessage m = (TextMessage) iter.next();
- l.add(m.getText());
- }
- catch (Throwable e) {
- l.add("" + e);
- }
- }
- return l;
+ return new ArrayList(messageIds);
}
}
@@ -110,18 +93,24 @@
if (parent != null) {
parent.onMessage(message);
}
- synchronized (semaphore) {
- messages.add(message);
- semaphore.notifyAll();
- }
- if (verbose) {
- System.out.println("###Êreceived message: " + message);
+ String id=null;
+ try {
+ id = message.getJMSMessageID();
+ synchronized (semaphore) {
+ messageIds.add(id);
+ semaphore.notifyAll();
+ }
+ if (verbose) {
+ System.out.println("###Êreceived message: " + message);
+ }
+ } catch (JMSException e) {
+ e.printStackTrace();
}
}
public int getMessageCount() {
synchronized (semaphore) {
- return messages.size();
+ return messageIds.size();
}
}