Author: orudyy
Date: Thu Oct 16 14:13:20 2014
New Revision: 1632318

URL: http://svn.apache.org/r1632318
Log:
QPID-6158: Add sample utility to help to perform stress testing of broker

The utility was originally implemented by Robbie Gemmell

Added:
    
qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java
    qpid/trunk/qpid/java/tools/src/main/resources/
    qpid/trunk/qpid/java/tools/src/main/resources/stress-test-client.properties
Modified:
    qpid/trunk/qpid/java/tools/pom.xml

Modified: qpid/trunk/qpid/java/tools/pom.xml
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/pom.xml?rev=1632318&r1=1632317&r2=1632318&view=diff
==============================================================================
--- qpid/trunk/qpid/java/tools/pom.xml (original)
+++ qpid/trunk/qpid/java/tools/pom.xml Thu Oct 16 14:13:20 2014
@@ -51,7 +51,6 @@
       <groupId>org.apache.geronimo.specs</groupId>
       <artifactId>geronimo-jms_1.1_spec</artifactId>
       <version>${geronimo-jms-1-1-version}</version>
-      <scope>provided</scope>
     </dependency>
   </dependencies>
    

Added: 
qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java?rev=1632318&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java
 (added)
+++ 
qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java
 Thu Oct 16 14:13:20 2014
