Author: rajdavies
Date: Tue Apr 8 05:43:36 2008
New Revision: 645881
URL: http://svn.apache.org/viewvc?rev=645881&view=rev
Log:
Updated to support multiple destinations
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/QueueConnectionMemoryTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueNetworkTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java?rev=645881&r1=645880&r2=645881&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java
Tue Apr 8 05:43:36 2008
@@ -38,7 +38,7 @@
protected Connection connection;
protected MessageConsumer consumer;
protected long sleepDuration;
- protected boolean enableAudit = true;
+ protected boolean enableAudit = false;
protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit(16 *
1024,20);
protected PerfRate rate = new PerfRate();
@@ -82,7 +82,7 @@
if (enableAudit && !this.audit.isInOrder(msg.getJMSMessageID())) {
LOG.error("Message out of order!!" + msg);
}
- if (this.audit.isDuplicate(msg)){
+ if (enableAudit && this.audit.isDuplicate(msg)){
LOG.error("Duplicate Message!" + msg);
}
} catch (JMSException e1) {
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java?rev=645881&r1=645880&r2=645881&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfProducer.java
Tue Apr 8 05:43:36 2008
@@ -38,6 +38,7 @@
private Session session;
private final CountDownLatch stopped = new CountDownLatch(1);
private boolean running;
+ private int sleep = 0;
public PerfProducer(ConnectionFactory fac, Destination dest, byte[]
palyload) throws JMSException {
connection = fac.createConnection();
@@ -93,12 +94,23 @@
msg.writeBytes(payload);
producer.send(msg);
rate.increment();
+ if (sleep > 0) {
+ Thread.sleep(sleep);
+ }
}
} catch (Throwable e) {
e.printStackTrace();
} finally {
stopped.countDown();
}
+ }
+
+ public int getSleep() {
+ return sleep;
+ }
+
+ public void setSleep(int sleep) {
+ this.sleep = sleep;
}
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/QueueConnectionMemoryTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/QueueConnectionMemoryTest.java?rev=645881&r1=645880&r2=645881&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/QueueConnectionMemoryTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/QueueConnectionMemoryTest.java
Tue Apr 8 05:43:36 2008
@@ -61,7 +61,7 @@
factory = createConnectionFactory(bindAddress);
Connection con = factory.createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = createDestination(session, destinationName);
+ Destination destination = createDestination(session, destinationName);
con.close();
for (int i = 0; i < 3; i++) {
Connection connection = factory.createConnection();
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java?rev=645881&r1=645880&r2=645881&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java
Tue Apr 8 05:43:36 2008
@@ -28,8 +28,8 @@
public class SimpleDurableTopicNetworkTest extends SimpleNetworkTest {
protected void setUp() throws Exception {
- numberofProducers=6;
- numberOfConsumers=6;
+ numberofProducers=60;
+ numberOfConsumers=60;
samepleCount=1000;
playloadSize = 1024;
super.setUp();
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java?rev=645881&r1=645880&r2=645881&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
Tue Apr 8 05:43:36 2008
@@ -25,6 +25,15 @@
* @version $Revision: 1.3 $
*/
public class SimpleDurableTopicTest extends SimpleTopicTest {
+
+ protected void setUp() throws Exception {
+ numberOfDestinations=6;
+ numberOfConsumers = 1;
+ numberofProducers = 1;
+ samepleCount=1000;
+ playloadSize = 1024;
+ super.setUp();
+ }
protected PerfProducer createProducer(ConnectionFactory fac, Destination
dest, int number, byte payload[]) throws JMSException {
PerfProducer pp = new PerfProducer(fac, dest, payload);
pp.setDeliveryMode(DeliveryMode.PERSISTENT);
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java?rev=645881&r1=645880&r2=645881&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java
Tue Apr 8 05:43:36 2008
@@ -17,9 +17,11 @@
package org.apache.activemq.perf;
import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.network.NetworkConnector;
import org.apache.commons.logging.Log;
@@ -29,45 +31,58 @@
public class SimpleNetworkTest extends SimpleTopicTest {
private static final Log LOG = LogFactory.getLog(SimpleNetworkTest.class);
- protected String consumerBindAddress = "tcp://localhost:61616";
+ //protected String consumerBindAddress =
"tcp://rexmac.home:61616?wireFormat.maxInactivityDuration=1000,tcp://localhost:61617?wireFormat.maxInactivityDuration=1000";
+ protected String consumerBindAddress =
"tcp://rexmac.home:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=2000&socket.tcpNoDelayEnabled=false";
protected String producerBindAddress = "tcp://localhost:61617";
protected static final String CONSUMER_BROKER_NAME = "Consumer";
protected static final String PRODUCER_BROKER_NAME = "Producer";
protected BrokerService consumerBroker;
protected BrokerService producerBroker;
- protected ConnectionFactory consumerFactory;
- protected ConnectionFactory producerFactory;
+ protected ActiveMQConnectionFactory consumerFactory;
+ protected ActiveMQConnectionFactory producerFactory;
+
protected void setUp() throws Exception {
if (consumerBroker == null) {
- consumerBroker = createConsumerBroker(consumerBindAddress);
+ // consumerBroker = createConsumerBroker(consumerBindAddress);
}
if (producerBroker == null) {
producerBroker = createProducerBroker(producerBindAddress);
}
- consumerFactory =
createConnectionFactory("vm://"+CONSUMER_BROKER_NAME);
- producerFactory = createConnectionFactory("vm://"+
PRODUCER_BROKER_NAME);
- //consumerFactory = createConnectionFactory(consumerBindAddress);
- //producerFactory = createConnectionFactory(producerBindAddress);
+ //consumerFactory =
createConnectionFactory("vm://"+CONSUMER_BROKER_NAME);
+ //producerFactory = createConnectionFactory("vm://"+
PRODUCER_BROKER_NAME);
+ consumerFactory =
createConnectionFactory("failover://("+consumerBindAddress + "," +
producerBindAddress +")?randomize=false&backup=false");
+ //consumerFactory =
createConnectionFactory("failover://("+consumerBindAddress+")?backup=true");
+ consumerFactory.setDispatchAsync(true);
+ ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
+ policy.setQueuePrefetch(100);
+ consumerFactory.setPrefetchPolicy(policy);
+ producerFactory = createConnectionFactory(producerBindAddress);
Connection con = consumerFactory.createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = createDestination(session, destinationName);
- LOG.info("Testing against destination: " + destination);
- LOG.info("Running " + numberofProducers + " producer(s) and " +
numberOfConsumers + " consumer(s)");
- con.close();
- producers = new PerfProducer[numberofProducers];
- consumers = new PerfConsumer[numberOfConsumers];
- for (int i = 0; i < numberOfConsumers; i++) {
- consumers[i] = createConsumer(consumerFactory, destination, i);
- consumers[i].setSleepDuration(consumerSleepDuration);
- }
- for (int i = 0; i < numberofProducers; i++) {
- array = new byte[playloadSize];
- for (int j = i; j < array.length; j++) {
- array[j] = (byte)j;
+
+ producers = new PerfProducer[numberofProducers*numberOfDestinations];
+ consumers = new PerfConsumer[numberOfConsumers*numberOfDestinations];
+ int consumerCount = 0;
+ int producerCount = 0;
+ for (int k =0; k < numberOfDestinations;k++) {
+ Destination destination = createDestination(session,
destinationName+":"+k);
+ LOG.info("Testing against destination: " + destination);
+ for (int i = 0; i < numberOfConsumers; i++) {
+ consumers[consumerCount] = createConsumer(factory,
destination, consumerCount);
+
consumers[consumerCount].setSleepDuration(consumerSleepDuration);
+ consumerCount++;
+ }
+ for (int i = 0; i < numberofProducers; i++) {
+ array = new byte[playloadSize];
+ for (int j = i; j < array.length; j++) {
+ array[j] = (byte)j;
+ }
+ producers[producerCount] = createProducer(factory,
destination, i, array);
+ producerCount++;
}
- producers[i] = createProducer(producerFactory, destination, i,
array);
}
+ con.close();
}
protected void tearDown() throws Exception {
@@ -96,6 +111,7 @@
}
protected void configureConsumerBroker(BrokerService answer,String uri)
throws Exception {
+ configureBroker(answer);
answer.setPersistent(false);
answer.setBrokerName(CONSUMER_BROKER_NAME);
answer.setDeleteAllMessagesOnStartup(true);
@@ -111,13 +127,22 @@
}
protected void configureProducerBroker(BrokerService answer,String uri)
throws Exception {
+ configureBroker(answer);
answer.setBrokerName(PRODUCER_BROKER_NAME);
+ answer.setMonitorConnectionSplits(false);
+ //answer.setSplitSystemUsageForProducersConsumers(true);
answer.setPersistent(false);
answer.setDeleteAllMessagesOnStartup(true);
- NetworkConnector connector =
answer.addNetworkConnector("static://"+consumerBindAddress);
- connector.setDuplex(true);
+ NetworkConnector connector =
answer.addNetworkConnector("static://tcp://rexmac.home:61616?wireFormat.maxInactivityDuration=2000");
+ //connector.setNetworkTTL(3);
+ //connector.setDynamicOnly(true);
+ //connector.setDuplex(true);
answer.addConnector(uri);
answer.setUseShutdownHook(false);
+ }
+
+ protected void configureBroker(BrokerService service) throws Exception{
+
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueNetworkTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueNetworkTest.java?rev=645881&r1=645880&r2=645881&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueNetworkTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueNetworkTest.java
Tue Apr 8 05:43:36 2008
@@ -17,23 +17,30 @@
package org.apache.activemq.perf;
+import java.util.ArrayList;
+import java.util.List;
+
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+
public class SimpleNonPersistentQueueNetworkTest extends SimpleNetworkTest {
- protected void setUp() throws Exception {
- numberOfConsumers = 10;
- numberofProducers = 10;
+ protected void setUp()throws Exception {
+ numberOfDestinations =20;
super.setUp();
}
protected PerfProducer createProducer(ConnectionFactory fac,
Destination dest, int number, byte[] payload) throws JMSException {
PerfProducer pp = new PerfProducer(fac, dest, payload);
- pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- pp.setTimeToLive(1000);
+ pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ // pp.setTimeToLive(1000);
+ //pp.setSleep(1);
return pp;
}
@@ -41,11 +48,36 @@
PerfConsumer consumer = new PerfConsumer(fac, dest);
boolean enableAudit = numberOfConsumers <= 1;
System.out.println("Enable Audit = " + enableAudit);
- consumer.setEnableAudit(enableAudit);
+ consumer.setEnableAudit(false);
return consumer;
}
+ public void testPerformance() throws JMSException,
InterruptedException {
+ //Thread.sleep(5000);
+ super.testPerformance();
+ }
+
protected Destination createDestination(Session s, String
destinationName) throws JMSException {
return s.createQueue(destinationName);
+ }
+
+ protected void configureBroker(BrokerService answer) throws Exception {
+ answer.setPersistent(false);
+ answer.setMonitorConnectionSplits(true);
+ final List<PolicyEntry> policyEntries = new
ArrayList<PolicyEntry>();
+ final PolicyEntry entry = new PolicyEntry();
+ entry.setQueue(">");
+ entry.setMemoryLimit(1024 * 1024 * 100); // Set to 1 MB
+ entry.setOptimizedDispatch(true);
+ entry.setProducerFlowControl(true);
+ entry.setMaxPageSize(10);
+ entry.setLazyDispatch(false);
+ policyEntries.add(entry);
+
+
+ final PolicyMap policyMap = new PolicyMap();
+ policyMap.setPolicyEntries(policyEntries);
+ answer.setDestinationPolicy(policyMap);
+ super.configureBroker(answer);
}
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java?rev=645881&r1=645880&r2=645881&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java
Tue Apr 8 05:43:36 2008
@@ -16,20 +16,53 @@
*/
package org.apache.activemq.perf;
+import java.util.ArrayList;
+import java.util.List;
+
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import
org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import
org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
+
/**
* @version $Revision: 1.3 $
*/
public class SimpleNonPersistentQueueTest extends SimpleQueueTest {
+ protected void setUp() throws Exception {
+ numberOfConsumers = 10;
+ numberofProducers = 10;
+ //this.consumerSleepDuration=100;
+ super.setUp();
+ }
protected PerfProducer createProducer(ConnectionFactory fac, Destination
dest, int number, byte[] payload) throws JMSException {
PerfProducer pp = new PerfProducer(fac, dest, payload);
pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- pp.setTimeToLive(100);
+ //pp.setTimeToLive(100);
return pp;
+ }
+
+ protected void configureBroker(BrokerService answer,String uri) throws
Exception {
+ answer.setPersistent(false);
+ final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
+ final PolicyEntry entry = new PolicyEntry();
+ entry.setQueue(">");
+ entry.setMemoryLimit(1024 * 1024 * 1); // Set to 1 MB
+ entry.setOptimizedDispatch(true);
+ entry.setLazyDispatch(true);
+ policyEntries.add(entry);
+
+
+ final PolicyMap policyMap = new PolicyMap();
+ policyMap.setPolicyEntries(policyEntries);
+ answer.setDestinationPolicy(policyMap);
+ super.configureBroker(answer, uri);
}
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java?rev=645881&r1=645880&r2=645881&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
Tue Apr 8 05:43:36 2008
@@ -31,9 +31,7 @@
}
protected void setUp() throws Exception {
- numberOfConsumers = 1;
- numberofProducers = 2;
- this.consumerSleepDuration=0;
+
super.setUp();
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java?rev=645881&r1=645880&r2=645881&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
Tue Apr 8 05:43:36 2008
@@ -35,8 +35,8 @@
private static final Log LOG = LogFactory.getLog(SimpleTopicTest.class);
protected BrokerService broker;
- // protected String
- //
bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false";
+ protected String
clientURI="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false&wireFormat.maxInactivityDuration=50000";
+ //protected String clientURI="tcp://localhost:61616";
protected String bindAddress="tcp://localhost:61616";
//protected String bindAddress = "tcp://localhost:61616";
//protected String bindAddress="vm://localhost?marshal=true";
@@ -46,12 +46,15 @@
protected String destinationName = getClass().getName();
protected int samepleCount = 20;
protected long sampleInternal = 10000;
- protected int numberOfConsumers = 1;
- protected int numberofProducers = 0;
+ protected int numberOfDestinations=1;
+ protected int numberOfConsumers = 10;
+ protected int numberofProducers = 10;
+ protected int totalNumberOfProducers;
+ protected int totalNumberOfConsumers;
protected int playloadSize = 1024;
protected byte[] array;
protected ConnectionFactory factory;
- protected Destination destination;
+
protected long consumerSleepDuration=0;
/**
@@ -63,26 +66,37 @@
if (broker == null) {
broker = createBroker(bindAddress);
}
- factory = createConnectionFactory(bindAddress);
+ factory = createConnectionFactory(clientURI);
Connection con = factory.createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- destination = createDestination(session, destinationName);
- LOG.info("Testing against destination: " + destination);
- LOG.info("Running " + numberofProducers + " producer(s) and " +
numberOfConsumers + " consumer(s)");
- con.close();
- producers = new PerfProducer[numberofProducers];
- consumers = new PerfConsumer[numberOfConsumers];
- for (int i = 0; i < numberOfConsumers; i++) {
- consumers[i] = createConsumer(factory, destination, i);
- consumers[i].setSleepDuration(consumerSleepDuration);
- }
- for (int i = 0; i < numberofProducers; i++) {
- array = new byte[playloadSize];
- for (int j = i; j < array.length; j++) {
- array[j] = (byte)j;
+
+
+ LOG.info("Running " + numberofProducers + " producer(s) and " +
numberOfConsumers + " consumer(s) per " + numberOfDestinations + "
Destination(s)");
+
+ totalNumberOfConsumers=numberOfConsumers*numberOfDestinations;
+ totalNumberOfProducers=numberofProducers*numberOfDestinations;
+ producers = new PerfProducer[totalNumberOfProducers];
+ consumers = new PerfConsumer[totalNumberOfConsumers];
+ int consumerCount = 0;
+ int producerCount = 0;
+ for (int k =0; k < numberOfDestinations;k++) {
+ Destination destination = createDestination(session,
destinationName+":"+k);
+ LOG.info("Testing against destination: " + destination);
+ for (int i = 0; i < numberOfConsumers; i++) {
+ consumers[consumerCount] = createConsumer(factory,
destination, consumerCount);
+
consumers[consumerCount].setSleepDuration(consumerSleepDuration);
+ consumerCount++;
+ }
+ for (int i = 0; i < numberofProducers; i++) {
+ array = new byte[playloadSize];
+ for (int j = i; j < array.length; j++) {
+ array[j] = (byte)j;
+ }
+ producers[producerCount] = createProducer(factory,
destination, i, array);
+ producerCount++;
}
- producers[i] = createProducer(factory, destination, i, array);
}
+ con.close();
super.setUp();
}
@@ -136,10 +150,10 @@
}
public void testPerformance() throws JMSException, InterruptedException {
- for (int i = 0; i < numberOfConsumers; i++) {
+ for (int i = 0; i < totalNumberOfConsumers; i++) {
consumers[i].start();
}
- for (int i = 0; i < numberofProducers; i++) {
+ for (int i = 0; i < totalNumberOfProducers; i++) {
producers[i].start();
}
LOG.info("Sampling performance " + samepleCount + " times at a " +
sampleInternal + " ms interval.");
@@ -148,10 +162,10 @@
dumpProducerRate();
dumpConsumerRate();
}
- for (int i = 0; i < numberofProducers; i++) {
+ for (int i = 0; i < totalNumberOfProducers; i++) {
producers[i].stop();
}
- for (int i = 0; i < numberOfConsumers; i++) {
+ for (int i = 0; i < totalNumberOfConsumers; i++) {
consumers[i].stop();
}
}
@@ -159,30 +173,36 @@
protected void dumpProducerRate() {
int totalRate = 0;
int totalCount = 0;
+ String producerString="Producers:";
for (int i = 0; i < producers.length; i++) {
PerfRate rate = producers[i].getRate().cloneAndReset();
totalRate += rate.getRate();
totalCount += rate.getTotalCount();
+ producerString+="["+i+":"+rate.getRate() +
","+rate.getTotalCount()+"];";
}
if (producers != null && producers.length > 0) {
int avgRate = totalRate / producers.length;
System.out.println("Avg producer rate = " + avgRate
+ " msg/sec | Total rate = " + totalRate + ", sent = "
+ totalCount);
+ // System.out.println(producerString);
}
}
protected void dumpConsumerRate() {
int totalRate = 0;
int totalCount = 0;
+ String consumerString="Consumers:";
for (int i = 0; i < consumers.length; i++) {
PerfRate rate = consumers[i].getRate().cloneAndReset();
totalRate += rate.getRate();
totalCount += rate.getTotalCount();
+ consumerString+="["+i+":"+rate.getRate() +
","+rate.getTotalCount()+"];";
}
if (consumers != null && consumers.length > 0) {
int avgRate = totalRate / consumers.length;
System.out.println("Avg consumer rate = " + avgRate + " msg/sec |
Total rate = " + totalRate + ", received = " + totalCount);
+ System.out.println(consumerString);
}
}
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java?rev=645881&r1=645880&r2=645881&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java
Tue Apr 8 05:43:36 2008
@@ -33,20 +33,15 @@
public class SlowConsumerTopicTest extends SimpleTopicTest {
protected PerfConsumer[] slowConsumers;
- protected int numberOfSlowConsumers = 1;
-
+
protected void setUp() throws Exception {
- numberOfConsumers = 0;
+
playloadSize = 10 * 1024;
super.setUp();
- slowConsumers = new SlowConsumer[numberOfSlowConsumers];
- for (int i = 0; i < numberOfSlowConsumers; i++) {
- slowConsumers[i] = createSlowConsumer(factory, destination, i);
- slowConsumers[i].start();
- }
}
+
- protected PerfConsumer createSlowConsumer(ConnectionFactory fac,
Destination dest, int number) throws JMSException {
+ protected PerfConsumer createConsumer(ConnectionFactory fac, Destination
dest, int number) throws JMSException {
return new SlowConsumer(fac, dest);
}