http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMessageConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMessageConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMessageConsumerTest.java new file mode 100644 index 0000000..908de0d --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMessageConsumerTest.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.broker.BrokerService; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +public class JmsMessageConsumerTest { + + private BrokerService brokerService; + private String brokerURI; + + @Rule public TestName name = new TestName(); + + @Before + public void startBroker() throws Exception { + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.setUseJmx(false); + brokerService.start(); + brokerService.waitUntilStarted(); + + brokerURI = "vm://localhost?create=false"; + } + + @After + public void stopBroker() throws Exception { + if (brokerService != null) { + brokerService.stop(); + } + } + + @Test + public void testSyncReceiveWithExpirationChecks() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI); + + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + producer.setTimeToLive(TimeUnit.SECONDS.toMillis(2)); + connection.start(); + + producer.send(session.createTextMessage("test")); + + // Allow message to expire in the prefetch buffer + TimeUnit.SECONDS.sleep(4); + + assertNull(consumer.receive(1000)); + connection.close(); + } + + @Test + public void testSyncReceiveWithIgnoreExpirationChecks() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI); + factory.setConsumerExpiryCheckEnabled(false); + + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + producer.setTimeToLive(TimeUnit.SECONDS.toMillis(2)); + connection.start(); + + producer.send(session.createTextMessage("test")); + + // Allow message to expire in the prefetch buffer + TimeUnit.SECONDS.sleep(4); + + assertNotNull(consumer.receive(1000)); + connection.close(); + } + + @Test + public void testAsyncReceiveWithExpirationChecks() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI); + + final CountDownLatch received = new CountDownLatch(1); + + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + received.countDown(); + } + }); + MessageProducer producer = session.createProducer(destination); + producer.setTimeToLive(TimeUnit.SECONDS.toMillis(2)); + + producer.send(session.createTextMessage("test")); + + // Allow message to expire in the prefetch buffer + TimeUnit.SECONDS.sleep(4); + connection.start(); + + assertFalse(received.await(1, TimeUnit.SECONDS)); + connection.close(); + } + + @Test + public void testAsyncReceiveWithoutExpirationChecks() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI); + factory.setConsumerExpiryCheckEnabled(false); + + final CountDownLatch received = new CountDownLatch(1); + + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + received.countDown(); + } + }); + MessageProducer producer = session.createProducer(destination); + producer.setTimeToLive(TimeUnit.SECONDS.toMillis(2)); + + producer.send(session.createTextMessage("test")); + + // Allow message to expire in the prefetch buffer + TimeUnit.SECONDS.sleep(4); + connection.start(); + + assertTrue(received.await(5, TimeUnit.SECONDS)); + connection.close(); + } +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java new file mode 100644 index 0000000..1d994b9 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java @@ -0,0 +1,641 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.advisory.ConsumerEvent; +import org.apache.activemq.advisory.ConsumerEventSource; +import org.apache.activemq.advisory.ConsumerListener; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.TopicRegion; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.network.NetworkBridge; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.util.IdGenerator; +import org.apache.activemq.util.MessageIdList; +import org.apache.activemq.util.Wait; +import org.apache.activemq.xbean.BrokerFactoryBean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.io.Resource; + +/** + * Test case support that allows the easy management and connection of several + * brokers. + * + * + */ +public class JmsMultipleBrokersTestSupport extends CombinationTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(JmsMultipleBrokersTestSupport.class); + public static final String AUTO_ASSIGN_TRANSPORT = "tcp://localhost:0"; + public static int maxSetupTime = 5000; + + protected Map<String, BrokerItem> brokers; + protected Map<String, Destination> destinations; + + protected int messageSize = 1; + + protected boolean persistentDelivery = true; + protected boolean verbose; + + protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception { + return bridgeBrokers(localBrokerName, remoteBrokerName, false, 1, true); + } + + protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName, boolean dynamicOnly) throws Exception { + BrokerService localBroker = brokers.get(localBrokerName).broker; + BrokerService remoteBroker = brokers.get(remoteBrokerName).broker; + + return bridgeBrokers(localBroker, remoteBroker, dynamicOnly, 1, true, false); + } + + protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName, boolean dynamicOnly, int networkTTL, boolean conduit) throws Exception { + BrokerService localBroker = brokers.get(localBrokerName).broker; + BrokerService remoteBroker = brokers.get(remoteBrokerName).broker; + + return bridgeBrokers(localBroker, remoteBroker, dynamicOnly, networkTTL, conduit, false); + } + + // Overwrite this method to specify how you want to bridge the two brokers + // By default, bridge them using add network connector of the local broker + // and the first connector of the remote broker + protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit, boolean failover) throws Exception { + List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors(); + URI remoteURI; + if (!transportConnectors.isEmpty()) { + remoteURI = transportConnectors.get(0).getConnectUri(); + String uri = "static:(" + remoteURI + ")"; + if (failover) { + uri = "static:(failover:(" + remoteURI + "))"; + } + NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri)); + connector.setName("to-" + remoteBroker.getBrokerName()); + connector.setDynamicOnly(dynamicOnly); + connector.setNetworkTTL(networkTTL); + connector.setConduitSubscriptions(conduit); + localBroker.addNetworkConnector(connector); + maxSetupTime = 2000; + return connector; + } else { + throw new Exception("Remote broker has no registered connectors."); + } + + } + + // This will interconnect all brokers using multicast + protected void bridgeAllBrokers() throws Exception { + bridgeAllBrokers("default", 1, false, false); + } + + protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs) throws Exception { + bridgeAllBrokers(groupName, ttl, suppressduplicateQueueSubs, false); + } + + protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs, boolean decreasePriority) throws Exception { + Collection<BrokerItem> brokerList = brokers.values(); + for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) { + BrokerService broker = i.next().broker; + List<TransportConnector> transportConnectors = broker.getTransportConnectors(); + + if (transportConnectors.isEmpty()) { + broker.addConnector(new URI(AUTO_ASSIGN_TRANSPORT)); + transportConnectors = broker.getTransportConnectors(); + } + + TransportConnector transport = transportConnectors.get(0); + transport.setDiscoveryUri(new URI("multicast://default?group=" + groupName)); + NetworkConnector nc = broker.addNetworkConnector("multicast://default?group=" + groupName); + nc.setNetworkTTL(ttl); + nc.setSuppressDuplicateQueueSubscriptions(suppressduplicateQueueSubs); + nc.setDecreaseNetworkConsumerPriority(decreasePriority); + } + + // Multicasting may take longer to setup + maxSetupTime = 8000; + } + + + protected void waitForBridgeFormation(final int min) throws Exception { + for (BrokerItem brokerItem : brokers.values()) { + final BrokerService broker = brokerItem.broker; + waitForBridgeFormation(broker, min, 0); + } + } + + public boolean waitForBridgeFormation(final BrokerService broker, final int min, final int bridgeIndex) throws Exception { + return waitForBridgeFormation(broker, min, bridgeIndex, Wait.MAX_WAIT_MILLIS*2); + } + + public boolean waitForBridgeFormation(final BrokerService broker, final int min, final int bridgeIndex, long wait) throws Exception { + + boolean result = false; + if (!broker.getNetworkConnectors().isEmpty()) { + result = Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + int activeCount = 0; + for (NetworkBridge bridge : broker.getNetworkConnectors().get(bridgeIndex).activeBridges()) { + if (bridge.getRemoteBrokerName() != null) { + LOG.info("found bridge[" + bridge + "] to " + bridge.getRemoteBrokerName() + " on broker :" + broker.getBrokerName()); + activeCount++; + } + } + return activeCount >= min; + }}, wait); + } + return result; + } + + protected void waitForMinTopicRegionConsumerCount(final String name, final int count) throws Exception { + final BrokerService broker = brokers.get(name).broker; + final TopicRegion topicRegion = (TopicRegion) ((RegionBroker) broker.getRegionBroker()).getTopicRegion(); + assertTrue("found expected consumers in topic region of" + name, Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("topic consumers: " + name +", " + topicRegion.getSubscriptions().toString()); + return topicRegion.getSubscriptions().size() >= count; + } + })); + } + + /** + * Timed wait for {@link #hasBridge(String, String)}. + * + * @see #hasBridge(String, String) + * + * @param localBrokerName + * - the name of the broker on the "local" side of the bridge + * @param remoteBrokerName + * - the name of the broker on the "remote" side of the bridge + * @param time + * - the maximum time to wait for the bridge to be established + * @param units + * - the units for <param>time</param> + * @throws InterruptedException + * - if the calling thread is interrupted + * @throws TimeoutException + * - if the bridge is not established within the time limit + * @throws Exception + * - some other unknown error occurs + */ + protected void waitForBridge(final String localBrokerName, + final String remoteBrokerName, long time, TimeUnit units) + throws InterruptedException, TimeoutException, Exception { + if (!Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() { + return hasBridge(localBrokerName, remoteBrokerName); + } + }, units.toMillis(time))) { + throw new TimeoutException("Bridge not established from broker " + + localBrokerName + " to " + remoteBrokerName + " within " + + units.toMillis(time) + " milliseconds."); + } + } + + /** + * Determines whether a bridge has been established between the specified + * brokers.Establishment means that connections have been created and broker + * info has been exchanged. Due to the asynchronous nature of the + * connections, there is still a possibility that the bridge may fail + * shortly after establishment. + * + * @param localBrokerName + * - the name of the broker on the "local" side of the bridge + * @param remoteBrokerName + * - the name of the broker on the "remote" side of the bridge + */ + protected boolean hasBridge(String localBrokerName, String remoteBrokerName) { + final BrokerItem fromBroker = brokers.get(localBrokerName); + if (fromBroker == null) { + throw new IllegalArgumentException("Unknown broker: " + + localBrokerName); + } + + for (BrokerInfo peerInfo : fromBroker.broker.getRegionBroker() + .getPeerBrokerInfos()) { + if (peerInfo.getBrokerName().equals(remoteBrokerName)) { + return true; + } + } + return false; + } + + protected void waitForBridgeFormation() throws Exception { + waitForBridgeFormation(1); + } + + protected void startAllBrokers() throws Exception { + Collection<BrokerItem> brokerList = brokers.values(); + for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) { + BrokerService broker = i.next().broker; + broker.start(); + broker.waitUntilStarted(); + } + + Thread.sleep(maxSetupTime); + } + + protected BrokerService createBroker(String brokerName) throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName(brokerName); + brokers.put(brokerName, new BrokerItem(broker)); + + return broker; + } + + protected BrokerService createBroker(URI brokerUri) throws Exception { + BrokerService broker = BrokerFactory.createBroker(brokerUri); + configureBroker(broker); + brokers.put(broker.getBrokerName(), new BrokerItem(broker)); + + return broker; + } + + protected void configureBroker(BrokerService broker) { + } + + protected BrokerService createBroker(Resource configFile) throws Exception { + BrokerFactoryBean brokerFactory = new BrokerFactoryBean(configFile); + brokerFactory.afterPropertiesSet(); + + BrokerService broker = brokerFactory.getBroker(); + brokers.put(broker.getBrokerName(), new BrokerItem(broker)); + + return broker; + } + + protected ConnectionFactory getConnectionFactory(String brokerName) throws Exception { + BrokerItem brokerItem = brokers.get(brokerName); + if (brokerItem != null) { + return brokerItem.factory; + } + return null; + } + + protected Connection createConnection(String brokerName) throws Exception { + BrokerItem brokerItem = brokers.get(brokerName); + if (brokerItem != null) { + return brokerItem.createConnection(); + } + return null; + } + + protected MessageConsumer createSyncConsumer(String brokerName, Destination dest) throws Exception { + BrokerItem brokerItem = brokers.get(brokerName); + if (brokerItem != null) { + Connection con = brokerItem.createConnection(); + con.start(); + Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = sess.createConsumer(dest); + return consumer; + } + return null; + } + + protected MessageConsumer createConsumer(String brokerName, Destination dest) throws Exception { + return createConsumer(brokerName, dest, null, null); + } + + protected MessageConsumer createConsumer(String brokerName, Destination dest, String messageSelector) throws Exception { + return createConsumer(brokerName, dest, null, messageSelector); + } + + protected MessageConsumer createConsumer(String brokerName, Destination dest, CountDownLatch latch) throws Exception { + return createConsumer(brokerName, dest, latch, null); + } + + protected MessageConsumer createConsumer(String brokerName, Destination dest, CountDownLatch latch, String messageSelector) throws Exception { + BrokerItem brokerItem = brokers.get(brokerName); + if (brokerItem != null) { + return brokerItem.createConsumer(dest, latch, messageSelector); + } + return null; + } + + protected QueueBrowser createBrowser(String brokerName, Destination dest) throws Exception { + BrokerItem brokerItem = brokers.get(brokerName); + if (brokerItem != null) { + return brokerItem.createBrowser(dest); + } + return null; + } + + protected MessageConsumer createDurableSubscriber(String brokerName, Topic dest, String name) throws Exception { + BrokerItem brokerItem = brokers.get(brokerName); + if (brokerItem != null) { + return brokerItem.createDurableSubscriber(dest, name); + } + return null; + } + + protected MessageIdList getBrokerMessages(String brokerName) { + BrokerItem brokerItem = brokers.get(brokerName); + if (brokerItem != null) { + return brokerItem.getAllMessages(); + } + return null; + } + + protected MessageIdList getConsumerMessages(String brokerName, MessageConsumer consumer) { + BrokerItem brokerItem = brokers.get(brokerName); + if (brokerItem != null) { + return brokerItem.getConsumerMessages(consumer); + } + return null; + } + + protected void assertConsumersConnect(String brokerName, Destination destination, final int count, long timeout) throws Exception { + BrokerItem brokerItem = brokers.get(brokerName); + Connection conn = brokerItem.createConnection(); + conn.start(); + ConsumerEventSource ces = new ConsumerEventSource(conn, destination); + + try { + final AtomicInteger actualConnected = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(1); + ces.setConsumerListener(new ConsumerListener(){ + public void onConsumerEvent(ConsumerEvent event) { + if( actualConnected.get() < count ) { + actualConnected.set(event.getConsumerCount()); + } + if( event.getConsumerCount() >= count ) { + latch.countDown(); + } + } + }); + ces.start(); + + latch.await(timeout, TimeUnit.MILLISECONDS); + assertTrue("Expected at least "+count+" consumers to connect, but only "+actualConnected.get()+" connectect within "+timeout+" ms", actualConnected.get() >= count); + + } finally { + ces.stop(); + conn.close(); + brokerItem.connections.remove(conn); + } + } + + + protected void sendMessages(String brokerName, Destination destination, int count) throws Exception { + sendMessages(brokerName, destination, count, null); + } + + protected void sendMessages(String brokerName, Destination destination, int count, HashMap<String, Object>properties) throws Exception { + BrokerItem brokerItem = brokers.get(brokerName); + + Connection conn = brokerItem.createConnection(); + conn.start(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = brokerItem.createProducer(destination, sess); + producer.setDeliveryMode(persistentDelivery ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + + for (int i = 0; i < count; i++) { + TextMessage msg = createTextMessage(sess, conn.getClientID() + ": Message-" + i); + if (properties != null) { + for (String propertyName : properties.keySet()) { + msg.setObjectProperty(propertyName, properties.get(propertyName)); + } + } + producer.send(msg); + onSend(i, msg); + } + + producer.close(); + sess.close(); + conn.close(); + brokerItem.connections.remove(conn); + } + + protected void onSend(int i, TextMessage msg) { + } + + protected TextMessage createTextMessage(Session session, String initText) throws Exception { + TextMessage msg = session.createTextMessage(); + + // Pad message text + if (initText.length() < messageSize) { + char[] data = new char[messageSize - initText.length()]; + Arrays.fill(data, '*'); + String str = new String(data); + msg.setText(initText + str); + + // Do not pad message text + } else { + msg.setText(initText); + } + + return msg; + } + + protected ActiveMQDestination createDestination(String name, boolean topic) throws JMSException { + Destination dest; + if (topic) { + dest = new ActiveMQTopic(name); + destinations.put(name, dest); + return (ActiveMQDestination)dest; + } else { + dest = new ActiveMQQueue(name); + destinations.put(name, dest); + return (ActiveMQDestination)dest; + } + } + + protected void setUp() throws Exception { + super.setUp(); + brokers = new HashMap<String, BrokerItem>(); + destinations = new HashMap<String, Destination>(); + } + + protected void tearDown() throws Exception { + destroyAllBrokers(); + super.tearDown(); + } + + protected void destroyBroker(String brokerName) throws Exception { + BrokerItem brokerItem = brokers.remove(brokerName); + + if (brokerItem != null) { + brokerItem.destroy(); + } + } + + protected void destroyAllBrokers() throws Exception { + for (Iterator<BrokerItem> i = brokers.values().iterator(); i.hasNext();) { + BrokerItem brokerItem = i.next(); + brokerItem.destroy(); + } + brokers.clear(); + } + + // Class to group broker components together + public class BrokerItem { + public BrokerService broker; + public ActiveMQConnectionFactory factory; + public List<Connection> connections; + public Map<MessageConsumer, MessageIdList> consumers; + public MessageIdList allMessages = new MessageIdList(); + public boolean persistent; + private IdGenerator id; + + public BrokerItem(BrokerService broker) throws Exception { + this.broker = broker; + + factory = new ActiveMQConnectionFactory(broker.getVmConnectorURI()); + factory.setConnectionIDPrefix(broker.getBrokerName()); + consumers = Collections.synchronizedMap(new HashMap<MessageConsumer, MessageIdList>()); + connections = Collections.synchronizedList(new ArrayList<Connection>()); + allMessages.setVerbose(verbose); + id = new IdGenerator(broker.getBrokerName() + ":"); + } + + public Connection createConnection() throws Exception { + Connection conn = factory.createConnection(); + conn.setClientID(id.generateId()); + + connections.add(conn); + return conn; + } + + public MessageConsumer createConsumer(Destination dest) throws Exception { + return createConsumer(dest, null, null); + } + + public MessageConsumer createConsumer(Destination dest, String messageSelector) throws Exception { + return createConsumer(dest, null, messageSelector); + } + + public MessageConsumer createConsumer(Destination dest, CountDownLatch latch, String messageSelector) throws Exception { + Connection c = createConnection(); + c.start(); + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + return createConsumerWithSession(dest, s, latch, messageSelector); + } + + public MessageConsumer createConsumerWithSession(Destination dest, Session sess) throws Exception { + return createConsumerWithSession(dest, sess, null, null); + } + + public MessageConsumer createConsumerWithSession(Destination dest, Session sess, CountDownLatch latch, String messageSelector) throws Exception { + MessageConsumer client = sess.createConsumer(dest, messageSelector); + MessageIdList messageIdList = new MessageIdList(); + messageIdList.setCountDownLatch(latch); + messageIdList.setParent(allMessages); + client.setMessageListener(messageIdList); + consumers.put(client, messageIdList); + return client; + } + + public QueueBrowser createBrowser(Destination dest) throws Exception { + Connection c = createConnection(); + c.start(); + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + return s.createBrowser((Queue)dest); + } + + public MessageConsumer createDurableSubscriber(Topic dest, String name) throws Exception { + Connection c = createConnection(); + c.start(); + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + return createDurableSubscriber(dest, s, name); + } + + public MessageConsumer createDurableSubscriber(Topic dest, Session sess, String name) throws Exception { + MessageConsumer client = sess.createDurableSubscriber((Topic)dest, name); + MessageIdList messageIdList = new MessageIdList(); + messageIdList.setParent(allMessages); + client.setMessageListener(messageIdList); + consumers.put(client, messageIdList); + + return client; + } + + public MessageIdList getAllMessages() { + return allMessages; + } + + public MessageIdList getConsumerMessages(MessageConsumer consumer) { + return consumers.get(consumer); + } + + public MessageProducer createProducer(Destination dest) throws Exception { + Connection c = createConnection(); + c.start(); + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + return createProducer(dest, s); + } + + public MessageProducer createProducer(Destination dest, Session sess) throws Exception { + MessageProducer client = sess.createProducer(dest); + client.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + return client; + } + + public void destroy() throws Exception { + while (!connections.isEmpty()) { + Connection c = connections.remove(0); + try { + c.close(); + } catch (ConnectionClosedException e) { + } catch (JMSException e) { + } + } + + broker.stop(); + broker.waitUntilStopped(); + consumers.clear(); + + broker = null; + connections = null; + consumers = null; + factory = null; + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java new file mode 100644 index 0000000..5eaab8d --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java @@ -0,0 +1,335 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.TopicSubscriber; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.MessageIdList; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.*; + +/** + * Test case support used to test multiple message comsumers and message + * producers connecting to a single broker. + * + * + */ +public class JmsMultipleClientsTestSupport { + + @Rule + public TestName testName = new TestName(); + + protected static final Logger LOG = LoggerFactory.getLogger(JmsMultipleClientsTestSupport.class); + + protected Map<MessageConsumer, MessageIdList> consumers = new HashMap<MessageConsumer, MessageIdList>(); // Map of consumer with messages + // received + protected int consumerCount = 1; + protected int producerCount = 1; + + protected int messageSize = 1024; + + protected boolean useConcurrentSend = true; + protected boolean autoFail = true; + protected boolean durable; + public boolean topic; + protected boolean persistent; + + protected BrokerService broker; + protected Destination destination; + protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>()); + protected MessageIdList allMessagesList = new MessageIdList(); + + private AtomicInteger producerLock; + + protected void startProducers(Destination dest, int msgCount) throws Exception { + startProducers(createConnectionFactory(), dest, msgCount); + } + + protected void startProducers(final ConnectionFactory factory, final Destination dest, final int msgCount) throws Exception { + // Use concurrent send + if (useConcurrentSend) { + producerLock = new AtomicInteger(producerCount); + + for (int i = 0; i < producerCount; i++) { + Thread t = new Thread(new Runnable() { + public void run() { + try { + sendMessages(factory.createConnection(), dest, msgCount); + } catch (Exception e) { + e.printStackTrace(); + } + + synchronized (producerLock) { + producerLock.decrementAndGet(); + producerLock.notifyAll(); + } + } + }); + + t.start(); + } + + // Wait for all producers to finish sending + synchronized (producerLock) { + while (producerLock.get() != 0) { + producerLock.wait(2000); + } + } + + // Use serialized send + } else { + for (int i = 0; i < producerCount; i++) { + sendMessages(factory.createConnection(), dest, msgCount); + } + } + } + + protected void sendMessages(Connection connection, Destination destination, int count) throws Exception { + connections.add(connection); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + + for (int i = 0; i < count; i++) { + TextMessage msg = createTextMessage(session, "" + i); + producer.send(msg); + } + + producer.close(); + session.close(); + connection.close(); + } + + protected TextMessage createTextMessage(Session session, String initText) throws Exception { + TextMessage msg = session.createTextMessage(); + + // Pad message text + if (initText.length() < messageSize) { + char[] data = new char[messageSize - initText.length()]; + Arrays.fill(data, '*'); + String str = new String(data); + msg.setText(initText + str); + + // Do not pad message text + } else { + msg.setText(initText); + } + + return msg; + } + + protected void startConsumers(Destination dest) throws Exception { + startConsumers(createConnectionFactory(), dest); + } + + protected void startConsumers(ConnectionFactory factory, Destination dest) throws Exception { + MessageConsumer consumer; + for (int i = 0; i < consumerCount; i++) { + if (durable && topic) { + consumer = createDurableSubscriber(factory.createConnection(), dest, "consumer" + (i + 1)); + } else { + consumer = createMessageConsumer(factory.createConnection(), dest); + } + MessageIdList list = new MessageIdList(); + list.setParent(allMessagesList); + consumer.setMessageListener(list); + consumers.put(consumer, list); + } + } + + protected MessageConsumer createMessageConsumer(Connection conn, Destination dest) throws Exception { + connections.add(conn); + + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = sess.createConsumer(dest); + conn.start(); + + return consumer; + } + + protected TopicSubscriber createDurableSubscriber(Connection conn, Destination dest, String name) throws Exception { + conn.setClientID(name); + connections.add(conn); + conn.start(); + + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final TopicSubscriber consumer = sess.createDurableSubscriber((javax.jms.Topic)dest, name); + + return consumer; + } + + protected void waitForAllMessagesToBeReceived(int messageCount) throws Exception { + allMessagesList.waitForMessagesToArrive(messageCount); + } + + protected ActiveMQDestination createDestination() throws JMSException { + String name = "." + getClass().getName() + "." + getName(); + // ensure not inadvertently composite because of combos + name = name.replace(' ','_'); + name = name.replace(',','&'); + if (topic) { + destination = new ActiveMQTopic("Topic" + name); + return (ActiveMQDestination)destination; + } else { + destination = new ActiveMQQueue("Queue" + name); + return (ActiveMQDestination)destination; + } + } + + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory("vm://localhost"); + } + + protected BrokerService createBroker() throws Exception { + return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false&useJmx=true")); + } + + @Before + public void setUp() throws Exception { + broker = createBroker(); + broker.start(); + } + + @After + public void tearDown() throws Exception { + for (Iterator<Connection> iter = connections.iterator(); iter.hasNext();) { + Connection conn = iter.next(); + try { + conn.close(); + } catch (Throwable e) { + } + } + if (broker !=null ) { // FIXME remove + broker.stop(); + allMessagesList.flushMessages(); + consumers.clear(); + } + } + + /* + * Some helpful assertions for multiple consumers. + */ + protected void assertConsumerReceivedAtLeastXMessages(MessageConsumer consumer, int msgCount) { + MessageIdList messageIdList = consumers.get(consumer); + messageIdList.assertAtLeastMessagesReceived(msgCount); + } + + protected void assertConsumerReceivedAtMostXMessages(MessageConsumer consumer, int msgCount) { + MessageIdList messageIdList = consumers.get(consumer); + messageIdList.assertAtMostMessagesReceived(msgCount); + } + + protected void assertConsumerReceivedXMessages(MessageConsumer consumer, int msgCount) { + MessageIdList messageIdList = consumers.get(consumer); + messageIdList.assertMessagesReceivedNoWait(msgCount); + } + + protected void assertEachConsumerReceivedAtLeastXMessages(int msgCount) { + for (Iterator<MessageConsumer> i = consumers.keySet().iterator(); i.hasNext();) { + assertConsumerReceivedAtLeastXMessages(i.next(), msgCount); + } + } + + protected void assertEachConsumerReceivedAtMostXMessages(int msgCount) { + for (Iterator<MessageConsumer> i = consumers.keySet().iterator(); i.hasNext();) { + assertConsumerReceivedAtMostXMessages(i.next(), msgCount); + } + } + + protected void assertEachConsumerReceivedXMessages(int msgCount) { + for (Iterator<MessageConsumer> i = consumers.keySet().iterator(); i.hasNext();) { + assertConsumerReceivedXMessages(i.next(), msgCount); + } + } + + protected void assertTotalMessagesReceived(int msgCount) { + allMessagesList.assertMessagesReceivedNoWait(msgCount); + + // now lets count the individual messages received + int totalMsg = 0; + for (Iterator<MessageConsumer> i = consumers.keySet().iterator(); i.hasNext();) { + MessageIdList messageIdList = consumers.get(i.next()); + totalMsg += messageIdList.getMessageCount(); + } + assertEquals("Total of consumers message count", msgCount, totalMsg); + } + + + public String getName() { + return getName(false); + } + + public String getName(boolean original) { + String currentTestName = testName.getMethodName(); + currentTestName = currentTestName.replace("[",""); + currentTestName = currentTestName.replace("]",""); + return currentTestName; + } + + public void assertDestinationMemoryUsageGoesToZero() throws Exception { + assertEquals("destination memory is back to 0", 0, + TestSupport.getDestination(broker, ActiveMQDestination.transform(destination)).getMemoryUsage().getPercentUsage()); + } + + + + /* + * This is copied from AutoFailTestSupport. We may want to move it to someplace where more + * tests can use it. + */ + public static void dumpAllThreads(String prefix) { + Map<Thread, StackTraceElement[]> stacks = Thread.getAllStackTraces(); + for (Map.Entry<Thread, StackTraceElement[]> stackEntry : stacks.entrySet()) { + System.err.println(prefix + " " + stackEntry.getKey()); + for(StackTraceElement element : stackEntry.getValue()) { + System.err.println(" " + element); + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java new file mode 100644 index 0000000..c063e24 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java @@ -0,0 +1,448 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; + +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; + +import junit.framework.Test; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.BaseDestination; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JmsQueueBrowserTest extends JmsTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(ActiveMQXAConnectionFactoryTest.class); + public boolean isUseCache = false; + + public static Test suite() throws Exception { + return suite(JmsQueueBrowserTest.class); + } + + /** + * Tests the queue browser. Browses the messages then the consumer tries to receive them. The messages should still + * be in the queue even when it was browsed. + * + * @throws Exception + */ + public void testReceiveBrowseReceive() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + MessageProducer producer = session.createProducer(destination); + MessageConsumer consumer = session.createConsumer(destination); + connection.start(); + + Message[] outbound = new Message[]{session.createTextMessage("First Message"), + session.createTextMessage("Second Message"), + session.createTextMessage("Third Message")}; + + // lets consume any outstanding messages from previous test runs + while (consumer.receive(1000) != null) { + } + + producer.send(outbound[0]); + producer.send(outbound[1]); + producer.send(outbound[2]); + + // Get the first. + assertEquals(outbound[0], consumer.receive(1000)); + consumer.close(); + + QueueBrowser browser = session.createBrowser(destination); + Enumeration<?> enumeration = browser.getEnumeration(); + + // browse the second + assertTrue("should have received the second message", enumeration.hasMoreElements()); + assertEquals(outbound[1], enumeration.nextElement()); + + // browse the third. + assertTrue("Should have received the third message", enumeration.hasMoreElements()); + assertEquals(outbound[2], enumeration.nextElement()); + + // There should be no more. + boolean tooMany = false; + while (enumeration.hasMoreElements()) { + LOG.info("Got extra message: " + ((TextMessage) enumeration.nextElement()).getText()); + tooMany = true; + } + assertFalse(tooMany); + browser.close(); + + // Re-open the consumer. + consumer = session.createConsumer(destination); + // Receive the second. + assertEquals(outbound[1], consumer.receive(1000)); + // Receive the third. + assertEquals(outbound[2], consumer.receive(1000)); + consumer.close(); + } + + public void initCombosForTestBatchSendBrowseReceive() { + addCombinationValues("isUseCache", new Boolean[]{Boolean.TRUE, Boolean.FALSE}); + } + + public void testBatchSendBrowseReceive() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + MessageProducer producer = session.createProducer(destination); + MessageConsumer consumer = session.createConsumer(destination); + connection.start(); + + TextMessage[] outbound = new TextMessage[10]; + for (int i=0; i<10; i++) { + outbound[i] = session.createTextMessage( i + " Message"); + }; + + // lets consume any outstanding messages from previous test runs + while (consumer.receive(1000) != null) { + } + consumer.close(); + + for (int i=0;i<outbound.length; i++) { + producer.send(outbound[i]); + } + + QueueBrowser browser = session.createBrowser(destination); + Enumeration<?> enumeration = browser.getEnumeration(); + + for (int i=0; i<outbound.length; i++) { + assertTrue("should have a", enumeration.hasMoreElements()); + assertEquals(outbound[i], enumeration.nextElement()); + } + browser.close(); + + for (int i=0;i<outbound.length; i++) { + producer.send(outbound[i]); + } + + // verify second batch is visible to browse + browser = session.createBrowser(destination); + enumeration = browser.getEnumeration(); + for (int j=0; j<2;j++) { + for (int i=0; i<outbound.length; i++) { + assertTrue("should have a", enumeration.hasMoreElements()); + assertEquals("j=" + j + ", i=" + i, outbound[i].getText(), ((TextMessage) enumeration.nextElement()).getText()); + } + } + browser.close(); + + consumer = session.createConsumer(destination); + for (int i=0; i<outbound.length * 2; i++) { + assertNotNull("Got message: " + i, consumer.receive(2000)); + } + consumer.close(); + } + + public void initCombosForTestBatchSendJmxBrowseReceive() { + addCombinationValues("isUseCache", new Boolean[]{Boolean.TRUE, Boolean.FALSE}); + } + + public void testBatchSendJmxBrowseReceive() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + MessageProducer producer = session.createProducer(destination); + MessageConsumer consumer = session.createConsumer(destination); + connection.start(); + + TextMessage[] outbound = new TextMessage[10]; + for (int i=0; i<10; i++) { + outbound[i] = session.createTextMessage( i + " Message"); + }; + + // lets consume any outstanding messages from previous test runs + while (consumer.receive(1000) != null) { + } + consumer.close(); + + for (int i=0;i<outbound.length; i++) { + producer.send(outbound[i]); + } + + ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST"); + + LOG.info("Create QueueView MBean..."); + QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); + + long concount = proxy.getConsumerCount(); + LOG.info("Consumer Count :" + concount); + long messcount = proxy.getQueueSize(); + LOG.info("current number of messages in the queue :" + messcount); + + // lets browse + CompositeData[] compdatalist = proxy.browse(); + if (compdatalist.length == 0) { + fail("There is no message in the queue:"); + } + String[] messageIDs = new String[compdatalist.length]; + + for (int i = 0; i < compdatalist.length; i++) { + CompositeData cdata = compdatalist[i]; + + if (i == 0) { + LOG.info("Columns: " + cdata.getCompositeType().keySet()); + } + messageIDs[i] = (String)cdata.get("JMSMessageID"); + LOG.info("message " + i + " : " + cdata.values()); + } + + TabularData table = proxy.browseAsTable(); + LOG.info("Found tabular data: " + table); + assertTrue("Table should not be empty!", table.size() > 0); + + assertEquals("Queue size", outbound.length, proxy.getQueueSize()); + assertEquals("Queue size", outbound.length, compdatalist.length); + assertEquals("Queue size", outbound.length, table.size()); + + + LOG.info("Send another 10"); + for (int i=0;i<outbound.length; i++) { + producer.send(outbound[i]); + } + + LOG.info("Browse again"); + + messcount = proxy.getQueueSize(); + LOG.info("current number of messages in the queue :" + messcount); + + compdatalist = proxy.browse(); + if (compdatalist.length == 0) { + fail("There is no message in the queue:"); + } + messageIDs = new String[compdatalist.length]; + + for (int i = 0; i < compdatalist.length; i++) { + CompositeData cdata = compdatalist[i]; + + if (i == 0) { + LOG.info("Columns: " + cdata.getCompositeType().keySet()); + } + messageIDs[i] = (String)cdata.get("JMSMessageID"); + LOG.info("message " + i + " : " + cdata.values()); + } + + table = proxy.browseAsTable(); + LOG.info("Found tabular data: " + table); + assertTrue("Table should not be empty!", table.size() > 0); + + assertEquals("Queue size", outbound.length*2, proxy.getQueueSize()); + assertEquals("Queue size", outbound.length*2, compdatalist.length); + assertEquals("Queue size", outbound.length * 2, table.size()); + + consumer = session.createConsumer(destination); + for (int i=0; i<outbound.length * 2; i++) { + assertNotNull("Got message: " + i, consumer.receive(2000)); + } + consumer.close(); + } + + public void testBrowseReceive() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + + connection.start(); + + // create consumer + MessageConsumer consumer = session.createConsumer(destination); + // lets consume any outstanding messages from previous test runs + while (consumer.receive(1000) != null) { + } + + Message[] outbound = new Message[]{session.createTextMessage("First Message"), + session.createTextMessage("Second Message"), + session.createTextMessage("Third Message")}; + + MessageProducer producer = session.createProducer(destination); + producer.send(outbound[0]); + + // create browser first + QueueBrowser browser = session.createBrowser(destination); + Enumeration<?> enumeration = browser.getEnumeration(); + + // browse the first message + assertTrue("should have received the first message", enumeration.hasMoreElements()); + assertEquals(outbound[0], enumeration.nextElement()); + + // Receive the first message. + assertEquals(outbound[0], consumer.receive(1000)); + consumer.close(); + browser.close(); + producer.close(); + } + + public void testLargeNumberOfMessages() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + connection.start(); + + MessageProducer producer = session.createProducer(destination); + + int numberOfMessages = 4096; + + for (int i = 0; i < numberOfMessages; i++) { + producer.send(session.createTextMessage("Message: " + i)); + } + + QueueBrowser browser = session.createBrowser(destination); + Enumeration<?> enumeration = browser.getEnumeration(); + + assertTrue(enumeration.hasMoreElements()); + + int numberBrowsed = 0; + + while (enumeration.hasMoreElements()) { + Message browsed = (Message) enumeration.nextElement(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Browsed Message [{}]", browsed.getJMSMessageID()); + } + + numberBrowsed++; + } + + System.out.println("Number browsed: " + numberBrowsed); + assertEquals(numberOfMessages, numberBrowsed); + browser.close(); + producer.close(); + } + + public void testQueueBrowserWith2Consumers() throws Exception { + final int numMessages = 1000; + connection.setAlwaysSyncSend(false); + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + ActiveMQQueue destinationPrefetch10 = new ActiveMQQueue("TEST?jms.prefetchSize=10"); + ActiveMQQueue destinationPrefetch1 = new ActiveMQQueue("TEST?jms.prefetchsize=1"); + connection.start(); + + ActiveMQConnection connection2 = (ActiveMQConnection)factory.createConnection(userName, password); + connection2.start(); + connections.add(connection2); + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(destination); + MessageConsumer consumer = session.createConsumer(destinationPrefetch10); + + // lets consume any outstanding messages from previous test runs + while (consumer.receive(1000) != null) { + } + + for (int i=0; i<numMessages; i++) { + TextMessage message = session.createTextMessage("Message: " + i); + producer.send(message); + } + + QueueBrowser browser = session2.createBrowser(destinationPrefetch1); + @SuppressWarnings("unchecked") + Enumeration<Message> browserView = browser.getEnumeration(); + + List<Message> messages = new ArrayList<Message>(); + for (int i = 0; i < numMessages; i++) { + Message m1 = consumer.receive(5000); + assertNotNull("m1 is null for index: " + i, m1); + messages.add(m1); + } + + int i = 0; + for (; i < numMessages && browserView.hasMoreElements(); i++) { + Message m1 = messages.get(i); + Message m2 = browserView.nextElement(); + assertNotNull("m2 is null for index: " + i, m2); + assertEquals(m1.getJMSMessageID(), m2.getJMSMessageID()); + } + + // currently browse max page size is ignored for a queue browser consumer + // only guarantee is a page size - but a snapshot of pagedinpending is + // used so it is most likely more + assertTrue("got at least our expected minimum in the browser: ", i > BaseDestination.MAX_PAGE_SIZE); + + assertFalse("nothing left in the browser", browserView.hasMoreElements()); + assertNull("consumer finished", consumer.receiveNoWait()); + } + + public void testBrowseClose() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + + connection.start(); + + TextMessage[] outbound = new TextMessage[]{session.createTextMessage("First Message"), + session.createTextMessage("Second Message"), + session.createTextMessage("Third Message")}; + + // create consumer + MessageConsumer consumer = session.createConsumer(destination); + // lets consume any outstanding messages from previous test runs + while (consumer.receive(1000) != null) { + } + + MessageProducer producer = session.createProducer(destination); + producer.send(outbound[0]); + producer.send(outbound[1]); + producer.send(outbound[2]); + + // create browser first + QueueBrowser browser = session.createBrowser(destination); + Enumeration<?> enumeration = browser.getEnumeration(); + + // browse some messages + assertEquals(outbound[0], enumeration.nextElement()); + assertEquals(outbound[1], enumeration.nextElement()); + //assertEquals(outbound[2], (Message) enumeration.nextElement()); + + browser.close(); + + // Receive the first message. + TextMessage msg = (TextMessage)consumer.receive(1000); + assertEquals("Expected " + outbound[0].getText() + " but received " + msg.getText(), outbound[0], msg); + msg = (TextMessage)consumer.receive(1000); + assertEquals("Expected " + outbound[1].getText() + " but received " + msg.getText(), outbound[1], msg); + msg = (TextMessage)consumer.receive(1000); + assertEquals("Expected " + outbound[2].getText() + " but received " + msg.getText(), outbound[2], msg); + + consumer.close(); + producer.close(); + } + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService brokerService = super.createBroker(); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setUseCache(isUseCache); + policyEntry.setMaxBrowsePageSize(4096); + policyMap.setDefaultEntry(policyEntry); + brokerService.setDestinationPolicy(policyMap); + return brokerService; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueCompositeSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueCompositeSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueCompositeSendReceiveTest.java new file mode 100644 index 0000000..f381ec0 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueCompositeSendReceiveTest.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Topic; + +import org.apache.activemq.broker.BrokerRegistry; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.test.JmsTopicSendReceiveTest; +import org.apache.activemq.util.Wait; + + +/** + * + */ +public class JmsQueueCompositeSendReceiveTest extends JmsTopicSendReceiveTest { + private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory + .getLog(JmsQueueCompositeSendReceiveTest.class); + + /** + * Sets a test to have a queue destination and non-persistent delivery mode. + * + * @see junit.framework.TestCase#setUp() + */ + protected void setUp() throws Exception { + topic = false; + deliveryMode = DeliveryMode.NON_PERSISTENT; + super.setUp(); + } + + /** + * Returns the consumer subject. + * + * @return String - consumer subject + * @see org.apache.activemq.test.TestSupport#getConsumerSubject() + */ + protected String getConsumerSubject() { + return "FOO.BAR.HUMBUG"; + } + + /** + * Returns the producer subject. + * + * @return String - producer subject + * @see org.apache.activemq.test.TestSupport#getProducerSubject() + */ + protected String getProducerSubject() { + return "FOO.BAR.HUMBUG,FOO.BAR.HUMBUG2"; + } + + /** + * Test if all the messages sent are being received. + * + * @throws Exception + */ + public void testSendReceive() throws Exception { + super.testSendReceive(); + messages.clear(); + Destination consumerDestination = consumeSession.createQueue("FOO.BAR.HUMBUG2"); + LOG.info("Created consumer destination: " + consumerDestination + " of type: " + consumerDestination.getClass()); + MessageConsumer consumer = null; + if (durable) { + LOG.info("Creating durable consumer"); + consumer = consumeSession.createDurableSubscriber((Topic) consumerDestination, getName()); + } else { + consumer = consumeSession.createConsumer(consumerDestination); + } + consumer.setMessageListener(this); + + assertMessagesAreReceived(); + LOG.info("" + data.length + " messages(s) received, closing down connections"); + } + + public void testDuplicate() throws Exception { + ActiveMQDestination queue = (ActiveMQDestination)session.createQueue("TEST,TEST"); + for (int i = 0; i < data.length; i++) { + Message message = createMessage(i); + configureMessage(message); + if (verbose) { + LOG.info("About to send a message: " + message + " with text: " + data[i]); + } + producer.send(queue, message); + } + + Thread.sleep(200); // wait for messages to be queued + + BrokerService broker = BrokerRegistry.getInstance().lookup("localhost"); + final Queue dest = (Queue)((RegionBroker)broker.getRegionBroker()).getQueueRegion().getDestinationMap().get(new ActiveMQQueue("TEST")); + assertTrue("all messages were received", Wait.waitFor(new Wait.Condition(){ + public boolean isSatisified() throws Exception { + return data.length == dest.getDestinationStatistics().getMessages().getCount(); + }})); + + dest.purge(); + assertEquals(0, dest.getDestinationStatistics().getMessages().getCount()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueRequestReplyTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueRequestReplyTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueRequestReplyTest.java new file mode 100644 index 0000000..9282c0c --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueRequestReplyTest.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +/** + * + */ +public class JmsQueueRequestReplyTest extends JmsTopicRequestReplyTest { + + /** + * Set up the test with a queue. + * + * @see junit.framework.TestCase#setUp() + */ + protected void setUp() throws Exception { + topic = false; + super.setUp(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSelectorTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSelectorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSelectorTest.java new file mode 100644 index 0000000..449edda --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSelectorTest.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq; + +/** + * + */ +public class JmsQueueSelectorTest extends JmsTopicSelectorTest { + public void setUp() throws Exception { + topic = false; + super.setUp(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveTest.java new file mode 100644 index 0000000..73e3e24 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveTest.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import org.apache.activemq.test.JmsTopicSendReceiveTest; + +/** + * + */ +public class JmsQueueSendReceiveTest extends JmsTopicSendReceiveTest { + + /** + * Set up the test with a queue. + * + * @see junit.framework.TestCase#setUp() + */ + protected void setUp() throws Exception { + topic = false; + super.setUp(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveTwoConnectionsStartBeforeBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveTwoConnectionsStartBeforeBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveTwoConnectionsStartBeforeBrokerTest.java new file mode 100644 index 0000000..367aaeb --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveTwoConnectionsStartBeforeBrokerTest.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.apache.activemq.broker.BrokerService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class JmsQueueSendReceiveTwoConnectionsStartBeforeBrokerTest extends JmsQueueSendReceiveTwoConnectionsTest { + private static final Logger LOG = LoggerFactory.getLogger(JmsQueueSendReceiveTwoConnectionsStartBeforeBrokerTest.class); + + private Queue<Exception> errors = new ConcurrentLinkedQueue<Exception>(); + private int delayBeforeStartingBroker = 1000; + private BrokerService broker; + + public void startBroker() { + // Initialize the broker + LOG.info("Lets wait: " + delayBeforeStartingBroker + " millis before creating the broker"); + try { + Thread.sleep(delayBeforeStartingBroker); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + LOG.info("Now starting the broker"); + try { + broker = new BrokerService(); + broker.setPersistent(false); + broker.addConnector("tcp://localhost:61616"); + broker.start(); + } catch (Exception e) { + LOG.info("Caught: " + e); + errors.add(e); + } + } + + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)?maxReconnectAttempts=10&useExponentialBackOff=false&initialReconnectDelay=200"); + } + + protected void setUp() throws Exception { + setAutoFail(true); + // now lets asynchronously start a broker + Thread thread = new Thread() { + public void run() { + startBroker(); + } + }; + thread.start(); + + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + + if (broker != null) { + broker.stop(); + } + if (!errors.isEmpty()) { + Exception e = errors.remove(); + throw e; + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveTwoConnectionsTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveTwoConnectionsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveTwoConnectionsTest.java new file mode 100644 index 0000000..f29cc09 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveTwoConnectionsTest.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import org.apache.activemq.test.JmsTopicSendReceiveWithTwoConnectionsTest; + +/** + * + */ +public class JmsQueueSendReceiveTwoConnectionsTest extends JmsTopicSendReceiveWithTwoConnectionsTest { + + /** + * Set up the test with a queue and using two connections. + * + * @see junit.framework.TestCase#setUp() + */ + @Override + protected void setUp() throws Exception { + topic = false; + super.setUp(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveUsingTwoSessionsTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveUsingTwoSessionsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveUsingTwoSessionsTest.java new file mode 100644 index 0000000..cb793d0 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueSendReceiveUsingTwoSessionsTest.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +/** + * + */ +public class JmsQueueSendReceiveUsingTwoSessionsTest extends JmsQueueSendReceiveTest { + + /** + * Set up the test using two sessions. + * + * @see junit.framework.TestCase#setUp() + */ + protected void setUp() throws Exception { + useSeparateSession = true; + super.setUp(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTopicCompositeSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTopicCompositeSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTopicCompositeSendReceiveTest.java new file mode 100644 index 0000000..d92696e --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTopicCompositeSendReceiveTest.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.Topic; + +import org.apache.activemq.test.JmsTopicSendReceiveTest; + + +/** + * + */ +public class JmsQueueTopicCompositeSendReceiveTest extends JmsTopicSendReceiveTest { + private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory + .getLog(JmsQueueTopicCompositeSendReceiveTest.class); + Destination consumerDestination2; + MessageConsumer consumer2; + + /** + * Sets a test to have a queue destination and non-persistent delivery mode. + * + * @see junit.framework.TestCase#setUp() + */ + protected void setUp() throws Exception { + deliveryMode = DeliveryMode.NON_PERSISTENT; + topic = false; + super.setUp(); + consumerDestination2 = consumeSession.createTopic("FOO.BAR.HUMBUG2"); + LOG.info("Created consumer destination: " + consumerDestination2 + " of type: " + consumerDestination2.getClass()); + if (durable) { + LOG.info("Creating durable consumer"); + consumer2 = consumeSession.createDurableSubscriber((Topic) consumerDestination2, getName()); + } else { + consumer2 = consumeSession.createConsumer(consumerDestination2); + } + + } + + /** + * Returns the consumer subject. + * + * @return String - consumer subject + * @see org.apache.activemq.test.TestSupport#getConsumerSubject() + */ + protected String getConsumerSubject() { + return "FOO.BAR.HUMBUG"; + } + + /** + * Returns the producer subject. + * + * @return String - producer subject + * @see org.apache.activemq.test.TestSupport#getProducerSubject() + */ + protected String getProducerSubject() { + return "queue://FOO.BAR.HUMBUG,topic://FOO.BAR.HUMBUG2"; + } + + /** + * Test if all the messages sent are being received. + * + * @throws Exception + */ + public void testSendReceive() throws Exception { + super.testSendReceive(); + messages.clear(); + consumer2.setMessageListener(this); + assertMessagesAreReceived(); + LOG.info("" + data.length + " messages(s) received, closing down connections"); + } +}
