Author: grkvlt
Date: Mon Dec 13 00:41:01 2010
New Revision: 1044979

URL: http://svn.apache.org/viewvc?rev=1044979&view=rev
Log:
QPID-2970: Make performace tests multi-threaded

Added:
    qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/Statistics.sh   (with 
props)
    qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/Test.sh   (with props)
Modified:
    qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/Framework.sh
    qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/config.properties
    
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java
    
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java
    
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceFramework.java
    
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceStatistics.java
    
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java

Modified: qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/Framework.sh
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/Framework.sh?rev=1044979&r1=1044978&r2=1044979&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/Framework.sh (original)
+++ qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/Framework.sh Mon Dec 13 
00:41:01 2010
@@ -18,8 +18,16 @@
 # under the License.
 #
 
+# Set Qpid Version
+VERSION=0.5
+
 # Setup Java CLASSPATH
-CLASSPATH=${QPID_HOME}/lib/qpid-all-${VERSION}.jar:${QPID_HOME}/lib/qpid-perftests-${VERSION}.jar
+CLASSPATH=${QPID_HOME}/lib/qpid-all.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/qpid-perftests-${VERSION}.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/slf4j-api-1.4.0.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/slf4j-log4j12-1.4.0.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/log4j-1.2.12.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/geronimo-jms_1.1_spec-1.0.jar
 
 # Run Performance Test Framework
 echo "Running DLQ Performance Tests"

Added: qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/Statistics.sh
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/Statistics.sh?rev=1044979&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/Statistics.sh (added)
+++ qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/Statistics.sh Mon Dec 
13 00:41:01 2010
@@ -0,0 +1,34 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set Qpid Version
+VERSION=0.5
+
+# Setup Java CLASSPATH
+CLASSPATH=${QPID_HOME}/lib/qpid-all.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/qpid-perftests-${VERSION}.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/slf4j-api-1.4.0.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/slf4j-log4j12-1.4.0.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/log4j-1.2.12.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/geronimo-jms_1.1_spec-1.0.jar
+
+# Run Performance Test Framework
+echo "Running DLQ Performance Tests"
+java -cp ${CLASSPATH} org.apache.qpid.perftests.dlq.test.PerformanceStatistics 
$*

Propchange: qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/Statistics.sh
------------------------------------------------------------------------------
    svn:executable = *

Propchange: qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/Statistics.sh
------------------------------------------------------------------------------
    svn:keywords = Id

Added: qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/Test.sh
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/Test.sh?rev=1044979&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/Test.sh (added)
+++ qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/Test.sh Mon Dec 13 
00:41:01 2010
@@ -0,0 +1,34 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set Qpid Version
+VERSION=0.5
+
+# Setup Java CLASSPATH
+CLASSPATH=${QPID_HOME}/lib/qpid-all.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/qpid-perftests-${VERSION}.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/slf4j-api-1.4.0.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/slf4j-log4j12-1.4.0.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/log4j-1.2.12.jar
+CLASSPATH=${CLASSPATH}:${QPID_HOME}/lib/geronimo-jms_1.1_spec-1.0.jar
+
+# Run Performance Test Framework
+echo "Running DLQ Performance Tests"
+java -cp ${CLASSPATH} org.apache.qpid.perftests.dlq.test.PerformanceTest $*

Propchange: qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/Test.sh
------------------------------------------------------------------------------
    svn:executable = *

Propchange: qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/Test.sh
------------------------------------------------------------------------------
    svn:keywords = Id

Modified: qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/config.properties
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/config.properties?rev=1044979&r1=1044978&r2=1044979&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/config.properties 
(original)
+++ qpid/branches/0.5.x-dev/qpid/java/perftests/etc/dlq/config.properties Mon 
Dec 13 00:41:01 2010
@@ -4,22 +4,21 @@
 repeat = 10
 
 # shared properties
-broker = tcp://magenta:5672
-#broker = tcp://localhost:5672
+broker = tcp://localhost:5672
 maxRedelivery = 3
 maxPrefetch = 1
 session = SESSION_TRANSACTED
 queue = test
 count = 1000
 persistent = true
