Author: rajith
Date: Wed Jul 20 20:46:49 2011
New Revision: 1148935

URL: http://svn.apache.org/viewvc?rev=1148935&view=rev
Log:
Merge branch 'perf' into trunk

Conflicts:
        qpid/java/tools/bin/perf-report

Added:
    qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java
    
qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java
Modified:
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java
    qpid/trunk/qpid/java/tools/bin/perf-report
    qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
    
qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
    
qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
    
qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java?rev=1148935&r1=1148934&r2=1148935&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java Wed Jul 20 20:46:49 2011
@@ -110,7 +110,7 @@ public class AMQPEncodedMapMessage exten
     }
     
     // for testing
-    Map<String,Object> getMap()
+    public Map<String,Object> getMap()
     {
         return _map;
     }

Modified: qpid/trunk/qpid/java/tools/bin/perf-report
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/bin/perf-report?rev=1148935&r1=1148934&r2=1148935&view=diff
==============================================================================
--- qpid/trunk/qpid/java/tools/bin/perf-report (original)
+++ qpid/trunk/qpid/java/tools/bin/perf-report Wed Jul 20 20:46:49 2011
@@ -21,16 +21,16 @@
 # This will run the following test cases defined below and produce
 # a report in tabular format.
 
-SUB_MEM=-Xmx1024M
-PUB_MEM=-Xmx1024M
 QUEUE="queue;{create:always,node:{x-declare:{auto-delete:true}}}"
 DURA_QUEUE="dqueue;{create:always,node:{durable:true,x-declare:{auto-delete:true}}}"
 TOPIC="amq.topic/test"
 DURA_TOPIC="amq.topic/test;{create:always,link:{durable:true}}"
 
+COMMON_CONFIG="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"
+
 waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; }
 cleanup()
