Author: rajith
Date: Wed Nov 11 00:21:27 2009
New Revision: 834724

URL: http://svn.apache.org/viewvc?rev=834724&view=rev
Log:
Moved MessageFactory to the tools module.
Added a Generic Sender and a Receiver. 
They can be run standalone or used as a building block to create more complex 
tests.
TestLauncher is a utility to start a sender or receiver in multiple threads 
with some added plumbing.
Please refer to each class to see the full set of options available.

Added:
    
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java
    
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
    
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java
    
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java
    
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java
Removed:
    
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java

Added: 
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java?rev=834724&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java 
(added)
+++ 
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java 
Wed Nov 11 00:21:27 2009
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.text.DateFormat;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Session;
+
+public abstract class Client
+{
+       protected Connection con;
+       protected Session ssn;
+    protected boolean durable = false;
+    protected boolean transacted = false;
+    protected int txSize = 10;
+    protected int ack_mode = Session.AUTO_ACKNOWLEDGE;
+    protected String contentType = "application/octet-stream";
+    protected Destination dest = null;
+        
+    protected long reportFrequency = 60000;  // every min
+    protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
+    protected NumberFormat nf = new DecimalFormat("##.00");
+
+    protected long startTime = System.currentTimeMillis();
+    protected ErrorHandler errorHandler = null;
+    
+    public Client(Connection con) throws Exception
+    {
+       this.con = con;       
+       durable = Boolean.getBoolean("durable");
+       transacted = Boolean.getBoolean("transacted");
+       txSize = Integer.getInteger("tx_size",10);
+       contentType = 
System.getProperty("content_type","application/octet-stream");    
+       reportFrequency = Long.getLong("report_frequency", 60000);
+    }
+
+    public void close()
+    {
+       try
+       {
+               con.close();
+       }
+       catch (Exception e)
+       {
+               handleError("Error closing connection",e);
+       }
+    }
+    
+    public void setErrorHandler(ErrorHandler h)
+    {
+       this.errorHandler = h;
+    }
+    
+    public void handleError(String msg,Exception e)
+    {
+       if (errorHandler != null)
+       {
+               errorHandler.handleError(msg, e);
+       }
+       else
+       {
+               System.err.println(msg);
+               e.printStackTrace();
+       }
+    }
+}

Added: 
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java?rev=834724&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
 (added)
+++ 
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
 Wed Nov 11 00:21:27 2009
@@ -0,0 +1,6 @@
+package org.apache.qpid.testkit;
+
+public interface ErrorHandler {
+
+       public void handleError(String msg,Exception e);
+}

Added: 
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java?rev=834724&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java
 (added)
+++ 
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java
 Wed Nov 11 00:21:27 2009
