http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java new file mode 100644 index 0000000..4c8527a --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java @@ -0,0 +1,185 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.bugs;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

import javax.jms.*;
import javax.management.ObjectName;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Test to determine if expired messages are being reaped if there is
 * no active consumer connected to the broker.
 */
public class MessageExpirationReaperTest {

    private BrokerService broker;
    private ConnectionFactory factory;
    private ActiveMQConnection connection;
    private final String destinationName = "TEST.Q";
    private final String brokerUrl = "tcp://localhost:0";
    private final String brokerName = "testBroker";
    private String connectionUri;

    /**
     * Starts an embedded broker and opens a started connection to it
     * using the client id {@code test-connection}.
     */
    @Before
    public void init() throws Exception {
        createBroker();

        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();

        factory = createConnectionFactory();
        connection = (ActiveMQConnection) factory.createConnection();
        connection.setClientID("test-connection");
        connection.start();
    }

    /**
     * Closes the connection and stops the broker.
     *
     * Either field may still be null when {@link #init()} failed part-way,
     * and the broker must be stopped even if closing the connection throws.
     */
    @After
    public void cleanUp() throws Exception {
        try {
            if (connection != null) {
                connection.close();
            }
        } finally {
            if (broker != null) {
                broker.stop();
            }
        }
    }

    /**
     * Creates and starts the broker with a default destination policy whose
     * expire-messages period is 500.
     */
    protected void createBroker() throws Exception {
        broker = new BrokerService();
        broker.setDeleteAllMessagesOnStartup(true);
        broker.setBrokerName(brokerName);
        broker.addConnector(brokerUrl);

        PolicyMap policyMap = new PolicyMap();
        PolicyEntry defaultEntry = new PolicyEntry();
        defaultEntry.setExpireMessagesPeriod(500);
        policyMap.setDefaultEntry(defaultEntry);
        broker.setDestinationPolicy(policyMap);

        broker.start();
    }

    protected ConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory(connectionUri);
    }

    protected Session createSession() throws Exception {
        return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    /**
     * Sends messages with a 1000 ms time-to-live to a queue, waits, and checks
     * through the destination MBean that nothing is left in flight or queued.
     */
    @Test
    public void testExpiredMessageReaping() throws Exception {

        Session producerSession = createSession();
        ActiveMQDestination destination = (ActiveMQDestination) producerSession.createQueue(destinationName);
        MessageProducer producer = producerSession.createProducer(destination);
        producer.setTimeToLive(1000);

        final int count = 3;
        // Send some messages with an expiration
        for (int i = 0; i < count; i++) {
            TextMessage message = producerSession.createTextMessage("" + i);
            producer.send(message);
        }

        // Let the messages expire
        Thread.sleep(2000);

        DestinationViewMBean view = createView(destination);

        assertEquals("Incorrect inflight count: " + view.getInFlightCount(), 0, view.getInFlightCount());
        assertEquals("Incorrect queue size count", 0, view.getQueueSize());
        assertEquals("Incorrect expired size count", view.getEnqueueCount(), view.getExpiredCount());

        // Send more messages with an expiration
        for (int i = 0; i < count; i++) {
            TextMessage message = producerSession.createTextMessage("" + i);
            producer.send(message);
        }

        // Let the messages expire
        Thread.sleep(2000);

        // Simply browse the queue
        Session browserSession = createSession();
        QueueBrowser browser = browserSession.createBrowser((Queue) destination);
        assertFalse("no message in the browser", browser.getEnumeration().hasMoreElements());

        // The messages expire and should be reaped because of the presence of
        // the queue browser
        assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), 0, view.getInFlightCount());
    }

    /**
     * Sends messages with a 500 ms time-to-live to a topic that has a durable
     * subscriber, closes the subscriber, waits, and checks the MBean counters.
     */
    @Test
    public void testExpiredMessagesOnTopic() throws Exception {
        Session session = createSession();

        // use a zero prefetch so messages don't go inflight
        ActiveMQTopic destination = new ActiveMQTopic(destinationName + "?consumer.prefetchSize=0");

        MessageProducer producer = session.createProducer(destination);

        // should have a durable sub because it's a little tricky to get messages to expire in
        // non-durable subs.. with durable subs, we can just expire in the topic using the expire
        // period.. also.. durable sub has to be "inactive" for the expire checker to actually
        // expire the messages
        MessageConsumer consumer = session.createDurableSubscriber(destination, "test-durable");

        producer.setTimeToLive(500);

        final int count = 3;
        // Send some messages with an expiration
        for (int i = 0; i < count; i++) {
            TextMessage message = session.createTextMessage("" + i);
            producer.send(message);
        }

        DestinationViewMBean view = createView(destination);
        // not expired yet...
        assertEquals("Incorrect enqueue count", 3, view.getEnqueueCount());

        // close consumer so topic thinks consumer is inactive
        consumer.close();

        // Let the messages reach an expiry time
        Thread.sleep(2000);

        assertEquals("Incorrect inflight count: " + view.getInFlightCount(), 0, view.getInFlightCount());
        assertEquals("Incorrect queue size count", 0, view.getQueueSize());
        assertEquals("Incorrect expired size count", view.getEnqueueCount(), view.getExpiredCount());
    }

    /**
     * Builds a JMX proxy for the test destination.
     *
     * The object name always uses {@code destinationName}; only the
     * destination type (Queue or Topic) is taken from the argument.
     *
     * @param destination destination whose type selects the MBean
     * @return a proxy for the destination's view MBean
     */
    protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
        String domain = "org.apache.activemq";
        String destinationType = destination.isQueue() ? "Queue" : "Topic";
        ObjectName name = new ObjectName(domain + ":type=Broker,brokerName=" + brokerName
            + ",destinationType=" + destinationType + ",destinationName=" + destinationName);
        return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class,
            true);
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageSender.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageSender.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageSender.java new file mode 100644 index 0000000..f85bdba --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageSender.java @@ -0,0 +1,45 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.bugs;