-{
+{  
   pids=`ps aux | grep java | grep Perf | awk '{print $2}'`
   if [ "$pids" != "" ]; then
     kill -3 $pids
@@ -42,30 +42,31 @@ cleanup()
 # $2 consumer options
 # $3 producer options
 run_testcase()
-{
-  sh run-sub $LOG_CONFIG $SUB_MEM $2 > sub.out &
-  waitfor sub.out "Warming up"
-  sh run-pub $LOG_CONFIG $PUB_MEM $3 > pub.out &
-  waitfor sub.out "Completed the test"
-  waitfor pub.out "Consumer has completed the test"
+{  
+  sh run-sub $COMMON_CONFIG $2 > sub.out &
+  sh run-pub $COMMON_CONFIG $3 > pub.out &
+  waitfor pub.out "Controller: Completed the test"
   sleep 2 #give a grace period to shutdown
-  print_result $1  
+  print_result $1
+  mv pub.out $1.pub.out
+  mv sub.out $1.sub.out
 }
 
 print_result()
 {
-  prod_rate=`cat pub.out | grep "Producer rate" | awk '{print $3}'`
-  sys_rate=`cat sub.out | grep "System Throughput" | awk '{print $4}'`
-  cons_rate=`cat sub.out | grep "Consumer rate" | awk '{print $4}'` 
-  avg_latency=`cat sub.out | grep "Avg Latency" | awk '{print $4}'`
-  min_latency=`cat sub.out | grep "Min Latency" | awk '{print $4}'`
-  max_latency=`cat sub.out | grep "Max Latency" | awk '{print $4}'`
+  prod_rate=`cat pub.out | grep "Avg Producer rate" | awk '{print $5}'`
+  sys_rate=`cat pub.out | grep "System Throughput" | awk '{print $4}'`
+  cons_rate=`cat pub.out | grep "Avg Consumer rate" | awk '{print $5}'` 
+  avg_latency=`cat pub.out | grep "Avg System Latency" | awk '{print $5}'`
+  min_latency=`cat pub.out | grep "Min System Latency" | awk '{print $5}'`
+  max_latency=`cat pub.out | grep "Max System Latency" | awk '{print $5}'`
 
-  printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11d|%11d|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency
+  printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11.2f|%11.2f|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency
   echo "------------------------------------------------------------------------------------------------"
 }
 
 trap cleanup EXIT
+rm -rf *.out #cleanup old files.
 
 echo "Test report on " `date +%F`
 echo "================================================================================================"

Added: qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java?rev=1148935&view=auto
==============================================================================
--- qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java (added)
+++ qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java Wed Jul 20 20:46:49 2011
@@ -0,0 +1,92 @@
+package org.apache.qpid.tools;
+
+/**
+ * In the future this will be replaced by a Clock abstraction
+ * that can utilize a realtime clock when running in RT Java.
+ */
+
+public class Clock
+{
+    private static Precision precision;
+    private static long offset = -1;  // in nano secs
+
+    public enum Precision
+    {
+        NANO_SECS, MILI_SECS;
+
+        static Precision getPrecision(String str)
+        {
+            if ("mili".equalsIgnoreCase(str))
+            {
+                return MILI_SECS;
+            }
+            else
+            {
+                return NANO_SECS;
+            }
+        }
+    };
+
+    static
+    {
+        precision = Precision.getPrecision(System.getProperty("precision","mili"));
+        //offset = Long.getLong("offset",-1);
+
+        System.out.println("Using precision : " + precision + " and offset " + offset);
+    }
+
+    public static Precision getPrecision()
+    {
+        return precision;
+    }
+
+    public static long getTime()
+    {
+        if (precision == Precision.NANO_SECS)
+        {
+            if (offset == -1)
+            {
+                return System.nanoTime();
+            }
+            else
+            {
+                return System.nanoTime() + offset;
+            }
+        }
+        else
+        {
+            if (offset == -1)
+            {
+                return System.currentTimeMillis();
+            }
+            else
+            {
+                return System.currentTimeMillis() + offset/convertToMiliSecs();
+            }
+        }
+    }
+
+    public static long convertToSecs()
+    {
+        if (precision == Precision.NANO_SECS)
+        {
+            return 1000000000;
+        }
+        else
+        {
+            return 1000;
+        }
+    }
+
+    public static long convertToMiliSecs()
+    {
+        if (precision == Precision.NANO_SECS)
+        {
+            return 1000000;
+        }
+        else
+        {
+            return 1;
+        }
+    }
+}

Modified: 
qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java?rev=1148935&r1=1148934&r2=1148935&view=diff
==============================================================================
--- qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java (original)
+++ qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java Wed Jul 20 20:46:49 2011
@@ -21,9 +21,13 @@
 package org.apache.qpid.tools;
 
 import java.text.DecimalFormat;
+import java.util.UUID;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
 import javax.jms.Session;
 
 import org.apache.qpid.client.AMQAnyDestination;
@@ -31,12 +35,42 @@ import org.apache.qpid.client.AMQConnect
 
 public class PerfBase
 {
+    public final static String CODE = "CODE";
+    public final static String ID = "ID";
+    public final static String REPLY_ADDR = "REPLY_ADDR";
+    public final static String MAX_LATENCY = "MAX_LATENCY";
+    public final static String MIN_LATENCY = "MIN_LATENCY";
+    public final static String AVG_LATENCY = "AVG_LATENCY";
+    public final static String STD_DEV = "STD_DEV";
+    public final static String CONS_RATE = "CONS_RATE";
+    public final static String PROD_RATE = "PROD_RATE";
+    public final static String MSG_COUNT = "MSG_COUNT";
+    public final static String TIMESTAMP = "Timestamp";
+
+    String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}");
+
     TestParams params;
     Connection con;
     Session session;
+    Session controllerSession;
     Destination dest;
-    Destination feedbackDest;
+    Destination myControlQueue;
+    Destination controllerQueue;
     DecimalFormat df = new DecimalFormat("###.##");
+    String id = UUID.randomUUID().toString();
+    String myControlQueueAddr = id + ";{create: always}";
+
+    MessageProducer sendToController;
+    MessageConsumer receiveFromController;
+
+    enum OPCode {
+        REGISTER_CONSUMER, REGISTER_PRODUCER,
+        PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP,
+        CONSUMER_READY, PRODUCER_READY,
+        PRODUCER_START,
+        RECEIVED_END_MSG, CONSUMER_STOP,
+        RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS
+    };
 
     enum MessageType {
         BYTES, TEXT, MAP, OBJECT;
@@ -88,9 +122,41 @@ public class PerfBase
         session = con.createSession(params.isTransacted(),
                                     params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode());
 
+        controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
         dest = new AMQAnyDestination(params.getAddress());
+        controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR);
+        myControlQueue = session.createQueue(myControlQueueAddr);
         msgType = MessageType.getType(params.getMessageType());
         System.out.println("Using " + msgType + " messages");
+
+        sendToController = controllerSession.createProducer(controllerQueue);
+        receiveFromController = controllerSession.createConsumer(myControlQueue);
+    }
+
+    public synchronized void sendMessageToController(MapMessage m) throws Exception
+    {
+        m.setString(ID, id);
+        sendToController.send(m);
+    }
+
+    public void receiveFromController(OPCode expected) throws Exception
+    {
+        MapMessage m = (MapMessage)receiveFromController.receive();
+        OPCode code = OPCode.values()[m.getInt(CODE)];
+        System.out.println("Received Code : " + code);
+        if (expected != code)
+        {
+            throw new Exception("Expected OPCode : " + expected + " but received : " + code);
+        }
+
+    }
+
+    public void tearDown() throws Exception
+    {
+        session.close();
+        controllerSession.close();
+        con.close();
     }
 
     public void handleError(Exception e,String msg)

Modified: 
qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java?rev=1148935&r1=1148934&r2=1148935&view=diff
==============================================================================
--- qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java (original)
+++ qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java Wed Jul 20 20:46:49 2011
@@ -23,12 +23,10 @@ package org.apache.qpid.tools;
 import java.util.ArrayList;
 import java.util.List;
 
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
+import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
 import javax.jms.TextMessage;
 
 import org.apache.qpid.thread.Threading;
@@ -99,6 +97,7 @@ public class PerfConsumer extends PerfBa
     public PerfConsumer()
     {
         super();
+        System.out.println("Consumer ID : " + id);
     }
 
     public void setUp() throws Exception
@@ -114,68 +113,87 @@ public class PerfConsumer extends PerfBa
         {
             sample = new ArrayList<Long>(params.getMsgCount());
         }
+
+        MapMessage m = controllerSession.createMapMessage();
+        m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal());
+        m.setString(REPLY_ADDR,myControlQueueAddr);
+        sendMessageToController(m);
     }
 
     public void warmup()throws Exception
     {
-        System.out.println("Warming up......");
-
+        receiveFromController(OPCode.CONSUMER_STARTWARMUP);
         boolean start = false;
-        while (!start)
+        Message msg = consumer.receive();
+        // This is to ensure we drain the queue before we start the actual test.
+        while ( msg != null)
         {
-            Message msg = consumer.receive();
-            if (msg.getBooleanProperty("End"))
+            if (msg.getBooleanProperty("End") == true)
             {
-                start = true;
-                MessageProducer temp = session.createProducer(msg.getJMSReplyTo());
-                temp.send(session.createMessage());
-                if (params.isTransacted())
-                {
-                    session.commit();
-                }
-                temp.close();
+                // It's more realistic for the consumer to signal this.
+                MapMessage m = controllerSession.createMapMessage();
+                m.setInt(CODE, OPCode.PRODUCER_READY.ordinal());
+                sendMessageToController(m);
             }
+            msg = consumer.receive(1000);
         }
+
+        if (params.isTransacted())
+        {
+            session.commit();
+        }
+
+        MapMessage m = controllerSession.createMapMessage();
+        m.setInt(CODE, OPCode.CONSUMER_READY.ordinal());
+        sendMessageToController(m);
     }
 
     public void startTest() throws Exception
     {
-        System.out.println("Starting test......");
+        System.out.println("Consumer Starting test......");
         consumer.setMessageListener(this);
     }
 