-maxRecords = 2000
+maxRecords = 10000
 
 # producer properties
 size = 4096
 messageIds = true
 
 ## consumer properties
-threads = 1
+threads = 5
 listener = false
 reject = 2
-rejectCount = 3
\ No newline at end of file
+rejectCount = 3

Modified: 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java?rev=1044979&r1=1044978&r2=1044979&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Client.java
 Mon Dec 13 00:41:01 2010
@@ -16,6 +16,14 @@ import org.apache.qpid.client.configurat
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Parent abstract class for all performance test clients that connect to a
+ * broker and perform test operations. All clients are {...@link Callable}
+ * objects that return an integer value, or throw an exception. The
+ * {...@link #connect()} method returns a boolean to indicate whether the
+ * broker connection succeeded, and can be used to abort tests if there is
+ * no available broker.
+ */
 public abstract class Client implements Callable<Integer>
 {
     protected static final Logger _log = LoggerFactory.getLogger(Client.class);
@@ -102,7 +110,7 @@ public abstract class Client implements 
         throw new RuntimeException("session property not recognised: " + 
sessionType);
     }
     
-    public void connect()
+    public boolean connect()
     {
         String url = "amqp://guest:guest@" + _client + "/test?brokerlist='" + 
_broker + "'&maxprefetch='" + _maxPrefetch + "'&maxdeliverycount='" + 
_maxRedelivery + "'";
         System.setProperty(ClientProperties.MAX_DELIVERY_RECORDS_PROP_NAME, 
Integer.toString(_maxRecords));
@@ -120,11 +128,12 @@ public abstract class Client implements 
                     System.exit(0);
                 }
             });
+            return true;
         }
         catch (Exception e)
         {
             _log.error("Unable to setup connection, client and producer on 
broker", e);
-            throw new RuntimeException(e);
+            return false;
         }
     }
     

Modified: 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java?rev=1044979&r1=1044978&r2=1044979&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/client/Receiver.java
 Mon Dec 13 00:41:01 2010
@@ -11,11 +11,10 @@ import java.util.concurrent.atomic.Atomi
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
+import javax.jms.Session;
 
 import org.apache.qpid.perftests.dlq.test.PerformanceTest;
 
-
-
 public class Receiver extends Client
 {
     private MessageConsumer _consumer;
@@ -23,11 +22,16 @@ public class Receiver extends Client
     private int _reject;
     private int _rejectCount;
     private Map<Integer, Integer> _rejected = new HashMap<Integer, Integer>();
+    private int _receivedCount = 0;
     
     private static volatile boolean _stopped;
     private static CountDownLatch _finished;
     private static AtomicInteger _id; 
-    private static AtomicInteger _received;
+    private static AtomicInteger _totalReceivedCount;
+    private static AtomicInteger _totalConsumedCount;
+    private static AtomicInteger _rejectedCount;
+    private static int _consumedCheck;
+    private static int _rejectedCheck;
      
     public Receiver(Properties props)
     {
@@ -39,17 +43,25 @@ public class Receiver extends Client
     public static void reset()
     {
         _id = new AtomicInteger(0);
-        _received = new AtomicInteger(0);
+        _totalReceivedCount = new AtomicInteger(0);
+        _totalConsumedCount = new AtomicInteger(0);
+        _rejectedCount = new AtomicInteger(0);
         _finished = new CountDownLatch(1);
         _stopped = false;
     }
+    
 
-    public void start() throws Exception
+    public synchronized void start() throws Exception
     {
         _listener = Boolean.parseBoolean(_props.getProperty(LISTENER));
         _reject = Integer.parseInt(_props.getProperty(REJECT));
         _rejectCount = Integer.parseInt(_props.getProperty(REJECT_COUNT));
-        
+
+        boolean sessionOk = (_transacted || _clientAck) ||
+                ((_sessionType == Session.AUTO_ACKNOWLEDGE || _sessionType == 
Session.DUPS_OK_ACKNOWLEDGE) && _listener);
+        _rejectedCheck = (!sessionOk || _messageIds || _maxRedelivery == 0 || 
_rejectCount < _maxRedelivery) ? 0 : _count / _reject;
+        _consumedCheck = (_count - _rejectedCheck); // + (sessionOk ? ((_count 
/ _reject) * _rejectCount) : 0);
+            
         _consumer = _session.createConsumer(_queue);
         
         _connection.start();
