Author: foconer
Date: Fri Jun  2 01:24:47 2006
New Revision: 411085

URL: http://svn.apache.org/viewvc?rev=411085&view=rev
Log:
Added support for sync and async.

Modified:
    
incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java

Modified: 
incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java?rev=411085&r1=411084&r2=411085&view=diff
==============================================================================
--- 
incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
 (original)
+++ 
incubator/activemq/trunk/tooling/maven-activemq-perf-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
 Fri Jun  2 01:24:47 2006
@@ -29,6 +29,7 @@
     private Destination destination = null;
 
     private boolean isDurable = false;
+    private boolean isAsync = true;
 
     public JmsConsumerClient(ConnectionFactory factory) {
         this.factory = factory;
@@ -66,48 +67,53 @@
             setDestination(getDestinationName());
         }
 
-        System.out.println("Connecting to URL: " + brokerUrl);
-        System.out.println("Consuming: " + destination);
-        System.out.println("Using " + (isDurable ? "durable" : "non-durable") 
+ " subscription");
-
-
         if (isDurable) {
             createDurableSubscriber((Topic) getDestination(), 
getClass().getName());
         } else {
             createMessageConsumer(getDestination());
         }
 
-        getMessageConsumer().setMessageListener(this);
-        getConnection().start();
-
-        try {
-            Thread.sleep(duration);
-        } catch (InterruptedException e) {
-            throw new JMSException("Error while consumer is sleeping " + 
e.getMessage());
+        if (isAsync) {
+            getMessageConsumer().setMessageListener(this);
+            getConnection().start();
+
+            try {
+                Thread.sleep(duration);
+            } catch (InterruptedException e) {
+                throw new JMSException("Error while consumer is sleeping " + 
e.getMessage());
+            }
+        } else {
+            getConnection().start();
+            consumeMessages(getMessageConsumer(), duration);
         }
 
-        getMessageConsumer().close();
-        getConnection().close();
-
-        System.out.println("Throughput : " + this.getThroughput());
-
+        close(); //close consumer, session, and connection.
         listener.onConfigEnd(this);
     }
 
+    //Increments throughput
     public void onMessage(Message message) {
-        try {
-            TextMessage textMessage = (TextMessage) message;
+        System.out.println(message.toString());
+        this.incThroughput();
+    }
 
-            // lets force the content to be deserialized
-            String text = textMessage.getText();
-            System.out.println("message: " + text + ":" + 
this.getThroughput());
-            this.incThroughput();
-        } catch (JMSException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
+    protected void consumeMessages(MessageConsumer consumer, long duration) 
throws JMSException {
+
+        long currentTime = System.currentTimeMillis();
+        long endTime = currentTime + duration;
+
+        while (System.currentTimeMillis() <= endTime) {
+            Message message = consumer.receive();
+            onMessage(message);
         }
     }
 
+    protected void close() throws JMSException {
+        getMessageConsumer().close();
+        getSession().close();
+        getConnection().close();
+    }
+
     public static void main(String[] args) throws Exception {
         JmsConsumerClient cons = new 
JmsConsumerClient("org.apache.activemq.ActiveMQConnectionFactory", 
"tcp://localhost:61616", "topic://TEST.FOO");
         cons.setPerfEventListener(new PerfEventAdapter());
@@ -115,6 +121,22 @@
     }
 
     // Helper Methods
+
+    public boolean isDurable() {
+        return isDurable;
+    }
+
+    public void setDurable(boolean durable) {
+        isDurable = durable;
+    }
+
+    public boolean isAsync() {
+        return isAsync;
+    }
+
+    public void setAsync(boolean async) {
+        isAsync = async;
+    }
 
     public String getDestinationName() {
         return this.destinationName;


Reply via email to