// Patch metadata preserved verbatim from the commit notification that carried this file:
// http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java new file mode 100644 index 0000000..f7409dd --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java @@ -0,0 +1,620 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.bugs;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import static org.junit.Assert.assertEquals;

/**
 * Pauses producers if consumers stall and verifies the broker has drained before they resume.
 * <p>
 * Publisher threads send transacted messages to a composite destination while consumer
 * threads read every member queue in transacted batches. When a consumer times out on
 * {@code receive} before its queue has delivered the expected total, the publishers start
 * meeting at {@link #globalProducerHalt} after every send; the barrier action blocks until the
 * broker reports no pending messages. The test passes when every queue has delivered each
 * distinct published id.
 */
@RunWith(Parameterized.class)
public class AMQ5266StarvedConsumerTest {
    static final Logger LOG = LoggerFactory.getLogger(AMQ5266StarvedConsumerTest.class);

    /** Connect string of the embedded broker; assigned by {@link #startBroker()}. */
    String activemqURL;
    BrokerService brokerService;

    /** Number of {@code 'X'} characters in every message body. */
    public int messageSize = 1000;

    @Parameterized.Parameter(0)
    public int publisherMessagesPerThread = 1000;

    @Parameterized.Parameter(1)
    public int publisherThreadCount = 20;

    @Parameterized.Parameter(2)
    public int consumerThreadsPerQueue = 5;

    @Parameterized.Parameter(3)
    public int destMemoryLimit = 50 * 1024;

    @Parameterized.Parameter(4)
    public boolean useCache = true;

    @Parameterized.Parameter(5)
    public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB;

    @Parameterized.Parameter(6)
    public boolean optimizeDispatch = false;

    /** Set by a consumer thread whose receive timed out while its queue was still short of the expected total. */
    private final AtomicBoolean didNotReceive = new AtomicBoolean(false);

