Author: foconer
Date: Fri Jun 2 01:24:47 2006
New Revision: 411085
URL: http://svn.apache.org/viewvc?rev=411085&view=rev
Log:
Added support for synchronous and asynchronous message consumption.
Modified:
incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
Modified:
incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java?rev=411085&r1=411084&r2=411085&view=diff
==============================================================================
--- incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java (original)
+++ incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java Fri Jun 2 01:24:47 2006
@@ -29,6 +29,7 @@
private Destination destination = null;
private boolean isDurable = false;
+ private boolean isAsync = true;
public JmsConsumerClient(ConnectionFactory factory) {
this.factory = factory;
@@ -66,48 +67,53 @@
setDestination(getDestinationName());
}
- System.out.println("Connecting to URL: " + brokerUrl);
- System.out.println("Consuming: " + destination);
- System.out.println("Using " + (isDurable ? "durable" : "non-durable") + " subscription");
-
-
if (isDurable) {
createDurableSubscriber((Topic) getDestination(), getClass().getName());
} else {
createMessageConsumer(getDestination());
}
- getMessageConsumer().setMessageListener(this);
- getConnection().start();
-
- try {
- Thread.sleep(duration);
- } catch (InterruptedException e) {
- throw new JMSException("Error while consumer is sleeping " + e.getMessage());
+ if (isAsync) {
+ getMessageConsumer().setMessageListener(this);
+ getConnection().start();
+
+ try {
+ Thread.sleep(duration);
+ } catch (InterruptedException e) {
+ throw new JMSException("Error while consumer is sleeping " + e.getMessage());
+ }
+ } else {
+ getConnection().start();
+ consumeMessages(getMessageConsumer(), duration);
}
- getMessageConsumer().close();
- getConnection().close();
-
- System.out.println("Throughput : " + this.getThroughput());
-
+ close(); //close consumer, session, and connection.
listener.onConfigEnd(this);
}
+ //Increments throughput
public void onMessage(Message message) {
- try {
- TextMessage textMessage = (TextMessage) message;
+ System.out.println(message.toString());
+ this.incThroughput();
+ }
- // lets force the content to be deserialized
- String text = textMessage.getText();
- System.out.println("message: " + text + ":" + this.getThroughput());
- this.incThroughput();
- } catch (JMSException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ protected void consumeMessages(MessageConsumer consumer, long duration) throws JMSException {
+
+ long currentTime = System.currentTimeMillis();
+ long endTime = currentTime + duration;
+
+ while (System.currentTimeMillis() <= endTime) {
+ Message message = consumer.receive();
+ onMessage(message);
}
}
+ protected void close() throws JMSException {
+ getMessageConsumer().close();
+ getSession().close();
+ getConnection().close();
+ }
+
public static void main(String[] args) throws Exception {
JmsConsumerClient cons = new JmsConsumerClient("org.apache.activemq.ActiveMQConnectionFactory", "tcp://localhost:61616", "topic://TEST.FOO");
cons.setPerfEventListener(new PerfEventAdapter());
@@ -115,6 +121,22 @@
}
// Helper Methods
+
+ public boolean isDurable() {
+ return isDurable;
+ }
+
+ public void setDurable(boolean durable) {
+ isDurable = durable;
+ }
+
+ public boolean isAsync() {
+ return isAsync;
+ }
+
+ public void setAsync(boolean async) {
+ isAsync = async;
+ }
public String getDestinationName() {
return this.destinationName;