Hello Developers,

i made some trivial enhancements to the ActiveMq examples.

This patch introduces parallel enqueueing/dequeuing of messages (without
connection pooling).
I used this tool for benchmarking/verifying/testing my ActiveMQ
configuration - maybe also other users
are interested in this.

Example:
---
ant producer -DparallelThreads=60 -Dmax=2000 -DmessageSize=1024
ant consumer -DparallelThreads=100 -Dmax=2000
---

The patch is based on the following release:
---
$ LANG=C svn info
Path: .
URL: https://svn.apache.org/repos/asf/activemq/trunk
Repository Root: https://svn.apache.org/repos/asf
Repository UUID: 13f79535-47bb-0310-9956-ffa450edef68
Revision: 932332
---

You can apply this, i accept the Apache licence.

Regards
Marc Schoechlin
Index: assembly/src/release/example/src/ProducerTool.java
===================================================================
--- assembly/src/release/example/src/ProducerTool.java	(Revision 932332)
+++ assembly/src/release/example/src/ProducerTool.java	(Arbeitskopie)
@@ -15,7 +15,9 @@
  * limitations under the License.
  */
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.Iterator;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
@@ -33,13 +35,14 @@
  * 
  * @version $Revision: 1.2 $
  */
-public class ProducerTool {
+public class ProducerTool extends Thread{
 
     private Destination destination;
     private int messageCount = 10;
     private long sleepTime;
     private boolean verbose = true;
     private int messageSize = 255;
+    private static int parallelThreads = 1;
     private long timeToLive;
     private String user = ActiveMQConnection.DEFAULT_USER;
     private String password = ActiveMQConnection.DEFAULT_PASSWORD;
@@ -48,28 +51,60 @@
     private boolean topic;
     private boolean transacted;
     private boolean persistent;
+	 private static Object lockResults = new Object();
 
     public static void main(String[] args) {
+	 	  ArrayList<ProducerTool> threads = new ArrayList();
         ProducerTool producerTool = new ProducerTool();
         String[] unknown = CommandLineSupport.setOptions(producerTool, args);
         if (unknown.length > 0) {
             System.out.println("Unknown options: " + Arrays.toString(unknown));
             System.exit(-1);
         }
-        producerTool.run();
+		  producerTool.showParameters();
+		  for (int threadCount=1; threadCount <= parallelThreads; threadCount++){
+        		producerTool = new ProducerTool();
+        	   CommandLineSupport.setOptions(producerTool, args);
+				producerTool.start();
+				threads.add(producerTool);
+		  }
+		 
+		while(true){ 
+		  Iterator<ProducerTool> itr = threads.iterator();
+		  int running = 0;
+		  while (itr.hasNext()) {
+				ProducerTool thread  = itr.next();
+			   if (thread.isAlive()){
+					running++;
+				}
+		  }
+		  if (running <= 0){
+				System.out.println("All threads completed their work");
+				break;
+		  }
+		  try{
+		  		Thread.sleep(1000);
+        }catch(Exception e){
+		  }
+	   }
     }
 
+	 public void showParameters(){
+        System.out.println("Connecting to URL: " + url);
+        System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject);
+        System.out.println("Using " + (persistent ? "persistent" : "non-persistent") + " messages");
+        System.out.println("Sleeping between publish " + sleepTime + " ms");
+        System.out.println("Running "+parallelThreads+" parallel threads");
+
+        if (timeToLive != 0) {
+            System.out.println("Messages time to live " + timeToLive + " ms");
+        }
+	 }
+
+
     public void run() {
         Connection connection = null;
         try {
-            System.out.println("Connecting to URL: " + url);
-            System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject);
-            System.out.println("Using " + (persistent ? "persistent" : "non-persistent") + " messages");
-            System.out.println("Sleeping between publish " + sleepTime + " ms");
-            if (timeToLive != 0) {
-                System.out.println("Messages time to live " + timeToLive + " ms");
-            }
-
             // Create the connection.
             ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
             connection = connectionFactory.createConnection();
@@ -97,15 +132,16 @@
             // Start sending messages
             sendLoop(session, producer);
 
-            System.out.println("Done.");
+            System.out.println("["+this.getName()+"] Done.");
 
-            // Use the ActiveMQConnection interface to dump the connection
-            // stats.
-            ActiveMQConnection c = (ActiveMQConnection)connection;
-            c.getConnectionStats().dump(new IndentPrinter());
+				synchronized(lockResults){
+            	ActiveMQConnection c = (ActiveMQConnection)connection;
+					System.out.println("***** Results of "+this.getName()+"\n");
+					c.getConnectionStats().dump(new IndentPrinter());
+				}
 
         } catch (Exception e) {
-            System.out.println("Caught: " + e);
+            System.out.println("["+this.getName()+"] Caught: " + e);
             e.printStackTrace();
         } finally {
             try {
@@ -126,18 +162,17 @@
                 if (msg.length() > 50) {
                     msg = msg.substring(0, 50) + "...";
                 }
-                System.out.println("Sending message: " + msg);
+                System.out.println("["+this.getName()+"] Sending message: '" + msg + "'");
             }
 
             producer.send(message);
+
             if (transacted) {
+                System.out.println("["+this.getName()+"] Committing "+messageCount+" messages");
                 session.commit();
             }
-
             Thread.sleep(sleepTime);
-
         }
-
     }
 
     private String createMessageText(int index) {
@@ -180,6 +215,10 @@
         this.timeToLive = timeToLive;
     }
 