    @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},store:{5},optimizedDispatch:{6}")
    public static Iterable<Object[]> parameters() {
        return Arrays.asList(new Object[][]{
            {1000, 40, 5, 1024*1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, true},
            {1000, 40, 5, 1024*1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, true},
            {1000, 40, 5, 1024*1024, false, TestSupport.PersistenceAdapterChoice.JDBC, true},

            {500, 20, 20, 1024*1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, true},
            {500, 20, 20, 1024*1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, true},
            {500, 20, 20, 1024*1024, false, TestSupport.PersistenceAdapterChoice.JDBC, true},
        });
    }

    public int consumerBatchSize = 5;

    /**
     * Meeting point for publisher threads once a consumer has reported a stall. Created in
     * {@link #startBroker()} so that its party count is the injected
     * {@link #publisherThreadCount}.
     */
    CyclicBarrier globalProducerHalt;

    /** Cached message body, built on first use by {@link #getMessageText()}. */
    volatile String messageText;

    /**
     * Creates the producer barrier, then configures and starts an embedded broker on an
     * ephemeral TCP port and records its connect string in {@link #activemqURL}.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    @Before
    public void startBroker() throws Exception {
        // Built here instead of in a field initializer: the Parameterized runner injects
        // @Parameter fields only after the instance has been constructed, so an initializer
        // would always see the default publisherThreadCount (20), not the value under test.
        globalProducerHalt = new CyclicBarrier(publisherThreadCount, new Runnable() {
            @Override
            public void run() {
                waitForBrokerToDrain();
            }
        });

        brokerService = new BrokerService();
        TestSupport.setPersistenceAdapter(brokerService, persistenceAdapterChoice);
        brokerService.setDeleteAllMessagesOnStartup(true);
        brokerService.setUseJmx(false);
        brokerService.setAdvisorySupport(false);

        PolicyEntry defaultEntry = new PolicyEntry();
        // Original author's note on this setting:
        // java.lang.IllegalArgumentException: Comparison method violates its general contract!
        defaultEntry.setUseConsumerPriority(false);
        defaultEntry.setMaxAuditDepth(publisherThreadCount);
        defaultEntry.setEnableAudit(true);
        defaultEntry.setUseCache(useCache);
        defaultEntry.setMaxPageSize(1000);
        defaultEntry.setOptimizedDispatch(optimizeDispatch);
        defaultEntry.setMemoryLimit(destMemoryLimit);
        defaultEntry.setExpireMessagesPeriod(0);

        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(defaultEntry);
        brokerService.setDestinationPolicy(policyMap);

        brokerService.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024);

        TransportConnector transportConnector = brokerService.addConnector("tcp://0.0.0.0:0");
        brokerService.start();
        activemqURL = transportConnector.getPublishableConnectString();
    }

    /**
     * Stops the embedded broker if one was created.
     *
     * @throws Exception if the broker fails to stop
     */
    @After
    public void stopBroker() throws Exception {
        if (brokerService != null) {
            brokerService.stop();
        }
    }

    /** Returns the message count reported by the region broker's destination statistics. */
    private long pendingBrokerMessages() throws Exception {
        return ((RegionBroker) brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount();
    }

    /**
     * Barrier action: polls every five seconds until the broker reports no pending messages.
     * Failures are logged rather than propagated so that the barrier is not broken by them.
     */
    private void waitForBrokerToDrain() {
        try {
            long pending = pendingBrokerMessages();
            while (pending > 0) {
                LOG.info("Total messageCount: " + pending);
                TimeUnit.SECONDS.sleep(5);
                pending = pendingBrokerMessages();
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the publisher thread that ran the barrier action.
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while waiting for the broker to drain", e);
        } catch (Exception e) {
            LOG.error("Failed while waiting for the broker to drain", e);
        }
    }

    /**
     * Publishes to four queues through one composite destination while consuming from each
     * queue, then asserts that every queue delivered all distinct published ids.
     *
     * @throws Exception if JMS setup fails or the test thread is interrupted
     */
    @Test(timeout = 30 * 60 * 1000)
    public void test() throws Exception {

        String activemqQueues = "activemq,activemq2,activemq3,activemq4";
        int consumerWaitForConsumption = 5 * 60 * 1000;
        int totalPerQueue = publisherMessagesPerThread * publisherThreadCount;

        LOG.info("Publisher will publish " + totalPerQueue + " messages to each queue specified.");
        LOG.info("\nBuilding Publisher...");
        ExportQueuePublisher publisher = new ExportQueuePublisher(activemqURL, activemqQueues, publisherMessagesPerThread, publisherThreadCount);

        LOG.info("Building Consumer...");
        ExportQueueConsumer consumer = new ExportQueueConsumer(activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, totalPerQueue);

        LOG.info("Starting Publisher...");
        publisher.start();

        LOG.info("Starting Consumer...");
        consumer.start();

        LOG.info("Waiting For Publisher Completion...");
        publisher.waitForCompletion();

        List<String> publishedIds = publisher.getIDs();
        int distinctPublishedCount = new TreeSet<String>(publishedIds).size();
        LOG.info("Publisher Complete. Published: " + publishedIds.size() + ", Distinct IDs Published: " + distinctPublishedCount);

        awaitConsumers(consumer, consumerWaitForConsumption);

        LOG.info("\nConsumer Complete: " + consumer.completed() +", Shutting Down.");
        consumer.shutdown();

        LOG.info("Consumer Stats:");
        assertNothingStuck(consumer, distinctPublishedCount);
    }

    /** Polls every ten seconds until all consumer threads have exited or {@code timeoutMillis} has elapsed. */
    private void awaitConsumers(ExportQueueConsumer consumer, long timeoutMillis) {
        long endWait = System.currentTimeMillis() + timeoutMillis;
        while (!consumer.completed() && System.currentTimeMillis() < endWait) {
            // Divide as long first, then narrow; the cast used to bind before the division.
            int secs = (int) ((endWait - System.currentTimeMillis()) / 1000);
            LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs");
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                // Stop waiting but leave the interrupt set for the caller.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Logs per-queue consumption totals and asserts each queue saw every distinct published id. */
    private void assertNothingStuck(ExportQueueConsumer consumer, int distinctPublishedCount) {
        for (Map.Entry<String, List<String>> entry : consumer.getIDs().entrySet()) {
            List<String> idList = entry.getValue();
            int distinctConsumed = new TreeSet<String>(idList).size();
            int diff = distinctPublishedCount - distinctConsumed;

            StringBuilder sb = new StringBuilder();
            sb.append(" Queue: " + entry.getKey() +
                " -> Total Messages Consumed: " + idList.size() +
                ", Distinct IDs Consumed: " + distinctConsumed);
            sb.append(" ( " + (diff > 0 ? diff : "NO") + " STUCK MESSAGES " + " ) ");
            LOG.info(sb.toString());

            assertEquals("expect to get all messages!", 0, diff);
        }
    }

    /**
     * Returns the shared message body: {@link #messageSize} repetitions of {@code "X"},
     * built once on first use.
     */
    private String getMessageText() {
        String text = messageText;
        if (text == null) {
            synchronized (this) {
                text = messageText;
                if (text == null) {
                    StringBuilder sb = new StringBuilder();
                    for (int i = 0; i < messageSize; i++) {
                        sb.append("X");
                    }
                    text = sb.toString();
                    messageText = text;
                }
            }
        }
        return text;
    }

    /**
     * Owns a set of publisher threads, each with its own connection and transacted session,
     * and records the id of every message they attempt to send.
     */
    public class ExportQueuePublisher {

        private final String amqUser = ActiveMQConnection.DEFAULT_USER;
        private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
        private ActiveMQConnectionFactory connectionFactory = null;
        private final String activemqURL;
        private final String activemqQueues;
        // Ids the publisher has handed to send(); kept as a list (not a set) so that the test
        // can report both the total and the distinct count.
        private final List<String> ids = Collections.synchronizedList(new ArrayList<String>());
        private final List<PublisherThread> threads;

        /**
         * @param activemqURL       broker connect string
         * @param activemqQueues    comma-separated queue names, used as one composite destination
         * @param messagesPerThread number of messages each thread publishes
         * @param threadCount       number of publisher threads to create
         * @throws Exception if a connection, session or producer cannot be created
         */
        public ExportQueuePublisher(String activemqURL, String activemqQueues, int messagesPerThread, int threadCount) throws Exception {
            this.activemqURL = activemqURL;
            this.activemqQueues = activemqQueues;

            threads = new ArrayList<PublisherThread>();
            for (int i = 0; i < threadCount; i++) {
                threads.add(new PublisherThread(messagesPerThread));
            }
        }

        /** Returns the live, synchronized list of published ids. */
        public List<String> getIDs() {
            return ids;
        }

        /** Starts every publisher thread. */
        public void start() throws Exception {
            for (PublisherThread pt : threads) {
                pt.start();
            }
        }

        /** Joins each publisher thread in turn and closes its JMS resources. */
        public void waitForCompletion() throws Exception {
            for (PublisherThread pt : threads) {
                pt.join();
                pt.close();
            }
        }

        private Session newSession(QueueConnection queueConnection) throws Exception {
            return queueConnection.createSession(true, Session.SESSION_TRANSACTED);
        }

        /** Creates and starts a new connection, lazily building the shared factory on first call. */
        private synchronized QueueConnection newQueueConnection() throws Exception {
            if (connectionFactory == null) {
                connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
                connectionFactory.setWatchTopicAdvisories(false);
            }

            // Set the redelivery count to -1 (infinite), or else messages will start dropping
            // after the queue has had a certain number of failures (default is 6)
            RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
            policy.setMaximumRedeliveries(-1);

            QueueConnection amqConnection = connectionFactory.createQueueConnection();
            amqConnection.start();
            return amqConnection;
        }

        /** Publishes a fixed number of messages, committing after each one. */
        private class PublisherThread extends Thread {

            private int count;
            private final QueueConnection qc;
            private final Session session;
            private final MessageProducer mp;
            private final Queue q;

            private PublisherThread(int count) throws Exception {
                this.count = count;

                // Each thread has its own connection and session, so no sync worries
                qc = newQueueConnection();
                session = newSession(qc);

                // The comma-separated names form one composite destination; the producer is
                // anonymous and the destination is supplied on every send.
                q = new ActiveMQQueue(activemqQueues);
                mp = session.createProducer(null);
            }

            @Override
            public void run() {
                try {
                    while (count-- > 0) {
                        TextMessage tm = session.createTextMessage(getMessageText());
                        String id = UUID.randomUUID().toString();
                        tm.setStringProperty("KEY", id);
                        ids.add(id); // keep track of the key to compare against consumer

                        mp.send(q, tm);
                        session.commit();

                        // Once any consumer has reported a stall, meet the other publishers
                        // after every message; the barrier action waits for the broker to drain.
                        if (didNotReceive.get()) {
                            globalProducerHalt.await();
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    LOG.warn("Publisher thread interrupted", e);
                } catch (Exception e) {
                    LOG.error("Publisher thread failed", e);
                }
            }

            /** Best-effort close of producer, session and connection; called by waitForCompletion. */
            public void close() {
                try {
                    mp.close();
                } catch (Exception ignored) {
                    // best effort during teardown
                }

                try {
                    session.close();
                } catch (Exception ignored) {
                    // best effort during teardown
                }

                try {
                    qc.close();
                } catch (Exception ignored) {
                    // best effort during teardown
                }
            }
        }
    }

    /**
     * Owns the consumer threads for every queue and records, per queue, the {@code KEY}
     * property of each message received.
     */
    public class ExportQueueConsumer {

        private final String amqUser = ActiveMQConnection.DEFAULT_USER;
        private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
        private final int totalToExpect;
        private ActiveMQConnectionFactory connectionFactory = null;
        private final String activemqURL;
        private final String activemqQueues;
        private final String[] queues;
        // Ids consumed, keyed by queue name; compared against what was published to find
        // messages that got stuck or dropped.
        private final Map<String, List<String>> idsByQueue = new HashMap<String, List<String>>();
        private final Map<String, List<ConsumerThread>> threads;

        /**
         * @param activemqURL     broker connect string
         * @param activemqQueues  comma-separated queue names; each is consumed separately
         * @param threadsPerQueue consumer threads to create per queue
         * @param batchSize       messages per commit, also used as the consumer prefetch size
         * @param totalToExpect   per-queue message total at which a consumer thread stops
         * @throws Exception if a connection, session or consumer cannot be created
         */
        public ExportQueueConsumer(String activemqURL, String activemqQueues, int threadsPerQueue, int batchSize, int totalToExpect) throws Exception {
            this.activemqURL = activemqURL;
            this.activemqQueues = activemqQueues;
            this.totalToExpect = totalToExpect;

            queues = this.activemqQueues.split(",");
            for (int i = 0; i < queues.length; i++) {
                queues[i] = queues[i].trim();
            }

            threads = new HashMap<String, List<ConsumerThread>>();
            for (String q : queues) {
                // The id list must be registered before the threads are built: each thread
                // looks it up in its constructor.
                idsByQueue.put(q, Collections.synchronizedList(new ArrayList<String>()));

                List<ConsumerThread> list = new ArrayList<ConsumerThread>();
                for (int i = 0; i < threadsPerQueue; i++) {
                    list.add(new ConsumerThread(q, batchSize));
                }
                threads.put(q, list);
            }
        }

        /** Returns the live map of consumed ids keyed by queue name. */
        public Map<String, List<String>> getIDs() {
            return idsByQueue;
        }

        /** Starts every consumer thread. */
        public void start() throws Exception {
            for (List<ConsumerThread> list : threads.values()) {
                for (ConsumerThread ct : list) {
                    ct.start();
                }
            }
        }

        /** Asks every consumer thread to stop, then joins them all. */
        public void shutdown() throws Exception {
            for (List<ConsumerThread> list : threads.values()) {
                for (ConsumerThread ct : list) {
                    ct.shutdown();
                }
            }

            for (List<ConsumerThread> list : threads.values()) {
                for (ConsumerThread ct : list) {
                    ct.join();
                }
            }
        }

        private Session newSession(QueueConnection queueConnection) throws Exception {
            return queueConnection.createSession(true, Session.SESSION_TRANSACTED);
        }

        /** Creates and starts a new connection, lazily building the shared factory on first call. */
        private synchronized QueueConnection newQueueConnection() throws Exception {
            if (connectionFactory == null) {
                connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
                connectionFactory.setWatchTopicAdvisories(false);
            }

            // Set the redelivery count to -1 (infinite), or else messages will start dropping
            // after the queue has had a certain number of failures (default is 6)
            RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
            policy.setMaximumRedeliveries(-1);

            QueueConnection amqConnection = connectionFactory.createQueueConnection();
            amqConnection.start();
            return amqConnection;
        }

        /** Returns {@code true} when no consumer thread is currently alive. */
        public boolean completed() {
            for (List<ConsumerThread> list : threads.values()) {
                for (ConsumerThread ct : list) {
                    if (ct.isAlive()) {
                        LOG.info("thread for {} is still alive.", ct.qName);
                        return false;
                    }
                }
            }
            return true;
        }

        /** Reads one queue in transacted batches until told to stop or the expected total is reached. */
        private class ConsumerThread extends Thread {

            private final int batchSize;
            private final QueueConnection qc;
            private final Session session;
            private final MessageConsumer mc;
            private final List<String> idList;
            // Written by the test thread and polled by this thread, so it must be volatile.
            private volatile boolean shutdown = false;
            private final String qName;

            private ConsumerThread(String queueName, int batchSize) throws Exception {
                this.batchSize = batchSize;

                // Each thread has its own connection and session
                qName = queueName;
                qc = newQueueConnection();
                session = newSession(qc);
                Queue q = session.createQueue(queueName + "?consumer.prefetchSize=" + batchSize);
                mc = session.createConsumer(q);

                idList = idsByQueue.get(queueName);
            }

            @Override
            public void run() {
                try {
                    int count = 0;

                    // Keep reading as long as it hasn't been told to shutdown
                    while (!shutdown) {

                        if (idList.size() >= totalToExpect) {
                            LOG.info("Got {} for q: {}", idList.size(), qName);
                            session.commit();
                            break;
                        }

                        Message m = mc.receive(4000);
                        if (m != null) {
                            idList.add(m.getStringProperty("KEY"));
                            count++;

                            // Commit once a full batch has been read
                            if (count == batchSize) {
                                session.commit();
                                count = 0;
                            }
                        } else {
                            // Nothing arrived within the timeout: commit any partial batch
                            session.commit();
                            count = 0;

                            // Still short of the expected total, so flag the stall for the publishers
                            if (idList.size() < totalToExpect) {
                                LOG.info("did not receive on {}, current count: {}", qName, idList.size());
                                didNotReceive.set(true);
                            }
                        }
                    }
                } catch (Exception e) {
                    LOG.error("Consumer thread for " + qName + " failed", e);
                } finally {
                    // Once we exit, close everything
                    close();
                }
            }

            /** Requests that the read loop exit after its current iteration. */
            public void shutdown() {
                shutdown = true;
            }

            /** Best-effort close of consumer, session and connection. */
            public void close() {
                try {
                    mc.close();
                } catch (Exception ignored) {
                    // best effort during teardown
                }

                try {
                    session.close();
                } catch (Exception ignored) {
                    // best effort during teardown
                }

                try {
                    qc.close();
                } catch (Exception ignored) {
                    // best effort during teardown
                }
            }
        }
    }
}
// Patch metadata preserved verbatim from the commit notification that carried this file:
// http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java new file mode 100644 index 0000000..e180746 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java @@ -0,0 +1,601 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.bugs;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import static org.junit.Assert.assertEquals;

/**
 * Stuck messages test client.
 * <p/>
 * Will kick off publisher and consumer simultaneously, and will usually result in stuck messages on the queue.
 * <p/>
 * Publisher threads send transacted messages to a composite destination while consumer
 * threads read every member queue in transacted batches; the test passes when every queue
 * has delivered each distinct published id.
 */
@RunWith(Parameterized.class)
public class AMQ5266Test {
    static final Logger LOG = LoggerFactory.getLogger(AMQ5266Test.class);

    /** Broker connect string; replaced by {@link #startBroker()} with the embedded broker's address. */
    String activemqURL = "tcp://localhost:61617";
    BrokerService brokerService;

    /** Number of {@code 'X'} characters in every message body. */
    public int messageSize = 1000;

    @Parameterized.Parameter(0)
    public int publisherMessagesPerThread = 1000;

    @Parameterized.Parameter(1)
    public int publisherThreadCount = 20;

    @Parameterized.Parameter(2)
    public int consumerThreadsPerQueue = 5;

    @Parameterized.Parameter(3)
    public int destMemoryLimit = 50 * 1024;

    @Parameterized.Parameter(4)
    public boolean useCache = true;

    @Parameterized.Parameter(5)
    public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB;

    @Parameterized.Parameter(6)
    public boolean optimizeDispatch = false;

    @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},store:{5},optimizedDispatch:{6}")
    public static Iterable<Object[]> parameters() {
        return Arrays.asList(new Object[][]{
            {1, 1, 1, 50*1024, false, TestSupport.PersistenceAdapterChoice.JDBC, true},
            {1000, 20, 5, 50*1024, true, TestSupport.PersistenceAdapterChoice.JDBC, false},
            {100, 20, 5, 50*1024, false, TestSupport.PersistenceAdapterChoice.JDBC, false},
            {1000, 5, 20, 50*1024, true, TestSupport.PersistenceAdapterChoice.JDBC, false},
            {1000, 20, 20, 1024*1024, true, TestSupport.PersistenceAdapterChoice.JDBC, false},

            {1, 1, 1, 50*1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, true},
            {100, 5, 5, 50*1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, false},
            {1000, 20, 5, 50*1024, true, TestSupport.PersistenceAdapterChoice.KahaDB, false},
            {100, 20, 5, 50*1024, false, TestSupport.PersistenceAdapterChoice.KahaDB, false},
            {1000, 5, 20, 50*1024, true, TestSupport.PersistenceAdapterChoice.KahaDB, false},
            {1000, 20, 20, 1024*1024, true, TestSupport.PersistenceAdapterChoice.KahaDB, false},

            {1, 1, 1, 50*1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, true},
            {100, 5, 5, 50*1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, false},
            {1000, 20, 5, 50*1024, true, TestSupport.PersistenceAdapterChoice.LevelDB, false},
            {100, 20, 5, 50*1024, false, TestSupport.PersistenceAdapterChoice.LevelDB, false},
            {1000, 5, 20, 50*1024, true, TestSupport.PersistenceAdapterChoice.LevelDB, false},
            {1000, 20, 20, 1024*1024, true, TestSupport.PersistenceAdapterChoice.LevelDB, false},

        });
    }

    public int consumerBatchSize = 5;

    /** Cached message body, built on first use by {@link #getMessageText()}. */
    volatile String messageText;

    /**
     * Configures and starts an embedded broker on an ephemeral TCP port and records its
     * connect string in {@link #activemqURL}.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    @Before
    public void startBroker() throws Exception {
        brokerService = new BrokerService();
        TestSupport.setPersistenceAdapter(brokerService, persistenceAdapterChoice);
        brokerService.setDeleteAllMessagesOnStartup(true);
        brokerService.setUseJmx(false);

        PolicyEntry defaultEntry = new PolicyEntry();
        // Original author's note on this setting:
        // java.lang.IllegalArgumentException: Comparison method violates its general contract!
        defaultEntry.setUseConsumerPriority(false);
        defaultEntry.setMaxAuditDepth(publisherThreadCount);
        defaultEntry.setEnableAudit(true);
        defaultEntry.setUseCache(useCache);
        defaultEntry.setMaxPageSize(1000);
        defaultEntry.setOptimizedDispatch(optimizeDispatch);
        defaultEntry.setMemoryLimit(destMemoryLimit);
        defaultEntry.setExpireMessagesPeriod(0);

        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(defaultEntry);
        brokerService.setDestinationPolicy(policyMap);

        brokerService.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024);

        TransportConnector transportConnector = brokerService.addConnector("tcp://0.0.0.0:0");
        brokerService.start();
        activemqURL = transportConnector.getPublishableConnectString();
    }

    /**
     * Stops the embedded broker if one was created.
     *
     * @throws Exception if the broker fails to stop
     */
    @After
    public void stopBroker() throws Exception {
        if (brokerService != null) {
            brokerService.stop();
        }
    }

    /**
     * Publishes to two queues through one composite destination while consuming from each
     * queue, then asserts that every queue delivered all distinct published ids.
     *
     * @throws Exception if JMS setup fails or the test thread is interrupted
     */
    @Test
    public void test() throws Exception {

        String activemqQueues = "activemq,activemq2";
        int consumerWaitForConsumption = 5 * 60 * 1000;
        int totalPerQueue = publisherMessagesPerThread * publisherThreadCount;

        LOG.info("Publisher will publish " + totalPerQueue + " messages to each queue specified.");
        LOG.info("\nBuilding Publisher...");
        ExportQueuePublisher publisher = new ExportQueuePublisher(activemqURL, activemqQueues, publisherMessagesPerThread, publisherThreadCount);

        LOG.info("Building Consumer...");
        ExportQueueConsumer consumer = new ExportQueueConsumer(activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, totalPerQueue);

        LOG.info("Starting Publisher...");
        publisher.start();

        LOG.info("Starting Consumer...");
        consumer.start();

        LOG.info("Waiting For Publisher Completion...");
        publisher.waitForCompletion();

        List<String> publishedIds = publisher.getIDs();
        int distinctPublishedCount = new TreeSet<String>(publishedIds).size();
        LOG.info("Publisher Complete. Published: " + publishedIds.size() + ", Distinct IDs Published: " + distinctPublishedCount);

        awaitConsumers(consumer, consumerWaitForConsumption);

        LOG.info("\nConsumer Complete: " + consumer.completed() +", Shutting Down.");
        consumer.shutdown();

        LOG.info("Consumer Stats:");
        assertNothingStuck(consumer, distinctPublishedCount);
    }

    /** Polls every ten seconds until all consumer threads have exited or {@code timeoutMillis} has elapsed. */
    private void awaitConsumers(ExportQueueConsumer consumer, long timeoutMillis) {
        long endWait = System.currentTimeMillis() + timeoutMillis;
        while (!consumer.completed() && System.currentTimeMillis() < endWait) {
            // Divide as long first, then narrow; the cast used to bind before the division.
            int secs = (int) ((endWait - System.currentTimeMillis()) / 1000);
            LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs");
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                // Stop waiting but leave the interrupt set for the caller.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Logs per-queue consumption totals and asserts each queue saw every distinct published id. */
    private void assertNothingStuck(ExportQueueConsumer consumer, int distinctPublishedCount) {
        for (Map.Entry<String, List<String>> entry : consumer.getIDs().entrySet()) {
            List<String> idList = entry.getValue();
            int distinctConsumed = new TreeSet<String>(idList).size();
            int diff = distinctPublishedCount - distinctConsumed;

            StringBuilder sb = new StringBuilder();
            sb.append(" Queue: " + entry.getKey() +
                " -> Total Messages Consumed: " + idList.size() +
                ", Distinct IDs Consumed: " + distinctConsumed);
            sb.append(" ( " + (diff > 0 ? diff : "NO") + " STUCK MESSAGES " + " ) ");
            LOG.info(sb.toString());

            assertEquals("expect to get all messages!", 0, diff);
        }
    }

    /**
     * Returns the shared message body: {@link #messageSize} repetitions of {@code "X"},
     * built once on first use.
     */
    private String getMessageText() {
        String text = messageText;
        if (text == null) {
            synchronized (this) {
                text = messageText;
                if (text == null) {
                    StringBuilder sb = new StringBuilder();
                    for (int i = 0; i < messageSize; i++) {
                        sb.append("X");
                    }
                    text = sb.toString();
                    messageText = text;
                }
            }
        }
        return text;
    }

    /**
     * Owns a set of publisher threads, each with its own connection and transacted session,
     * and records the id of every message they attempt to send.
     */
    public class ExportQueuePublisher {

        private final String amqUser = ActiveMQConnection.DEFAULT_USER;
        private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
        private ActiveMQConnectionFactory connectionFactory = null;
        private final String activemqURL;
        private final String activemqQueues;
        // Ids the publisher has handed to send(); kept as a list (not a set) so that the test
        // can report both the total and the distinct count.
        private final List<String> ids = Collections.synchronizedList(new ArrayList<String>());
        private final List<PublisherThread> threads;

        /**
         * @param activemqURL       broker connect string
         * @param activemqQueues    comma-separated queue names, used as one composite destination
         * @param messagesPerThread number of messages each thread publishes
         * @param threadCount       number of publisher threads to create
         * @throws Exception if a connection, session or producer cannot be created
         */
        public ExportQueuePublisher(String activemqURL, String activemqQueues, int messagesPerThread, int threadCount) throws Exception {
            this.activemqURL = activemqURL;
            this.activemqQueues = activemqQueues;

            threads = new ArrayList<PublisherThread>();
            for (int i = 0; i < threadCount; i++) {
                threads.add(new PublisherThread(messagesPerThread));
            }
        }

        /** Returns the live, synchronized list of published ids. */
        public List<String> getIDs() {
            return ids;
        }

        /** Starts every publisher thread. */
        public void start() throws Exception {
            for (PublisherThread pt : threads) {
                pt.start();
            }
        }

        /** Joins each publisher thread in turn and closes its JMS resources. */
        public void waitForCompletion() throws Exception {
            for (PublisherThread pt : threads) {
                pt.join();
                pt.close();
            }
        }

        private Session newSession(QueueConnection queueConnection) throws Exception {
            return queueConnection.createSession(true, Session.SESSION_TRANSACTED);
        }

        /** Creates and starts a new connection, lazily building the shared factory on first call. */
        private synchronized QueueConnection newQueueConnection() throws Exception {
            if (connectionFactory == null) {
                connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
            }

            // Set the redelivery count to -1 (infinite), or else messages will start dropping
            // after the queue has had a certain number of failures (default is 6)
            RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
            policy.setMaximumRedeliveries(-1);

            QueueConnection amqConnection = connectionFactory.createQueueConnection();
            amqConnection.start();
            return amqConnection;
        }

        /** Publishes a fixed number of messages, committing after each one. */
        private class PublisherThread extends Thread {

            private int count;
            private final QueueConnection qc;
            private final Session session;
            private final MessageProducer mp;

            private PublisherThread(int count) throws Exception {
                this.count = count;

                // Each thread has its own connection and session, so no sync worries
                qc = newQueueConnection();
                session = newSession(qc);

                // The comma-separated names form one composite destination, and the producer
                // is bound to it.
                Queue q = new ActiveMQQueue(activemqQueues);
                mp = session.createProducer(q);
            }

            @Override
            public void run() {
                try {
                    while (count-- > 0) {
                        TextMessage tm = session.createTextMessage(getMessageText());
                        String id = UUID.randomUUID().toString();
                        tm.setStringProperty("KEY", id);
                        ids.add(id); // keep track of the key to compare against consumer

                        mp.send(tm);
                        session.commit();
                    }
                } catch (Exception e) {
                    LOG.error("Publisher thread failed", e);
                }
            }

            /** Best-effort close of producer, session and connection; called by waitForCompletion. */
            public void close() {
                try {
                    mp.close();
                } catch (Exception ignored) {
                    // best effort during teardown
                }

                try {
                    session.close();
                } catch (Exception ignored) {
                    // best effort during teardown
                }

                try {
                    qc.close();
                } catch (Exception ignored) {
                    // best effort during teardown
                }
            }
        }
    }

    /**
     * Owns the consumer threads for every queue and records, per queue, the {@code KEY}
     * property of each message received.
     */
    public class ExportQueueConsumer {

        private final String amqUser = ActiveMQConnection.DEFAULT_USER;
        private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
        private final int totalToExpect;
        private ActiveMQConnectionFactory connectionFactory = null;
        private final String activemqURL;
        private final String activemqQueues;
        private final String[] queues;
        // Ids consumed, keyed by queue name; compared against what was published to find
        // messages that got stuck or dropped.
        private final Map<String, List<String>> idsByQueue = new HashMap<String, List<String>>();
        private final Map<String, List<ConsumerThread>> threads;

        /**
         * @param activemqURL     broker connect string
         * @param activemqQueues  comma-separated queue names; each is consumed separately
         * @param threadsPerQueue consumer threads to create per queue
         * @param batchSize       messages per commit, also used as the consumer prefetch size
         * @param totalToExpect   per-queue message total at which a consumer thread stops
         * @throws Exception if a connection, session or consumer cannot be created
         */
        public ExportQueueConsumer(String activemqURL, String activemqQueues, int threadsPerQueue, int batchSize, int totalToExpect) throws Exception {
            this.activemqURL = activemqURL;
            this.activemqQueues = activemqQueues;
            this.totalToExpect = totalToExpect;

            queues = this.activemqQueues.split(",");
            for (int i = 0; i < queues.length; i++) {
                queues[i] = queues[i].trim();
            }

            threads = new HashMap<String, List<ConsumerThread>>();
            for (String q : queues) {
                // The id list must be registered before the threads are built: each thread
                // looks it up in its constructor.
                idsByQueue.put(q, Collections.synchronizedList(new ArrayList<String>()));

                List<ConsumerThread> list = new ArrayList<ConsumerThread>();
                for (int i = 0; i < threadsPerQueue; i++) {
                    list.add(new ConsumerThread(q, batchSize));
                }
                threads.put(q, list);
            }
        }

        /** Returns the live map of consumed ids keyed by queue name. */
        public Map<String, List<String>> getIDs() {
            return idsByQueue;
        }

        /** Starts every consumer thread. */
        public void start() throws Exception {
            for (List<ConsumerThread> list : threads.values()) {
                for (ConsumerThread ct : list) {
                    ct.start();
                }
            }
        }

        /** Asks every consumer thread to stop, then joins them all. */
        public void shutdown() throws Exception {
            for (List<ConsumerThread> list : threads.values()) {
                for (ConsumerThread ct : list) {
                    ct.shutdown();
                }
            }

            for (List<ConsumerThread> list : threads.values()) {
                for (ConsumerThread ct : list) {
                    ct.join();
                }
            }
        }

        private Session newSession(QueueConnection queueConnection) throws Exception {
            return queueConnection.createSession(true, Session.SESSION_TRANSACTED);
        }

        /** Creates and starts a new connection, lazily building the shared factory on first call. */
        private synchronized QueueConnection newQueueConnection() throws Exception {
            if (connectionFactory == null) {
                connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
            }

            // Set the redelivery count to -1 (infinite), or else messages will start dropping
            // after the queue has had a certain number of failures (default is 6)
            RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
            policy.setMaximumRedeliveries(-1);

            QueueConnection amqConnection = connectionFactory.createQueueConnection();
            amqConnection.start();
            return amqConnection;
        }

        /** Returns {@code true} when no consumer thread is currently alive. */
        public boolean completed() {
            for (List<ConsumerThread> list : threads.values()) {
                for (ConsumerThread ct : list) {
                    if (ct.isAlive()) {
                        LOG.info("thread for {} is still alive.", ct.qName);
                        return false;
                    }
                }
            }
            return true;
        }

        /** Reads one queue in transacted batches until told to stop or the expected total is reached. */
        private class ConsumerThread extends Thread {

            private final int batchSize;
            private final QueueConnection qc;
            private final Session session;
            private final MessageConsumer mc;
            private final List<String> idList;
            // Written by the test thread and polled by this thread, so it must be volatile.
            private volatile boolean shutdown = false;
            private final String qName;

            private ConsumerThread(String queueName, int batchSize) throws Exception {
                this.batchSize = batchSize;

                // Each thread has its own connection and session
                qName = queueName;
                qc = newQueueConnection();
                session = newSession(qc);
                Queue q = session.createQueue(queueName + "?consumer.prefetchSize=" + batchSize);
                mc = session.createConsumer(q);

                idList = idsByQueue.get(queueName);
            }

            @Override
            public void run() {
                try {
                    int count = 0;

                    // Keep reading as long as it hasn't been told to shutdown
                    while (!shutdown) {

                        if (idList.size() >= totalToExpect) {
                            LOG.info("Got {} for q: {}", idList.size(), qName);
                            session.commit();
                            break;
                        }

                        Message m = mc.receive(4000);
                        if (m != null) {
                            idList.add(m.getStringProperty("KEY"));
                            count++;

                            // Commit once a full batch has been read
                            if (count == batchSize) {
                                session.commit();
                                count = 0;
                            }
                        } else {
                            // Nothing arrived within the timeout: commit any partial batch
                            session.commit();
                            count = 0;

                            if (idList.size() < totalToExpect) {
                                LOG.info("did not receive on {}, current count: {}", qName, idList.size());
                            }
                        }
                    }
                } catch (Exception e) {
                    LOG.error("Consumer thread for " + qName + " failed", e);
                } finally {
                    // Once we exit, close everything
                    close();
                }
            }

            /** Requests that the read loop exit after its current iteration. */
            public void shutdown() {
                shutdown = true;
            }

            /** Best-effort close of consumer, session and connection. */
            public void close() {
                try {
                    mc.close();
                } catch (Exception ignored) {
                    // best effort during teardown
                }

                try {
                    session.close();
                } catch (Exception ignored) {
                    // best effort during teardown
                }

                try {
                    qc.close();
                } catch (Exception ignored) {
                    // best effort during teardown
                }
            }
        }
    }
}
// Start of the next file in the same patch (AMQ5274Test.java), preserved verbatim; its content continues past this section:
// http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java new file mode 100644 index 0000000..4ba6526 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.activemq.bugs; + +import java.util.concurrent.TimeUnit; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.BrokerMBeanSupport; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class AMQ5274Test { + static Logger LOG = LoggerFactory.getLogger(AMQ5274Test.class); + String activemqURL; + BrokerService brokerService; + ActiveMQQueue dest = new ActiveMQQueue("TestQ"); + + + @Before + public void startBroker() throws Exception { + brokerService = new BrokerService(); + brokerService.setPersistent(false); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultPolicy = new PolicyEntry(); + defaultPolicy.setExpireMessagesPeriod(1000); + policyMap.setDefaultEntry(defaultPolicy); + brokerService.setDestinationPolicy(policyMap); + activemqURL = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString(); + brokerService.start(); + } + + @After + public void stopBroker() throws Exception { + if (brokerService != null) { + brokerService.stop(); + } + } + + @Test + public void test() throws Exception { + LOG.info("Starting Test"); + assertTrue(brokerService.isStarted()); + + produce(); + consumeAndRollback(); + + // check reported queue size using JMX + 
long queueSize = getQueueSize(); + assertEquals("Queue " + dest.getPhysicalName() + " not empty, reporting " + queueSize + " messages.", 0, queueSize); + } + + private void consumeAndRollback() throws JMSException, InterruptedException { + ActiveMQConnection connection = createConnection(); + RedeliveryPolicy noRedelivery = new RedeliveryPolicy(); + noRedelivery.setMaximumRedeliveries(0); + connection.setRedeliveryPolicy(noRedelivery); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(dest); + Message m; + while ( (m = consumer.receive(4000)) != null) { + LOG.info("Got:" + m); + TimeUnit.SECONDS.sleep(1); + session.rollback(); + } + connection.close(); + } + + private void produce() throws Exception { + Connection connection = createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(dest); + producer.setTimeToLive(10000); + for (int i=0;i<20;i++) { + producer.send(session.createTextMessage("i="+i)); + } + connection.close(); + } + + private ActiveMQConnection createConnection() throws JMSException { + return (ActiveMQConnection) new ActiveMQConnectionFactory(activemqURL).createConnection(); + } + + + public long getQueueSize() throws Exception { + long queueSize = 0; + try { + QueueViewMBean queueViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(BrokerMBeanSupport.createDestinationName(brokerService.getBrokerObjectName(), dest), QueueViewMBean.class, false); + queueSize = queueViewMBean.getQueueSize(); + LOG.info("QueueSize for destination {} is {}", dest, queueSize); + } catch (Exception ex) { + LOG.error("Error retrieving QueueSize from JMX ", ex); + throw ex; + } + return queueSize; + } + +} 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java new file mode 100644 index 0000000..ff10b0d --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Arrays; +import java.util.Random; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQMessage; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +public class AMQ5381Test { + + public static final byte[] ORIG_MSG_CONTENT = randomByteArray(); + public static final String AMQ5381_EXCEPTION_MESSAGE = "java.util.zip.DataFormatException: incorrect header check"; + + private BrokerService brokerService; + private String brokerURI; + + @Rule public TestName name = new TestName(); + + @Before + public void startBroker() throws Exception { + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.setUseJmx(false); + brokerService.addConnector("tcp://localhost:0"); + brokerService.start(); + brokerService.waitUntilStarted(); + + brokerURI = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString(); + } + + @After + public void stopBroker() throws Exception { + if (brokerService != null) { + brokerService.stop(); + } + } + + private ActiveMQConnection createConnection(boolean useCompression) throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI); + factory.setUseCompression(useCompression); + Connection connection = factory.createConnection(); + connection.start(); + return (ActiveMQConnection) connection; + } + + @Test + public void amq5381Test() throws Exception { + 
+ // Consumer Configured for (useCompression=true) + final ActiveMQConnection consumerConnection = createConnection(true); + final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Queue consumerQueue = consumerSession.createQueue(name.getMethodName()); + final MessageConsumer consumer = consumerSession.createConsumer(consumerQueue); + + // Producer Configured for (useCompression=false) + final ActiveMQConnection producerConnection = createConnection(false); + final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Queue producerQueue = producerSession.createQueue(name.getMethodName()); + + try { + + final ActiveMQBytesMessage messageProduced = (ActiveMQBytesMessage) producerSession.createBytesMessage(); + messageProduced.writeBytes(ORIG_MSG_CONTENT); + Assert.assertFalse(messageProduced.isReadOnlyBody()); + + Assert.assertFalse( + "Produced Message's 'compressed' flag should remain false until the message is sent (where it will be compressed, if necessary)", + messageProduced.isCompressed()); + + final MessageProducer producer = producerSession.createProducer(null); + producer.send(producerQueue, messageProduced); + + Assert.assertEquals("Once sent, the produced Message's 'compressed' flag should match its Connection's 'useCompression' flag", + producerConnection.isUseCompression(), messageProduced.isCompressed()); + + final ActiveMQBytesMessage messageConsumed = (ActiveMQBytesMessage) consumer.receive(); + Assert.assertNotNull(messageConsumed); + Assert.assertTrue("Consumed Message should be read-only", messageConsumed.isReadOnlyBody()); + Assert.assertEquals("Consumed Message's 'compressed' flag should match the produced Message's 'compressed' flag", + messageProduced.isCompressed(), messageConsumed.isCompressed()); + + // ensure consumed message content matches what was originally set + final byte[] consumedMsgContent = new byte[(int) 
messageConsumed.getBodyLength()]; + messageConsumed.readBytes(consumedMsgContent); + + Assert.assertTrue("Consumed Message content should match the original Message content", Arrays.equals(ORIG_MSG_CONTENT, consumedMsgContent)); + + // make message writable so the consumer can modify and reuse it + makeWritable(messageConsumed); + + // modify message, attempt to trigger DataFormatException due + // to old incorrect compression logic + try { + messageConsumed.setStringProperty(this.getClass().getName(), "test"); + } catch (JMSException jmsE) { + if (AMQ5381_EXCEPTION_MESSAGE.equals(jmsE.getMessage())) { + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + jmsE.printStackTrace(pw); + + Assert.fail("AMQ5381 Error State Achieved: attempted to decompress BytesMessage contents that are not compressed\n" + sw.toString()); + } else { + throw jmsE; + } + } + + Assert.assertEquals( + "The consumed Message's 'compressed' flag should still match the produced Message's 'compressed' flag after it has been made writable", + messageProduced.isCompressed(), messageConsumed.isCompressed()); + + // simulate re-publishing message + simulatePublish(messageConsumed); + + // ensure consumed message content matches what was originally set + final byte[] modifiedMsgContent = new byte[(int) messageConsumed.getBodyLength()]; + messageConsumed.readBytes(modifiedMsgContent); + + Assert.assertTrue( + "After the message properties are modified and it is re-published, its message content should still match the original message content", + Arrays.equals(ORIG_MSG_CONTENT, modifiedMsgContent)); + } finally { + producerSession.close(); + producerConnection.close(); + consumerSession.close(); + consumerConnection.close(); + } + } + + protected static final int MAX_RANDOM_BYTE_ARRAY_SIZE_KB = 128; + + protected static byte[] randomByteArray() { + final Random random = new Random(); + final byte[] byteArray = new byte[random.nextInt(MAX_RANDOM_BYTE_ARRAY_SIZE_KB * 1024)]; + 
random.nextBytes(byteArray); + + return byteArray; + } + + protected static void makeWritable(final ActiveMQMessage message) { + message.setReadOnlyBody(false); + message.setReadOnlyProperties(false); + } + + protected static void simulatePublish(final ActiveMQBytesMessage message) throws JMSException { + message.reset(); + message.onSend(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5421Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5421Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5421Test.java new file mode 100644 index 0000000..751d488 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5421Test.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import java.net.URI; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ5421Test { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ5421Test.class); + + private static final int DEST_COUNT = 1000; + private final Destination[] destination = new Destination[DEST_COUNT]; + private final MessageProducer[] producer = new MessageProducer[DEST_COUNT]; + private BrokerService brokerService; + private String connectionUri; + + protected ConnectionFactory createConnectionFactory() throws Exception { + ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory(connectionUri); + conFactory.setWatchTopicAdvisories(false); + return conFactory; + } + + protected AbortSlowAckConsumerStrategy createSlowConsumerStrategy() { + AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy(); + strategy.setCheckPeriod(2000); + strategy.setMaxTimeSinceLastAck(5000); + strategy.setIgnoreIdleConsumers(false); + + return strategy; + } + + @Before + public void setUp() throws Exception { + brokerService = BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false&useJmx=true")); + PolicyEntry policy = new PolicyEntry(); + + 
policy.setSlowConsumerStrategy(createSlowConsumerStrategy()); + policy.setQueuePrefetch(10); + policy.setTopicPrefetch(10); + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + brokerService.setDestinationPolicy(pMap); + brokerService.addConnector("tcp://0.0.0.0:0"); + brokerService.start(); + + connectionUri = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString(); + } + + @Test + public void testManyTempDestinations() throws Exception { + Connection connection = createConnectionFactory().createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + for (int i = 0; i < DEST_COUNT; i++) { + destination[i] = session.createTemporaryQueue(); + LOG.debug("Created temp queue: [}", i); + } + + for (int i = 0; i < DEST_COUNT; i++) { + producer[i] = session.createProducer(destination[i]); + LOG.debug("Created producer: {}", i); + TextMessage msg = session.createTextMessage(" testMessage " + i); + producer[i].send(msg); + LOG.debug("message sent: {}", i); + MessageConsumer consumer = session.createConsumer(destination[i]); + Message message = consumer.receive(1000); + Assert.assertTrue(message.equals(msg)); + } + + for (int i = 0; i < DEST_COUNT; i++) { + producer[i].close(); + } + + connection.close(); + } + + @After + public void tearDown() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5450Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5450Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5450Test.java new file mode 100644 index 0000000..6a2dc52 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5450Test.java 
@@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter; +import org.junit.After; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.*; + +public class AMQ5450Test { + + static final Logger LOG = LoggerFactory.getLogger(AMQ5450Test.class); + private final static int maxFileLength = 1024*1024*32; + + private final static String POSTFIX_DESTINATION_NAME = ".dlq"; + + private final static String DESTINATION_NAME = "test" + 
POSTFIX_DESTINATION_NAME; + private final static String DESTINATION_NAME_2 = "2.test" + POSTFIX_DESTINATION_NAME; + private final static String DESTINATION_NAME_3 = "3.2.test" + POSTFIX_DESTINATION_NAME; + + private final static String[] DESTS = new String[] {DESTINATION_NAME, DESTINATION_NAME_2, DESTINATION_NAME_3, DESTINATION_NAME, DESTINATION_NAME}; + + + BrokerService broker; + private HashMap<Object, PersistenceAdapter> adapters = new HashMap(); + + @After + public void tearDown() throws Exception { + broker.stop(); + } + + protected BrokerService createAndStartBroker(PersistenceAdapter persistenceAdapter) throws Exception { + BrokerService broker = new BrokerService(); + broker.setUseJmx(false); + broker.setBrokerName("localhost"); + broker.setPersistenceAdapter(persistenceAdapter); + broker.setDeleteAllMessagesOnStartup(true); + broker.start(); + broker.waitUntilStarted(); + return broker; + } + + @Test + public void testPostFixMatch() throws Exception { + doTestPostFixMatch(false); + } + + @Test + public void testPostFixCompositeMatch() throws Exception { + doTestPostFixMatch(true); + } + + private void doTestPostFixMatch(boolean useComposite) throws Exception { + prepareBrokerWithMultiStore(useComposite); + + sendMessage(DESTINATION_NAME, "test 1"); + sendMessage(DESTINATION_NAME_2, "test 1"); + sendMessage(DESTINATION_NAME_3, "test 1"); + + assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME))); + assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2))); + assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_3))); + + for (String dest: DESTS) { + Destination destination2 = broker.getDestination(new ActiveMQQueue(dest)); + assertNotNull(destination2); + assertEquals(1, destination2.getMessageStore().getMessageCount()); + } + + HashMap numDests = new HashMap(); + for (PersistenceAdapter pa : adapters.values()) { + numDests.put(pa.getDestinations().size(), pa); + } + + // ensure wildcard does not match 
any + assertTrue("0 in wildcard matcher", adapters.get(null).getDestinations().isEmpty()); + + assertEquals("only two values", 2, numDests.size()); + assertTrue("0 in others", numDests.containsKey(0)); + + if (useComposite) { + assertTrue("3 in one", numDests.containsKey(3)); + } else { + assertTrue("1 in some", numDests.containsKey(1)); + } + + } + + protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException { + KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter(); + kaha.setJournalMaxFileLength(maxFileLength); + kaha.setCleanupInterval(5000); + if (delete) { + kaha.deleteAllMessages(); + } + return kaha; + } + + public void prepareBrokerWithMultiStore(boolean compositeMatch) throws Exception { + + MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter(); + multiKahaDBPersistenceAdapter.deleteAllMessages(); + ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<FilteredKahaDBPersistenceAdapter>(); + + if (compositeMatch) { + StringBuffer compositeDestBuf = new StringBuffer(); + for (int i=1; i<=DESTS.length;i++) { + for (int j=0;j<i;j++) { + compositeDestBuf.append("*"); + if ((j+1 == i)) { + compositeDestBuf.append(POSTFIX_DESTINATION_NAME); + } else { + compositeDestBuf.append("."); + } + } + if (! 
(i+1 > DESTS.length)) { + compositeDestBuf.append(","); + } + } + adapters.add(createFilteredKahaDBByDestinationPrefix(compositeDestBuf.toString(), true)); + + } else { + // destination map does not do post fix wild card matches on paths, so we need to cover + // each path length + adapters.add(createFilteredKahaDBByDestinationPrefix("*" + POSTFIX_DESTINATION_NAME, true)); + adapters.add(createFilteredKahaDBByDestinationPrefix("*.*" + POSTFIX_DESTINATION_NAME, true)); + adapters.add(createFilteredKahaDBByDestinationPrefix("*.*.*" + POSTFIX_DESTINATION_NAME, true)); + adapters.add(createFilteredKahaDBByDestinationPrefix("*.*.*.*" + POSTFIX_DESTINATION_NAME, true)); + } + + // ensure wildcard matcher is there for other dests + adapters.add(createFilteredKahaDBByDestinationPrefix(null, true)); + + multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters); + broker = createAndStartBroker(multiKahaDBPersistenceAdapter); + } + + private FilteredKahaDBPersistenceAdapter createFilteredKahaDBByDestinationPrefix(String destinationPrefix, boolean deleteAllMessages) + throws IOException { + FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter(); + template.setPersistenceAdapter(createStore(deleteAllMessages)); + if (destinationPrefix != null) { + template.setQueue(destinationPrefix); + } + adapters.put(destinationPrefix, template.getPersistenceAdapter()); + return template; + } + + private void sendMessage(String destinationName, String message) throws JMSException { + ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost"); + f.setAlwaysSyncSend(true); + Connection c = f.createConnection(); + c.start(); + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = s.createProducer(new ActiveMQQueue(destinationName)); + producer.send(s.createTextMessage(message)); + producer.close(); + s.close(); + c.stop(); + } + +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java new file mode 100644 index 0000000..a8739ae --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import java.io.File; +import java.util.concurrent.TimeUnit; +import javax.jms.JMSException; +import javax.jms.TextMessage; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import junit.framework.Test; +import org.apache.activemq.broker.BrokerRestartTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.StubConnection; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.leveldb.LevelDBPersistenceAdapter; +import org.apache.activemq.util.IOHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ5567Test extends BrokerRestartTestSupport { + protected static final Logger LOG = LoggerFactory.getLogger(AMQ5567Test.class); + ActiveMQQueue destination = new ActiveMQQueue("Q"); + + @Override + protected void configureBroker(BrokerService broker) throws Exception { + super.configureBroker(broker); + broker.setPersistenceAdapter(persistenceAdapter); + } + + protected PolicyEntry getDefaultPolicy() { + PolicyEntry policy = new PolicyEntry(); + policy.setMemoryLimit(60*1024); + return policy; + } + + public void 
initCombosForTestPreparedTransactionNotDispatched() throws Exception { + PersistenceAdapter[] persistenceAdapters = new PersistenceAdapter[]{ + new KahaDBPersistenceAdapter(), + new LevelDBPersistenceAdapter(), + new JDBCPersistenceAdapter(JDBCPersistenceAdapter.createDataSource(IOHelper.getDefaultDataDirectory()), new OpenWireFormat()) + }; + for (PersistenceAdapter adapter : persistenceAdapters) { + adapter.setDirectory(new File(IOHelper.getDefaultDataDirectory())); + } + addCombinationValues("persistenceAdapter", persistenceAdapters); + } + + public void testPreparedTransactionNotDispatched() throws Exception { + + ActiveMQDestination destination = new ActiveMQQueue("Q"); + + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + + XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + message.setTransactionId(txid); + connection.send(message); + + connection.send(createPrepareTransaction(connectionInfo, txid)); + + + // send another non tx, will poke dispatch + message = createMessage(producerInfo, destination); + message.setPersistent(true); + connection.send(message); + + + // Since prepared but not committed.. 
only one should get delivered + StubConnection connectionC = createConnection(); + ConnectionInfo connectionInfoC = createConnectionInfo(); + SessionInfo sessionInfoC = createSessionInfo(connectionInfoC); + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfoC, destination); + connectionC.send(connectionInfoC); + connectionC.send(sessionInfoC); + connectionC.send(consumerInfo); + + Message m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10)); + LOG.info("received: " + m); + assertNotNull("Got message", m); + assertNull("Got non tx message", m.getTransactionId()); + + // cannot get the prepared message till commit + assertNull(receiveMessage(connectionC)); + assertNoMessagesLeft(connectionC); + + LOG.info("commit: " + txid); + connection.request(createCommitTransaction2Phase(connectionInfo, txid)); + + m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10)); + LOG.info("received: " + m); + assertNotNull("Got non null message", m); + + } + + public void initCombosForTestCursorStoreSync() throws Exception { + PersistenceAdapter[] persistenceAdapters = new PersistenceAdapter[]{ + new KahaDBPersistenceAdapter(), + new LevelDBPersistenceAdapter(), + new JDBCPersistenceAdapter(JDBCPersistenceAdapter.createDataSource(IOHelper.getDefaultDataDirectory()), new OpenWireFormat()) + }; + for (PersistenceAdapter adapter : persistenceAdapters) { + adapter.setDirectory(new File(IOHelper.getDefaultDataDirectory())); + } + addCombinationValues("persistenceAdapter", persistenceAdapters); + } + + public void testCursorStoreSync() throws Exception { + + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + + XATransactionId txid = createXATransaction(sessionInfo); + 
connection.send(createBeginTransaction(connectionInfo, txid)); + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + message.setTransactionId(txid); + connection.request(message); + + connection.request(createPrepareTransaction(connectionInfo, txid)); + + QueueViewMBean proxy = getProxyToQueueViewMBean(); + assertTrue("cache is enabled", proxy.isCacheEnabled()); + + // send another non tx, will fill cursor + String payload = new String(new byte[10*1024]); + for (int i=0; i<6; i++) { + message = createMessage(producerInfo, destination); + message.setPersistent(true); + ((TextMessage)message).setText(payload); + connection.request(message); + } + + assertTrue("cache is disabled", !proxy.isCacheEnabled()); + + StubConnection connectionC = createConnection(); + ConnectionInfo connectionInfoC = createConnectionInfo(); + SessionInfo sessionInfoC = createSessionInfo(connectionInfoC); + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfoC, destination); + connectionC.send(connectionInfoC); + connectionC.send(sessionInfoC); + connectionC.send(consumerInfo); + + Message m = null; + for (int i=0; i<3; i++) { + m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10)); + LOG.info("received: " + m); + assertNotNull("Got message", m); + assertNull("Got non tx message", m.getTransactionId()); + connectionC.request(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); + } + + LOG.info("commit: " + txid); + connection.request(createCommitTransaction2Phase(connectionInfo, txid)); + // consume the rest including the 2pc send in TX + + for (int i=0; i<4; i++) { + m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10)); + LOG.info("received[" + i + "] " + m); + assertNotNull("Got message", m); + if (i==3 ) { + assertNotNull("Got tx message", m.getTransactionId()); + } else { + assertNull("Got non tx message", m.getTransactionId()); + } + connectionC.request(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); 
+ } + } + + private QueueViewMBean getProxyToQueueViewMBean() + throws MalformedObjectNameException, JMSException { + ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq" + + ":destinationType=Queue,destinationName=" + destination.getQueueName() + + ",type=Broker,brokerName=localhost"); + QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext() + .newProxyInstance(queueViewMBeanName, + QueueViewMBean.class, true); + return proxy; + } + + public static Test suite() { + return suite(AMQ5567Test.class); + } + +}