@@ -0,0 +1,446 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+
+public class StressTestClient
+{
+    private static final String QUEUE_NAME_PREFIX = 
"BURL:direct://amq.direct//stress-test-queue";
+    private static final String DURABLE_SUFFIX = "?durable='true'";
+
+    public static final String CONNECTIONS_ARG = "connections";
+    public static final String SESSIONS_ARG = "sessions";
+    public static final String CONSUME_IMMEDIATELY_ARG = "consumeImmediately";
+    public static final String CONSUMERS_ARG = "consumers";
+    public static final String CLOSE_CONSUMERS_ARG = "closeconsumers";
+    public static final String PRODUCERS_ARG = "producers";
+    public static final String MESSAGE_COUNT_ARG = "messagecount";
+    public static final String MESSAGE_SIZE_ARG = "size";
+    public static final String SUFFIX_ARG = "suffix";
+    public static final String REPETITIONS_ARG = "repetitions";
+    public static final String PERSISTENT_ARG = "persistent";
+    public static final String RANDOM_ARG = "random";
+    public static final String TIMEOUT_ARG = "timeout";
+    public static final String DELAYCLOSE_ARG = "delayclose";
+    public static final String REPORT_MOD_ARG = "reportmod";
+    public static final String LOW_PREFETCH_ARG = "lowprefetch";
+    public static final String TRANSACTED_ARG = "transacted";
+    public static final String TX_BATCH_ARG = "txbatch";
+
+    public static final String CONNECTIONS_DEFAULT = "1";
+    public static final String SESSIONS_DEFAULT = "1";
+    public static final String CONSUME_IMMEDIATELY_DEFAULT = "true";
+    public static final String CLOSE_CONSUMERS_DEFAULT = "true";
+    public static final String PRODUCERS_DEFAULT = "1";
+    public static final String CONSUMERS_DEFAULT = "1";
+    public static final String MESSAGE_COUNT_DEFAULT = "1";
+    public static final String MESSAGE_SIZE_DEFAULT = "256";
+    public static final String SUFFIX_DEFAULT = "";
+    public static final String REPETITIONS_DEFAULT = "1";
+    public static final String PERSISTENT_DEFAULT = "false";
+    public static final String RANDOM_DEFAULT = "true";
+    public static final String TIMEOUT_DEFAULT = "30000";
+    public static final String DELAYCLOSE_DEFAULT = "0";
+    public static final String REPORT_MOD_DEFAULT = "1";
+    public static final String LOW_PREFETCH_DEFAULT = "false";
+    public static final String TRANSACTED_DEFAULT = "false";
+    public static final String TX_BATCH_DEFAULT = "1";
+
+    private static final String CLASS = "StressTestClient";
+
+    public static void main(String[] args)
+    {
+        Map<String,String> options = new HashMap<>();
+        options.put(CONNECTIONS_ARG, CONNECTIONS_DEFAULT);
+        options.put(SESSIONS_ARG, SESSIONS_DEFAULT);
+        options.put(CONSUME_IMMEDIATELY_ARG, CONSUME_IMMEDIATELY_DEFAULT);
+        options.put(PRODUCERS_ARG, PRODUCERS_DEFAULT);
+        options.put(CONSUMERS_ARG, CONSUMERS_DEFAULT);
+        options.put(CLOSE_CONSUMERS_ARG, CLOSE_CONSUMERS_DEFAULT);
+        options.put(MESSAGE_COUNT_ARG, MESSAGE_COUNT_DEFAULT);
+        options.put(MESSAGE_SIZE_ARG, MESSAGE_SIZE_DEFAULT);
+        options.put(SUFFIX_ARG, SUFFIX_DEFAULT);
+        options.put(REPETITIONS_ARG, REPETITIONS_DEFAULT);
+        options.put(PERSISTENT_ARG, PERSISTENT_DEFAULT);
+        options.put(RANDOM_ARG, RANDOM_DEFAULT);
+        options.put(TIMEOUT_ARG, TIMEOUT_DEFAULT);
+        options.put(DELAYCLOSE_ARG, DELAYCLOSE_DEFAULT);
+        options.put(REPORT_MOD_ARG, REPORT_MOD_DEFAULT);
+        options.put(LOW_PREFETCH_ARG, LOW_PREFETCH_DEFAULT);
+        options.put(TRANSACTED_ARG, TRANSACTED_DEFAULT);
+        options.put(TX_BATCH_ARG, TX_BATCH_DEFAULT);
+
+        if(args.length == 1 &&
+                (args[0].equals("-h") || args[0].equals("--help") || 
args[0].equals("help")))
+        {
+            System.out.println("arg=value options: \n" + options.keySet());
+            return;
+        }
+
+        parseArgumentsIntoConfig(options, args);
+
+        StressTestClient testClient = new StressTestClient();
+        testClient.runTest(options);
+    }
+
+    public static void parseArgumentsIntoConfig(Map<String, String> 
initialValues, String[] args)
+    {
+        for(String arg: args)
+        {
+            String[] splitArg = arg.split("=");
+            if(splitArg.length != 2)
+            {
+                throw new IllegalArgumentException("arguments must have format 
<name>=<value>: " + arg);
+            }
+
+            if(initialValues.put(splitArg[0], splitArg[1]) == null)
+            {
+                throw new IllegalArgumentException("not a valid configuration 
property: " + arg);
+            }
+        }
+    }
+
+
+    private void runTest(Map<String,String> options)
+    {
+        int numConnections = Integer.parseInt(options.get(CONNECTIONS_ARG));
+        int numSessions = Integer.parseInt(options.get(SESSIONS_ARG));
+        int numProducers = Integer.parseInt(options.get(PRODUCERS_ARG));
+        int numConsumers = Integer.parseInt(options.get(CONSUMERS_ARG));
+        boolean closeConsumers = 
Boolean.valueOf(options.get(CLOSE_CONSUMERS_ARG));
+        boolean consumeImmediately = 
Boolean.valueOf(options.get(CONSUME_IMMEDIATELY_ARG));
+        int numMessage = Integer.parseInt(options.get(MESSAGE_COUNT_ARG));
+        int messageSize = Integer.parseInt(options.get(MESSAGE_SIZE_ARG));
+        int repetitions = Integer.parseInt(options.get(REPETITIONS_ARG));
+        String queueString = QUEUE_NAME_PREFIX + options.get(SUFFIX_ARG) + 
DURABLE_SUFFIX;
+        int deliveryMode = Boolean.valueOf(options.get(PERSISTENT_ARG)) ? 
DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+        boolean random = Boolean.valueOf(options.get(RANDOM_ARG));
+        long recieveTimeout = Long.parseLong(options.get(TIMEOUT_ARG));
+        long delayClose = Long.parseLong(options.get(DELAYCLOSE_ARG));
+        int reportingMod = Integer.parseInt(options.get(REPORT_MOD_ARG));
+        boolean lowPrefetch = Boolean.valueOf(options.get(LOW_PREFETCH_ARG));
+        boolean transacted = Boolean.valueOf(options.get(TRANSACTED_ARG));
+        int txBatch = Integer.parseInt(options.get(TX_BATCH_ARG));
+
+        System.out.println(CLASS + ": Using options: " + options);
+
+        System.out.println(CLASS + ": Creating message payload of " + 
messageSize + " (bytes)");
+        byte[] sentBytes = generateMessage(random, messageSize);
+
+        try
+        {
+            // Load JNDI properties
+            Properties properties = new Properties();
+            try(InputStream is = 
this.getClass().getClassLoader().getResourceAsStream("stress-test-client.properties"))
+            {
+                properties.load(is);
+            }
+            Context ctx = new InitialContext(properties);
+
+            ConnectionFactory conFac;
+            if(lowPrefetch)
+            {
+                System.out.println(CLASS + ": Using lowprefetch connection 
factory");
+                conFac = 
(ConnectionFactory)ctx.lookup("qpidConnectionfactoryLowPrefetch");
+            }
+            else
+            {
+                conFac = 
(ConnectionFactory)ctx.lookup("qpidConnectionfactory");
+            }
+
+            //ensure the queue to be used exists and is bound
+            System.out.println(CLASS + ": Creating queue: " + queueString);
+            Connection startupConn = conFac.createConnection();
+            Session startupSess = startupConn.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Destination startupDestination = 
startupSess.createQueue(queueString);
+            MessageConsumer startupConsumer = 
startupSess.createConsumer(startupDestination);
+            startupConsumer.close();
+            startupSess.close();
+            startupConn.close();
+
+            for(int rep = 1 ; rep <= repetitions; rep++)
+            {
+                ArrayList<Connection> connectionList = new ArrayList<>();
+
+                for (int co= 1; co<= numConnections ; co++)
+                {
+                    if( co % reportingMod == 0)
+                    {
+                        System.out.println(CLASS + ": Creating connection " + 
co);
+                    }
+                    Connection conn = conFac.createConnection();
+                    conn.setExceptionListener(new ExceptionListener()
+                    {
+                        public void onException(JMSException jmse)
+                        {
+                            System.err.println(CLASS + ": The sample received 
an exception through the ExceptionListener");
+                            jmse.printStackTrace();
+                            System.exit(0);
+                        }
+                    });
+
+                    connectionList.add(conn);
+                    conn.start();
+                    for (int se= 1; se<= numSessions ; se++)
+                    {
+                        if( se % reportingMod == 0)
+                        {
+                            System.out.println(CLASS + ": Creating Session " + 
se);
+                        }
+                        try
+                        {
+                            Session sess;
+                            if(transacted)
+                            {
+                                sess = conn.createSession(true, 
Session.SESSION_TRANSACTED);
+                            }
+                            else
+                            {
+                                sess = conn.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                            }
+
+                            BytesMessage message = sess.createBytesMessage();
+
+                            message.writeBytes(sentBytes);
+
+                            if(!random && numMessage == 1 && numSessions == 1 
&& numConnections == 1 && repetitions == 1)
+                            {
+                                //null the array to save memory
+                                sentBytes = null;
+                            }
+
+                            Destination destination = 
sess.createQueue(queueString);
+
+                            MessageConsumer consumer = null;
+                            for(int cns = 1 ; cns <= numConsumers ; cns++)
+                            {
+                                if( cns % reportingMod == 0)
+                                {
+                                    System.out.println(CLASS + ": Creating 
Consumer " + cns);
+                                }
+                                consumer = sess.createConsumer(destination);
+                            }
+
+                            for(int pr = 1 ; pr <= numProducers ; pr++)
+                            {
+                                if( pr % reportingMod == 0)
+                                {
+                                    System.out.println(CLASS + ": Creating 
Producer " + pr);
+                                }
+                                MessageProducer prod = 
sess.createProducer(destination);
+                                for(int me = 1; me <= numMessage ; me++)
+                                {
+                                    if( me % reportingMod == 0)
+                                    {
+                                        System.out.println(CLASS + ": Sending 
Message " + me);
+                                    }
+                                    prod.send(message, deliveryMode,
+                                            Message.DEFAULT_PRIORITY,
+                                            Message.DEFAULT_TIME_TO_LIVE);
+                                    if(transacted && me % txBatch == 0)
+                                    {
+                                        sess.commit();
+                                    }
+                                }
+                            }
+
+                            if(numConsumers > 0 && consumeImmediately)
+                            {
+                                for(int cs = 1 ; cs <= numMessage ; cs++)
+                                {
+                                    if( cs % reportingMod == 0)
+                                    {
+                                        System.out.println(CLASS + ": 
Consuming Message " + cs);
+                                    }
+                                    BytesMessage msg = (BytesMessage) 
consumer.receive(recieveTimeout);
+
+                                    if(transacted && cs % txBatch == 0)
+                                    {
+                                        sess.commit();
+                                    }
+
+                                    if(msg == null)
+                                    {
+                                        throw new RuntimeException("Expected 
message not received in allowed time: " + recieveTimeout);
+                                    }
+
+                                    validateReceivedMessageContent(sentBytes, 
msg, random, messageSize);
+                                }
+
+                                if(closeConsumers)
+                                {
+                                    consumer.close();
+                                }
+                            }
+
+                        }
+                        catch (Exception exp)
+                        {
+                            System.err.println(CLASS + ": Caught an Exception: 
" + exp);
+                            exp.printStackTrace();
+                        }
+
+                    }
+                }
+
+                if(numConsumers == -1 && !consumeImmediately)
+                {
+                    System.out.println(CLASS + ": Consuming left over 
messages, using recieve timeout:" + recieveTimeout);
+
+                    Connection conn = conFac.createConnection();
+                    Session sess = conn.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                    Destination destination = sess.createQueue(queueString);
+                    MessageConsumer consumer = 
sess.createConsumer(destination);
+                    conn.start();
+
+                    int count = 0;
+                    while(true)
+                    {
+                        BytesMessage msg = (BytesMessage) 
consumer.receive(recieveTimeout);
+
+                        if(msg == null)
+                        {
+                            System.out.println(CLASS + ": Received " + count + 
" messages");
+                            break;
+                        }
+                        else
+                        {
+                            count++;
+                        }
+
+                        validateReceivedMessageContent(sentBytes, msg, random, 
messageSize);
+                    }
+
+                    consumer.close();
+                    sess.close();
+                    conn.close();
+                }
+
+                if(delayClose > 0)
+                {
+                    System.out.println(CLASS + ": Delaying closing 
connections: " + delayClose);
+                    Thread.sleep(delayClose);
+                }
+
+                // Close the connections to the server
+                System.out.println(CLASS + ": Closing connections");
+
+                for(int connection = 0 ; connection < connectionList.size() ; 
connection++)
+                {
+                    if( (connection+1) % reportingMod == 0)
+                    {
+                        System.out.println(CLASS + ": Closing connection " + 
(connection+1));
+                    }
+                    Connection c = connectionList.get(connection);
+                    c.close();
+                }
+
+                // Close the JNDI reference
+                System.out.println(CLASS + ": Closing JNDI context");
+                ctx.close();
+            }
+        }
+        catch (Exception exp)
+        {
+            System.err.println(CLASS + ": Caught an Exception: " + exp);
+            exp.printStackTrace();
+        }
+    }
+
+
+    private byte[] generateMessage(boolean random, int messageSize)
+    {
+        byte[] sentBytes = new byte[messageSize];
+        if(random)
+        {
+            //fill the array with numbers from 0-9
+            Random rand = new Random(System.currentTimeMillis());
+            for(int r = 0 ; r < messageSize ; r++)
+            {
+                sentBytes[r] = (byte) (48 + rand.nextInt(10));
+            }
+        }
+        else
+        {
+            //use sequential numbers from 0-9
+            for(int r = 0 ; r < messageSize ; r++)
+            {
+                sentBytes[r] = (byte) (48 + (r % 10));
+            }
+        }
+        return sentBytes;
+    }
+
+
+    private void validateReceivedMessageContent(byte[] sentBytes,
+            BytesMessage msg, boolean random, int messageSize) throws 
JMSException
+    {
+        Long length = msg.getBodyLength();
+
+        if(length != messageSize)
+        {
+            throw new RuntimeException("Incorrect number of bytes received");
+        }
+
+        byte[] recievedBytes = new byte[length.intValue()];
+        msg.readBytes(recievedBytes);
+
+        if(random)
+        {
+            if(!Arrays.equals(sentBytes, recievedBytes))
+            {
+                throw new RuntimeException("Incorrect value of bytes 
received");
+            }
+        }
+        else
+        {
+            for(int r = 0 ; r < messageSize ; r++)
+            {
+                if(! (recievedBytes[r] == (byte) (48 + (r % 10))))
+                {
+                    throw new RuntimeException("Incorrect value of bytes 
received");
+                }
+            }
+        }
+    }
+}
+

Added: 
qpid/trunk/qpid/java/tools/src/main/resources/stress-test-client.properties
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/src/main/resources/stress-test-client.properties?rev=1632318&view=auto
==============================================================================
--- qpid/trunk/qpid/java/tools/src/main/resources/stress-test-client.properties 
(added)
+++ qpid/trunk/qpid/java/tools/src/main/resources/stress-test-client.properties 
Thu Oct 16 14:13:20 2014
@@ -0,0 +1,3 @@
+java.naming.factory.initial = 
org.apache.qpid.jndi.PropertiesFileInitialContextFactory
+connectionfactory.qpidConnectionfactory = 
amqp://guest:guest@clientid/?brokerlist='tcp://localhost:5672'
+connectionfactory.qpidConnectionfactoryLowPrefetch=amqp://guest:guest@clientid/?brokerlist='tcp://localhost:5672?maxprefetch='10''



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to