http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java new file mode 100644 index 0000000..21c389f --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java @@ -0,0 +1,466 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.QueueConnection; +import javax.jms.QueueReceiver; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.management.ObjectName; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.TimeUtils; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport { + static final String payload = new String(new byte[10 * 1024]); + private static final Logger LOG = 
LoggerFactory.getLogger(AMQ4485LowLimitTest.class); + final int portBase = 61600; + int numBrokers = 8; + final int numProducers = 30; + final int numMessages = 1000; + final int consumerSleepTime = 40; + StringBuilder brokersUrl = new StringBuilder(); + HashMap<ActiveMQQueue, AtomicInteger> accumulators = new HashMap<ActiveMQQueue, AtomicInteger>(); + private ArrayList<Throwable> exceptions = new ArrayList<Throwable>(); + + protected void buildUrlList() throws Exception { + for (int i = 0; i < numBrokers; i++) { + brokersUrl.append("tcp://localhost:" + (portBase + i)); + if (i != numBrokers - 1) { + brokersUrl.append(','); + } + } + } + + protected BrokerService createBroker(int brokerid) throws Exception { + return createBroker(brokerid, true); + } + + protected BrokerService createBroker(int brokerid, boolean addToNetwork) throws Exception { + + BrokerService broker = new BrokerService(); + broker.setPersistent(true); + broker.setDeleteAllMessagesOnStartup(true); + broker.getManagementContext().setCreateConnector(false); + + + broker.setUseJmx(true); + broker.setBrokerName("B" + brokerid); + broker.addConnector(new URI("tcp://localhost:" + (portBase + brokerid))); + + if (addToNetwork) { + addNetworkConnector(broker); + } + broker.setSchedulePeriodForDestinationPurge(0); + broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024l); + + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setExpireMessagesPeriod(0); + policyEntry.setQueuePrefetch(1000); + policyEntry.setMemoryLimit(2 * 1024 * 1024l); + policyEntry.setProducerFlowControl(false); + policyEntry.setEnableAudit(true); + policyEntry.setUseCache(true); + policyMap.put(new ActiveMQQueue("GW.>"), policyEntry); + + PolicyEntry inPolicyEntry = new PolicyEntry(); + inPolicyEntry.setExpireMessagesPeriod(0); + inPolicyEntry.setQueuePrefetch(1000); + inPolicyEntry.setMemoryLimit(5 * 1024 * 1024l); + inPolicyEntry.setProducerFlowControl(true); + 
inPolicyEntry.setEnableAudit(true); + inPolicyEntry.setUseCache(true); + policyMap.put(new ActiveMQQueue("IN"), inPolicyEntry); + + broker.setDestinationPolicy(policyMap); + + KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); + kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true); + + brokers.put(broker.getBrokerName(), new BrokerItem(broker)); + return broker; + } + + private void addNetworkConnector(BrokerService broker) throws Exception { + StringBuilder networkConnectorUrl = new StringBuilder("static:(").append(brokersUrl.toString()); + networkConnectorUrl.append(')'); + + for (int i = 0; i < 2; i++) { + NetworkConnector nc = new DiscoveryNetworkConnector(new URI(networkConnectorUrl.toString())); + nc.setName("Bridge-" + i); + nc.setNetworkTTL(1); + nc.setDecreaseNetworkConsumerPriority(true); + nc.setDynamicOnly(true); + nc.setPrefetchSize(100); + nc.setDynamicallyIncludedDestinations( + Arrays.asList(new ActiveMQDestination[]{new ActiveMQQueue("GW.*")})); + broker.addNetworkConnector(nc); + } + } + + // used to explore contention with concurrentStoreandDispatch - sync commit and task queue reversing + // order of cursor add and sequence assignment + public void x_testInterleavedSend() throws Exception { + + BrokerService b = createBroker(0, false); + b.start(); + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:" + (portBase + 0)); + connectionFactory.setWatchTopicAdvisories(false); + + QueueConnection c1 = connectionFactory.createQueueConnection(); + QueueConnection c2 = connectionFactory.createQueueConnection(); + QueueConnection c3 = connectionFactory.createQueueConnection(); + + c1.start(); + c2.start(); + c3.start(); + + ActiveMQQueue dest = new ActiveMQQueue("IN"); + final Session s1 = c1.createQueueSession(true, Session.SESSION_TRANSACTED); + final TextMessage txMessage = s1.createTextMessage("TX"); + final TextMessage noTxMessage = 
s1.createTextMessage("NO_TX"); + + final MessageProducer txProducer = s1.createProducer(dest); + final MessageProducer nonTxProducer = c2.createQueueSession(false, Session.AUTO_ACKNOWLEDGE).createProducer(dest); + + txProducer.send(txMessage); + + ExecutorService executorService = Executors.newFixedThreadPool(2); + executorService.execute(new Runnable() { + @Override + public void run() { + try { + s1.commit(); + } catch (JMSException e) { + e.printStackTrace(); + } + } + }); + + executorService.execute(new Runnable() { + @Override + public void run() { + try { + nonTxProducer.send(noTxMessage); + } catch (JMSException e) { + e.printStackTrace(); + } + } + }); + + executorService.shutdown(); + executorService.awaitTermination(10, TimeUnit.MINUTES); + + } + + public void testBrokers() throws Exception { + + buildUrlList(); + + for (int i = 0; i < numBrokers; i++) { + createBroker(i); + } + + startAllBrokers(); + waitForBridgeFormation(numBrokers - 1); + + verifyPeerBrokerInfos(numBrokers - 1); + + + final List<ConsumerState> consumerStates = startAllGWConsumers(numBrokers); + + startAllGWFanoutConsumers(numBrokers); + + LOG.info("Waiting for percolation of consumers.."); + TimeUnit.SECONDS.sleep(5); + + LOG.info("Produce mesages.."); + long startTime = System.currentTimeMillis(); + + // produce + produce(numMessages); + + assertTrue("Got all sent", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + for (ConsumerState tally : consumerStates) { + final int expected = numMessages * (tally.destination.isComposite() ? 
tally.destination.getCompositeDestinations().length : 1); + LOG.info("Tally for: " + tally.brokerName + ", dest: " + tally.destination + " - " + tally.accumulator.get()); + if (tally.accumulator.get() != expected) { + LOG.info("Tally for: " + tally.brokerName + ", dest: " + tally.destination + " - " + tally.accumulator.get() + " != " + expected + ", " + tally.expected); + if (tally.accumulator.get() > expected - 50) { + dumpQueueStat(null); + } + if (tally.expected.size() == 1) { + startConsumer(tally.brokerName, tally.destination); + }; + return false; + } + LOG.info("got tally on " + tally.brokerName); + } + return true; + } + }, 1000 * 60 * 1000l, 20*1000)); + + assertTrue("No exceptions:" + exceptions, exceptions.isEmpty()); + + LOG.info("done"); + long duration = System.currentTimeMillis() - startTime; + LOG.info("Duration:" + TimeUtils.printDuration(duration)); + + assertEquals("nothing in the dlq's", 0, dumpQueueStat(new ActiveMQQueue("ActiveMQ.DLQ"))); + + } + + private void startConsumer(String brokerName, ActiveMQDestination destination) throws Exception { + int id = Integer.parseInt(brokerName.substring(1)); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:" + (portBase + id)); + connectionFactory.setWatchTopicAdvisories(false); + QueueConnection queueConnection = connectionFactory.createQueueConnection(); + queueConnection.start(); + + queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(destination); + queueConnection.close(); + } + + private long dumpQueueStat(ActiveMQDestination destination) throws Exception { + long sumTotal = 0; + Collection<BrokerItem> brokerList = brokers.values(); + for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext(); ) { + BrokerService brokerService = i.next().broker; + for (ObjectName objectName : brokerService.getAdminView().getQueues()) { + if (destination != null && objectName.toString().contains(destination.getPhysicalName())) { + 
QueueViewMBean qViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(objectName, QueueViewMBean.class, false); + LOG.info(brokerService.getBrokerName() + ", " + qViewMBean.getName() + ", Enqueue:" + qViewMBean.getEnqueueCount() + ", Size: " + qViewMBean.getQueueSize()); + sumTotal += qViewMBean.getQueueSize(); + } + } + } + return sumTotal; + } + + private void startAllGWFanoutConsumers(int nBrokers) throws Exception { + + StringBuffer compositeDest = new StringBuffer(); + for (int k = 0; k < nBrokers; k++) { + compositeDest.append("GW." + k); + if (k + 1 != nBrokers) { + compositeDest.append(','); + } + } + ActiveMQQueue compositeQ = new ActiveMQQueue(compositeDest.toString()); + + for (int id = 0; id < nBrokers; id++) { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")"); + connectionFactory.setWatchTopicAdvisories(false); + + QueueConnection queueConnection = connectionFactory.createQueueConnection(); + queueConnection.start(); + + final QueueSession queueSession = queueConnection.createQueueSession(true, Session.SESSION_TRANSACTED); + + final MessageProducer producer = queueSession.createProducer(compositeQ); + queueSession.createReceiver(new ActiveMQQueue("IN")).setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + try { + producer.send(message); + queueSession.commit(); + } catch (Exception e) { + LOG.error("Failed to fanout to GW: " + message, e); + } + + } + }); + } + } + + private List<ConsumerState> startAllGWConsumers(int nBrokers) throws Exception { + List<ConsumerState> consumerStates = new LinkedList<ConsumerState>(); + for (int id = 0; id < nBrokers; id++) { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")"); + connectionFactory.setWatchTopicAdvisories(false); + + QueueConnection queueConnection = 
connectionFactory.createQueueConnection(); + queueConnection.start(); + + final QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQQueue destination = new ActiveMQQueue("GW." + id); + QueueReceiver queueReceiver = queueSession.createReceiver(destination); + + final ConsumerState consumerState = new ConsumerState(); + consumerState.brokerName = ((ActiveMQConnection) queueConnection).getBrokerName(); + consumerState.receiver = queueReceiver; + consumerState.destination = destination; + for (int j = 0; j < numMessages * (consumerState.destination.isComposite() ? consumerState.destination.getCompositeDestinations().length : 1); j++) { + consumerState.expected.add(j); + } + + if (!accumulators.containsKey(destination)) { + accumulators.put(destination, new AtomicInteger(0)); + } + consumerState.accumulator = accumulators.get(destination); + + queueReceiver.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + try { + if (consumerSleepTime > 0) { + TimeUnit.MILLISECONDS.sleep(consumerSleepTime); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + try { + consumerState.accumulator.incrementAndGet(); + try { + consumerState.expected.remove(((ActiveMQMessage) message).getProperty("NUM")); + } catch (IOException e) { + e.printStackTrace(); + } + //queueSession.commit(); + } catch (Exception e) { + LOG.error("Failed to commit slow receipt of " + message, e); + } + } + }); + + consumerStates.add(consumerState); + + } + return consumerStates; + } + + private void produce(final int numMessages) throws Exception { + ExecutorService executorService = Executors.newFixedThreadPool(numProducers); + final AtomicInteger toSend = new AtomicInteger(numMessages); + for (int i = 1; i <= numProducers; i++) { + final int id = i % numBrokers; + executorService.execute(new Runnable() { + @Override + public void run() { + try { + ActiveMQConnectionFactory connectionFactory = 
new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")"); + connectionFactory.setWatchTopicAdvisories(false); + QueueConnection queueConnection = connectionFactory.createQueueConnection(); + queueConnection.start(); + QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = queueSession.createProducer(null); + int val = 0; + while ((val = toSend.decrementAndGet()) >= 0) { + + int id = numMessages - val - 1; + + ActiveMQQueue compositeQ = new ActiveMQQueue("IN"); + Message textMessage = queueSession.createTextMessage(((ActiveMQConnection) queueConnection).getBrokerName() + "->" + id + " payload:" + payload); + textMessage.setIntProperty("NUM", id); + producer.send(compositeQ, textMessage); + } + queueConnection.close(); + + } catch (Throwable throwable) { + throwable.printStackTrace(); + exceptions.add(throwable); + } + } + }); + } + } + + private void verifyPeerBrokerInfo(BrokerItem brokerItem, final int max) throws Exception { + final BrokerService broker = brokerItem.broker; + final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length); + return max == regionBroker.getPeerBrokerInfos().length; + } + }); + LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length); + List<String> missing = new ArrayList<String>(); + for (int i = 0; i < max; i++) { + missing.add("B" + i); + } + if (max != regionBroker.getPeerBrokerInfos().length) { + for (BrokerInfo info : regionBroker.getPeerBrokerInfos()) { + LOG.info(info.getBrokerName()); + missing.remove(info.getBrokerName()); + } + LOG.info("Broker infos off.." 
+ missing); + } + assertEquals(broker.getBrokerName(), max, regionBroker.getPeerBrokerInfos().length); + } + + private void verifyPeerBrokerInfos(final int max) throws Exception { + Collection<BrokerItem> brokerList = brokers.values(); + for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext(); ) { + verifyPeerBrokerInfo(i.next(), max); + } + } + + protected void tearDown() throws Exception { + super.tearDown(); + } + + class ConsumerState { + AtomicInteger accumulator; + String brokerName; + QueueReceiver receiver; + ActiveMQDestination destination; + ConcurrentLinkedQueue<Integer> expected = new ConcurrentLinkedQueue<Integer>(); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java new file mode 100644 index 0000000..c2cf53a --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java @@ -0,0 +1,353 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.bugs;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.TimeUtils;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Transactional fan-out test over a statically configured network of
 * {@link #numBrokers} brokers.
 * <p>
 * Producers send {@link #numMessages} messages to queue {@code IN}. On every broker a
 * transacted listener forwards each {@code IN} message to the composite destination
 * {@code GW.0,...,GW.(n-1)}, and one consumer per broker counts what arrives on its own
 * {@code GW.<id>} queue. The test passes when every counter equals the expected total
 * and no producer recorded an exception.
 */
public class AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest extends JmsMultipleBrokersTestSupport {

   /** 10 KiB string built from a zeroed byte array; appended to every message body. */
   static final String payload = new String(new byte[10 * 1024]);
   private static final Logger LOG = LoggerFactory.getLogger(AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.class);

   /** Broker {@code i} listens on {@code tcp://localhost:(portBase + i)}. */
   final int portBase = 61600;
   final int numBrokers = 4;
   final int numProducers = 10;
   final int numMessages = 800;
   /** Milliseconds each GW consumer sleeps per message. */
   final int consumerSleepTime = 20;
   /** Comma separated transport URLs of all brokers; filled by {@link #buildUrlList()}. */
   StringBuilder brokersUrl = new StringBuilder();
   /** One shared receipt counter per GW queue. */
   HashMap<ActiveMQQueue, AtomicInteger> accumulators = new HashMap<ActiveMQQueue, AtomicInteger>();
   /** Failures raised on producer threads; written concurrently, hence a synchronized collection. */
   private final Vector<Throwable> exceptions = new Vector<Throwable>();
   /** Long-lived consumer connections, closed in {@link #tearDown()}. Only touched by the test thread. */
   private final List<QueueConnection> openConnections = new ArrayList<QueueConnection>();

   /**
    * Appends one {@code tcp://localhost:<port>} URL per broker to {@link #brokersUrl},
    * separated by commas.
    */
   protected void buildUrlList() throws Exception {
      for (int i = 0; i < numBrokers; i++) {
         brokersUrl.append("tcp://localhost:" + (portBase + i));
         if (i != numBrokers - 1) {
            brokersUrl.append(',');
         }
      }
   }

   /**
    * Creates and registers a persistent, networked broker named {@code B<brokerid>}
    * listening on {@code portBase + brokerid}.
    * <p>
    * The broker gets a 512 MiB system memory limit with send-fail-if-no-space enabled and
    * a 1 MiB policy for {@code GW.>} queues (producer flow control and optimized dispatch
    * off). Concurrent store-and-dispatch for queues is disabled on the KahaDB adapter.
    *
    * @param brokerid index of the broker; selects its name and port
    * @return the configured, not yet started broker, already stored in the inherited
    *         {@code brokers} map
    */
   protected BrokerService createBroker(int brokerid) throws Exception {
      BrokerService broker = new BrokerService();
      broker.setPersistent(true);
      broker.setDeleteAllMessagesOnStartup(true);
      broker.getManagementContext().setCreateConnector(false);

      broker.setUseJmx(true);
      broker.setBrokerName("B" + brokerid);
      broker.addConnector(new URI("tcp://localhost:" + (portBase + brokerid)));

      addNetworkConnector(broker);
      broker.setSchedulePeriodForDestinationPurge(0);
      broker.getSystemUsage().setSendFailIfNoSpace(true);
      broker.getSystemUsage().getMemoryUsage().setLimit(512L * 1024 * 1024);

      PolicyMap policyMap = new PolicyMap();
      PolicyEntry gwPolicy = new PolicyEntry();
      gwPolicy.setExpireMessagesPeriod(0);
      gwPolicy.setQueuePrefetch(1000);
      gwPolicy.setMemoryLimit(1024 * 1024L);
      gwPolicy.setOptimizedDispatch(false);
      gwPolicy.setProducerFlowControl(false);
      gwPolicy.setEnableAudit(true);
      gwPolicy.setUseCache(true);
      policyMap.put(new ActiveMQQueue("GW.>"), gwPolicy);
      broker.setDestinationPolicy(policyMap);

      KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
      kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(false);

      brokers.put(broker.getBrokerName(), new BrokerItem(broker));
      return broker;
   }

   /**
    * Adds two network connectors ({@code Bridge-0}, {@code Bridge-1}) to {@code broker}.
    * Both use a {@code static:(...)} discovery URI over every URL in {@link #brokersUrl},
    * so {@link #buildUrlList()} must have run first. Only {@code GW.*} queues are
    * dynamically included.
    */
   private void addNetworkConnector(BrokerService broker) throws Exception {
      StringBuilder networkConnectorUrl = new StringBuilder("static:(").append(brokersUrl.toString());
      networkConnectorUrl.append(')');

      for (int i = 0; i < 2; i++) {
         NetworkConnector nc = new DiscoveryNetworkConnector(new URI(networkConnectorUrl.toString()));
         nc.setName("Bridge-" + i);
         nc.setNetworkTTL(1);
         nc.setDecreaseNetworkConsumerPriority(true);
         nc.setDynamicOnly(true);
         nc.setPrefetchSize(100);
         nc.setDynamicallyIncludedDestinations(
            Arrays.asList(new ActiveMQDestination[]{new ActiveMQQueue("GW.*")}));
         broker.addNetworkConnector(nc);
      }
   }

   /**
    * Main scenario: start the broker mesh, attach the GW consumers and the IN-to-GW fan-out
    * listeners, produce {@link #numMessages} messages and wait until every GW consumer has
    * counted the expected number. Finally asserts that no producer failed.
    */
   public void testBrokers() throws Exception {

      buildUrlList();

      for (int i = 0; i < numBrokers; i++) {
         createBroker(i);
      }

      startAllBrokers();
      waitForBridgeFormation(numBrokers - 1);

      verifyPeerBrokerInfos(numBrokers - 1);

      final List<ConsumerState> consumerStates = startAllGWConsumers(numBrokers);

      startAllGWFanoutConsumers(numBrokers);

      LOG.info("Waiting for percolation of consumers..");
      TimeUnit.SECONDS.sleep(5);

      LOG.info("Produce mesages..");
      long startTime = System.currentTimeMillis();

      // produce
      produce(numMessages);

      // The overall timeout is 1000 * 60 * 1000 ms.
      assertTrue("Got all sent", Wait.waitFor(new Wait.Condition() {
         @Override
         public boolean isSatisified() throws Exception {
            for (ConsumerState tally : consumerStates) {
               final int expected = numMessages * (tally.destination.isComposite() ? tally.destination.getCompositeDestinations().length : 1);
               LOG.info("Tally for: " + tally.brokerName + ", dest: " + tally.destination + " - " + tally.accumulator.get());
               if (tally.accumulator.get() != expected) {
                  LOG.info("Tally for: " + tally.brokerName + ", dest: " + tally.destination + " - " + tally.accumulator.get() + " != " + expected + ", " + tally.expected);
                  return false;
               }
               LOG.info("got tally on " + tally.brokerName);
            }
            return true;
         }
      }, 1000 * 60 * 1000L));

      assertTrue("No exceptions:" + exceptions, exceptions.isEmpty());

      LOG.info("done");
      long duration = System.currentTimeMillis() - startTime;
      LOG.info("Duration:" + TimeUtils.printDuration(duration));
   }

   /**
    * On each broker, registers a transacted listener on queue {@code IN} that re-sends
    * every received message to the composite queue {@code GW.0,...,GW.(nBrokers-1)} and
    * commits. Failures are logged and otherwise ignored.
    *
    * @param nBrokers number of brokers, and of GW queues in the composite destination
    */
   private void startAllGWFanoutConsumers(int nBrokers) throws Exception {

      StringBuilder compositeDest = new StringBuilder();
      for (int k = 0; k < nBrokers; k++) {
         compositeDest.append("GW." + k);
         if (k + 1 != nBrokers) {
            compositeDest.append(',');
         }
      }
      ActiveMQQueue compositeQ = new ActiveMQQueue(compositeDest.toString());

      for (int id = 0; id < nBrokers; id++) {
         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
         connectionFactory.setWatchTopicAdvisories(false);

         QueueConnection queueConnection = connectionFactory.createQueueConnection();
         openConnections.add(queueConnection);
         queueConnection.start();

         final QueueSession queueSession = queueConnection.createQueueSession(true, Session.SESSION_TRANSACTED);

         final MessageProducer producer = queueSession.createProducer(compositeQ);
         queueSession.createReceiver(new ActiveMQQueue("IN")).setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
               try {
                  producer.send(message);
                  queueSession.commit();
               } catch (Exception e) {
                  LOG.error("Failed to fanout to GW: " + message, e);
               }
            }
         });
      }
   }

   /**
    * Starts one auto-acknowledge consumer per broker on queue {@code GW.<id>}. Each
    * listener sleeps {@link #consumerSleepTime} ms, increments the queue's counter and
    * removes the message's {@code NUM} property value from the outstanding set.
    *
    * @param nBrokers number of brokers to attach a consumer to
    * @return one {@link ConsumerState} per broker, in broker order
    */
   private List<ConsumerState> startAllGWConsumers(int nBrokers) throws Exception {
      List<ConsumerState> consumerStates = new LinkedList<ConsumerState>();
      for (int id = 0; id < nBrokers; id++) {
         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
         connectionFactory.setWatchTopicAdvisories(false);

         QueueConnection queueConnection = connectionFactory.createQueueConnection();
         openConnections.add(queueConnection);
         queueConnection.start();

         final QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

         ActiveMQQueue destination = new ActiveMQQueue("GW." + id);
         QueueReceiver queueReceiver = queueSession.createReceiver(destination);

         final ConsumerState consumerState = new ConsumerState();
         consumerState.brokerName = ((ActiveMQConnection) queueConnection).getBrokerName();
         consumerState.receiver = queueReceiver;
         consumerState.destination = destination;
         final int expectedCount = numMessages * (consumerState.destination.isComposite() ? consumerState.destination.getCompositeDestinations().length : 1);
         for (int j = 0; j < expectedCount; j++) {
            consumerState.expected.add(j);
         }

         if (!accumulators.containsKey(destination)) {
            accumulators.put(destination, new AtomicInteger(0));
         }
         consumerState.accumulator = accumulators.get(destination);

         queueReceiver.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
               try {
                  if (consumerSleepTime > 0) {
                     TimeUnit.MILLISECONDS.sleep(consumerSleepTime);
                  }
               } catch (InterruptedException e) {
                  // Keep the interrupt visible to the delivering thread; still count the message.
                  Thread.currentThread().interrupt();
               }
               try {
                  consumerState.accumulator.incrementAndGet();
                  try {
                     consumerState.expected.remove(((ActiveMQMessage) message).getProperty("NUM"));
                  } catch (IOException e) {
                     LOG.error("Failed to read NUM property of " + message, e);
                  }
               } catch (Exception e) {
                  LOG.error("Failed to commit slow receipt of " + message, e);
               }
            }
         });

         consumerStates.add(consumerState);
      }
      return consumerStates;
   }

   /**
    * Submits {@link #numProducers} producer tasks that together send {@code numMessages}
    * text messages to queue {@code IN}; task {@code i} connects to broker
    * {@code i % numBrokers}. Each message carries an int property {@code NUM} holding the
    * countdown value claimed for it ({@code numMessages - 1} down to 0).
    * <p>
    * Returns as soon as the tasks are submitted. Any {@link Throwable} raised by a task is
    * recorded in {@link #exceptions}.
    *
    * @param numMessages total number of messages to send across all producers
    */
   private void produce(int numMessages) throws Exception {
      ExecutorService executorService = Executors.newFixedThreadPool(numProducers);
      final AtomicInteger toSend = new AtomicInteger(numMessages);
      for (int i = 1; i <= numProducers; i++) {
         final int id = i % numBrokers;
         executorService.execute(new Runnable() {
            @Override
            public void run() {
               QueueConnection queueConnection = null;
               try {
                  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
                  connectionFactory.setWatchTopicAdvisories(false);
                  queueConnection = connectionFactory.createQueueConnection();
                  queueConnection.start();
                  QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                  MessageProducer producer = queueSession.createProducer(null);
                  int val = 0;
                  while ((val = toSend.decrementAndGet()) >= 0) {

                     ActiveMQQueue compositeQ = new ActiveMQQueue("IN");
                     LOG.info("Send to: " + ((ActiveMQConnection) queueConnection).getBrokerName() + ", " + val + ", dest:" + compositeQ);
                     Message textMessage = queueSession.createTextMessage(((ActiveMQConnection) queueConnection).getBrokerName() + "->" + val + " payload:" + payload);
                     textMessage.setIntProperty("NUM", val);
                     producer.send(compositeQ, textMessage);
                  }
               } catch (Throwable throwable) {
                  LOG.error("Producer for broker B" + id + " failed", throwable);
                  exceptions.add(throwable);
               } finally {
                  // Close on every path; the original leaked the connection when a send failed.
                  if (queueConnection != null) {
                     try {
                        queueConnection.close();
                     } catch (JMSException e) {
                        LOG.warn("Failed to close producer connection", e);
                     }
                  }
               }
            }
         });
      }
      // Non-blocking: already submitted tasks still run, but the pool threads exit
      // afterwards instead of lingering for the life of the JVM.
      executorService.shutdown();
   }

   /**
    * Waits until {@code brokerItem}'s region broker reports {@code max} peer broker infos,
    * then asserts that count. On a mismatch the names of the peers that are present, and
    * of the brokers that are not, are logged first.
    *
    * @param brokerItem broker to check
    * @param max        expected number of peers
    */
   private void verifyPeerBrokerInfo(BrokerItem brokerItem, final int max) throws Exception {
      final BrokerService broker = brokerItem.broker;
      final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
      Wait.waitFor(new Wait.Condition() {
         @Override
         public boolean isSatisified() throws Exception {
            LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length);
            return max == regionBroker.getPeerBrokerInfos().length;
         }
      });
      LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length);
      if (max != regionBroker.getPeerBrokerInfos().length) {
         // Candidate peers are all brokers except this one. The original listed
         // B0..B(max-1), which could include this broker and omit the last one.
         List<String> missing = new ArrayList<String>();
         for (int i = 0; i < numBrokers; i++) {
            String name = "B" + i;
            if (!name.equals(broker.getBrokerName())) {
               missing.add(name);
            }
         }
         for (BrokerInfo info : regionBroker.getPeerBrokerInfos()) {
            LOG.info(info.getBrokerName());
            missing.remove(info.getBrokerName());
         }
         LOG.info("Broker infos off.." + missing);
      }
      assertEquals(broker.getBrokerName(), max, regionBroker.getPeerBrokerInfos().length);
   }

   /** Runs {@link #verifyPeerBrokerInfo(BrokerItem, int)} for every registered broker. */
   private void verifyPeerBrokerInfos(final int max) throws Exception {
      Collection<BrokerItem> brokerList = brokers.values();
      for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext(); ) {
         verifyPeerBrokerInfo(i.next(), max);
      }
   }

   /**
    * Closes the consumer connections opened by this test, then delegates to the
    * superclass tear-down.
    */
   @Override
   protected void tearDown() throws Exception {
      for (QueueConnection connection : openConnections) {
         try {
            connection.close();
         } catch (JMSException e) {
            LOG.warn("Failed to close connection during tearDown", e);
         }
      }
      openConnections.clear();
      super.tearDown();
   }

   /** Per-consumer bookkeeping for one {@code GW.<id>} queue. */
   class ConsumerState {
      /** Number of messages received on {@link #destination}. */
      AtomicInteger accumulator;
      /** Name of the broker the consumer is connected to. */
      String brokerName;
      QueueReceiver receiver;
      ActiveMQDestination destination;
      /** {@code NUM} values not yet seen by this consumer. */
      Vector<Integer> expected = new Vector<Integer>();
   }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java new file mode 100644 index 0000000..1126d31 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.util.HashSet; +import java.util.Set; +import java.util.Vector; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerPluginSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.TransactionBroker; +import org.apache.activemq.broker.jmx.DestinationViewMBean; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.transaction.Synchronization; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ4485Test extends TestCase { + private static final Logger LOG = LoggerFactory.getLogger(AMQ4485Test.class); + BrokerService broker; + ActiveMQConnectionFactory factory; + final int messageCount = 20; + int 
memoryLimit = 40 * 1024; + final ActiveMQQueue destination = new ActiveMQQueue("QUEUE." + this.getClass().getName()); + final Vector<Throwable> exceptions = new Vector<Throwable>(); + final CountDownLatch slowSendResume = new CountDownLatch(1); + + + protected void configureBroker(long memoryLimit) throws Exception { + broker.setDeleteAllMessagesOnStartup(true); + broker.setAdvisorySupport(false); + + PolicyEntry policy = new PolicyEntry(); + policy.setExpireMessagesPeriod(0); + policy.setMemoryLimit(memoryLimit); + policy.setProducerFlowControl(false); + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + broker.setDestinationPolicy(pMap); + + broker.setPlugins(new BrokerPlugin[] {new BrokerPluginSupport() { + @Override + public void send(ProducerBrokerExchange producerExchange, final Message messageSend) throws Exception { + if (messageSend.isInTransaction() && messageSend.getProperty("NUM") != null) { + final Integer num = (Integer) messageSend.getProperty("NUM"); + if (true) { + TransactionBroker transactionBroker = (TransactionBroker)broker.getBroker().getAdaptor(TransactionBroker.class); + transactionBroker.getTransaction(producerExchange.getConnectionContext(), messageSend.getTransactionId(), false).addSynchronization( + new Synchronization() { + public void afterCommit() throws Exception { + LOG.error("AfterCommit, NUM:" + num + ", " + messageSend.getMessageId() + ", tx: " + messageSend.getTransactionId()); + if (num == 5) { + // we want to add to cursor after usage is exhausted by message 20 and when + // all other messages have been processed + LOG.error("Pausing on latch in afterCommit for: " + num + ", " + messageSend.getMessageId()); + slowSendResume.await(20, TimeUnit.SECONDS); + LOG.error("resuming on latch afterCommit for: " + num + ", " + messageSend.getMessageId()); + } else if (messageCount + 1 == num) { + LOG.error("releasing latch. 
" + num + ", " + messageSend.getMessageId()); + slowSendResume.countDown(); + // for message X, we need to delay so message 5 can setBatch + TimeUnit.SECONDS.sleep(5); + LOG.error("resuming afterCommit for: " + num + ", " + messageSend.getMessageId()); + } + } + }); + } + } + super.send(producerExchange, messageSend); + } + } + }); + + } + + + public void testOutOfOrderTransactionCompletionOnMemoryLimit() throws Exception { + + Set<Integer> expected = new HashSet<Integer>(); + final Vector<Session> sessionVector = new Vector<Session>(); + ExecutorService executorService = Executors.newCachedThreadPool(); + for (int i = 1; i <= messageCount; i++) { + sessionVector.add(send(i, 1, true)); + expected.add(i); + } + + // get parallel commit so that the sync writes are batched + for (int i = 0; i < messageCount; i++) { + final int id = i; + executorService.submit(new Runnable() { + @Override + public void run() { + try { + sessionVector.get(id).commit(); + } catch (Exception fail) { + exceptions.add(fail); + } + } + }); + } + + final DestinationViewMBean queueViewMBean = (DestinationViewMBean) + broker.getManagementContext().newProxyInstance(broker.getAdminView().getQueues()[0], DestinationViewMBean.class, false); + + // not sure how many messages will get enqueued + TimeUnit.SECONDS.sleep(3); + if (false) + assertTrue("all " + messageCount + " on the q", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("enqueueCount: " + queueViewMBean.getEnqueueCount()); + return messageCount == queueViewMBean.getEnqueueCount(); + } + })); + + LOG.info("Big send to blow available destination usage before slow send resumes"); + send(messageCount + 1, 35*1024, true).commit(); + + + // consume and verify all received + Connection cosumerConnection = factory.createConnection(); + cosumerConnection.start(); + MessageConsumer consumer = cosumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(destination); + 
for (int i = 1; i <= messageCount + 1; i++) { + BytesMessage bytesMessage = (BytesMessage) consumer.receive(10000); + assertNotNull("Got message: " + i + ", " + expected, bytesMessage); + MessageId mqMessageId = ((ActiveMQBytesMessage) bytesMessage).getMessageId(); + LOG.info("got: " + expected + ", " + mqMessageId + ", NUM=" + ((ActiveMQBytesMessage) bytesMessage).getProperty("NUM")); + expected.remove(((ActiveMQBytesMessage) bytesMessage).getProperty("NUM")); + } + } + + private Session send(int id, int messageSize, boolean transacted) throws Exception { + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + BytesMessage bytesMessage = session.createBytesMessage(); + bytesMessage.writeBytes(new byte[messageSize]); + bytesMessage.setIntProperty("NUM", id); + producer.send(bytesMessage); + LOG.info("Sent:" + bytesMessage.getJMSMessageID() + " session tx: " + ((ActiveMQBytesMessage) bytesMessage).getTransactionId()); + return session; + } + + protected void setUp() throws Exception { + super.setUp(); + broker = new BrokerService(); + broker.setBrokerName("thisOne"); + configureBroker(memoryLimit); + broker.start(); + factory = new ActiveMQConnectionFactory("vm://thisOne?jms.alwaysSyncSend=true"); + factory.setWatchTopicAdvisories(false); + + } + + protected void tearDown() throws Exception { + super.tearDown(); + if (broker != null) { + broker.stop(); + broker = null; + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java new file mode 100644 index 0000000..346650e --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.util.Enumeration; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +
/**
 * Browses a queue filled by many short-lived producers, on a broker whose default policy sets
 * {@code maxProducersToAudit} to 75, and checks that the browser returns exactly the number of
 * messages that were sent.
 */
public class AMQ4487Test {

    private static final Logger LOG = LoggerFactory.getLogger(AMQ4487Test.class);

    private final String destinationName = "TEST.QUEUE";
    private BrokerService broker;
    private ActiveMQConnectionFactory factory;

    @Before
    public void startBroker() throws Exception {
        broker = new BrokerService();
        broker.deleteAllMessages();
        broker.setUseJmx(false);
        broker.setAdvisorySupport(false);

        PolicyEntry policy = new PolicyEntry();
        policy.setQueue(">");
        policy.setMaxProducersToAudit(75);
        PolicyMap pMap = new PolicyMap();
        pMap.setDefaultEntry(policy);
        broker.setDestinationPolicy(pMap);

        broker.start();
        broker.waitUntilStarted();
        factory = new ActiveMQConnectionFactory("vm://localhost");
    }

    @After
    public void stopBroker() throws Exception {
        // broker is null when startBroker failed before assigning it
        if (broker != null) {
            broker.stop();
            broker.waitUntilStopped();
        }
    }

    /**
     * Sends {@code messageToSend} text messages of 2048 'x' characters. Each message is sent
     * through its own session and producer, which are closed right after the send.
     */
    private void sendMessages(int messageToSend) throws Exception {
        final int payloadSize = 1024 * 2;
        StringBuilder payload = new StringBuilder(payloadSize);
        for (int i = 0; i < payloadSize; i++) {
            payload.append('x');
        }
        String data = payload.toString();

        Connection connection = factory.createConnection();
        try {
            connection.start();

            for (int i = 0; i < messageToSend; i++) {
                // a new session and producer per message is deliberate: it produces one
                // distinct producer per message sent
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                Queue queue = session.createQueue(destinationName);
                MessageProducer producer = session.createProducer(queue);
                producer.send(session.createTextMessage(data));
                session.close();
            }
        } finally {
            connection.close();
        }
    }

    @Test
    public void testBrowsingWithLessThanMaxAuditDepth() throws Exception {
        doTestBrowsing(75);
    }

    @Test
    public void testBrowsingWithMoreThanMaxAuditDepth() throws Exception {
        doTestBrowsing(300);
    }

    /**
     * Sends {@code messagesToSend} messages, browses the queue and asserts that exactly that
     * many messages are returned by the browser.
     */
    private void doTestBrowsing(int messagesToSend) throws Exception {

        Connection connection = factory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(destinationName);

            sendMessages(messagesToSend);

            QueueBrowser browser = session.createBrowser(queue);
            Enumeration<?> enumeration = browser.getEnumeration();
            int received = 0;
            while (enumeration.hasMoreElements()) {
                Message m = (Message) enumeration.nextElement();
                assertNotNull(m);

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Browsed Message: {}", m.getJMSMessageID());
                }

                received++;
                // stop once more than expected were browsed; the assertion below then fails
                // instead of the loop running on
                if (received > messagesToSend) {
                    break;
                }
            }

            browser.close();

            assertEquals(messagesToSend, received);
        } finally {
            connection.close();
        }
    }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4504Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4504Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4504Test.java new file mode 100644 index 0000000..64204bd --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4504Test.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + + +import static org.junit.Assert.assertNotNull; +
/**
 * Sends messages to a composite queue made of 20 destinations and consumes them again through a
 * single consumer created on the same composite destination.
 */
public class AMQ4504Test {

    BrokerService brokerService;

    @Before
    public void setup() throws Exception {
        brokerService = new BrokerService();
        brokerService.setPersistent(false);
        brokerService.start();
    }

    @After
    public void stop() throws Exception {
        // brokerService is null when setup failed before assigning it
        if (brokerService != null) {
            brokerService.stop();
        }
    }

    @Test
    public void testCompositeDestConsumer() throws Exception {

        final int numDests = 20;
        final int numMessages = 200;

        // "ST.0,ST.1,...,ST.19" followed by the consumer prefetch option
        StringBuilder compositeName = new StringBuilder();
        for (int i = 0; i < numDests; i++) {
            if (compositeName.length() != 0) {
                compositeName.append(',');
            }
            compositeName.append("ST." + i);
        }
        compositeName.append("?consumer.prefetchSize=100");
        ActiveMQQueue activeMQQueue = new ActiveMQQueue(compositeName.toString());

        ConnectionFactory factory = new ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
        Connection connection = factory.createConnection();
        try {
            connection.start();
            MessageProducer producer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createProducer(activeMQQueue);
            for (int i = 0; i < numMessages; i++) {
                producer.send(new ActiveMQTextMessage());
            }

            MessageConsumer consumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(activeMQQueue);
            // numMessages were sent to the composite name, so one copy per member queue is expected
            for (int i = 0; i < numMessages * numDests; i++) {
                assertNotNull("recieved:" + i, consumer.receive(4000));
            }
        } finally {
            connection.close();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4513Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4513Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4513Test.java new file mode 100644 index 0000000..cf19982 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4513Test.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertTrue; + +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.DeadLetterStrategy; +import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +
/**
 * Runs 3000 short tasks on 25 threads against a broker with an individual dead letter strategy.
 * Each task opens a connection, creates a temporary queue and a producer, and closes the
 * connection; every third task also sends two non-persistent messages with a 400 ms time to
 * live. The test passes when all tasks finish within five minutes without an error.
 */
public class AMQ4513Test {

    private BrokerService brokerService;
    private String connectionUri;

    @Before
    public void setup() throws Exception {
        brokerService = new BrokerService();

        connectionUri = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();

        // Configure Dead Letter Strategy
        IndividualDeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
        strategy.setUseQueueForQueueMessages(true);
        strategy.setQueuePrefix("DLQ.");
        strategy.setProcessNonPersistent(false);
        strategy.setProcessExpired(false);

        // Add policy and individual DLQ strategy
        PolicyEntry policy = new PolicyEntry();
        policy.setTimeBeforeDispatchStarts(3000);
        policy.setDeadLetterStrategy(strategy);

        PolicyMap pMap = new PolicyMap();
        pMap.setDefaultEntry(policy);

        brokerService.setDestinationPolicy(pMap);

        brokerService.setPersistent(false);
        brokerService.start();
    }

    @After
    public void stop() throws Exception {
        // brokerService is null when setup failed before assigning it
        if (brokerService != null) {
            brokerService.stop();
        }
    }

    @Test(timeout=360000)
    public void test() throws Exception {

        final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri);

        final ExecutorService service = Executors.newFixedThreadPool(25);

        final Random ripple = new Random(System.currentTimeMillis());

        // Errors raised inside the tasks; fully-qualified because the import block is not
        // part of this change.
        final java.util.Queue<Throwable> failures = new java.util.concurrent.ConcurrentLinkedQueue<Throwable>();

        try {
            for (int i = 0; i < 1000; ++i) {
                service.execute(newIdleProducerTask(cf, ripple, failures));
                service.execute(newExpiringSendTask(cf, failures));
                service.execute(newIdleProducerTask(cf, ripple, failures));
            }

            service.shutdown();
            assertTrue(service.awaitTermination(5, TimeUnit.MINUTES));
        } finally {
            // no-op after a clean termination; stops leftover tasks after a timeout or error
            service.shutdownNow();
        }

        assertTrue("Tasks failed: " + failures, failures.isEmpty());
    }

    /**
     * Task that creates a temporary queue and a producer on a new connection, closes the
     * connection and then sleeps for a random time below 20 ms.
     */
    private static Runnable newIdleProducerTask(final ActiveMQConnectionFactory cf, final Random ripple,
                                                final java.util.Queue<Throwable> failures) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
                    try {
                        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                        Destination destination = session.createTemporaryQueue();
                        session.createProducer(destination);
                    } finally {
                        connection.close();
                    }
                    TimeUnit.MILLISECONDS.sleep(ripple.nextInt(20));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    failures.add(e);
                }
            }
        };
    }

    /**
     * Task that sends two non-persistent messages with a 400 ms time to live to a temporary
     * queue, waits 500 ms and then closes the connection.
     */
    private static Runnable newExpiringSendTask(final ActiveMQConnectionFactory cf,
                                                final java.util.Queue<Throwable> failures) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
                    try {
                        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                        Destination destination = session.createTemporaryQueue();
                        MessageProducer producer = session.createProducer(destination);
                        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                        producer.setTimeToLive(400);
                        producer.send(session.createTextMessage());
                        producer.send(session.createTextMessage());
                        // longer than the time to live, so the connection closes after it has elapsed
                        TimeUnit.MILLISECONDS.sleep(500);
                    } finally {
                        connection.close();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    failures.add(e);
                }
            }
        };
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4517Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4517Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4517Test.java new file mode 100644 index 0000000..6f5556d --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4517Test.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.DeadLetterStrategy; +import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +
/**
 * Checks that no message arrives on the {@code AdvisorySupport.MESSAGE_DLQ_TOPIC_PREFIX}
 * advisory topics when two non-persistent messages with a 400 ms time to live are sent to a
 * temporary queue on a broker whose dead letter strategy processes neither non-persistent nor
 * expired messages.
 */
public class AMQ4517Test {

    private BrokerService brokerService;
    private String connectionUri;

    @Before
    public void setup() throws Exception {
        brokerService = new BrokerService();

        connectionUri = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();

        // Configure Dead Letter Strategy
        IndividualDeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
        strategy.setUseQueueForQueueMessages(true);
        strategy.setQueuePrefix("DLQ.");
        strategy.setProcessNonPersistent(false);
        strategy.setProcessExpired(false);

        // Add policy and individual DLQ strategy
        PolicyEntry policy = new PolicyEntry();
        policy.setTimeBeforeDispatchStarts(3000);
        policy.setDeadLetterStrategy(strategy);

        PolicyMap pMap = new PolicyMap();
        pMap.setDefaultEntry(policy);

        brokerService.setDestinationPolicy(pMap);
        brokerService.setPersistent(false);
        brokerService.start();
    }

    @After
    public void stop() throws Exception {
        // brokerService is null when setup failed before assigning it
        if (brokerService != null) {
            brokerService.stop();
        }
    }

    @Test(timeout=360000)
    public void test() throws Exception {

        final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri);

        final AtomicBoolean advised = new AtomicBoolean(false);
        // First error raised by the producer task; fully-qualified because the import block
        // is not part of this change.
        final java.util.concurrent.atomic.AtomicReference<Throwable> producerFailure =
            new java.util.concurrent.atomic.AtomicReference<Throwable>();

        Connection advisoryConnection = cf.createConnection();
        try {
            Session advisorySession = advisoryConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination dlqDestination = advisorySession.createTopic(AdvisorySupport.MESSAGE_DLQ_TOPIC_PREFIX + ">");
            MessageConsumer consumer = advisorySession.createConsumer(dlqDestination);
            consumer.setMessageListener(new MessageListener() {

                @Override
                public void onMessage(Message message) {
                    advised.set(true);
                }
            });
            advisoryConnection.start();

            ExecutorService service = Executors.newSingleThreadExecutor();
            try {
                service.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            ActiveMQConnection producerConnection = (ActiveMQConnection) cf.createConnection();
                            try {
                                Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                                Destination destination = producerSession.createTemporaryQueue();
                                MessageProducer producer = producerSession.createProducer(destination);
                                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                                producer.setTimeToLive(400);
                                producer.send(producerSession.createTextMessage());
                                producer.send(producerSession.createTextMessage());
                                // longer than the time to live, so the connection closes after it has elapsed
                                TimeUnit.MILLISECONDS.sleep(500);
                            } finally {
                                producerConnection.close();
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            producerFailure.compareAndSet(null, e);
                        } catch (Exception e) {
                            producerFailure.compareAndSet(null, e);
                        }
                    }
                });

                service.shutdown();
                assertTrue(service.awaitTermination(1, TimeUnit.MINUTES));
            } finally {
                // no-op after a clean termination; stops the task after a timeout or error
                service.shutdownNow();
            }

            // Without this check the test would also pass when nothing was ever sent.
            assertTrue("Producer task failed: " + producerFailure.get(), producerFailure.get() == null);
            assertFalse("Should not get any Advisories for DLQ'd Messages", advised.get());
        } finally {
            advisoryConnection.close();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4518Test.java ---------------------------------------------------------------------- diff --git
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4518Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4518Test.java new file mode 100644 index 0000000..e544642 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4518Test.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.DeadLetterStrategy; +import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +
/**
 * Checks that no message arrives on the
 * {@code AdvisorySupport.EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX} advisory topics when two
 * non-persistent messages with a 400 ms time to live are sent to a temporary queue on a broker
 * whose dead letter strategy processes neither non-persistent nor expired messages.
 */
public class AMQ4518Test {

    private BrokerService brokerService;
    private String connectionUri;

    @Before
    public void setup() throws Exception {
        brokerService = new BrokerService();

        connectionUri = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();

        // Configure Dead Letter Strategy
        IndividualDeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
        strategy.setUseQueueForQueueMessages(true);
        strategy.setQueuePrefix("DLQ.");
        strategy.setProcessNonPersistent(false);
        strategy.setProcessExpired(false);

        // Add policy and individual DLQ strategy
        PolicyEntry policy = new PolicyEntry();
        policy.setTimeBeforeDispatchStarts(3000);
        policy.setDeadLetterStrategy(strategy);

        PolicyMap pMap = new PolicyMap();
        pMap.setDefaultEntry(policy);

        brokerService.setDestinationPolicy(pMap);
        brokerService.setPersistent(false);
        brokerService.start();
    }

    @After
    public void stop() throws Exception {
        // brokerService is null when setup failed before assigning it
        if (brokerService != null) {
            brokerService.stop();
        }
    }

    @Test(timeout=360000)
    public void test() throws Exception {

        final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri);

        final AtomicBoolean advised = new AtomicBoolean(false);
        // First error raised by the producer task; fully-qualified because the import block
        // is not part of this change.
        final java.util.concurrent.atomic.AtomicReference<Throwable> producerFailure =
            new java.util.concurrent.atomic.AtomicReference<Throwable>();

        Connection advisoryConnection = cf.createConnection();
        try {
            Session advisorySession = advisoryConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination dlqDestination = advisorySession.createTopic(AdvisorySupport.EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX + ">");
            MessageConsumer consumer = advisorySession.createConsumer(dlqDestination);
            consumer.setMessageListener(new MessageListener() {

                @Override
                public void onMessage(Message message) {
                    advised.set(true);
                }
            });
            advisoryConnection.start();

            ExecutorService service = Executors.newSingleThreadExecutor();
            try {
                service.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            ActiveMQConnection producerConnection = (ActiveMQConnection) cf.createConnection();
                            try {
                                Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                                Destination destination = producerSession.createTemporaryQueue();
                                MessageProducer producer = producerSession.createProducer(destination);
                                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                                producer.setTimeToLive(400);
                                producer.send(producerSession.createTextMessage());
                                producer.send(producerSession.createTextMessage());
                                // longer than the time to live, so the connection closes after it has elapsed
                                TimeUnit.MILLISECONDS.sleep(500);
                            } finally {
                                producerConnection.close();
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            producerFailure.compareAndSet(null, e);
                        } catch (Exception e) {
                            producerFailure.compareAndSet(null, e);
                        }
                    }
                });

                service.shutdown();
                assertTrue(service.awaitTermination(1, TimeUnit.MINUTES));
            } finally {
                // no-op after a clean termination; stops the task after a timeout or error
                service.shutdownNow();
            }

            // Without this check the test would also pass when nothing was ever sent.
            assertTrue("Producer task failed: " + producerFailure.get(), producerFailure.get() == null);
            assertFalse("Should not get any Advisories for Expired Messages", advised.get());
        } finally {
            advisoryConnection.close();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4530Test.java ---------------------------------------------------------------------- diff --git
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4530Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4530Test.java new file mode 100644 index 0000000..e8ab9f4 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4530Test.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; + +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularDataSupport; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.CompositeDataConstants; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class AMQ4530Test { + + private static BrokerService brokerService; + private static String TEST_QUEUE = "testQueue"; + private static ActiveMQQueue queue = new ActiveMQQueue(TEST_QUEUE); + private static String BROKER_ADDRESS = "tcp://localhost:0"; + private static String KEY = "testproperty"; + private static String VALUE = "propvalue"; + + private ActiveMQConnectionFactory connectionFactory; + private String connectionUri; + + @Before + public void setUp() throws Exception { + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.setUseJmx(true); + connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString(); + brokerService.start(); + brokerService.waitUntilStarted(); + + connectionFactory = new ActiveMQConnectionFactory(connectionUri); + sendMessage(); + } + + public void sendMessage() throws Exception { + final Connection conn = connectionFactory.createConnection(); + try { + 
conn.start(); + final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Destination queue = session.createQueue(TEST_QUEUE); + final Message toSend = session.createMessage(); + toSend.setStringProperty(KEY, VALUE); + final MessageProducer producer = session.createProducer(queue); + producer.send(queue, toSend); + } finally { + conn.close(); + } + } + + @After + public void tearDown() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + + @SuppressWarnings("unchecked") + @Test + public void testStringPropertiesFromCompositeData() throws Exception { + final QueueViewMBean queueView = getProxyToQueueViewMBean(); + final CompositeData message = queueView.browse()[0]; + assertNotNull(message); + TabularDataSupport stringProperties = (TabularDataSupport) message.get(CompositeDataConstants.STRING_PROPERTIES); + assertNotNull(stringProperties); + assertThat(stringProperties.size(), is(greaterThan(0))); + Map.Entry<Object, Object> compositeDataEntry = (Map.Entry<Object, Object>) stringProperties.entrySet().toArray()[0]; + CompositeData stringEntry = (CompositeData) compositeDataEntry.getValue(); + assertThat(String.valueOf(stringEntry.get("key")), equalTo(KEY)); + assertThat(String.valueOf(stringEntry.get("value")), equalTo(VALUE)); + } + + private QueueViewMBean getProxyToQueueViewMBean() throws MalformedObjectNameException, NullPointerException, + JMSException { + final ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queue.getQueueName()); + final QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance( + queueViewMBeanName, QueueViewMBean.class, true); + return proxy; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java 
---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java new file mode 100644 index 0000000..0be3226 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.io.Writer; +import java.lang.management.ManagementFactory; +import java.util.concurrent.CountDownLatch; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Unit test for simple App. 
+ */ +public class AMQ4531Test extends TestCase { + + private final Logger LOG = LoggerFactory.getLogger(AMQ4531Test.class); + + private String connectionURI; + private MBeanServer mbeanServer; + private BrokerService broker; + + @Override + protected void setUp() throws Exception { + super.setUp(); + broker = new BrokerService(); + connectionURI = broker.addConnector("tcp://0.0.0.0:0?maximumConnections=1").getPublishableConnectString(); + broker.setPersistent(false); + broker.start(); + mbeanServer = ManagementFactory.getPlatformMBeanServer(); + } + + @Override + protected void tearDown() throws Exception { + broker.stop(); + super.tearDown(); + } + + /** + * Create the test case + * + * @param testName + * name of the test case + */ + public AMQ4531Test(String testName) { + super(testName); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() { + return new TestSuite(AMQ4531Test.class); + } + + public void testFDSLeak() throws Exception { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionURI); + ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); + connection.start(); + + int connections = 100; + final long original = openFileDescriptorCount(); + LOG.info("FD count: " + original); + final CountDownLatch done = new CountDownLatch(connections); + for (int i = 0; i < connections; i++) { + new Thread("worker: " + i) { + @Override + public void run() { + ActiveMQConnection connection = null; + try { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionURI); + connection = (ActiveMQConnection) factory.createConnection(); + connection.start(); + } catch (Exception e) { + LOG.debug(getStack(e)); + } finally { + try { + connection.close(); + } catch (Exception e) { + LOG.debug(getStack(e)); + } + done.countDown(); + LOG.debug("Latch count down called."); + } + } + }.start(); + } + + // Wait for all the clients to finish + LOG.info("Waiting for 
latch..."); + done.await(); + LOG.info("Latch complete."); + LOG.info("FD count: " + openFileDescriptorCount()); + + assertTrue("Too many open file descriptors: " + openFileDescriptorCount(), Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + long openFDs = openFileDescriptorCount(); + LOG.info("Current FD count [{}], original FD count[{}]", openFDs, original); + return (openFDs - original) < 10; + } + })); + } + + private long openFileDescriptorCount() throws Exception { + return ((Long) mbeanServer.getAttribute(new ObjectName("java.lang:type=OperatingSystem"), "OpenFileDescriptorCount")).longValue(); + } + + private String getStack(Throwable aThrowable) { + final Writer result = new StringWriter(); + final PrintWriter printWriter = new PrintWriter(result); + aThrowable.printStackTrace(printWriter); + return result.toString(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4554Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4554Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4554Test.java new file mode 100644 index 0000000..47ce642 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4554Test.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Unit test for simple App. + */ +public class AMQ4554Test extends TestCase { + + private final Logger LOG = LoggerFactory.getLogger(AMQ4554Test.class); + + private String connectionURI; + private BrokerService broker; + + @Override + protected void setUp() throws Exception { + super.setUp(); + broker = new BrokerService(); + connectionURI = broker.addConnector("tcp://0.0.0.0:0?maximumConnections=1").getPublishableConnectString(); + broker.setPersistent(false); + broker.start(); + } + + @Override + protected void tearDown() throws Exception { + broker.stop(); + super.tearDown(); + } + + /** + * Create the test case + * + * @param testName + * name of the test case + */ + public AMQ4554Test(String testName) { + super(testName); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() { + return new TestSuite(AMQ4554Test.class); + } + + public void testMSXProducerTXID() throws Exception { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionURI); + Connection connection = factory.createConnection(); + 
connection.start(); + + Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = producerSession.createProducer(producerSession.createQueue("myQueue")); + TextMessage producerMessage = producerSession.createTextMessage("Test Message"); + producer.send(producerMessage); + producer.close(); + producerSession.commit(); + producerSession.close(); + + Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue("myQueue")); + Message consumerMessage = consumer.receive(1000); + try { + String txId = consumerMessage.getStringProperty("JMSXProducerTXID"); + assertNotNull(txId); + } catch(Exception e) { + LOG.info("Caught Exception that was not expected:", e); + fail("Should not throw"); + } + consumer.close(); + consumerSession.commit(); + consumerSession.close(); + connection.close(); + } + +}
