// http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java new file mode 100644 index 0000000..d5e5835 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java @@ -0,0 +1,188 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.bugs;

import java.lang.reflect.Field;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verifies that, once a request/reply exchange over a temporary queue has
 * finished and the requester has closed its connection, the broker-side
 * {@link ProducerBrokerExchange} of the replying producer no longer holds a
 * reference to the region or the region destination.
 *
 * @author <a href="http://www.christianposta.com/blog">Christian Posta</a>
 */
public class AMQ4222Test extends TestSupport {

    private static final Logger LOG = LoggerFactory.getLogger(AMQ4222Test.class);

    protected BrokerService brokerService;

    /**
     * Selects queue destinations and starts the embedded broker.
     */
    @Override
    protected void setUp() throws Exception {
        super.setUp();
        topic = false;
        brokerService = createBroker();
    }

    /**
     * Stops the embedded broker and then runs the superclass tear-down, which
     * is the counterpart of the {@code super.setUp()} call in {@link #setUp()}.
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            brokerService.stop();
            brokerService.waitUntilStopped();
        } finally {
            // Always give the base class a chance to release what its setUp() acquired.
            super.tearDown();
        }
    }

    /**
     * Creates and starts a non-persistent broker named {@code localhost}.
     *
     * @return the started broker
     */
    protected BrokerService createBroker() throws Exception {
        BrokerService broker = BrokerFactory.createBroker(new URI("broker:()/localhost?persistent=false"));
        broker.start();
        broker.waitUntilStarted();
        return broker;
    }

    @Override
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory("vm://localhost");
    }

    /**
     * Sends a request whose reply-to is a temporary queue, has a second
     * connection echo it back, closes the requesting connection and then
     * inspects the producer exchange that is left on the broker for the
     * replying connection.
     */
    public void testTempQueueCleanedUp() throws Exception {

        Destination requestQueue = createDestination();

        Connection producerConnection = createConnection();
        Connection consumerConnection = null;
        try {
            producerConnection.start();
            Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            MessageProducer producer = producerSession.createProducer(requestQueue);
            Destination replyTo = producerSession.createTemporaryQueue();
            MessageConsumer producerSessionConsumer = producerSession.createConsumer(replyTo);

            final CountDownLatch countDownLatch = new CountDownLatch(1);
            // let's listen to the response on the queue
            producerSessionConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        if (message instanceof TextMessage) {
                            LOG.info("You got a message: " + ((TextMessage) message).getText());
                            countDownLatch.countDown();
                        }
                    } catch (JMSException e) {
                        LOG.error("error reading the response from the temp queue", e);
                    }
                }
            });

            producer.send(createRequest(producerSession, replyTo));

            consumerConnection = createConnection();
            consumerConnection.start();
            Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = consumerSession.createConsumer(requestQueue);
            final MessageProducer consumerProducer = consumerSession.createProducer(null);

            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        consumerProducer.send(message.getJMSReplyTo(), message);
                    } catch (JMSException e) {
                        LOG.error("error sending a response on the temp queue", e);
                    }
                }
            });

            // Without a reply no send to the temp queue has happened, and the
            // assertions below would not exercise anything.
            assertTrue("no response received on the temp queue within 2 seconds",
                       countDownLatch.await(2, TimeUnit.SECONDS));

            // producer has not gone away yet...
            org.apache.activemq.broker.region.Destination tempDestination = getDestination(brokerService,
                    (ActiveMQDestination) replyTo);
            assertNotNull(tempDestination);

            // clean up
            producer.close();
            producerSession.close();
            producerConnection.close();

            // producer has gone away.. so the temp queue should not exist anymore... let's see..
            tempDestination = getDestination(brokerService,
                    (ActiveMQDestination) replyTo);
            assertNull(tempDestination);

            // now.. the connection on the broker side for the dude producing to the temp dest will
            // still have a reference in his producerBrokerExchange.. this will keep the destination
            // from being reclaimed by GC if there is never another send that producer makes...
            // let's see if that reference is there...
            final TransportConnector connector = VMTransportFactory.CONNECTORS.get("localhost");
            assertNotNull(connector);
            assertTrue(Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    return connector.getConnections().size() == 1;
                }
            }));
            TransportConnection transportConnection = connector.getConnections().get(0);
            Map<ProducerId, ProducerBrokerExchange> exchanges = getProducerExchangeFromConn(transportConnection);
            assertEquals(1, exchanges.size());
            ProducerBrokerExchange exchange = exchanges.values().iterator().next();

            // so this is the reason for the test... we don't want these exchanges to hold a reference
            // to a region destination.. after a send is completed, the destination is not used anymore on
            // a producer exchange
            assertNull(exchange.getRegionDestination());
            assertNull(exchange.getRegion());
        } finally {
            // The consumer connection has to stay open until the exchange has been
            // inspected, so both connections are released only here. Closing the
            // already-closed producer connection again is harmless.
            try {
                producerConnection.close();
            } finally {
                if (consumerConnection != null) {
                    consumerConnection.close();
                }
            }
        }
    }

    /**
     * Reads the private {@code producerExchanges} map of a broker-side
     * connection via reflection.
     *
     * @param transportConnection the connection to inspect
     * @return the connection's producer-id to exchange map
     */
    @SuppressWarnings("unchecked")
    private Map<ProducerId, ProducerBrokerExchange> getProducerExchangeFromConn(TransportConnection transportConnection) throws NoSuchFieldException, IllegalAccessException {
        Field f = TransportConnection.class.getDeclaredField("producerExchanges");
        f.setAccessible(true);
        return (Map<ProducerId, ProducerBrokerExchange>) f.get(transportConnection);
    }

    /**
     * Builds the request message.
     *
     * @param session session used to create the message
     * @param replyTo destination stored as the message's JMSReplyTo
     * @return a text message with body {@code "Payload"}
     */
    private Message createRequest(Session session, Destination replyTo) throws JMSException {
        Message message = session.createTextMessage("Payload");
        message.setJMSReplyTo(replyTo);
        return message;
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java new file mode 100644 index 0000000..8e6a96f --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import java.io.File; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.ConsumerThread; +import org.apache.activemq.util.ProducerThread; +import org.apache.activemq.util.Wait; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class AMQ4323Test { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ4323Test.class); + + BrokerService broker = null; + File kahaDbDir = null; + private final Destination destination = new ActiveMQQueue("q"); + final String payload = new String(new byte[1024]); + + protected void startBroker(boolean delete) throws Exception { + broker = new BrokerService(); + + //Start with a clean directory + kahaDbDir = new File(broker.getBrokerDataDirectory(), "KahaDB"); + deleteDir(kahaDbDir); + + broker.setSchedulerSupport(false); + broker.setDeleteAllMessagesOnStartup(delete); + broker.setPersistent(true); + broker.setUseJmx(false); + broker.addConnector("tcp://localhost:0"); + + PolicyMap map = new PolicyMap(); + PolicyEntry entry = new PolicyEntry(); + entry.setUseCache(false); + map.setDefaultEntry(entry); + broker.setDestinationPolicy(map); + + configurePersistence(broker, delete); + + broker.start(); + LOG.info("Starting broker.."); + } + + protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception { + KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) 
brokerService.getPersistenceAdapter(); + + // ensure there are a bunch of data files but multiple entries in each + adapter.setJournalMaxFileLength(1024 * 20); + + // speed up the test case, checkpoint an cleanup early and often + adapter.setCheckpointInterval(500); + adapter.setCleanupInterval(500); + + if (!deleteAllOnStart) { + adapter.setForceRecoverIndex(true); + } + + } + + private boolean deleteDir(File dir) { + if (dir.isDirectory()) { + String[] children = dir.list(); + for (int i = 0; i < children.length; i++) { + boolean success = deleteDir(new File(dir, children[i])); + if (!success) { + return false; + } + } + } + + return dir.delete(); + } + + private int getFileCount(File dir){ + if (dir.isDirectory()) { + String[] children = dir.list(); + return children.length; + } + + return 0; + } + + @Test + public void testCleanupOfFiles() throws Exception { + final int messageCount = 500; + startBroker(true); + int fileCount = getFileCount(kahaDbDir); + assertEquals(4, fileCount); + + Connection connection = new ActiveMQConnectionFactory( + broker.getTransportConnectors().get(0).getConnectUri()).createConnection(); + connection.start(); + Session producerSess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session consumerSess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ProducerThread producer = new ProducerThread(producerSess, destination) { + @Override + protected Message createMessage(int i) throws Exception { + return sess.createTextMessage(payload + "::" + i); + } + }; + producer.setMessageCount(messageCount); + ConsumerThread consumer = new ConsumerThread(consumerSess, destination); + consumer.setBreakOnNull(false); + consumer.setMessageCount(messageCount); + + producer.start(); + producer.join(); + + consumer.start(); + consumer.join(); + + assertEquals("consumer got all produced messages", producer.getMessageCount(), consumer.getReceived()); + + // verify cleanup + assertTrue("gc worked", Wait.waitFor(new 
Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + int fileCount = getFileCount(kahaDbDir); + LOG.info("current filecount:" + fileCount); + return 4 == fileCount; + } + })); + + broker.stop(); + broker.waitUntilStopped(); + + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java new file mode 100644 index 0000000..aa3ac2c --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.activemq.bugs;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Checks the broker's active / inactive durable-subscriber counts while a
 * durable subscription on a virtual topic is created, closed, unsubscribed
 * and finally after the broker has been restarted.
 */
