http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2645Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2645Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2645Test.java new file mode 100644 index 0000000..2ac6ff3 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2645Test.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ2645Test extends EmbeddedBrokerTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(AMQ2645Test.class); + private final static String QUEUE_NAME = "test.daroo.q"; + + public void testWaitForTransportInterruptionProcessingHang() + throws Exception { + final ConnectionFactory fac = new ActiveMQConnectionFactory( + "failover:(" + this.bindAddress + ")"); + final Connection connection = fac.createConnection(); + try { + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Queue queue = session.createQueue(QUEUE_NAME); + final MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + connection.start(); + + producer.send(session.createTextMessage("test")); + + final CountDownLatch afterRestart = new CountDownLatch(1); + final CountDownLatch twoNewMessages = new CountDownLatch(1); + final CountDownLatch thirdMessageReceived = new CountDownLatch(1); + + final MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); + consumer.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + try { + afterRestart.await(); + + final TextMessage txtMsg = (TextMessage) message; + if (txtMsg.getText().equals("test")) { + producer.send(session.createTextMessage("test 1")); + TimeUnit.SECONDS.sleep(5); + // THIS SECOND send() WILL CAUSE CONSUMER DEADLOCK + producer.send(session.createTextMessage("test 2")); + LOG.info("Two new messages produced."); + twoNewMessages.countDown(); + } else if (txtMsg.getText().equals("test 3")) { + thirdMessageReceived.countDown(); + } + } catch (Exception e) { + LOG.error(e.toString()); + throw new RuntimeException(e); + } + } + }); + + LOG.info("Stopping broker...."); + broker.stop(); + + LOG.info("Creating new broker..."); + broker = createBroker(); + startBroker(); + broker.waitUntilStarted(); + + afterRestart.countDown(); + assertTrue("Consumer is deadlocked!", twoNewMessages.await(60, TimeUnit.SECONDS)); + + producer.send(session.createTextMessage("test 3")); + assertTrue("Consumer got third message after block", thirdMessageReceived.await(60, TimeUnit.SECONDS)); + + } finally { + broker.stop(); + } + + } + + @Override + protected void setUp() throws Exception { + bindAddress = "tcp://0.0.0.0:61617"; + super.setUp(); + } +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java new file mode 100644 index 0000000..e584572 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import javax.jms.Connection; +import javax.jms.MessageProducer; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBStore; +import org.apache.activemq.util.DefaultIOExceptionHandler; +import org.junit.After; +import org.junit.Test; + + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +public class AMQ2736Test { + BrokerService broker; + + @Test + public void testRollbackOnRecover() throws Exception { + broker = createAndStartBroker(true); + DefaultIOExceptionHandler ignoreAllExceptionsIOExHandler = new DefaultIOExceptionHandler(); + ignoreAllExceptionsIOExHandler.setIgnoreAllErrors(true); + broker.setIoExceptionHandler(ignoreAllExceptionsIOExHandler); + + ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost?async=false"); + f.setAlwaysSyncSend(true); + Connection c = f.createConnection(); + c.start(); + Session s = c.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer p = s.createProducer(new ActiveMQQueue("Tx")); + p.send(s.createTextMessage("aa")); + + // kill journal without commit + KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); + KahaDBStore store = pa.getStore(); + + assertNotNull("last tx location is present " + store.getInProgressTxLocationRange()[1]); + + // test hack, close the journal to ensure no further journal updates when broker stops + // mimic kill -9 in terms of no normal shutdown sequence + store.getJournal().close(); + try { + store.close(); + } catch (Exception expectedLotsAsJournalBorked) { + } + + broker.stop(); + broker.waitUntilStopped(); + + // restart with recovery + broker = createAndStartBroker(false); + + pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); + store = pa.getStore(); + + // inflight non xa tx should be rolledback on recovery + assertNull("in progress tx location is present ", store.getInProgressTxLocationRange()[0]); + + } + + @After + public void stopBroker() throws Exception { + if (broker != null) { + broker.stop(); + } + } + + private BrokerService createAndStartBroker(boolean deleteAll) throws Exception { + BrokerService broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(deleteAll); + broker.setUseJmx(false); + broker.getManagementContext().setCreateConnector(false); + broker.start(); + return broker; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2751Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2751Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2751Test.java new file mode 100644 index 0000000..b52f5c0 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2751Test.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ2751Test extends EmbeddedBrokerTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(AMQ2751Test.class); + + private static String clientIdPrefix = "consumer"; + private static String queueName = "FOO"; + + public void testRecoverRedelivery() throws Exception { + + final CountDownLatch redelivery = new CountDownLatch(6); + final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( + "failover:(" + broker.getTransportConnectors().get(0).getConnectUri() + ")"); + try { + + Connection connection = factory.createConnection(); + String clientId = clientIdPrefix; + connection.setClientID(clientId); + + final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + Queue queue = session.createQueue(queueName); + + MessageConsumer consumer = session.createConsumer(queue); + + consumer.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + try { + LOG.info("Got message: " + message.getJMSMessageID()); + if (message.getJMSRedelivered()) { + LOG.info("It's a redelivery."); + redelivery.countDown(); + } + LOG.info("calling recover() on the session to force redelivery."); + session.recover(); + } catch (JMSException e) { + e.printStackTrace(); + } + } + }); + + System.out.println("Created queue consumer with clientId " + clientId); + connection.start(); + + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("test")); + + assertTrue("we got 6 redeliveries", redelivery.await(20, TimeUnit.SECONDS)); + + } finally { + broker.stop(); + } + + } + + @Override + protected void setUp() throws Exception { + bindAddress = "tcp://localhost:0"; + super.setUp(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java new file mode 100644 index 0000000..a1d0bc1 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.bugs; + +import static org.junit.Assert.*; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean; +import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.usage.SystemUsage; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ2801Test +{ + private static final Logger LOG = LoggerFactory.getLogger(AMQ2801Test.class); + + private static final String TOPICNAME = "InvalidPendingQueueTest"; + private static final String SELECTOR1 = "JMS_ID" + " = '" + "TEST" + "'"; + private static final String SELECTOR2 = "JMS_ID" + " = '" + "TEST2" + "'"; + private static final String SUBSCRIPTION1 = "InvalidPendingQueueTest_1"; + private static final String SUBSCRIPTION2 = "InvalidPendingQueueTest_2"; + private static final int MSG_COUNT = 2500; + private Session session1; + private Connection conn1; + private Topic topic1; + private MessageConsumer consumer1; + private Session session2; + private Connection conn2; + private Topic topic2; + private MessageConsumer consumer2; + private BrokerService broker; + private String connectionUri; + + @Before + public void setUp() throws Exception { + broker = new BrokerService(); + broker.setDataDirectory("target" + File.separator + "activemq-data"); + broker.setPersistent(true); + broker.setUseJmx(true); + broker.setAdvisorySupport(false); + broker.setDeleteAllMessagesOnStartup(true); + broker.addConnector("tcp://localhost:0").setName("Default"); + applyMemoryLimitPolicy(broker); + broker.start(); + + connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); + } + + private void applyMemoryLimitPolicy(BrokerService broker) { + final SystemUsage memoryManager = new SystemUsage(); + memoryManager.getMemoryUsage().setLimit(5818230784L); + memoryManager.getStoreUsage().setLimit(6442450944L); + memoryManager.getTempUsage().setLimit(3221225472L); + broker.setSystemUsage(memoryManager); + + final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>(); + final PolicyEntry entry = new PolicyEntry(); + entry.setQueue(">"); + entry.setProducerFlowControl(false); + entry.setMemoryLimit(504857608); + entry.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy()); + policyEntries.add(entry); + + final PolicyMap policyMap = new PolicyMap(); + policyMap.setPolicyEntries(policyEntries); + broker.setDestinationPolicy(policyMap); + } + + @After + public void tearDown() throws Exception { + conn1.close(); + conn2.close(); + if (broker != null) { + broker.stop(); + } + } + + private void produceMessages() throws Exception { + TopicConnection connection = createConnection(); + TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(TOPICNAME); + TopicPublisher producer = session.createPublisher(topic); + connection.start(); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + long tStamp = System.currentTimeMillis(); + BytesMessage message = session2.createBytesMessage(); + for (int i = 1; i <= MSG_COUNT; i++) + { + message.setStringProperty("JMS_ID", "TEST"); + message.setIntProperty("Type", i); + producer.publish(message); + if (i%100 == 0) { + LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100) + "m/ms"); + tStamp = System.currentTimeMillis() ; + } + } + } + + private void activeateSubscribers() throws Exception { + // First consumer + conn1 = createConnection(); + conn1.setClientID(SUBSCRIPTION1); + session1 = conn1.createSession(true, Session.SESSION_TRANSACTED); + topic1 = session1.createTopic(TOPICNAME); + consumer1 = session1.createDurableSubscriber(topic1, SUBSCRIPTION1, SELECTOR1, false); + conn1.start(); + + // Second consumer that just exists + conn2 = createConnection(); + conn2.setClientID(SUBSCRIPTION2); + session2 = conn2.createSession(true, Session.SESSION_TRANSACTED); + topic2 = session2.createTopic(TOPICNAME); + consumer2 = session2.createDurableSubscriber(topic2, SUBSCRIPTION2, SELECTOR2, false); + conn2.start(); + } + + @Test + public void testInvalidPendingQueue() throws Exception { + + activeateSubscribers(); + + assertNotNull(consumer1); + assertNotNull(consumer2); + + produceMessages(); + LOG.debug("Sent messages to a single subscriber"); + Thread.sleep(2000); + + LOG.debug("Closing durable subscriber connections"); + conn1.close(); + conn2.close(); + LOG.debug("Closed durable subscriber connections"); + + Thread.sleep(2000); + LOG.debug("Re-starting durable subscriber connections"); + + activeateSubscribers(); + LOG.debug("Started up durable subscriber connections - now view activemq console to see pending queue size on the other subscriber"); + + ObjectName[] subs = broker.getAdminView().getDurableTopicSubscribers(); + + for (int i = 0; i < subs.length; i++) { + ObjectName subName = subs[i]; + DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean) + broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true); + + LOG.info(sub.getSubscriptionName() + ": pending = " + sub.getPendingQueueSize() + ", dispatched: " + sub.getDispatchedQueueSize()); + if(sub.getSubscriptionName().equals(SUBSCRIPTION1)) { + assertEquals("Incorrect number of pending messages", MSG_COUNT, sub.getPendingQueueSize() + sub.getDispatchedQueueSize()); + } else { + assertEquals("Incorrect number of pending messages", 0, sub.getPendingQueueSize()); + } + } + } + + private TopicConnection createConnection() throws Exception + { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); + connectionFactory.setBrokerURL(connectionUri); + TopicConnection conn = connectionFactory.createTopicConnection(); + return conn; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java new file mode 100644 index 0000000..22ad6ab --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java @@ -0,0 +1,380 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.leveldb.LevelDBStore; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.disk.journal.DataFile; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ2832Test { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ2832Test.class); + + BrokerService broker = null; + private ActiveMQConnectionFactory cf; + private final Destination destination = new ActiveMQQueue("AMQ2832Test"); + private String connectionUri; + + protected void startBroker() throws Exception { + doStartBroker(true, false); + } + + protected void restartBroker() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + doStartBroker(false, false); + } + + protected void recoverBroker() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + doStartBroker(false, true); + } + + private void doStartBroker(boolean delete, boolean recover) throws Exception { + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(delete); + broker.setPersistent(true); + broker.setUseJmx(true); + broker.addConnector("tcp://localhost:0"); + + configurePersistence(broker, recover); + + connectionUri = "vm://localhost?create=false"; + cf = new ActiveMQConnectionFactory(connectionUri); + + broker.start(); + LOG.info("Starting broker.."); + } + + protected void configurePersistence(BrokerService brokerService, boolean recover) throws Exception { + KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter(); + + // ensure there are a bunch of data files but multiple entries in each + adapter.setJournalMaxFileLength(1024 * 20); + + // speed up the test case, checkpoint an cleanup early and often + adapter.setCheckpointInterval(5000); + adapter.setCleanupInterval(5000); + + if (recover) { + adapter.setForceRecoverIndex(true); + } + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + /** + * Scenario: + * db-1.log has an unacknowledged message, + * db-2.log contains acks for the messages from db-1.log, + * db-3.log contains acks for the messages from db-2.log + * + * Expected behavior: since db-1.log is blocked, db-2.log and db-3.log should not be removed during the cleanup. + * Current situation on 5.10.0, 5.10.1 is that db-3.log is removed causing all messages from db-2.log, whose acks were in db-3.log, to be replayed. + * + * @throws Exception + */ + @Test + public void testAckChain() throws Exception { + startBroker(); + + StagedConsumer consumer = new StagedConsumer(); + // file #1 + produceMessagesToConsumeMultipleDataFiles(5); + // acknowledge first 2 messages and leave the 3rd one unacknowledged blocking db-1.log + consumer.receive(3); + + // send messages by consuming and acknowledging every message right after sent in order to get KahadbAdd and Remove command to be saved together + // this is necessary in order to get KahaAddMessageCommand to be saved in one db file and the corresponding KahaRemoveMessageCommand in the next one + produceAndConsumeImmediately(20, consumer); + consumer.receive(2).acknowledge(); // consume and ack the last 2 unconsumed + + // now we have 3 files written and started with #4 + consumer.close(); + + broker.stop(); + broker.waitUntilStopped(); + + recoverBroker(); + + consumer = new StagedConsumer(); + Message message = consumer.receive(1); + assertNotNull("One message stays unacked from db-1.log", message); + message.acknowledge(); + message = consumer.receive(1); + assertNull("There should not be any unconsumed messages any more", message); + consumer.close(); + } + + private void produceAndConsumeImmediately(int numOfMsgs, StagedConsumer consumer) throws Exception { + for (int i = 0; i < numOfMsgs; i++) { + produceMessagesToConsumeMultipleDataFiles(1); + consumer.receive(1).acknowledge(); + } + } + + @Test + public void testAckRemovedMessageReplayedAfterRecovery() throws Exception { + + startBroker(); + + StagedConsumer consumer = new StagedConsumer(); + int numMessagesAvailable = produceMessagesToConsumeMultipleDataFiles(20); + // this will block the reclaiming of one data file + Message firstUnacked = consumer.receive(10); + LOG.info("first unacked: " + firstUnacked.getJMSMessageID()); + Message secondUnacked = consumer.receive(1); + LOG.info("second unacked: " + secondUnacked.getJMSMessageID()); + numMessagesAvailable -= 11; + + numMessagesAvailable += produceMessagesToConsumeMultipleDataFiles(10); + // ensure ack is another data file + LOG.info("Acking firstUnacked: " + firstUnacked.getJMSMessageID()); + firstUnacked.acknowledge(); + + numMessagesAvailable += produceMessagesToConsumeMultipleDataFiles(10); + + consumer.receive(numMessagesAvailable).acknowledge(); + + // second unacked should keep first data file available but journal with the first ack + // may get whacked + consumer.close(); + + broker.stop(); + broker.waitUntilStopped(); + + recoverBroker(); + + consumer = new StagedConsumer(); + // need to force recovery? + + Message msg = consumer.receive(1, 5); + assertNotNull("One messages left after recovery", msg); + msg.acknowledge(); + + // should be no more messages + msg = consumer.receive(1, 5); + assertEquals("Only one messages left after recovery: " + msg, null, msg); + consumer.close(); + } + + @Test + public void testAlternateLossScenario() throws Exception { + + startBroker(); + PersistenceAdapter pa = broker.getPersistenceAdapter(); + if (pa instanceof LevelDBStore) { + return; + } + + ActiveMQQueue queue = new ActiveMQQueue("MyQueue"); + ActiveMQQueue disposable = new ActiveMQQueue("MyDisposableQueue"); + ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic"); + + // This ensure that data file 1 never goes away. + createInactiveDurableSub(topic); + assertEquals(1, getNumberOfJournalFiles()); + + // One Queue Message that will be acked in another data file. + produceMessages(queue, 1); + assertEquals(1, getNumberOfJournalFiles()); + + // Add some messages to consume space + produceMessages(disposable, 50); + + int dataFilesCount = getNumberOfJournalFiles(); + assertTrue(dataFilesCount > 1); + + // Create an ack for the single message on this queue + drainQueue(queue); + + // Add some more messages to consume space beyond tha data file with the ack + produceMessages(disposable, 50); + + assertTrue(dataFilesCount < getNumberOfJournalFiles()); + dataFilesCount = getNumberOfJournalFiles(); + + restartBroker(); + + // Clear out all queue data + broker.getAdminView().removeQueue(disposable.getQueueName()); + + // Once this becomes true our ack could be lost. + assertTrue("Less than three journal file expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return getNumberOfJournalFiles() <= 3; + } + }, TimeUnit.MINUTES.toMillis(3))); + + // Recover and the Message should not be replayed but if the old MessageAck is lost + // then it could be. + recoverBroker(); + + assertTrue(drainQueue(queue) == 0); + } + + private int getNumberOfJournalFiles() throws IOException { + + Collection<DataFile> files = + ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values(); + int reality = 0; + for (DataFile file : files) { + if (file != null) { + reality++; + } + } + + return reality; + } + + private void createInactiveDurableSub(Topic topic) throws Exception { + Connection connection = cf.createConnection(); + connection.setClientID("Inactive"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive"); + consumer.close(); + connection.close(); + produceMessages(topic, 1); + } + + private int drainQueue(Queue queue) throws Exception { + Connection connection = cf.createConnection(); + connection.setClientID("Inactive"); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queue); + int count = 0; + while (consumer.receive(5000) != null) { + count++; + } + consumer.close(); + connection.close(); + return count; + } + + private int produceMessages(Destination destination, int numToSend) throws Exception { + int sent = 0; + Connection connection = new ActiveMQConnectionFactory( + broker.getTransportConnectors().get(0).getConnectUri()).createConnection(); + connection.start(); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + for (int i = 0; i < numToSend; i++) { + producer.send(createMessage(session, i)); + sent++; + } + } finally { + connection.close(); + } + + return sent; + } + + private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception { + return produceMessages(destination, numToSend); + } + + final String payload = new String(new byte[1024]); + + private Message createMessage(Session session, int i) throws Exception { + return session.createTextMessage(payload + "::" + i); + } + + private class StagedConsumer { + Connection connection; + MessageConsumer consumer; + + StagedConsumer() throws Exception { + connection = new ActiveMQConnectionFactory("failover://" + + broker.getTransportConnectors().get(0).getConnectUri().toString()).createConnection(); + connection.start(); + consumer = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE).createConsumer(destination); + } + + public Message receive(int numToReceive) throws Exception { + return receive(numToReceive, 2); + } + + public Message receive(int numToReceive, int timeoutInSeconds) throws Exception { + Message msg = null; + for (; numToReceive > 0; numToReceive--) { + + do { + msg = consumer.receive(1*1000); + } while (msg == null && --timeoutInSeconds > 0); + + if (numToReceive > 1) { + msg.acknowledge(); + } + + if (msg != null) { + LOG.debug("received: " + msg.getJMSMessageID()); + } + } + // last message, unacked + return msg; + } + + void close() throws JMSException { + consumer.close(); + connection.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2870Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2870Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2870Test.java new file mode 100644 index 0000000..8bb6bff --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2870Test.java @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TopicSubscriber; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.BrokerView; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(value = Parameterized.class) +public class AMQ2870Test extends org.apache.activemq.TestSupport { + + static final Logger LOG = LoggerFactory.getLogger(AMQ2870Test.class); + BrokerService broker = null; + ActiveMQTopic topic; + + ActiveMQConnection consumerConnection = null, producerConnection = null; + Session producerSession; + MessageProducer producer; + final int minPercentUsageForStore = 10; + String data; + + private final PersistenceAdapterChoice persistenceAdapterChoice; + + @Parameterized.Parameters + public static Collection<PersistenceAdapterChoice[]> getTestParameters() { + String osName = System.getProperty("os.name"); + LOG.info("Running on [" + osName + "]"); + PersistenceAdapterChoice[] kahaDb = {PersistenceAdapterChoice.KahaDB}; + PersistenceAdapterChoice[] levelDb = {PersistenceAdapterChoice.LevelDB}; + List<PersistenceAdapterChoice[]> choices = new ArrayList<PersistenceAdapterChoice[]>(); + choices.add(kahaDb); + if (!osName.equalsIgnoreCase("AIX") && !osName.equalsIgnoreCase("SunOS")) { + choices.add(levelDb); + } + + return choices; + } + + public AMQ2870Test(PersistenceAdapterChoice choice) { + this.persistenceAdapterChoice = choice; + } + + @Test(timeout = 300000) + public void testSize() throws Exception { + openConsumer(); + + assertEquals(0, broker.getAdminView().getStorePercentUsage()); + + for (int i = 0; i < 5000; i++) { + sendMessage(false); + } + + final BrokerView brokerView = broker.getAdminView(); + + // wait for reclaim + assertTrue("in range with consumer", + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + // usage percent updated only on send check for isFull so once + // sends complete it is no longer updated till next send via a call to isFull + // this is optimal as it is only used to block producers + broker.getSystemUsage().getStoreUsage().isFull(); + LOG.info("store percent usage: "+brokerView.getStorePercentUsage()); + return broker.getAdminView().getStorePercentUsage() < minPercentUsageForStore; + } + })); + + closeConsumer(); + + assertTrue("in range with closed consumer", + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + broker.getSystemUsage().getStoreUsage().isFull(); + LOG.info("store precent usage: "+brokerView.getStorePercentUsage()); + return broker.getAdminView().getStorePercentUsage() < minPercentUsageForStore; + } + })); + + for (int i = 0; i < 5000; i++) { + sendMessage(false); + } + + // What if i drop the subscription? + broker.getAdminView().destroyDurableSubscriber("cliID", "subName"); + + assertTrue("in range after send with consumer", + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + broker.getSystemUsage().getStoreUsage().isFull(); + LOG.info("store precent usage: "+brokerView.getStorePercentUsage()); + return broker.getAdminView().getStorePercentUsage() < minPercentUsageForStore; + } + })); + } + + private void openConsumer() throws Exception { + consumerConnection = (ActiveMQConnection) createConnection(); + consumerConnection.setClientID("cliID"); + consumerConnection.start(); + Session session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSubscriber subscriber = session.createDurableSubscriber(topic, "subName", "filter=true", false); + + subscriber.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + // received++; + } + }); + } + + private void closeConsumer() throws JMSException { + if (consumerConnection != null) + consumerConnection.close(); + consumerConnection = null; + } + + private void sendMessage(boolean filter) throws Exception { + if (producerConnection == null) { + producerConnection = (ActiveMQConnection) createConnection(); + producerConnection.start(); + producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = producerSession.createProducer(topic); + } + + Message message = producerSession.createMessage(); + message.setBooleanProperty("filter", filter); + message.setStringProperty("data", data); + producer.send(message); + } + + private void startBroker(boolean deleteMessages) throws Exception { + broker = new BrokerService(); + broker.setAdvisorySupport(false); + broker.setBrokerName("testStoreSize"); + + if (deleteMessages) { + broker.setDeleteAllMessagesOnStartup(true); + } + LOG.info("Starting broker with persistenceAdapterChoice " + persistenceAdapterChoice.toString()); + setPersistenceAdapter(broker, persistenceAdapterChoice); + configurePersistenceAdapter(broker.getPersistenceAdapter()); + broker.getSystemUsage().getStoreUsage().setLimit(100 * 1000 * 1000); + broker.start(); + } + + private void configurePersistenceAdapter(PersistenceAdapter persistenceAdapter) { + Properties properties = new Properties(); + String maxFileLengthVal = String.valueOf(2 * 1024 * 1024); + properties.put("journalMaxFileLength", maxFileLengthVal); + properties.put("maxFileLength", maxFileLengthVal); + properties.put("cleanupInterval", "2000"); + properties.put("checkpointInterval", "2000"); + + // leveldb + properties.put("logSize", maxFileLengthVal); + + IntrospectionSupport.setProperties(persistenceAdapter, properties); + } + + private void stopBroker() throws Exception { + if (broker != null) + broker.stop(); + broker = null; + } + + @Override + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory("vm://testStoreSize?jms.watchTopicAdvisories=false&waitForStart=5000&create=false"); + } + + @Override + @Before + public void setUp() throws Exception { + StringBuilder sb = new StringBuilder(5000); + for (int i = 0; i < 5000; i++) { + sb.append('a'); + } + data = sb.toString(); + + startBroker(true); + topic = (ActiveMQTopic) createDestination(); + } + + @Override + @After + public void tearDown() throws Exception { + stopBroker(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2902Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2902Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2902Test.java new file mode 100644 index 0000000..3c38186 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2902Test.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.TransportConnection; +import org.apache.activemq.transport.TransportDisposedIOException; +import org.apache.activemq.util.DefaultTestAppender; +import org.apache.log4j.Appender; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; +import org.slf4j.LoggerFactory; + +public class AMQ2902Test extends TestCase { + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(AMQ2580Test.class); + + final AtomicBoolean gotExceptionInLog = new AtomicBoolean(Boolean.FALSE); + final AtomicBoolean failedToFindMDC = new AtomicBoolean(Boolean.FALSE); + + Appender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + if (event.getThrowableInformation() != null + && event.getThrowableInformation().getThrowable() instanceof TransportDisposedIOException) { + + // Prevent StackOverflowException so we can see a sane stack trace. + if (gotExceptionInLog.get()) { + return; + } + + gotExceptionInLog.set(Boolean.TRUE); + LOG.error("got event: " + event + ", ex:" + event.getThrowableInformation().getThrowable(), event.getThrowableInformation().getThrowable()); + LOG.error("Event source: ", new Throwable("Here")); + } + if( !"Loaded the Bouncy Castle security provider.".equals(event.getMessage()) ) { + if (event.getMDC("activemq.broker") == null) { + failedToFindMDC.set(Boolean.TRUE); + } + } + return; + } + }; + + public void testNoExceptionOnClosewithStartStop() throws JMSException { + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( + "vm://localhost?broker.persistent=false"); + Connection connection = connectionFactory.createConnection(); + connection.start(); + connection.stop(); + connection.close(); + } + + public void testNoExceptionOnClose() throws JMSException { + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( + "vm://localhost?broker.persistent=false"); + Connection connection = connectionFactory.createConnection(); + connection.close(); + } + + @Override + public void setUp() throws Exception { + gotExceptionInLog.set(Boolean.FALSE); + failedToFindMDC.set(Boolean.FALSE); + Logger.getRootLogger().addAppender(appender); + Logger.getLogger(TransportConnection.class.getName() + ".Transport").setLevel(Level.DEBUG); + Logger.getLogger(TransportConnection.class.getName()).setLevel(Level.DEBUG); + } + + @Override + public void tearDown() throws Exception { + Logger.getRootLogger().removeAppender(appender); + assertFalse("got unexpected ex in log on graceful close", gotExceptionInLog.get()); + assertFalse("MDC is there", failedToFindMDC.get()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java new file mode 100644 index 0000000..f665431 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.JmsMultipleClientsTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.BlockJUnit4ClassRunner; + +import java.util.Vector; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertTrue; + +@RunWith(BlockJUnit4ClassRunner.class) +public class AMQ2910Test extends JmsMultipleClientsTestSupport { + + final int maxConcurrency = 60; + final int msgCount = 200; + final Vector<Throwable> exceptions = new Vector<Throwable>(); + + @Override + protected BrokerService createBroker() throws Exception { + //persistent = true; + BrokerService broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.addConnector("tcp://localhost:0"); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy()); + defaultEntry.setCursorMemoryHighWaterMark(50); + defaultEntry.setMemoryLimit(500*1024); + defaultEntry.setProducerFlowControl(false); + policyMap.setDefaultEntry(defaultEntry); + broker.setDestinationPolicy(policyMap); + + broker.getSystemUsage().getMemoryUsage().setLimit(1000 * 1024); + + return broker; + } + + @Test(timeout = 30 * 1000) + public void testConcurrentSendToPendingCursor() throws Exception { + final ActiveMQConnectionFactory factory = + new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()); + factory.setCloseTimeout(30000); + ExecutorService executor = Executors.newCachedThreadPool(); + for (int i=0; i<maxConcurrency; i++) { + final ActiveMQQueue dest = new ActiveMQQueue("Queue-" + i); + executor.execute(new Runnable() { + @Override + public void run() { + try { + sendMessages(factory.createConnection(), dest, msgCount); + } catch (Throwable t) { + exceptions.add(t); + } + } + }); + } + + executor.shutdown(); + + assertTrue("send completed", executor.awaitTermination(60, TimeUnit.SECONDS)); + assertNoExceptions(); + + executor = Executors.newCachedThreadPool(); + for (int i=0; i<maxConcurrency; i++) { + final ActiveMQQueue dest = new ActiveMQQueue("Queue-" + i); + executor.execute(new Runnable() { + @Override + public void run() { + try { + startConsumers(factory, dest); + } catch (Throwable t) { + exceptions.add(t); + } + } + }); + } + + executor.shutdown(); + assertTrue("consumers completed", executor.awaitTermination(60, TimeUnit.SECONDS)); + + allMessagesList.setMaximumDuration(120*1000); + final int numExpected = maxConcurrency * msgCount; + allMessagesList.waitForMessagesToArrive(numExpected); + + if (allMessagesList.getMessageCount() != numExpected) { + dumpAllThreads(getName()); + + } + allMessagesList.assertMessagesReceivedNoWait(numExpected); + + assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); + + } + + private void assertNoExceptions() { + if (!exceptions.isEmpty()) { + for (Throwable t: exceptions) { + t.printStackTrace(); + } + } + assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java new file mode 100644 index 0000000..57469d5 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; +import org.apache.activemq.store.kahadb.KahaDBStore; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class AMQ2982Test { + + private static final int MAX_MESSAGES = 500; + + private static final String QUEUE_NAME = "test.queue"; + + private BrokerService broker; + + private final CountDownLatch messageCountDown = new CountDownLatch(MAX_MESSAGES); + + private CleanableKahaDBStore kahaDB; + + private static class CleanableKahaDBStore extends KahaDBStore { + // make checkpoint cleanup accessible + public void forceCleanup() throws IOException { + checkpointCleanup(true); + } + + public int getFileMapSize() throws IOException { + // ensure save memory publishing, use the right lock + indexLock.readLock().lock(); + try { + return getJournal().getFileMap().size(); + } finally { + indexLock.readLock().unlock(); + } + } + } + + @Before + public void setup() throws Exception { + + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.setPersistent(true); + + kahaDB = new CleanableKahaDBStore(); + kahaDB.setJournalMaxFileLength(256 * 1024); + broker.setPersistenceAdapter(kahaDB); + + broker.start(); + broker.waitUntilStarted(); + } + + private Connection registerDLQMessageListener() throws Exception { + ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session + .createQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME)); + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + messageCountDown.countDown(); + } + }); + + return connection; + } + + class ConsumerThread extends Thread { + + @Override + public void run() { + try { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + + RedeliveryPolicy policy = new RedeliveryPolicy(); + policy.setMaximumRedeliveries(0); + policy.setInitialRedeliveryDelay(100); + policy.setUseExponentialBackOff(false); + + factory.setRedeliveryPolicy(policy); + + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); + do { + Message message = consumer.receive(300); + if (message != null) { + session.rollback(); + } + } while (messageCountDown.getCount() != 0); + consumer.close(); + session.close(); + connection.close(); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + } + + private void sendMessages() throws Exception { + ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + for (int i = 0; i < MAX_MESSAGES; i++) { + BytesMessage message = session.createBytesMessage(); + message.writeBytes(new byte[1000]); + producer.send(message); + } + producer.close(); + session.close(); + connection.close(); + } + + @Test + public void testNoStickyKahaDbLogFilesOnLocalTransactionRollback() throws Exception { + + Connection dlqConnection = registerDLQMessageListener(); + + ConsumerThread thread = new ConsumerThread(); + thread.start(); + + sendMessages(); + + thread.join(60 * 1000); + assertFalse(thread.isAlive()); + + dlqConnection.close(); + + kahaDB.forceCleanup(); + + assertEquals("only one active KahaDB log file after cleanup is expected", 1, kahaDB.getFileMapSize()); + } + + @After + public void tearDown() throws Exception { + broker.stop(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java new file mode 100644 index 0000000..f8b941a --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.kahadb.KahaDBStore; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class AMQ2983Test { + + private static final int MAX_CONSUMER = 10; + + private static final int MAX_MESSAGES = 2000; + + private static final String QUEUE_NAME = "test.queue"; + + private BrokerService broker; + + private final CountDownLatch messageCountDown = new CountDownLatch(MAX_MESSAGES); + + private CleanableKahaDBStore kahaDB; + + private static class CleanableKahaDBStore extends KahaDBStore { + // make checkpoint cleanup accessible + public void forceCleanup() throws IOException { + checkpointCleanup(true); + } + + public int getFileMapSize() throws IOException { + // ensure save memory publishing, use the right lock + indexLock.readLock().lock(); + try { + return getJournal().getFileMap().size(); + } finally { + indexLock.readLock().unlock(); + } + } + } + + private class ConsumerThread extends Thread { + + @Override + public void run() { + try { + ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); + do { + Message message = consumer.receive(200); + if (message != null) { + session.commit(); + messageCountDown.countDown(); + } + } while (messageCountDown.getCount() != 0); + consumer.close(); + session.close(); + connection.close(); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + } + + @Before + public void setup() throws Exception { + + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.setPersistent(true); + + kahaDB = new CleanableKahaDBStore(); + kahaDB.setJournalMaxFileLength(256 * 1024); + broker.setPersistenceAdapter(kahaDB); + + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + broker.stop(); + } + + @Test + public void testNoStickyKahaDbLogFilesOnConcurrentTransactionalConsumer() throws Exception { + + List<Thread> consumerThreads = new ArrayList<Thread>(); + for (int i = 0; i < MAX_CONSUMER; i++) { + ConsumerThread thread = new ConsumerThread(); + thread.start(); + consumerThreads.add(thread); + } + sendMessages(); + + boolean allMessagesReceived = messageCountDown.await(60, TimeUnit.SECONDS); + assertTrue(allMessagesReceived); + + for (Thread thread : consumerThreads) { + thread.join(TimeUnit.MILLISECONDS.convert(60, TimeUnit.SECONDS)); + assertFalse(thread.isAlive()); + } + kahaDB.forceCleanup(); + assertEquals("Expect only one active KahaDB log file after cleanup", 1, kahaDB.getFileMapSize()); + } + + private void sendMessages() throws Exception { + ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + for (int i = 0; i < MAX_MESSAGES; i++) { + BytesMessage message = session.createBytesMessage(); + message.writeBytes(new byte[200]); + producer.send(message); + } + producer.close(); + session.close(); + connection.close(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java new file mode 100644 index 0000000..dcd5064 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.Connection; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.thread.Task; +import org.apache.activemq.thread.TaskRunner; +import org.apache.activemq.thread.TaskRunnerFactory; +import org.apache.activemq.transport.*; +import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * This test involves the creation of a local and remote broker, both of which + * communicate over VM and TCP. The local broker establishes a bridge to the + * remote broker for the purposes of verifying that broker info is only + * transfered once the local broker's ID is known to the bridge support. + */ +public class AMQ3014Test { + // Change this URL to be an unused port. + private static final String BROKER_URL = "tcp://localhost:0"; + + private List<BrokerInfo> remoteBrokerInfos = Collections + .synchronizedList(new ArrayList<BrokerInfo>()); + + private BrokerService localBroker = new BrokerService(); + + // Override the "remote" broker so that it records all (remote) BrokerInfos + // that it receives. + private BrokerService remoteBroker = new BrokerService() { + @Override + protected TransportConnector createTransportConnector(URI brokerURI) + throws Exception { + TransportServer transport = TransportFactorySupport.bind(this, brokerURI); + return new TransportConnector(transport) { + @Override + protected Connection createConnection(Transport transport) + throws IOException { + Connection connection = super.createConnection(transport); + final TransportListener proxiedListener = transport + .getTransportListener(); + transport.setTransportListener(new TransportListener() { + + @Override + public void onCommand(Object command) { + if (command instanceof BrokerInfo) { + remoteBrokerInfos.add((BrokerInfo) command); + } + proxiedListener.onCommand(command); + } + + @Override + public void onException(IOException error) { + proxiedListener.onException(error); + } + + @Override + public void transportInterupted() { + proxiedListener.transportInterupted(); + } + + @Override + public void transportResumed() { + proxiedListener.transportResumed(); + } + }); + return connection; + } + + }; + } + }; + + @Before + public void init() throws Exception { + localBroker.setBrokerName("localBroker"); + localBroker.setPersistent(false); + localBroker.setUseJmx(false); + localBroker.setSchedulerSupport(false); + + remoteBroker.setBrokerName("remoteBroker"); + remoteBroker.setPersistent(false); + remoteBroker.setUseJmx(false); + remoteBroker.addConnector(BROKER_URL); + remoteBroker.setSchedulerSupport(false); + } + + @After + public void cleanup() throws Exception { + try { + localBroker.stop(); + } finally { + remoteBroker.stop(); + } + } + + /** + * This test verifies that the local broker's ID is typically known by the + * bridge support before the local broker's BrokerInfo is sent to the remote + * broker. + */ + @Test + public void NormalCaseTest() throws Exception { + runTest(0, 3000); + } + + /** + * This test verifies that timing can arise under which the local broker's + * ID is not known by the bridge support before the local broker's + * BrokerInfo is sent to the remote broker. + */ + @Test + public void DelayedCaseTest() throws Exception { + runTest(500, 3000); + } + + private void runTest(final long taskRunnerDelay, long timeout) + throws Exception { + // Add a network connector to the local broker that will create a bridge + // to the remote broker. + DiscoveryNetworkConnector dnc = new DiscoveryNetworkConnector(); + SimpleDiscoveryAgent da = new SimpleDiscoveryAgent(); + da.setServices(remoteBroker.getTransportConnectors().get(0).getPublishableConnectString()); + dnc.setDiscoveryAgent(da); + localBroker.addNetworkConnector(dnc); + + // Before starting the local broker, intercept the task runner factory + // so that the + // local VMTransport dispatcher is artificially delayed. + final TaskRunnerFactory realTaskRunnerFactory = localBroker + .getTaskRunnerFactory(); + localBroker.setTaskRunnerFactory(new TaskRunnerFactory() { + public TaskRunner createTaskRunner(Task task, String name) { + final TaskRunner realTaskRunner = realTaskRunnerFactory + .createTaskRunner(task, name); + if (name.startsWith("ActiveMQ Connection Dispatcher: ")) { + return new TaskRunner() { + @Override + public void shutdown() throws InterruptedException { + realTaskRunner.shutdown(); + } + + @Override + public void shutdown(long timeout) + throws InterruptedException { + realTaskRunner.shutdown(timeout); + } + + @Override + public void wakeup() throws InterruptedException { + Thread.sleep(taskRunnerDelay); + realTaskRunner.wakeup(); + } + }; + } else { + return realTaskRunnerFactory.createTaskRunner(task, name); + } + } + }); + + // Start the brokers and wait for the bridge to be created; the remote + // broker is started first to ensure it is available for the local + // broker to connect to. + remoteBroker.start(); + localBroker.start(); + + // Wait for the remote broker to receive the local broker's BrokerInfo + // and then verify the local broker's ID is known. + long startTimeMillis = System.currentTimeMillis(); + while (remoteBrokerInfos.isEmpty() + && (System.currentTimeMillis() - startTimeMillis) < timeout) { + Thread.sleep(100); + } + + Assert.assertFalse("Timed out waiting for bridge to form.", + remoteBrokerInfos.isEmpty()); + ; + Assert.assertNotNull("Local broker ID is null.", remoteBrokerInfos.get( + 0).getBrokerId()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java new file mode 100644 index 0000000..bfff0fd --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.ConsumerThread; +import org.apache.activemq.util.ProducerThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.junit.Test; + +import javax.jms.*; + +import java.io.File; + +import static org.junit.Assert.assertEquals; + +public class AMQ3120Test { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ3120Test.class); + + BrokerService broker = null; + File kahaDbDir = null; + private final Destination destination = new ActiveMQQueue("AMQ3120Test"); + final String payload = new String(new byte[1024]); + + protected void startBroker(boolean delete) throws Exception { + broker = new BrokerService(); + + //Start with a clean directory + kahaDbDir = new File(broker.getBrokerDataDirectory(), "KahaDB"); + deleteDir(kahaDbDir); + + broker.setSchedulerSupport(false); + broker.setDeleteAllMessagesOnStartup(delete); + broker.setPersistent(true); + broker.setUseJmx(false); + broker.addConnector("tcp://localhost:0"); + + PolicyMap map = new PolicyMap(); + PolicyEntry entry = new PolicyEntry(); + entry.setUseCache(false); + map.setDefaultEntry(entry); + broker.setDestinationPolicy(map); + + configurePersistence(broker, delete); + + broker.start(); + LOG.info("Starting broker.."); + } + + protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception { + KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter(); + + // ensure there are a bunch of data files but multiple entries in each + adapter.setJournalMaxFileLength(1024 * 20); + + // speed up the test case, checkpoint an cleanup early and often + adapter.setCheckpointInterval(500); + adapter.setCleanupInterval(500); + + if (!deleteAllOnStart) { + adapter.setForceRecoverIndex(true); + } + + } + + private boolean deleteDir(File dir) { + if (dir.isDirectory()) { + String[] children = dir.list(); + for (int i = 0; i < children.length; i++) { + boolean success = deleteDir(new File(dir, children[i])); + if (!success) { + return false; + } + } + } + + return dir.delete(); + } + + private int getFileCount(File dir){ + if (dir.isDirectory()) { + String[] children = dir.list(); + return children.length; + } + + return 0; + } + + @Test + public void testCleanupOfFiles() throws Exception { + final int messageCount = 500; + startBroker(true); + int fileCount = getFileCount(kahaDbDir); + assertEquals(4, fileCount); + + Connection connection = new ActiveMQConnectionFactory( + broker.getTransportConnectors().get(0).getConnectUri()).createConnection(); + connection.start(); + Session producerSess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session consumerSess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ProducerThread producer = new ProducerThread(producerSess, destination) { + @Override + protected Message createMessage(int i) throws Exception { + return sess.createTextMessage(payload + "::" + i); + } + }; + producer.setSleep(650); + producer.setMessageCount(messageCount); + ConsumerThread consumer = new ConsumerThread(consumerSess, destination); + consumer.setBreakOnNull(false); + consumer.setMessageCount(messageCount); + + producer.start(); + consumer.start(); + + producer.join(); + consumer.join(); + + assertEquals("consumer got all produced messages", producer.getMessageCount(), consumer.getReceived()); + + broker.stop(); + broker.waitUntilStopped(); + + } + +}