@@ -71,7 +83,10 @@ public class Receiver extends Client
         while (!_stopped)
         {
                Message msg = _consumer.receive(1000);
-            processMessage(msg);
+            if (msg != null)
+            {
+                processMessage(msg);
+            }
         }
     }
     
@@ -79,7 +94,7 @@ public class Receiver extends Client
     {
         try
         {
-               _received.incrementAndGet();
+            _totalReceivedCount.incrementAndGet();
                int number = msg.getIntProperty("number");
                if (number % 100 == 0)
                {
@@ -90,17 +105,24 @@ public class Receiver extends Client
                if (rejectMessage)
                {
                    int rejectCount = 0;
-                   if (_rejected.containsKey(number))
+                   if (!_rejected.containsKey(number))
                    {
-                       rejectCount = _rejected.get(number);
+                           _rejected.put(number, 0);
                    }
-                   _rejected.put(number, ++rejectCount);
+                rejectCount = _rejected.get(number) + 1;
+                   _rejected.put(number, rejectCount);
                    if (rejectCount <= _rejectCount)
                    {
-                           if (rejectCount >= _maxRedelivery)
+                           if (rejectCount == _maxRedelivery)
                            {
-                               _log.info("rejecting message (" + rejectCount + 
") " + msg.getJMSMessageID());
+                               _rejectedCount.incrementAndGet();
+                               _log.info("client " + _client + " rejecting 
message (" + rejectCount + ") " + msg.getJMSMessageID());
                            }
+                    if (rejectCount > _maxRedelivery)
+                    {
+                        throw new RuntimeException("client " + _client + " 
received message " + msg.getJMSMessageID() +
+                                " " + rejectCount + " times");
+                    }
                            if (_transacted)
                            {
                                _session.rollback();
@@ -118,6 +140,8 @@ public class Receiver extends Client
                
                if (!rejectMessage)
                {
+                   _receivedCount++;
+                       _totalConsumedCount.incrementAndGet();
                    if (_transacted)
                    {
                            _session.commit();
@@ -126,12 +150,14 @@ public class Receiver extends Client
                    {
                        msg.acknowledge();
                    }
-                       if (number == (_count - 1))
-                       {
-                           _stopped = true;
-                           _finished.countDown();
-                       }
                }
+
+            if (_totalConsumedCount.get() >= _consumedCheck && 
_rejectedCount.get() >= _rejectedCheck)
+            {
+                _log.info("stopping receivers after " + 
_totalConsumedCount.get() + " received and " + _rejectedCount.get() + " 
rejected");
+                _stopped = true;
+                _finished.countDown();
+            }
         }
         catch (Exception e)
         {
@@ -156,6 +182,21 @@ public class Receiver extends Client
         
         _finished.await();
         PerformanceTest.countDown();
-        return _received.get();
+        return _receivedCount;
+    }
+    
+    public static int getTotalReceivedCount()
+    {
+        return _totalReceivedCount.get();
+    }
+    
+    public static int getConsumedCheck()
+    {
+        return _consumedCheck;
+    }
+    
+    public static int getRejectedCheck()
+    {
+        return _rejectedCheck;
     }
 }
\ No newline at end of file

Modified: 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceFramework.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceFramework.java?rev=1044979&r1=1044978&r2=1044979&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceFramework.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceFramework.java
 Mon Dec 13 00:41:01 2010
@@ -8,7 +8,6 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintStream;
-import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.Properties;
@@ -16,6 +15,10 @@ import java.util.Properties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Run a series of different performance tests, based on a set of variations of
+ * configuration properties, and collect the results and generated statistics.
+ */
 public class PerformanceFramework
 {
     private static final Logger _log = 
LoggerFactory.getLogger(PerformanceFramework.class);
@@ -69,7 +72,10 @@ public class PerformanceFramework
                                            out.println(_id + "," + 
maxRedelivery + "," + rejectCount + "," +
                                                        
Boolean.toString(messageIds == 0) + "," + Boolean.toString(listener == 0) +
                                                        "," + 
SESSION_VALUES[session]);
-                                               runOnce(_id);
+                                               if (!runOnce(_id))
+                                               {
+                                                   return;
+                                               }
                                }
                            }
                    }
@@ -77,21 +83,29 @@ public class PerformanceFramework
         }
     }
     
-    public void runOnce(int id)
+    public boolean runOnce(int id)
     {
         PerformanceStatistics stats = new PerformanceStatistics(_props);
         try
         {
             _log.info("starting test id " + id);
             String fileId = String.format("%04d", id);
-               stats.series(new File(_dir, fileId + "-series.csv"));
-            _log.info("test id " + id + " completed ok");
-               stats.statistics(new File(_dir, fileId + "-statistics.csv"));
+               if (stats.series(new File(_dir, fileId + "-series.csv")))
+               {
+                   _log.info("test id " + id + " completed ok");
+                       stats.statistics(new File(_dir, fileId + 
"-statistics.csv"));
+               }
+               else
+               {
+                   _log.error("connection failure, test series aborted");
+                   return false;
+               }
         }
         catch (Exception e)
         {
             _log.error("failed test id " + id + " with error", e);
         }
+        return true;
     }
 
     public static void main(String[] argv) throws Exception

Modified: 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceStatistics.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceStatistics.java?rev=1044979&r1=1044978&r2=1044979&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceStatistics.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceStatistics.java
 Mon Dec 13 00:41:01 2010
@@ -15,6 +15,10 @@ import java.util.Properties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Run a series of performance tests, based on specific configuration 
properties, and
+ * collect the results to generate statistics.
+ */
 public class PerformanceStatistics
 {
     private static final Logger _log = 
LoggerFactory.getLogger(PerformanceStatistics.class);
@@ -22,6 +26,7 @@ public class PerformanceStatistics
     private Properties _props;
     private List<Double> _sent = new ArrayList<Double>();
     private List<Double> _received = new ArrayList<Double>();
+    private List<Double> _consumed = new ArrayList<Double>();
     private List<Double> _rejected = new ArrayList<Double>();
     private List<Double> _duration = new ArrayList<Double>();
     private List<Double> _throughputIn = new ArrayList<Double>();
@@ -50,23 +55,31 @@ public class PerformanceStatistics
         _props = props;
     }
     
-    public void single(PrintStream out) throws Exception
+    public boolean single(PrintStream out) throws Exception
     {
-        PerformanceTest client = new PerformanceTest(_props);
-        client.test();
-        client.check(out);
-        _sent.add(client.getSent());
-        _received.add(client.getReceived());
-        _rejected.add(client.getRejected());
-        _duration.add(client.getDuration());
-        _throughputIn.add(client.getThroughputIn());
-        _throughputOut.add(client.getThroughputOut());
-        _bandwidthIn.add(client.getBandwidthIn());
-        _bandwidthOut.add(client.getBandwidthOut());
-        _latency.add(client.getLatency());
+        PerformanceTest test = new PerformanceTest(_props);
+        if (test.test())
+        {
+               test.check(out);
+               _sent.add(test.getSent());
+               _received.add(test.getTotalReceived());
+               _consumed.add(test.getConsumed());
+               _rejected.add(test.getRejected());
+               _duration.add(test.getDuration());
+               _throughputIn.add(test.getThroughputIn());
+               _throughputOut.add(test.getThroughputOut());
+               _bandwidthIn.add(test.getBandwidthIn());
+               _bandwidthOut.add(test.getBandwidthOut());
+               _latency.add(test.getLatency());
+            return true;
+        }
+        else
+        {
+            return false;
+        }
     }
     
-    public void series(File file) throws Exception
+    public boolean series(File file) throws Exception
     {
         try
         {
@@ -76,7 +89,10 @@ public class PerformanceStatistics
             for (int i = 0; i < repeat; i++)
             {
                 _log.info("starting individual test run " + i);
-                single(out);
+                if (!single(out))
+                {
+                    return false;
+                }
             }
         }
         catch (Exception e)
@@ -86,6 +102,7 @@ public class PerformanceStatistics
         
         _statistics.add(new Statistics(_sent, "sent"));
         _statistics.add(new Statistics(_received, "received"));
+        _statistics.add(new Statistics(_consumed, "consumed"));
         _statistics.add(new Statistics(_rejected, "rejected"));
         _statistics.add(new Statistics(_duration, "duration"));
         _statistics.add(new Statistics(_throughputIn, "throughputIn"));
@@ -93,6 +110,7 @@ public class PerformanceStatistics
         _statistics.add(new Statistics(_bandwidthIn, "bandwidthIn"));
         _statistics.add(new Statistics(_bandwidthOut, "bandwidthOut"));
         _statistics.add(new Statistics(_latency, "latency"));
+        return true;
     }
     
     public void statistics(File file)
@@ -126,7 +144,13 @@ public class PerformanceStatistics
         }
         
         PerformanceStatistics stats = new PerformanceStatistics(propertyFile);
-        stats.series(new File("series.csv"));
-        stats.statistics(new File("statistics.csv"));
+        if (stats.series(new File("series.csv")))
+        {
+               stats.statistics(new File("statistics.csv"));
+        }
+        else
+        {
+            System.err.println("connection faulre, test series aborted");
+        }
     }
 }