-    public void printResults() throws Exception
+    public void sendResults() throws Exception
     {
-        synchronized (lock)
+        receiveFromController(OPCode.CONSUMER_STOP);
+
+        double avgLatency = (double)totalLatency/(double)rcvdMsgCount;
+        double consRate   = (double)rcvdMsgCount*Clock.convertToSecs()/(double)(rcvdTime - startTime);
+        double stdDev = 0.0;
+        if (printStdDev)
         {
-            lock.wait();
+            stdDev = calculateStdDev(avgLatency);
         }
+        MapMessage m  = controllerSession.createMapMessage();
+        m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal());
+        m.setDouble(AVG_LATENCY, avgLatency/Clock.convertToMiliSecs());
+        m.setDouble(MIN_LATENCY,minLatency/Clock.convertToMiliSecs());
+        m.setDouble(MAX_LATENCY,maxLatency/Clock.convertToMiliSecs());
+        m.setDouble(STD_DEV, stdDev/Clock.convertToMiliSecs());
+        m.setDouble(CONS_RATE, consRate);
+        m.setLong(MSG_COUNT, rcvdMsgCount);
+        sendMessageToController(m);
 
-        double avgLatency = (double)totalLatency/(double)rcvdMsgCount;
-        double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000;
-        double consRate   = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000;
         System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
         System.out.println(new StringBuilder("Consumer rate       : ").
                            append(df.format(consRate)).
                            append(" msg/sec").toString());
-        System.out.println(new StringBuilder("System Throughput   : ").
-                           append(df.format(throughput)).
-                           append(" msg/sec").toString());
         System.out.println(new StringBuilder("Avg Latency         : ").
-                           append(df.format(avgLatency)).
+                           append(df.format(avgLatency/Clock.convertToMiliSecs())).
                            append(" ms").toString());
         System.out.println(new StringBuilder("Min Latency         : ").
-                           append(minLatency).
+                           append(df.format(minLatency/Clock.convertToMiliSecs())).
                            append(" ms").toString());
         System.out.println(new StringBuilder("Max Latency         : ").
-                           append(maxLatency).
+                           append(df.format(maxLatency/Clock.convertToMiliSecs())).
                            append(" ms").toString());
         if (printStdDev)
         {
             System.out.println(new StringBuilder("Std Dev             : ").
-                               append(calculateStdDev(avgLatency)).toString());
+                               append(stdDev/Clock.convertToMiliSecs()).toString());
         }