public class AMQ4356Test {

    private static BrokerService brokerService;
    private static String BROKER_ADDRESS = "tcp://localhost:0";

    private String connectionUri;
    private ActiveMQConnectionFactory cf;
    private final String CLIENT_ID = "AMQ4356Test";
    private final String SUBSCRIPTION_NAME = "AMQ4356Test";

    /**
     * Builds a JMX-enabled broker on an ephemeral TCP port, records its
     * connect string and starts it.
     *
     * @param deleteOnStart whether stored messages are wiped on start-up
     */
    private void createBroker(boolean deleteOnStart) throws Exception {
        BrokerService fresh = new BrokerService();
        brokerService = fresh;
        fresh.setUseJmx(true);
        fresh.setDeleteAllMessagesOnStartup(deleteOnStart);
        connectionUri = fresh.addConnector(BROKER_ADDRESS).getPublishableConnectString();
        fresh.start();
        fresh.waitUntilStarted();
    }

    /** Starts a broker with a clean store. */
    private void startBroker() throws Exception {
        createBroker(true);
    }

    /** Stops the current broker and starts a new one that keeps the store. */
    private void restartBroker() throws Exception {
        shutdownBroker();
        createBroker(false);
    }

    /** Stops the current broker and waits until it is down. */
    private void shutdownBroker() throws Exception {
        brokerService.stop();
        brokerService.waitUntilStopped();
    }

    @Before
    public void setUp() throws Exception {
        startBroker();
        cf = new ActiveMQConnectionFactory(connectionUri);
    }

    @After
    public void tearDown() throws Exception {
        shutdownBroker();
    }