import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

/**
 * Small helper that owns one session and one producer and sends string
 * payloads wrapped in object messages.
 */
public class MessageSender {
    private MessageProducer producer;
    private Session session;

    /**
     * Creates the session and a producer for the named destination.
     *
     * @param queueName            name of the queue or topic to send to
     * @param connection           connection used to create the session
     * @param useTransactedSession whether the session is transacted
     * @param topic                true to send to a topic, false for a queue
     */
    public MessageSender(String queueName, Connection connection, boolean useTransactedSession, boolean topic) throws Exception {
        if (useTransactedSession) {
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
        } else {
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        }

        if (topic) {
            producer = session.createProducer(session.createTopic(queueName));
        } else {
            producer = session.createProducer(session.createQueue(queueName));
        }
    }

    /**
     * Sends the payload as an object message, committing afterwards when the
     * session is transacted.
     *
     * @param payload text to send
     */
    public void send(String payload) throws Exception {
        ObjectMessage outgoing = session.createObjectMessage();
        outgoing.setObject(payload);
        producer.send(outgoing);

        if (!session.getTransacted()) {
            return;
        }
        session.commit();
    }

    /** @return the producer created by the constructor */
    public MessageProducer getProducer() {
        return producer;
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java new file mode 100644 index 0000000..68055bb --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java @@ -0,0 +1,323 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.ObjectMessage; +import javax.jms.Session; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.usage.SystemUsage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + * Try and replicate: + * Caused by: java.io.IOException: Could not locate data file data--188 + * at org.apache.activemq.kaha.impl.async.AsyncDataManager.getDataFile(AsyncDataManager.java:302) + * at org.apache.activemq.kaha.impl.async.AsyncDataManager.read(AsyncDataManager.java:614) + * at org.apache.activemq.store.amq.AMQPersistenceAdapter.readCommand(AMQPersistenceAdapter.java:523) + */ + +public class MissingDataFileTest extends TestCase { + + private static final Logger LOG = LoggerFactory.getLogger(MissingDataFileTest.class); + + private static int counter = 500; + + private static int hectorToHaloCtr; + private static int xenaToHaloCtr; + private static int troyToHaloCtr; + + private static int haloToHectorCtr; + private static int haloToXenaCtr; + private static int haloToTroyCtr; + + private final String hectorToHalo = "hectorToHalo"; + private final String xenaToHalo = "xenaToHalo"; + private final String troyToHalo = "troyToHalo"; + + private final String haloToHector = "haloToHector"; + private final String haloToXena = "haloToXena"; + private final String haloToTroy = "haloToTroy"; + + + private BrokerService broker; + + private Connection hectorConnection; + private Connection xenaConnection; + private Connection troyConnection; + private Connection haloConnection; + + private final Object lock = new Object(); + final boolean useTopic = false; + final boolean useSleep = true; + 
+ protected static final String payload = new String(new byte[500]); + + public Connection createConnection() throws JMSException { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + return factory.createConnection(); + } + + public Session createSession(Connection connection, boolean transacted) throws JMSException { + return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + } + + public void startBroker() throws Exception { + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.setPersistent(true); + broker.setUseJmx(true); + broker.addConnector("tcp://localhost:61616").setName("Default"); + + SystemUsage systemUsage; + systemUsage = new SystemUsage(); + systemUsage.getMemoryUsage().setLimit(10 * 1024 * 1024); // Just a few messags + broker.setSystemUsage(systemUsage); + + KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter(); + kahaDBPersistenceAdapter.setJournalMaxFileLength(16*1024); + kahaDBPersistenceAdapter.setCleanupInterval(500); + broker.setPersistenceAdapter(kahaDBPersistenceAdapter); + + broker.start(); + LOG.info("Starting broker.."); + } + + @Override + public void tearDown() throws Exception { + hectorConnection.close(); + xenaConnection.close(); + troyConnection.close(); + haloConnection.close(); + broker.stop(); + } + + public void testForNoDataFoundError() throws Exception { + + startBroker(); + hectorConnection = createConnection(); + Thread hectorThread = buildProducer(hectorConnection, hectorToHalo, false, useTopic); + Receiver hHectorReceiver = new Receiver() { + @Override + public void receive(String s) throws Exception { + haloToHectorCtr++; + if (haloToHectorCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + possiblySleep(haloToHectorCtr); + } + }; + buildReceiver(hectorConnection, haloToHector, false, hHectorReceiver, useTopic); + + troyConnection = createConnection(); + Thread troyThread = 
buildProducer(troyConnection, troyToHalo); + Receiver hTroyReceiver = new Receiver() { + @Override + public void receive(String s) throws Exception { + haloToTroyCtr++; + if (haloToTroyCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + possiblySleep(haloToTroyCtr); + } + }; + buildReceiver(hectorConnection, haloToTroy, false, hTroyReceiver, false); + + xenaConnection = createConnection(); + Thread xenaThread = buildProducer(xenaConnection, xenaToHalo); + Receiver hXenaReceiver = new Receiver() { + @Override + public void receive(String s) throws Exception { + haloToXenaCtr++; + if (haloToXenaCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + possiblySleep(haloToXenaCtr); + } + }; + buildReceiver(xenaConnection, haloToXena, false, hXenaReceiver, false); + + haloConnection = createConnection(); + final MessageSender hectorSender = buildTransactionalProducer(haloToHector, haloConnection, false); + final MessageSender troySender = buildTransactionalProducer(haloToTroy, haloConnection, false); + final MessageSender xenaSender = buildTransactionalProducer(haloToXena, haloConnection, false); + Receiver hectorReceiver = new Receiver() { + @Override + public void receive(String s) throws Exception { + hectorToHaloCtr++; + troySender.send(payload); + if (hectorToHaloCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + possiblySleep(hectorToHaloCtr); + } + } + }; + Receiver xenaReceiver = new Receiver() { + @Override + public void receive(String s) throws Exception { + xenaToHaloCtr++; + hectorSender.send(payload); + if (xenaToHaloCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + possiblySleep(xenaToHaloCtr); + } + }; + Receiver troyReceiver = new Receiver() { + @Override + public void receive(String s) throws Exception { + troyToHaloCtr++; + xenaSender.send(payload); + if (troyToHaloCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + } + }; + buildReceiver(haloConnection, 
hectorToHalo, true, hectorReceiver, false); + buildReceiver(haloConnection, xenaToHalo, true, xenaReceiver, false); + buildReceiver(haloConnection, troyToHalo, true, troyReceiver, false); + + haloConnection.start(); + + troyConnection.start(); + troyThread.start(); + + xenaConnection.start(); + xenaThread.start(); + + hectorConnection.start(); + hectorThread.start(); + waitForMessagesToBeDelivered(); + // number of messages received should match messages sent + assertEquals(hectorToHaloCtr, counter); + LOG.info("hectorToHalo received " + hectorToHaloCtr + " messages"); + assertEquals(xenaToHaloCtr, counter); + LOG.info("xenaToHalo received " + xenaToHaloCtr + " messages"); + assertEquals(troyToHaloCtr, counter); + LOG.info("troyToHalo received " + troyToHaloCtr + " messages"); + assertEquals(haloToHectorCtr, counter); + LOG.info("haloToHector received " + haloToHectorCtr + " messages"); + assertEquals(haloToXenaCtr, counter); + LOG.info("haloToXena received " + haloToXenaCtr + " messages"); + assertEquals(haloToTroyCtr, counter); + LOG.info("haloToTroy received " + haloToTroyCtr + " messages"); + + } + + protected void possiblySleep(int count) throws InterruptedException { + if (useSleep) { + if (count % 100 == 0) { + Thread.sleep(5000); + } + } + + } + + protected void waitForMessagesToBeDelivered() { + // let's give the listeners enough time to read all messages + long maxWaitTime = counter * 1000; + long waitTime = maxWaitTime; + long start = (maxWaitTime <= 0) ? 
0 : System.currentTimeMillis(); + + synchronized (lock) { + boolean hasMessages = true; + while (hasMessages && waitTime >= 0) { + try { + lock.wait(200); + } catch (InterruptedException e) { + LOG.error(e.toString()); + } + // check if all messages have been received + hasMessages = hectorToHaloCtr < counter || xenaToHaloCtr < counter || troyToHaloCtr < counter || haloToHectorCtr < counter || haloToXenaCtr < counter + || haloToTroyCtr < counter; + waitTime = maxWaitTime - (System.currentTimeMillis() - start); + } + } + } + + public MessageSender buildTransactionalProducer(String queueName, Connection connection, boolean isTopic) throws Exception { + + return new MessageSender(queueName, connection, true, isTopic); + } + + public Thread buildProducer(Connection connection, final String queueName) throws Exception { + return buildProducer(connection, queueName, false, false); + } + + public Thread buildProducer(Connection connection, final String queueName, boolean transacted, boolean isTopic) throws Exception { + final MessageSender producer = new MessageSender(queueName, connection, transacted, isTopic); + Thread thread = new Thread() { + @Override + public synchronized void run() { + for (int i = 0; i < counter; i++) { + try { + producer.send(payload ); + } catch (Exception e) { + throw new RuntimeException("on " + queueName + " send", e); + } + } + } + }; + return thread; + } + + public void buildReceiver(Connection connection, final String queueName, boolean transacted, final Receiver receiver, boolean isTopic) throws Exception { + final Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer inputMessageConsumer = session.createConsumer(isTopic ? 
session.createTopic(queueName) : session.createQueue(queueName)); + MessageListener messageListener = new MessageListener() { + + @Override + public void onMessage(Message message) { + try { + ObjectMessage objectMessage = (ObjectMessage)message; + String s = (String)objectMessage.getObject(); + receiver.receive(s); + if (session.getTransacted()) { + session.commit(); + } + + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + inputMessageConsumer.setMessageListener(messageListener); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java new file mode 100644 index 0000000..195ccbd --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java @@ -0,0 +1,312 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test for AMQ-3965. + * A consumer may be stalled in case it uses optimizeAcknowledge and receives + * a number of messages that expire before being dispatched to application code. + * See for more details. + * + */ +public class OptimizeAcknowledgeWithExpiredMsgsTest { + + private final static Logger LOG = LoggerFactory.getLogger(OptimizeAcknowledgeWithExpiredMsgsTest.class); + + private BrokerService broker = null; + + private String connectionUri; + + /** + * Creates a broker instance but does not start it. 
+ * + * @param brokerUri - transport uri of broker + * @param brokerName - name for the broker + * @return a BrokerService instance with transport uri and broker name set + * @throws Exception + */ + protected BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setDeleteAllMessagesOnStartup(true); + broker.setUseJmx(false); + connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString(); + return broker; + } + + @Before + public void setUp() throws Exception { + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + broker = null; + } + } + + /** + * Tests for AMQ-3965 + * Creates connection into broker using optimzeAcknowledge and prefetch=100 + * Creates producer and consumer. Producer sends 45 msgs that will expire + * at consumer (but before being dispatched to app code). + * Producer then sends 60 msgs without expiry. + * + * Consumer receives msgs using a MessageListener and increments a counter. + * Main thread sleeps for 5 seconds and checks the counter value. + * If counter != 60 msgs (the number of msgs that should get dispatched + * to consumer) the test fails. 
+ */ + @Test + public void testOptimizedAckWithExpiredMsgs() throws Exception + { + ActiveMQConnectionFactory connectionFactory = + new ActiveMQConnectionFactory(connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100"); + + // Create JMS resources + Connection connection = connectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue("TEST.FOO"); + + // ***** Consumer code ***** + MessageConsumer consumer = session.createConsumer(destination); + + final MyMessageListener listener = new MyMessageListener(); + connection.setExceptionListener((ExceptionListener) listener); + + // ***** Producer Code ***** + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode(); + TextMessage message; + + // Produce msgs that will expire quickly + for (int i=0; i<45; i++) { + message = session.createTextMessage(text); + producer.send(message,1,1,100); + LOG.trace("Sent message: "+ message.getJMSMessageID() + + " with expiry 10 msec"); + } + // Produce msgs that don't expire + for (int i=0; i<60; i++) { + message = session.createTextMessage(text); + producer.send(message,1,1,60000); + // producer.send(message); + LOG.trace("Sent message: "+ message.getJMSMessageID() + + " with expiry 30 sec"); + } + consumer.setMessageListener(listener); + + sleep(1000); // let the batch of 45 expire. 
+ + connection.start(); + + assertTrue("Should receive all expected messages, counter at " + listener.getCounter(), Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return listener.getCounter() == 60; + } + })); + + LOG.info("Received all expected messages with counter at: " + listener.getCounter()); + + // Cleanup + producer.close(); + consumer.close(); + session.close(); + connection.close(); + } + + @Test + public void testOptimizedAckWithExpiredMsgsSync() throws Exception + { + ActiveMQConnectionFactory connectionFactory = + new ActiveMQConnectionFactory(connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100"); + + // Create JMS resources + Connection connection = connectionFactory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue("TEST.FOO"); + + // ***** Consumer code ***** + MessageConsumer consumer = session.createConsumer(destination); + + // ***** Producer Code ***** + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + String text = "Hello world! 
From: " + Thread.currentThread().getName() + " : " + this.hashCode(); + TextMessage message; + + // Produce msgs that will expire quickly + for (int i=0; i<45; i++) { + message = session.createTextMessage(text); + producer.send(message,1,1,10); + LOG.trace("Sent message: "+ message.getJMSMessageID() + + " with expiry 10 msec"); + } + // Produce msgs that don't expire + for (int i=0; i<60; i++) { + message = session.createTextMessage(text); + producer.send(message,1,1,30000); + // producer.send(message); + LOG.trace("Sent message: "+ message.getJMSMessageID() + + " with expiry 30 sec"); + } + sleep(200); + + int counter = 1; + for (; counter <= 60; ++counter) { + assertNotNull(consumer.receive(2000)); + LOG.info("counter at " + counter); + } + LOG.info("Received all expected messages with counter at: " + counter); + + // Cleanup + producer.close(); + consumer.close(); + session.close(); + connection.close(); + } + + @Test + public void testOptimizedAckWithExpiredMsgsSync2() throws Exception + { + ActiveMQConnectionFactory connectionFactory = + new ActiveMQConnectionFactory(connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100"); + + // Create JMS resources + Connection connection = connectionFactory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue("TEST.FOO"); + + // ***** Consumer code ***** + MessageConsumer consumer = session.createConsumer(destination); + + // ***** Producer Code ***** + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + String text = "Hello world! 
From: " + Thread.currentThread().getName() + " : " + this.hashCode(); + TextMessage message; + + // Produce msgs that don't expire + for (int i=0; i<56; i++) { + message = session.createTextMessage(text); + producer.send(message,1,1,30000); + // producer.send(message); + LOG.trace("Sent message: "+ message.getJMSMessageID() + + " with expiry 30 sec"); + } + // Produce msgs that will expire quickly + for (int i=0; i<44; i++) { + message = session.createTextMessage(text); + producer.send(message,1,1,10); + LOG.trace("Sent message: "+ message.getJMSMessageID() + + " with expiry 10 msec"); + } + // Produce some moremsgs that don't expire + for (int i=0; i<4; i++) { + message = session.createTextMessage(text); + producer.send(message,1,1,30000); + // producer.send(message); + LOG.trace("Sent message: "+ message.getJMSMessageID() + + " with expiry 30 sec"); + } + + sleep(200); + + int counter = 1; + for (; counter <= 60; ++counter) { + assertNotNull(consumer.receive(2000)); + LOG.info("counter at " + counter); + } + LOG.info("Received all expected messages with counter at: " + counter); + + // Cleanup + producer.close(); + consumer.close(); + session.close(); + connection.close(); + } + + private void sleep(int milliSecondTime) { + try { + Thread.sleep(milliSecondTime); + } catch (InterruptedException igonred) { + } + } + + /** + * Standard JMS MessageListener + */ + private class MyMessageListener implements MessageListener, ExceptionListener { + + private AtomicInteger counter = new AtomicInteger(0); + + public void onMessage(final Message message) { + try { + LOG.trace("Got Message " + message.getJMSMessageID()); + LOG.info("counter at " + counter.incrementAndGet()); + } catch (final Exception e) { + } + } + + public int getCounter() { + return counter.get(); + } + + public synchronized void onException(JMSException ex) { + LOG.error("JMS Exception occured. 
Shutting down client."); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java new file mode 100644 index 0000000..34e3866 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.bugs; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OutOfOrderTestCase extends TestCase { + + private static final Logger log = LoggerFactory.getLogger(OutOfOrderTestCase.class); + + private static final String BROKER_URL = "tcp://localhost:0"; + private static final int PREFETCH = 10; + private static final String CONNECTION_URL_OPTIONS = "?jms.prefetchPolicy.all=" + PREFETCH; + + private static final String DESTINATION = "QUEUE?consumer.exclusive=true"; + + private BrokerService brokerService; + private Session session; + private Connection connection; + private String connectionUri; + + private int seq = 0; + + public void setUp() throws Exception { + brokerService = new BrokerService(); + brokerService.setUseJmx(true); + brokerService.addConnector(BROKER_URL); + brokerService.deleteAllMessages(); + brokerService.start(); + brokerService.waitUntilStarted(); + + connectionUri = brokerService.getTransportConnectors().get(0).getPublishableConnectString(); + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri + CONNECTION_URL_OPTIONS); + connection = connectionFactory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + } + + protected void tearDown() throws Exception { + session.close(); + connection.close(); + brokerService.stop(); + } + + public void testOrder() throws Exception { + + log.info("Producing messages 0-29 . . 
."); + Destination destination = session.createQueue(DESTINATION); + final MessageProducer messageProducer = session + .createProducer(destination); + try { + for (int i = 0; i < 30; ++i) { + final Message message = session + .createTextMessage(createMessageText(i)); + message.setStringProperty("JMSXGroupID", "FOO"); + + messageProducer.send(message); + log.info("sent " + toString(message)); + } + } finally { + messageProducer.close(); + } + + log.info("Consuming messages 0-9 . . ."); + consumeBatch(); + + log.info("Consuming messages 10-19 . . ."); + consumeBatch(); + + log.info("Consuming messages 20-29 . . ."); + consumeBatch(); + } + + protected void consumeBatch() throws Exception { + Destination destination = session.createQueue(DESTINATION); + final MessageConsumer messageConsumer = session.createConsumer(destination); + try { + for (int i = 0; i < 10; ++i) { + final Message message = messageConsumer.receive(1000L); + log.info("received " + toString(message)); + assertEquals("Message out of order", createMessageText(seq++), ((TextMessage) message).getText()); + message.acknowledge(); + } + } finally { + messageConsumer.close(); + } + } + + private String toString(final Message message) throws JMSException { + String ret = "received message '" + ((TextMessage) message).getText() + "' - " + message.getJMSMessageID(); + if (message.getJMSRedelivered()) + ret += " (redelivered)"; + return ret; + + } + + private static String createMessageText(final int index) { + return "message #" + index; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java new file mode 100644 index 
0000000..80adaed --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.bugs; + +import java.io.Serializable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.Session; + +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.broker.BrokerService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test case demonstrating situation where messages are not delivered to + * consumers. 
+ */ +public class QueueWorkerPrefetchTest extends TestCase implements + MessageListener { + private static final Logger LOG = LoggerFactory + .getLogger(QueueWorkerPrefetchTest.class); + private static final int BATCH_SIZE = 10; + private static final long WAIT_TIMEOUT = 1000 * 10; + + /** The connection URL. */ + private static final String BROKER_BIND_ADDRESS = "tcp://localhost:0"; + + /** + * The queue prefetch size to use. A value greater than 1 seems to make + * things work. + */ + private static final int QUEUE_PREFETCH_SIZE = 1; + + /** + * The number of workers to use. A single worker with a prefetch of 1 works. + */ + private static final int NUM_WORKERS = 2; + + /** Embedded JMS broker. */ + private BrokerService broker; + + /** The master's producer object for creating work items. */ + private MessageProducer workItemProducer; + + /** The master's consumer object for consuming ack messages from workers. */ + private MessageConsumer masterItemConsumer; + + /** The number of acks received by the master. */ + private final AtomicLong acksReceived = new AtomicLong(0); + + private final AtomicReference<CountDownLatch> latch = new AtomicReference<CountDownLatch>(); + + private String connectionUri; + + /** Messages sent to the work-item queue. */ + private static class WorkMessage implements Serializable { + private static final long serialVersionUID = 1L; + private final int id; + + public WorkMessage(int id) { + this.id = id; + } + + @Override + public String toString() { + return "Work: " + id; + } + } + + /** + * The worker process. Consume messages from the work-item queue, possibly + * creating more messages to submit to the work-item queue. For each work + * item, send an ack to the master. + */ + private static class Worker implements MessageListener { + /** + * Counter shared between workers to decided when new work-item messages + * are created. + */ + private static AtomicInteger counter = new AtomicInteger(0); + + /** Session to use. 
*/ + private Session session; + + /** Producer for sending ack messages to the master. */ + private MessageProducer masterItemProducer; + + /** Producer for sending new work items to the work-items queue. */ + private MessageProducer workItemProducer; + + public Worker(Session session) throws JMSException { + this.session = session; + masterItemProducer = session.createProducer(session + .createQueue("master-item")); + Queue workItemQueue = session.createQueue("work-item"); + workItemProducer = session.createProducer(workItemQueue); + MessageConsumer workItemConsumer = session + .createConsumer(workItemQueue); + workItemConsumer.setMessageListener(this); + } + + public void onMessage(javax.jms.Message message) { + try { + WorkMessage work = (WorkMessage) ((ObjectMessage) message) + .getObject(); + + long c = counter.incrementAndGet(); + + // Don't create a new work item for every BATCH_SIZE message. */ + if (c % BATCH_SIZE != 0) { + // Send new work item to work-item queue. + workItemProducer.send(session + .createObjectMessage(new WorkMessage(work.id + 1))); + } + + // Send ack to master. + masterItemProducer.send(session.createObjectMessage(work)); + } catch (JMSException e) { + throw new IllegalStateException("Something has gone wrong", e); + } + } + + /** Close of JMS resources used by worker. */ + public void close() throws JMSException { + masterItemProducer.close(); + workItemProducer.close(); + session.close(); + } + } + + /** Master message handler. Process ack messages. */ + public void onMessage(javax.jms.Message message) { + long acks = acksReceived.incrementAndGet(); + latch.get().countDown(); + if (acks % 1 == 0) { + LOG.info("Master now has ack count of: " + acksReceived); + } + } + + protected void setUp() throws Exception { + // Create the message broker. 
+ super.setUp(); + broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(true); + broker.addConnector(BROKER_BIND_ADDRESS); + broker.start(); + broker.waitUntilStarted(); + + connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); + } + + protected void tearDown() throws Exception { + // Shut down the message broker. + broker.deleteAllMessages(); + broker.stop(); + super.tearDown(); + } + + public void testActiveMQ() throws Exception { + // Create the connection to the broker. + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri); + ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); + prefetchPolicy.setQueuePrefetch(QUEUE_PREFETCH_SIZE); + connectionFactory.setPrefetchPolicy(prefetchPolicy); + Connection connection = connectionFactory.createConnection(); + connection.start(); + + Session masterSession = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + workItemProducer = masterSession.createProducer(masterSession + .createQueue("work-item")); + masterItemConsumer = masterSession.createConsumer(masterSession + .createQueue("master-item")); + masterItemConsumer.setMessageListener(this); + + // Create the workers. + Worker[] workers = new Worker[NUM_WORKERS]; + for (int i = 0; i < NUM_WORKERS; i++) { + workers[i] = new Worker(connection.createSession(false, + Session.AUTO_ACKNOWLEDGE)); + } + + // Send a message to the work queue, and wait for the BATCH_SIZE acks + // from the workers. + acksReceived.set(0); + latch.set(new CountDownLatch(BATCH_SIZE)); + workItemProducer.send(masterSession + .createObjectMessage(new WorkMessage(1))); + + if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) { + fail("First batch only received " + acksReceived + " messages"); + } + + LOG.info("First batch received"); + + // Send another message to the work queue, and wait for the next 1000 acks. 
It is + // at this point where the workers never get notified of this message, as they + // have a large pending queue. Creating a new worker at this point however will + // receive this new message. + acksReceived.set(0); + latch.set(new CountDownLatch(BATCH_SIZE)); + workItemProducer.send(masterSession + .createObjectMessage(new WorkMessage(1))); + + if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) { + fail("Second batch only received " + acksReceived + " messages"); + } + + LOG.info("Second batch received"); + + // Cleanup all JMS resources. + for (int i = 0; i < NUM_WORKERS; i++) { + workers[i].close(); + } + masterSession.close(); + connection.close(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackSharedConsumerTests.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackSharedConsumerTests.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackSharedConsumerTests.java new file mode 100644 index 0000000..4790e42 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackSharedConsumerTests.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class RawRollbackSharedConsumerTests { + + private static ConnectionFactory connectionFactory; + private static Destination queue; + private static BrokerService broker; + + @BeforeClass + public static void clean() throws Exception { + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.setUseJmx(true); + broker.start(); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); + connectionFactory.setBrokerURL("vm://localhost?async=false"); + RawRollbackSharedConsumerTests.connectionFactory = connectionFactory; + queue = new ActiveMQQueue("queue"); + } + + @AfterClass + public static void close() throws Exception { + broker.stop(); + } + + @Before + public void clearData() throws Exception { + getMessages(false); // drain queue + convertAndSend("foo"); + 
convertAndSend("bar"); + } + + + @After + public void checkPostConditions() throws Exception { + + Thread.sleep(1000L); + List<String> list = getMessages(false); + assertEquals(2, list.size()); + + } + + @Test + public void testReceiveMessages() throws Exception { + + List<String> list = getMessages(true); + assertEquals(2, list.size()); + assertTrue(list.contains("foo")); + + } + + private void convertAndSend(String msg) throws Exception { + Connection connection = connectionFactory.createConnection(); + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage(msg)); + producer.close(); + session.commit(); + session.close(); + connection.close(); + } + + private List<String> getMessages(boolean rollback) throws Exception { + Connection connection = connectionFactory.createConnection(); + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + String next = ""; + List<String> msgs = new ArrayList<String>(); + MessageConsumer consumer = session.createConsumer(queue); + while (next != null) { + next = (String) receiveAndConvert(consumer); + if (next != null) + msgs.add(next); + } + consumer.close(); + if (rollback) { + session.rollback(); + } else { + session.commit(); + } + session.close(); + connection.close(); + return msgs; + } + + private String receiveAndConvert(MessageConsumer consumer) throws Exception { + Message message = consumer.receive(100L); + if (message==null) { + return null; + } + return ((TextMessage)message).getText(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackTests.java ---------------------------------------------------------------------- diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackTests.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackTests.java new file mode 100644 index 0000000..93abb28 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackTests.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class RawRollbackTests { + + private static ConnectionFactory connectionFactory; + private static Destination queue; + private static BrokerService broker; + + @BeforeClass + public static void clean() throws Exception { + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.setUseJmx(true); + broker.start(); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); + connectionFactory.setBrokerURL("vm://localhost?async=false&waitForStart=5000&jms.prefetchPolicy.all=0"); + RawRollbackTests.connectionFactory = connectionFactory; + queue = new ActiveMQQueue("queue"); + } + + @AfterClass + public static void close() throws Exception { + broker.stop(); + } + + @Before + public void clearData() throws Exception { + getMessages(false); // drain queue + convertAndSend("foo"); + convertAndSend("bar"); + } + + + @After + public void checkPostConditions() throws Exception { + + Thread.sleep(1000L); + List<String> list = getMessages(false); + assertEquals(2, list.size()); + + } + + @Test + public void testReceiveMessages() throws Exception { + + List<String> list = getMessages(true); + assertEquals(2, list.size()); + assertTrue(list.contains("foo")); + + } + + private void 
convertAndSend(String msg) throws Exception { + Connection connection = connectionFactory.createConnection(); + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage(msg)); + producer.close(); + session.commit(); + session.close(); + connection.close(); + } + + private List<String> getMessages(boolean rollback) throws Exception { + Connection connection = connectionFactory.createConnection(); + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + String next = ""; + List<String> msgs = new ArrayList<String>(); + while (next != null) { + next = (String) receiveAndConvert(session); + if (next != null) + msgs.add(next); + } + if (rollback) { + session.rollback(); + } else { + session.commit(); + } + session.close(); + connection.close(); + return msgs; + } + + private String receiveAndConvert(Session session) throws Exception { + MessageConsumer consumer = session.createConsumer(queue); + Message message = consumer.receive(100L); + consumer.close(); + if (message==null) { + return null; + } + return ((TextMessage)message).getText(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/Receiver.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/Receiver.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/Receiver.java new file mode 100644 index 0000000..65f30e3 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/Receiver.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
/**
 * Callback that is handed a single string.
 */
public interface Receiver {

    /**
     * Accepts one string.
     *
     * @param s the string passed to this receiver
     * @throws Exception if the implementation cannot process the string
     */
    void receive(String s) throws Exception;
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.io.File; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap; +import org.apache.activemq.broker.util.RedeliveryPlugin; +import org.apache.activemq.util.IOHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Testing if the the broker "sends" the message as expected after the redeliveryPlugin has redelivered the + * message previously. 
+ */ + +public class RedeliveryPluginHeaderTest extends TestCase { + + private static final String TEST_QUEUE_ONE = "TEST_QUEUE_ONE"; + private static final String TEST_QUEUE_TWO = "TEST_QUEUE_TWO"; + private static final Logger LOG = LoggerFactory + .getLogger(RedeliveryPluginHeaderTest.class); + private String transportURL; + private BrokerService broker; + + /** + * Test + * - consumes message from Queue1 + * - rolls back message to Queue1 and message is scheduled for redelivery to Queue1 by brokers plugin + * - consumes message from Queue1 again + * - sends same message to Queue2 + * - expects to consume message from Queue2 immediately + */ + + public void testSendAfterRedelivery() throws Exception { + broker = this.createBroker(false); + broker.start(); + broker.waitUntilStarted(); + + LOG.info("***Broker started..."); + + //pushed message to broker + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( + transportURL + "?trace=true&jms.redeliveryPolicy.maximumRedeliveries=0"); + + Connection connection = factory.createConnection(); + connection.start(); + + try { + + Session session = connection.createSession(true, + Session.SESSION_TRANSACTED); + + Destination destinationQ1 = session.createQueue(TEST_QUEUE_ONE); + Destination destinationQ2 = session.createQueue(TEST_QUEUE_TWO); + + MessageProducer producerQ1 = session.createProducer(destinationQ1); + producerQ1.setDeliveryMode(DeliveryMode.PERSISTENT); + + Message m = session.createTextMessage("testMessage"); + LOG.info("*** send message to broker..."); + producerQ1.send(m); + session.commit(); + + //consume message from Q1 and rollback to get it redelivered + MessageConsumer consumerQ1 = session.createConsumer(destinationQ1); + + LOG.info("*** consume message from Q1 and rolled back.."); + + TextMessage textMessage = (TextMessage) consumerQ1.receive(); + LOG.info("got redelivered: " + textMessage); + assertFalse("JMSRedelivered flag is not set", textMessage.getJMSRedelivered()); + 
session.rollback(); + + LOG.info("*** consumed message from Q1 again and sending to Q2.."); + TextMessage textMessage2 = (TextMessage) consumerQ1.receive(); + LOG.info("got: " + textMessage2); + session.commit(); + assertTrue("JMSRedelivered flag is set", textMessage2.getJMSRedelivered()); + + //send message to Q2 and consume from Q2 + MessageConsumer consumerQ2 = session.createConsumer(destinationQ2); + MessageProducer producer_two = session.createProducer(destinationQ2); + producer_two.send(textMessage2); + session.commit(); + + //Message should be available straight away on the queue_two + Message textMessage3 = consumerQ2.receive(1000); + assertNotNull("should have consumed a message from TEST_QUEUE_TWO", textMessage3); + assertFalse("JMSRedelivered flag is not set", textMessage3.getJMSRedelivered()); + session.commit(); + + } finally { + + if (connection != null) { + connection.close(); + } + + if (broker != null) { + broker.stop(); + } + + } + + } + + protected BrokerService createBroker(boolean withJMX) throws Exception { + File schedulerDirectory = new File("target/scheduler"); + IOHelper.mkdirs(schedulerDirectory); + IOHelper.deleteChildren(schedulerDirectory); + + BrokerService answer = new BrokerService(); + answer.setAdvisorySupport(false); + answer.setDataDirectory("target"); + answer.setSchedulerDirectoryFile(schedulerDirectory); + answer.setSchedulerSupport(true); + answer.setPersistent(true); + answer.setDeleteAllMessagesOnStartup(true); + answer.setUseJmx(withJMX); + + RedeliveryPlugin redeliveryPlugin = new RedeliveryPlugin(); + RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap(); + RedeliveryPolicy defaultEntry = new RedeliveryPolicy(); + defaultEntry.setInitialRedeliveryDelay(5000); + defaultEntry.setMaximumRedeliveries(5); + redeliveryPolicyMap.setDefaultEntry(defaultEntry); + redeliveryPlugin.setRedeliveryPolicyMap(redeliveryPolicyMap); + + answer.setPlugins(new BrokerPlugin[] {redeliveryPlugin}); + TransportConnector 
transportConnector = + answer.addConnector("tcp://localhost:0"); + + transportURL = transportConnector.getConnectUri().toASCIIString(); + + return answer; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SlowConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SlowConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SlowConsumerTest.java new file mode 100644 index 0000000..a2c117e --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SlowConsumerTest.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SlowConsumerTest extends TestCase { + + private static final Logger LOG = LoggerFactory.getLogger(SlowConsumerTest.class); + private static final int MESSAGES_COUNT = 10000; + + private final int messageLogFrequency = 2500; + private final long messageReceiveTimeout = 10000L; + + private Socket stompSocket; + private ByteArrayOutputStream inputBuffer; + private int messagesCount; + + /** + * @param args + * @throws Exception + */ + public void testRemoveSubscriber() throws Exception { + final BrokerService broker = new BrokerService(); + broker.setPersistent(true); + broker.setUseJmx(true); + broker.setDeleteAllMessagesOnStartup(true); + + broker.addConnector("tcp://localhost:0").setName("Default"); + broker.start(); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( + broker.getTransportConnectors().get(0).getPublishableConnectString()); + final Connection connection = factory.createConnection(); + connection.start(); + + Thread producingThread = new Thread("Producing thread") { + public void run() { + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(new ActiveMQQueue(getDestinationName())); + for (int idx = 0; idx < MESSAGES_COUNT; ++idx) { + Message message = session.createTextMessage("" + idx); + producer.send(message); + 
LOG.debug("Sending: " + idx); + } + producer.close(); + session.close(); + } catch (Throwable ex) { + ex.printStackTrace(); + } + } + }; + producingThread.setPriority(Thread.MAX_PRIORITY); + producingThread.start(); + Thread.sleep(1000); + + Thread consumingThread = new Thread("Consuming thread") { + + public void run() { + try { + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(getDestinationName())); + int diff = 0; + while (messagesCount != MESSAGES_COUNT) { + Message msg = consumer.receive(messageReceiveTimeout); + if (msg == null) { + LOG.warn("Got null message at count: " + messagesCount + ". Continuing..."); + break; + } + String text = ((TextMessage)msg).getText(); + int currentMsgIdx = Integer.parseInt(text); + LOG.debug("Received: " + text + " messageCount: " + messagesCount); + msg.acknowledge(); + if ((messagesCount + diff) != currentMsgIdx) { + LOG.debug("Message(s) skipped!! 
Should be message no.: " + messagesCount + " but got: " + currentMsgIdx); + diff = currentMsgIdx - messagesCount; + } + ++messagesCount; + if (messagesCount % messageLogFrequency == 0) { + LOG.info("Received: " + messagesCount + " messages so far"); + } + // Thread.sleep(70); + } + } catch (Throwable ex) { + ex.printStackTrace(); + } + } + }; + consumingThread.start(); + consumingThread.join(); + + assertEquals(MESSAGES_COUNT, messagesCount); + + } + + public void sendFrame(String data) throws Exception { + byte[] bytes = data.getBytes("UTF-8"); + OutputStream outputStream = stompSocket.getOutputStream(); + for (int i = 0; i < bytes.length; i++) { + outputStream.write(bytes[i]); + } + outputStream.flush(); + } + + public String receiveFrame(long timeOut) throws Exception { + stompSocket.setSoTimeout((int)timeOut); + InputStream is = stompSocket.getInputStream(); + int c = 0; + for (;;) { + c = is.read(); + if (c < 0) { + throw new IOException("socket closed."); + } else if (c == 0) { + c = is.read(); + byte[] ba = inputBuffer.toByteArray(); + inputBuffer.reset(); + return new String(ba, "UTF-8"); + } else { + inputBuffer.write(c); + } + } + } + + protected String getDestinationName() { + return getClass().getName() + "." 
+ getName(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupLevelDBStoreTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupLevelDBStoreTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupLevelDBStoreTest.java new file mode 100644 index 0000000..ba15941 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupLevelDBStoreTest.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.leveldb.LevelDBStore; + + +public class SparseAckReplayAfterStoreCleanupLevelDBStoreTest extends AMQ2832Test { + @Override + protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception { + LevelDBStore store = new LevelDBStore(); + store.setFlushDelay(0); + brokerService.setPersistenceAdapter(store); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempQueueDeleteOnCloseTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempQueueDeleteOnCloseTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempQueueDeleteOnCloseTest.java new file mode 100644 index 0000000..44e7f5d --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempQueueDeleteOnCloseTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.junit.Test; + +/** + * Demonstrates how unmarshalled VM advisory messages for temporary queues prevent other connections from being closed. + */ +public class TempQueueDeleteOnCloseTest { + + @Test + public void test() throws Exception { + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); + + // create a connection and session with a temporary queue + Connection connectionA = connectionFactory.createConnection(); + connectionA.setClientID("ConnectionA"); + Session sessionA = connectionA.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination tempQueueA = sessionA.createTemporaryQueue(); + MessageConsumer consumer = sessionA.createConsumer(tempQueueA); + connectionA.start(); + + // start and stop another connection + Connection connectionB = connectionFactory.createConnection(); + connectionB.setClientID("ConnectionB"); + connectionB.start(); + connectionB.close(); + + consumer.close(); + connectionA.close(); + } +}