-        System.out.println("Completed the test......\n");
+        System.out.println("Consumer has completed the test......\n");
     }
 
     public double calculateStdDev(double mean)
@@ -189,25 +207,6 @@ public class PerfConsumer extends PerfBa
         return Math.round(Math.sqrt(v));
     }
 
-    public void notifyCompletion(Destination replyTo) throws Exception
-    {
-        MessageProducer tmp = session.createProducer(replyTo);
-        Message endMsg = session.createMessage();
-        tmp.send(endMsg);
-        if (params.isTransacted())
-        {
-            session.commit();
-        }
-        tmp.close();
-    }
-
-    public void tearDown() throws Exception
-    {
-        consumer.close();
-        session.close();
-        con.close();
-    }
-
     public void onMessage(Message msg)
     {
         try
@@ -220,22 +219,18 @@ public class PerfConsumer extends PerfBa
 
             if (msg.getBooleanProperty("End"))
             {
-                notifyCompletion(msg.getJMSReplyTo());
-
-                synchronized (lock)
-                {
-                   lock.notifyAll();
-                }
+                MapMessage m = controllerSession.createMapMessage();
+                m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal());
+                sendMessageToController(m);
             }
             else
             {
-                rcvdTime = System.currentTimeMillis();
+                rcvdTime = Clock.getTime();
                 rcvdMsgCount ++;
 
                 if (rcvdMsgCount == 1)
                 {
                     startTime = rcvdTime;
-                    testStartTime = msg.getJMSTimestamp();
                 }
 
                 if (transacted && (rcvdMsgCount % transSize == 0))
@@ -243,7 +238,7 @@ public class PerfConsumer extends PerfBa
                     session.commit();
                 }
 
-                long latency = rcvdTime - msg.getJMSTimestamp();
+                long latency = rcvdTime - msg.getLongProperty(TIMESTAMP);
                 maxLatency = Math.max(maxLatency, latency);
                 minLatency = Math.min(minLatency, latency);
                 totalLatency = totalLatency + latency;
@@ -261,14 +256,14 @@ public class PerfConsumer extends PerfBa
 
     }
 
-    public void test()
+    public void run()
     {
         try
         {
             setUp();
             warmup();
             startTest();
-            printResults();
+            sendResults();
             tearDown();
         }
         catch(Exception e)
@@ -284,7 +279,7 @@ public class PerfConsumer extends PerfBa
         {
             public void run()
             {
-                cons.test();
+                cons.run();
             }
         };
 

Modified: 
qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java?rev=1148935&r1=1148934&r2=1148935&view=diff
==============================================================================
--- qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java (original)
+++ qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java Wed Jul 20 20:46:49 2011
@@ -26,8 +26,8 @@ import java.util.Random;
 
 import javax.jms.BytesMessage;
 import javax.jms.DeliveryMode;
+import javax.jms.MapMessage;
 import javax.jms.Message;
-import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 
 import org.apache.qpid.thread.Threading;
@@ -67,17 +67,17 @@ public class PerfProducer extends PerfBa
     int msgSizeRange = 1024;
     boolean rateLimitProducer = false;
     double rateFactor = 0.4;
+    double rate = 0.0;
 
     public PerfProducer()
     {
         super();
+        System.out.println("Producer ID : " + id);
     }
 
     public void setUp() throws Exception
     {
         super.setUp();
-        feedbackDest = session.createTemporaryQueue();
-
         durable = params.isDurable();
         rateLimitProducer = params.getRate() > 0 ? true : false;
         if (rateLimitProducer)
@@ -116,6 +116,11 @@ public class PerfProducer extends PerfBa
         producer = session.createProducer(dest);
         producer.setDisableMessageID(params.isDisableMessageID());
         producer.setDisableMessageTimestamp(params.isDisableTimestamp());
+
+        MapMessage m = controllerSession.createMapMessage();
+        m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal());
+        m.setString(REPLY_ADDR,myControlQueueAddr);
+        sendMessageToController(m);
     }
 
     Object createPayload(int size)
@@ -144,7 +149,6 @@ public class PerfProducer extends PerfBa
         }
     }
 
