Author: chirino
Date: Fri Feb 26 17:17:32 2010
New Revision: 916765
URL: http://svn.apache.org/viewvc?rev=916765&view=rev
Log:
Better load test client: clean up even on failure and provide a throughput
report during the test.
Modified:
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/activemq/systest/StompLoadTest.java
Modified:
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/activemq/systest/StompLoadTest.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/activemq/systest/StompLoadTest.java?rev=916765&r1=916764&r2=916765&view=diff
==============================================================================
---
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/activemq/systest/StompLoadTest.java
(original)
+++
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/activemq/systest/StompLoadTest.java
Fri Feb 26 17:17:32 2010
@@ -3,6 +3,7 @@
import java.net.Socket;
import java.net.URI;
import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicLong;
import junit.framework.TestCase;
@@ -11,6 +12,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import static java.lang.String.*;
+import static java.util.concurrent.TimeUnit.*;
+
/**
*
* Simulates load on the Stomp connector. All producers/consumers open/close a
@@ -27,11 +31,15 @@
final int producerSleep = 10;
final int consumerSleep = 10;
final int msgCount = 10000;
- final int producerCount = 5;
- final int consumerCount = 5;
+ final int producerCount = 10;
+ final int consumerCount = 10;
final int testTime = 30 * 60 * 1000;
- final String bindAddress = "stomp://0.0.0.0:61612";
+ final int sampleInterval = 5 * 1000;
+ final String bindAddress = "stomp://0.0.0.0:61613";
+ AtomicLong producerCounter = new AtomicLong();
+ AtomicLong consumerCounter = new AtomicLong();
+
public void testLoad() throws Exception {
for (int i = 0; i < producerCount; i++) {
@@ -44,15 +52,29 @@
consumerThread.start();
}
- Thread.sleep(testTime);
+ int samples = testTime/sampleInterval;
+ long start = System.nanoTime();
+ for( int i=0; i < samples; i++ ) {
+ Thread.sleep(sampleInterval);
+ long end = System.nanoTime();
+ printRate("Producer", producerCounter, end-start);
+ printRate("Consumer", consumerCounter, end-start);
+ start = end;
+ }
+ }
+
+ static final long NANOS_PER_SECOND = NANOSECONDS.convert(1, SECONDS);
+
+ private void printRate(String name, AtomicLong counter, long nanos) {
+ long c = counter.getAndSet(0);
+ float rate_per_second = ((1.0f*c/nanos)*NANOS_PER_SECOND);
+ LOG.info(format("%s rate: %,.3f per second", name, rate_per_second));
}
- public StompConnection createConnection() throws Exception {
- StompConnection conn = new StompConnection();
+ public void connect(StompConnection conn) throws Exception {
URI connectUri = new URI(bindAddress);
conn.open(new Socket(connectUri.getHost(), connectUri.getPort()));
conn.connect("", "");
- return conn;
}
class ProducerThread extends Thread {
@@ -65,15 +87,20 @@
public void run() {
for (int i = 0; i < msgCount; i++) {
+ StompConnection conn = new StompConnection();
try {
- StompConnection conn = createConnection();
- String msg = "test message " + i;
- LOG.info(name + " sending " + msg);
+ connect(conn);
+ String msg = "Message #" + i+" from "+name;
conn.send("/queue/test", msg);
- conn.disconnect();
+ producerCounter.incrementAndGet();
Thread.sleep(producerSleep);
} catch (Exception e) {
e.printStackTrace();
+ } finally {
+ try {
+ conn.disconnect();
+ } catch (Exception ignore) {
+ }
}
}
}
@@ -89,18 +116,27 @@
public void run() {
for (int i = 0; i < msgCount; i++) {
+ StompConnection conn = new StompConnection();
try {
- StompConnection conn = createConnection();
+ connect(conn);
HashMap<String, String> headers = new HashMap<String,
String>();
headers.put("activemq.prefetchSize", "1");
conn.subscribe("/queue/test", "client", headers);
- StompFrame frame = conn.receive(1000);
+ StompFrame frame = conn.receive(1*1000);
conn.ack(frame);
- LOG.info(name + " received " + frame.getBody());
- conn.disconnect();
+ consumerCounter.incrementAndGet();
Thread.sleep(consumerSleep);
} catch (Exception e) {
e.printStackTrace();
+ } finally {
+ try {
+ conn.disconnect();
+ } catch (Exception ignore) {
+ }
+ try {
+ conn.close();
+ } catch (Exception ignore) {
+ }
}
}
}