Repository: activemq Updated Branches: refs/heads/trunk 51566104a -> 46bc26cea
https://issues.apache.org/jira/browse/AMQ-5436 Add support for temp topics and queues to perf tests. This closes #49 Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/46bc26ce Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/46bc26ce Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/46bc26ce Branch: refs/heads/trunk Commit: 46bc26cea50f4917ad8ecbf79416a62e5cd4bbb3 Parents: 5156610 Author: Timothy Bish <[email protected]> Authored: Tue Nov 25 15:05:30 2014 -0500 Committer: Timothy Bish <[email protected]> Committed: Tue Nov 25 15:05:30 2014 -0500 ---------------------------------------------------------------------- .../apache/activemq/tool/AbstractJmsClient.java | 63 ++++++++++++++---- .../tool/AbstractJmsMeasurableClient.java | 1 + .../apache/activemq/tool/JmsProducerClient.java | 19 ++++++ .../activemq/tool/AbstractJmsClientTest.java | 19 +++++- .../activemq/tool/JmsProducerClientTest.java | 69 ++++++++++++++++++++ 5 files changed, 157 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/46bc26ce/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java ---------------------------------------------------------------------- diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java index d2e38ab..9eb8e8b 100644 --- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java +++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java @@ -19,11 +19,7 @@ package org.apache.activemq.tool; import java.util.ArrayList; import java.util.List; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Session; +import javax.jms.*; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.tool.properties.JmsClientProperties; @@ -36,6 +32,8 @@ public abstract class AbstractJmsClient { private static final String QUEUE_SCHEME = "queue://"; private static final String TOPIC_SCHEME = "topic://"; + private static final String TEMP_QUEUE_SCHEME = "temp-queue://"; + private static final String TEMP_TOPIC_SCHEME = "temp-topic://"; public static final String DESTINATION_SEPARATOR = ","; protected ConnectionFactory factory; @@ -185,8 +183,18 @@ public abstract class AbstractJmsClient { } LOG.info("Creating composite destination: {}", compDestName); - return (destinationType == ActiveMQDestination.TOPIC_TYPE) ? - getSession().createTopic(compDestName) : getSession().createQueue(compDestName); + Destination destination; + Session session = getSession(); + if (destinationType == ActiveMQDestination.TOPIC_TYPE) { + destination = session.createTopic(compDestName); + } else if (destinationType == ActiveMQDestination.QUEUE_TYPE) { + destination = session.createQueue(compDestName); + } else { + throw new UnsupportedOperationException( + "Cannot create composite destinations using temporary queues or topics."); + } + assert (destination != null); + return destination; } private String[] mapToSimpleNames(String[] destNames) { @@ -198,22 +206,30 @@ public abstract class AbstractJmsClient { return simpleNames; } - private String getSimpleName(String destName) { + protected String getSimpleName(String destName) { String simpleName; if (destName.startsWith(QUEUE_SCHEME)) { simpleName = destName.substring(QUEUE_SCHEME.length()); } else if (destName.startsWith(TOPIC_SCHEME)) { simpleName = destName.substring(TOPIC_SCHEME.length()); + } else if (destName.startsWith(TEMP_QUEUE_SCHEME)) { + simpleName = destName.substring(TEMP_QUEUE_SCHEME.length()); + } else if (destName.startsWith(TEMP_TOPIC_SCHEME)) { + simpleName = destName.substring(TEMP_TOPIC_SCHEME.length()); } else { simpleName = destName; } return simpleName; } - private byte getDestinationType(String destName) { + protected byte getDestinationType(String destName) { assert (destName != null); if (destName.startsWith(QUEUE_SCHEME)) { return ActiveMQDestination.QUEUE_TYPE; + } else if (destName.startsWith(TEMP_QUEUE_SCHEME)) { + return ActiveMQDestination.TEMP_QUEUE_TYPE; + } else if (destName.startsWith(TEMP_TOPIC_SCHEME)) { + return ActiveMQDestination.TEMP_TOPIC_TYPE; } else { return ActiveMQDestination.TOPIC_TYPE; } @@ -221,10 +237,34 @@ public abstract class AbstractJmsClient { protected Destination createDestination(String destName) throws JMSException { String simpleName = getSimpleName(destName); - if (getDestinationType(destName) == ActiveMQDestination.QUEUE_TYPE) { + byte destinationType = getDestinationType(destName); + + if (destinationType == ActiveMQDestination.QUEUE_TYPE) { + LOG.info("Creating queue: {}", destName); return getSession().createQueue(simpleName); - } else { + } else if (destinationType == ActiveMQDestination.TOPIC_TYPE) { + LOG.info("Creating topic: {}", destName); return getSession().createTopic(simpleName); + } else { + return createTemporaryDestination(destName); + } + } + + protected Destination createTemporaryDestination(String destName) throws JMSException { + byte destinationType = getDestinationType(destName); + + if (destinationType == ActiveMQDestination.TEMP_QUEUE_TYPE) { + LOG.warn("Creating temporary queue. Requested name ({}) ignored.", destName); + TemporaryQueue temporaryQueue = getSession().createTemporaryQueue(); + LOG.info("Temporary queue created: {}", temporaryQueue.getQueueName()); + return temporaryQueue; + } else if (destinationType == ActiveMQDestination.TEMP_TOPIC_TYPE) { + LOG.warn("Creating temporary topic. Requested name ({}) ignored.", destName); + TemporaryTopic temporaryTopic = getSession().createTemporaryTopic(); + LOG.info("Temporary topic created: {}", temporaryTopic.getTopicName()); + return temporaryTopic; + } else { + throw new IllegalArgumentException("Unrecognized destination type: " + destinationType); } } @@ -237,7 +277,6 @@ public abstract class AbstractJmsClient { * @throws JMSException in case the call to JMS Session.commit() fails. */ public boolean commitTxIfNecessary() throws JMSException { - internalTxCounter++; if (getClient().isSessTransacted()) { if ((internalTxCounter % getClient().getCommitAfterXMsgs()) == 0) { http://git-wip-us.apache.org/repos/asf/activemq/blob/46bc26ce/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsMeasurableClient.java ---------------------------------------------------------------------- diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsMeasurableClient.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsMeasurableClient.java index a9d5598..194b76d 100644 --- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsMeasurableClient.java +++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsMeasurableClient.java @@ -23,6 +23,7 @@ import javax.jms.ConnectionFactory; import org.apache.activemq.tool.sampler.MeasurableClient; public abstract class AbstractJmsMeasurableClient extends AbstractJmsClient implements MeasurableClient { + protected AtomicLong throughput = new AtomicLong(0); public AbstractJmsMeasurableClient(ConnectionFactory factory) { http://git-wip-us.apache.org/repos/asf/activemq/blob/46bc26ce/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java ---------------------------------------------------------------------- diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java index 510d92d..4595ef3 100644 --- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java +++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java @@ -31,6 +31,7 @@ import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.TextMessage; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.tool.properties.JmsClientProperties; import org.apache.activemq.tool.properties.JmsProducerProperties; import org.slf4j.Logger; @@ -322,6 +323,24 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient { client = (JmsProducerProperties)clientProps; } + @Override + protected Destination createTemporaryDestination(String destName) throws JMSException { + String simpleName = getSimpleName(destName); + byte destinationType = getDestinationType(destName); + + // when we produce to temp destinations, we publish to them as + // though they were normal queues or topics + if (destinationType == ActiveMQDestination.TEMP_QUEUE_TYPE) { + LOG.info("Creating queue: {}", destName); + return getSession().createQueue(simpleName); + } else if (destinationType == ActiveMQDestination.TEMP_TOPIC_TYPE) { + LOG.info("Creating topic: {}", destName); + return getSession().createTopic(simpleName); + } else { + throw new IllegalArgumentException("Unrecognized destination type: " + destinationType); + } + } + protected String buildText(String text, int size) { byte[] data = new byte[size - text.length()]; Arrays.fill(data, (byte) 0); http://git-wip-us.apache.org/repos/asf/activemq/blob/46bc26ce/activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/AbstractJmsClientTest.java ---------------------------------------------------------------------- diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/AbstractJmsClientTest.java b/activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/AbstractJmsClientTest.java index 5e8f792..2a4fcb9 100644 --- a/activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/AbstractJmsClientTest.java +++ b/activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/AbstractJmsClientTest.java @@ -1,7 +1,6 @@ package org.apache.activemq.tool; -import static org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE; -import static org.apache.activemq.command.ActiveMQDestination.TOPIC_TYPE; +import static org.apache.activemq.command.ActiveMQDestination.*; import static org.junit.Assert.assertEquals; import java.net.URI; @@ -86,6 +85,18 @@ public class AbstractJmsClientTest { } @Test + public void testCreateDestination_tempQueue() throws JMSException { + assertDestinationType(TEMP_QUEUE_TYPE, + asAmqDest(jmsClient.createDestination("temp-queue://dest"))); + } + + @Test + public void testCreateDestination_tempTopic() throws JMSException { + assertDestinationType(TEMP_TOPIC_TYPE, + asAmqDest(jmsClient.createDestination("temp-topic://dest"))); + } + + @Test public void testCreateDestinations_commaSeparated() throws JMSException { clientProperties.setDestName("queue://foo,topic://cheese"); Destination[] destinations = jmsClient.createDestinations(1); @@ -169,6 +180,10 @@ public class AbstractJmsClientTest { assertEquals(physicalName, destination.getPhysicalName()); } + private void assertDestinationType(byte destinationType, ActiveMQDestination destination) { + assertEquals(destinationType, destination.getDestinationType()); + } + private ActiveMQDestination asAmqDest(Destination destination) { return (ActiveMQDestination) destination; } http://git-wip-us.apache.org/repos/asf/activemq/blob/46bc26ce/activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/JmsProducerClientTest.java ---------------------------------------------------------------------- diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/JmsProducerClientTest.java b/activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/JmsProducerClientTest.java new file mode 100644 index 0000000..5676959 --- /dev/null +++ b/activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/JmsProducerClientTest.java @@ -0,0 +1,69 @@ +package org.apache.activemq.tool; + +import static org.apache.activemq.command.ActiveMQDestination.*; +import static org.junit.Assert.*; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.tool.properties.JmsProducerProperties; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import javax.jms.Destination; +import javax.jms.JMSException; +import java.net.URI; + +public class JmsProducerClientTest { + + private final String DEFAULT_DEST = "TEST.FOO"; + private static BrokerService brokerService; + private static ActiveMQConnectionFactory connectionFactory; + + private AbstractJmsClient jmsClient; + private JmsProducerProperties producerProperties; + + @BeforeClass + public static void setUpBrokerAndConnectionFactory() throws Exception { + brokerService = BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false")); + brokerService.start(); + connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); + } + + @AfterClass + public static void tearDownBroker() throws Exception { + brokerService.stop(); + } + + @Before + public void setUp() { + jmsClient = new JmsProducerClient(connectionFactory); + producerProperties = new JmsProducerProperties(); + producerProperties.setDestName(DEFAULT_DEST); + jmsClient.setClient(producerProperties); + } + + @Test + public void testCreateDestination_tempQueue() throws JMSException { + assertDestinationNameType("dest", QUEUE_TYPE, + asAmqDest(jmsClient.createDestination("temp-queue://dest"))); + } + + @Test + public void testCreateDestination_tempTopic() throws JMSException { + assertDestinationNameType("dest", TOPIC_TYPE, + asAmqDest(jmsClient.createDestination("temp-topic://dest"))); + } + + private void assertDestinationNameType(String physicalName, byte destinationType, ActiveMQDestination destination) { + assertEquals(destinationType, destination.getDestinationType()); + assertEquals(physicalName, destination.getPhysicalName()); + } + + private ActiveMQDestination asAmqDest(Destination destination) { + return (ActiveMQDestination) destination; + } +}