-
     protected Message getNextMessage() throws Exception
     {
         if (cacheMsg)
@@ -173,48 +177,37 @@ public class PerfProducer extends PerfBa
 
     public void warmup()throws Exception
     {
-        System.out.println("Warming up......");
-        MessageConsumer tmp = session.createConsumer(feedbackDest);
+        receiveFromController(OPCode.PRODUCER_STARTWARMUP);
+        System.out.println("Producer Warming up......");
 
         for (int i=0; i < params.getWarmupCount() -1; i++)
         {
             producer.send(getNextMessage());
         }
-        Message msg = session.createMessage();
-        msg.setBooleanProperty("End", true);
-        msg.setJMSReplyTo(feedbackDest);
-        producer.send(msg);
+        sendEndMessage();
 
         if (params.isTransacted())
         {
             session.commit();
         }
-
-        tmp.receive();
-
-        if (params.isTransacted())
-        {
-            session.commit();
-        }
-
-        tmp.close();
     }
 
     public void startTest() throws Exception
     {
-        System.out.println("Starting test......");
+        receiveFromController(OPCode.PRODUCER_START);
         int count = params.getMsgCount();
         boolean transacted = params.isTransacted();
         int tranSize =  params.getTransactionSize();
 
-        long limit = (long)(params.getRate() * rateFactor);
-        long timeLimit = (long)(SEC * rateFactor);
+        long limit = (long)(params.getRate() * rateFactor); // in msecs
+        long timeLimit = (long)(SEC * rateFactor); // in msecs
 
-        long start = System.currentTimeMillis();
+        long start = Clock.getTime(); // defaults to nano secs
         long interval = start;
         for(int i=0; i < count; i++ )
         {
             Message msg = getNextMessage();
+            msg.setLongProperty(TIMESTAMP, Clock.getTime());
             producer.send(msg);
             if ( transacted && ((i+1) % tranSize == 0))
             {
@@ -223,62 +216,53 @@ public class PerfProducer extends PerfBa
 
             if (rateLimitProducer && i%limit == 0)
             {
-                long elapsed = System.currentTimeMillis() - interval;
+                long elapsed = (Clock.getTime() - interval)*Clock.convertToMiliSecs(); // in msecs
                 if (elapsed < timeLimit)
                 {
                     Thread.sleep(elapsed);
                 }
-                interval = System.currentTimeMillis();
+                interval = Clock.getTime();
 
             }
         }
-        long time = System.currentTimeMillis() - start;
-        double rate = ((double)count/(double)time)*1000;
+        sendEndMessage();
+        if ( transacted)
+        {
+            session.commit();
+        }
+        long time = Clock.getTime() - start;
+        rate = (double)count*Clock.convertToSecs()/(double)time;
         System.out.println(new StringBuilder("Producer rate: ").
                                append(df.format(rate)).
                                append(" msg/sec").
                                toString());
+
+        System.out.println("Producer has completed the test......");
     }
 
-    public void waitForCompletion() throws Exception
+    public void sendEndMessage() throws Exception
     {
-        MessageConsumer tmp = session.createConsumer(feedbackDest);
         Message msg = session.createMessage();
         msg.setBooleanProperty("End", true);
-        msg.setJMSReplyTo(feedbackDest);
         producer.send(msg);
-
-        if (params.isTransacted())
-        {
-            session.commit();
-        }
-
-        tmp.receive();
-
-        if (params.isTransacted())
-        {
-            session.commit();
-        }
-
-        tmp.close();
-        System.out.println("Consumer has completed the test......");
     }
 
-    public void tearDown() throws Exception
+    public void sendResults() throws Exception
     {
-        producer.close();
-        session.close();
-        con.close();
+        MapMessage msg = controllerSession.createMapMessage();
+        msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal());
+        msg.setDouble(PROD_RATE, rate);
+        sendMessageToController(msg);
     }
 
-    public void test()
+    public void run()
     {
         try
         {
             setUp();
             warmup();
             startTest();
-            waitForCompletion();
+            sendResults();
             tearDown();
         }
         catch(Exception e)
@@ -287,15 +271,42 @@ public class PerfProducer extends PerfBa
         }
     }
 
