User: d_jencks
Date: 01/09/19 12:16:54
Modified: src/main/org/jboss/test/jbossmq/perf
JBossMQPerfStressTestCase.java
Log:
Changed stress tests to have counts set from build.xml. You may set values in a
local.properties file.
Revision Changes Path
1.2 +712 -527
jbosstest/src/main/org/jboss/test/jbossmq/perf/JBossMQPerfStressTestCase.java
Index: JBossMQPerfStressTestCase.java
===================================================================
RCS file:
/cvsroot/jboss/jbosstest/src/main/org/jboss/test/jbossmq/perf/JBossMQPerfStressTestCase.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- JBossMQPerfStressTestCase.java 2001/09/12 04:55:39 1.1
+++ JBossMQPerfStressTestCase.java 2001/09/19 19:16:54 1.2
@@ -1,546 +1,731 @@
/*
- * Copyright (c) 2000 Hiram Chirino <[EMAIL PROTECTED]>
+ * JBoss, the OpenSource J2EE webOS
*
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
*/
package org.jboss.test.jbossmq.perf;
+import java.util.*;
+import javax.jms.*;
import javax.naming.*;
-import javax.jms.*;
-import java.util.*;
+import org.apache.log4j.Category;
+
+import org.jboss.test.JBossTestCase;
/**
- * JBossMQPerfStressTestCase.java
- *
- * Some simple tests of JBossMQ
+ * JBossMQPerfStressTestCase.java Some simple tests of JBossMQ
*
* @author
* @version
*/
+
+public class JBossMQPerfStressTestCase extends JBossTestCase
+{
-public class JBossMQPerfStressTestCase extends junit.framework.TestCase {
-
- // Provider specific
- static String TOPIC_FACTORY = "ConnectionFactory";
- static String QUEUE_FACTORY = "ConnectionFactory";
-
- static String TEST_QUEUE = "queue/testQueue";
- static String TEST_TOPIC = "topic/testTopic";
-
- static int PERFORMANCE_TEST_ITERATIONS = 1000;
- static byte[] PERFORMANCE_TEST_DATA_PAYLOAD = new byte[10*1024];
-
- static int TRANS_NONE = 0;
- static int TRANS_INDIVIDUAL = 1;
- static int TRANS_TOTAL = 2;
- static String[] TRANS_DESC = {"NOT", "individually", "totally"};
-
- //JMSProviderAdapter providerAdapter;
- static Context context;
- static QueueConnection queueConnection;
- static TopicConnection topicConnection;
-
- public JBossMQPerfStressTestCase(String name) throws Exception{
- super(name);
- }
-
-
- // Emptys out all the messages in a queue
- private void drainQueue() throws Exception {
-
- QueueSession session = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
- Queue queue = (Queue)context.lookup(TEST_QUEUE);
-
- QueueReceiver receiver = session.createReceiver(queue);
- Message message = receiver.receive( 2000 );
- int c=0;
- while( message != null ) {
- message = receiver.receive( 2000 );
- c++;
- }
-
- if( c!=0 )
- System.out.println(" Drained "+c+" messages from the queue");
-
- session.close();
-
- }
-
- private void waitForSynchMessage() throws Exception {
- QueueSession session = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
- Queue queue = (Queue)context.lookup(TEST_QUEUE);
-
- QueueReceiver receiver = session.createReceiver(queue);
- receiver.receive();
- session.close();
- }
-
- private void sendSynchMessage() throws Exception {
- QueueSession session = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
- Queue queue = (Queue)context.lookup(TEST_QUEUE);
-
- QueueSender sender = session.createSender(queue);
-
- Message message = session.createMessage();
- sender.send(message);
-
- session.close();
- }
-
- static public void main ( String []args ) {
-
- String newArgs[] = {
"org.jboss.test.jbossmq.perf.JBossMQPerfStressTestCase" };
- junit.swingui.TestRunner.main(newArgs);
-
-
- }
-
- public void runAsynchQueuePerformance(final int transacted, final int
persistence) throws Exception {
-
- {
- queueConnection.start();
- drainQueue();
- queueConnection.stop();
- }
-
- Thread sendThread = new Thread() {
- public void run() {
- try {
- QueueSession session =
queueConnection.createQueueSession(transacted!=TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
- Queue queue =
(Queue)context.lookup(TEST_QUEUE);
-
- QueueSender sender =
session.createSender(queue);
-
- BytesMessage message =
session.createBytesMessage();
-
message.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD);
-
- long startTime = System.currentTimeMillis();
- for( int i=0; i < PERFORMANCE_TEST_ITERATIONS;
i++) {
- sender.send(queue, message,
persistence, 4, 0);
- //System.out.println(" Sent #"+i);
- if( transacted == TRANS_INDIVIDUAL )
- session.commit();
- }
-
- if( transacted == TRANS_TOTAL )
- session.commit();
-
- long endTime = System.currentTimeMillis();
-
- session.close();
-
- long pTime = endTime-startTime;
- System.out.println(" sent all messages in
"+((double)pTime/1000)+" seconds. ");
- } catch ( Exception e ) {
- e.printStackTrace();
- }
- }
- };
-
- QueueSession session =
queueConnection.createQueueSession(transacted!=TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
- Queue queue = (Queue)context.lookup(TEST_QUEUE);
- QueueReceiver receiver = session.createReceiver(queue);
-
- MessageListener listener = new MessageListener() {
- long startTime = System.currentTimeMillis();
- int i = 0;
- public void onMessage(Message message) {
- i++;
+ // Provider specific
+ static String TOPIC_FACTORY = "ConnectionFactory";
+ static String QUEUE_FACTORY = "ConnectionFactory";
+
+ static String TEST_QUEUE = "queue/testQueue";
+ static String TEST_TOPIC = "topic/testTopic";
+
+ // static int PERFORMANCE_TEST_ITERATIONS = 1000;
+ static byte[] PERFORMANCE_TEST_DATA_PAYLOAD = new byte[10 * 1024];
+
+ static int TRANS_NONE = 0;
+ static int TRANS_INDIVIDUAL = 1;
+ static int TRANS_TOTAL = 2;
+ static String[] TRANS_DESC = {"NOT", "individually", "totally"};
+
+ //JMSProviderAdapter providerAdapter;
+ static Context context;
+ static QueueConnection queueConnection;
+ static TopicConnection topicConnection;
+
+ /**
+ * Constructor for the JBossMQPerfStressTestCase object
+ *
+ * @param name Description of Parameter
+ * @exception Exception Description of Exception
+ */
+ public JBossMQPerfStressTestCase(String name) throws Exception
+ {
+ super(name);
+ }
+
+ /**
+ * The main entry-point for the JBossMQPerfStressTestCase class
+ *
+ * @param args The command line arguments
+ */
+ public static void main(String[] args)
+ {
+
+ String newArgs[] = {"org.jboss.test.jbossmq.perf.JBossMQPerfStressTestCase"};
+ junit.swingui.TestRunner.main(newArgs);
+
+ }
+
+ /**
+ * #Description of the Method
+ *
+ * @param transacted Description of Parameter
+ * @param persistence Description of Parameter
+ * @exception Exception Description of Exception
+ */
+ public void runAsynchQueuePerformance(final int transacted, final int
persistence) throws Exception
+ {
+ {
+ queueConnection.start();
+ drainQueue();
+ queueConnection.stop();
+ }
+ final int iterationCount = getIterationCount();
+ final Category log = getLog();
+
+ Thread sendThread =
+ new Thread()
+ {
+ /**
+ * Main processing method for the JBossMQPerfStressTestCase object
+ */
+ public void run()
+ {
+ try
+ {
+ QueueSession session =
queueConnection.createQueueSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+ QueueSender sender = session.createSender(queue);
+
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD);
+
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; i < iterationCount; i++)
+ {
+ sender.send(queue, message, persistence, 4, 0);
+ //getLog().debug(" Sent #"+i);
+ if (transacted == TRANS_INDIVIDUAL)
+ {
+ session.commit();
+ }
+ }
+
+ if (transacted == TRANS_TOTAL)
+ {
+ session.commit();
+ }
+
+ long endTime = System.currentTimeMillis();
+
+ session.close();
+
+ long pTime = endTime - startTime;
+ log.debug(" sent all messages in " + ((double)pTime / 1000) + "
seconds. ");
+ }
+ catch (Exception e)
+ {
+ log.error("error", e);
+ }
+ }
+ };
+
+ QueueSession session = queueConnection.createQueueSession(transacted !=
TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = (Queue)context.lookup(TEST_QUEUE);
+ QueueReceiver receiver = session.createReceiver(queue);
+
+ MessageListener listener =
+ new MessageListener()
+ {
+ long startTime = System.currentTimeMillis();
+ int i = 0;
+
+ /**
+ * #Description of the Method
+ *
+ * @param message Description of Parameter
+ */
+ public void onMessage(Message message)
+ {
+ i++;
// if( transacted == TRANS_INDIVIDUAL )
// session.commit();
- if( i >= PERFORMANCE_TEST_ITERATIONS ) {
- long endTime = System.currentTimeMillis();
- long pTime = endTime-startTime;
- System.out.println(" received all messages in
"+((double)pTime/1000)+" seconds. ");
-
- synchronized ( this ) {
- this.notify();
- }
- }
- }
- };
-
-
- System.out.println(" This test will send
"+PERFORMANCE_TEST_ITERATIONS+" "
-
+(persistence==DeliveryMode.PERSISTENT?"persistent":"non-persistent")+" messages. Each
with a payload of "
- +((double)PERFORMANCE_TEST_DATA_PAYLOAD.length/1024)+"Kb"
- +" Session is "+TRANS_DESC[transacted]+" transacted");
- long startTime = System.currentTimeMillis();
- sendThread.start();
- receiver.setMessageListener( listener );
- queueConnection.start();
- synchronized( listener ) {
- listener.wait();
- }
-
- if( transacted == TRANS_TOTAL )
- session.commit();
-
- session.close();
- sendThread.join();
- long endTime = System.currentTimeMillis();
- long pTime = endTime-startTime;
- System.out.println(" All threads finished after:
"+((double)pTime/1000)+" seconds. ");
-
- }
-
- public void runAsynchTopicPerformance(final int transacted, final int
persistence) throws Exception {
-
- {
- queueConnection.start();
- drainQueue( );
- }
-
- Thread sendThread = new Thread() {
- public void run() {
- try {
-
-
- TopicSession session =
topicConnection.createTopicSession(transacted!=TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
- Topic topic =
(Topic)context.lookup(TEST_TOPIC);
-
- TopicPublisher publisher =
session.createPublisher(topic);
-
- waitForSynchMessage();
-
- BytesMessage message =
session.createBytesMessage();
-
message.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD);
-
- long startTime = System.currentTimeMillis();
- for( int i=0; i < PERFORMANCE_TEST_ITERATIONS;
i++) {
- publisher.publish(topic, message,
persistence, 4, 0);
- //System.out.println(" Sent #"+i);
- if( transacted == TRANS_INDIVIDUAL )
- session.commit();
- }
-
- if( transacted == TRANS_TOTAL )
- session.commit();
-
- long endTime = System.currentTimeMillis();
- session.close();
-
- long pTime = endTime-startTime;
- System.out.println(" sent all messages in
"+((double)pTime/1000)+" seconds. ");
- } catch ( Exception e ) {
- e.printStackTrace();
- }
- }
- };
-
- TopicSession session =
topicConnection.createTopicSession(transacted!=TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
- Topic topic = (Topic)context.lookup(TEST_TOPIC);
- TopicSubscriber subscriber = session.createSubscriber(topic);
-
- MessageListener listener = new MessageListener() {
- long startTime = System.currentTimeMillis();
- int i = 0;
- public void onMessage(Message message) {
- i++;
+ if (i >= iterationCount)
+ {
+ long endTime = System.currentTimeMillis();
+ long pTime = endTime - startTime;
+ log.debug(" received all messages in " + ((double)pTime / 1000)
+ " seconds. ");
+
+ synchronized (this)
+ {
+ this.notify();
+ }
+ }
+ }
+ };
+
+ getLog().debug(" This test will send " + getIterationCount() + " "
+ + (persistence == DeliveryMode.PERSISTENT ? "persistent" :
"non-persistent") + " messages. Each with a payload of "
+ + ((double)PERFORMANCE_TEST_DATA_PAYLOAD.length / 1024) + "Kb"
+ + " Session is " + TRANS_DESC[transacted] + " transacted");
+ long startTime = System.currentTimeMillis();
+ sendThread.start();
+ receiver.setMessageListener(listener);
+ queueConnection.start();
+ synchronized (listener)
+ {
+ listener.wait();
+ }
+
+ if (transacted == TRANS_TOTAL)
+ {
+ session.commit();
+ }
+
+ session.close();
+ sendThread.join();
+ long endTime = System.currentTimeMillis();
+ long pTime = endTime - startTime;
+ getLog().debug(" All threads finished after: " + ((double)pTime / 1000) + "
seconds. ");
+
+ }
+
+ /**
+ * #Description of the Method
+ *
+ * @param transacted Description of Parameter
+ * @param persistence Description of Parameter
+ * @exception Exception Description of Exception
+ */
+ public void runAsynchTopicPerformance(final int transacted, final int
persistence) throws Exception
+ {
+ {
+ queueConnection.start();
+ drainQueue();
+ }
+
+ final int iterationCount = getIterationCount();
+ final Category log = getLog();
+
+ Thread sendThread =
+ new Thread()
+ {
+ /**
+ * Main processing method for the JBossMQPerfStressTestCase object
+ */
+ public void run()
+ {
+ try
+ {
+
+ TopicSession session =
topicConnection.createTopicSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = (Topic)context.lookup(TEST_TOPIC);
+
+ TopicPublisher publisher = session.createPublisher(topic);
+
+ waitForSynchMessage();
+
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD);
+
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; i < iterationCount; i++)
+ {
+ publisher.publish(topic, message, persistence, 4, 0);
+ //getLog().debug(" Sent #"+i);
+ if (transacted == TRANS_INDIVIDUAL)
+ {
+ session.commit();
+ }
+ }
+
+ if (transacted == TRANS_TOTAL)
+ {
+ session.commit();
+ }
+
+ long endTime = System.currentTimeMillis();
+ session.close();
+
+ long pTime = endTime - startTime;
+ log.debug(" sent all messages in " + ((double)pTime / 1000) + "
seconds. ");
+ }
+ catch (Exception e)
+ {
+ log.error("error", e);
+ }
+ }
+ };
+
+ TopicSession session = topicConnection.createTopicSession(transacted !=
TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = (Topic)context.lookup(TEST_TOPIC);
+ TopicSubscriber subscriber = session.createSubscriber(topic);
+
+ MessageListener listener =
+ new MessageListener()
+ {
+ long startTime = System.currentTimeMillis();
+ int i = 0;
+
+ /**
+ * #Description of the Method
+ *
+ * @param message Description of Parameter
+ */
+ public void onMessage(Message message)
+ {
+ i++;
// if( transacted == TRANS_INDIVIDUAL )
// session.commit();
- if( i >= PERFORMANCE_TEST_ITERATIONS ) {
- long endTime = System.currentTimeMillis();
- long pTime = endTime-startTime;
- System.out.println(" received all messages in
"+((double)pTime/1000)+" seconds. ");
-
- synchronized ( this ) {
- this.notify();
- }
- }
- }
- };
-
- System.out.println(" This test will send
"+PERFORMANCE_TEST_ITERATIONS+" "
-
+(persistence==DeliveryMode.PERSISTENT?"persistent":"non-persistent")+" messages. Each
with a payload of "
- +((double)PERFORMANCE_TEST_DATA_PAYLOAD.length/1024)+"Kb"
- +" Session is "+TRANS_DESC[transacted]+" transacted");
- long startTime = System.currentTimeMillis();
- sendThread.start();
- subscriber.setMessageListener( listener );
- topicConnection.start();
- sendSynchMessage();
- synchronized( listener ) {
- listener.wait();
- }
-
- if( transacted == TRANS_TOTAL )
- session.commit();
-
- session.close();
- sendThread.join();
- long endTime = System.currentTimeMillis();
- long pTime = endTime-startTime;
- System.out.println(" All threads finished after:
"+((double)pTime/1000)+" seconds. ");
-
- }
-
- public void runSynchQueuePerformance(final int transacted, final int
persistence) throws Exception {
-
- {
- queueConnection.start();
- drainQueue( );
- }
-
- Thread sendThread = new Thread() {
- public void run() {
- try {
- QueueSession session =
queueConnection.createQueueSession(transacted!=TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
- Queue queue =
(Queue)context.lookup(TEST_QUEUE);
-
- QueueSender sender =
session.createSender(queue);
-
- BytesMessage message =
session.createBytesMessage();
-
message.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD);
-
- long startTime = System.currentTimeMillis();
- for( int i=0; i < PERFORMANCE_TEST_ITERATIONS;
i++) {
- sender.send(queue, message,
persistence, 4, 0);
- //System.out.println(" Sent #"+i);
- if( transacted == TRANS_INDIVIDUAL )
- session.commit();
- }
-
- if( transacted == TRANS_TOTAL )
- session.commit();
-
- session.close();
-
- long endTime = System.currentTimeMillis();
-
- long pTime = endTime-startTime;
- System.out.println(" sent all messages in
"+((double)pTime/1000)+" seconds. ");
- } catch ( Exception e ) {
- e.printStackTrace();
- }
- }
- };
-
- Thread recvThread = new Thread() {
- public void run() {
- try {
-
- QueueSession session =
queueConnection.createQueueSession(transacted!=TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
- Queue queue =
(Queue)context.lookup(TEST_QUEUE);
-
- QueueReceiver receiver =
session.createReceiver(queue);
- long startTime = System.currentTimeMillis();
- for( int i=0; i < PERFORMANCE_TEST_ITERATIONS;
i++) {
- receiver.receive();
- //System.out.println(" Received #"+i);
- if( transacted == TRANS_INDIVIDUAL )
- session.commit();
- }
-
- if( transacted == TRANS_TOTAL )
- session.commit();
-
- long endTime = System.currentTimeMillis();
-
- session.close();
-
- long pTime = endTime-startTime;
- System.out.println(" received all messages in
"+((double)pTime/1000)+" seconds. ");
-
- } catch ( Exception e ) {
- e.printStackTrace();
- }
- }
- };
-
- System.out.println(" This test will send
"+PERFORMANCE_TEST_ITERATIONS+" "
-
+(persistence==DeliveryMode.PERSISTENT?"persistent":"non-persistent")+" messages. Each
with a payload of "
- +((double)PERFORMANCE_TEST_DATA_PAYLOAD.length/1024)+"Kb"
- +" Session is "+TRANS_DESC[transacted]+" transacted");
- long startTime = System.currentTimeMillis();
- sendThread.start();
- recvThread.start();
- sendThread.join();
- recvThread.join();
- long endTime = System.currentTimeMillis();
- long pTime = endTime-startTime;
- System.out.println(" All threads finished after:
"+((double)pTime/1000)+" seconds. ");
-
- }
-
- public void runSynchTopicPerformance(final int transacted, final int
persistence) throws Exception {
-
-
- {
- queueConnection.start();
- topicConnection.start();
- drainQueue( );
- }
-
- Thread sendThread = new Thread() {
- public void run() {
- try {
-
-
- TopicSession session =
topicConnection.createTopicSession(transacted!=TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
- Topic topic =
(Topic)context.lookup(TEST_TOPIC);
-
- TopicPublisher publisher =
session.createPublisher(topic);
-
- waitForSynchMessage();
-
- BytesMessage message =
session.createBytesMessage();
-
message.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD);
-
- long startTime = System.currentTimeMillis();
- for( int i=0; i < PERFORMANCE_TEST_ITERATIONS;
i++) {
- publisher.publish(topic, message,
persistence, 4, 0);
- //System.out.println(" Sent #"+i);
- if( transacted == TRANS_INDIVIDUAL )
- session.commit();
- }
-
- if( transacted == TRANS_TOTAL )
- session.commit();
-
- long endTime = System.currentTimeMillis();
-
- session.close();
-
- long pTime = endTime-startTime;
- System.out.println(" sent all messages in
"+((double)pTime/1000)+" seconds. ");
- } catch ( Exception e ) {
- e.printStackTrace();
- }
- }
- };
-
- Thread recvThread = new Thread() {
- public void run() {
- try {
-
- TopicSession session =
topicConnection.createTopicSession(transacted!=TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
- Topic topic =
(Topic)context.lookup(TEST_TOPIC);
- TopicSubscriber subscriber =
session.createSubscriber(topic);
-
- sendSynchMessage();
-
- long startTime = System.currentTimeMillis();
- for( int i=0; i < PERFORMANCE_TEST_ITERATIONS;
i++) {
- subscriber.receive();
- //System.out.println(" Received #"+i);
- if( transacted == TRANS_INDIVIDUAL )
- session.commit();
- }
-
- if( transacted == TRANS_TOTAL )
- session.commit();
-
- long endTime = System.currentTimeMillis();
-
- session.close();
-
- long pTime = endTime-startTime;
- System.out.println(" received all messages in
"+((double)pTime/1000)+" seconds. ");
-
- } catch ( Exception e ) {
- e.printStackTrace();
- }
- }
- };
-
- System.out.println(" This test will send
"+PERFORMANCE_TEST_ITERATIONS+" "
-
+(persistence==DeliveryMode.PERSISTENT?"persistent":"non-persistent")+" messages. Each
with a payload of "
- +((double)PERFORMANCE_TEST_DATA_PAYLOAD.length/1024)+"Kb"
- +" Session is "+TRANS_DESC[transacted]+" transacted");
- long startTime = System.currentTimeMillis();
- sendThread.start();
- recvThread.start();
- sendThread.join();
- recvThread.join();
- long endTime = System.currentTimeMillis();
- long pTime = endTime-startTime;
- System.out.println(" All threads finished after:
"+((double)pTime/1000)+" seconds. ");
-
- }
-
- protected void setUp() throws Exception {
-
- if( context == null ) {
-
- context = new InitialContext();
-
- QueueConnectionFactory queueFactory = (QueueConnectionFactory)
context.lookup(QUEUE_FACTORY);
- queueConnection = queueFactory.createQueueConnection();
-
- TopicConnectionFactory topicFactory =
(TopicConnectionFactory)context.lookup(TOPIC_FACTORY);
- topicConnection = topicFactory.createTopicConnection();
-
- System.out.println("Connection to JBossMQ established.");
- }
-
- }
-
- public void testAsynchQueuePerformance() throws Exception {
-
- System.out.println("Starting AsynchQueuePerformance test");
-
- runAsynchQueuePerformance(TRANS_NONE, DeliveryMode.NON_PERSISTENT);
- runAsynchQueuePerformance(TRANS_NONE, DeliveryMode.PERSISTENT);
- runAsynchQueuePerformance(TRANS_INDIVIDUAL,
DeliveryMode.NON_PERSISTENT);
- runAsynchQueuePerformance(TRANS_INDIVIDUAL, DeliveryMode.PERSISTENT);
- runAsynchQueuePerformance(TRANS_TOTAL, DeliveryMode.NON_PERSISTENT);
- runAsynchQueuePerformance(TRANS_TOTAL, DeliveryMode.PERSISTENT);
-
- System.out.println("AsynchQueuePerformance passed");
- }
-
- public void testAsynchTopicPerformance() throws Exception {
-
- System.out.println("Starting AsynchTopicPerformance test");
-
- runAsynchTopicPerformance(TRANS_NONE, DeliveryMode.NON_PERSISTENT);
- runAsynchTopicPerformance(TRANS_NONE, DeliveryMode.PERSISTENT);
- runAsynchTopicPerformance(TRANS_INDIVIDUAL,
DeliveryMode.NON_PERSISTENT);
- runAsynchTopicPerformance(TRANS_INDIVIDUAL, DeliveryMode.PERSISTENT);
- runAsynchTopicPerformance(TRANS_TOTAL, DeliveryMode.NON_PERSISTENT);
- runAsynchTopicPerformance(TRANS_TOTAL, DeliveryMode.PERSISTENT);
-
- System.out.println("AsynchTopicPerformance passed");
- }
-
- public void testSynchQueuePerformance() throws Exception {
-
- System.out.println("Starting SynchQueuePerformance test");
-
- runSynchQueuePerformance(TRANS_NONE, DeliveryMode.NON_PERSISTENT);
- runSynchQueuePerformance(TRANS_NONE, DeliveryMode.PERSISTENT);
- runSynchQueuePerformance(TRANS_INDIVIDUAL,
DeliveryMode.NON_PERSISTENT);
- runSynchQueuePerformance(TRANS_INDIVIDUAL, DeliveryMode.PERSISTENT);
- runSynchQueuePerformance(TRANS_TOTAL, DeliveryMode.NON_PERSISTENT);
- runSynchQueuePerformance(TRANS_TOTAL, DeliveryMode.PERSISTENT);
-
- System.out.println("SynchQueuePerformance passed");
- }
-
- public void testSynchTopicPerformance() throws Exception {
-
- System.out.println("Starting SynchTopicPerformance test");
-
- runSynchTopicPerformance(TRANS_NONE, DeliveryMode.NON_PERSISTENT);
- runSynchTopicPerformance(TRANS_NONE, DeliveryMode.PERSISTENT);
- runSynchTopicPerformance(TRANS_INDIVIDUAL,
DeliveryMode.NON_PERSISTENT);
- runSynchTopicPerformance(TRANS_INDIVIDUAL, DeliveryMode.PERSISTENT);
- runSynchTopicPerformance(TRANS_TOTAL, DeliveryMode.NON_PERSISTENT);
- runSynchTopicPerformance(TRANS_TOTAL, DeliveryMode.PERSISTENT);
+ if (i >= iterationCount)
+ {
+ long endTime = System.currentTimeMillis();
+ long pTime = endTime - startTime;
+ log.debug(" received all messages in " + ((double)pTime / 1000)
+ " seconds. ");
+
+ synchronized (this)
+ {
+ this.notify();
+ }
+ }
+ }
+ };
+
+ getLog().debug(" This test will send " + getIterationCount() + " "
+ + (persistence == DeliveryMode.PERSISTENT ? "persistent" :
"non-persistent") + " messages. Each with a payload of "
+ + ((double)PERFORMANCE_TEST_DATA_PAYLOAD.length / 1024) + "Kb"
+ + " Session is " + TRANS_DESC[transacted] + " transacted");
+ long startTime = System.currentTimeMillis();
+ sendThread.start();
+ subscriber.setMessageListener(listener);
+ topicConnection.start();
+ sendSynchMessage();
+ synchronized (listener)
+ {
+ listener.wait();
+ }
+
+ if (transacted == TRANS_TOTAL)
+ {
+ session.commit();
+ }
+
+ session.close();
+ sendThread.join();
+ long endTime = System.currentTimeMillis();
+ long pTime = endTime - startTime;
+ getLog().debug(" All threads finished after: " + ((double)pTime / 1000) + "
seconds. ");
+
+ }
+
+ /**
+ * #Description of the Method
+ *
+ * @param transacted Description of Parameter
+ * @param persistence Description of Parameter
+ * @exception Exception Description of Exception
+ */
+ public void runSynchQueuePerformance(final int transacted, final int
persistence) throws Exception
+ {
+ {
+ queueConnection.start();
+ drainQueue();
+ }
+ final int iterationCount = getIterationCount();
+ final Category log = getLog();
+
+ Thread sendThread =
+ new Thread()
+ {
+ /**
+ * Main processing method for the JBossMQPerfStressTestCase object
+ */
+ public void run()
+ {
+ try
+ {
+ QueueSession session =
queueConnection.createQueueSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+ QueueSender sender = session.createSender(queue);
+
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD);
+
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; i < iterationCount; i++)
+ {
+ sender.send(queue, message, persistence, 4, 0);
+ //getLog().debug(" Sent #"+i);
+ if (transacted == TRANS_INDIVIDUAL)
+ {
+ session.commit();
+ }
+ }
+
+ if (transacted == TRANS_TOTAL)
+ {
+ session.commit();
+ }
+
+ session.close();
+
+ long endTime = System.currentTimeMillis();
+
+ long pTime = endTime - startTime;
+ log.debug(" sent all messages in " + ((double)pTime / 1000) + "
seconds. ");
+ }
+ catch (Exception e)
+ {
+ log.error("error", e);
+ }
+ }
+ };
+
+ Thread recvThread =
+ new Thread()
+ {
+ /**
+ * Main processing method for the JBossMQPerfStressTestCase object
+ */
+ public void run()
+ {
+ try
+ {
+
+ QueueSession session =
queueConnection.createQueueSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+ QueueReceiver receiver = session.createReceiver(queue);
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; i < iterationCount; i++)
+ {
+ receiver.receive();
+ //getLog().debug(" Received #"+i);
+ if (transacted == TRANS_INDIVIDUAL)
+ {
+ session.commit();
+ }
+ }
+
+ if (transacted == TRANS_TOTAL)
+ {
+ session.commit();
+ }
+
+ long endTime = System.currentTimeMillis();
+
+ session.close();
+
+ long pTime = endTime - startTime;
+ log.debug(" received all messages in " + ((double)pTime / 1000)
+ " seconds. ");
+
+ }
+ catch (Exception e)
+ {
+ log.error("error", e);
+ }
+ }
+ };
+
+ getLog().debug(" This test will send " + getIterationCount() + " "
+ + (persistence == DeliveryMode.PERSISTENT ? "persistent" :
"non-persistent") + " messages. Each with a payload of "
+ + ((double)PERFORMANCE_TEST_DATA_PAYLOAD.length / 1024) + "Kb"
+ + " Session is " + TRANS_DESC[transacted] + " transacted");
+ long startTime = System.currentTimeMillis();
+ sendThread.start();
+ recvThread.start();
+ sendThread.join();
+ recvThread.join();
+ long endTime = System.currentTimeMillis();
+ long pTime = endTime - startTime;
+ getLog().debug(" All threads finished after: " + ((double)pTime / 1000) + "
seconds. ");
+
+ }
+
+ /**
+ * #Description of the Method
+ *
+ * @param transacted Description of Parameter
+ * @param persistence Description of Parameter
+ * @exception Exception Description of Exception
+ */
+ public void runSynchTopicPerformance(final int transacted, final int
persistence) throws Exception
+ {
+ {
+ queueConnection.start();
+ topicConnection.start();
+ drainQueue();
+ }
+ final int iterationCount = getIterationCount();
+ final Category log = getLog();
+
+ Thread sendThread =
+ new Thread()
+ {
+ /**
+ * Main processing method for the JBossMQPerfStressTestCase object
+ */
+ public void run()
+ {
+ try
+ {
+
+ TopicSession session =
topicConnection.createTopicSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = (Topic)context.lookup(TEST_TOPIC);
+
+ TopicPublisher publisher = session.createPublisher(topic);
+
+ waitForSynchMessage();
+
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD);
+
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; i < iterationCount; i++)
+ {
+ publisher.publish(topic, message, persistence, 4, 0);
+ //getLog().debug(" Sent #"+i);
+ if (transacted == TRANS_INDIVIDUAL)
+ {
+ session.commit();
+ }
+ }
+
+ if (transacted == TRANS_TOTAL)
+ {
+ session.commit();
+ }
+
+ long endTime = System.currentTimeMillis();
+
+ session.close();
+
+ long pTime = endTime - startTime;
+ log.debug(" sent all messages in " + ((double)pTime / 1000) + "
seconds. ");
+ }
+ catch (Exception e)
+ {
+ log.error("error", e);
+ }
+ }
+ };
+
+ Thread recvThread =
+ new Thread()
+ {
+ /**
+ * Main processing method for the JBossMQPerfStressTestCase object
+ */
+ public void run()
+ {
+ try
+ {
+
+ TopicSession session =
topicConnection.createTopicSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = (Topic)context.lookup(TEST_TOPIC);
+ TopicSubscriber subscriber = session.createSubscriber(topic);
+
+ sendSynchMessage();
+
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; i < iterationCount; i++)
+ {
+ subscriber.receive();
+ //getLog().debug(" Received #"+i);
+ if (transacted == TRANS_INDIVIDUAL)
+ {
+ session.commit();
+ }
+ }
+
+ if (transacted == TRANS_TOTAL)
+ {
+ session.commit();
+ }
+
+ long endTime = System.currentTimeMillis();
+
+ session.close();
+
+ long pTime = endTime - startTime;
+ log.debug(" received all messages in " + ((double)pTime / 1000)
+ " seconds. ");
+
+ }
+ catch (Exception e)
+ {
+ log.error("error", e);
+ }
+ }
+ };
+
+ getLog().debug(" This test will send " + getIterationCount() + " "
+ + (persistence == DeliveryMode.PERSISTENT ? "persistent" :
"non-persistent") + " messages. Each with a payload of "
+ + ((double)PERFORMANCE_TEST_DATA_PAYLOAD.length / 1024) + "Kb"
+ + " Session is " + TRANS_DESC[transacted] + " transacted");
+ long startTime = System.currentTimeMillis();
+ sendThread.start();
+ recvThread.start();
+ sendThread.join();
+ recvThread.join();
+ long endTime = System.currentTimeMillis();
+ long pTime = endTime - startTime;
+ getLog().debug(" All threads finished after: " + ((double)pTime / 1000) + "
seconds. ");
+
+ }
+
+ /**
+ * A unit test for JUnit
+ *
+ * @exception Exception Description of Exception
+ */
+ public void testAsynchQueuePerformance() throws Exception
+ {
+
+ getLog().debug("Starting AsynchQueuePerformance test");
+
+ runAsynchQueuePerformance(TRANS_NONE, DeliveryMode.NON_PERSISTENT);
+ runAsynchQueuePerformance(TRANS_NONE, DeliveryMode.PERSISTENT);
+ runAsynchQueuePerformance(TRANS_INDIVIDUAL, DeliveryMode.NON_PERSISTENT);
+ runAsynchQueuePerformance(TRANS_INDIVIDUAL, DeliveryMode.PERSISTENT);
+ runAsynchQueuePerformance(TRANS_TOTAL, DeliveryMode.NON_PERSISTENT);
+ runAsynchQueuePerformance(TRANS_TOTAL, DeliveryMode.PERSISTENT);
+
+ getLog().debug("AsynchQueuePerformance passed");
+ }
+
+ /**
+ * A unit test for JUnit
+ *
+ * @exception Exception Description of Exception
+ */
+ public void testAsynchTopicPerformance() throws Exception
+ {
+
+ getLog().debug("Starting AsynchTopicPerformance test");
+
+ runAsynchTopicPerformance(TRANS_NONE, DeliveryMode.NON_PERSISTENT);
+ runAsynchTopicPerformance(TRANS_NONE, DeliveryMode.PERSISTENT);
+ runAsynchTopicPerformance(TRANS_INDIVIDUAL, DeliveryMode.NON_PERSISTENT);
+ runAsynchTopicPerformance(TRANS_INDIVIDUAL, DeliveryMode.PERSISTENT);
+ runAsynchTopicPerformance(TRANS_TOTAL, DeliveryMode.NON_PERSISTENT);
+ runAsynchTopicPerformance(TRANS_TOTAL, DeliveryMode.PERSISTENT);
+
+ getLog().debug("AsynchTopicPerformance passed");
+ }
+
+ /**
+ * A unit test for JUnit
+ *
+ * @exception Exception Description of Exception
+ */
+ public void testSynchQueuePerformance() throws Exception
+ {
+
+ getLog().debug("Starting SynchQueuePerformance test");
+
+ runSynchQueuePerformance(TRANS_NONE, DeliveryMode.NON_PERSISTENT);
+ runSynchQueuePerformance(TRANS_NONE, DeliveryMode.PERSISTENT);
+ runSynchQueuePerformance(TRANS_INDIVIDUAL, DeliveryMode.NON_PERSISTENT);
+ runSynchQueuePerformance(TRANS_INDIVIDUAL, DeliveryMode.PERSISTENT);
+ runSynchQueuePerformance(TRANS_TOTAL, DeliveryMode.NON_PERSISTENT);
+ runSynchQueuePerformance(TRANS_TOTAL, DeliveryMode.PERSISTENT);
+
+ getLog().debug("SynchQueuePerformance passed");
+ }
+
+ /**
+ * A unit test for JUnit
+ *
+ * @exception Exception Description of Exception
+ */
+ public void testSynchTopicPerformance() throws Exception
+ {
+
+ getLog().debug("Starting SynchTopicPerformance test");
+
+ runSynchTopicPerformance(TRANS_NONE, DeliveryMode.NON_PERSISTENT);
+ runSynchTopicPerformance(TRANS_NONE, DeliveryMode.PERSISTENT);
+ runSynchTopicPerformance(TRANS_INDIVIDUAL, DeliveryMode.NON_PERSISTENT);
+ runSynchTopicPerformance(TRANS_INDIVIDUAL, DeliveryMode.PERSISTENT);
+ runSynchTopicPerformance(TRANS_TOTAL, DeliveryMode.NON_PERSISTENT);
+ runSynchTopicPerformance(TRANS_TOTAL, DeliveryMode.PERSISTENT);
+
+ getLog().debug("SynchTopicPerformance passed");
+ }
+
+ /**
+ * The JUnit setup method
+ *
+ * @exception Exception Description of Exception
+ */
+ protected void setUp() throws Exception
+ {
+
+ if (context == null)
+ {
+
+ context = new InitialContext();
+
+ QueueConnectionFactory queueFactory =
(QueueConnectionFactory)context.lookup(QUEUE_FACTORY);
+ queueConnection = queueFactory.createQueueConnection();
+
+ TopicConnectionFactory topicFactory =
(TopicConnectionFactory)context.lookup(TOPIC_FACTORY);
+ topicConnection = topicFactory.createTopicConnection();
+
+ getLog().debug("Connection to JBossMQ established.");
+ }
+
+ }
+
+
+ // Emptys out all the messages in a queue
+ private void drainQueue() throws Exception
+ {
+
+ QueueSession session = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+ QueueReceiver receiver = session.createReceiver(queue);
+ Message message = receiver.receive(2000);
+ int c = 0;
+ while (message != null)
+ {
+ message = receiver.receive(2000);
+ c++;
+ }
+
+ if (c != 0)
+ {
+ getLog().debug(" Drained " + c + " messages from the queue");
+ }
+
+ session.close();
+
+ }
+
+ private void waitForSynchMessage() throws Exception
+ {
+ QueueSession session = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+ QueueReceiver receiver = session.createReceiver(queue);
+ receiver.receive();
+ session.close();
+ }
+
+ private void sendSynchMessage() throws Exception
+ {
+ QueueSession session = queueConnection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+ QueueSender sender = session.createSender(queue);
+
+ Message message = session.createMessage();
+ sender.send(message);
- System.out.println("SynchTopicPerformance passed");
- }
+ session.close();
+ }
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development