Author: dejanb
Date: Wed Mar 31 12:08:39 2010
New Revision: 929490
URL: http://svn.apache.org/viewvc?rev=929490&view=rev
Log:
fixing package names
Added:
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ConsumerThread.java
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/JDBCSpringTest.java
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/MessageDrivenPojo.java
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/NIOSpringTest.java
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ProducerThread.java
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/StompLoadTest.java
activemq/activemq-systest/trunk/src/test/resources/activemq-spring-nio.xml
Removed:
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/activemq/
Modified:
activemq/activemq-systest/trunk/src/test/resources/jmstest-camel.xml
Added:
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ConsumerThread.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ConsumerThread.java?rev=929490&view=auto
==============================================================================
---
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ConsumerThread.java
(added)
+++
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ConsumerThread.java
Wed Mar 31 12:08:39 2010
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.systest;
+
+
+import java.util.Random;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.jms.connection.SingleConnectionFactory;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
+public class ConsumerThread extends Thread {
+ private DefaultMessageListenerContainer container;
+ private MessageDrivenPojo messageListener;
+ private boolean run;
+ private String destination;
+ private ConnectionFactory connectionFactory;
+ private boolean durable;
+ private int concurrentConsumers;
+ private boolean sessionTransacted;
+ private boolean pubSubDomain;
+ private boolean running;
+ private Log log = LogFactory.getLog(ConsumerThread.class);
+ private int numberOfQueues;
+ private String consumerName;
+
+ @Override
+ public void run() {
+ run = true;
+ createContainer();
+ container.initialize();
+ container.start();
+
+ running = true;
+
+ while (run) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ container.stop();
+ container.destroy();
+
+ if (connectionFactory instanceof SingleConnectionFactory) {
+ ((SingleConnectionFactory)connectionFactory).destroy();
+ }
+
+ log.info("ConsumerThread closing down");
+ }
+
+ private DefaultMessageListenerContainer createContainer() {
+ Random generator = new Random(consumerName.hashCode());
+ int queueSuffix = generator.nextInt(numberOfQueues);
+
+
+ container = new DefaultMessageListenerContainer();
+ container.setPubSubDomain(pubSubDomain);
+ container.setDestinationName(destination + queueSuffix);
+ container.setMessageListener(messageListener);
+ container.setConnectionFactory(connectionFactory);
+ container.setConcurrentConsumers(concurrentConsumers);
+ container.setSessionTransacted(sessionTransacted);
+
+ container.afterPropertiesSet();
+ log.info("subscribing to " + destination + queueSuffix);
+ return container;
+ }
+
+ /**
+ * @param messageListener the messageListener to set
+ */
+ public void setMessageDrivenPojo(MessageDrivenPojo messageListener) {
+ this.messageListener = messageListener;
+ }
+
+ /**
+ * @param run the run to set
+ */
+ public void setRun(boolean run) {
+ this.run = run;
+ }
+
+ /**
+ * @param destination the destination to set
+ */
+ public void setDestination(String destination) {
+ this.destination = destination;
+ }
+
+ public void setNumberOfQueues(int no) {
+ this.numberOfQueues = no;
+ }
+
+ public int getNumberOfQueues() {
+ return this.numberOfQueues;
+ }
+
+
+ public void setConsumerName(String name) {
+ this.consumerName = name;
+ }
+
+ /**
+ * @param connectionFactory the connectionFactory to set
+ */
+ public void setConnectionFactory(ConnectionFactory connectionFactory) {
+ this.connectionFactory = connectionFactory;
+ }
+
+ /**
+ * @param durable the durable to set
+ */
+ public void setDurable(boolean durable) {
+ this.durable = durable;
+ }
+
+ /**
+ * @param concurrentConsumers the concurrentConsumers to set
+ */
+ public void setConcurrentConsumers(int concurrentConsumers) {
+ this.concurrentConsumers = concurrentConsumers;
+ }
+
+ /**
+ * @param sessionTransacted the sessionTransacted to set
+ */
+ public void setSessionTransacted(boolean sessionTransacted) {
+ this.sessionTransacted = sessionTransacted;
+ }
+
+ /**
+ * @param pubSubDomain the pubSubDomain to set
+ */
+ public void setPubSubDomain(boolean pubSubDomain) {
+ this.pubSubDomain = pubSubDomain;
+ }
+
+ /**
+ * @return the messageListener
+ */
+ public MessageDrivenPojo getMessageDrivenPojo() {
+ return messageListener;
+ }
+
+ public boolean isRunning() {
+ return running;
+ }
+}
Added:
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/JDBCSpringTest.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/JDBCSpringTest.java?rev=929490&view=auto
==============================================================================
---
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/JDBCSpringTest.java
(added)
+++
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/JDBCSpringTest.java
Wed Mar 31 12:08:39 2010
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.systest;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.ConnectionFactory;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class JDBCSpringTest extends TestCase {
+
+ private static Log log = LogFactory.getLog(JDBCSpringTest.class);
+
+ int numberOfConsumerThreads = 20;
+ int numberOfProducerThreads = 20;
+ int numberOfMessages = 50;
+ int numberOfQueues = 5;
+ String url = "tcp://localhost:61616";
+ String config = "xbean:activemq-spring-jdbc.xml";
+
+ BrokerService broker;
+
+ public void setUp() throws Exception {
+ broker = BrokerFactory.createBroker(config);
+ broker.start();
+ broker.waitUntilStarted();
+ }
+
+
+ protected void tearDown() throws Exception {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+
+
+ public void testJDBCSpringTest() throws Exception {
+ log.info("Using " + numberOfConsumerThreads + " consumers, " +
+ numberOfProducerThreads + " producers, " +
+ numberOfMessages + " messages per publisher,
and " +
+ numberOfQueues + " queues.");
+
+ ConnectionFactory connectionFactory;
+
+ ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
+ prefetch.setQueuePrefetch(1);
+ ActiveMQConnectionFactory amq = new
ActiveMQConnectionFactory(url);
+ amq.setPrefetchPolicy(prefetch);
+
+ connectionFactory = new PooledConnectionFactory(amq);
+
((PooledConnectionFactory)connectionFactory).setMaxConnections(5);
+
+
+ StringBuffer buffer = new StringBuffer();
+ for (int i=0; i<2048; i++) {
+ buffer.append(".");
+ }
+ String twoKbMessage = buffer.toString();
+
+ List<ProducerThread> ProducerThreads = new
ArrayList<ProducerThread>();
+ for (int i=0; i<numberOfProducerThreads; i++) {
+ ProducerThread thread = new ProducerThread();
+ thread.setMessage(twoKbMessage);
+ thread.setNumberOfMessagesToSend(numberOfMessages);
+ thread.setNumberOfQueues(numberOfQueues);
+ thread.setQueuePrefix("AMQ-2436.queue.");
+ thread.setConnectionFactory(connectionFactory);
+ //thread.setSendDelay(100);
+ ProducerThreads.add(thread);
+ }
+
+ List<Thread> ConsumerThreads = new ArrayList<Thread>();
+ for (int i=0; i<numberOfConsumerThreads; i++) {
+ ConsumerThread thread = new ConsumerThread();
+ MessageDrivenPojo mdp1 = new MessageDrivenPojo();
+ thread.setMessageDrivenPojo(mdp1);
+ thread.setConcurrentConsumers(1);
+ thread.setConnectionFactory(connectionFactory);
+ thread.setDestination("AMQ-2436.queue.");
+ thread.setPubSubDomain(false);
+ thread.setSessionTransacted(true);
+ thread.setNumberOfQueues(numberOfQueues);
+ thread.setConsumerName("consumer" + i);
+ ConsumerThreads.add(thread);
+ thread.start();
+ }
+
+
+ for (ProducerThread thread : ProducerThreads) {
+ thread.start();
+ }
+
+ boolean finished = false;
+ int previous = 0;
+ while (!finished) {
+
+ int totalMessages = 0;
+ for (Thread thread : ConsumerThreads) {
+ totalMessages +=
((ConsumerThread)thread).getMessageDrivenPojo().getMessageCount();
+ }
+ log.info(totalMessages + " received so far...");
+ if (totalMessages != 0 && previous == totalMessages) {
+ for (Thread thread : ConsumerThreads) {
+ ((ConsumerThread)thread).setRun(false);
+ }
+ fail("Received " + totalMessages + ", expected
" + (numberOfMessages * numberOfProducerThreads));
+ }
+ previous = totalMessages;
+
+ if (totalMessages >= (numberOfMessages *
numberOfProducerThreads)) {
+ finished = true;
+ log.info("Received all " + totalMessages + "
messages. Finishing.");
+
+ for (Thread thread : ConsumerThreads) {
+ ((ConsumerThread)thread).setRun(false);
+ }
+ for (Thread thread : ConsumerThreads) {
+ thread.join();
+ }
+
+ } else {
+ Thread.sleep(1000);
+ }
+ }
+ }
+
+}
Added:
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/MessageDrivenPojo.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/MessageDrivenPojo.java?rev=929490&view=auto
==============================================================================
---
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/MessageDrivenPojo.java
(added)
+++
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/MessageDrivenPojo.java
Wed Mar 31 12:08:39 2010
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.systest;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class MessageDrivenPojo implements MessageListener, Serializable {
+ private Log log = LogFactory.getLog(MessageDrivenPojo.class);
+ private AtomicInteger messageCount = new AtomicInteger();
+
+ /*
+ * (non-Javadoc)
+ * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
+ */
+ public void onMessage(Message message) {
+ messageCount.incrementAndGet();
+
+ if (log.isDebugEnabled()) {
+ try {
+ logMessage(message);
+ } catch (Exception e) {
+ log.error("Error:", e);
+ }
+ }
+
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException ex ) {
+ log.error(ex);
+ }
+ }
+
+ private void logMessage(Message message) throws Exception {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append("\nJMSMessageID:");
+ buffer.append(message.getJMSMessageID());
+ buffer.append("\nJMSCorrelationID:");
+ buffer.append(message.getJMSMessageID());
+ buffer.append("\nMessage Contents:\n");
+
+ if (message instanceof TextMessage) {
+ buffer.append(((TextMessage)message).getText());
+ } else {
+ buffer.append(message.toString());
+ }
+
+ log.debug(buffer.toString());
+ }
+
+ /**
+ * @return the stats
+ */
+ protected int getMessageCount() {
+ return messageCount.get();
+ }
+}
Added:
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/NIOSpringTest.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/NIOSpringTest.java?rev=929490&view=auto
==============================================================================
---
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/NIOSpringTest.java
(added)
+++
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/NIOSpringTest.java
Wed Mar 31 12:08:39 2010
@@ -0,0 +1,13 @@
+package org.apache.activemq.systest;
+
+public class NIOSpringTest extends JDBCSpringTest {
+
+ public void setUp() throws Exception {
+ url = "nio://localhost:61616";
+ config = "xbean:activemq-spring-nio.xml";
+ super.setUp();
+ }
+
+
+
+}
Added:
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ProducerThread.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ProducerThread.java?rev=929490&view=auto
==============================================================================
---
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ProducerThread.java
(added)
+++
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/ProducerThread.java
Wed Mar 31 12:08:39 2010
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.systest;
+
+import java.util.Random;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class ProducerThread extends Thread {
+ private JmsTemplate jmsTemplate;
+ private int numberOfTopics;
+ private int numberOfMessagesToSend;
+ private int messagesSent;
+ private Random generator;
+ private String queuePrefix;
+ private ConnectionFactory connectionFactory;
+ private String message;
+ private MessageCreator messageCreator;
+ private int sendDelay;
+ private Log log = LogFactory.getLog(ProducerThread.class);
+
+ @Override
+ public void run() {
+ initialize();
+ Random generator = new
Random(Thread.currentThread().getName().hashCode());
+
+ while (messagesSent < numberOfMessagesToSend) {
+ int queueSuffix = generator.nextInt(numberOfTopics);
+ jmsTemplate.send(queuePrefix + queueSuffix,
messageCreator);
+ messagesSent++;
+ log.debug(Thread.currentThread().getName() +
+ ": sent msg #" + messagesSent);
+ try {
+ Thread.sleep(sendDelay);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ log.info("ProducerThread shutting down.");
+ }
+
+ private void initialize() {
+ jmsTemplate = new JmsTemplate();
+ jmsTemplate.setPubSubDomain(false);
+ jmsTemplate.setConnectionFactory(connectionFactory);
+
+ messageCreator = new MessageCreator() {
+ public Message createMessage(Session session) throws
JMSException {
+ return session.createTextMessage(message);
+ }
+ };
+ }
+
+ /**
+ * @param numberOfTopics the numberOfTopics to set
+ */
+ protected void setNumberOfQueues(int numberOfTopics) {
+ this.numberOfTopics = numberOfTopics;
+ }
+ /**
+ * @param queuePrefix the queuePrefix to set
+ */
+ protected void setQueuePrefix(String queuePrefix) {
+ this.queuePrefix = queuePrefix;
+ }
+ /**
+ * @param connectionFactory the connectionFactory to set
+ */
+ protected void setConnectionFactory(ConnectionFactory
connectionFactory) {
+ this.connectionFactory = connectionFactory;
+ }
+ /**
+ * @param message the message to set
+ */
+ protected void setMessage(String message) {
+ this.message = message;
+ }
+
+ /**
+ * @param numberOfMessagesToSend the numberOfMessagesToSend to set
+ */
+ protected void setNumberOfMessagesToSend(int numberOfMessagesToSend) {
+ this.numberOfMessagesToSend = numberOfMessagesToSend;
+ }
+
+ public void setSendDelay(int sendDelay) {
+ this.sendDelay = sendDelay;
+ }
+
+ public int getMessagesSent() {
+ return messagesSent;
+ }
+}
Added:
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/StompLoadTest.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/StompLoadTest.java?rev=929490&view=auto
==============================================================================
---
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/StompLoadTest.java
(added)
+++
activemq/activemq-systest/trunk/src/test/java/org/apache/activemq/systest/StompLoadTest.java
Wed Mar 31 12:08:39 2010
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.systest;
+
+import java.net.Socket;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.transport.stomp.StompConnection;
+import org.apache.activemq.transport.stomp.StompFrame;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import static java.lang.String.*;
+import static java.util.concurrent.TimeUnit.*;
+
+/**
+ *
+ * Simulates load on the Stomp connector. All producers/consumers open/close a
+ * connection on every command Configurable number of producers/consumers,
their
+ * speed and duration of test
+ *
+ * Start a broker with the desired configuration to test and then run this test
+ *
+ */
+public class StompLoadTest extends TestCase {
+
+ private static final Log LOG = LogFactory.getLog(StompLoadTest.class);
+
+ final int producerSleep = 10;
+ final int consumerSleep = 10;
+ final int msgCount = 10000;
+ final int producerCount = 10;
+ final int consumerCount = 10;
+ final int testTime = 30 * 60 * 1000;
+ final int sampleInterval = 5 * 1000;
+ final String bindAddress = "stomp://0.0.0.0:61613";
+
+ AtomicLong producerCounter = new AtomicLong();
+ AtomicLong consumerCounter = new AtomicLong();
+
+ public void testLoad() throws Exception {
+
+ for (int i = 0; i < producerCount; i++) {
+ ProducerThread producerThread = new ProducerThread("producer" + i);
+ producerThread.start();
+ }
+
+ for (int i = 0; i < consumerCount; i++) {
+ Thread consumerThread = new ConsumerThread("consumer" + i);
+ consumerThread.start();
+ }
+
+ int samples = testTime/sampleInterval;
+ long start = System.nanoTime();
+ for( int i=0; i < samples; i++ ) {
+ Thread.sleep(sampleInterval);
+ long end = System.nanoTime();
+ printRate("Producer", producerCounter, end-start);
+ printRate("Consumer", consumerCounter, end-start);
+ start = end;
+ }
+ }
+
+ static final long NANOS_PER_SECOND = NANOSECONDS.convert(1, SECONDS);
+
+ private void printRate(String name, AtomicLong counter, long nanos) {
+ long c = counter.getAndSet(0);
+ float rate_per_second = ((1.0f*c/nanos)*NANOS_PER_SECOND);
+ LOG.info(format("%s rate: %,.3f per second", name, rate_per_second));
+ }
+
+ public void connect(StompConnection conn) throws Exception {
+ URI connectUri = new URI(bindAddress);
+ conn.open(new Socket(connectUri.getHost(), connectUri.getPort()));
+ conn.connect("", "");
+ }
+
+ class ProducerThread extends Thread {
+
+ String name;
+
+ public ProducerThread(String name) {
+ this.name = name;
+ }
+
+ public void run() {
+ for (int i = 0; i < msgCount; i++) {
+ StompConnection conn = new StompConnection();
+ try {
+ connect(conn);
+ String msg = "Message #" + i+" from "+name;
+ conn.send("/queue/test", msg);
+ producerCounter.incrementAndGet();
+ Thread.sleep(producerSleep);
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ conn.disconnect();
+ } catch (Exception ignore) {
+ }
+ }
+ }
+ }
+ }
+
+ class ConsumerThread extends Thread {
+
+ String name;
+
+ public ConsumerThread(String name) {
+ this.name = name;
+ }
+
+ public void run() {
+ for (int i = 0; i < msgCount; i++) {
+ StompConnection conn = new StompConnection();
+ try {
+ connect(conn);
+ HashMap<String, String> headers = new HashMap<String,
String>();
+ headers.put("activemq.prefetchSize", "1");
+ conn.subscribe("/queue/test", "client", headers);
+ StompFrame frame = conn.receive(1*1000);
+ conn.ack(frame);
+ consumerCounter.incrementAndGet();
+ Thread.sleep(consumerSleep);
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ conn.disconnect();
+ } catch (Exception ignore) {
+ }
+ try {
+ conn.close();
+ } catch (Exception ignore) {
+ }
+ }
+ }
+ }
+ }
+
+}
Added:
activemq/activemq-systest/trunk/src/test/resources/activemq-spring-nio.xml
URL:
http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/resources/activemq-spring-nio.xml?rev=929490&view=auto
==============================================================================
--- activemq/activemq-systest/trunk/src/test/resources/activemq-spring-nio.xml
(added)
+++ activemq/activemq-systest/trunk/src/test/resources/activemq-spring-nio.xml
Wed Mar 31 12:08:39 2010
@@ -0,0 +1,54 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:amq="http://activemq.apache.org/schema/core"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+ <broker xmlns="http://activemq.apache.org/schema/core"
brokerName="localhost" dataDirectory="${activemq.base}/data">
+
+ <managementContext>
+ <managementContext createConnector="false"/>
+ </managementContext>
+
+ <persistenceAdapter>
+ <kahaDB directory="target/kahadb"/>
+ </persistenceAdapter>
+
+ <destinationPolicy>
+ <policyMap>
+ <policyEntries>
+ <policyEntry topic=">" producerFlowControl="true"
memoryLimit="1mb">
+ <pendingSubscriberPolicy>
+ <vmCursor />
+ </pendingSubscriberPolicy>
+ </policyEntry>
+ <policyEntry queue=">" producerFlowControl="true"
memoryLimit="1mb">
+ </policyEntry>
+ </policyEntries>
+ </policyMap>
+ </destinationPolicy>
+
+ <transportConnectors>
+ <transportConnector name="nio" uri="nio://0.0.0.0:61616"/>
+ </transportConnectors>
+
+ </broker>
+
+</beans>
Modified: activemq/activemq-systest/trunk/src/test/resources/jmstest-camel.xml
URL:
http://svn.apache.org/viewvc/activemq/activemq-systest/trunk/src/test/resources/jmstest-camel.xml?rev=929490&r1=929489&r2=929490&view=diff
==============================================================================
--- activemq/activemq-systest/trunk/src/test/resources/jmstest-camel.xml
(original)
+++ activemq/activemq-systest/trunk/src/test/resources/jmstest-camel.xml Wed
Mar 31 12:08:39 2010
@@ -45,7 +45,7 @@
<bean
id="testMDB"
- class="org.apache.activemq.activemq.systest.camel.TestMDB"
+ class="org.apache.activemq.systest.camel.TestMDB"
init-method="init"
destroy-method="destroy">
</bean>