+    public void startControllerIfNeeded()
+    {
+        if (!params.isExternalController())
+        {
+            final PerfTestController controller = new PerfTestController();
+            Runnable r = new Runnable()
+            {
+                public void run()
+                {
+                    controller.run();
+                }
+            };
+
+            Thread t;
+            try
+            {
+                t = Threading.getThreadFactory().createThread(r);
+            }
+            catch(Exception e)
+            {
+                throw new Error("Error creating controller thread",e);
+            }
+            t.start();
+        }
+    }
+
 
     public static void main(String[] args)
     {
         final PerfProducer prod = new PerfProducer();
+        prod.startControllerIfNeeded();
         Runnable r = new Runnable()
         {
             public void run()
             {
-                prod.test();
+                prod.run();
             }
         };
 

Added: 
qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java?rev=1148935&view=auto
==============================================================================
--- qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java (added)
+++ qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java Wed Jul 20 20:46:49 2011
@@ -0,0 +1,296 @@
+package org.apache.qpid.tools;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+
+import org.apache.qpid.client.message.AMQPEncodedMapMessage;
+
+/**
+ * Controller for the perf test tools.
+ *
+ * Listens on the controller queue for messages from producers and consumers,
+ * steps them through registration, warm-up and the test itself by sending
+ * op-codes to their reply addresses, and finally aggregates and prints the
+ * statistics they report back.
+ */
+public class PerfTestController extends PerfBase implements MessageListener
+{
+    // Elapsed time between sending PRODUCER_START and receiving all end messages,
+    // in the unit reported by Clock.getPrecision().
+    long totalTestTime;
+
+    private double avgSystemLatency = 0.0;
+    private double minSystemLatency = Double.MAX_VALUE;
+    private double maxSystemLatency = 0;
+    private double avgSystemLatencyStdDev = 0.0;
+
+    private double avgSystemConsRate = 0.0;
+    private double maxSystemConsRate = 0.0;
+    private double minSystemConsRate = Double.MAX_VALUE;
+
+    private double avgSystemProdRate = 0.0;
+    private double maxSystemProdRate = 0.0;
+    private double minSystemProdRate = Double.MAX_VALUE;
+
+    private long totalMsgCount = 0;
+    private double totalSystemThroughput = 0.0;
+
+    // Expected number of participants, read from the "cons_count" / "prod_count"
+    // system properties (default 1 each).
+    private int consumerCount = Integer.getInteger("cons_count", 1);
+    private int producerCount = Integer.getInteger("prod_count", 1);
+
+    // Latest message received from each participant, keyed by its ID. The
+    // registration message is later replaced by the stats message.
+    private Map<String,MapMessage> consumers;
+    private Map<String,MapMessage> producers;
+
+    // One latch per protocol phase; each is counted down from onMessage().
+    private CountDownLatch consRegistered;
+    private CountDownLatch prodRegistered;
+    private CountDownLatch consReady;
+    private CountDownLatch prodReady;
+    private CountDownLatch receivedEndMsg;
+    private CountDownLatch receivedConsStats;
+    private CountDownLatch receivedProdStats;
+
+    private MessageConsumer consumer;
+    private boolean printStdDev = false;
+
+    /**
+     * Creates the participant maps and sizes every phase latch from the
+     * expected consumer and producer counts.
+     */
+    public PerfTestController()
+    {
+        super();
+        consumers = new ConcurrentHashMap<String,MapMessage>(consumerCount);
+        producers = new ConcurrentHashMap<String,MapMessage>(producerCount);
+
+        consRegistered = new CountDownLatch(consumerCount);
+        prodRegistered = new CountDownLatch(producerCount);
+        consReady = new CountDownLatch(consumerCount);
+        prodReady = new CountDownLatch(producerCount);
+        receivedConsStats = new CountDownLatch(consumerCount);
+        receivedProdStats = new CountDownLatch(producerCount);
+        // Sized by the producer count, as in the original code.
+        receivedEndMsg = new CountDownLatch(producerCount);
+        printStdDev = params.isPrintStdDev();
+    }
+
+    /**
+     * Starts listening on the controller queue and blocks until the expected
+     * number of consumers and producers have registered.
+     *
+     * @throws Exception if the base set-up, consumer creation or the wait fails
+     */
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        consumer = controllerSession.createConsumer(controllerQueue);
+        consumer.setMessageListener(this);
+        consRegistered.await();
+        prodRegistered.await();
+        System.out.println("\nController: All producers and consumers have registered......\n");
+    }
+
+    /**
+     * Tells every consumer and producer to start warming up, then blocks until
+     * all of them have reported that they are ready.
+     *
+     * @throws Exception if sending the op-codes or the wait fails
+     */
+    public void warmup() throws Exception
+    {
+        System.out.println("Controller initiating warm up sequence......");
+        sendMessageToNodes(OPCode.CONSUMER_STARTWARMUP,consumers.values());
+        sendMessageToNodes(OPCode.PRODUCER_STARTWARMUP,producers.values());
+        prodReady.await();
+        consReady.await();
+        System.out.println("\nController : All producers and consumers are ready to start the test......\n");
+    }
+
+    /**
+     * Runs the timed part of the test: starts the producers, waits for the end
+     * messages, records the elapsed time, stops the consumers and waits for the
+     * producer and consumer statistics to arrive.
+     *
+     * @throws Exception if sending the op-codes or any wait fails
+     */
+    public void startTest() throws Exception
+    {
+        System.out.println("\nController Starting test......");
+        long start = Clock.getTime();
+        sendMessageToNodes(OPCode.PRODUCER_START,producers.values());
+        receivedEndMsg.await();
+        totalTestTime = Clock.getTime() - start;
+        sendMessageToNodes(OPCode.CONSUMER_STOP,consumers.values());
+        receivedProdStats.await();
+        receivedConsStats.await();
+    }
+
+    /**
+     * Aggregates the statistics reported by consumers and producers into the
+     * system-wide min/max/average fields, prints the total test time and
+     * computes the overall throughput.
+     *
+     * A failure while reading one participant's statistics is reported on
+     * standard output and aggregation continues with what was collected so far.
+     *
+     * @throws Exception declared for callers; stats read failures are reported, not thrown
+     */
+    public void calcStats() throws Exception
+    {
+        double totLatency = 0.0;
+        double totStdDev = 0.0;
+        double totalConsRate = 0.0;
+        double totalProdRate = 0.0;
+
+        MapMessage conStat = null;  // for error handling
+        try
+        {
+            for (MapMessage m: consumers.values())
+            {
+                conStat = m;
+                minSystemLatency = Math.min(minSystemLatency,m.getDouble(MIN_LATENCY));
+                maxSystemLatency = Math.max(maxSystemLatency,m.getDouble(MAX_LATENCY));
+                totLatency = totLatency + m.getDouble(AVG_LATENCY);
+                totStdDev = totStdDev + m.getDouble(STD_DEV);
+
+                minSystemConsRate = Math.min(minSystemConsRate,m.getDouble(CONS_RATE));
+                maxSystemConsRate = Math.max(maxSystemConsRate,m.getDouble(CONS_RATE));
+                totalConsRate = totalConsRate + m.getDouble(CONS_RATE);
+
+                totalMsgCount = totalMsgCount + m.getLong(MSG_COUNT);
+            }
+        }
+        catch(Exception e)
+        {
+            // Best effort: report the offending message and the cause, then carry on.
+            System.out.println("Error calculating stats from Consumer : " + conStat + " : " + e);
+        }
+
+
+        MapMessage prodStat = null;  // for error handling
+        try
+        {
+            for (MapMessage m: producers.values())
+            {
+                prodStat = m;
+                minSystemProdRate = Math.min(minSystemProdRate,m.getDouble(PROD_RATE));
+                maxSystemProdRate = Math.max(maxSystemProdRate,m.getDouble(PROD_RATE));
+                totalProdRate = totalProdRate + m.getDouble(PROD_RATE);
+            }
+        }
+        catch(Exception e)
+        {
+            // Report the producer message that failed (previously printed conStat by mistake).
+            System.out.println("Error calculating stats from Producer : " + prodStat + " : " + e);
+        }
+
+        avgSystemLatency = totLatency/consumers.size();
+        avgSystemLatencyStdDev = totStdDev/consumers.size();
+        avgSystemConsRate = totalConsRate/consumers.size();
+        avgSystemProdRate = totalProdRate/producers.size();
+
+        System.out.println("Total test time     : " + totalTestTime + " in " + Clock.getPrecision());
+
+        totalSystemThroughput = (totalMsgCount*Clock.convertToSecs()/totalTestTime);
+    }
+
+    /**
+     * Prints the aggregated results computed by {@link #calcStats()} to
+     * standard output. The average standard deviation is printed only when
+     * enabled in the test parameters.
+     *
+     * @throws Exception declared for callers; nothing here throws a checked exception
+     */
+    public void printResults() throws Exception
+    {
+        System.out.println("Total Msgs Received : " + totalMsgCount);
+        System.out.println("System Throughput   : " + df.format(totalSystemThroughput) + " msg/sec");
+        System.out.println("Avg Consumer rate   : " + df.format(avgSystemConsRate) + " msg/sec");
+        System.out.println("Min Consumer rate   : " + df.format(minSystemConsRate) + " msg/sec");
+        System.out.println("Max Consumer rate   : " + df.format(maxSystemConsRate) + " msg/sec");
+
+        System.out.println("Avg Producer rate   : " + df.format(avgSystemProdRate) + " msg/sec");
+        System.out.println("Min Producer rate   : " + df.format(minSystemProdRate) + " msg/sec");
+        System.out.println("Max Producer rate   : " + df.format(maxSystemProdRate) + " msg/sec");
+
+        System.out.println("Avg System Latency  : " + df.format(avgSystemLatency) + " ms");
+        System.out.println("Min System Latency  : " + df.format(minSystemLatency) + " ms");
+        System.out.println("Max System Latency  : " + df.format(maxSystemLatency) + " ms");
+        if (printStdDev)
+        {
+            System.out.println("Avg System Std Dev  : " + avgSystemLatencyStdDev);
+        }
+        System.out.println("Controller: Completed the test......\n");
+    }
+
+    /**
+     * Sends a map message carrying the given op-code to the reply address of
+     * every node in the collection.
+     *
+     * @param code  the op-code to send; its ordinal is put in the CODE entry
+     * @param nodes the last message received from each target node, used for its REPLY_ADDR
+     * @throws Exception if creating the producer, message or queue, or sending, fails
+     */
+    private synchronized void sendMessageToNodes(OPCode code,Collection<MapMessage> nodes) throws Exception
+    {
+        System.out.println("\nController: Sending code " + code);
+        MessageProducer tmpProd = controllerSession.createProducer(null);
+        try
+        {
+            MapMessage msg = controllerSession.createMapMessage();
+            msg.setInt(CODE, code.ordinal());
+            for (MapMessage node : nodes)
+            {
+                String replyAddr = node.getString(REPLY_ADDR);
+                if (replyAddr == null)
+                {
+                    System.out.println("REPLY_ADDR is null " + node);
+                }
+                else
+                {
+                    System.out.println("Controller: Sending " + code + " to " + replyAddr);
+                }
+                // As before, the send is attempted even when the address is null,
+                // so a broken registration surfaces as an error rather than a silent skip.
+                tmpProd.send(controllerSession.createQueue(replyAddr), msg);
+            }
+        }
+        finally
+        {
+            // A new producer is created on every call; release it so they do not pile up.
+            tmpProd.close();
+        }
+    }
+
+    /**
+     * Handles a message from a producer or consumer: records registration and
+     * stats messages by participant ID and counts down the latch for the
+     * corresponding phase. Any failure is passed to {@code handleError}.
+     *
+     * @param msg the received message; expected to be a map message with a CODE entry
+     */
+    public void onMessage(Message msg)
+    {
+        try
+        {
+            MapMessage m = (MapMessage)msg;
+            OPCode code = OPCode.values()[m.getInt(CODE)];
+
+            System.out.println("\n---------Controller Received Code : " + code);
+            System.out.println("---------Data : " + ((AMQPEncodedMapMessage)m).getMap());
+
+            switch (code)
+            {
+            case REGISTER_CONSUMER :
+                consumers.put(m.getString(ID),m);
+                consRegistered.countDown();
+                break;
+
+            case REGISTER_PRODUCER :
+                producers.put(m.getString(ID),m);
+                prodRegistered.countDown();
+                break;
+
+            case CONSUMER_READY :
+                consReady.countDown();
+                break;
+
+            case PRODUCER_READY :
+                prodReady.countDown();
+                break;
+
+            case RECEIVED_END_MSG :
+                receivedEndMsg.countDown();
+                break;
+
+            case RECEIVED_CONSUMER_STATS :
+                // Replaces the registration message stored under the same ID.
+                consumers.put(m.getString(ID),m);
+                receivedConsStats.countDown();
+                break;
+
+            case RECEIVED_PRODUCER_STATS :
+                // Replaces the registration message stored under the same ID.
+                producers.put(m.getString(ID),m);
+                receivedProdStats.countDown();
+                break;
+
+            default:
+                throw new Exception("Invalid OPCode " + code);
+            }
+        }
+        catch (Exception e)
+        {
+            handleError(e,"Error when receiving messages " + msg);
+        }
+    }
+
+    /**
+     * Runs the full controller sequence: set-up, warm-up, test, statistics,
+     * report and tear-down. Any exception is passed to {@code handleError}.
+     */
+    public void run()
+    {
+        try
+        {
+            setUp();
+            warmup();
+            startTest();
+            calcStats();
+            printResults();
+            tearDown();
+        }
+        catch(Exception e)
+        {
+            handleError(e,"Error when running test");
+        }
+    }
+
+    /**
+     * Command-line entry point: runs a stand-alone controller.
+     *
+     * @param args unused
+     */
+    public static void main(String[] args)
+    {
+        PerfTestController controller = new PerfTestController();
+        controller.run();
+    }
+}

Modified: 
qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java?rev=1148935&r1=1148934&r2=1148935&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java 
(original)
+++ 
qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java 
Wed Jul 20 20:46:49 2011
@@ -71,6 +71,8 @@ public class TestParams
 
     private long rate = -1;
 
+    private boolean externalController = false;
+
     public TestParams()
     {
 
@@ -94,6 +96,7 @@ public class TestParams
         msgType = System.getProperty("msg_type","bytes");
         printStdDev = Boolean.getBoolean("print_std_dev");
         rate = Long.getLong("rate",-1);
+        externalController = Boolean.getBoolean("ext_controller");
     }
 
     public String getUrl()
@@ -190,4 +193,14 @@ public class TestParams
     {
         return rate;
     }
+
+    /**
+     * Returns the value of the {@code externalController} flag.
+     *
+     * @return the current {@code externalController} value
+     */
+    public boolean isExternalController()
+    {
+        return externalController;
+    }
+
+    /**
+     * Assigns the given value to the {@code address} field.
+     *
+     * @param addr the new address value
+     */
+    public void setAddress(String addr)
+    {
+        address = addr;
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to