    /**
     * Walks a durable subscription on the virtual topic through its life
     * cycle and asserts the subscriber counts reported by the admin view at
     * each step.
     */
    @Test
    public void testVirtualTopicUnsubDurable() throws Exception {
        Connection conn = cf.createConnection();
        conn.setClientID(CLIENT_ID);
        conn.start();

        // create consumer 'cluster'
        ActiveMQQueue firstQueue = new ActiveMQQueue(getVirtualTopicConsumerName());
        ActiveMQQueue secondQueue = new ActiveMQQueue(getVirtualTopicConsumerName());

        Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer firstConsumer = jmsSession.createConsumer(firstQueue);
        firstConsumer.setMessageListener(discardingListener());
        MessageConsumer secondConsumer = jmsSession.createConsumer(secondQueue);
        secondConsumer.setMessageListener(discardingListener());

        ActiveMQTopic virtualTopic = new ActiveMQTopic(getVirtualTopicName());
        MessageConsumer durableSubscriber = jmsSession.createDurableSubscriber(virtualTopic, SUBSCRIPTION_NAME);

        // subscription is live
        assertDurableSubscriberCounts(1, 0);

        durableSubscriber.close();

        // create topic producer
        MessageProducer topicProducer = jmsSession.createProducer(virtualTopic);
        assertNotNull(topicProducer);

        final int total = 10;
        int sent = 0;
        while (sent < total) {
            topicProducer.send(jmsSession.createTextMessage("message: " + sent));
            sent++;
        }

        // subscriber closed but still registered
        assertDurableSubscriberCounts(0, 1);

        jmsSession.unsubscribe(SUBSCRIPTION_NAME);
        conn.close();

        // subscription removed
        assertDurableSubscriberCounts(0, 0);

        restartBroker();

        // and it stays removed after a restart
        assertDurableSubscriberCounts(0, 0);
    }

