http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java new file mode 100644 index 0000000..8a952fd --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import junit.framework.Test; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.broker.BrokerTestSupport; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMapMessage; +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.usecases.MyObject; + +public class AMQ2103Test extends BrokerTestSupport { + static PolicyEntry reduceMemoryFootprint = new PolicyEntry(); + static { + reduceMemoryFootprint.setReduceMemoryFootprint(true); + } + + public PolicyEntry defaultPolicy = reduceMemoryFootprint; + + @Override + protected PolicyEntry getDefaultPolicy() { + return defaultPolicy; + } + + public void initCombosForTestVerifyMarshalledStateIsCleared() throws Exception { + addCombinationValues("defaultPolicy", new Object[]{defaultPolicy, null}); + } + + public static Test suite() { + return suite(AMQ2103Test.class); + } + + /** + * use mem persistence so no marshaling, + * reduceMemoryFootprint on/off that will reduce memory by whacking the marshaled state + * With vm transport and deferred serialisation and no persistence (mem persistence), + * we see the message as sent by the client so we can validate the contents against + * the policy + * @throws Exception + */ + public void testVerifyMarshalledStateIsCleared() throws Exception { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + factory.setOptimizedMessageDispatch(true); + factory.setObjectMessageSerializationDefered(true); + factory.setCopyMessageOnSend(false); + + 
Connection connection = factory.createConnection(); + Session session = (ActiveMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination destination = new ActiveMQQueue("testQ"); + MessageConsumer consumer = session.createConsumer(destination); + connection.start(); + + MessageProducer producer = session.createProducer(destination); + final MyObject obj = new MyObject("A message"); + ActiveMQObjectMessage m1 = (ActiveMQObjectMessage)session.createObjectMessage(); + m1.setObject(obj); + producer.send(m1); + + ActiveMQTextMessage m2 = new ActiveMQTextMessage(); + m2.setText("Test Message Payload."); + producer.send(m2); + + ActiveMQMapMessage m3 = new ActiveMQMapMessage(); + m3.setString("text", "my message"); + producer.send(m3); + + Message m = consumer.receive(maxWait); + assertNotNull(m); + assertEquals(m1.getMessageId().toString(), m.getJMSMessageID()); + assertTrue(m instanceof ActiveMQObjectMessage); + + if (getDefaultPolicy() != null) { + assertNull("object data cleared by reduceMemoryFootprint (and never marshalled as using mem persistence)", + ((ActiveMQObjectMessage)m).getObject()); + } + + // verify no serialisation via vm transport + assertEquals("writeObject called", 0, obj.getWriteObjectCalled()); + assertEquals("readObject called", 0, obj.getReadObjectCalled()); + assertEquals("readObjectNoData called", 0, obj.getReadObjectNoDataCalled()); + + m = consumer.receive(maxWait); + assertNotNull(m); + assertEquals(m2.getMessageId().toString(), m.getJMSMessageID()); + assertTrue(m instanceof ActiveMQTextMessage); + + if (getDefaultPolicy() != null) { + assertNull("text cleared by reduceMemoryFootprint (and never marshalled as using mem persistence)", + ((ActiveMQTextMessage)m).getText()); + } + + m = consumer.receive(maxWait); + assertNotNull(m); + assertEquals(m3.getMessageId().toString(), m.getJMSMessageID()); + assertTrue(m instanceof ActiveMQMapMessage); + + if (getDefaultPolicy() != null) { + assertNull("text cleared by 
reduceMemoryFootprint (and never marshalled as using mem persistence)", + ((ActiveMQMapMessage)m).getStringProperty("text")); + } + + connection.close(); + } +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149LevelDBTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149LevelDBTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149LevelDBTest.java new file mode 100644 index 0000000..1ad8b68 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149LevelDBTest.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.leveldb.LevelDBStore; + +public class AMQ2149LevelDBTest extends AMQ2149Test { + + @Override + protected void configurePersistenceAdapter(BrokerService brokerService) throws Exception { + LevelDBStore persistenceFactory = new LevelDBStore(); + persistenceFactory.setDirectory(dataDirFile); + brokerService.setPersistenceAdapter(persistenceFactory); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java new file mode 100644 index 0000000..b2eba61 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java @@ -0,0 +1,584 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.bugs; + +import java.io.File; +import java.lang.IllegalStateException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Timer; +import java.util.TimerTask; +import java.util.Vector; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.jms.*; + +import org.apache.activemq.AutoFailTestSupport; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.util.LoggingBrokerPlugin; +import org.apache.activemq.command.ActiveMQDestination; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.*; + +interface Configurer { + public void configure(BrokerService broker) throws Exception; +} + +public class AMQ2149Test { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ2149Test.class); + @Rule + public TestName testName = new TestName(); + + private static final String BROKER_CONNECTOR = "tcp://localhost:61617"; + private static final String DEFAULT_BROKER_URL = "failover:("+ BROKER_CONNECTOR + +")?maxReconnectDelay=1000&useExponentialBackOff=false"; + + private final String SEQ_NUM_PROPERTY = "seqNum"; + + final int MESSAGE_LENGTH_BYTES = 75 * 1024; + final long SLEEP_BETWEEN_SEND_MS = 25; + final int NUM_SENDERS_AND_RECEIVERS = 10; + final Object brokerLock = new Object(); + + private static final long DEFAULT_BROKER_STOP_PERIOD = 10 * 1000; + private static final long 
DEFAULT_NUM_TO_SEND = 1400; + + long brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD; + long numtoSend = DEFAULT_NUM_TO_SEND; + long sleepBetweenSend = SLEEP_BETWEEN_SEND_MS; + String brokerURL = DEFAULT_BROKER_URL; + + int numBrokerRestarts = 0; + final static int MAX_BROKER_RESTARTS = 4; + BrokerService broker; + Vector<Throwable> exceptions = new Vector<Throwable>(); + + protected File dataDirFile; + final LoggingBrokerPlugin[] plugins = new LoggingBrokerPlugin[]{new LoggingBrokerPlugin()}; + + + public void createBroker(Configurer configurer) throws Exception { + broker = new BrokerService(); + configurePersistenceAdapter(broker); + + broker.getSystemUsage().getMemoryUsage().setLimit(MESSAGE_LENGTH_BYTES * 200 * NUM_SENDERS_AND_RECEIVERS); + + broker.addConnector(BROKER_CONNECTOR); + broker.setBrokerName(testName.getMethodName()); + broker.setDataDirectoryFile(dataDirFile); + if (configurer != null) { + configurer.configure(broker); + } + broker.start(); + } + + protected void configurePersistenceAdapter(BrokerService brokerService) throws Exception { + } + + @Before + public void setUp() throws Exception { + LOG.debug("Starting test {}", testName.getMethodName()); + dataDirFile = new File("target/"+ testName.getMethodName()); + numtoSend = DEFAULT_NUM_TO_SEND; + brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD; + sleepBetweenSend = SLEEP_BETWEEN_SEND_MS; + brokerURL = DEFAULT_BROKER_URL; + } + + @After + public void tearDown() throws Exception { + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future<Boolean> future = executor.submit(new TeardownTask(brokerLock, broker)); + try { + LOG.debug("Teardown started."); + long start = System.currentTimeMillis(); + Boolean result = future.get(30, TimeUnit.SECONDS); + long finish = System.currentTimeMillis(); + LOG.debug("Result of teardown: {} after {} ms ", result, (finish - start)); + } catch (TimeoutException e) { + fail("Teardown timed out"); + 
AutoFailTestSupport.dumpAllThreads(testName.getMethodName()); + } + executor.shutdownNow(); + exceptions.clear(); + } + + private String buildLongString() { + final StringBuilder stringBuilder = new StringBuilder( + MESSAGE_LENGTH_BYTES); + for (int i = 0; i < MESSAGE_LENGTH_BYTES; ++i) { + stringBuilder.append((int) (Math.random() * 10)); + } + return stringBuilder.toString(); + } + + HashSet<Connection> connections = new HashSet<Connection>(); + private class Receiver implements MessageListener { + + private final javax.jms.Destination dest; + + private final Connection connection; + + private final Session session; + + private final MessageConsumer messageConsumer; + + private volatile long nextExpectedSeqNum = 0; + + private final boolean transactional; + + private String lastId = null; + + public Receiver(javax.jms.Destination dest, boolean transactional) throws JMSException { + this.dest = dest; + this.transactional = transactional; + connection = new ActiveMQConnectionFactory(brokerURL) + .createConnection(); + connection.setClientID(dest.toString()); + session = connection.createSession(transactional, transactional ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); + if (ActiveMQDestination.transform(dest).isTopic()) { + messageConsumer = session.createDurableSubscriber((Topic) dest, dest.toString()); + } else { + messageConsumer = session.createConsumer(dest); + } + messageConsumer.setMessageListener(this); + connection.start(); + connections.add(connection); + } + + public void close() throws JMSException { + connection.close(); + } + + public long getNextExpectedSeqNo() { + return nextExpectedSeqNum; + } + + final int TRANSACITON_BATCH = 500; + boolean resumeOnNextOrPreviousIsOk = false; + public void onMessage(Message message) { + try { + final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY); + if ((seqNum % TRANSACITON_BATCH) == 0) { + LOG.info(dest + " received " + seqNum); + + if (transactional) { + LOG.info("committing.."); + session.commit(); + } + } + if (resumeOnNextOrPreviousIsOk) { + // after an indoubt commit we need to accept what we get (within reason) + if (seqNum != nextExpectedSeqNum) { + if (seqNum == nextExpectedSeqNum - (TRANSACITON_BATCH -1)) { + nextExpectedSeqNum -= (TRANSACITON_BATCH -1); + LOG.info("In doubt commit failed, getting replay at:" + nextExpectedSeqNum); + } + } + resumeOnNextOrPreviousIsOk = false; + } + if (seqNum != nextExpectedSeqNum) { + LOG.warn(dest + " received " + seqNum + + " in msg: " + message.getJMSMessageID() + + " expected " + + nextExpectedSeqNum + + ", lastId: " + lastId + + ", message:" + message); + fail(dest + " received " + seqNum + " expected " + + nextExpectedSeqNum); + } + ++nextExpectedSeqNum; + lastId = message.getJMSMessageID(); + } catch (TransactionRolledBackException expectedSometimesOnFailoverRecovery) { + LOG.info("got rollback: " + expectedSometimesOnFailoverRecovery); + if (expectedSometimesOnFailoverRecovery.getMessage().contains("completion in doubt")) { + // in doubt - either commit command or reply missing + // don't know if we will get a replay + resumeOnNextOrPreviousIsOk = true; + 
nextExpectedSeqNum++; + LOG.info("in doubt transaction completion: ok to get next or previous batch. next:" + nextExpectedSeqNum); + } else { + resumeOnNextOrPreviousIsOk = false; + // batch will be replayed + nextExpectedSeqNum -= (TRANSACITON_BATCH -1); + } + + } catch (Throwable e) { + LOG.error(dest + " onMessage error", e); + exceptions.add(e); + } + } + + } + + private class Sender implements Runnable { + + private final javax.jms.Destination dest; + + private final Connection connection; + + private final Session session; + + private final MessageProducer messageProducer; + + private volatile long nextSequenceNumber = 0; + + public Sender(javax.jms.Destination dest) throws JMSException { + this.dest = dest; + connection = new ActiveMQConnectionFactory(brokerURL) + .createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + messageProducer = session.createProducer(dest); + messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); + connection.start(); + connections.add(connection); + } + + public void run() { + final String longString = buildLongString(); + while (nextSequenceNumber < numtoSend) { + try { + final Message message = session + .createTextMessage(longString); + message.setLongProperty(SEQ_NUM_PROPERTY, + nextSequenceNumber); + ++nextSequenceNumber; + messageProducer.send(message); + + if ((nextSequenceNumber % 500) == 0) { + LOG.info(dest + " sent " + nextSequenceNumber); + } + + } catch (javax.jms.IllegalStateException e) { + LOG.error(dest + " bailing on send error", e); + exceptions.add(e); + break; + } catch (Exception e) { + LOG.error(dest + " send error", e); + exceptions.add(e); + } + if (sleepBetweenSend > 0) { + try { + Thread.sleep(sleepBetweenSend); + } catch (InterruptedException e) { + LOG.warn(dest + " sleep interrupted", e); + } + } + } + try { + connection.close(); + } catch (JMSException ignored) { + } + } + } + + // attempt to simply replicate leveldb failure. 
no joy yet + public void x_testRestartReReceive() throws Exception { + createBroker(new Configurer() { + public void configure(BrokerService broker) throws Exception { + broker.deleteAllMessages(); + } + }); + + final javax.jms.Destination destination = + ActiveMQDestination.createDestination("test.dest.X", ActiveMQDestination.QUEUE_TYPE); + Thread thread = new Thread(new Sender(destination)); + thread.start(); + thread.join(); + + Connection connection = new ActiveMQConnectionFactory(brokerURL).createConnection(); + connection.setClientID(destination.toString()); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer messageConsumer = session.createConsumer(destination); + connection.start(); + + int batch = 200; + long expectedSeq; + + final TimerTask restartTask = schedualRestartTask(null, new Configurer() { + public void configure(BrokerService broker) throws Exception { + } + }); + + expectedSeq = 0; + for (int s = 0; s < 4; s++) { + for (int i = 0; i < batch; i++) { + Message message = messageConsumer.receive(20000); + assertNotNull("s:" + s + ", i:" + i, message); + final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY); + assertEquals("expected order s:" + s, expectedSeq++, seqNum); + + if (i > 0 && i%600 == 0) { + LOG.info("Commit on %5"); + // session.commit(); + } + } + restartTask.run(); + } + + } + + // no need to run this unless there are some issues with the others + public void vanilaVerify_testOrder() throws Exception { + + createBroker(new Configurer() { + public void configure(BrokerService broker) throws Exception { + broker.deleteAllMessages(); + } + }); + + verifyOrderedMessageReceipt(); + verifyStats(false); + } + + @Test(timeout = 5 * 60 * 1000) + public void testOrderWithRestart() throws Exception { + createBroker(new Configurer() { + public void configure(BrokerService broker) throws Exception { + broker.deleteAllMessages(); + } + }); + + final Timer timer = new Timer(); + 
schedualRestartTask(timer, new Configurer() { + public void configure(BrokerService broker) throws Exception { + } + }); + + try { + verifyOrderedMessageReceipt(); + } finally { + timer.cancel(); + } + + verifyStats(true); + } + + @Test(timeout = 5 * 60 * 1000) + public void testTopicOrderWithRestart() throws Exception { + createBroker(new Configurer() { + public void configure(BrokerService broker) throws Exception { + broker.deleteAllMessages(); + } + }); + + final Timer timer = new Timer(); + schedualRestartTask(timer, null); + + try { + verifyOrderedMessageReceipt(ActiveMQDestination.TOPIC_TYPE); + } finally { + timer.cancel(); + } + + verifyStats(true); + } + + @Test(timeout = 5 * 60 * 1000) + public void testQueueTransactionalOrderWithRestart() throws Exception { + doTestTransactionalOrderWithRestart(ActiveMQDestination.QUEUE_TYPE); + } + + @Test(timeout = 5 * 60 * 1000) + public void testTopicTransactionalOrderWithRestart() throws Exception { + doTestTransactionalOrderWithRestart(ActiveMQDestination.TOPIC_TYPE); + } + + public void doTestTransactionalOrderWithRestart(byte destinationType) throws Exception { + numtoSend = 10000; + sleepBetweenSend = 3; + brokerStopPeriod = 10 * 1000; + + createBroker(new Configurer() { + public void configure(BrokerService broker) throws Exception { + broker.deleteAllMessages(); + } + }); + + final Timer timer = new Timer(); + schedualRestartTask(timer, null); + + try { + verifyOrderedMessageReceipt(destinationType, 1, true); + } finally { + timer.cancel(); + } + + verifyStats(true); + } + + private void verifyStats(boolean brokerRestarts) throws Exception { + RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); + + for (Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) { + DestinationStatistics stats = dest.getDestinationStatistics(); + if (brokerRestarts) { + // all bets are off w.r.t stats as there may be duplicate sends and duplicate + // dispatches, all of which will be 
suppressed - either by the reference store + // not allowing duplicate references or consumers acking duplicates + LOG.info("with restart: not asserting qneue/dequeue stat match for: " + dest.getName() + + " " + stats.getEnqueues().getCount() + " <= " +stats.getDequeues().getCount()); + } else { + assertEquals("qneue/dequeue match for: " + dest.getName(), + stats.getEnqueues().getCount(), stats.getDequeues().getCount()); + } + } + } + + private TimerTask schedualRestartTask(final Timer timer, final Configurer configurer) { + class RestartTask extends TimerTask { + public void run() { + synchronized (brokerLock) { + LOG.info("stopping broker.."); + try { + broker.stop(); + broker.waitUntilStopped(); + } catch (Exception e) { + LOG.error("ex on broker stop", e); + exceptions.add(e); + } + LOG.info("restarting broker"); + try { + createBroker(configurer); + broker.waitUntilStarted(); + } catch (Exception e) { + LOG.error("ex on broker restart", e); + exceptions.add(e); + } + } + if (++numBrokerRestarts < MAX_BROKER_RESTARTS && timer != null) { + // do it again + try { + timer.schedule(new RestartTask(), brokerStopPeriod); + } catch (IllegalStateException ignore_alreadyCancelled) { + } + } else { + LOG.info("no longer stopping broker on reaching Max restarts: " + MAX_BROKER_RESTARTS); + } + } + } + RestartTask task = new RestartTask(); + if (timer != null) { + timer.schedule(task, brokerStopPeriod); + } + return task; + } + + private void verifyOrderedMessageReceipt(byte destinationType) throws Exception { + verifyOrderedMessageReceipt(destinationType, NUM_SENDERS_AND_RECEIVERS, false); + } + + private void verifyOrderedMessageReceipt() throws Exception { + verifyOrderedMessageReceipt(ActiveMQDestination.QUEUE_TYPE, NUM_SENDERS_AND_RECEIVERS, false); + } + + private void verifyOrderedMessageReceipt(byte destinationType, int concurrentPairs, boolean transactional) throws Exception { + + Vector<Thread> threads = new Vector<Thread>(); + Vector<Receiver> receivers = new 
Vector<Receiver>(); + + for (int i = 0; i < concurrentPairs; ++i) { + final javax.jms.Destination destination = + ActiveMQDestination.createDestination("test.dest." + i, destinationType); + receivers.add(new Receiver(destination, transactional)); + Thread thread = new Thread(new Sender(destination)); + thread.start(); + threads.add(thread); + } + + final long expiry = System.currentTimeMillis() + 1000 * 60 * 4; + while(!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis() < expiry) { + Thread sendThread = threads.firstElement(); + sendThread.join(1000*30); + if (!sendThread.isAlive()) { + threads.remove(sendThread); + } else { + AutoFailTestSupport.dumpAllThreads("Send blocked"); + } + } + LOG.info("senders done..." + threads); + + while(!receivers.isEmpty() && System.currentTimeMillis() < expiry) { + Receiver receiver = receivers.firstElement(); + if (receiver.getNextExpectedSeqNo() >= numtoSend || !exceptions.isEmpty()) { + receiver.close(); + receivers.remove(receiver); + } + } + + for (Connection connection : connections) { + try { + connection.close(); + } catch (Exception ignored) {} + } + connections.clear(); + + assertTrue("No timeout waiting for senders/receivers to complete", System.currentTimeMillis() < expiry); + if (!exceptions.isEmpty()) { + exceptions.get(0).printStackTrace(); + } + + LOG.info("Dangling threads: " + threads); + for (Thread dangling : threads) { + dangling.interrupt(); + dangling.join(10*1000); + } + + assertTrue("No exceptions", exceptions.isEmpty()); + } + +} + +class TeardownTask implements Callable<Boolean> { + private Object brokerLock; + private BrokerService broker; + + public TeardownTask(Object brokerLock, BrokerService broker) { + this.brokerLock = brokerLock; + this.broker = broker; + } + + @Override + public Boolean call() throws Exception { + synchronized(brokerLock) { + if (broker!= null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + return Boolean.TRUE; + } +} 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2171Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2171Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2171Test.java new file mode 100644 index 0000000..f23f758 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2171Test.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; +import javax.jms.*; +import javax.jms.Queue; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class AMQ2171Test implements Thread.UncaughtExceptionHandler { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ2171Test.class); + private static final String BROKER_URL = "tcp://localhost:0"; + private static final int QUEUE_SIZE = 100; + + private static BrokerService brokerService; + private static Queue destination; + + private String brokerUri; + private String brokerUriNoPrefetch; + private Collection<Throwable> exceptions = new CopyOnWriteArrayList<Throwable>(); + + @Before + public void setUp() throws Exception { + // Start an embedded broker up. 
+ brokerService = new BrokerService(); + brokerService.setDeleteAllMessagesOnStartup(true); + brokerService.addConnector(BROKER_URL); + brokerService.start(); + + brokerUri = brokerService.getTransportConnectors().get(0).getPublishableConnectString().toString(); + brokerUriNoPrefetch = brokerUri + "?jms.prefetchPolicy.all=0"; + + destination = new ActiveMQQueue("Test"); + produce(brokerUri, QUEUE_SIZE); + } + + @Before + public void addHandler() { + Thread.setDefaultUncaughtExceptionHandler(this); + } + + @After + public void tearDown() throws Exception { + brokerService.stop(); + } + + @Test(timeout = 10000) + public void testBrowsePrefetch() throws Exception { + runTest(brokerUri); + } + + @Test(timeout = 10000) + public void testBrowseNoPrefetch() throws Exception { + runTest(brokerUriNoPrefetch); + } + + private void runTest(String brokerURL) throws Exception { + + Connection connection = new ActiveMQConnectionFactory(brokerURL).createConnection(); + + try { + connection.start(); + + Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + @SuppressWarnings("unchecked") + Enumeration<Message> unread = (Enumeration<Message>) session.createBrowser(destination).getEnumeration(); + + int count = 0; + while (unread.hasMoreElements()) { + unread.nextElement(); + count++; + } + + assertEquals(QUEUE_SIZE, count); + assertTrue(exceptions.isEmpty()); + } finally { + try { + connection.close(); + } catch (JMSException e) { + exceptions.add(e); + } + } + } + + private static void produce(String brokerURL, int count) throws Exception { + Connection connection = null; + + try { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL); + connection = factory.createConnection(); + Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + producer.setTimeToLive(0); + connection.start(); + + for (int i = 0; i < count; i++) { + int id = i + 1; + 
TextMessage message = session.createTextMessage("Message " + id); + message.setIntProperty("MsgNumber", id); + producer.send(message); + + if (id % 500 == 0) { + LOG.info("sent " + id + ", ith " + message); + } + } + } finally { + try { + if (connection != null) { + connection.close(); + } + } catch (Throwable e) { + } + } + } + + public void uncaughtException(Thread t, Throwable e) { + exceptions.add(e); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2200Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2200Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2200Test.java new file mode 100644 index 0000000..0903e56 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2200Test.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.activemq.bugs;

import static org.junit.Assert.*;
import java.io.File;
import java.util.concurrent.TimeUnit;

import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.management.ObjectName;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.TopicSubscriptionViewMBean;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Checks that a non-durable topic subscription shows up in the broker's admin view and
 * that its {@link TopicSubscriptionViewMBean} allows the maximum pending queue size to
 * be changed.
 */
public class AMQ2200Test {

    private static final String bindAddress = "tcp://0.0.0.0:0";
    private BrokerService broker;
    private ActiveMQConnectionFactory cf;

    /** Starts a persistent, JMX-enabled broker on an ephemeral TCP port. */
    @Before
    public void setUp() throws Exception {
        broker = new BrokerService();
        broker.setDataDirectory("target" + File.separator + "activemq-data");
        broker.setPersistent(true);
        broker.setUseJmx(true);
        broker.setAdvisorySupport(false);
        broker.setDeleteAllMessagesOnStartup(true);
        broker.addConnector(bindAddress);
        String address = broker.getTransportConnectors().get(0).getPublishableConnectString();
        broker.start();
        broker.waitUntilStarted();

        cf = new ActiveMQConnectionFactory(address);
    }

    /** Stops the broker if {@link #setUp()} got far enough to create it. */
    @After
    public void tearDown() throws Exception {
        if (broker != null) {
            broker.stop();
            broker.waitUntilStopped();
        }
    }

    /**
     * Creates a topic consumer, then looks the subscription up through the admin view
     * and exercises its maximum-pending-queue-size attribute.
     */
    @Test
    public void testTopicSubscriptionView() throws Exception {
        TopicConnection connection = cf.createTopicConnection();
        // The connection used to be abandoned; close it on every path so the client
        // transport is released even when an assertion fails.
        try {
            TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

            Topic destination = session.createTopic("TopicViewTestTopic");
            MessageConsumer consumer = session.createConsumer(destination);
            assertNotNull(consumer);
            // Give the broker a moment to register the subscription before querying it.
            TimeUnit.SECONDS.sleep(1);

            ObjectName[] subscriptionNames = broker.getAdminView().getTopicSubscribers();
            assertTrue(subscriptionNames.length > 0);

            boolean found = false;
            for (ObjectName name : subscriptionNames) {
                if (name.toString().contains("TopicViewTestTopic")) {
                    TopicSubscriptionViewMBean sub = (TopicSubscriptionViewMBean)
                        broker.getManagementContext().newProxyInstance(name, TopicSubscriptionViewMBean.class, true);
                    assertNotNull(sub);
                    assertTrue(sub.getSessionId() != -1);
                    // Check that it's the default value, then configure something new.
                    assertTrue(sub.getMaximumPendingQueueSize() == -1);
                    sub.setMaximumPendingQueueSize(1000);
                    assertTrue(sub.getMaximumPendingQueueSize() != -1);
                    found = true;
                }
            }

            assertTrue("Didn't find the TopicSubscriptionView", found);
        } finally {
            connection.close();
        }
    }

}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2213Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2213Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2213Test.java new file mode 100644 index 0000000..f267a99 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2213Test.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.activemq.bugs;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Checks that generic, topic and queue sessions created from one connection are each
 * equal to themselves.
 */
public class AMQ2213Test {
    BrokerService broker;
    ConnectionFactory factory;
    Connection connection;
    Session session;
    Queue queue;
    MessageConsumer consumer;

    /**
     * Starts a broker on an ephemeral TCP port and opens a started connection plus a
     * client-acknowledge session against it.
     *
     * @param deleteAll whether the broker should purge its store on startup
     */
    public void createBroker(boolean deleteAll) throws Exception {
        broker = new BrokerService();
        broker.setDeleteAllMessagesOnStartup(deleteAll);
        // NOTE(review): directory name looks copied from another test; kept as-is.
        broker.setDataDirectory("target/AMQ3145Test");
        broker.setUseJmx(true);
        broker.getManagementContext().setCreateConnector(false);
        broker.addConnector("tcp://localhost:0");
        broker.start();
        broker.waitUntilStarted();
        factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri().toString());
        connection = factory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    }

    @Before
    public void createBroker() throws Exception {
        createBroker(true);
    }

    /**
     * Releases client resources and stops the broker. Every field is null-checked
     * because {@link #createBroker(boolean)} may have failed part-way through, and the
     * broker is stopped in a {@code finally} so a client-side close error cannot leave
     * it running.
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (consumer != null) {
                consumer.close();
            }
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.stop();
                connection.close();
            }
        } finally {
            if (broker != null) {
                broker.stop();
            }
        }
    }

    @Test
    public void testEqualsGenericSession() throws JMSException {
        assertNotNull(this.connection);
        Session sess = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // equals() is called explicitly: the reflexive case is what is under test.
        assertTrue(sess.equals(sess));
    }

    @Test
    public void testEqualsTopicSession() throws JMSException {
        assertNotNull(this.connection);
        assertTrue(this.connection instanceof TopicConnection);
        TopicSession sess = ((TopicConnection) this.connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        assertTrue(sess.equals(sess));
    }

    @Test
    public void testEqualsQueueSession() throws JMSException {
        assertNotNull(this.connection);
        assertTrue(this.connection instanceof QueueConnection);
        QueueSession sess = ((QueueConnection) this.connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        assertTrue(sess.equals(sess));
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java new file mode 100644 index 0000000..369385c --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.activemq.bugs;

import java.io.File;
import java.util.concurrent.CountDownLatch;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;

import junit.framework.Test;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Publishes to a topic while its only consumer is stalled and checks that broker temp
 * store usage grows during the stall and shrinks once the subscription is removed.
 */
public class AMQ2314Test extends CombinationTestSupport {

    public boolean consumeAll = false;
    public int deliveryMode = DeliveryMode.NON_PERSISTENT;

    private static final Logger LOG = LoggerFactory.getLogger(AMQ2314Test.class);
    private static final int MESSAGES_COUNT = 30000;
    private static byte[] buf = new byte[1024];
    private BrokerService broker;
    private String connectionUri;

    private static final long messageReceiveTimeout = 500L;

    Destination destination = new ActiveMQTopic("FooTwo");

    public void testRemoveSlowSubscriberWhacksTempStore() throws Exception {
        runProducerWithHungConsumer();
    }

    public void testMemoryUsageReleasedOnAllConsumed() throws Exception {
        consumeAll = true;
        runProducerWithHungConsumer();
        // do it again to ensure memory limits are decreased
        runProducerWithHungConsumer();
    }

    /**
     * Runs one producer and one consumer on a shared connection. The consumer takes a
     * single message and then blocks until the producer has finished, at which point
     * temp usage is sampled; the consumer is then released (optionally draining the
     * topic when {@link #consumeAll} is set) and the connection closed.
     */
    public void runProducerWithHungConsumer() throws Exception {

        final CountDownLatch consumerContinue = new CountDownLatch(1);
        final CountDownLatch consumerReady = new CountDownLatch(1);

        final long origTempUsage = broker.getSystemUsage().getTempUsage().getUsage();

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
        factory.setAlwaysSyncSend(true);

        // ensure messages are spooled to disk for this consumer
        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
        prefetch.setTopicPrefetch(500);
        factory.setPrefetchPolicy(prefetch);
        final Connection connection = factory.createConnection();

        final long tempUsageBySubscription;
        try {
            connection.start();

            Thread producingThread = new Thread("Producing thread") {
                @Override
                public void run() {
                    try {
                        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                        MessageProducer producer = session.createProducer(destination);
                        producer.setDeliveryMode(deliveryMode);
                        for (int idx = 0; idx < MESSAGES_COUNT; ++idx) {
                            Message message = session.createTextMessage(new String(buf) + idx);
                            producer.send(message);
                        }
                        producer.close();
                        session.close();
                    } catch (Throwable ex) {
                        LOG.error("Producing thread failed", ex);
                    }
                }
            };

            Thread consumingThread = new Thread("Consuming thread") {
                @Override
                public void run() {
                    try {
                        int count = 0;
                        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                        MessageConsumer consumer = session.createConsumer(destination);

                        // Signal readiness after the first empty poll, then keep polling
                        // until the first message arrives.
                        while (consumer.receive(messageReceiveTimeout) == null) {
                            consumerReady.countDown();
                        }
                        count++;
                        LOG.info("Received one... waiting");
                        consumerContinue.await();
                        if (consumeAll) {
                            LOG.info("Consuming the rest of the messages...");
                            while (consumer.receive(messageReceiveTimeout) != null) {
                                count++;
                            }
                        }
                        LOG.info("consumer session closing: consumed count: " + count);
                        session.close();
                    } catch (Throwable ex) {
                        LOG.error("Consuming thread failed", ex);
                    }
                }
            };
            consumingThread.start();
            consumerReady.await();

            producingThread.start();
            producingThread.join();

            tempUsageBySubscription = broker.getSystemUsage().getTempUsage().getUsage();
            LOG.info("Orig Usage: " + origTempUsage + ", currentUsage: " + tempUsageBySubscription);
            assertTrue("some temp store has been used", tempUsageBySubscription != origTempUsage);
            consumerContinue.countDown();
            consumingThread.join();
        } finally {
            // Release the consumer thread and the connection even when an assertion
            // above failed; countDown() on an already-open latch is a no-op.
            consumerContinue.countDown();
            connection.close();
        }

        LOG.info("Subscription Usage: " + tempUsageBySubscription + ", endUsage: "
            + broker.getSystemUsage().getTempUsage().getUsage());

        assertTrue("temp usage decreased with removed sub", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return broker.getSystemUsage().getTempUsage().getUsage() < tempUsageBySubscription;
            }
        }));
    }

    /** Enables auto-fail and starts a persistent broker with a 64 MiB memory limit. */
    @Override
    public void setUp() throws Exception {
        super.setAutoFail(true);
        super.setUp();
        broker = new BrokerService();
        broker.setDataDirectory("target" + File.separator + "activemq-data");
        broker.setPersistent(true);
        broker.setUseJmx(true);
        broker.setAdvisorySupport(false);
        broker.setDeleteAllMessagesOnStartup(true);
        broker.getSystemUsage().getMemoryUsage().setLimit(1024L * 1024 * 64);

        broker.addConnector("tcp://localhost:0").setName("Default");
        broker.start();

        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
    }

    /**
     * Stops the broker and delegates to the superclass; setUp() enabled auto-fail, and
     * the superclass teardown is presumably where that watchdog is stopped.
     */
    @Override
    public void tearDown() throws Exception {
        try {
            if (broker != null) {
                broker.stop();
            }
        } finally {
            super.tearDown();
        }
    }

    public static Test suite() {
        return suite(AMQ2314Test.class);
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java new file mode 100644 index 0000000..283dd92 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.activemq.bugs;

import java.io.File;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;

import junit.framework.TestCase;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.KahaDBStore;

/*
 An AMQ2356Test
 We have an environment where we have a very large number of destinations.
 In an effort to reduce the number of threads I have set the options
 -Dorg.apache.activemq.UseDedicatedTaskRunner=false

 and

 <policyEntry queue=">" optimizedDispatch="true"/>

 Unfortunately this very quickly leads to deadlocked queues.

 My environment is:

 ActiveMQ 5.2 Ubuntu Jaunty kernel 2.6.28-14-generic #47-Ubuntu SMP (although only a single core on my system)
 TCP transportConnector

 To reproduce the bug (which I can do 100% of the time) I connect 5 consumers (AUTO_ACK) to 5 different queues.
 Then I start 5 producers and pair them up with a consumer on a queue, and they start sending PERSISTENT messages.
 I've set the producer to send 100 messages and disconnect, and the consumer to receive 100 messages and disconnect.
 The first pair usually gets through their 100 messages and disconnect, at which point all the other pairs have
 deadlocked at less than 30 messages each.
 */
public class AMQ2356Test extends TestCase {
    protected static final int MESSAGE_COUNT = 1000;
    protected static final int NUMBER_OF_PAIRS = 10;
    protected BrokerService broker;
    protected String brokerURL = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
    protected int destinationCount;

    /**
     * Runs {@link #NUMBER_OF_PAIRS} producer/consumer pairs one after another, each on
     * its own queue, sending and then receiving {@link #MESSAGE_COUNT} messages.
     */
    public void testScenario() throws Exception {
        for (int i = 0; i < NUMBER_OF_PAIRS; i++) {
            ActiveMQQueue queue = new ActiveMQQueue(getClass().getName() + ":" + i);
            ProducerConsumerPair cp = new ProducerConsumerPair();
            // stop() runs on every path so a failing pair does not leak its connections.
            try {
                cp.start(this.brokerURL, queue, MESSAGE_COUNT);
                cp.testRun();
            } finally {
                cp.stop();
            }
        }
    }

    /** Returns a new queue whose name is derived from this class and a running counter. */
    protected Destination getDestination(Session session) throws JMSException {
        String destinationName = getClass().getName() + "." + destinationCount++;
        return session.createQueue(destinationName);
    }

    @Override
    protected void setUp() throws Exception {
        if (broker == null) {
            broker = createBroker();
        }
        super.setUp();
    }

    @Override
    protected void tearDown() throws Exception {
        super.tearDown();
        if (broker != null) {
            broker.stop();
        }
    }

    /** Creates, configures and starts the broker under test. */
    protected BrokerService createBroker() throws Exception {
        BrokerService answer = new BrokerService();
        configureBroker(answer);
        answer.start();
        return answer;
    }

    /**
     * Applies the configuration from the bug report: optimized dispatch on every
     * destination, no JMX, advisories or statistics, and a clean store on startup.
     */
    protected void configureBroker(BrokerService answer) throws Exception {
        File dataFileDir = new File("target/test-amq-data/bugs/AMQ2356/kahadb");
        KahaDBStore kaha = new KahaDBStore();
        kaha.setDirectory(dataFileDir);
        // NOTE(review): 'kaha' is configured but never handed to the broker here;
        // behaviour left unchanged.
        answer.setUseJmx(false);
        // Default destination policy with optimized dispatch, as in the bug report.
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policy = new PolicyEntry();
        policy.setOptimizedDispatch(true);
        policyMap.setDefaultEntry(policy);
        answer.setDestinationPolicy(policyMap);

        answer.setAdvisorySupport(false);
        answer.setEnableStatistics(false);
        answer.setDeleteAllMessagesOnStartup(true);
        answer.addConnector(brokerURL);

    }

    /** One producer and one consumer, each on its own connection, sharing a destination. */
    static class ProducerConsumerPair {
        private Destination destination;
        private MessageProducer producer;
        private MessageConsumer consumer;
        private Connection producerConnection;
        private Connection consumerConnection;
        private int numberOfMessages;

        ProducerConsumerPair() {

        }

        /** Opens and starts both connections and creates the producer and consumer. */
        void start(String brokerURL, final Destination dest, int msgNum) throws Exception {
            this.destination = dest;
            this.numberOfMessages = msgNum;
            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerURL);
            this.producerConnection = cf.createConnection();
            this.producerConnection.start();
            this.consumerConnection = cf.createConnection();
            this.consumerConnection.start();
            this.producer = createProducer(this.producerConnection);
            this.consumer = createConsumer(this.consumerConnection);
        }

        /** Sends all messages, then receives them and checks the received count. */
        void testRun() throws Exception {

            Session s = this.producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            for (int i = 0; i < this.numberOfMessages; i++) {
                BytesMessage msg = s.createBytesMessage();
                msg.writeBytes(new byte[1024]);
                this.producer.send(msg);
            }
            int received = 0;
            for (int i = 0; i < this.numberOfMessages; i++) {
                Message msg = this.consumer.receive();
                assertNotNull(msg);
                received++;
            }
            assertEquals("Messages received on " + this.destination, this.numberOfMessages, received);

        }

        /**
         * Closes both connections. The consumer connection is closed in a
         * {@code finally} so it is released even if closing the producer fails.
         */
        void stop() throws Exception {
            try {
                if (this.producerConnection != null) {
                    this.producerConnection.close();
                }
            } finally {
                if (this.consumerConnection != null) {
                    this.consumerConnection.close();
                }
            }
        }

        private MessageProducer createProducer(Connection connection) throws Exception {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer result = session.createProducer(this.destination);
            return result;
        }

        private MessageConsumer createConsumer(Connection connection) throws Exception {

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer result = session.createConsumer(this.destination);
            return result;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2364Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2364Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2364Test.java new file mode 100644 index 0000000..15d24d5 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2364Test.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; +//package org.apache.activemq.transport.failover; + +import static org.junit.Assert.assertEquals; + +import java.lang.reflect.Field; +import java.net.URI; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.state.ConnectionState; +import org.apache.activemq.state.ConnectionStateTracker; +import org.apache.activemq.state.TransactionState; +import org.apache.activemq.transport.MutexTransport; +import org.apache.activemq.transport.ResponseCorrelator; +import org.apache.activemq.transport.failover.FailoverTransport; +import org.junit.Test; + + +public class AMQ2364Test { + + @SuppressWarnings("unchecked") + @Test + public void testRollbackLeak() throws Exception { + + int messageCount = 1000; + URI failoverUri = new URI("failover:(vm://localhost)?jms.redeliveryPolicy.maximumRedeliveries=0"); + + Destination dest = new ActiveMQQueue("Failover.Leak"); + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(failoverUri); + ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); + connection.start(); + final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + MessageProducer producer = session.createProducer(dest); + + for (int i = 0; i < messageCount; ++i) + producer.send(session.createTextMessage("Test message #" + i)); + producer.close(); + session.commit(); + + MessageConsumer consumer = session.createConsumer(dest); + + final CountDownLatch latch = 
new CountDownLatch(messageCount); + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message msg) { + try { + session.rollback(); + } catch (JMSException e) { + e.printStackTrace(); + } finally { + latch.countDown(); + } + } + }); + + latch.await(); + consumer.close(); + session.close(); + + ResponseCorrelator respCorr = (ResponseCorrelator) connection.getTransport(); + MutexTransport mutexTrans = (MutexTransport) respCorr.getNext(); + FailoverTransport failoverTrans = (FailoverTransport) mutexTrans.getNext(); + Field stateTrackerField = FailoverTransport.class.getDeclaredField("stateTracker"); + stateTrackerField.setAccessible(true); + ConnectionStateTracker stateTracker = (ConnectionStateTracker) stateTrackerField.get(failoverTrans); + Field statesField = ConnectionStateTracker.class.getDeclaredField("connectionStates"); + statesField.setAccessible(true); + ConcurrentHashMap<ConnectionId, ConnectionState> states = + (ConcurrentHashMap<ConnectionId, ConnectionState>) statesField.get(stateTracker); + + ConnectionState state = states.get(connection.getConnectionInfo().getConnectionId()); + + Collection<TransactionState> transactionStates = state.getTransactionStates(); + + connection.stop(); + connection.close(); + + assertEquals("Transaction states not cleaned up", 0,transactionStates.size()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2383Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2383Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2383Test.java new file mode 100644 index 0000000..49c2366 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2383Test.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache 
Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.bugs;


import static org.junit.Assert.*;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.Test;

/**
 * Sends one text message, receives it in a transacted session that is then rolled back,
 * and checks that the received message equals the sent one.
 */
public class AMQ2383Test {

    @Test
    public void activeMQTest() throws Exception {
        Destination dest = ActiveMQQueue.createDestination("testQueue", ActiveMQQueue.QUEUE_TYPE);
        ConnectionFactory factory = new ActiveMQConnectionFactory(
            "vm://localhost?broker.useJmx=false&broker.persistent=false");

        // Both connections are closed in finally blocks so a failed receive or
        // assertion does not leave them open.
        Connection producerConnection = factory.createConnection();
        try {
            producerConnection.start();
            Connection consumerConnection = factory.createConnection();
            try {
                consumerConnection.start();

                Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                MessageProducer producer = producerSession.createProducer(dest);
                TextMessage sentMsg = producerSession.createTextMessage("test...");
                producer.send(sentMsg);
                producerSession.close();

                Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
                MessageConsumer consumer = consumerSession.createConsumer(dest);
                TextMessage receivedMsg = (TextMessage) consumer.receive();
                consumerSession.rollback();
                consumerSession.close();

                assertEquals(sentMsg, receivedMsg);
            } finally {
                consumerConnection.close();
            }
        } finally {
            producerConnection.close();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2401Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2401Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2401Test.java new file mode 100644 index 0000000..74f920f --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2401Test.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.activemq.bugs;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;

import junit.framework.TestCase;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An AMQ-2401 Test: one producer sends {@code SEND_COUNT} messages to a queue with a
 * small memory limit and producer flow control while {@code CONSUMER_COUNT}
 * DUPS_OK listeners consume them; the test passes once every message was delivered.
 */
public class AMQ2401Test extends TestCase implements MessageListener {
    private static final int SEND_COUNT = 500;
    private static final int CONSUMER_COUNT = 50;
    private static final int PRODUCER_COUNT = 1;
    private static final int LOG_INTERVAL = 10;

    private static final Logger LOG = LoggerFactory.getLogger(AMQ2401Test.class);

    private BrokerService broker;
    private ActiveMQConnectionFactory factory;

    private final List<Service> services = new ArrayList<Service>(CONSUMER_COUNT + PRODUCER_COUNT);
    // onMessage() is invoked by every consumer's listener, so the counter is atomic.
    private final AtomicInteger count = new AtomicInteger();
    private CountDownLatch latch;

    @Override
    protected void setUp() throws Exception {
        broker = new BrokerService();
        broker.setDataDirectory("target" + File.separator + "test-data" + File.separator + "AMQ2401Test");
        broker.setDeleteAllMessagesOnStartup(true);
        String connectionUri = broker.addConnector("tcp://0.0.0.0:0").getPublishableConnectString();
        PolicyMap policies = new PolicyMap();
        PolicyEntry entry = new PolicyEntry();
        entry.setMemoryLimit(1024 * 100);
        entry.setProducerFlowControl(true);
        entry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
        entry.setQueue(">");
        policies.setDefaultEntry(entry);
        broker.setDestinationPolicy(policies);
        broker.setUseJmx(false);
        broker.start();
        broker.waitUntilStarted();

        factory = new ActiveMQConnectionFactory(connectionUri);
        super.setUp();
    }

    @Override
    protected void tearDown() throws Exception {
        try {
            if (broker != null) {
                broker.stop();
                broker.waitUntilStopped();
            }
        } finally {
            super.tearDown();
        }
    }

    /**
     * Starts all consumers and producers and waits up to 30 seconds for every message
     * to be delivered. All services are closed afterwards, pass or fail.
     */
    public void testDupsOk() throws Exception {
        latch = new CountDownLatch(SEND_COUNT);
        try {
            for (int i = 0; i < CONSUMER_COUNT; i++) {
                TestConsumer consumer = new TestConsumer();
                // Register before starting so a failing start() still gets cleaned up.
                services.add(consumer);
                consumer.start();
            }
            for (int i = 0; i < PRODUCER_COUNT; i++) {
                TestProducer producer = new TestProducer();
                services.add(producer);
                producer.start();
            }

            waitForMessageReceipt(TimeUnit.SECONDS.toMillis(30));
        } finally {
            // Previously only two always-null locals were "closed" here, which left
            // every connection open.
            for (Service service : services) {
                service.close();
            }
            services.clear();
        }
    }

    /** Counts down the latch for each delivered message and briefly delays the listener. */
    @Override
    public void onMessage(Message message) {
        latch.countDown();
        int received = count.incrementAndGet();
        if (received % LOG_INTERVAL == 0) {
            LOG.debug("Received message " + received);
        }

        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Waits until all messages have been received.
     *
     * @param timeout maximum wait in milliseconds
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws TimeoutException if fewer than {@code SEND_COUNT} messages arrived in time
     */
    private void waitForMessageReceipt(long timeout) throws InterruptedException, TimeoutException {
        if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
            // latch.getCount() is what is still outstanding, not what was received.
            long received = SEND_COUNT - latch.getCount();
            throw new TimeoutException(String.format("Consumer didn't receive expected # of messages, %d of %d received.", received, SEND_COUNT));
        }
    }

    private interface Service {
        void start() throws Exception;

        void close();
    }

    /** Sends its share of {@code SEND_COUNT} 1 KiB bytes messages on a dedicated thread. */
    private class TestProducer implements Runnable, Service {
        private final Thread thread;
        private final Connection connection;
        private final Session session;
        private final MessageProducer producer;

        TestProducer() throws Exception {
            thread = new Thread(this, "TestProducer");
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            producer = session.createProducer(session.createQueue("AMQ2401Test"));
        }

        @Override
        public void start() {
            thread.start();
        }

        @Override
        public void run() {

            int toSend = SEND_COUNT / PRODUCER_COUNT;
            for (int i = 1; i <= toSend; i++) {
                try {
                    if ((i % LOG_INTERVAL) == 0) {
                        LOG.debug("Sending: " + i);
                    }
                    BytesMessage message = session.createBytesMessage();
                    message.writeBytes(new byte[1024]);
                    producer.send(message);
                } catch (JMSException jmse) {
                    LOG.error("Producer failed after " + (i - 1) + " messages", jmse);
                    break;
                }
            }
        }

        @Override
        public void close() {
            try {
                connection.close();
            } catch (JMSException ignored) {
                // best-effort cleanup at the end of the test
            }
        }
    }

    /** A DUPS_OK consumer that delivers to the enclosing test's {@link #onMessage}. */
    private class TestConsumer implements Service {
        private final ActiveMQConnection connection;
        private final Session session;
        private final MessageConsumer consumer;

        TestConsumer() throws Exception {
            factory.setOptimizeAcknowledge(false);
            connection = (ActiveMQConnection) factory.createConnection();

            session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            consumer = session.createConsumer(session.createQueue("AMQ2401Test"));

            consumer.setMessageListener(AMQ2401Test.this);
        }

        @Override
        public void start() throws Exception {
            connection.start();
        }

        @Override
        public void close() {
            try {
                connection.close();
            } catch (JMSException ignored) {
                // best-effort cleanup at the end of the test
            }
        }
    }
}