@@ -0,0 +1,225 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQConnection;
+
+/**
+ * A generic receiver which consumers a stream of messages
+ * from a given address in a broker (host/port) 
+ * until told to stop by killing it.
+ * 
+ * It participates in a feedback loop to ensure the producer
+ * doesn't fill up the queue. If it receives an "End" msg
+ * it sends a reply to the replyTo address in that msg.
+ * 
+ * It doesn't check for correctness or measure anything
+ * leaving those concerns to another entity. 
+ * However it prints a timestamp every x secs(-Dreport_frequency)
+ * as checkpoint to figure out how far the test has progressed if
+ * a failure occurred.
+ * 
+ * It also takes in an optional Error handler to
+ * pass out any error in addition to writing them to std err. 
+ * 
+ * This is intended more as building block to create
+ * more complex test cases. However there is a main method
+ * provided to use this standalone.
+ *
+ * The following options are available and configurable 
+ * via jvm args.
+ * 
+ * sync_rcv - Whether to consume sync (instead of using a listener).
+ * report_frequency - how often a timestamp is printed
+ * durable
+ * transacted
+ * tx_size - size of transaction batch in # msgs. 
+ */
+public class Receiver extends Client implements MessageListener
+{
+       // Until addressing is properly supported.
+       protected enum Reliability {
+               AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE;
+               
+               Reliability getReliability(String s)
+               {
+                       if (s.equalsIgnoreCase("at_most_once"))
+                       {
+                               return AT_MOST_ONCE;
+                       }
+                       else if (s.equalsIgnoreCase("at_least_once"))
+                       {
+                               return AT_LEAST_ONCE;
+                       }
+                       else
+                       {
+                               return EXACTLY_ONCE;
+                       }
+               }
+       };
+       
+       long msg_count = 0;
+       int sequence = 0;
+       boolean sync_rcv = Boolean.getBoolean("sync_rcv");
+       boolean uniqueDests = Boolean.getBoolean("unique_dests");
+       Reliability reliability = Reliability.EXACTLY_ONCE;
+       MessageConsumer consumer;
+    List<Integer> duplicateMessages = new ArrayList<Integer>();
+    
+    public Receiver(Connection con,Destination dest) throws Exception
+    {
+       super(con);
+       reliability = 
reliability.getReliability(System.getProperty("reliability","exactly_once"));
+       ssn = con.createSession(transacted,ack_mode);
+       consumer = ssn.createConsumer(dest);
+       if (!sync_rcv)
+       {
+               consumer.setMessageListener(this);
+       }
+       
+       System.out.println("Operating in mode : " + reliability);
+       System.out.println("Receiving messages from : " + dest);
+    }
+
+    public void onMessage(Message msg)
+    {          
+       handleMessage(msg);
+    }
+    
+    public void run() throws Exception
+    {
+       while(true)
+       {
+               if(sync_rcv)
+               {
+                       Message msg = consumer.receive();
+                       handleMessage(msg);
+               }
+               Thread.sleep(reportFrequency);
+               System.out.println(df.format(System.currentTimeMillis())
+                               + " - messages received : " + msg_count);
+       }
+    }
+    
+    private void handleMessage(Message m)
+    {
+       try
+        {   
+            if (m instanceof TextMessage && ((TextMessage) 
m).getText().equals("End"))
+            {
+                MessageProducer temp = ssn.createProducer(m.getJMSReplyTo());
+                Message controlMsg = ssn.createTextMessage();
+                temp.send(controlMsg);
+                if (transacted)
+                {
+                    ssn.commit();
+                }
+                temp.close();
+            }
+            else
+            {   
+               
+               int seq = m.getIntProperty("sequence");   
+               if (uniqueDests)
+               {
+                       if (seq == 0)
+                       {
+                               sequence = 0; // wrap around for each iteration
+                       }
+                       
+                       if (seq < sequence)
+                       {                    
+                           duplicateMessages.add(seq);
+                           if (reliability == Reliability.EXACTLY_ONCE)
+                           {
+                               throw new Exception(": Received a duplicate 
message (expected="
+                                               + sequence  + ",received=" + 
seq + ")" ); 
+                           }                    
+                       }
+                       else if (seq == sequence)
+                       {
+                               sequence++;
+                               msg_count ++;
+                       }
+                       else
+                       {  
+                               // Multiple publishers are not allowed in this 
test case.
+                               // So out of order messages are not allowed.
+                               throw new Exception(": Received an out of order 
message (expected="
+                                               + sequence  + ",received=" + 
seq + ")" ); 
+                       }
+               }
+                // Please note that this test case doesn't expect duplicates
+                // When testing for transactions.
+               if (transacted && msg_count % txSize == 0)
+               {
+                       ssn.commit();
+               }
+            }
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace();
+               handleError("Exception receiving messages",e);
+        }      
+    }
+
+    // Receiver host port address
+    public static void main(String[] args) throws Exception
+    {
+       String host = "127.0.0.1";
+       int port = 5672;
+       
+       if (args.length > 0)
+       {
+               host = args[0];
+       }
+       if (args.length > 1)
+       {
+               port = Integer.parseInt(args[1]);
+       }       
+       // #3rd argument should be an address
+        // Any other properties is best configured via jvm args        
+        
+       AMQConnection con = new AMQConnection(
+                               
"amqp://username:passw...@topicclientid/test?brokerlist='tcp://"
+                                               + host + ":" + port + "'");
+        
+        // FIXME Need to add support for the new address format
+        // Then it's trivial to add destination for that.
+        Receiver rcv = new Receiver(con,null);
+        rcv.run();
+    }
+
+}

Added: 
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java?rev=834724&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java 
(added)
+++ 
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java 
Wed Nov 11 00:21:27 2009
@@ -0,0 +1,195 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.text.DateFormat;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+import java.util.Random;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.tools.MessageFactory;
+
+/**
+ * A generic sender which sends a stream of messages
+ * to a given address in a broker (host/port) 
+ * until told to stop by killing it.
+ * 
+ * It has a feedback loop to ensure it doesn't fill
+ * up queues due to a slow consumer.
+ * 
+ * It doesn't check for correctness or measure anything
+ * leaving those concerns to another entity. 
+ * However it prints a timestamp every x secs(-Dreport_frequency)
+ * as checkpoint to figure out how far the test has progressed if
+ * a failure occurred.
+ * 
+ * It also takes in an optional Error handler to
+ * pass out any error in addition to writing them to std err. 
+ * 
+ * This is intended more as building block to create
+ * more complex test cases. However there is a main method
+ * provided to use this standalone.
+ *
+ * The following options are available and configurable 
+ * via jvm args.
+ * 
+ * msg_size (256) 
+ * msg_count (10) - # messages before waiting for feedback
+ * sleep_time (1000 ms) - sleep time btw each iteration
+ * report_frequency - how often a timestamp is printed
+ * durable
+ * transacted
+ * tx_size - size of transaction batch in # msgs. 
+ */
+public class Sender extends Client
+{
+    protected int msg_size = 256;
+    protected int msg_count = 10;
+    protected int iterations = -1;
+    protected long sleep_time = 1000;
+
+    protected Destination dest = null;
+    protected Destination replyTo =  null;
+    protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
+    protected NumberFormat nf = new DecimalFormat("##.00");
+    
+    protected MessageProducer producer;
+    Random gen = new Random(19770905);
+    
+    public Sender(Connection con,Destination dest) throws Exception
+    {
+       super(con);
+       this.msg_size = Integer.getInteger("msg_size", 100);
+       this.msg_count = Integer.getInteger("msg_count", 10);
+       this.iterations = Integer.getInteger("iterations", -1);
+       this.sleep_time = Long.getLong("sleep_time", 1000);
+       this.ssn = con.createSession(transacted,Session.AUTO_ACKNOWLEDGE);
+       this.dest = dest;
+       this.producer = ssn.createProducer(dest);
+       this.replyTo = ssn.createTemporaryQueue();
+       
+       System.out.println("Sending messages to : " + dest);
+    }
+
+    /*
+     * If msg_size not specified it generates a message
+     * between 500-1500 bytes.
+     */
+    protected Message getNextMessage() throws Exception
+    {
+        int s =  msg_size == -1 ? 500 + gen.nextInt(1000) : msg_size;
+        Message msg = (contentType.equals("text/plain")) ?
+                MessageFactory.createTextMessage(ssn, s):
+                MessageFactory.createBytesMessage(ssn, s);
+                
+       msg.setJMSDeliveryMode((durable) ? DeliveryMode.PERSISTENT
+                               : DeliveryMode.NON_PERSISTENT);
+       return msg;
+    }
+         
+    public void run()
+    {
+       try 
+       {
+               boolean infinite = (iterations == -1);
+                       for (int x=0; infinite || x < iterations; x++)
+                       {
+                               long now = System.currentTimeMillis();
+                           if (now - startTime >= reportFrequency)
+                           {
+                               System.out.println(df.format(now) + " - 
iterations : " + x);
+                               startTime = now;
+                           }
+                           
+                           for (int i = 0; i < msg_count; i++)
+                           {
+                               Message msg = getNextMessage();
+                               msg.setIntProperty("sequence",i);
+                               producer.send(msg);
+                               if (transacted && msg_count % txSize == 0)
+                               {
+                                       ssn.commit();
+                               }
+                           }
+                           TextMessage m = ssn.createTextMessage("End");
+                           m.setJMSReplyTo(replyTo);
+                           producer.send(m);
+
+                           if (transacted)
+                           {
+                               ssn.commit();
+                           }
+
+                           MessageConsumer feedbackConsumer = 
ssn.createConsumer(replyTo);
+                           feedbackConsumer.receive();
+                           feedbackConsumer.close();
+                           if (transacted)
+                           {
+                               ssn.commit();
+                           }
+                           Thread.sleep(sleep_time);
+                       }
+               }
+       catch (Exception e)
+        {
+            handleError("Exception sending messages",e);
+        }      
+    }
+    
+    // Receiver host port address
+    public static void main(String[] args) throws Exception
+    {
+       String host = "127.0.0.1";
+       int port = 5672;
+       
+       if (args.length > 0)
+       {
+               host = args[0];
+       }
+       if (args.length > 1)
+       {
+               port = Integer.parseInt(args[1]);
+       }
+       // #3rd argument should be an address
+        // Any other properties is best configured via jvm args
+       
+        AMQConnection con = new AMQConnection(
+                               
"amqp://username:passw...@topicclientid/test?brokerlist='tcp://"
+                                               + host + ":" + port + "'");
+        
+        // FIXME Need to add support for the new address format
+        // Then it's trivial to add destination for that.
+        Sender sender = new Sender(con,null);
+        sender.run();
+    }
+}

Added: 
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java?rev=834724&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java
 (added)
+++ 
qpid/trunk/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java
 Wed Nov 11 00:21:27 2009
@@ -0,0 +1,361 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.text.DateFormat;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.thread.Threading;
+
+/**
+ * A basic test case class that could launch a Sender/Receiver
+ * or both, each on it's own separate thread.
+ * 
+ * If con_count == ssn_count, then each entity created will have
+ * it's own Connection. Else if con_count < ssn_count, then
+ * a connection will be shared by ssn_count/con_count # of entities.
+ * 
+ * The if both sender and receiver options are set, it will
+ * share a connection.   
+ *
+ * The following options are available as jvm args
+ * host, port
+ * con_count,ssn_count
+ * con_idle_time -  which determines heartbeat
+ * sender, receiver - booleans which indicate which entity to create.
+ * Setting them both is also a valid option.
+ */
+public class TestLauncher implements ErrorHandler
+{
+    protected String host = "127.0.0.1";
+    protected int port = 5672;
+    protected int session_count = 1;
+    protected int connection_count = 1;
+    protected long connection_idle_time = 5000;
+    protected boolean sender = false;
+    protected boolean receiver = false;
+    protected String url;
+
+    protected String queue_name =  "message_queue";
+    protected String exchange_name =  "amq.direct";
+    protected String routing_key =  "routing_key";
+    protected boolean uniqueDests = false;
+    protected boolean durable = false;
+    protected String failover = "";
+    protected AMQConnection controlCon;
+    protected Destination controlDest = null;
+    protected Session controlSession = null;
+    protected MessageProducer statusSender;
+    protected List<AMQConnection> clients = new ArrayList<AMQConnection>();
+    protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
+    protected NumberFormat nf = new DecimalFormat("##.00");
+    protected String testName;    
+        
+    public TestLauncher()
+    {
+       testName = System.getProperty("test_name","UNKNOWN");
+       host = System.getProperty("host", "127.0.0.1");
+       port = Integer.getInteger("port", 5672);
+       session_count = Integer.getInteger("ssn_count", 1);
+       connection_count = Integer.getInteger("con_count", 1);
+       connection_idle_time = Long.getLong("con_idle_time", 5000);
+       sender = Boolean.getBoolean("sender");
+       receiver = Boolean.getBoolean("receiver");
+       
+       queue_name = System.getProperty("queue_name", "message_queue");
+       exchange_name = System.getProperty("exchange_name", "amq.direct");
+       routing_key = System.getProperty("routing_key", "routing_key");
+       failover = System.getProperty("failover", "");
+       uniqueDests = Boolean.getBoolean("unique_dests");
+       durable = Boolean.getBoolean("durable");
+       
+       url = "amqp://username:passw...@topicclientid/test?brokerlist='tcp://"
+                               + host + ":" + port + "?idle_timeout=" + 
connection_idle_time
+                               + "'";
+       
+       if (failover.equalsIgnoreCase("failover_exchange"))
+       {
+          url += "&failover='failover_exchange'";
+          
+          System.out.println("Failover exchange " + url );
+       }
+    }
+
+    public void setUpControlChannel()
+    {
+        try
+        {
+            controlCon = new AMQConnection(url);
+            controlCon.start();
+            
+            controlDest = new AMQQueue(new AMQShortString(""),
+                                     new AMQShortString("control"),
+                                     new AMQShortString("control"),
+                                     false, //exclusive
+                                     false, //auto-delete
+                                     false); // durable
+
+            // Create the session to setup the messages
+            controlSession = controlCon.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            statusSender = controlSession.createProducer(controlDest);
+
+        }
+        catch (Exception e)
+        {
+            handleError("Error while setting up the test",e);
+        }
+    }
+    
+    public void cleanup()
+    {
+       try
+       {
+               controlSession.close();
+               controlCon.close();
+               for (AMQConnection con : clients)
+               {
+                       con.close();
+               }
+       }
+           catch (Exception e)
+           {
+               handleError("Error while tearing down the test",e);
+           }
+    }
+        
+    public void start()
+    {
+        try
+        {
+           
+               int ssn_per_con = session_count;
+               if (connection_count < session_count)
+               {
+                       ssn_per_con = session_count/connection_count;
+               }
+               
+               for (int i = 0; i< connection_count; i++)
+               {
+                       AMQConnection con = new AMQConnection(url);
+                       con.start();
+                       clients.add(con);                       
+                       for (int j = 0; j< ssn_per_con; j++)
+               {
+                               String prefix = createPrefix(i,j);
+                               Destination dest = createDest(prefix);
+                               if (sender)
+                               {
+                                       createSender(prefix,con,dest,this);
+                               }
+                               
+                               if (receiver)
+                               {
+                                       createReceiver(prefix,con,dest,this);
+                               }
+               }
+               }
+        }
+        catch (Exception e)
+        {
+            handleError("Exception while setting up the test",e);
+        }
+
+    }
+    
+    protected void createReceiver(String index,final AMQConnection con, final 
Destination dest, final ErrorHandler h)
+    {
+       Runnable r = new Runnable()
+        {
+            public void run()
+            {
+               try 
+               {
+                  Receiver rcv = new Receiver(con,dest);
+                                  rcv.setErrorHandler(h);
+                                  rcv.run();
+                               }
+                   catch (Exception e) 
+                   {
+                                       h.handleError("Error Starting 
Receiver", e);
+                               }
+            }
+        };
+        
+        Thread t = null;
+        try
+        {
+            t = Threading.getThreadFactory().createThread(r);                  
    
+        }
+        catch(Exception e)
+        {
+            handleError("Error creating Receive thread",e);
+        }
+        
+        t.setName("ReceiverThread-" + index);
+        t.start();
+    }
+    
+    protected void createSender(String index,final AMQConnection con, final 
Destination dest, final ErrorHandler h)
+    {
+       Runnable r = new Runnable()
+        {
+            public void run()
+            {
+               try 
+               {
+                  Sender sender = new Sender(con, dest);
+                  sender.setErrorHandler(h);
+                  sender.run();
+                               }
+                   catch (Exception e) 
+                   {
+                                       h.handleError("Error Starting Sender", 
e);
+                               }
+            }
+        };
+        
+        Thread t = null;
+        try
+        {
+            t = Threading.getThreadFactory().createThread(r);                  
    
+        }
+        catch(Exception e)
+        {
+            handleError("Error creating Sender thread",e);
+        }
+        
+        t.setName("SenderThread-" + index);
+        t.start();
+    }
+
+    public void handleError(String msg,Exception e)
+    {
+       // In case sending the message fails
+        StringBuilder sb = new StringBuilder();
+        sb.append(msg);
+        sb.append(" @ ");
+        sb.append(df.format(new Date(System.currentTimeMillis())));
+        sb.append(" ");
+        sb.append(e.getMessage());
+        System.err.println(sb.toString());
+        e.printStackTrace();
+        
+        try 
+        {
+                       TextMessage errorMsg = 
controlSession.createTextMessage();
+                       errorMsg.setStringProperty("status", "error");
+                       errorMsg.setStringProperty("desc", msg);
+                       errorMsg.setStringProperty("time", df.format(new 
Date(System.currentTimeMillis())));        
+                       errorMsg.setStringProperty("exception-trace", 
serializeStackTrace(e));
+                       synchronized (this)
+                       {
+                               statusSender.send(errorMsg);
+                       }
+               } catch (JMSException e1) {
+                       e1.printStackTrace();
+               }       
+    }
+    
+    private String serializeStackTrace(Exception e)
+    {
+       ByteArrayOutputStream bOut = new ByteArrayOutputStream();
+       PrintStream printStream = new PrintStream(bOut);
+       e.printStackTrace(printStream);
+       printStream.close();
+       return bOut.toString();
+    }
+    
+    private String createPrefix(int i, int j)
+    {
+       return String.valueOf(i).concat(String.valueOf(j));
+    }
+    
+    /**
+     * The following are supported.
+     * 
+     * 1. A producer/consumer pair on a topic or a queue
+     * 2. A single producer with multiple consumers on topic/queue
+     * 
+     * Multiple consumers on a topic will result in a private queue
+     * for each consumers.
+     * 
+     * We want to avoid multiple producers on the same topic/queue
+     * as the queues will fill up in no time.
+     */
+    private Destination createDest(String prefix)
+    {
+       Destination dest = null;
+        if (exchange_name.equals("amq.topic"))
+        {
+            dest = new AMQTopic(
+                        new AMQShortString(exchange_name),
+                     new AMQShortString(uniqueDests ? prefix + routing_key :
+                                                                               
           routing_key),
+                     false,   //auto-delete
+                     null,   //queue name
+                     durable);
+        }
+        else
+        {
+            dest = new AMQQueue(
+                       new AMQShortString(exchange_name),
+                       new AMQShortString(uniqueDests ? prefix + routing_key :
+                                                              routing_key),
+                    new AMQShortString(uniqueDests ? prefix + queue_name :
+                                                             queue_name),
+                    false, //exclusive
+                    false, //auto-delete
+                    durable);
+        }
+        return dest;
+    }
+    
+    public static void main(String[] args)
+    {
+       final TestLauncher test = new TestLauncher();
+       test.setUpControlChannel();
+       test.start();
+       Runtime.getRuntime().addShutdownHook(new Thread() {
+           public void run() { test.cleanup(); }
+       });
+
+    }
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to