+    public void setParallelThreads(int parallelThreads) {
+		  if(parallelThreads < 1) parallelThreads = 1;
+        this.parallelThreads = parallelThreads;
+    }
     public void setTopic(boolean topic) {
         this.topic = topic;
     }
Index: assembly/src/release/example/src/ConsumerTool.java
===================================================================
--- assembly/src/release/example/src/ConsumerTool.java	(Revision 932332)
+++ assembly/src/release/example/src/ConsumerTool.java	(Arbeitskopie)
@@ -17,6 +17,8 @@
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Iterator;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
@@ -39,7 +41,7 @@
  * 
  * @version $Revision: 1.1.1.1 $
  */
-public class ConsumerTool implements MessageListener, ExceptionListener {
+public class ConsumerTool extends Thread implements MessageListener, ExceptionListener{
 
     private boolean running;
 
@@ -47,9 +49,10 @@
     private Destination destination;
     private MessageProducer replyProducer;
 
-    private boolean pauseBeforeShutdown;
+    private boolean pauseBeforeShutdown = false;
     private boolean verbose = true;
     private int maxiumMessages;
+    private static int parallelThreads = 1;
     private String subject = "TOOL.DEFAULT";
     private boolean topic;
     private String user = ActiveMQConnection.DEFAULT_USER;
@@ -64,22 +67,58 @@
     private long receiveTimeOut;
 
     public static void main(String[] args) {
+	 	  ArrayList<ConsumerTool> threads = new ArrayList();
         ConsumerTool consumerTool = new ConsumerTool();
         String[] unknown = CommandLineSupport.setOptions(consumerTool, args);
         if (unknown.length > 0) {
             System.out.println("Unknown options: " + Arrays.toString(unknown));
             System.exit(-1);
         }
-        consumerTool.run();
+		  consumerTool.showParameters();
+		  for (int threadCount=1; threadCount <= parallelThreads; threadCount++){
+        		consumerTool = new ConsumerTool();
+        	   CommandLineSupport.setOptions(consumerTool, args);
+				consumerTool.start();
+				threads.add(consumerTool);
+		  }
+		 
+		while(true){ 
+		  Iterator<ConsumerTool> itr = threads.iterator();
+		  int running = 0;
+		  while (itr.hasNext()) {
+				ConsumerTool thread  = itr.next();
+			   if (thread.isAlive()){
+					running++;
+				}
+		  }
+
+		  if (running <= 0){
+				System.out.println("All threads completed their work");
+				break;
+		  }
+
+		  try{
+		  		Thread.sleep(1000);
+        }catch(Exception e){
+		  }
+	   }
+      Iterator<ConsumerTool> itr = threads.iterator();
+		while (itr.hasNext()) {
+		 	ConsumerTool thread  = itr.next();
+		}
     }
 
+	 public void showParameters(){
+            System.out.println("Connecting to URL: " + url);
+            System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
+            System.out.println("Using a " + (durable ? "durable" : "non-durable") + " subscription");
+        		System.out.println("Running "+parallelThreads+" parallel threads");
+	 }
+
     public void run() {
         try {
             running = true;
 
-            System.out.println("Connecting to URL: " + url);
-            System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
-            System.out.println("Using a " + (durable ? "durable" : "non-durable") + " subscription");
 
             ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
             Connection connection = connectionFactory.createConnection();
@@ -117,7 +156,7 @@
             }
 
         } catch (Exception e) {
-            System.out.println("Caught: " + e);
+            System.out.println("["+this.getName()+"] Caught: " + e);
             e.printStackTrace();
         }
     }
@@ -130,15 +169,15 @@
                 if (verbose) {
 
                     String msg = txtMsg.getText();
-                    if (msg.length() > 50) {
+						  int length = msg.length();
+                    if (length > 50) {
                         msg = msg.substring(0, 50) + "...";
                     }
-
-                    System.out.println("Received: " + msg);
+                    System.out.println("["+this.getName()+"] Received: '" + msg + "' (length "+length+")");
                 }
             } else {
                 if (verbose) {
-                    System.out.println("Received: " + message);
+                    System.out.println("["+this.getName()+"] Received: '" + message +"'");
                 }
             }
 
@@ -153,7 +192,7 @@
             }
 
         } catch (JMSException e) {
-            System.out.println("Caught: " + e);
+            System.out.println("["+this.getName()+"] Caught: " + e);
             e.printStackTrace();
         } finally {
             if (sleepTime > 0) {
@@ -166,7 +205,7 @@
     }
 
     public synchronized void onException(JMSException ex) {
-        System.out.println("JMS Exception occured.  Shutting down client.");
+        System.out.println("["+this.getName()+"] JMS Exception occured.  Shutting down client.");
         running = false;
     }
 
@@ -175,7 +214,7 @@
     }
 
     protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, IOException {
-        System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");
+        System.out.println("["+this.getName()+"] We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");
 
         for (int i = 0; i < maxiumMessages && isRunning();) {
             Message message = consumer.receive(1000);
@@ -184,30 +223,30 @@
                 onMessage(message);
             }
         }
