Author: dejanb
Date: Wed Mar 31 12:08:39 2010
New Revision: 929490

URL: http://svn.apache.org/viewvc?rev=929490&view=rev
Log:
fixing package names

Added:
    activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/
    
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ConsumerThread.java
    
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/JDBCSpringTest.java
    
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/MessageDrivenPojo.java
    
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/NIOSpringTest.java
    
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ProducerThread.java
    
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/StompLoadTest.java
    activemq/activemq-systest/trunk/src/test/resources/activemq-spring-nio.xml
Removed:
    activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/activemq/
Modified:
    activemq/activemq-systest/trunk/src/test/resources/jmstest-camel.xml

Added: 
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ConsumerThread.java
URL: 
http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ConsumerThread.java?rev=929490&view=auto
==============================================================================
--- 
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ConsumerThread.java
 (added)
+++ 
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ConsumerThread.java
 Wed Mar 31 12:08:39 2010
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.systest;
+
+
+import java.util.Random;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.jms.connection.SingleConnectionFactory;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
+public class ConsumerThread extends Thread {   
+       private DefaultMessageListenerContainer container;
+       private MessageDrivenPojo messageListener;      
+       private boolean run;
+       private String destination;
+       private ConnectionFactory connectionFactory;
+       private boolean durable;                        
+       private int concurrentConsumers;
+       private boolean sessionTransacted;
+       private boolean pubSubDomain;
+       private boolean running;
+       private Log log = LogFactory.getLog(ConsumerThread.class);
+       private int numberOfQueues;
+       private String consumerName;
+       
+       @Override
+       public void run() {             
+               run = true;
+               createContainer();              
+               container.initialize();                         
+               container.start();
+               
+               running = true;
+               
+               while (run) {
+                       try {
+                               Thread.sleep(100);
+                       } catch (InterruptedException e) {
+                               e.printStackTrace();                    
+                       }                       
+               }
+               
+               container.stop();
+               container.destroy();
+               
+               if (connectionFactory instanceof SingleConnectionFactory) {
+                       ((SingleConnectionFactory)connectionFactory).destroy();
+               }
+               
+               log.info("ConsumerThread closing down");
+       }
+
+       private DefaultMessageListenerContainer createContainer() {
+               Random generator = new Random(consumerName.hashCode());
+               int queueSuffix = generator.nextInt(numberOfQueues);
+               
+               
+               container = new DefaultMessageListenerContainer();              
                
+               container.setPubSubDomain(pubSubDomain);
+               container.setDestinationName(destination + queueSuffix);        
                        
+               container.setMessageListener(messageListener);
+               container.setConnectionFactory(connectionFactory);      
+               container.setConcurrentConsumers(concurrentConsumers);
+               container.setSessionTransacted(sessionTransacted);
+
+               container.afterPropertiesSet();
+               log.info("subscribing to " + destination + queueSuffix);
+               return container;
+       }
+               
+       /**
+        * @param messageListener the messageListener to set
+        */
+       public void setMessageDrivenPojo(MessageDrivenPojo messageListener) {
+               this.messageListener = messageListener;
+       }
+
+       /**
+        * @param run the run to set
+        */
+       public void setRun(boolean run) {
+               this.run = run;
+       }
+
+       /**
+        * @param destination the destination to set
+        */
+       public void setDestination(String destination) {
+               this.destination = destination;
+       }
+       
+       public void setNumberOfQueues(int no) {
+               this.numberOfQueues = no;
+       }
+       
+       public int getNumberOfQueues() {
+               return this.numberOfQueues;
+       }
+       
+
+       public void setConsumerName(String name) {
+               this.consumerName = name;
+       }
+
+       /**
+        * @param connectionFactory the connectionFactory to set
+        */
+       public void setConnectionFactory(ConnectionFactory connectionFactory) {
+               this.connectionFactory = connectionFactory;
+       }
+
+       /**
+        * @param durable the durable to set
+        */
+       public void setDurable(boolean durable) {
+               this.durable = durable;
+       }
+
+       /**
+        * @param concurrentConsumers the concurrentConsumers to set
+        */
+       public void setConcurrentConsumers(int concurrentConsumers) {
+               this.concurrentConsumers = concurrentConsumers;
+       }
+
+       /**
+        * @param sessionTransacted the sessionTransacted to set
+        */
+       public void setSessionTransacted(boolean sessionTransacted) {
+               this.sessionTransacted = sessionTransacted;
+       }
+
+       /**
+        * @param pubSubDomain the pubSubDomain to set
+        */
+       public void setPubSubDomain(boolean pubSubDomain) {
+               this.pubSubDomain = pubSubDomain;
+       }
+       
+       /**
+        * @return the messageListener
+        */
+       public MessageDrivenPojo getMessageDrivenPojo() {
+               return messageListener;
+       }       
+       
+       public boolean isRunning() {
+               return running;
+       }
+}