Modified: 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java?rev=1044979&r1=1044978&r2=1044979&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/perftests/src/main/java/org/apache/qpid/perftests/dlq/test/PerformanceTest.java
 Mon Dec 13 00:41:01 2010
@@ -16,13 +16,20 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.qpid.perftests.dlq.client.Check;
+import org.apache.qpid.perftests.dlq.client.Client;
 import org.apache.qpid.perftests.dlq.client.Create;
 import org.apache.qpid.perftests.dlq.client.Receiver;
 import org.apache.qpid.perftests.dlq.client.Sender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-
+/**
+ * Run a single performance test, based on specific configuration properties.
+ */
 public class PerformanceTest
 {
+    private static final Logger _log = 
LoggerFactory.getLogger(PerformanceFramework.class);
+    
     private static CountDownLatch _latch;
     
     private ExecutorService _executor;
@@ -30,7 +37,7 @@ public class PerformanceTest
     private int _size = 0;
     private int _threads = 0;
     private int _sent = 0;
-    private int _received = 0;
+    private int _consumed = 0;
     private int _rejected = 0;
     private long _started = 0;
     private long _finished = 0;
@@ -62,10 +69,14 @@ public class PerformanceTest
         _props = props;
     }
     
-    public void test() throws Exception
+    public boolean test() throws Exception
     {
-        Create create = new Create(_props);
-        create.connect();
+        Client create = new Create(_props);
+        if (!create.connect())
+        {
+            _log.error("initial connection failed");
+            return false;
+        }
         create.call();
         create.shutdown();
         
@@ -82,16 +93,16 @@ public class PerformanceTest
         _latch = new CountDownLatch(1);
         _started = System.nanoTime();
         
-        Sender sender = new Sender(_props);
+        Client sender = new Sender(_props);
         sender.connect();
         Future<Integer> send = _executor.submit(sender);
  
         Receiver.reset();
         List<Future<Integer>> receives = new ArrayList<Future<Integer>>();
-        List<Receiver> receivers = new ArrayList<Receiver>();
+        List<Client> receivers = new ArrayList<Client>();
         for (int i = 0; i < _threads; i++)
         {
-               Receiver receiver = new Receiver(_props);
+            Client receiver = new Receiver(_props);
                receiver.connect();
                receivers.add(receiver);
                receives.add(_executor.submit(receiver));
@@ -104,10 +115,10 @@ public class PerformanceTest
                _sent = send.get();
                for (Future<Integer> receive : receives)
                {
-                       _received += receive.get();
+                       _consumed += receive.get();
                }    
 
-               Check check = new Check(_props);
+               Client check = new Check(_props);
                check.connect();
                _rejected = check.call();
                check.shutdown();
@@ -119,12 +130,13 @@ public class PerformanceTest
         finally
         {
             sender.shutdown();
-            for (Receiver receiver : receivers)
+            for (Client receiver : receivers)
             {
                    receiver.shutdown();
             }
             _executor.shutdownNow();
         }
+        return true;
     }
     
     public void check(PrintStream out)
@@ -134,17 +146,13 @@ public class PerformanceTest
         {
             error.append("sent ").append(_sent).append(" not 
").append(_count).append('\n');
         }
-        boolean sessionOk = ((_session.equalsIgnoreCase(CLIENT_ACKNOWLEDGE)) 
|| (_session.equalsIgnoreCase(SESSION_TRANSACTED)) ||
-                ((_session.equalsIgnoreCase(AUTO_ACKNOWLEDGE) || 
_session.equalsIgnoreCase(DUPS_OK_ACKNOWLEDGE)) && _listener));
-        int rejected = (!sessionOk || !_messageIds || _maxRedelivery == 0 || 
_rejectCount < _maxRedelivery) ? 0 : _count / _reject;
-        if (_rejected != rejected)
+        if (_rejected != Receiver.getRejectedCheck())
         {
-            error.append("rejected ").append(_rejected).append(" not 
").append(rejected).append('\n');
+            error.append("rejected ").append(_rejected).append(" not 
").append(Receiver.getRejectedCheck()).append('\n');
         }
-        int received = (_count - rejected) + (sessionOk ? ((_count / _reject) 
* _rejectCount) : 0);
-        if (_received != received)
+        if (_consumed != Receiver.getConsumedCheck())
         {
-            error.append("received ").append(_received).append(" not 
").append(received).append('\n');
+            error.append("consumed ").append(_consumed).append(" not 
").append(Receiver.getConsumedCheck()).append('\n');
         }
         if (error.length() > 0)
         {
@@ -158,12 +166,12 @@ public class PerformanceTest
     
     public static String getHeader()
     {
-        return "sent,received,rejected,duration";
+        return "sent,received,consumed,rejected,duration";
     }
     
     public String toString()
     {
-        String results = String.format("%d,%d,%d,%f", _sent, _received, 
_rejected, getDuration());
+        String results = String.format("%d,%d,%d,%d,%f", _sent, 
Receiver.getTotalReceivedCount(), _consumed, _rejected, getDuration());
         return results;
     }
     
@@ -172,9 +180,14 @@ public class PerformanceTest
         return (double) _sent;
     }
     
-    public double getReceived()
+    public double getConsumed()
+    {
+        return (double) _consumed;
+    }
+    
+    public double getTotalReceived()
     {
-        return (double) _received;
+        return (double) Receiver.getTotalReceivedCount();
     }
     
     public double getDuration()
@@ -194,7 +207,7 @@ public class PerformanceTest
     
     public double getThroughputOut()
     {
-        return getReceived() / getDuration();
+        return getTotalReceived() / getDuration();
     }
     
     public double getBandwidthIn()
@@ -204,12 +217,12 @@ public class PerformanceTest
     
     public double getBandwidthOut()
     {
-        return (getReceived() * (double) _size) / getDuration();
+        return (getTotalReceived() * (double) _size) / getDuration();
     }
     
     public double getLatency()
     {
-        return getDuration() / getReceived();
+        return getDuration() / getTotalReceived();
     }
     
     public static void countDown()
@@ -230,8 +243,14 @@ public class PerformanceTest
             throw new RuntimeException("property file '" + 
propertyFile.getName() + "' must exist and be readable");
         }
         PerformanceTest client = new PerformanceTest(propertyFile);
-        client.test();
-        client.check(System.out);
+        if (client.test())
+        {
+               client.check(System.out);
+        }
+        else
+        {
+            System.err.println("test connection failure");
+        }
     }
 }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to