-        System.out.println("Closing connection");
+        System.out.println("["+this.getName()+"] Closing connection");
         consumer.close();
         session.close();
         connection.close();
         if (pauseBeforeShutdown) {
-            System.out.println("Press return to shut down");
+            System.out.println("["+this.getName()+"] Press return to shut down");
             System.in.read();
         }
     }
 
     protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer, long timeout) throws JMSException, IOException {
-        System.out.println("We will consume messages while they continue to be delivered within: " + timeout + " ms, and then we will shutdown");
+        System.out.println("["+this.getName()+"] We will consume messages while they continue to be delivered within: " + timeout + " ms, and then we will shutdown");
 
         Message message;
         while ((message = consumer.receive(timeout)) != null) {
             onMessage(message);
         }
 
-        System.out.println("Closing connection");
+        System.out.println("["+this.getName()+"] Closing connection");
         consumer.close();
         session.close();
         connection.close();
         if (pauseBeforeShutdown) {
-            System.out.println("Press return to shut down");
+            System.out.println("["+this.getName()+"] Press return to shut down");
             System.in.read();
         }
     }
@@ -263,6 +302,10 @@
         this.subject = subject;
     }
 
+    public void setParallelThreads(int parallelThreads) {
+		  if(parallelThreads < 1) parallelThreads = 1;
+        this.parallelThreads = parallelThreads;
+    }
     public void setTopic(boolean topic) {
         this.topic = topic;
     }
@@ -286,5 +329,4 @@
     public void setVerbose(boolean verbose) {
         this.verbose = verbose;
     }
-
 }
Index: assembly/src/release/example/build.xml
===================================================================
--- assembly/src/release/example/build.xml	(Revision 932332)
+++ assembly/src/release/example/build.xml	(Arbeitskopie)
@@ -26,6 +26,7 @@
 	<property name="subject" value="TEST.FOO" />
 	<property name="durable" value="false" />
 	<property name="max" value="2000" />
+	<property name="parallelThreads" value="1" />
 	<property name="messageSize" value="1000" />
 	<property name="clientId" value="consumer1" />
 	<property name="producerClientId" value="null" />
@@ -92,7 +93,7 @@
                                     more information
                  receive-time-out - An integer to specify the time to wait for
                                     message consumption
- 
+                  parallelThreads - The number of parallel threads
 				
             --------------------------------------------------------				
             ant producer &lt;options&gt; - Creates a producer publishing a number of messages
@@ -112,6 +113,9 @@
                     transacted - A boolean to specify that you want to use 
                                  transactions? 
                        verbose - Used to print out more info; the default is true
+                   messageSize - The size of the message in 1-byte characters
+               parallelThreads - The number of parallel threads
+
 		   
            --------------------------------------------------------
 		
@@ -209,6 +213,7 @@
 			<arg value="--durable=${durable}" />
 			<arg value="--maxium-messages=${max}" />
 			<arg value="--client-id=${clientId}" />
+			<arg value="--parallel-threads=${parallelThreads}" />
 			<arg value="--transacted=${transacted}" />
 			<arg value="--sleep-time=${sleepTime}" />
 			<arg value="--verbose=${verbose}"/>
@@ -230,6 +235,7 @@
 			<arg value="--persistent=${durable}" />
 			<arg value="--message-count=${max}" />
 			<arg value="--message-size=${messageSize}" />
+			<arg value="--parallel-threads=${parallelThreads}" />
 			<arg value="--time-to-live=${timeToLive}" />
 			<arg value="--sleep-time=${sleepTime}" />
 			<arg value="--transacted=${transacted}" />

Reply via email to