Added: 
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/JDBCSpringTest.java
URL: 
http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/JDBCSpringTest.java?rev=929490&view=auto
==============================================================================
--- 
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/JDBCSpringTest.java
 (added)
+++ 
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/JDBCSpringTest.java
 Wed Mar 31 12:08:39 2010
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.systest;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.ConnectionFactory;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class JDBCSpringTest extends TestCase { 
+       
+       private static Log log = LogFactory.getLog(JDBCSpringTest.class);
+       
+      int numberOfConsumerThreads = 20;
+      int numberOfProducerThreads = 20;
+      int numberOfMessages = 50;
+      int numberOfQueues = 5;
+      String url = "tcp://localhost:61616";
+      String config = "xbean:activemq-spring-jdbc.xml";
+       
+      BrokerService broker;
+      
+    public void setUp() throws Exception {
+       broker = BrokerFactory.createBroker(config);
+       broker.start();
+       broker.waitUntilStarted();
+    }
+    
+    
+       protected void tearDown() throws Exception {
+               broker.stop();
+               broker.waitUntilStopped();
+       }
+
+
+       public void testJDBCSpringTest() throws Exception {
+               log.info("Using " + numberOfConsumerThreads + " consumers, " + 
+                               numberOfProducerThreads + " producers, " + 
+                               numberOfMessages + " messages per publisher, 
and " +
+                               numberOfQueues + " queues.");
+               
+               ConnectionFactory connectionFactory;
+
+               ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
+               prefetch.setQueuePrefetch(1);
+               ActiveMQConnectionFactory       amq = new 
ActiveMQConnectionFactory(url);
+               amq.setPrefetchPolicy(prefetch);
+         
+               connectionFactory = new PooledConnectionFactory(amq);
+               
((PooledConnectionFactory)connectionFactory).setMaxConnections(5);
+
+
+               StringBuffer buffer = new StringBuffer();               
+               for (int i=0; i<2048; i++) {
+                       buffer.append(".");
+               }               
+               String twoKbMessage = buffer.toString();
+               
+               List<ProducerThread> ProducerThreads = new 
ArrayList<ProducerThread>();
+               for (int i=0; i<numberOfProducerThreads; i++) {
+                       ProducerThread thread = new ProducerThread();
+                       thread.setMessage(twoKbMessage);
+                       thread.setNumberOfMessagesToSend(numberOfMessages);
+                       thread.setNumberOfQueues(numberOfQueues);
+                       thread.setQueuePrefix("AMQ-2436.queue.");
+                       thread.setConnectionFactory(connectionFactory);
+                       //thread.setSendDelay(100);
+                       ProducerThreads.add(thread);
+               }
+               
+               List<Thread> ConsumerThreads = new ArrayList<Thread>();
+               for (int i=0; i<numberOfConsumerThreads; i++) {
+                       ConsumerThread thread = new ConsumerThread();
+                       MessageDrivenPojo mdp1 = new MessageDrivenPojo();
+                       thread.setMessageDrivenPojo(mdp1);
+                       thread.setConcurrentConsumers(1);
+                       thread.setConnectionFactory(connectionFactory);
+                       thread.setDestination("AMQ-2436.queue.");
+                       thread.setPubSubDomain(false);
+                       thread.setSessionTransacted(true);
+                       thread.setNumberOfQueues(numberOfQueues);
+                       thread.setConsumerName("consumer" + i);
+                       ConsumerThreads.add(thread);
+                       thread.start();
+               }
+               
+               
+               for (ProducerThread thread : ProducerThreads) {
+                       thread.start();
+               }
+               
+               boolean finished = false;       
+               int previous = 0;
+               while (!finished) {
+                    
+                       int totalMessages = 0;  
+                       for (Thread thread : ConsumerThreads) {
+                               totalMessages += 
((ConsumerThread)thread).getMessageDrivenPojo().getMessageCount();
+                       }
+                       log.info(totalMessages + " received so far...");
+                       if (totalMessages != 0 && previous == totalMessages) {
+                               for (Thread thread : ConsumerThreads) {
+                                       ((ConsumerThread)thread).setRun(false);
+                               }
+                               fail("Received " + totalMessages + ", expected 
" + (numberOfMessages * numberOfProducerThreads));
+                       }
+                       previous = totalMessages;
+                       
+                       if (totalMessages >= (numberOfMessages * 
numberOfProducerThreads)) {
+                               finished = true;
+                               log.info("Received all " + totalMessages + " 
messages. Finishing.");
+                               
+                               for (Thread thread : ConsumerThreads) {
+                                       ((ConsumerThread)thread).setRun(false);
+                               }
+                               for (Thread thread : ConsumerThreads) {
+                                       thread.join();
+                               }
+
+                       } else {
+                               Thread.sleep(1000);
+                       }
+               }
+       }       
+    
+}

Added: 
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/MessageDrivenPojo.java
URL: 
http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/MessageDrivenPojo.java?rev=929490&view=auto
==============================================================================
--- 
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/MessageDrivenPojo.java
 (added)
+++ 
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/MessageDrivenPojo.java
 Wed Mar 31 12:08:39 2010
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.systest;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class MessageDrivenPojo implements MessageListener, Serializable {
+       private Log log = LogFactory.getLog(MessageDrivenPojo.class);
+       private AtomicInteger messageCount = new AtomicInteger();       
+
+       /*
+        * (non-Javadoc)
+        * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
+        */
+       public void onMessage(Message message) {                                
+               messageCount.incrementAndGet();
+               
+               if (log.isDebugEnabled()) {
+                       try {
+                               logMessage(message);
+                       } catch (Exception e) {
+                               log.error("Error:", e);
+                       }
+               }
+               
+               try {
+                       Thread.sleep(200);
+               } catch (InterruptedException ex ) {
+                       log.error(ex);
+               }
+       }
+       
+       private void logMessage(Message message) throws Exception {
+               StringBuffer buffer = new StringBuffer();
+               buffer.append("\nJMSMessageID:");
+               buffer.append(message.getJMSMessageID());
+               buffer.append("\nJMSCorrelationID:");
+               buffer.append(message.getJMSMessageID());
+               buffer.append("\nMessage Contents:\n");
+               
+               if (message instanceof TextMessage) { 
+                       buffer.append(((TextMessage)message).getText());
+               } else {
+                       buffer.append(message.toString());
+               }
+               
+               log.debug(buffer.toString());
+       }
+
+       /**
+        * @return the stats
+        */
+       protected int getMessageCount() {
+               return messageCount.get();
+       }               
+}

Added: 
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/NIOSpringTest.java
URL: 
http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/NIOSpringTest.java?rev=929490&view=auto
==============================================================================
--- 
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/NIOSpringTest.java
 (added)
+++ 
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/NIOSpringTest.java
 Wed Mar 31 12:08:39 2010
@@ -0,0 +1,13 @@
+package org.apache.activemq.systest;
+
+public class NIOSpringTest extends JDBCSpringTest {
+       
+    public void setUp() throws Exception {
+        url = "nio://localhost:61616";
+        config = "xbean:activemq-spring-nio.xml";
+               super.setUp();
+       }
+    
+    
+       
+}

Added: 
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ProducerThread.java
URL: 
http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ProducerThread.java?rev=929490&view=auto
==============================================================================
--- 
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ProducerThread.java
 (added)
+++ 
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ProducerThread.java
 Wed Mar 31 12:08:39 2010
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.systest;
+
+import java.util.Random;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class ProducerThread extends Thread {   
+       private JmsTemplate jmsTemplate;
+       private int numberOfTopics;
+       private int numberOfMessagesToSend;     
+       private int messagesSent;
+       private Random generator;
+       private String queuePrefix;
+       private ConnectionFactory connectionFactory;
+       private String message; 
+       private MessageCreator messageCreator;
+       private int sendDelay;
+       private Log log = LogFactory.getLog(ProducerThread.class);
+       
+       @Override
+       public void run() {
+               initialize();
+               Random generator = new 
Random(Thread.currentThread().getName().hashCode());
+               
+               while (messagesSent < numberOfMessagesToSend) {
+                       int queueSuffix = generator.nextInt(numberOfTopics);    
                
+                       jmsTemplate.send(queuePrefix + queueSuffix, 
messageCreator);                    
+                       messagesSent++;
+                       log.debug(Thread.currentThread().getName() + 
+                                       ": sent msg #" + messagesSent);
+                       try {
+                               Thread.sleep(sendDelay);
+                       } catch (InterruptedException e) {                      
        
+                               e.printStackTrace();
+                       }
+               }
+               
+               log.info("ProducerThread shutting down.");
+       }
+       
+       private void initialize() {
+               jmsTemplate = new JmsTemplate();
+               jmsTemplate.setPubSubDomain(false);
+               jmsTemplate.setConnectionFactory(connectionFactory);
+               
+               messageCreator = new MessageCreator() {
+                       public Message createMessage(Session session) throws 
JMSException {
+                               return session.createTextMessage(message);
+                       }                       
+               };
+       }
+
+       /**
+        * @param numberOfTopics the numberOfTopics to set
+        */
+       protected void setNumberOfQueues(int numberOfTopics) {
+               this.numberOfTopics = numberOfTopics;
+       }
+       /**
+        * @param queuePrefix the queuePrefix to set
+        */
+       protected void setQueuePrefix(String queuePrefix) {
+               this.queuePrefix = queuePrefix;
+       }
+       /**
+        * @param connectionFactory the connectionFactory to set
+        */
+       protected void setConnectionFactory(ConnectionFactory 
connectionFactory) {
+               this.connectionFactory = connectionFactory;
+       }
+       /**
+        * @param message the message to set
+        */
+       protected void setMessage(String message) {
+               this.message = message;
+       }
+
+       /**
+        * @param numberOfMessagesToSend the numberOfMessagesToSend to set
+        */
+       protected void setNumberOfMessagesToSend(int numberOfMessagesToSend) {
+               this.numberOfMessagesToSend = numberOfMessagesToSend;
+       }
+       
+       public void setSendDelay(int sendDelay) {
+               this.sendDelay = sendDelay;
+       }
+       
+       public int getMessagesSent() {
+               return messagesSent;
+       }
+}

Added: 
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/StompLoadTest.java
URL: 
http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/StompLoadTest.java?rev=929490&view=auto
==============================================================================
--- 
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/StompLoadTest.java
 (added)
+++ 
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/StompLoadTest.java
 Wed Mar 31 12:08:39 2010
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.systest;
+
+import java.net.Socket;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.transport.stomp.StompConnection;
+import org.apache.activemq.transport.stomp.StompFrame;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import static java.lang.String.*;
+import static java.util.concurrent.TimeUnit.*;
+
+/**
+ * 
+ * Simulates load on the Stomp connector. All producers/consumers open/close a
+ * connection on every command Configurable number of producers/consumers, 
their
+ * speed and duration of test
+ * 
+ * Start a broker with the desired configuration to test and then run this test
+ * 
+ */
+public class StompLoadTest extends TestCase {
+
+    private static final Log LOG = LogFactory.getLog(StompLoadTest.class);
+
+    final int producerSleep = 10;
+    final int consumerSleep = 10;
+    final int msgCount = 10000;
+    final int producerCount = 10;
+    final int consumerCount = 10;
+    final int testTime = 30 * 60 * 1000;
+    final int sampleInterval = 5 * 1000;
+    final String bindAddress = "stomp://0.0.0.0:61613";
+
+    AtomicLong producerCounter = new AtomicLong();
+    AtomicLong consumerCounter = new AtomicLong();
+    
+    public void testLoad() throws Exception {
+
+        for (int i = 0; i < producerCount; i++) {
+            ProducerThread producerThread = new ProducerThread("producer" + i);
+            producerThread.start();
+        }
+
+        for (int i = 0; i < consumerCount; i++) {
+            Thread consumerThread = new ConsumerThread("consumer" + i);
+            consumerThread.start();
+        }
+
+        int samples = testTime/sampleInterval;
+        long start = System.nanoTime();
+        for( int i=0; i < samples; i++ ) {
+            Thread.sleep(sampleInterval);
+            long end = System.nanoTime();
+            printRate("Producer", producerCounter, end-start);
+            printRate("Consumer", consumerCounter, end-start);
+            start = end;
+        }
+    }
+
+    static final long NANOS_PER_SECOND = NANOSECONDS.convert(1, SECONDS);
+    
+    private void printRate(String name, AtomicLong counter, long nanos) {
+        long c = counter.getAndSet(0);
+        float rate_per_second = ((1.0f*c/nanos)*NANOS_PER_SECOND);
+        LOG.info(format("%s rate: %,.3f per second", name, rate_per_second));
+    }
+
+    public void connect(StompConnection conn) throws Exception {
+        URI connectUri = new URI(bindAddress);
+        conn.open(new Socket(connectUri.getHost(), connectUri.getPort()));
+        conn.connect("", "");
+    }
+
+    class ProducerThread extends Thread {
+
+        String name;
+
+        public ProducerThread(String name) {
+            this.name = name;
+        }
+
+        public void run() {
+            for (int i = 0; i < msgCount; i++) {
+                StompConnection conn = new StompConnection();
+                try {
+                    connect(conn);
+                    String msg = "Message #" + i+" from "+name;
+                    conn.send("/queue/test", msg);
+                    producerCounter.incrementAndGet();
+                    Thread.sleep(producerSleep);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                } finally {
+                    try {
+                        conn.disconnect();
+                    } catch (Exception ignore) {
+                    }
+                }
+            }
+        }
+    }
+
+    class ConsumerThread extends Thread {
+
+        String name;
+
+        public ConsumerThread(String name) {
+            this.name = name;
+        }
+
+        public void run() {
+            for (int i = 0; i < msgCount; i++) {
+                StompConnection conn = new StompConnection();
+                try {
+                    connect(conn);
+                    HashMap<String, String> headers = new HashMap<String, 
String>();
+                    headers.put("activemq.prefetchSize", "1");
+                    conn.subscribe("/queue/test", "client", headers);
+                    StompFrame frame = conn.receive(1*1000);
+                    conn.ack(frame);
+                    consumerCounter.incrementAndGet();
+                    Thread.sleep(consumerSleep);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                } finally {
+                    try {
+                        conn.disconnect();
+                    } catch (Exception ignore) {
+                    }
+                    try {
+                        conn.close();
+                    } catch (Exception ignore) {
+                    }
+                }
+            }
+        }
+    }
+
+}

Added: 
activemq/activemq-systest/trunk/src/test/resources/activemq-spring-nio.xml
URL: 
http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/resources/activemq-spring-nio.xml?rev=929490&view=auto
==============================================================================
--- activemq/activemq-systest/trunk/src/test/resources/activemq-spring-nio.xml 
(added)
+++ activemq/activemq-systest/trunk/src/test/resources/activemq-spring-nio.xml 
Wed Mar 31 12:08:39 2010
@@ -0,0 +1,54 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans
+  xmlns="http://www.springframework.org/schema/beans";
+  xmlns:amq="http://activemq.apache.org/schema/core";
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+  xsi:schemaLocation="http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core 
http://activemq.apache.org/schema/core/activemq-core.xsd";>
+
+    <broker xmlns="http://activemq.apache.org/schema/core"; 
brokerName="localhost" dataDirectory="${activemq.base}/data">
+
+        <managementContext>
+            <managementContext createConnector="false"/>
+        </managementContext>
+
+        <persistenceAdapter>
+            <kahaDB directory="target/kahadb"/>
+        </persistenceAdapter>
+              
+        <destinationPolicy>
+            <policyMap>
+              <policyEntries>
+                <policyEntry topic=">" producerFlowControl="true" 
memoryLimit="1mb">
+                  <pendingSubscriberPolicy>
+                    <vmCursor />
+                  </pendingSubscriberPolicy>
+                </policyEntry>
+                <policyEntry queue=">" producerFlowControl="true" 
memoryLimit="1mb">
+                </policyEntry>
+              </policyEntries>
+            </policyMap>
+        </destinationPolicy> 
+        
+        <transportConnectors>
+            <transportConnector name="nio" uri="nio://0.0.0.0:61616"/>
+        </transportConnectors>
+
+    </broker>
+    
+</beans>

Modified: activemq/activemq-systest/trunk/src/test/resources/jmstest-camel.xml
URL: 
http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/resources/jmstest-camel.xml?rev=929490&r1=929489&r2=929490&view=diff
==============================================================================
--- activemq/activemq-systest/trunk/src/test/resources/jmstest-camel.xml 
(original)
+++ activemq/activemq-systest/trunk/src/test/resources/jmstest-camel.xml Wed 
Mar 31 12:08:39 2010
@@ -45,7 +45,7 @@
 
        <bean
                id="testMDB"
-               class="org.apache.activemq.activemq.systest.camel.TestMDB"
+               class="org.apache.activemq.systest.camel.TestMDB"
                init-method="init"
                destroy-method="destroy">
        </bean>


Reply via email to