http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java new file mode 100644 index 0000000..066fb07 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import java.io.File; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.ResourceAllocationException; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.TestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.kahadb.plist.PListStoreImpl; +import org.apache.activemq.usage.MemoryUsage; +import org.apache.activemq.usage.StoreUsage; +import org.apache.activemq.usage.SystemUsage; +import org.apache.activemq.usage.TempUsage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TempStorageBlockedBrokerTest extends TestSupport { + + public int deliveryMode = DeliveryMode.PERSISTENT; + + private static final Logger LOG = LoggerFactory.getLogger(TempStorageBlockedBrokerTest.class); + private static final int MESSAGES_COUNT = 1000; + private static byte[] buf = new byte[4 * 1024]; + private BrokerService broker; + AtomicInteger messagesSent = new AtomicInteger(0); + AtomicInteger messagesConsumed = new AtomicInteger(0); + + protected long messageReceiveTimeout = 10000L; + + Destination destination = new ActiveMQTopic("FooTwo"); + + private String connectionUri; + + public void testRunProducerWithHungConsumer() throws Exception { + + final long origTempUsage = broker.getSystemUsage().getTempUsage().getUsage(); + 
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + // ensure messages are spooled to disk for this consumer + ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy(); + prefetch.setTopicPrefetch(10); + factory.setPrefetchPolicy(prefetch); + Connection consumerConnection = factory.createConnection(); + consumerConnection.start(); + + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(destination); + + final Connection producerConnection = factory.createConnection(); + producerConnection.start(); + + final CountDownLatch producerHasSentTenMessages = new CountDownLatch(10); + Thread producingThread = new Thread("Producing thread") { + @Override + public void run() { + try { + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(deliveryMode); + for (int idx = 0; idx < MESSAGES_COUNT; ++idx) { + Message message = session.createTextMessage(new String(buf) + idx); + + producer.send(message); + messagesSent.incrementAndGet(); + producerHasSentTenMessages.countDown(); + Thread.sleep(10); + if (idx != 0 && idx%100 == 0) { + LOG.info("Sent Message " + idx); + LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage()); + } + } + producer.close(); + session.close(); + } catch (Throwable ex) { + ex.printStackTrace(); + } + } + }; + producingThread.start(); + + assertTrue("producer has sent 10 in a reasonable time", producerHasSentTenMessages.await(30, TimeUnit.SECONDS)); + + int count = 0; + + Message m = null; + while ((m = consumer.receive(messageReceiveTimeout)) != null) { + count++; + if (count != 0 && count%10 == 0) { + LOG.info("Recieved Message (" + count + "):" + m); + } + messagesConsumed.incrementAndGet(); + try { + Thread.sleep(100); + } catch (Exception e) { + LOG.info("error 
sleeping"); + } + } + + LOG.info("Connection Timeout: Retrying.. count: " + count); + + while ((m = consumer.receive(messageReceiveTimeout)) != null) { + count++; + if (count != 0 && count%100 == 0) { + LOG.info("Recieved Message (" + count + "):" + m); + } + messagesConsumed.incrementAndGet(); + try { + Thread.sleep(100); + } catch (Exception e) { + LOG.info("error sleeping"); + } + } + + LOG.info("consumer session closing: consumed count: " + count); + + consumerSession.close(); + + producingThread.join(); + + final long tempUsageBySubscription = broker.getSystemUsage().getTempUsage().getUsage(); + LOG.info("Orig Usage: " + origTempUsage + ", currentUsage: " + tempUsageBySubscription); + + producerConnection.close(); + consumerConnection.close(); + + LOG.info("Subscrition Usage: " + tempUsageBySubscription + ", endUsage: " + + broker.getSystemUsage().getTempUsage().getUsage()); + + // do a cleanup + ((PListStoreImpl)broker.getTempDataStore()).run(); + LOG.info("Subscrition Usage: " + tempUsageBySubscription + ", endUsage: " + + broker.getSystemUsage().getTempUsage().getUsage()); + + assertEquals("Incorrect number of Messages Sent: " + messagesSent.get(), messagesSent.get(), MESSAGES_COUNT); + assertEquals("Incorrect number of Messages Consumed: " + messagesConsumed.get(), messagesConsumed.get(), + MESSAGES_COUNT); + } + + public void testFillTempAndConsume() throws Exception { + + broker.getSystemUsage().setSendFailIfNoSpace(true); + destination = new ActiveMQQueue("Foo"); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + final ActiveMQConnection producerConnection = (ActiveMQConnection) factory.createConnection(); + // so we can easily catch the ResourceAllocationException on send + producerConnection.setAlwaysSyncSend(true); + producerConnection.start(); + + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + 
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + try { + while (true) { + Message message = session.createTextMessage(new String(buf) + messagesSent.toString()); + producer.send(message); + messagesSent.incrementAndGet(); + if (messagesSent.get() % 100 == 0) { + LOG.info("Sent Message " + messagesSent.get()); + LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage()); + } + } + } catch (ResourceAllocationException ex) { + LOG.info("Got resource exception : " + ex + ", after sent: " + messagesSent.get()); + } + + // consume all sent + Connection consumerConnection = factory.createConnection(); + consumerConnection.start(); + + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(destination); + + + while (consumer.receive(messageReceiveTimeout) != null) { + messagesConsumed.incrementAndGet(); + if (messagesConsumed.get() % 1000 == 0) { + LOG.info("received Message " + messagesConsumed.get()); + LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage()); + } + } + + assertEquals("Incorrect number of Messages Consumed: " + messagesConsumed.get(), messagesConsumed.get(), + messagesSent.get()); + } + + @Override + public void setUp() throws Exception { + + broker = new BrokerService(); + broker.setDataDirectory("target" + File.separator + "activemq-data"); + broker.setPersistent(true); + broker.setUseJmx(true); + broker.setAdvisorySupport(false); + broker.setDeleteAllMessagesOnStartup(true); + + setDefaultPersistenceAdapter(broker); + SystemUsage sysUsage = broker.getSystemUsage(); + MemoryUsage memUsage = new MemoryUsage(); + memUsage.setLimit((1024 * 1024)); + StoreUsage storeUsage = new StoreUsage(); + storeUsage.setLimit((1024 * 1024) * 38); + TempUsage tmpUsage = new TempUsage(); + tmpUsage.setLimit((1024 * 1024) * 38); + + PolicyEntry defaultPolicy = new PolicyEntry(); + // defaultPolicy.setTopic("FooTwo"); + 
defaultPolicy.setProducerFlowControl(false); + defaultPolicy.setMemoryLimit(10 * 1024); + + PolicyMap policyMap = new PolicyMap(); + policyMap.setDefaultEntry(defaultPolicy); + + sysUsage.setMemoryUsage(memUsage); + sysUsage.setStoreUsage(storeUsage); + sysUsage.setTempUsage(tmpUsage); + + broker.setDestinationPolicy(policyMap); + broker.setSystemUsage(sysUsage); + + broker.addConnector("tcp://localhost:0").setName("Default"); + broker.start(); + + connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); + } + + @Override + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + } + } + +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java new file mode 100644 index 0000000..1061346 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.*; + +import java.io.File; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.ResourceAllocationException; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.plist.PListStoreImpl; +import org.junit.After; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test that when configuring small temp store limits the journal size must also + * be smaller than the configured limit, but will still send a ResourceAllocationException + * if its not when sendFailIfNoSpace is enabled. 
+ */ +public class TempStorageConfigBrokerTest { + + public int deliveryMode = DeliveryMode.PERSISTENT; + + private static final Logger LOG = LoggerFactory.getLogger(TempStorageConfigBrokerTest.class); + private static byte[] buf = new byte[4 * 1024]; + private BrokerService broker; + private AtomicInteger messagesSent = new AtomicInteger(0); + private AtomicInteger messagesConsumed = new AtomicInteger(0); + + private String brokerUri; + private long messageReceiveTimeout = 10000L; + private Destination destination = new ActiveMQTopic("FooTwo"); + + @Test(timeout=360000) + @Ignore("blocks in hudson, needs investigation") + public void testFillTempAndConsumeWithBadTempStoreConfig() throws Exception { + + createBrokerWithInvalidTempStoreConfig(); + + broker.getSystemUsage().setSendFailIfNoSpace(true); + destination = new ActiveMQQueue("Foo"); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri); + final ActiveMQConnection producerConnection = (ActiveMQConnection) factory.createConnection(); + // so we can easily catch the ResourceAllocationException on send + producerConnection.setAlwaysSyncSend(true); + producerConnection.start(); + + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + try { + while (true) { + Message message = session.createTextMessage(new String(buf) + messagesSent.toString()); + producer.send(message); + messagesSent.incrementAndGet(); + if (messagesSent.get() % 100 == 0) { + LOG.info("Sent Message " + messagesSent.get()); + LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage()); + } + } + } catch (ResourceAllocationException ex) { + assertTrue("Should not be able to send 100 messages: ", messagesSent.get() < 100); + LOG.info("Got resource exception : " + ex + ", after sent: " + messagesSent.get()); + } + } + + @Test(timeout=360000) + 
@Ignore("blocks in hudson, needs investigation") + public void testFillTempAndConsumeWithGoodTempStoreConfig() throws Exception { + + createBrokerWithValidTempStoreConfig(); + + broker.getSystemUsage().setSendFailIfNoSpace(true); + destination = new ActiveMQQueue("Foo"); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri); + final ActiveMQConnection producerConnection = (ActiveMQConnection) factory.createConnection(); + // so we can easily catch the ResourceAllocationException on send + producerConnection.setAlwaysSyncSend(true); + producerConnection.start(); + + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + try { + while (true) { + Message message = session.createTextMessage(new String(buf) + messagesSent.toString()); + producer.send(message); + messagesSent.incrementAndGet(); + if (messagesSent.get() % 100 == 0) { + LOG.info("Sent Message " + messagesSent.get()); + LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage()); + } + } + } catch (ResourceAllocationException ex) { + assertTrue("Should be able to send at least 200 messages but was: " + messagesSent.get(), + messagesSent.get() > 200); + LOG.info("Got resource exception : " + ex + ", after sent: " + messagesSent.get()); + } + + // consume all sent + Connection consumerConnection = factory.createConnection(); + consumerConnection.start(); + + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(destination); + + while (consumer.receive(messageReceiveTimeout) != null) { + messagesConsumed.incrementAndGet(); + if (messagesConsumed.get() % 1000 == 0) { + LOG.info("received Message " + messagesConsumed.get()); + LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage()); + } 
+ } + + assertEquals("Incorrect number of Messages Consumed: " + messagesConsumed.get(), + messagesConsumed.get(), messagesSent.get()); + } + + private void createBrokerWithValidTempStoreConfig() throws Exception { + broker = new BrokerService(); + broker.setDataDirectory("target" + File.separator + "activemq-data"); + broker.setPersistent(true); + broker.setUseJmx(true); + broker.setAdvisorySupport(false); + broker.setDeleteAllMessagesOnStartup(true); + broker.setPersistenceAdapter(new KahaDBPersistenceAdapter()); + + broker.getSystemUsage().setSendFailIfNoSpace(true); + broker.getSystemUsage().getMemoryUsage().setLimit(1048576); + broker.getSystemUsage().getTempUsage().setLimit(2*1048576); + ((PListStoreImpl)broker.getSystemUsage().getTempUsage().getStore()).setJournalMaxFileLength(2 * 1048576); + broker.getSystemUsage().getStoreUsage().setLimit(20*1048576); + + PolicyEntry defaultPolicy = new PolicyEntry(); + defaultPolicy.setProducerFlowControl(false); + defaultPolicy.setMemoryLimit(10 * 1024); + + PolicyMap policyMap = new PolicyMap(); + policyMap.setDefaultEntry(defaultPolicy); + + broker.setDestinationPolicy(policyMap); + broker.addConnector("tcp://localhost:0").setName("Default"); + broker.start(); + + brokerUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); + } + + private void createBrokerWithInvalidTempStoreConfig() throws Exception { + broker = new BrokerService(); + broker.setDataDirectory("target" + File.separator + "activemq-data"); + broker.setPersistent(true); + broker.setUseJmx(true); + broker.setAdvisorySupport(false); + broker.setDeleteAllMessagesOnStartup(true); + broker.setPersistenceAdapter(new KahaDBPersistenceAdapter()); + + broker.getSystemUsage().setSendFailIfNoSpace(true); + broker.getSystemUsage().getMemoryUsage().setLimit(1048576); + broker.getSystemUsage().getTempUsage().setLimit(2*1048576); + broker.getSystemUsage().getStoreUsage().setLimit(2*1048576); + + PolicyEntry defaultPolicy = new PolicyEntry(); + 
defaultPolicy.setProducerFlowControl(false); + defaultPolicy.setMemoryLimit(10 * 1024); + + PolicyMap policyMap = new PolicyMap(); + policyMap.setDefaultEntry(defaultPolicy); + + broker.setDestinationPolicy(policyMap); + broker.addConnector("tcp://localhost:0").setName("Default"); + broker.start(); + + brokerUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java new file mode 100644 index 0000000..34df4a3 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.kahadb.plist.PListStoreImpl; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TempStoreDataCleanupTest { + + private static final Logger LOG = LoggerFactory.getLogger(TempStoreDataCleanupTest.class); + private static final String QUEUE_NAME = TempStoreDataCleanupTest.class.getName() + "Queue"; + + private final String str = new String( + "QAa0bcLdUK2eHfJgTP8XhiFj61DOklNm9nBoI5pGqYVrs3CtSuMZvwWx4yE7zR"); + + private BrokerService broker; + private String connectionUri; + private ExecutorService pool; + private String queueName; + private Random r = new Random(); + + @Before + public void setUp() throws Exception { + + broker = new BrokerService(); + broker.setDataDirectory("target" + File.separator + "activemq-data"); + broker.setPersistent(true); + broker.setUseJmx(true); + broker.setDedicatedTaskRunner(false); + 
broker.setAdvisorySupport(false); + broker.setDeleteAllMessagesOnStartup(true); + + SharedDeadLetterStrategy strategy = new SharedDeadLetterStrategy(); + strategy.setProcessExpired(false); + strategy.setProcessNonPersistent(false); + + PolicyEntry defaultPolicy = new PolicyEntry(); + defaultPolicy.setQueue(">"); + defaultPolicy.setOptimizedDispatch(true); + defaultPolicy.setDeadLetterStrategy(strategy); + defaultPolicy.setMemoryLimit(9000000); + + PolicyMap policyMap = new PolicyMap(); + policyMap.setDefaultEntry(defaultPolicy); + + broker.setDestinationPolicy(policyMap); + + broker.getSystemUsage().getMemoryUsage().setLimit(300000000L); + + broker.addConnector("tcp://localhost:0").setName("Default"); + broker.start(); + broker.waitUntilStarted(); + + connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); + pool = Executors.newFixedThreadPool(10); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + + if (pool != null) { + pool.shutdown(); + } + } + + @Test + public void testIt() throws Exception { + + int startPercentage = broker.getAdminView().getMemoryPercentUsage(); + LOG.info("MemoryUsage at test start = " + startPercentage); + + for (int i = 0; i < 2; i++) { + LOG.info("Started the test iteration: " + i + " using queueName = " + queueName); + queueName = QUEUE_NAME + i; + final CountDownLatch latch = new CountDownLatch(11); + + pool.execute(new Runnable() { + @Override + public void run() { + receiveAndDiscard100messages(latch); + } + }); + + for (int j = 0; j < 10; j++) { + pool.execute(new Runnable() { + @Override + public void run() { + send10000messages(latch); + } + }); + } + + LOG.info("Waiting on the send / receive latch"); + latch.await(5, TimeUnit.MINUTES); + LOG.info("Resumed"); + + destroyQueue(); + TimeUnit.SECONDS.sleep(2); + } + + LOG.info("MemoryUsage before awaiting temp store cleanup = " + 
broker.getAdminView().getMemoryPercentUsage()); + + final PListStoreImpl pa = (PListStoreImpl) broker.getTempDataStore(); + assertTrue("only one journal file should be left: " + pa.getJournal().getFileMap().size(), + Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return pa.getJournal().getFileMap().size() == 1; + } + }, TimeUnit.MINUTES.toMillis(3)) + ); + + int endPercentage = broker.getAdminView().getMemoryPercentUsage(); + LOG.info("MemoryUseage at test end = " + endPercentage); + + assertEquals(startPercentage, endPercentage); + } + + public void destroyQueue() { + try { + Broker broker = this.broker.getBroker(); + if (!broker.isStopped()) { + LOG.info("Removing: " + queueName); + broker.removeDestination(this.broker.getAdminConnectionContext(), new ActiveMQQueue(queueName), 10); + } + } catch (Exception e) { + LOG.warn("Got an error while removing the test queue", e); + } + } + + private void send10000messages(CountDownLatch latch) { + ActiveMQConnection activeMQConnection = null; + try { + activeMQConnection = createConnection(null); + Session session = activeMQConnection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session + .createQueue(queueName)); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + activeMQConnection.start(); + for (int i = 0; i < 10000; i++) { + TextMessage textMessage = session.createTextMessage(); + textMessage.setText(generateBody(1000)); + textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT); + producer.send(textMessage); + try { + Thread.sleep(10); + } catch (InterruptedException e) { + } + } + producer.close(); + } catch (JMSException e) { + LOG.warn("Got an error while sending the messages", e); + } finally { + if (activeMQConnection != null) { + try { + activeMQConnection.close(); + } catch (JMSException e) { + } + } + } + latch.countDown(); + } + + private void receiveAndDiscard100messages(CountDownLatch 
latch) { + ActiveMQConnection activeMQConnection = null; + try { + activeMQConnection = createConnection(null); + Session session = activeMQConnection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + MessageConsumer messageConsumer = session.createConsumer( + session.createQueue(queueName)); + activeMQConnection.start(); + for (int i = 0; i < 100; i++) { + messageConsumer.receive(); + } + messageConsumer.close(); + LOG.info("Created and disconnected"); + } catch (JMSException e) { + LOG.warn("Got an error while receiving the messages", e); + } finally { + if (activeMQConnection != null) { + try { + activeMQConnection.close(); + } catch (JMSException e) { + } + } + } + latch.countDown(); + } + + private ActiveMQConnection createConnection(String id) throws JMSException { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + if (id != null) { + factory.setClientID(id); + } + + ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); + return connection; + } + + private String generateBody(int length) { + + StringBuilder sb = new StringBuilder(); + int te = 0; + for (int i = 1; i <= length; i++) { + te = r.nextInt(62); + sb.append(str.charAt(te)); + } + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java new file mode 100644 index 0000000..3d32867 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one 
or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.bugs;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import static org.junit.Assert.assertTrue;

// https://issues.apache.org/jira/browse/AMQ-4262
/**
 * Runs a transacted producer and a transacted consumer against a KahaDB-backed
 * broker whose store limit (7 MB) is mostly consumed by messages parked on a
 * second "-retain" queue, and asserts that all {@link #MAX_MESSAGES} messages
 * sent to the test queue are eventually received.
 */
public class TransactedStoreUsageSuspendResumeTest {
    private static final Logger LOG = LoggerFactory.getLogger(TransactedStoreUsageSuspendResumeTest.class);

    private static final int MAX_MESSAGES = 10000;

    private static final String QUEUE_NAME = "test.queue";

    private BrokerService broker;

    private final CountDownLatch messagesReceivedCountDown = new CountDownLatch(MAX_MESSAGES);
    private final CountDownLatch messagesSentCountDown = new CountDownLatch(MAX_MESSAGES);
    private final CountDownLatch consumerStartLatch = new CountDownLatch(1);

    /**
     * First failure raised inside the consumer thread, or {@code null}.
     * An AssertionError thrown from a background thread never reaches JUnit,
     * so the failure is recorded here and reported by the test thread.
     */
    private volatile Throwable consumerFailure;

    /**
     * Waits until the producer stops making progress, then drains the test
     * queue, committing after every received message.
     */
    private class ConsumerThread extends Thread {

        @Override
        public void run() {
            Connection connection = null;
            try {

                consumerStartLatch.await(30, TimeUnit.SECONDS);

                ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
                connection = factory.createConnection();
                connection.start();
                Session session = connection.createSession(true, Session.SESSION_TRANSACTED);

                // wait for producer to stop: the sent count must be unchanged
                // across a 5 second window
                long currentSendCount;
                do {
                    currentSendCount = messagesSentCountDown.getCount();
                    TimeUnit.SECONDS.sleep(5);
                } while (currentSendCount != messagesSentCountDown.getCount());

                LOG.info("Starting consumer at: " + currentSendCount);

                MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));

                do {
                    Message message = consumer.receive(5000);
                    if (message != null) {
                        session.commit();
                        messagesReceivedCountDown.countDown();
                    }
                    if (messagesReceivedCountDown.getCount() % 500 == 0) {
                        LOG.info("remaining to receive: " + messagesReceivedCountDown.getCount());
                    }
                } while (messagesReceivedCountDown.getCount() != 0);
                consumer.close();
                session.close();
            } catch (Exception e) {
                // Record instead of Assert.fail(): failing here would only kill
                // this thread and leave the test waiting for its full timeout.
                consumerFailure = e;
                LOG.error("Consumer thread failed", e);
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Exception closeFailure) {
                        LOG.warn("Failed to close consumer connection", closeFailure);
                    }
                }
            }
        }
    }

    /**
     * Starts a persistent broker with a small KahaDB journal (500 KB files,
     * 10 s cleanup interval) and a 7 MB store usage limit.
     */
    @Before
    public void setup() throws Exception {

        broker = new BrokerService();
        broker.setDeleteAllMessagesOnStartup(true);
        broker.setPersistent(true);

        KahaDBPersistenceAdapter kahaDB = new KahaDBPersistenceAdapter();
        kahaDB.setJournalMaxFileLength(500 * 1024);
        kahaDB.setCleanupInterval(10*1000);
        broker.setPersistenceAdapter(kahaDB);

        broker.getSystemUsage().getStoreUsage().setLimit(7*1024*1024);

        broker.start();
        broker.waitUntilStarted();
    }

    @After
    public void tearDown() throws Exception {
        broker.stop();
    }

    /**
     * Runs producer and consumer concurrently and asserts that every message
     * sent to the test queue is received within ten minutes. The wait ends
     * early if the consumer thread reports a failure.
     */
    @Test
    public void testTransactedStoreUsageSuspendResume() throws Exception {

        ConsumerThread thread = new ConsumerThread();
        thread.start();
        ExecutorService sendExecutor = Executors.newSingleThreadExecutor();
        sendExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    sendMessages();
                } catch (Exception e) {
                    // Best effort: the outcome is judged by the receive count,
                    // but keep the cause visible for diagnosis.
                    LOG.warn("Producer terminated with an exception", e);
                }
            }
        });
        sendExecutor.shutdown();
        sendExecutor.awaitTermination(5, TimeUnit.MINUTES);

        // Poll in one-second slices so a dead consumer is noticed immediately
        // instead of after the full ten minute budget.
        final long deadline = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(10);
        boolean allMessagesReceived = messagesReceivedCountDown.await(1, TimeUnit.SECONDS);
        while (!allMessagesReceived && consumerFailure == null && System.currentTimeMillis() < deadline) {
            allMessagesReceived = messagesReceivedCountDown.await(1, TimeUnit.SECONDS);
        }
        if (!allMessagesReceived) {
            TestSupport.dumpAllThreads("StuckConsumer!");
        }
        assertTrue("Got all messages: " + messagesReceivedCountDown + ", consumer failure: " + consumerFailure,
                allMessagesReceived);

        // give consumers a chance to exit gracefully
        TimeUnit.SECONDS.sleep(2);
    }

    /**
     * Sends 4240 single-message transactions to the "-retain" queue, releases
     * the consumer, then sends {@link #MAX_MESSAGES} messages to the test
     * queue, committing every 20 sends. The connection is always closed.
     */
    private void sendMessages() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
        factory.setAlwaysSyncSend(true);
        Connection connection = factory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination queue = session.createQueue(QUEUE_NAME);
            Destination retainQueue = session.createQueue(QUEUE_NAME + "-retain");
            MessageProducer producer = session.createProducer(null);

            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            BytesMessage message = session.createBytesMessage();
            message.writeBytes(new byte[10]);

            for (int i=0; i<4240; i++) {
                // mostly fill the store with retained messages
                // so consumer only has a small bit of store usage to work with
                producer.send(retainQueue, message);
                session.commit();
            }

            consumerStartLatch.countDown();
            for (int i = 0; i < MAX_MESSAGES; i++) {
                producer.send(queue, message);
                if (i>0 && i%20 == 0) {
                    session.commit();
                }
                messagesSentCountDown.countDown();
                if (i>0 && i%500 == 0) {
                    LOG.info("Sent : " + i);
                }

            }
            session.commit();
            producer.close();
            session.close();
        } finally {
            connection.close();
        }
    }

}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java new file mode 100644 index 0000000..1baff9a --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.ObjectMessage; +import javax.jms.Session; + +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + * simulate message flow which cause the following exception in the broker + * (exception logged by client) <p/> 2007-07-24 13:51:23,624 + * com.easynet.halo.Halo ERROR (LoggingErrorHandler.java: 23) JMS failure + * javax.jms.JMSException: Transaction 'TX:ID:dmt-53625-1185281414694-1:0:344' + * has not been started. at + * org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:230) + * This appears to be consistent in a MacBook. Haven't been able to replicate it + * on Windows though + */ +public class TransactionNotStartedErrorTest extends TestCase { + + private static final Logger LOG = LoggerFactory.getLogger(TransactionNotStartedErrorTest.class); + + private static final int counter = 500; + + private static int hectorToHaloCtr; + private static int xenaToHaloCtr; + private static int troyToHaloCtr; + + private static int haloToHectorCtr; + private static int haloToXenaCtr; + private static int haloToTroyCtr; + + private final String hectorToHalo = "hectorToHalo"; + private final String xenaToHalo = "xenaToHalo"; + private final String troyToHalo = "troyToHalo"; + + private final String haloToHector = "haloToHector"; + private final String haloToXena = "haloToXena"; + private final String haloToTroy = "haloToTroy"; + + private BrokerService broker; + + private Connection hectorConnection; + private Connection xenaConnection; + private Connection troyConnection; + private Connection haloConnection; + + private final Object lock = new Object(); + + public Connection 
createConnection() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( + broker.getTransportConnectors().get(0).getPublishableConnectString()); + return factory.createConnection(); + } + + public Session createSession(Connection connection, boolean transacted) throws JMSException { + return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + } + + public void startBroker() throws Exception { + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.setPersistent(true); + broker.setUseJmx(true); + broker.addConnector("tcp://localhost:0").setName("Default"); + broker.start(); + LOG.info("Starting broker.."); + } + + public void tearDown() throws Exception { + hectorConnection.close(); + xenaConnection.close(); + troyConnection.close(); + haloConnection.close(); + broker.stop(); + } + + public void testTransactionNotStartedError() throws Exception { + startBroker(); + hectorConnection = createConnection(); + Thread hectorThread = buildProducer(hectorConnection, hectorToHalo); + Receiver hHectorReceiver = new Receiver() { + public void receive(String s) throws Exception { + haloToHectorCtr++; + if (haloToHectorCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + } + }; + buildReceiver(hectorConnection, haloToHector, false, hHectorReceiver); + + troyConnection = createConnection(); + Thread troyThread = buildProducer(troyConnection, troyToHalo); + Receiver hTroyReceiver = new Receiver() { + public void receive(String s) throws Exception { + haloToTroyCtr++; + if (haloToTroyCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + } + }; + buildReceiver(hectorConnection, haloToTroy, false, hTroyReceiver); + + xenaConnection = createConnection(); + Thread xenaThread = buildProducer(xenaConnection, xenaToHalo); + Receiver hXenaReceiver = new Receiver() { + public void receive(String s) throws Exception { + haloToXenaCtr++; + if (haloToXenaCtr >= counter) { + 
synchronized (lock) { + lock.notifyAll(); + } + } + } + }; + buildReceiver(xenaConnection, haloToXena, false, hXenaReceiver); + + haloConnection = createConnection(); + final MessageSender hectorSender = buildTransactionalProducer(haloToHector, haloConnection); + final MessageSender troySender = buildTransactionalProducer(haloToTroy, haloConnection); + final MessageSender xenaSender = buildTransactionalProducer(haloToXena, haloConnection); + Receiver hectorReceiver = new Receiver() { + public void receive(String s) throws Exception { + hectorToHaloCtr++; + troySender.send("halo to troy because of hector"); + if (hectorToHaloCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + } + }; + Receiver xenaReceiver = new Receiver() { + public void receive(String s) throws Exception { + xenaToHaloCtr++; + hectorSender.send("halo to hector because of xena"); + if (xenaToHaloCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + } + }; + Receiver troyReceiver = new Receiver() { + public void receive(String s) throws Exception { + troyToHaloCtr++; + xenaSender.send("halo to xena because of troy"); + if (troyToHaloCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + } + }; + buildReceiver(haloConnection, hectorToHalo, true, hectorReceiver); + buildReceiver(haloConnection, xenaToHalo, true, xenaReceiver); + buildReceiver(haloConnection, troyToHalo, true, troyReceiver); + + haloConnection.start(); + + troyConnection.start(); + troyThread.start(); + + xenaConnection.start(); + xenaThread.start(); + + hectorConnection.start(); + hectorThread.start(); + waitForMessagesToBeDelivered(); + // number of messages received should match messages sent + assertEquals(hectorToHaloCtr, counter); + LOG.info("hectorToHalo received " + hectorToHaloCtr + " messages"); + assertEquals(xenaToHaloCtr, counter); + LOG.info("xenaToHalo received " + xenaToHaloCtr + " messages"); + assertEquals(troyToHaloCtr, counter); + LOG.info("troyToHalo received " 
+ troyToHaloCtr + " messages"); + assertEquals(haloToHectorCtr, counter); + LOG.info("haloToHector received " + haloToHectorCtr + " messages"); + assertEquals(haloToXenaCtr, counter); + LOG.info("haloToXena received " + haloToXenaCtr + " messages"); + assertEquals(haloToTroyCtr, counter); + LOG.info("haloToTroy received " + haloToTroyCtr + " messages"); + + } + + protected void waitForMessagesToBeDelivered() { + // let's give the listeners enough time to read all messages + long maxWaitTime = counter * 3000; + long waitTime = maxWaitTime; + long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis(); + + synchronized (lock) { + boolean hasMessages = true; + while (hasMessages && waitTime >= 0) { + try { + lock.wait(200); + } catch (InterruptedException e) { + LOG.error(e.toString()); + } + // check if all messages have been received + hasMessages = hectorToHaloCtr < counter || xenaToHaloCtr < counter || troyToHaloCtr < counter || haloToHectorCtr < counter || haloToXenaCtr < counter + || haloToTroyCtr < counter; + waitTime = maxWaitTime - (System.currentTimeMillis() - start); + } + } + } + + public MessageSender buildTransactionalProducer(String queueName, Connection connection) throws Exception { + return new MessageSender(queueName, connection, true, false); + } + + public Thread buildProducer(Connection connection, final String queueName) throws Exception { + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageSender producer = new MessageSender(queueName, connection, false, false); + Thread thread = new Thread() { + + public synchronized void run() { + for (int i = 0; i < counter; i++) { + try { + producer.send(queueName); + if (session.getTransacted()) { + session.commit(); + } + + } catch (Exception e) { + throw new RuntimeException("on " + queueName + " send", e); + } + } + } + }; + return thread; + } + + public void buildReceiver(Connection connection, final String queueName, boolean transacted, final 
Receiver receiver) throws Exception { + final Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer inputMessageConsumer = session.createConsumer(session.createQueue(queueName)); + MessageListener messageListener = new MessageListener() { + + public void onMessage(Message message) { + try { + ObjectMessage objectMessage = (ObjectMessage)message; + String s = (String)objectMessage.getObject(); + receiver.receive(s); + if (session.getTransacted()) { + session.commit(); + } + + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + inputMessageConsumer.setMessageListener(messageListener); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java new file mode 100644 index 0000000..688d066 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/TrapMessageInJDBCStoreTest.java @@ -0,0 +1,291 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.jdbc.DataSourceServiceSupport; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; +import org.apache.activemq.store.jdbc.LeaseDatabaseLocker; +import org.apache.activemq.store.jdbc.TransactionContext; +import org.apache.activemq.util.IOHelper; +import org.apache.activemq.util.LeaseLockerIOExceptionHandler; +import org.apache.derby.jdbc.EmbeddedDataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test to demostrate a message trapped in the JDBC store and not + * delivered to consumer + * + * The test throws issues the commit to the DB but throws + * an exception back to the broker. This scenario could happen when a network + * cable is disconnected - message is committed to DB but broker does not know. 
+ * + * + */ + +public class TrapMessageInJDBCStoreTest extends TestCase { + + private static final String MY_TEST_Q = "MY_TEST_Q"; + private static final Logger LOG = LoggerFactory + .getLogger(TrapMessageInJDBCStoreTest.class); + private String transportUrl = "tcp://127.0.0.1:0"; + private BrokerService broker; + private TestTransactionContext testTransactionContext; + private TestJDBCPersistenceAdapter jdbc; + + protected BrokerService createBroker(boolean withJMX) throws Exception { + BrokerService broker = new BrokerService(); + + broker.setUseJmx(withJMX); + + EmbeddedDataSource embeddedDataSource = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()); + embeddedDataSource.setCreateDatabase("create"); + + //wire in a TestTransactionContext (wrapper to TransactionContext) that has an executeBatch() + // method that can be configured to throw a SQL exception on demand + jdbc = new TestJDBCPersistenceAdapter(); + jdbc.setDataSource(embeddedDataSource); + jdbc.setCleanupPeriod(0); + testTransactionContext = new TestTransactionContext(jdbc); + + jdbc.setLockKeepAlivePeriod(1000l); + LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); + leaseDatabaseLocker.setLockAcquireSleepInterval(2000l); + jdbc.setLocker(leaseDatabaseLocker); + + broker.setPersistenceAdapter(jdbc); + + broker.setIoExceptionHandler(new LeaseLockerIOExceptionHandler()); + + transportUrl = broker.addConnector(transportUrl).getPublishableConnectString(); + return broker; + } + + /** + * + * sends 3 messages to the queue. 
When the second message is being committed to the JDBCStore, $ + * it throws a dummy SQL exception - the message has been committed to the embedded DB before the exception + * is thrown + * + * Excepted correct outcome: receive 3 messages and the DB should contain no messages + * + * @throws Exception + */ + + public void testDBCommitException() throws Exception { + + broker = this.createBroker(false); + broker.deleteAllMessages(); + broker.start(); + broker.waitUntilStarted(); + + LOG.info("***Broker started..."); + + // failover but timeout in 5 seconds so the test does not hang + String failoverTransportURL = "failover:(" + transportUrl + + ")?timeout=5000"; + + + sendMessage(MY_TEST_Q, failoverTransportURL); + + //check db contents + ArrayList<Long> dbSeq = dbMessageCount(); + LOG.info("*** after send: db contains message seq " +dbSeq ); + + List<TextMessage> consumedMessages = consumeMessages(MY_TEST_Q,failoverTransportURL); + + assertEquals("number of consumed messages",3,consumedMessages.size()); + + //check db contents + dbSeq = dbMessageCount(); + LOG.info("*** after consume - db contains message seq " + dbSeq); + + assertEquals("number of messages in DB after test",0,dbSeq.size()); + + broker.stop(); + broker.waitUntilStopped(); + } + + + + public List<TextMessage> consumeMessages(String queue, + String transportURL) throws JMSException { + Connection connection = null; + LOG.debug("*** consumeMessages() called ..."); + + try { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( + transportURL); + + connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(queue); + + ArrayList<TextMessage> consumedMessages = new ArrayList<TextMessage>(); + + MessageConsumer messageConsumer = session.createConsumer(destination); + + while(true){ + TextMessage textMessage= (TextMessage) messageConsumer.receive(4000); + 
LOG.debug("*** consumed Messages :"+textMessage); + + if(textMessage==null){ + return consumedMessages; + } + consumedMessages.add(textMessage); + } + + + } finally { + if (connection != null) { + connection.close(); + } + } + } + + public void sendMessage(String queue, String transportURL) + throws Exception { + Connection connection = null; + + try { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( + transportURL); + + connection = factory.createConnection(); + Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(queue); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + TextMessage m = session.createTextMessage("1"); + + LOG.debug("*** send message 1 to broker..."); + producer.send(m); + + // trigger SQL exception in transactionContext + LOG.debug("*** send message 2 to broker"); + m.setText("2"); + producer.send(m); + + //check db contents + ArrayList<Long> dbSeq = dbMessageCount(); + LOG.info("*** after send 2 - db contains message seq " + dbSeq); + assertEquals("number of messages in DB after send 2",2,dbSeq.size()); + + LOG.debug("*** send message 3 to broker"); + m.setText("3"); + producer.send(m); + LOG.debug("*** Finished sending messages to broker"); + + } finally { + if (connection != null) { + connection.close(); + } + } + } + + /** + * query the DB to see what messages are left in the store + * @return + * @throws SQLException + * @throws IOException + */ + private ArrayList<Long> dbMessageCount() throws SQLException, IOException { + java.sql.Connection conn = ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).getDataSource().getConnection(); + PreparedStatement statement = conn.prepareStatement("SELECT MSGID_SEQ FROM ACTIVEMQ_MSGS"); + + try{ + + ResultSet result = statement.executeQuery(); + ArrayList<Long> dbSeq = new ArrayList<Long>(); + + while (result.next()){ + 
dbSeq.add(result.getLong(1)); + } + + return dbSeq; + + }finally{ + statement.close(); + conn.close(); + + } + + } + + /* + * Mock classes used for testing + */ + + public class TestJDBCPersistenceAdapter extends JDBCPersistenceAdapter { + public TransactionContext getTransactionContext() throws IOException { + return testTransactionContext; + } + } + + public class TestTransactionContext extends TransactionContext { + + private int count; + + public TestTransactionContext( + JDBCPersistenceAdapter jdbcPersistenceAdapter) + throws IOException { + super(jdbcPersistenceAdapter); + } + + public void executeBatch() throws SQLException { + super.executeBatch(); + count++; + LOG.debug("ExecuteBatchOverride: count:" + count, new RuntimeException("executeBatch")); + + // throw on second add message + if (count == 16){ + throw new SQLException("TEST SQL EXCEPTION from executeBatch after super.execution: count:" + count); + } + } + + + + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VMTransportClosureTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VMTransportClosureTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VMTransportClosureTest.java new file mode 100644 index 0000000..6a96a14 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VMTransportClosureTest.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.bugs;

import java.io.IOException;

import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Repeatedly opens and closes a local VM transport while a long-lived
 * connection to the same broker is in use, and checks after every cycle that
 * the long-lived connection can still send.
 */
public class VMTransportClosureTest extends EmbeddedBrokerTestSupport {
    private static final Log LOG = LogFactory
            .getLog(VMTransportClosureTest.class);
    private static final long MAX_TEST_TIME_MILLIS = 300000; // 5min
    // number of open/close cycles performed against the VM transport
    private static final int NUM_ATTEMPTS = 100000;

    /** Enables auto-fail with a five minute limit before the standard set-up runs. */
    public void setUp() throws Exception {
        setAutoFail(true);
        setMaxTestTime(MAX_TEST_TIME_MILLIS);
        super.setUp();
    }

    /**
     * EmbeddedBrokerTestSupport.createBroker() binds the broker to a VM
     * transport address, which results in a call to
     * VMTransportFactory.doBind(location):
     * <p>
     * <code>
     * public TransportServer doBind(URI location) throws IOException {
     *    return bind(location, false);
     *}
     *</code>
     * </p>
     * As a result, VMTransportServer.disposeOnDisconnect is <code>false</code>.
     * To expose the bug, we need to have VMTransportServer.disposeOnDisconnect
     * <code>true</code>, which is the case when the VMTransportServer is not
     * already bound when the first connection is made.
     */
    @Override
    protected BrokerService createBroker() throws Exception {
        BrokerService answer = new BrokerService();
        answer.setPersistent(isPersistent());
        // Deliberately no connector: the VM transport server must be created
        // by the first connection rather than bound up front (see above).
        // answer.addConnector(bindAddress);
        return answer;
    }

    /**
     * This test demonstrates how the "disposeOnDisconnect" feature of
     * VMTransportServer can incorrectly close all VM connections to the local
     * broker.
     */
    public void testPrematureClosure() throws Exception {

        // Open a persistent connection to the local broker. The persistent
        // connection is maintained through the test and should prevent the
        // VMTransportServer from stopping itself when the local transport is
        // closed.
        ActiveMQConnection persistentConn = (ActiveMQConnection) createConnection();
        persistentConn.start();
        Session session = persistentConn.createSession(true,
                Session.SESSION_TRANSACTED);
        MessageProducer producer = session.createProducer(destination);

        for (int i = 0; i < NUM_ATTEMPTS; i++) {
            LOG.info("Attempt: " + i);

            // Open and close a local transport connection. As is done by
            // most users of the transport, ensure that the transport is stopped
            // when closed by the peer (via ShutdownInfo). Closing the local
            // transport should not affect the persistent connection.
            final Transport localTransport = TransportFactory.connect(broker
                    .getVmConnectorURI());
            localTransport.setTransportListener(new TransportListener() {
                public void onCommand(Object command) {
                    if (command instanceof ShutdownInfo) {
                        try {
                            localTransport.stop();
                        } catch (Exception ex) {
                            throw new RuntimeException(ex);
                        }
                    }
                }

                public void onException(IOException error) {
                    // ignore
                }

                public void transportInterupted() {
                    // ignore
                }

                public void transportResumed() {
                    // ignore
                }
            });

            localTransport.start();
            localTransport.stop();

            // Ensure that the persistent connection is still usable.
            // The send is rolled back so no messages accumulate over the cycles.
            producer.send(session.createMessage());
            session.rollback();
        }

        persistentConn.close();
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java new file mode 100644 index 0000000..7939453 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.bugs;

import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.Connection;
import java.io.File;
import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Measures per-message send latency from several concurrent producers against
 * a KahaDB-backed broker and reports every send slower than the SLA threshold.
 * The single test method currently returns immediately, so nothing is measured
 * unless that guard is removed.
 */
public class VerifySteadyEnqueueRate extends TestCase {

    private static final Logger LOG = LoggerFactory.getLogger(VerifySteadyEnqueueRate.class);

    // messages sent by each producer thread
    private static int max_messages = 1000000;
    private final String destinationName = getName() + "_Queue";
    private BrokerService broker;
    final boolean useTopic = false;

    protected static final String payload = new String(new byte[24]);

    @Override
    public void setUp() throws Exception {
        startBroker();
    }

    @Override
    public void tearDown() throws Exception {
        broker.stop();
    }

    @SuppressWarnings("unused")
    public void testEnqueueRateCanMeetSLA() throws Exception {
        // test is disabled: the unconditional return skips the measurement
        if (true) {
            return;
        }
        doTestEnqueue(false);
    }

    /**
     * Runs six producers in parallel, each sending {@code max_messages}
     * messages, and prints a line for every send that takes longer than
     * 100 ms together with running totals.
     *
     * @param transacted whether the producers use transacted sessions
     */
    private void doTestEnqueue(final boolean transacted) throws Exception {
        // SLA threshold in milliseconds
        final long min = 100;
        final AtomicLong total = new AtomicLong(0);
        final AtomicLong slaViolations = new AtomicLong(0);
        final AtomicLong max = new AtomicLong(0);
        final int numThreads = 6;

        Runnable runner = new Runnable() {

            @Override
            public void run() {
                Connection connection = null;
                try {
                    connection = createConnection();
                    MessageSender producer = new MessageSender(destinationName,
                            connection, transacted, useTopic);

                    for (int i = 0; i < max_messages; i++) {
                        long startT = System.currentTimeMillis();
                        producer.send(payload);
                        long endT = System.currentTimeMillis();
                        long duration = endT - startT;

                        total.incrementAndGet();

                        // Raise the shared maximum with a CAS loop: a plain
                        // get()/set() pair lets one thread overwrite a larger
                        // value just published by another.
                        long observedMax = max.get();
                        while (duration > observedMax && !max.compareAndSet(observedMax, duration)) {
                            observedMax = max.get();
                        }

                        if (duration > min) {
                            slaViolations.incrementAndGet();
                            System.err.println("SLA violation @ "+Thread.currentThread().getName()
                                    + " "
                                    + DateFormat.getTimeInstance().format(
                                            new Date(startT)) + " at message "
                                    + i + " send time=" + duration
                                    + " - Total SLA violations: "+slaViolations.get()+"/"+total.get()+" ("+String.format("%.6f", 100.0*slaViolations.get()/total.get())+"%)");
                        }
                    }

                } catch (Exception e) {
                    LOG.error("Producer run failed", e);
                } finally {
                    // each runner owns its connection; release it when done
                    if (connection != null) {
                        try {
                            connection.close();
                        } catch (Exception closeFailure) {
                            LOG.warn("Failed to close producer connection", closeFailure);
                        }
                    }
                }
                System.out.println("Max Violation = " + max + " - Total SLA violations: "+slaViolations.get()+"/"+total.get()+" ("+String.format("%.6f", 100.0*slaViolations.get()/total.get())+"%)");
            }
        };
        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < numThreads; i++) {
            executor.execute(runner);
        }

        executor.shutdown();
        while(!executor.isTerminated()) {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        }
    }

    /** Creates a connection to the first transport connector of the broker. */
    private Connection createConnection() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                broker.getTransportConnectors().get(0).getConnectUri());
        return factory.createConnection();
    }

    /** Starts a persistent, JMX-enabled broker on KahaDB with an ephemeral TCP connector. */
    private void startBroker() throws Exception {
        broker = new BrokerService();
        //broker.setDeleteAllMessagesOnStartup(true);
        broker.setPersistent(true);
        broker.setUseJmx(true);

        KahaDBStore kaha = new KahaDBStore();
        kaha.setDirectory(new File("target/activemq-data/kahadb"));
        // The setEnableJournalDiskSyncs(false) setting is a little dangerous right now, as I have not verified
        // what happens if the index is updated but a journal update is lost.
        // The index is going to be inconsistent, but can it be repaired?
        kaha.setEnableJournalDiskSyncs(false);
        // Using a bigger journal file size causes fewer latency spikes, as it is not switching files as often.
        kaha.setJournalMaxFileLength(1024*1024*100);

        // small batch means more frequent and smaller writes
        kaha.setIndexWriteBatchSize(100);
        // do the index write in a separate thread
        kaha.setEnableIndexWriteAsync(true);

        broker.setPersistenceAdapter(kaha);

        broker.addConnector("tcp://localhost:0").setName("Default");
        LOG.info("Starting broker..");
        broker.start();
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java new file mode 100644 index 0000000..01ecdb1 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java @@ -0,0 +1,166 @@ +/* ==================================================================== + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==================================================================== */

package org.apache.activemq.bugs.amq1095;

import java.net.URI;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import junit.framework.TestCase;

import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;

/**
 * <p>
 * Common functionality for ActiveMQ test cases: starts a broker from the
 * amq1095 XBean configuration, opens a connection obtained through JNDI and
 * releases both again when the test finishes.
 * </p>
 *
 * @author Rainer Klute ([email protected])
 * @since 2007-08-10
 * @version $Id: ActiveMQTestCase.java 12 2007-08-14 12:02:02Z rke $
 */
public class ActiveMQTestCase extends TestCase {

    private Context context;
    private BrokerService broker;
    protected Connection connection;
    protected Destination destination;
    private final List<MessageConsumer> consumersToEmpty = new LinkedList<MessageConsumer>();
    protected final long RECEIVE_TIMEOUT = 500;

    /** <p>Constructor</p> */
    public ActiveMQTestCase() {
    }

    /**
     * <p>Constructor</p>
     *
     * @param name the test case's name
     */
    public ActiveMQTestCase(final String name) {
        super(name);
    }

    /**
     * <p>Sets up the JUnit testing environment: creates the JNDI context,
     * starts the broker and opens the connection.</p>
     *
     * <p>JUnit 3 does not invoke {@link #tearDown()} when {@code setUp()}
     * itself fails, so every failure path here releases whatever was already
     * created before reporting the problem. Otherwise a half-initialised
     * broker would stay alive and disturb the following tests.</p>
     *
     * @throws RuntimeException if the JNDI context or the broker cannot be
     *         created or started; the original exception is kept as cause
     */
    @Override
    protected void setUp() {
        try {
            context = new InitialContext(collectJndiProperties());
            final URI uri = new URI("xbean:org/apache/activemq/bugs/amq1095/activemq.xml");
            broker = BrokerFactory.createBroker(uri);
            broker.start();
        } catch (Exception ex) {
            releaseResourcesQuietly();
            throw new RuntimeException(ex);
        }

        try {
            /* Lookup the connection factory. */
            final ConnectionFactory connectionFactory =
                (ConnectionFactory) context.lookup("TopicConnectionFactory");

            destination = new ActiveMQTopic("TestTopic");

            /* Create a connection: */
            connection = connectionFactory.createConnection();
            connection.setClientID("sampleClientID");
        } catch (Throwable ex) {
            ex.printStackTrace();
            // The broker is already running at this point; stop it before failing.
            releaseResourcesQuietly();
            fail(ex.toString());
        }
    }

    /**
     * <p>
     * Tear down the testing environment by receiving any messages that might be
     * left in the topic after a failure and shutting down the broker properly.
     * This is quite important for subsequent test cases that assume the topic
     * to be empty.
     * </p>
     *
     * @throws Exception if closing the connection, stopping the broker or
     *         closing the JNDI context fails
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            drainRegisteredConsumers();
        } finally {
            releaseResources();
        }
    }

    /**
     * Registers a consumer whose pending messages are received and discarded
     * during {@link #tearDown()}.
     *
     * @param consumer the consumer to empty on shutdown
     */
    protected void registerToBeEmptiedOnShutdown(final MessageConsumer consumer) {
        consumersToEmpty.add(consumer);
    }

    /**
     * Copies the system properties whose names start with "java.naming.",
     * "topic." or "queue." into a fresh property set for the initial context.
     * Works on a snapshot of the string-valued property names, so no casts
     * are needed.
     */
    private static Properties collectJndiProperties() {
        final Properties systemProperties = System.getProperties();
        final Properties jndiProperties = new Properties();
        for (final String key : systemProperties.stringPropertyNames()) {
            if (key.startsWith("java.naming.") || key.startsWith("topic.")
                || key.startsWith("queue.")) {
                final String value = systemProperties.getProperty(key);
                if (value != null) {
                    jndiProperties.setProperty(key, value);
                }
            }
        }
        return jndiProperties;
    }

    /**
     * Receives and discards messages from every registered consumer until a
     * receive times out. Draining is best effort: a failing consumer is
     * skipped so that the remaining ones are still emptied.
     */
    private void drainRegisteredConsumers() {
        for (final MessageConsumer consumer : consumersToEmpty) {
            if (consumer == null) {
                continue;
            }
            try {
                // The message type is irrelevant here, so no cast is applied.
                while (consumer.receive(RECEIVE_TIMEOUT) != null) {
                    // discard
                }
            } catch (Exception ignored) {
                // Best effort only: the consumer may already be closed.
            }
        }
    }

    /**
     * Closes the connection, stops the broker and closes the JNDI context.
     * Each step is attempted even if a previous one throws, and fields that
     * were never initialised are skipped.
     */
    private void releaseResources() throws Exception {
        try {
            if (connection != null) {
                connection.close();
            }
        } finally {
            try {
                if (broker != null) {
                    broker.stop();
                }
            } finally {
                if (context != null) {
                    context.close();
                }
            }
        }
    }

    /**
     * Same as {@link #releaseResources()} but never throws; used on the
     * failure paths of {@link #setUp()}, where the original problem is the
     * one worth reporting.
     */
    private void releaseResourcesQuietly() {
        try {
            releaseResources();
        } catch (Exception ignored) {
            // Secondary failure during cleanup; the set-up error is reported instead.
        }
    }
}
