http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java new file mode 100644 index 0000000..449d5e5 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ3841Test { + + static final Logger LOG = LoggerFactory.getLogger(AMQ3841Test.class); + private final static int maxFileLength = 1024*1024*32; + private final static String destinationName = "TEST.QUEUE"; + BrokerService broker; + + @Before + public void setUp() throws Exception { + prepareBrokerWithMultiStore(true); + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + broker.stop(); + } + + protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception { + BrokerService broker = new BrokerService(); + broker.setUseJmx(true); + broker.setBrokerName("localhost"); + broker.setPersistenceAdapter(kaha); + return broker; + } + + @Test + public void testRestartAfterQueueDelete() throws Exception { + + // Ensure we have an Admin View. + assertTrue("Broker doesn't have an Admin View.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return (broker.getAdminView()) != null; + } + })); + + + broker.getAdminView().addQueue(destinationName); + + assertNotNull(broker.getDestination(new ActiveMQQueue(destinationName))); + + broker.getAdminView().removeQueue(destinationName); + + broker.stop(); + broker.waitUntilStopped(); + + prepareBrokerWithMultiStore(false); + broker.start(); + + broker.getAdminView().addQueue(destinationName); + assertNotNull(broker.getDestination(new ActiveMQQueue(destinationName))); + + } + + protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException { + KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter(); + kaha.setJournalMaxFileLength(maxFileLength); + kaha.setCleanupInterval(5000); + if (delete) { + kaha.deleteAllMessages(); + } + return kaha; + } + + public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception { + + MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter(); + if (deleteAllMessages) { + multiKahaDBPersistenceAdapter.deleteAllMessages(); + } + ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<FilteredKahaDBPersistenceAdapter>(); + + FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter(); + template.setPersistenceAdapter(createStore(deleteAllMessages)); + template.setPerDestination(true); + adapters.add(template); + + multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters); + broker = createBroker(multiKahaDBPersistenceAdapter); + } +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3879Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3879Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3879Test.java new file mode 100644 index 0000000..f2bdc48 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3879Test.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertNotNull; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.BrokerService; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ3879Test { + + static final Logger LOG = LoggerFactory.getLogger(AMQ3841Test.class); + private BrokerService broker; + + private ActiveMQConnectionFactory factory; + + @Before + public void setUp() throws Exception { + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + + factory = new ActiveMQConnectionFactory("vm://localhost"); + factory.setAlwaysSyncSend(true); + } + + @After + public void tearDown() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + broker = null; + } + + protected BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.setBrokerName("localhost"); + broker.addConnector("vm://localhost"); + return broker; + } + + @Test + public void testConnectionDletesWrongTempDests() throws Exception { + + final Connection connection1 = factory.createConnection(); + final Connection connection2 = factory.createConnection(); + + Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Destination tempDestAdvisory = AdvisorySupport.TEMP_QUEUE_ADVISORY_TOPIC; + + MessageConsumer advisoryConsumer = session1.createConsumer(tempDestAdvisory); + connection1.start(); + + Destination tempQueue = session2.createTemporaryQueue(); + MessageProducer tempProducer = session2.createProducer(tempQueue); + + assertNotNull(advisoryConsumer.receive(5000)); + + Thread t = new Thread(new Runnable() { + + @Override + public void run() { + try { + Thread.sleep(20); + connection1.close(); + } catch (Exception e) { + } + } + }); + + t.start(); + + for (int i = 0; i < 256; ++i) { + Message msg = session2.createTextMessage("Temp Data"); + tempProducer.send(msg); + Thread.sleep(2); + } + + t.join(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java new file mode 100644 index 0000000..c633103 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertNotNull; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.ResourceAllocationException; +import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.Topic; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ3903Test { + + private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3903Test.class); + + private static final String bindAddress = "tcp://0.0.0.0:0"; + private BrokerService broker; + private ActiveMQConnectionFactory cf; + + private static final int MESSAGE_COUNT = 100; + + @Before + public void setUp() throws Exception { + broker = this.createBroker(); + String address = broker.getTransportConnectors().get(0).getPublishableConnectString(); + broker.start(); + broker.waitUntilStarted(); + + cf = new ActiveMQConnectionFactory(address); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + @Test + public void testAdvisoryForFastGenericProducer() throws Exception { + doTestAdvisoryForFastProducer(true); + } + + @Test + public void testAdvisoryForFastDedicatedProducer() throws Exception { + doTestAdvisoryForFastProducer(false); + } + + public void doTestAdvisoryForFastProducer(boolean genericProducer) throws Exception { + + Connection connection = cf.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + final TemporaryQueue queue = session.createTemporaryQueue(); + + final Topic advisoryTopic = AdvisorySupport.getFastProducerAdvisoryTopic((ActiveMQDestination) queue); + final Topic advisoryWhenFullTopic = AdvisorySupport.getFullAdvisoryTopic((ActiveMQDestination) queue); + + MessageConsumer advisoryConsumer = session.createConsumer(advisoryTopic); + MessageConsumer advisoryWhenFullConsumer = session.createConsumer(advisoryWhenFullTopic); + + MessageProducer producer = session.createProducer(genericProducer ? null : queue); + + try { + // send lots of messages to the tempQueue + for (int i = 0; i < MESSAGE_COUNT; i++) { + BytesMessage m = session.createBytesMessage(); + m.writeBytes(new byte[1024]); + if (genericProducer) { + producer.send(queue, m, DeliveryMode.PERSISTENT, 4, 0); + } else { + producer.send(m); + } + } + } catch (ResourceAllocationException expectedOnLimitReachedAfterFastAdvisory) {} + + // check one advisory message has produced on the advisoryTopic + Message advCmsg = advisoryConsumer.receive(4000); + assertNotNull(advCmsg); + + advCmsg = advisoryWhenFullConsumer.receive(4000); + assertNotNull(advCmsg); + + connection.close(); + LOG.debug("Connection closed, destinations should now become inactive."); + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setPersistent(false); + answer.setUseJmx(false); + + PolicyEntry entry = new PolicyEntry(); + entry.setAdvisoryForFastProducers(true); + entry.setAdvisoryWhenFull(true); + entry.setMemoryLimit(10000); + PolicyMap map = new PolicyMap(); + map.setDefaultEntry(entry); + + answer.setDestinationPolicy(map); + answer.addConnector(bindAddress); + + answer.getSystemUsage().setSendFailIfNoSpace(true); + + return answer; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3932Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3932Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3932Test.java new file mode 100644 index 0000000..78017a6 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3932Test.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ3932Test { + static final Logger LOG = LoggerFactory.getLogger(AMQ3932Test.class); + private Connection connection; + private BrokerService broker; + + @Before + public void setUp() throws Exception { + broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + TransportConnector tcpConnector = broker.addConnector("tcp://localhost:0"); + broker.start(); + + ConnectionFactory factory = new ActiveMQConnectionFactory( + "failover:("+ tcpConnector.getPublishableConnectString() +")?jms.prefetchPolicy.queuePrefetch=0"); + connection = factory.createConnection(); + connection.start(); + } + + @After + public void tearDown() throws Exception { + connection.close(); + + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + broker = null; + } + } + + @Test + public void testPlainReceiveBlocks() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = session.createConsumer(session.createQueue(getClass().getName())); + + broker.stop(); + broker.waitUntilStopped(); + broker = null; + + final CountDownLatch done = new CountDownLatch(1); + final CountDownLatch started = new CountDownLatch(1); + ExecutorService executor = Executors.newSingleThreadExecutor(); + + executor.execute(new Runnable() { + public void run() { + try { + started.countDown(); + LOG.info("Entering into a Sync receive call"); + consumer.receive(); + } catch (JMSException e) { + } + done.countDown(); + } + }); + + assertTrue(started.await(10, TimeUnit.SECONDS)); + assertFalse(done.await(20, TimeUnit.SECONDS)); + } + + @Test + public void testHungReceiveNoWait() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = session.createConsumer(session.createQueue(getClass().getName())); + + broker.stop(); + broker.waitUntilStopped(); + broker = null; + + final CountDownLatch done = new CountDownLatch(1); + final CountDownLatch started = new CountDownLatch(1); + ExecutorService executor = Executors.newSingleThreadExecutor(); + + executor.execute(new Runnable() { + public void run() { + try { + started.countDown(); + LOG.info("Entering into a Sync receiveNoWait call"); + consumer.receiveNoWait(); + } catch (JMSException e) { + } + done.countDown(); + } + }); + + assertTrue(started.await(10, TimeUnit.SECONDS)); + assertTrue(done.await(20, TimeUnit.SECONDS)); + } + + @Test + public void testHungReceiveTimed() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = session.createConsumer(session.createQueue(getClass().getName())); + + broker.stop(); + broker.waitUntilStopped(); + broker = null; + + final CountDownLatch done = new CountDownLatch(1); + final CountDownLatch started = new CountDownLatch(1); + ExecutorService executor = Executors.newSingleThreadExecutor(); + + executor.execute(new Runnable() { + public void run() { + try { + started.countDown(); + LOG.info("Entering into a timed Sync receive call"); + consumer.receive(10); + } catch (JMSException e) { + } + done.countDown(); + } + }); + + assertTrue(started.await(10, TimeUnit.SECONDS)); + assertTrue(done.await(20, TimeUnit.SECONDS)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3934Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3934Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3934Test.java new file mode 100644 index 0000000..80a2fa3 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3934Test.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.bugs; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +public class AMQ3934Test { + + private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3934Test.class); + private static BrokerService brokerService; + private static String TEST_QUEUE = "testQueue"; + private static ActiveMQQueue queue = new ActiveMQQueue(TEST_QUEUE); + private static String BROKER_ADDRESS = "tcp://localhost:0"; + + private ActiveMQConnectionFactory connectionFactory; + private String connectionUri; + private String messageID; + + @Before + public void setUp() throws Exception { + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.setUseJmx(true); + connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString(); + brokerService.start(); + brokerService.waitUntilStarted(); + + connectionFactory = new ActiveMQConnectionFactory(connectionUri); + sendMessage(); + } + + public void sendMessage() throws Exception { + final Connection conn = connectionFactory.createConnection(); + try { + conn.start(); + final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Destination queue = session.createQueue(TEST_QUEUE); + final Message toSend = session.createMessage(); + final MessageProducer producer = session.createProducer(queue); + producer.send(queue, toSend); + } finally { + conn.close(); + } + } + + @After + public void tearDown() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + + @Test + public void getMessage() throws Exception { + final QueueViewMBean queueView = getProxyToQueueViewMBean(); + final CompositeData messages[] = queueView.browse(); + messageID = (String) messages[0].get("JMSMessageID"); + assertNotNull(messageID); + assertNotNull(queueView.getMessage(messageID)); + LOG.debug("Attempting to remove message ID: " + messageID); + queueView.removeMessage(messageID); + assertNull(queueView.getMessage(messageID)); + } + + private QueueViewMBean getProxyToQueueViewMBean() throws MalformedObjectNameException, NullPointerException, + JMSException { + final ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queue.getQueueName()); + final QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance( + queueViewMBeanName, QueueViewMBean.class, true); + return proxy; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3961Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3961Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3961Test.java new file mode 100644 index 0000000..8afcaa9 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3961Test.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import javax.jms.ConnectionConsumer; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.ServerSession; +import javax.jms.ServerSessionPool; +import javax.jms.Session; +import javax.jms.TopicConnection; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQTopic; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class AMQ3961Test { + + private static BrokerService brokerService; + private static String BROKER_ADDRESS = "tcp://localhost:0"; + + private ActiveMQConnectionFactory connectionFactory; + private String connectionUri; + + @Before + public void setUp() throws Exception { + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.setUseJmx(true); + brokerService.setDeleteAllMessagesOnStartup(true); + connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString(); + brokerService.start(); + brokerService.waitUntilStarted(); + + connectionFactory = new ActiveMQConnectionFactory(connectionUri); + } + + @After + public void tearDown() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + + public class TestServerSessionPool implements ServerSessionPool { + + private final TopicConnection connection; + + public TestServerSessionPool(final TopicConnection connection) { + this.connection = connection; + } + + @Override + public ServerSession getServerSession() throws JMSException { + final TopicSession topicSession = connection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE); + return new TestServerSession(topicSession); + } + } + + public class TestServerSession implements ServerSession, MessageListener { + + private final TopicSession session; + + public TestServerSession(final TopicSession session) throws JMSException { + this.session = session; + session.setMessageListener(this); + } + + @Override + public Session getSession() throws JMSException { + return session; + } + + @Override + public void start() throws JMSException { + session.run(); + } + + @Override + public void onMessage(final Message message) { + synchronized (processedSessions) { + processedSessions.add(this); + } + } + } + + public static final int MESSAGE_COUNT = 16; + private final List<TestServerSession> processedSessions = new LinkedList<TestServerSession>(); + private final List<TestServerSession> committedSessions = new LinkedList<TestServerSession>(); + + @Test + public void testPrefetchInDurableSubscription() throws Exception { + final ActiveMQTopic topic = new ActiveMQTopic("TestTopic"); + + final TopicConnection initialSubConnection = connectionFactory.createTopicConnection(); + initialSubConnection.setClientID("TestClient"); + initialSubConnection.start(); + final TopicSession initialSubSession = initialSubConnection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE); + final TopicSubscriber initialSubscriber = initialSubSession.createDurableSubscriber(topic, "TestSubscriber"); + + initialSubscriber.close(); + initialSubSession.close(); + initialSubConnection.close(); + + final TopicConnection publisherConnection = connectionFactory.createTopicConnection(); + publisherConnection.start(); + final TopicSession publisherSession = publisherConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + final TopicPublisher publisher = publisherSession.createPublisher(topic); + for (int i = 1; i <= MESSAGE_COUNT; i++) { + final Message msg = publisherSession.createTextMessage("Message #" + i); + publisher.publish(msg); + } + publisher.close(); + publisherSession.close(); + publisherConnection.close(); + + final TopicConnection connection = connectionFactory.createTopicConnection(); + connection.setClientID("TestClient"); + connection.start(); + final TestServerSessionPool pool = new TestServerSessionPool(connection); + final ConnectionConsumer connectionConsumer = connection.createDurableConnectionConsumer(topic, "TestSubscriber", null, pool, 1); + while (true) { + int lastMsgCount = 0; + int msgCount = 0; + do { + lastMsgCount = msgCount; + Thread.sleep(200L); + synchronized (processedSessions) { + msgCount = processedSessions.size(); + } + } while (lastMsgCount < msgCount); + + if (lastMsgCount == 0) { + break; + } + + final LinkedList<TestServerSession> collected; + synchronized (processedSessions) { + collected = new LinkedList<TestServerSession>(processedSessions); + processedSessions.clear(); + } + + final Iterator<TestServerSession> sessions = collected.iterator(); + while (sessions.hasNext()) { + final TestServerSession session = sessions.next(); + committedSessions.add(session); + session.getSession().commit(); + session.getSession().close(); + } + } + + connectionConsumer.close(); + final TopicSession finalSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + finalSession.unsubscribe("TestSubscriber"); + finalSession.close(); + connection.close(); + assertEquals(MESSAGE_COUNT, committedSessions.size()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3992Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3992Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3992Test.java new file mode 100644 index 0000000..c359c88 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3992Test.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.Topic; +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.BrokerView; +import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ3992Test { + + private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3992Test.class); + private static BrokerService brokerService; + private static String BROKER_ADDRESS = "tcp://localhost:0"; + + private String connectionUri; + + @Before + public void setUp() throws Exception { + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.setUseJmx(true); + brokerService.setDeleteAllMessagesOnStartup(true); + connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString(); + brokerService.start(); + brokerService.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + + @Test + public void testDurableConsumerEnqueueCountWithZeroPrefetch() throws Exception { + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri); + connectionFactory.getPrefetchPolicy().setAll(0); + + Connection connection = connectionFactory.createConnection(); + connection.setClientID(getClass().getName()); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createTopic("DurableTopic"); + + MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "EnqueueSub"); + + BrokerView view = brokerService.getAdminView(); + view.getDurableTopicSubscribers(); + + ObjectName subName = view.getDurableTopicSubscribers()[0]; + + DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean) + brokerService.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true); + + assertEquals(0, sub.getEnqueueCounter()); + + LOG.info("Enqueue counter for sub before pull requests: " + sub.getEnqueueCounter()); + + // Trigger some pull Timeouts. + consumer.receive(500); + consumer.receive(500); + consumer.receive(500); + consumer.receive(500); + consumer.receive(500); + + // Let them all timeout. + Thread.sleep(600); + + LOG.info("Enqueue counter for sub after pull requests: " + sub.getEnqueueCounter()); + assertEquals(0, sub.getEnqueueCounter()); + + consumer.close(); + session.close(); + connection.close(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java new file mode 100644 index 0000000..a567455 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java @@ -0,0 +1,276 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.DurableTopicSubscription; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.TopicRegion; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.SubscriptionKey; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class AMQ4062Test { + + private BrokerService service; + private PolicyEntry policy; + private ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions; + + private static final int PREFETCH_SIZE_5=5; + private String connectionUri; + + @Before + public void startBroker() throws IOException, Exception { + service=new BrokerService(); + service.setPersistent(true); + service.setDeleteAllMessagesOnStartup(true); + service.setUseJmx(false); + + KahaDBPersistenceAdapter pa=new KahaDBPersistenceAdapter(); + File dataFile=new File("createData"); + pa.setDirectory(dataFile); + pa.setJournalMaxFileLength(1024*1024*32); + + service.setPersistenceAdapter(pa); + + policy = new PolicyEntry(); + policy.setTopic(">"); + policy.setDurableTopicPrefetch(PREFETCH_SIZE_5); + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + + service.setDestinationPolicy(pMap); + + service.addConnector("tcp://localhost:0"); + + service.start(); + service.waitUntilStarted(); + + connectionUri = service.getTransportConnectors().get(0).getPublishableConnectString(); + } + + public void restartBroker() throws IOException, Exception { + service=new BrokerService(); + service.setPersistent(true); + service.setUseJmx(false); + service.setKeepDurableSubsActive(false); + + KahaDBPersistenceAdapter pa=new KahaDBPersistenceAdapter(); + File dataFile=new File("createData"); + pa.setDirectory(dataFile); + pa.setJournalMaxFileLength(1024*1024*32); + + service.setPersistenceAdapter(pa); + + policy = new PolicyEntry(); + policy.setTopic(">"); + policy.setDurableTopicPrefetch(PREFETCH_SIZE_5); + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + + service.setDestinationPolicy(pMap); + service.addConnector("tcp://localhost:0"); + service.start(); + service.waitUntilStarted(); + + connectionUri = service.getTransportConnectors().get(0).getPublishableConnectString(); + } + + @After + public void stopBroker() throws Exception { + service.stop(); + service.waitUntilStopped(); + service = null; + } + + @Test + public void testDirableSubPrefetchRecovered() throws Exception{ + + PrefetchConsumer consumer=new PrefetchConsumer(true, connectionUri); + consumer.recieve(); + durableSubscriptions=getDurableSubscriptions(); + ConsumerInfo info=getConsumerInfo(durableSubscriptions); + + //check if the prefetchSize equals to the size we set in the PolicyEntry + assertEquals(PREFETCH_SIZE_5, info.getPrefetchSize()); + + consumer.a.countDown(); + Producer p=new Producer(connectionUri); + p.send(); + p = null; + + service.stop(); + service.waitUntilStopped(); + durableSubscriptions=null; + + consumer = null; + stopBroker(); + + restartBroker(); + + getDurableSubscriptions(); + info=null; + info = getConsumerInfo(durableSubscriptions); + + //check if the prefetchSize equals to 0 after persistent storage recovered + //assertEquals(0, info.getPrefetchSize()); + + consumer=new PrefetchConsumer(false, connectionUri); + consumer.recieve(); + consumer.a.countDown(); + + info=null; + info = getConsumerInfo(durableSubscriptions); + + //check if the prefetchSize is the default size for durable consumer and the PolicyEntry + //we set earlier take no effect + //assertEquals(100, info.getPrefetchSize()); + //info.getPrefetchSize() is 100,it should be 5,because I set the PolicyEntry as follows, + //policy.setDurableTopicPrefetch(PREFETCH_SIZE_5); + assertEquals(5, info.getPrefetchSize()); + } + + @SuppressWarnings("unchecked") + private ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> getDurableSubscriptions() throws NoSuchFieldException, IllegalAccessException { + if(durableSubscriptions!=null) return durableSubscriptions; + RegionBroker regionBroker=(RegionBroker)service.getRegionBroker(); + TopicRegion region=(TopicRegion)regionBroker.getTopicRegion(); + Field field=TopicRegion.class.getDeclaredField("durableSubscriptions"); + field.setAccessible(true); + durableSubscriptions=(ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>)field.get(region); + return durableSubscriptions; + } + + private ConsumerInfo getConsumerInfo(ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions) { + ConsumerInfo info=null; + for(Iterator<DurableTopicSubscription> it=durableSubscriptions.values().iterator();it.hasNext();){ + Subscription sub = it.next(); + info=sub.getConsumerInfo(); + if(info.getSubscriptionName().equals(PrefetchConsumer.SUBSCRIPTION_NAME)){ + return info; + } + } + return null; + } + + public class PrefetchConsumer implements MessageListener{ + public static final String SUBSCRIPTION_NAME = "A_NAME_ABC_DEF"; + private final String user = ActiveMQConnection.DEFAULT_USER; + private final String password = ActiveMQConnection.DEFAULT_PASSWORD; + private final String uri; + private boolean transacted; + ActiveMQConnection connection; + Session session; + MessageConsumer consumer; + private boolean needAck=false; + CountDownLatch a=new CountDownLatch(1); + + public PrefetchConsumer(boolean needAck, String uri){ + this.needAck=needAck; + this.uri = uri; + } + + public void recieve() throws Exception{ + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, uri); + connection = (ActiveMQConnection)connectionFactory.createConnection(); + connection.setClientID("3"); + connection.start(); + + session = connection.createSession(transacted, Session.CLIENT_ACKNOWLEDGE); + Destination destination = session.createTopic("topic2"); + consumer = session.createDurableSubscriber((Topic)destination,SUBSCRIPTION_NAME); + consumer.setMessageListener(this); + } + + @Override + public void onMessage(Message message) { + try { + a.await(); + } catch (InterruptedException e1) { + } + if(needAck){ + try { + message.acknowledge(); + consumer.close(); + session.close(); + connection.close(); + } catch (JMSException e) { + } + } + } + } + + public class Producer { + + protected final String user = ActiveMQConnection.DEFAULT_USER; + + private final String password = ActiveMQConnection.DEFAULT_PASSWORD; + private final String uri; + private boolean transacted; + + public Producer(String uri) { + this.uri = uri; + } + + public void send() throws Exception{ + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, uri); + ActiveMQConnection connection = (ActiveMQConnection)connectionFactory.createConnection(); + connection.start(); + + ActiveMQSession session = (ActiveMQSession)connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createTopic("topic2"); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + for(int i=0;i<100;i++){ + TextMessage om=session.createTextMessage("hello from producer"); + producer.send(om); + } + producer.close(); + session.close(); + connection.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java new file mode 100644 index 0000000..389f1f6 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java @@ -0,0 +1,508 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.util.ArrayList; +import java.util.Date; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.management.ObjectName; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class AMQ4083Test { + + private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3992Test.class); + private static BrokerService brokerService; + private static String BROKER_ADDRESS = "tcp://localhost:0"; + private static String TEST_QUEUE = "testQueue"; + private static ActiveMQQueue queue = new ActiveMQQueue(TEST_QUEUE); + + private final int messageCount = 100; + + private String connectionUri; + private String[] data; + + @Before + public void setUp() throws Exception { + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.setUseJmx(true); + brokerService.setDeleteAllMessagesOnStartup(true); + connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString(); + brokerService.start(); + brokerService.waitUntilStarted(); + + data = new String[messageCount]; + + for (int i = 0; i < messageCount; i++) { + data[i] = "Text for message: " + i + " at " + new Date(); + } + } + + @After + public void tearDown() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + + @Test + public void testExpiredMsgsBeforeNonExpired() throws Exception { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); + connection.getPrefetchPolicy().setQueuePrefetch(400); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + connection.start(); + + MessageProducer producer = session.createProducer(queue); + MessageConsumer consumer = session.createConsumer(queue); + + // send a batch that expires in a short time. + for (int i = 0; i < 100; i++) { + producer.send(session.createTextMessage(), DeliveryMode.PERSISTENT, 4, 4000); + } + + // and send one that doesn't expire to we can ack it. + producer.send(session.createTextMessage()); + + // wait long enough so the first batch times out. + TimeUnit.SECONDS.sleep(5); + + final QueueViewMBean queueView = getProxyToQueueViewMBean(); + + assertEquals(101, queueView.getInFlightCount()); + + consumer.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + try { + message.acknowledge(); + } catch (JMSException e) { + } + } + }); + + TimeUnit.SECONDS.sleep(5); + + assertEquals(0, queueView.getInFlightCount()); + + for (int i = 0; i < 200; i++) { + producer.send(session.createTextMessage()); + } + + assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), Wait.waitFor(new Wait.Condition() { + + public boolean isSatisified() throws Exception { + return queueView.getInFlightCount() == 0; + } + })); + + LOG.info("Dequeued Count: {}", queueView.getDequeueCount()); + LOG.info("Dispatch Count: {}", queueView.getDispatchCount()); + LOG.info("Enqueue Count: {}", queueView.getEnqueueCount()); + LOG.info("Expired Count: {}", queueView.getExpiredCount()); + LOG.info("InFlight Count: {}", queueView.getInFlightCount()); + } + + @Test + public void testExpiredMsgsBeforeNonExpiredWithTX() throws Exception { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); + connection.getPrefetchPolicy().setQueuePrefetch(400); + + final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + connection.start(); + + MessageProducer producer = session.createProducer(queue); + MessageConsumer consumer = session.createConsumer(queue); + + // send a batch that expires in a short time. + for (int i = 0; i < 100; i++) { + producer.send(session.createTextMessage(), DeliveryMode.PERSISTENT, 4, 4000); + } + + // and send one that doesn't expire to we can ack it. + producer.send(session.createTextMessage()); + session.commit(); + + // wait long enough so the first batch times out. + TimeUnit.SECONDS.sleep(5); + + final QueueViewMBean queueView = getProxyToQueueViewMBean(); + + assertEquals(101, queueView.getInFlightCount()); + + consumer.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + try { + session.commit(); + } catch (JMSException e) { + } + } + }); + + TimeUnit.SECONDS.sleep(5); + + assertEquals(0, queueView.getInFlightCount()); + + for (int i = 0; i < 200; i++) { + producer.send(session.createTextMessage()); + } + session.commit(); + + assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return queueView.getInFlightCount() == 0; + } + })); + + LOG.info("Dequeued Count: {}", queueView.getDequeueCount()); + LOG.info("Dispatch Count: {}", queueView.getDispatchCount()); + LOG.info("Enqueue Count: {}", queueView.getEnqueueCount()); + LOG.info("Expired Count: {}", queueView.getExpiredCount()); + LOG.info("InFlight Count: {}", queueView.getInFlightCount()); + } + + @Test + public void testExpiredMsgsInterleavedWithNonExpired() throws Exception { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); + connection.getPrefetchPolicy().setQueuePrefetch(400); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + connection.start(); + + MessageProducer producer = session.createProducer(queue); + MessageConsumer consumer = session.createConsumer(queue); + + // send a batch that expires in a short time. + for (int i = 0; i < 200; i++) { + + if ((i % 2) == 0) { + producer.send(session.createTextMessage(), DeliveryMode.PERSISTENT, 4, 4000); + } else { + producer.send(session.createTextMessage()); + } + } + + // wait long enough so the first batch times out. + TimeUnit.SECONDS.sleep(5); + + final QueueViewMBean queueView = getProxyToQueueViewMBean(); + + assertEquals(200, queueView.getInFlightCount()); + + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + try { + LOG.debug("Acking message: {}", message); + message.acknowledge(); + } catch (JMSException e) { + } + } + }); + + TimeUnit.SECONDS.sleep(5); + + assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return queueView.getInFlightCount() == 0; + } + })); + + for (int i = 0; i < 200; i++) { + producer.send(session.createTextMessage()); + } + + assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return queueView.getInFlightCount() == 0; + } + })); + + LOG.info("Dequeued Count: {}", queueView.getDequeueCount()); + LOG.info("Dispatch Count: {}", queueView.getDispatchCount()); + LOG.info("Enqueue Count: {}", queueView.getEnqueueCount()); + LOG.info("Expired Count: {}", queueView.getExpiredCount()); + LOG.info("InFlight Count: {}", queueView.getInFlightCount()); + } + + @Test + public void testExpiredMsgsInterleavedWithNonExpiredCumulativeAck() throws Exception { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); + connection.getPrefetchPolicy().setQueuePrefetch(400); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + connection.start(); + + MessageProducer producer = session.createProducer(queue); + MessageConsumer consumer = session.createConsumer(queue); + + // send a batch that expires in a short time. + for (int i = 0; i < 200; i++) { + + if ((i % 2) == 0) { + producer.send(session.createTextMessage(), DeliveryMode.PERSISTENT, 4, 4000); + } else { + producer.send(session.createTextMessage()); + } + } + + // wait long enough so the first batch times out. + TimeUnit.SECONDS.sleep(5); + + final QueueViewMBean queueView = getProxyToQueueViewMBean(); + + assertEquals(200, queueView.getInFlightCount()); + + final AtomicInteger msgCount = new AtomicInteger(); + + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + try { + if (msgCount.incrementAndGet() == 100) { + LOG.debug("Acking message: {}", message); + message.acknowledge(); + } + } catch (JMSException e) { + } + } + }); + + TimeUnit.SECONDS.sleep(5); + + assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return queueView.getInFlightCount() == 0; + } + })); + + // Now we just ack each and see if our counters come out right in the end. + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + try { + LOG.debug("Acking message: {}", message); + message.acknowledge(); + } catch (JMSException e) { + } + } + }); + + for (int i = 0; i < 200; i++) { + producer.send(session.createTextMessage()); + } + + assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return queueView.getInFlightCount() == 0; + } + })); + + LOG.info("Dequeued Count: {}", queueView.getDequeueCount()); + LOG.info("Dispatch Count: {}", queueView.getDispatchCount()); + LOG.info("Enqueue Count: {}", queueView.getEnqueueCount()); + LOG.info("Expired Count: {}", queueView.getExpiredCount()); + LOG.info("InFlight Count: {}", queueView.getInFlightCount()); + } + + @Test + public void testExpiredBatchBetweenNonExpiredMessages() throws Exception { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); + connection.getPrefetchPolicy().setQueuePrefetch(400); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + connection.start(); + + MessageProducer producer = session.createProducer(queue); + MessageConsumer consumer = session.createConsumer(queue); + + // Send one that doesn't expire so we can ack it. + producer.send(session.createTextMessage()); + + // send a batch that expires in a short time. + for (int i = 0; i < 100; i++) { + producer.send(session.createTextMessage(), DeliveryMode.PERSISTENT, 4, 4000); + } + + // and send one that doesn't expire so we can ack it. + producer.send(session.createTextMessage()); + + // wait long enough so the first batch times out. + TimeUnit.SECONDS.sleep(5); + + final QueueViewMBean queueView = getProxyToQueueViewMBean(); + + assertEquals(102, queueView.getInFlightCount()); + + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + try { + message.acknowledge(); + } catch (JMSException e) { + } + } + }); + + TimeUnit.SECONDS.sleep(5); + + assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return queueView.getInFlightCount() == 0; + } + })); + + for (int i = 0; i < 200; i++) { + producer.send(session.createTextMessage()); + } + + assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return queueView.getInFlightCount() == 0; + } + })); + + LOG.info("Dequeued Count: {}", queueView.getDequeueCount()); + LOG.info("Dispatch Count: {}", queueView.getDispatchCount()); + LOG.info("Enqueue Count: {}", queueView.getEnqueueCount()); + LOG.info("Expired Count: {}", queueView.getExpiredCount()); + LOG.info("InFlight Count: {}", queueView.getInFlightCount()); + } + + @Test + public void testConsumeExpiredQueueAndDlq() throws Exception { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + Connection connection = factory.createConnection(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producerNormal = session.createProducer(queue); + MessageProducer producerExpire = session.createProducer(queue); + producerExpire.setTimeToLive(500); + + MessageConsumer dlqConsumer = session.createConsumer(session.createQueue("ActiveMQ.DLQ")); + connection.start(); + + Connection consumerConnection = factory.createConnection(); + ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); + prefetchPolicy.setAll(10); + ((ActiveMQConnection)consumerConnection).setPrefetchPolicy(prefetchPolicy); + Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + String msgBody = new String(new byte[20*1024]); + for (int i = 0; i < data.length; i++) { + Message message = session.createTextMessage(msgBody); + producerExpire.send(queue, message); + } + + for (int i = 0; i < data.length; i++) { + Message message = session.createTextMessage(msgBody); + producerNormal.send(queue, message); + } + + ArrayList<Message> messages = new ArrayList<Message>(); + Message received; + while ((received = consumer.receive(1000)) != null) { + messages.add(received); + if (messages.size() == 1) { + TimeUnit.SECONDS.sleep(1); + } + received.acknowledge(); + }; + + assertEquals("got messages", messageCount + 1, messages.size()); + + ArrayList<Message> dlqMessages = new ArrayList<Message>(); + while ((received = dlqConsumer.receive(1000)) != null) { + dlqMessages.add(received); + }; + + assertEquals("got dlq messages", data.length - 1, dlqMessages.size()); + + final QueueViewMBean queueView = getProxyToQueueViewMBean(); + + LOG.info("Dequeued Count: {}", queueView.getDequeueCount()); + LOG.info("Dispatch Count: {}", queueView.getDispatchCount()); + LOG.info("Enqueue Count: {}", queueView.getEnqueueCount()); + LOG.info("Expired Count: {}", queueView.getExpiredCount()); + LOG.info("InFlight Count: {}", queueView.getInFlightCount()); + } + + private QueueViewMBean getProxyToQueueViewMBean() throws Exception { + final ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queue.getQueueName()); + final QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance( + queueViewMBeanName, QueueViewMBean.class, true); + return proxy; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4092Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4092Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4092Test.java new file mode 100644 index 0000000..e8c1cf0 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4092Test.java @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.util.HashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ4092Test extends TestCase { + + private static final Logger log = LoggerFactory.getLogger(AMQ4092Test.class); + + static final String QUEUE_NAME = "TEST"; + + // increase limits to expedite failure + static final int NUM_TO_SEND_PER_PRODUCER = 1000; // 10000 + static final int NUM_PRODUCERS = 5; // 40 + + static final ActiveMQQueue[] DESTINATIONS = new ActiveMQQueue[]{ + new ActiveMQQueue("A"), + new ActiveMQQueue("B") + // A/B seems to be sufficient for concurrentStoreAndDispatch=true + }; + + static final boolean debug = false; + + private BrokerService brokerService; + + private ActiveMQQueue destination; + private HashMap<Thread, Throwable> exceptions = new HashMap<Thread, Throwable>(); + private ExceptionListener exceptionListener = new ExceptionListener() { + @Override + public void onException(JMSException exception) { + exception.printStackTrace(); + exceptions.put(Thread.currentThread(), exception); + } + }; + + @Override + protected void setUp() throws Exception { + brokerService = new BrokerService(); + brokerService.setDeleteAllMessagesOnStartup(true); + ((KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(false); + brokerService.addConnector("tcp://localhost:0"); + brokerService.start(); + destination = new ActiveMQQueue(); + destination.setCompositeDestinations(DESTINATIONS); + Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + exceptions.put(t, e); + } + }); + } + + @Override + protected void tearDown() throws Exception { + // Stop any running threads. + brokerService.stop(); + } + + + public void testConcurrentGroups() throws Exception { + ExecutorService executorService = Executors.newCachedThreadPool(); + executorService.submit(new TestConsumer()); + for (int i=0; i<NUM_PRODUCERS; i++) { + executorService.submit(new TestProducer()); + } + executorService.shutdown(); + executorService.awaitTermination(5, TimeUnit.MINUTES); + assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); + } + + + class TestProducer implements Runnable { + + public void produceMessages() throws Exception { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( + brokerService.getTransportConnectors().get(0).getConnectUri().toString() + ); + connectionFactory.setExceptionListener(exceptionListener); + connectionFactory.setUseAsyncSend(true); + Connection connection = connectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + String name = new String(new byte[2*1024]); + for (int i = 1; i <= NUM_TO_SEND_PER_PRODUCER; i++) { + + TextMessage message = session.createTextMessage(name + "_" + i); + for (int j=0; j<100; j++) { + message.setStringProperty("Prop" + j, ""+j); + } + message.setStringProperty("JMSXGroupID", Thread.currentThread().getName()+i); + message.setIntProperty("JMSXGroupSeq", 1); + producer.send(message); + } + + producer.close(); + session.close(); + connection.close(); + } + + @Override + public void run() { + try { + produceMessages(); + } catch (Exception e) { + e.printStackTrace(); + exceptions.put(Thread.currentThread(), e); + } + } + } + + class TestConsumer implements Runnable { + + private CountDownLatch finishLatch = new CountDownLatch(1); + + + + public void consume() throws Exception { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( + brokerService.getTransportConnectors().get(0).getConnectUri().toString() + ); + + connectionFactory.setExceptionListener(exceptionListener); + final int totalMessageCount = NUM_TO_SEND_PER_PRODUCER * DESTINATIONS.length * NUM_PRODUCERS; + final AtomicInteger counter = new AtomicInteger(); + final MessageListener listener = new MessageListener() { + public void onMessage(Message message) { + + if (debug) { + try { + log.info(((TextMessage) message).getText()); + } catch (JMSException e) { + e.printStackTrace(); + } + } + + boolean first = false; + try { + first = message.getBooleanProperty("JMSXGroupFirstForConsumer"); + } catch (JMSException e) { + e.printStackTrace(); + exceptions.put(Thread.currentThread(), e); + } + assertTrue("Always is first message", first); + if (counter.incrementAndGet() == totalMessageCount) { + log.info("Got all:" + counter.get()); + finishLatch.countDown(); + + } + } + }; + + int consumerCount = DESTINATIONS.length * 100; + Connection[] connections = new Connection[consumerCount]; + + Session[] sessions = new Session[consumerCount]; + MessageConsumer[] consumers = new MessageConsumer[consumerCount]; + + for (int i = 0; i < consumerCount; i++) { + connections[i] = connectionFactory.createConnection(); + connections[i].start(); + + sessions[i] = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE); + + consumers[i] = sessions[i].createConsumer(DESTINATIONS[i%DESTINATIONS.length], null); + consumers[i].setMessageListener(listener); + } + + + log.info("received " + counter.get() + " messages"); + + assertTrue("got all messages in time", finishLatch.await(4, TimeUnit.MINUTES)); + + log.info("received " + counter.get() + " messages"); + + for (MessageConsumer consumer : consumers) { + consumer.close(); + } + + for (Session session : sessions) { + session.close(); + } + + for (Connection connection : connections) { + connection.close(); + } + } + + @Override + public void run() { + try { + consume(); + } catch (Exception e) { + e.printStackTrace(); + exceptions.put(Thread.currentThread(), e); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java new file mode 100644 index 0000000..ffd69f1 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.bugs; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.Assert; + +public class AMQ4116Test extends EmbeddedBrokerTestSupport { + + private final String tcpAddr = "tcp://localhost:0"; + private String connectionUri; + + /** + * In this test, a message is produced and consumed from the test queue. + * Memory usage on the test queue should be reset to 0. The memory that was + * consumed is then sent to a second queue. Memory usage on the original + * test queue should remain 0, but actually increased when the second + * enqueue occurs. + */ + public void testVMTransport() throws Exception { + runTest(connectionFactory); + } + + /** + * This is an analog to the previous test, but occurs over TCP and passes. + */ + public void testTCPTransport() throws Exception { + runTest(new ActiveMQConnectionFactory(connectionUri)); + } + + private void runTest(ConnectionFactory connFactory) throws Exception { + // Verify that test queue is empty and not using any memory. + Destination physicalDestination = broker.getDestination(destination); + Assert.assertEquals(0, physicalDestination.getMemoryUsage().getUsage()); + + // Enqueue a single message and verify that the test queue is using + // memory. + Connection conn = connFactory.createConnection(); + conn.start(); + Session session = conn.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(destination); + + producer.send(new ActiveMQMessage()); + + // Commit, which ensures message is in queue and memory usage updated. + session.commit(); + Assert.assertTrue(physicalDestination.getMemoryUsage().getUsage() > 0); + + // Consume the message and verify that the test queue is no longer using + // any memory. + MessageConsumer consumer = session.createConsumer(destination); + Message received = consumer.receive(); + Assert.assertNotNull(received); + + // Commit, which ensures message is removed from queue and memory usage + // updated. + session.commit(); + Assert.assertEquals(0, physicalDestination.getMemoryUsage().getUsage()); + + // Resend the message to a different queue and verify that the original + // test queue is still not using any memory. + ActiveMQQueue secondDestination = new ActiveMQQueue(AMQ4116Test.class + ".second"); + MessageProducer secondPproducer = session.createProducer(secondDestination); + + secondPproducer.send(received); + + // Commit, which ensures message is in queue and memory usage updated. + // NOTE: This assertion fails due to bug. + session.commit(); + Assert.assertEquals(0, physicalDestination.getMemoryUsage().getUsage()); + + conn.stop(); + } + + /** + * Create an embedded broker that has both TCP and VM connectors. + */ + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + connectionUri = broker.addConnector(tcpAddr).getPublishableConnectString(); + return broker; + } +}