    /**
     * Asserts the number of active and inactive durable topic subscribers
     * currently reported by the broker's admin view.
     */
    private void assertDurableSubscriberCounts(int expectedActive, int expectedInactive) throws Exception {
        assertEquals(expectedActive, brokerService.getAdminView().getDurableTopicSubscribers().length);
        assertEquals(expectedInactive, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
    }

    /** Returns a new listener that ignores every message it is handed. */
    private MessageListener discardingListener() {
        return new MessageListener() {
            @Override
            public void onMessage(Message message) {
            }
        };
    }

    protected String getVirtualTopicName() {
        return "VirtualTopic.TEST";
    }

    protected String getVirtualTopicConsumerName() {
        return "Consumer.A.VirtualTopic.TEST";
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4361Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4361Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4361Test.java new file mode 100644 index 0000000..0e1c465 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4361Test.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy; +import org.apache.activemq.command.ActiveMQDestination; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ4361Test { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ4361Test.class); + + private BrokerService service; + private String brokerUrlString; + + @Before + public void setUp() throws Exception { + service = new BrokerService(); + service.setDeleteAllMessagesOnStartup(true); + service.setUseJmx(false); + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policy = new PolicyEntry(); + policy.setMemoryLimit(1); + policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy()); + policy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy()); + policy.setProducerFlowControl(true); + policyMap.setDefaultEntry(policy); + service.setDestinationPolicy(policyMap); + + service.setAdvisorySupport(false); + brokerUrlString = service.addConnector("tcp://localhost:0").getPublishableConnectString(); + service.start(); + service.waitUntilStarted(); + } + + @After + public 
void tearDown() throws Exception { + if (service != null) { + service.stop(); + service.waitUntilStopped(); + } + } + + @Test + public void testCloseWhenHunk() throws Exception { + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrlString); + connectionFactory.setProducerWindowSize(1024); + + // TINY QUEUE is flow controlled after 1024 bytes + final ActiveMQDestination destination = + ActiveMQDestination.createDestination("queue://TINY_QUEUE", (byte) 0xff); + + Connection connection = connectionFactory.createConnection(); + connection.start(); + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageProducer producer = session.createProducer(destination); + producer.setTimeToLive(0); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + final AtomicReference<Exception> publishException = new AtomicReference<Exception>(null); + final AtomicReference<Exception> closeException = new AtomicReference<Exception>(null); + final AtomicLong lastLoop = new AtomicLong(System.currentTimeMillis() + 100); + + Thread pubThread = new Thread(new Runnable() { + @Override + public void run() { + try { + byte[] data = new byte[1000]; + new Random(0xdeadbeef).nextBytes(data); + for (int i = 0; i < 10000; i++) { + lastLoop.set(System.currentTimeMillis()); + ObjectMessage objMsg = session.createObjectMessage(); + objMsg.setObject(data); + producer.send(destination, objMsg); + } + } catch (Exception e) { + publishException.set(e); + } + } + }, "PublishingThread"); + pubThread.start(); + + // wait for publisher to deadlock + while (System.currentTimeMillis() - lastLoop.get() < 2000) { + Thread.sleep(100); + } + LOG.info("Publisher deadlock detected."); + + Thread closeThread = new Thread(new Runnable() { + @Override + public void run() { + try { + LOG.info("Attempting close.."); + producer.close(); + } catch (Exception e) { + closeException.set(e); + } + } + }, "ClosingThread"); + closeThread.start(); + + 
try { + closeThread.join(30000); + } catch (InterruptedException ie) { + assertFalse("Closing thread didn't complete in 10 seconds", true); + } + + try { + pubThread.join(30000); + } catch (InterruptedException ie) { + assertFalse("Publishing thread didn't complete in 10 seconds", true); + } + + assertNull(closeException.get()); + assertNotNull(publishException.get()); + } +} + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java new file mode 100644 index 0000000..13e1cfb --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ4368Test { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ4368Test.class); + + private BrokerService broker; + private ActiveMQConnectionFactory connectionFactory; + private final Destination destination = new ActiveMQQueue("large_message_queue"); + private String connectionUri; + + @Before + public void setUp() throws Exception { + broker = createBroker(); + connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString(); + broker.start(); + connectionFactory = new ActiveMQConnectionFactory(connectionUri); + } + + @After + public void tearDown() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + } + + protected BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + + PolicyEntry policy = new PolicyEntry(); + policy.setUseCache(false); + broker.setDestinationPolicy(new PolicyMap()); + broker.getDestinationPolicy().setDefaultEntry(policy); + + 
KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter(); + kahadb.setCheckForCorruptJournalFiles(true); + kahadb.setCleanupInterval(1000); + + kahadb.deleteAllMessages(); + broker.setPersistenceAdapter(kahadb); + broker.getSystemUsage().getMemoryUsage().setLimit(1024*1024*100); + broker.setUseJmx(false); + + return broker; + } + + abstract class Client implements Runnable { + private final String name; + final AtomicBoolean done = new AtomicBoolean(); + CountDownLatch startedLatch; + CountDownLatch doneLatch = new CountDownLatch(1); + Connection connection; + Session session; + final AtomicLong size = new AtomicLong(); + + Client(String name, CountDownLatch startedLatch) { + this.name = name; + this.startedLatch = startedLatch; + } + + public void start() { + LOG.info("Starting: " + name); + new Thread(this, name).start(); + } + + public void stopAsync() { + done.set(true); + } + + public void stop() throws InterruptedException { + stopAsync(); + if (!doneLatch.await(20, TimeUnit.MILLISECONDS)) { + try { + connection.close(); + doneLatch.await(); + } catch (Exception e) { + } + } + } + + @Override + public void run() { + try { + connection = createConnection(); + connection.start(); + try { + session = createSession(); + work(); + } finally { + try { + connection.close(); + } catch (JMSException ignore) { + } + LOG.info("Stopped: " + name); + } + } catch (Exception e) { + e.printStackTrace(); + done.set(true); + } finally { + doneLatch.countDown(); + } + } + + protected Session createSession() throws JMSException { + return connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + protected Connection createConnection() throws JMSException { + return connectionFactory.createConnection(); + } + + abstract protected void work() throws Exception; + } + + class ProducingClient extends Client { + + ProducingClient(String name, CountDownLatch startedLatch) { + super(name, startedLatch); + } + + private String createMessage() { + StringBuffer 
stringBuffer = new StringBuffer(); + for (long i = 0; i < 1000000; i++) { + stringBuffer.append("1234567890"); + } + return stringBuffer.toString(); + } + + @Override + protected void work() throws Exception { + String data = createMessage(); + MessageProducer producer = session.createProducer(destination); + startedLatch.countDown(); + while (!done.get()) { + producer.send(session.createTextMessage(data)); + long i = size.incrementAndGet(); + if ((i % 1000) == 0) { + LOG.info("produced " + i + "."); + } + } + } + } + + class ConsumingClient extends Client { + public ConsumingClient(String name, CountDownLatch startedLatch) { + super(name, startedLatch); + } + + @Override + protected void work() throws Exception { + MessageConsumer consumer = session.createConsumer(destination); + startedLatch.countDown(); + while (!done.get()) { + Message msg = consumer.receive(100); + if (msg != null) { + size.incrementAndGet(); + } + } + } + } + + @Test + public void testENTMQ220() throws Exception { + LOG.info("Start test."); + CountDownLatch producer1Started = new CountDownLatch(1); + CountDownLatch producer2Started = new CountDownLatch(1); + CountDownLatch listener1Started = new CountDownLatch(1); + + final ProducingClient producer1 = new ProducingClient("1", producer1Started); + final ProducingClient producer2 = new ProducingClient("2", producer2Started); + final ConsumingClient listener1 = new ConsumingClient("subscriber-1", listener1Started); + final AtomicLong lastSize = new AtomicLong(); + + try { + + producer1.start(); + producer2.start(); + listener1.start(); + + producer1Started.await(15, TimeUnit.SECONDS); + producer2Started.await(15, TimeUnit.SECONDS); + listener1Started.await(15, TimeUnit.SECONDS); + + lastSize.set(listener1.size.get()); + for (int i = 0; i < 10; i++) { + Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return listener1.size.get() > lastSize.get(); + } + }); + long size = listener1.size.get(); + 
LOG.info("Listener 1: consumed: " + (size - lastSize.get())); + assertTrue("No messages received on iteration: " + i, size > lastSize.get()); + lastSize.set(size); + } + } finally { + LOG.info("Stopping clients"); + producer1.stop(); + producer2.stop(); + listener1.stop(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4407Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4407Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4407Test.java new file mode 100644 index 0000000..73d6d69 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4407Test.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ4407Test { + + static final Logger LOG = LoggerFactory.getLogger(AMQ4407Test.class); + private final static int maxFileLength = 1024*1024*32; + + private final static String PREFIX_DESTINATION_NAME = "queue"; + + private final static String DESTINATION_NAME = PREFIX_DESTINATION_NAME + ".test"; + private final static String DESTINATION_NAME_2 = PREFIX_DESTINATION_NAME + "2.test"; + private final static String DESTINATION_NAME_3 = PREFIX_DESTINATION_NAME + "3.test"; + + BrokerService broker; + + @Before + public void setUp() throws Exception { + prepareBrokerWithMultiStore(true); + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + broker.stop(); + } + + protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception { + BrokerService broker = new BrokerService(); + broker.setUseJmx(true); + broker.setBrokerName("localhost"); + broker.setPersistenceAdapter(kaha); + return broker; 
+ } + + @Test + public void testRestartAfterQueueDelete() throws Exception { + + // Ensure we have an Admin View. + assertTrue("Broker doesn't have an Admin View.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return (broker.getAdminView()) != null; + } + })); + + + LOG.info("Adding destinations: {}, {}, {}", new Object[] {DESTINATION_NAME, DESTINATION_NAME_3, DESTINATION_NAME_3}); + sendMessage(DESTINATION_NAME, "test 1"); + sendMessage(DESTINATION_NAME_2, "test 1"); + sendMessage(DESTINATION_NAME_3, "test 1"); + + assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME))); + assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2))); + assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_3))); + + + LOG.info("Removing destination: {}", DESTINATION_NAME_2); + broker.getAdminView().removeQueue(DESTINATION_NAME_2); + + LOG.info("Recreating destination: {}", DESTINATION_NAME_2); + sendMessage(DESTINATION_NAME_2, "test 1"); + + Destination destination2 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)); + assertNotNull(destination2); + assertEquals(1, destination2.getMessageStore().getMessageCount()); + } + + protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException { + KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter(); + kaha.setJournalMaxFileLength(maxFileLength); + kaha.setCleanupInterval(5000); + if (delete) { + kaha.deleteAllMessages(); + } + return kaha; + } + + public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception { + + MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter(); + if (deleteAllMessages) { + multiKahaDBPersistenceAdapter.deleteAllMessages(); + } + ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<FilteredKahaDBPersistenceAdapter>(); + + 
adapters.add(createFilteredKahaDBByDestinationPrefix(PREFIX_DESTINATION_NAME, deleteAllMessages)); + adapters.add(createFilteredKahaDBByDestinationPrefix(PREFIX_DESTINATION_NAME + "2", deleteAllMessages)); + adapters.add(createFilteredKahaDBByDestinationPrefix(null, deleteAllMessages)); + + multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters); + broker = createBroker(multiKahaDBPersistenceAdapter); + } + + /** + * Create filtered KahaDB adapter by destination prefix. + * + * @param destinationPrefix + * @param deleteAllMessages + * @return + * @throws IOException + */ + private FilteredKahaDBPersistenceAdapter createFilteredKahaDBByDestinationPrefix(String destinationPrefix, boolean deleteAllMessages) + throws IOException { + FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter(); + template.setPersistenceAdapter(createStore(deleteAllMessages)); + if (destinationPrefix != null) { + template.setQueue(destinationPrefix + ".>"); + } + return template; + } + + + /** + * Send message to particular destination. 
+ * + * @param destinationName + * @param message + * @throws JMSException + */ + private void sendMessage(String destinationName, String message) throws JMSException { + ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost"); + f.setAlwaysSyncSend(true); + Connection c = f.createConnection(); + c.start(); + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = s.createProducer(new ActiveMQQueue(destinationName)); + producer.send(s.createTextMessage(message)); + producer.close(); + s.close(); + c.stop(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4413Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4413Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4413Test.java new file mode 100644 index 0000000..ff973e9 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4413Test.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + */ +public class AMQ4413Test { + + static final Logger LOG = LoggerFactory.getLogger(AMQ4413Test.class); + + final String brokerUrl = "tcp://localhost:0"; + private String connectionUri; + final int numMsgsTriggeringReconnection = 2; + final int numMsgs = 30; + final int numTests = 75; + final ExecutorService threadPool = Executors.newCachedThreadPool(); + + @Test + public void testDurableSubMessageLoss() throws Exception{ + // start embedded broker + BrokerService brokerService = new BrokerService(); + connectionUri = brokerService.addConnector(brokerUrl).getPublishableConnectString(); + brokerService.setPersistent(false); + brokerService.setUseJmx(false); + brokerService.setKeepDurableSubsActive(true); + brokerService.setAdvisorySupport(false); + brokerService.start(); + LOG.info("##### broker started"); + + // repeat test 50 times + try { + for (int i = 0; i < numTests; ++i) { + LOG.info("##### test " + i + " started"); + test(); + } + + LOG.info("##### tests are done"); + } catch (Exception e) { + e.printStackTrace(); + LOG.info("##### tests failed!"); + } finally { + threadPool.shutdown(); + brokerService.stop(); + LOG.info("##### broker stopped"); + } + } + + private void test() throws Exception 
{ + + final String topicName = "topic-" + UUID.randomUUID(); + final String clientId = "client-" + UUID.randomUUID(); + final String subName = "sub-" + UUID.randomUUID(); + + // create (and only create) subscription first + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + factory.setWatchTopicAdvisories(false); + Connection connection = factory.createConnection(); + connection.setClientID(clientId); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(topicName); + TopicSubscriber durableSubscriptionCreator = session.createDurableSubscriber(topic, subName); + + connection.stop(); + durableSubscriptionCreator.close(); + session.close(); + connection.close(); + + // publisher task + Callable<Boolean> publisher = new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + Connection connection = null; + + try { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + factory.setWatchTopicAdvisories(false); + connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(topicName); + + MessageProducer producer = session.createProducer(topic); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + producer.setPriority(Message.DEFAULT_PRIORITY); + producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE); + + for (int seq = 1; seq <= numMsgs; ++seq) { + TextMessage msg = session.createTextMessage(String.valueOf(seq)); + producer.send(msg); + LOG.info("pub sent msg: " + seq); + Thread.sleep(1L); + } + + LOG.info("pub is done"); + } finally { + if (connection != null) { + try { + connection.close(); + } catch (JMSException e) { + e.printStackTrace(); + } + } + } + return Boolean.TRUE; + } + }; + + // subscriber task + Callable<Boolean> durableSubscriber = new Callable<Boolean>() { + ActiveMQConnectionFactory 
factory; + Connection connection; + Session session; + Topic topic; + TopicSubscriber consumer; + + @Override + public Boolean call() throws Exception { + factory = new ActiveMQConnectionFactory(connectionUri); + factory.setWatchTopicAdvisories(false); + + try { + connect(); + + for (int seqExpected = 1; seqExpected <= numMsgs; ++seqExpected) { + TextMessage msg = (TextMessage) consumer.receive(3000L); + if (msg == null) { + LOG.info("expected: " + seqExpected + ", actual: timed out", msg); + return Boolean.FALSE; + } + + int seq = Integer.parseInt(msg.getText()); + + LOG.info("sub received msg: " + seq); + + if (seqExpected != seq) { + LOG.info("expected: " + seqExpected + ", actual: " + seq); + return Boolean.FALSE; + } + + if (seq % numMsgsTriggeringReconnection == 0) { + close(false); + connect(); + + LOG.info("sub reconnected"); + } + } + + LOG.info("sub is done"); + } finally { + try { + close(true); + } catch (Exception e) { + e.printStackTrace(); + } + } + + return Boolean.TRUE; + } + + void connect() throws Exception { + connection = factory.createConnection(); + connection.setClientID(clientId); + connection.start(); + + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + topic = session.createTopic(topicName); + consumer = session.createDurableSubscriber(topic, subName); + } + + void close(boolean unsubscribe) throws Exception { + if (connection != null) { + connection.stop(); + } + + if (consumer != null) { + consumer.close(); + } + + if (session != null) { + if (unsubscribe) { + session.unsubscribe(subName); + } + session.close(); + } + + if (connection != null) { + connection.close(); + } + } + }; + + ArrayList<Future<Boolean>> results = new ArrayList<Future<Boolean>>(); + results.add(threadPool.submit(publisher)); + results.add(threadPool.submit(durableSubscriber)); + + for (Future<Boolean> result : results) { + assertTrue(result.get()); + } + } +} 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java new file mode 100644 index 0000000..2f7ae69 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.transport.tcp.TcpTransportServer; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.springframework.jms.support.JmsUtils; + +public class AMQ4469Test { + + private static final int maxConnections = 100; + + private final ExecutorService executor = Executors.newCachedThreadPool(); + private String connectionUri; + private BrokerService service; + private TransportConnector connector; + + @Before + public void setUp() throws Exception { + service = new BrokerService(); + service.setPersistent(false); + service.setUseJmx(false); + connector = service.addConnector("tcp://0.0.0.0:0?maximumConnections="+maxConnections); + connectionUri = connector.getPublishableConnectString(); + service.start(); + service.waitUntilStarted(); + } + + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(connectionUri); + } + + @Test + public void testMaxConnectionControl() throws Exception { + final ConnectionFactory cf = createConnectionFactory(); + final CountDownLatch startupLatch = new CountDownLatch(1); + for(int i = 0; i < maxConnections + 20; i++) { + executor.submit(new Runnable() { + @Override + public void run() { + Connection conn = null; + try { + startupLatch.await(); + conn = cf.createConnection(); + conn.start(); + } catch (Exception e) { + e.printStackTrace(); + 
JmsUtils.closeConnection(conn); + } + } + }); + } + + TcpTransportServer transportServer = (TcpTransportServer)connector.getServer(); + // ensure the max connections is in effect + assertEquals(maxConnections, transportServer.getMaximumConnections()); + // No connections at first + assertEquals(0, connector.getConnections().size()); + // Release the latch to set up connections in parallel + startupLatch.countDown(); + TimeUnit.SECONDS.sleep(5); + + final TransportConnector connector = this.connector; + + // Expect the max connections is created + assertTrue("Expected: " + maxConnections + " found: " + connector.getConnections().size(), + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return connector.getConnections().size() == maxConnections; + } + }) + ); + } + + @After + public void tearDown() throws Exception { + executor.shutdown(); + + service.stop(); + service.waitUntilStopped(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4472Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4472Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4472Test.java new file mode 100644 index 0000000..42c391c --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4472Test.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class AMQ4472Test { + private static final Logger LOG = LoggerFactory.getLogger(AMQ4472Test.class); + + @Test + public void testLostMessage() { + Connection connection = null; + try { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.useJmx=false"); + connection = connectionFactory.createConnection(); + connection.start(); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination test_data_destination = session.createQueue("test"+System.currentTimeMillis()); + + MessageConsumer consumer = session.createConsumer(test_data_destination); + LOG.info("Consumer 1 connected"); + + MessageProducer producer = session.createProducer(test_data_destination); + producer.send(session.createTextMessage("Message 1")); + + // committing the session prior to the close + session.commit(); + + // starting a new transaction + producer.send(session.createTextMessage("Message 2")); + + // in a new transaction, with prefetch>0, the message + // 1 will be pending 
till second commit + LOG.info("Closing consumer 1..."); + consumer.close(); + + // create a consumer + consumer = session.createConsumer(test_data_destination); + LOG.info("Consumer 2 connected"); + + // retrieve message previously committed to tmp queue + Message message = consumer.receive(10000); + if (message != null) { + LOG.info("Got message 1:", message); + assertEquals("expected message", "Message 1", ((TextMessage) message).getText()); + session.commit(); + } else { + LOG.error("Expected message but it never arrived"); + } + assertNotNull(message); + } catch (Exception e) { + e.printStackTrace(); + } finally { + try { + connection.close(); + } catch (JMSException e) { + } + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4475Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4475Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4475Test.java new file mode 100644 index 0000000..3d11cef --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4475Test.java @@ -0,0 +1,344 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.bugs;

import static org.junit.Assert.assertFalse;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.util.TimeStampingBrokerPlugin;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Runs a producer, a forwarding consumer and a final consumer against a broker configured with
 * both the {@link TimeStampingBrokerPlugin} and an {@link IndividualDeadLetterStrategy}, and
 * checks that neither consumer reports a failure.
 */
public class AMQ4475Test {

    private static final Log LOG = LogFactory.getLog(AMQ4475Test.class);

    private static final int NUM_MSGS = 1000;
    private static final int MAX_THREADS = 20;

    private BrokerService broker;
    private String connectionUri;

    private final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS);
    private final ActiveMQQueue original = new ActiveMQQueue("jms/AQueue");
    private final ActiveMQQueue rerouted = new ActiveMQQueue("jms/AQueue_proxy");

    /** Starts a non-persistent broker with the time-stamping plugin and an individual DLQ policy. */
    @Before
    public void setUp() throws Exception {
        TimeStampingBrokerPlugin tsbp = new TimeStampingBrokerPlugin();
        tsbp.setZeroExpirationOverride(432000000);
        tsbp.setTtlCeiling(432000000);
        tsbp.setFutureOnly(true);

        broker = new BrokerService();
        broker.setPersistent(false);
        broker.setUseJmx(true);
        broker.setPlugins(new BrokerPlugin[] {tsbp});
        connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();

        // Configure Dead Letter Strategy
        DeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
        strategy.setProcessExpired(true);
        ((IndividualDeadLetterStrategy)strategy).setUseQueueForQueueMessages(true);
        ((IndividualDeadLetterStrategy)strategy).setQueuePrefix("DLQ.");
        strategy.setProcessNonPersistent(true);

        // Add policy and individual DLQ strategy
        PolicyEntry policy = new PolicyEntry();
        policy.setTimeBeforeDispatchStarts(3000);
        policy.setDeadLetterStrategy(strategy);

        PolicyMap pMap = new PolicyMap();
        pMap.setDefaultEntry(policy);

        broker.setDestinationPolicy(pMap);
        broker.start();
        broker.waitUntilStarted();
    }

    /** Releases the worker pool and stops the broker. */
    @After
    public void after() throws Exception {
        // The test body only shuts the pool down on its success path; make sure it always goes away.
        executor.shutdownNow();
        if (broker != null) {
            broker.stop();
            broker.waitUntilStopped();
        }
    }

    /**
     * Produces {@code NUM_MSGS} messages to the original queue, forwards them to the rerouted
     * queue and consumes them there, failing if either consumer thread reports a failure or is
     * still running after the join timeout.
     */
    @Test
    public void testIndividualDeadLetterAndTimeStampPlugin() {
        LOG.info("Starting test ..");

        long startTime = System.nanoTime();

        // Produce to network
        List<Future<ProducerTask>> tasks = new ArrayList<Future<ProducerTask>>();

        for (int index = 0; index < 1; index++) {
            ProducerTask p = new ProducerTask(connectionUri, original, NUM_MSGS);
            Future<ProducerTask> future = executor.submit(p, p);
            tasks.add(future);
        }

        ForwardingConsumerThread f1 = new ForwardingConsumerThread(original, rerouted, NUM_MSGS);
        f1.start();
        ConsumerThread c1 = new ConsumerThread(connectionUri, rerouted, NUM_MSGS);
        c1.start();

        LOG.info("Waiting on consumers and producers to exit");

        try {
            for (Future<ProducerTask> future : tasks) {
                ProducerTask e = future.get();
                LOG.info("[Completed] " + e.dest.getPhysicalName());
            }
            executor.shutdown();
            LOG.info("Producing threads complete, waiting on ACKs");
            f1.join(TimeUnit.MINUTES.toMillis(2));
            c1.join(TimeUnit.MINUTES.toMillis(2));
        } catch (ExecutionException e) {
            LOG.warn("Caught unexpected exception", e);
            throw new RuntimeException(e);
        } catch (InterruptedException ie) {
            // Preserve the interrupt for whoever runs this test thread.
            Thread.currentThread().interrupt();
            LOG.warn("Caught unexpected exception", ie);
            throw new RuntimeException(ie);
        }

        // join() has a timeout, so a thread that is still alive has not finished its work.
        assertFalse("Forwarding consumer still running", f1.isAlive());
        assertFalse("Consumer still running", c1.isAlive());
        assertFalse(f1.isFailed());
        assertFalse(c1.isFailed());

        long estimatedTime = System.nanoTime() - startTime;

        LOG.info("Testcase duration (seconds): " + estimatedTime / 1000000000.0);
        LOG.info("Consumers and producers exited, all msgs received as expected");
    }

    /** Closes the connection if there is one, logging instead of propagating a close failure. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            LOG.debug("Ignoring failure while closing connection", e);
        }
    }

    /** Sends {@code count} persistent text messages to {@code dest} over its own connection. */
    public class ProducerTask implements Runnable {
        private final String uri;
        private final ActiveMQQueue dest;
        private final int count;

        public ProducerTask(String uri, ActiveMQQueue dest, int count) {
            this.uri = uri;
            this.dest = dest;
            this.count = count;
        }

        @Override
        public void run() {

            Connection connection = null;
            try {
                String destName = "";

                try {
                    destName = dest.getQueueName();
                } catch (JMSException e) {
                    LOG.warn("Caught unexpected exception", e);
                }

                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uri);

                connection = connectionFactory.createConnection();

                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                MessageProducer producer = session.createProducer(dest);
                connection.start();

                producer.setDeliveryMode(DeliveryMode.PERSISTENT);

                String msg = "Test Message";

                for (int i = 0; i < count; i++) {
                    producer.send(session.createTextMessage(msg + dest.getQueueName() + " " + i));
                }

                LOG.info("[" + destName + "] Sent " + count + " msgs");
            } catch (Exception e) {
                LOG.warn("Caught unexpected exception", e);
            } finally {
                closeQuietly(connection);
            }
        }
    }

    /**
     * Consumes messages from one queue over the VM transport and re-sends each of them to
     * another queue. {@link #isFailed()} turns true on a receive timeout or an exception.
     */
    public class ForwardingConsumerThread extends Thread {

        private final ActiveMQQueue original;
        private final ActiveMQQueue forward;
        private int blockSize = 0;
        private final int PARALLEL = 1;
        // Written by this thread, read by the test thread after a timed join.
        private volatile boolean failed;

        public ForwardingConsumerThread(ActiveMQQueue original, ActiveMQQueue forward, int total) {
            this.original = original;
            this.forward = forward;
            this.blockSize = total / PARALLEL;
        }

        public boolean isFailed() {
            return failed;
        }

        @Override
        public void run() {
            Connection connection = null;
            try {

                for (int index = 0; index < PARALLEL; index++) {

                    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");

                    connection = factory.createConnection();
                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    MessageConsumer consumer = session.createConsumer(original);
                    MessageProducer producer = session.createProducer(forward);
                    connection.start();
                    int count = 0;

                    while (count < blockSize) {

                        Message msg1 = consumer.receive(10000);
                        if (msg1 != null) {
                            if (msg1 instanceof ActiveMQTextMessage) {
                                if (count % 100 == 0) {
                                    LOG.info("Consuming -> " + ((ActiveMQTextMessage) msg1).getDestination() + " count=" + count);
                                }

                                producer.send(msg1);

                                count++;
                            } else {
                                LOG.info("Skipping unknown msg type " + msg1);
                            }
                        } else {
                            // Timed out before the whole block arrived; previously this went unreported.
                            failed = true;
                            break;
                        }
                    }

                    LOG.info("[" + original.getQueueName() + "] completed segment (" + index + " of " + blockSize + ")");
                    connection.close();
                }
            } catch (Exception e) {
                failed = true;
                LOG.warn("Caught unexpected exception", e);
            } finally {
                LOG.debug(getName() + ": is stopping");
                closeQuietly(connection);
            }
        }
    }

    /**
     * Consumes the expected number of text messages from a queue. {@link #isFailed()} turns true
     * on a receive timeout or an exception.
     */
    public class ConsumerThread extends Thread {

        private final String uri;
        private final ActiveMQQueue dest;
        private int blockSize = 0;
        private final int PARALLEL = 1;
        // Written by this thread, read by the test thread after a timed join.
        private volatile boolean failed;

        public ConsumerThread(String uri, ActiveMQQueue dest, int total) {
            this.uri = uri;
            this.dest = dest;
            this.blockSize = total / PARALLEL;
        }

        public boolean isFailed() {
            return failed;
        }

        @Override
        public void run() {
            Connection connection = null;
            try {

                for (int index = 0; index < PARALLEL; index++) {

                    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);

                    connection = factory.createConnection();
                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    MessageConsumer consumer = session.createConsumer(dest);
                    connection.start();
                    int count = 0;

                    while (count < blockSize) {

                        Message msg1 = consumer.receive(10000);
                        if (msg1 != null) {
                            if (msg1 instanceof ActiveMQTextMessage) {
                                if (count % 100 == 0) {
                                    LOG.info("Consuming -> " + ((ActiveMQTextMessage) msg1).getDestination() + " count=" + count);
                                }

                                count++;
                            } else {
                                LOG.info("Skipping unknown msg type " + msg1);
                            }
                        } else {
                            failed = true;
                            break;
                        }
                    }

                    LOG.info("[" + dest.getQueueName() + "] completed segment (" + index + " of " + blockSize + ")");
                    connection.close();
                }
            } catch (Exception e) {
                failed = true;
                LOG.warn("Caught unexpected exception", e);
            } finally {
                LOG.debug(getName() + ": is stopping");
                closeQuietly(connection);
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitLevelDBTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitLevelDBTest.java 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitLevelDBTest.java new file mode 100644 index 0000000..5a48540 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitLevelDBTest.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.io.File; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.leveldb.LevelDBStore; + +public class AMQ4485LowLimitLevelDBTest extends AMQ4485LowLimitTest { + + public AMQ4485LowLimitLevelDBTest() { + super(); + numBrokers = 2; + } + + protected BrokerService createBroker(int brokerid, boolean addToNetwork) throws Exception { + BrokerService broker = super.createBroker(brokerid, addToNetwork); + + LevelDBStore levelDBStore = new LevelDBStore(); + levelDBStore.setDirectory(new File(broker.getBrokerDataDirectory(),"levelDB")); + broker.setPersistenceAdapter(levelDBStore); + return broker; + } +} \ No newline at end of file
