http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java new file mode 100644 index 0000000..6c40239 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java @@ -0,0 +1,303 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.broker.region.cursors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Iterator; + +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.util.IdGenerator; +import org.junit.Test; + +public class PrioritizedPendingListTest { + + @Test + public void testAddMessageFirst() { + PrioritizedPendingList list = new PrioritizedPendingList(); + + list.addMessageFirst(new TestMessageReference(1)); + list.addMessageFirst(new TestMessageReference(2)); + list.addMessageFirst(new TestMessageReference(3)); + list.addMessageFirst(new TestMessageReference(4)); + list.addMessageFirst(new TestMessageReference(5)); + + assertTrue(list.size() == 5); + + Iterator<MessageReference> iter = list.iterator(); + int lastId = list.size(); + while (iter.hasNext()) { + assertEquals(lastId--, iter.next().getMessageId().getProducerSequenceId()); + } + } + + @Test + public void testAddMessageLast() { + + PrioritizedPendingList list = new PrioritizedPendingList(); + + list.addMessageLast(new TestMessageReference(1)); + list.addMessageLast(new TestMessageReference(2)); + list.addMessageLast(new TestMessageReference(3)); + list.addMessageLast(new TestMessageReference(4)); + list.addMessageLast(new TestMessageReference(5)); + + assertTrue(list.size() == 5); + + Iterator<MessageReference> iter = list.iterator(); + int lastId = 1; + while (iter.hasNext()) { + assertEquals(lastId++, iter.next().getMessageId().getProducerSequenceId()); + } + } + + @Test + public void testClear() { + PrioritizedPendingList list = new PrioritizedPendingList(); + + list.addMessageFirst(new 
TestMessageReference(1)); + list.addMessageFirst(new TestMessageReference(2)); + list.addMessageFirst(new TestMessageReference(3)); + list.addMessageFirst(new TestMessageReference(4)); + list.addMessageFirst(new TestMessageReference(5)); + + assertFalse(list.isEmpty()); + assertTrue(list.size() == 5); + + list.clear(); + + assertTrue(list.isEmpty()); + assertTrue(list.size() == 0); + + list.addMessageFirst(new TestMessageReference(1)); + list.addMessageLast(new TestMessageReference(2)); + list.addMessageLast(new TestMessageReference(3)); + list.addMessageFirst(new TestMessageReference(4)); + list.addMessageLast(new TestMessageReference(5)); + + assertFalse(list.isEmpty()); + assertTrue(list.size() == 5); + } + + @Test + public void testIsEmpty() { + PrioritizedPendingList list = new PrioritizedPendingList(); + assertTrue(list.isEmpty()); + + list.addMessageFirst(new TestMessageReference(1)); + list.addMessageFirst(new TestMessageReference(2)); + list.addMessageFirst(new TestMessageReference(3)); + list.addMessageFirst(new TestMessageReference(4)); + list.addMessageFirst(new TestMessageReference(5)); + + assertFalse(list.isEmpty()); + list.clear(); + assertTrue(list.isEmpty()); + } + + @Test + public void testRemove() { + PrioritizedPendingList list = new PrioritizedPendingList(); + + TestMessageReference toRemove = new TestMessageReference(6); + + list.addMessageFirst(new TestMessageReference(1)); + list.addMessageFirst(new TestMessageReference(2)); + list.addMessageFirst(new TestMessageReference(3)); + list.addMessageFirst(new TestMessageReference(4)); + list.addMessageFirst(new TestMessageReference(5)); + + assertTrue(list.size() == 5); + + list.addMessageLast(toRemove); + list.remove(toRemove); + + assertTrue(list.size() == 5); + + list.remove(toRemove); + + assertTrue(list.size() == 5); + + Iterator<MessageReference> iter = list.iterator(); + int lastId = list.size(); + while (iter.hasNext()) { + assertEquals(lastId--, 
iter.next().getMessageId().getProducerSequenceId()); + } + + list.remove(null); + } + + @Test + public void testSize() { + PrioritizedPendingList list = new PrioritizedPendingList(); + assertTrue(list.isEmpty()); + + assertTrue(list.size() == 0); + list.addMessageFirst(new TestMessageReference(1)); + assertTrue(list.size() == 1); + list.addMessageLast(new TestMessageReference(2)); + assertTrue(list.size() == 2); + list.addMessageFirst(new TestMessageReference(3)); + assertTrue(list.size() == 3); + list.addMessageLast(new TestMessageReference(4)); + assertTrue(list.size() == 4); + list.addMessageFirst(new TestMessageReference(5)); + assertTrue(list.size() == 5); + + assertFalse(list.isEmpty()); + list.clear(); + assertTrue(list.isEmpty()); + assertTrue(list.size() == 0); + } + + @Test + public void testPrioritization() { + PrioritizedPendingList list = new PrioritizedPendingList(); + + list.addMessageFirst(new TestMessageReference(1, 5)); + list.addMessageFirst(new TestMessageReference(2, 4)); + list.addMessageFirst(new TestMessageReference(3, 3)); + list.addMessageFirst(new TestMessageReference(4, 2)); + list.addMessageFirst(new TestMessageReference(5, 1)); + + assertTrue(list.size() == 5); + + Iterator<MessageReference> iter = list.iterator(); + int lastId = list.size(); + while (iter.hasNext()) { + assertEquals(lastId--, iter.next().getMessage().getPriority()); + } + } + + static class TestMessageReference implements MessageReference { + + private static final IdGenerator id = new IdGenerator(); + + private Message message; + private MessageId messageId; + private int referenceCount = 0; + + public TestMessageReference(int sequenceId) { + messageId = new MessageId(id.generateId() + ":1", sequenceId); + message = new ActiveMQMessage(); + message.setPriority((byte) javax.jms.Message.DEFAULT_PRIORITY); + } + + public TestMessageReference(int sequenceId, int priority) { + messageId = new MessageId(id.generateId() + ":1", sequenceId); + message = new 
ActiveMQMessage(); + message.setPriority((byte) priority); + } + + @Override + public MessageId getMessageId() { + return messageId; + } + + @Override + public Message getMessageHardRef() { + return null; + } + + @Override + public Message getMessage() { + return message; + } + + @Override + public boolean isPersistent() { + return false; + } + + @Override + public Destination getRegionDestination() { + return null; + } + + @Override + public int getRedeliveryCounter() { + return 0; + } + + @Override + public void incrementRedeliveryCounter() { + } + + @Override + public int getReferenceCount() { + return this.referenceCount; + } + + @Override + public int incrementReferenceCount() { + return this.referenceCount++; + } + + @Override + public int decrementReferenceCount() { + return this.referenceCount--; + } + + @Override + public ConsumerId getTargetConsumerId() { + return null; + } + + @Override + public int getSize() { + return 1; + } + + @Override + public long getExpiration() { + return 0; + } + + @Override + public String getGroupID() { + return null; + } + + @Override + public int getGroupSequence() { + return 0; + } + + @Override + public boolean isExpired() { + return false; + } + + @Override + public boolean isDropped() { + return false; + } + + @Override + public boolean isAdvisory() { + return false; + } + } +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreBasedCursorTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreBasedCursorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreBasedCursorTest.java new file mode 100644 index 0000000..a330723 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreBasedCursorTest.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.region.cursors; + +/** + * A StoreBasedCursorTest + * + */ + +import java.util.Date; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.usage.SystemUsage; + +public class StoreBasedCursorTest extends TestCase { + protected String bindAddress = "tcp://localhost:60706"; + BrokerService broker; + ActiveMQConnectionFactory factory; + Connection connection; + Session session; + Queue queue; + int messageSize = 1024; + // actual message is messageSize*2, and 4*MessageSize would allow 2 messages be delivered, but the flush of the cache is async so the flush + // triggered on 2nd message maxing out the usage may not be in effect for the 3rd message to succeed. Making the memory usage more lenient + // gives the usageChange listener in the cursor an opportunity to kick in. + int memoryLimit = 12 * messageSize; + + protected void setUp() throws Exception { + super.setUp(); + if (broker == null) { + broker = new BrokerService(); + broker.setAdvisorySupport(false); + } + } + + protected void tearDown() throws Exception { + super.tearDown(); + if (broker != null) { + broker.stop(); + broker = null; + } + } + + protected void start() throws Exception { + broker.start(); + factory = new ActiveMQConnectionFactory("vm://localhost?jms.alwaysSyncSend=true"); + factory.setWatchTopicAdvisories(false); + connection = factory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + queue = session.createQueue("QUEUE." 
+ this.getClass().getName()); + } + + protected void stop() throws Exception { + session.close(); + connection.close(); + broker.stop(); + broker = null; + } + + protected void configureBroker(long memoryLimit, long systemLimit) throws Exception { + broker.setDeleteAllMessagesOnStartup(true); + broker.addConnector(bindAddress); + broker.setPersistent(true); + + SystemUsage systemUsage = broker.getSystemUsage(); + systemUsage.setSendFailIfNoSpace(true); + systemUsage.getMemoryUsage().setLimit(systemLimit); + + PolicyEntry policy = new PolicyEntry(); + policy.setProducerFlowControl(true); + policy.setUseCache(true); + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + broker.setDestinationPolicy(pMap); + } + + protected String createMessageText(int index) { + StringBuffer buffer = new StringBuffer(messageSize); + buffer.append("Message: " + index + " sent at: " + new Date()); + if (buffer.length() > messageSize) { + return buffer.substring(0, messageSize); + } + for (int i = buffer.length(); i < messageSize; i++) { + buffer.append(' '); + } + return buffer.toString(); + } + + protected void sendMessages(int deliveryMode) throws Exception { + start(); + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(deliveryMode); + int i =0; + try { + for (i = 0; i < 200; i++) { + TextMessage message = session.createTextMessage(createMessageText(i)); + producer.send(message); + } + } catch (javax.jms.ResourceAllocationException e) { + e.printStackTrace(); + fail(e.getMessage() + " num msgs = " + i + ". 
percentUsage = " + broker.getSystemUsage().getMemoryUsage().getPercentUsage()); + } + stop(); + } + + // use QueueStorePrefetch + public void testTwoUsageEqualPersistent() throws Exception { + configureBroker(memoryLimit, memoryLimit); + sendMessages(DeliveryMode.PERSISTENT); + } + + public void testUseCachePersistent() throws Exception { + int limit = memoryLimit / 2; + configureBroker(limit, memoryLimit); + sendMessages(DeliveryMode.PERSISTENT); + } + + public void testMemoryUsageLowPersistent() throws Exception { + configureBroker(memoryLimit, 10 * memoryLimit); + sendMessages(DeliveryMode.PERSISTENT); + } + + // use FilePendingMessageCursor + public void testTwoUsageEqualNonPersistent() throws Exception { + configureBroker(memoryLimit, memoryLimit); + sendMessages(DeliveryMode.NON_PERSISTENT); + } + + public void testMemoryUsageLowNonPersistent() throws Exception { + configureBroker(memoryLimit, 10 * memoryLimit); + sendMessages(DeliveryMode.NON_PERSISTENT); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java new file mode 100644 index 0000000..6b49c8d --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker.region.cursors; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; + +/** + * @author gtully + * @see https://issues.apache.org/activemq/browse/AMQ-2020 + **/ +public class StoreQueueCursorJDBCNoDuplicateTest extends StoreQueueCursorNoDuplicateTest { + + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + PersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter(); + broker.setPersistenceAdapter(persistenceAdapter); + return broker; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorKahaDBNoDuplicateTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorKahaDBNoDuplicateTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorKahaDBNoDuplicateTest.java new file mode 100644 index 0000000..11a98d6 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorKahaDBNoDuplicateTest.java @@ -0,0 +1,40 @@ 
+/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker.region.cursors; + +import java.io.File; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBStore; + +/** + * @author gtully + * @see https://issues.apache.org/activemq/browse/AMQ-2020 + **/ +public class StoreQueueCursorKahaDBNoDuplicateTest extends StoreQueueCursorNoDuplicateTest { + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + PersistenceAdapter persistenceAdapter = new KahaDBStore(); + persistenceAdapter.setDirectory(new File("target/activemq-data/kahadb")); + broker.setPersistenceAdapter(persistenceAdapter); + return broker; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorLevelDBNoDuplicateTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorLevelDBNoDuplicateTest.java 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorLevelDBNoDuplicateTest.java new file mode 100644 index 0000000..d119d50 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorLevelDBNoDuplicateTest.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.broker.region.cursors; + +import org.apache.activeio.journal.active.JournalImpl; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.leveldb.LevelDBStore; +import org.apache.activemq.store.journal.JournalPersistenceAdapter; + +import java.io.File; + +/** + * @author gtully + * @see https://issues.apache.org/activemq/browse/AMQ-2020 + **/ +public class StoreQueueCursorLevelDBNoDuplicateTest extends StoreQueueCursorNoDuplicateTest { + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + LevelDBStore store = new LevelDBStore(); + store.setDirectory(new File("target/activemq-data/leveldb")); + broker.setPersistenceAdapter(store); + return broker; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java new file mode 100644 index 0000000..a62748a --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker.region.cursors; + +import org.apache.activemq.broker.BrokerService; + +/** + * @author gtully + * @see https://issues.apache.org/activemq/browse/AMQ-2020 + **/ +public class StoreQueueCursorMemoryNoDuplicateTest extends StoreQueueCursorNoDuplicateTest { + + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + broker.setPersistent(false); + return broker; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java new file mode 100644 index 0000000..2406e88 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker.region.cursors; + +import junit.framework.TestCase; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.usage.SystemUsage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author gtully + * https://issues.apache.org/activemq/browse/AMQ-2020 + **/ +public class StoreQueueCursorNoDuplicateTest extends TestCase { + static final Logger LOG = LoggerFactory.getLogger(StoreQueueCursorNoDuplicateTest.class); + ActiveMQQueue destination = new ActiveMQQueue("queue-" + + StoreQueueCursorNoDuplicateTest.class.getSimpleName()); + BrokerService brokerService; + + final static String mesageIdRoot = "11111:22222:0:"; + final int messageBytesSize = 1024; + final String text = new String(new byte[messageBytesSize]); + + protected int count = 6; + + @Override + public void setUp() throws Exception { + brokerService = createBroker(); + brokerService.setUseJmx(false); + brokerService.deleteAllMessages(); + brokerService.start(); + } + + protected 
BrokerService createBroker() throws Exception { + return new BrokerService(); + } + + @Override + public void tearDown() throws Exception { + brokerService.stop(); + } + + public void testNoDuplicateAfterCacheFullAndReadPast() throws Exception { + final PersistenceAdapter persistenceAdapter = brokerService + .getPersistenceAdapter(); + final MessageStore queueMessageStore = persistenceAdapter + .createQueueMessageStore(destination); + final ConsumerInfo consumerInfo = new ConsumerInfo(); + final DestinationStatistics destinationStatistics = new DestinationStatistics(); + consumerInfo.setExclusive(true); + + final Queue queue = new Queue(brokerService, destination, + queueMessageStore, destinationStatistics, null); + + queueMessageStore.start(); + queueMessageStore.registerIndexListener(null); + + QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker()); + SystemUsage systemUsage = new SystemUsage(); + // ensure memory limit is reached + systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 2)); + underTest.setSystemUsage(systemUsage); + underTest.setEnableAudit(false); + underTest.start(); + assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled()); + + final ConnectionContext contextNotInTx = new ConnectionContext(); + for (int i = 0; i < count; i++) { + ActiveMQTextMessage msg = getMessage(i); + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + + queueMessageStore.addMessage(contextNotInTx, msg); + underTest.addMessageLast(msg); + } + + assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled()); + int dequeueCount = 0; + + underTest.setMaxBatchSize(2); + underTest.reset(); + while (underTest.hasNext() && dequeueCount < count) { + MessageReference ref = underTest.next(); + ref.decrementReferenceCount(); + underTest.remove(); + LOG.info("Received message: {} with body: {}", + ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText()); + assertEquals(dequeueCount++, 
ref.getMessageId().getProducerSequenceId()); + } + underTest.release(); + assertEquals(count, dequeueCount); + } + + private ActiveMQTextMessage getMessage(int i) throws Exception { + ActiveMQTextMessage message = new ActiveMQTextMessage(); + MessageId id = new MessageId(mesageIdRoot + i); + id.setBrokerSequenceId(i); + id.setProducerSequenceId(i); + message.setMessageId(id); + message.setDestination(destination); + message.setPersistent(true); + message.setResponseRequired(true); + message.setText("Msg:" + i + " " + text); + assertEquals(message.getMessageId().getProducerSequenceId(), i); + return message; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java new file mode 100644 index 0000000..f8fab10 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java @@ -0,0 +1,517 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker.region.cursors; + +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.store.AbstractMessageStore; +import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.usage.SystemUsage; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class StoreQueueCursorOrderTest { + private static final Logger LOG = LoggerFactory.getLogger(StoreQueueCursorOrderTest.class); + + ActiveMQQueue destination = new ActiveMQQueue("queue-" + + StoreQueueCursorOrderTest.class.getSimpleName()); + BrokerService brokerService; + + final static String mesageIdRoot = 
"11111:22222:0:"; + final int messageBytesSize = 1024; + final String text = new String(new byte[messageBytesSize]); + + @Before + public void setUp() throws Exception { + brokerService = createBroker(); + brokerService.setUseJmx(false); + brokerService.deleteAllMessages(); + brokerService.start(); + } + + protected BrokerService createBroker() throws Exception { + return new BrokerService(); + } + + @After + public void tearDown() throws Exception { + brokerService.stop(); + } + + @Test + public void tesBlockedFuture() throws Exception { + final int count = 2; + final Message[] messages = new Message[count]; + final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination); + final ConsumerInfo consumerInfo = new ConsumerInfo(); + final DestinationStatistics destinationStatistics = new DestinationStatistics(); + consumerInfo.setExclusive(true); + + final Queue queue = new Queue(brokerService, destination, + queueMessageStore, destinationStatistics, null); + + queueMessageStore.start(); + queueMessageStore.registerIndexListener(null); + + QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker()); + SystemUsage systemUsage = new SystemUsage(); + // ensure memory limit is reached + systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1); + underTest.setSystemUsage(systemUsage); + underTest.setEnableAudit(false); + underTest.start(); + assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled()); + + ActiveMQTextMessage msg = getMessage(0); + messages[1] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.setRecievedByDFBridge(true); + FutureTask<Long> future = new FutureTask<Long>(new Runnable() { + @Override + public void run() { + } + }, 2l) {}; + msg.getMessageId().setFutureOrSequenceLong(future); + underTest.addMessageLast(msg); + + assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled()); + + // second message will flip the cache but will be stored 
before the future task + msg = getMessage(1); + messages[0] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.getMessageId().setFutureOrSequenceLong(1l); + underTest.addMessageLast(msg); + + + assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled()); + assertEquals("setBatch unset", 0l, queueMessageStore.batch.get()); + + int dequeueCount = 0; + + underTest.setMaxBatchSize(2); + underTest.reset(); + while (underTest.hasNext() && dequeueCount < count) { + MessageReference ref = underTest.next(); + ref.decrementReferenceCount(); + underTest.remove(); + LOG.info("Received message: {} with body: {}", + ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText()); + assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId()); + } + underTest.release(); + assertEquals(count, dequeueCount); + } + + @Test + public void testNoSetBatchWithUnOrderedFutureCurrentSync() throws Exception { + final int count = 2; + final Message[] messages = new Message[count]; + final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination); + final ConsumerInfo consumerInfo = new ConsumerInfo(); + final DestinationStatistics destinationStatistics = new DestinationStatistics(); + consumerInfo.setExclusive(true); + + final Queue queue = new Queue(brokerService, destination, + queueMessageStore, destinationStatistics, null); + + queueMessageStore.start(); + queueMessageStore.registerIndexListener(null); + + QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker()); + SystemUsage systemUsage = new SystemUsage(); + // ensure memory limit is reached + systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1); + underTest.setSystemUsage(systemUsage); + underTest.setEnableAudit(false); + underTest.start(); + assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled()); + + ActiveMQTextMessage msg = getMessage(0); + messages[1] = msg; + 
msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.setRecievedByDFBridge(true); + final ActiveMQTextMessage msgRef = msg; + FutureTask<Long> future = new FutureTask<Long>(new Runnable() { + @Override + public void run() { + msgRef.getMessageId().setFutureOrSequenceLong(1l); + } + }, 1l) {}; + msg.getMessageId().setFutureOrSequenceLong(future); + Executors.newSingleThreadExecutor().submit(future); + underTest.addMessageLast(msg); + + assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled()); + + // second message will flip the cache but will be stored before the future task + msg = getMessage(1); + messages[0] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.getMessageId().setFutureOrSequenceLong(1l); + underTest.addMessageLast(msg); + + + assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled()); + assertEquals("setBatch unset", 0l, queueMessageStore.batch.get()); + + int dequeueCount = 0; + + underTest.setMaxBatchSize(2); + underTest.reset(); + while (underTest.hasNext() && dequeueCount < count) { + MessageReference ref = underTest.next(); + ref.decrementReferenceCount(); + underTest.remove(); + LOG.info("Received message: {} with body: {}", + ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText()); + assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId()); + } + underTest.release(); + assertEquals(count, dequeueCount); + } + + @Test + public void testSetBatchWithOrderedFutureCurrentFuture() throws Exception { + final int count = 2; + final Message[] messages = new Message[count]; + final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination); + final ConsumerInfo consumerInfo = new ConsumerInfo(); + final DestinationStatistics destinationStatistics = new DestinationStatistics(); + consumerInfo.setExclusive(true); + + final Queue queue = new Queue(brokerService, destination, + queueMessageStore, destinationStatistics, null); + + 
queueMessageStore.start(); + queueMessageStore.registerIndexListener(null); + + QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker()); + SystemUsage systemUsage = new SystemUsage(); + // ensure memory limit is reached + systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1); + underTest.setSystemUsage(systemUsage); + underTest.setEnableAudit(false); + underTest.start(); + assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled()); + + ActiveMQTextMessage msg = getMessage(0); + messages[0] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.setRecievedByDFBridge(true); + final ActiveMQTextMessage msgRef = msg; + FutureTask<Long> future = new FutureTask<Long>(new Runnable() { + @Override + public void run() { + msgRef.getMessageId().setFutureOrSequenceLong(0l); + } + }, 0l) {}; + msg.getMessageId().setFutureOrSequenceLong(future); + Executors.newSingleThreadExecutor().submit(future); + underTest.addMessageLast(msg); + + assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled()); + + // second message will flip the cache but will be stored before the future task + msg = getMessage(1); + messages[1] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.setRecievedByDFBridge(true); + final ActiveMQTextMessage msgRe2f = msg; + FutureTask<Long> future2 = new FutureTask<Long>(new Runnable() { + @Override + public void run() { + msgRe2f.getMessageId().setFutureOrSequenceLong(1l); + } + }, 1l) {}; + msg.getMessageId().setFutureOrSequenceLong(future2); + Executors.newSingleThreadExecutor().submit(future2); + underTest.addMessageLast(msg); + + + assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled()); + assertEquals("setBatch set", 1l, queueMessageStore.batch.get()); + + int dequeueCount = 0; + + underTest.setMaxBatchSize(2); + underTest.reset(); + while (underTest.hasNext() && dequeueCount < count) { + MessageReference ref = underTest.next(); + 
ref.decrementReferenceCount(); + underTest.remove(); + LOG.info("Received message: {} with body: {}", + ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText()); + assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId()); + } + underTest.release(); + assertEquals(count, dequeueCount); + } + + @Test + public void testSetBatchWithFuture() throws Exception { + final int count = 4; + final Message[] messages = new Message[count]; + final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination); + final ConsumerInfo consumerInfo = new ConsumerInfo(); + final DestinationStatistics destinationStatistics = new DestinationStatistics(); + consumerInfo.setExclusive(true); + + final Queue queue = new Queue(brokerService, destination, + queueMessageStore, destinationStatistics, null); + + queueMessageStore.start(); + queueMessageStore.registerIndexListener(null); + + QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker()); + SystemUsage systemUsage = new SystemUsage(); + // ensure memory limit is reached + systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 6)); + underTest.setSystemUsage(systemUsage); + underTest.setEnableAudit(false); + underTest.start(); + assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled()); + + ActiveMQTextMessage msg = getMessage(0); + messages[0] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.setRecievedByDFBridge(true); + final ActiveMQTextMessage msgRef = msg; + FutureTask<Long> future0 = new FutureTask<Long>(new Runnable() { + @Override + public void run() { + msgRef.getMessageId().setFutureOrSequenceLong(0l); + } + }, 0l) {}; + msg.getMessageId().setFutureOrSequenceLong(future0); + underTest.addMessageLast(msg); + Executors.newSingleThreadExecutor().submit(future0); + + + msg = getMessage(1); + messages[3] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.setRecievedByDFBridge(true); + final 
ActiveMQTextMessage msgRef1 = msg; + FutureTask<Long> future1 = new FutureTask<Long>(new Runnable() { + @Override + public void run() { + msgRef1.getMessageId().setFutureOrSequenceLong(3l); + } + }, 3l) {}; + msg.getMessageId().setFutureOrSequenceLong(future1); + underTest.addMessageLast(msg); + + + msg = getMessage(2); + messages[1] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.getMessageId().setFutureOrSequenceLong(1l); + underTest.addMessageLast(msg); + + assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled()); + + // out of order future + Executors.newSingleThreadExecutor().submit(future1); + + // sync add to flip cache + msg = getMessage(3); + messages[2] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.getMessageId().setFutureOrSequenceLong(3l); + underTest.addMessageLast(msg); + + + assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled()); + assertEquals("setBatch set", 2l, queueMessageStore.batch.get()); + + int dequeueCount = 0; + + underTest.setMaxBatchSize(count); + underTest.reset(); + while (underTest.hasNext() && dequeueCount < count) { + MessageReference ref = underTest.next(); + ref.decrementReferenceCount(); + underTest.remove(); + LOG.info("Received message: {} with body: {}", + ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText()); + assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId()); + } + underTest.release(); + assertEquals(count, dequeueCount); + } + + @Test + public void testSetBatch() throws Exception { + final int count = 3; + final Message[] messages = new Message[count]; + final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination); + final ConsumerInfo consumerInfo = new ConsumerInfo(); + final DestinationStatistics destinationStatistics = new DestinationStatistics(); + consumerInfo.setExclusive(true); + + final Queue queue = new Queue(brokerService, destination, + queueMessageStore, 
destinationStatistics, null); + + queueMessageStore.start(); + queueMessageStore.registerIndexListener(null); + + QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker()); + SystemUsage systemUsage = new SystemUsage(); + // ensure memory limit is reached + systemUsage.getMemoryUsage().setLimit(messageBytesSize * 5); + underTest.setSystemUsage(systemUsage); + underTest.setEnableAudit(false); + underTest.start(); + assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled()); + + + ActiveMQTextMessage msg = getMessage(0); + messages[0] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.getMessageId().setFutureOrSequenceLong(0l); + underTest.addMessageLast(msg); + + msg = getMessage(1); + messages[1] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.getMessageId().setFutureOrSequenceLong(1l); + underTest.addMessageLast(msg); + + assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled()); + + msg = getMessage(2); + messages[2] = msg; + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + msg.getMessageId().setFutureOrSequenceLong(2l); + underTest.addMessageLast(msg); + + + assertTrue("cache is disabled as limit reached", !underTest.isCacheEnabled()); + assertEquals("setBatch set", 2l, queueMessageStore.batch.get()); + + int dequeueCount = 0; + + underTest.setMaxBatchSize(2); + underTest.reset(); + while (underTest.hasNext() && dequeueCount < count) { + MessageReference ref = underTest.next(); + ref.decrementReferenceCount(); + underTest.remove(); + LOG.info("Received message: {} with body: {}", + ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText()); + assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId()); + } + underTest.release(); + assertEquals(count, dequeueCount); + } + + private ActiveMQTextMessage getMessage(int i) throws Exception { + ActiveMQTextMessage message = new ActiveMQTextMessage(); + MessageId id = new 
MessageId(mesageIdRoot + i); + id.setBrokerSequenceId(i); + id.setProducerSequenceId(i); + message.setMessageId(id); + message.setDestination(destination); + message.setPersistent(true); + message.setResponseRequired(true); + message.setText("Msg:" + i + " " + text); + assertEquals(message.getMessageId().getProducerSequenceId(), i); + return message; + } + + /** + * Stub message store for these tests: most operations do nothing, setBatch records a position in the batch field, + * and recoverNextMessages replays the supplied array starting at that position. + */ + class TestMessageStore extends AbstractMessageStore { + /** Array handed in by the test; recoverNextMessages reads from it. */ + final Message[] messages; + /** Starts at 0; after setBatch it holds the recorded sequence value plus one. */ + public AtomicLong batch = new AtomicLong(); + + /** Passes dest to the superclass and keeps a reference to (not a copy of) the messages array. */ + public TestMessageStore(Message[] messages, ActiveMQDestination dest) { + super(dest); + this.messages = messages; + } + + /** No-op: nothing is stored. */ + @Override + public void addMessage(ConnectionContext context, Message message) throws IOException { + + } + + /** Always returns null. */ + @Override + public Message getMessage(MessageId identity) throws IOException { + return null; + } + + /** No-op. */ + @Override + public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { + + } + + /** No-op. */ + @Override + public void removeAllMessages(ConnectionContext context) throws IOException { + + } + + /** No-op: the listener is never called. */ + @Override + public void recover(MessageRecoveryListener container) throws Exception { + + } + + /** Always returns 0, regardless of the array contents. */ + @Override + public int getMessageCount() throws IOException { + return 0; + } + + /** No-op: the batch field is not reset here. */ + @Override + public void resetBatching() { + + } + /** Hands messages[i] to the listener for every i from batch.intValue() to the end of the array; maxReturned is not consulted. */ + @Override + public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { + for (int i=batch.intValue();i<messages.length;i++) { + LOG.info("recovered index:" + i); + listener.recoverMessage(messages[i]); + } + } + + /** Sets batch to the id's futureOrSequenceLong value and then increments it. NOTE(review): the (Long) cast fails with ClassCastException if the id still holds a Future - confirm callers only pass resolved ids. */ + @Override + public void setBatch(MessageId message) { + batch.set((Long)message.getFutureOrSequenceLong()); + batch.incrementAndGet(); + } + + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/group/MessageGroupHashBucketTest.java ---------------------------------------------------------------------- diff --git
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/group/MessageGroupHashBucketTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/group/MessageGroupHashBucketTest.java new file mode 100644 index 0000000..fc35be6 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/group/MessageGroupHashBucketTest.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.region.group; + + +/** + * Runs the test methods inherited from {@link MessageGroupMapTest} against a + * {@link MessageGroupHashBucket} instead of the base class's default map. + */ +public class MessageGroupHashBucketTest extends MessageGroupMapTest { + + /** Supplies the map under test, constructed with the arguments (1024, 64). NOTE(review): this replaces the factory method of MessageGroupMapTest but carries no override annotation. */ + protected MessageGroupMap createMessageGroupMap() { + return new MessageGroupHashBucket(1024, 64); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/group/MessageGroupMapTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/group/MessageGroupMapTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/group/MessageGroupMapTest.java new file mode 100644 index 0000000..ab02426 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/group/MessageGroupMapTest.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.activemq.broker.region.group; + +import junit.framework.TestCase; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.SessionId; + +/** + * TestCase-based tests for a {@link MessageGroupMap}: group ids are assigned to consumers with put, + * looked up with get and released with removeConsumer. Subclasses may override + * createMessageGroupMap to run the same tests against another implementation. + */ +public class MessageGroupMapTest extends TestCase { + + /** Map under test; recreated in setUp. */ + protected MessageGroupMap map; + private ConsumerId consumer1; + private ConsumerId consumer2; + private ConsumerId consumer3; + /** Pre-incremented for each id component built by createConsumerId. */ + private long idCounter; + + /** Assigns groups 1, 2 and 3 to one consumer, then checks that removeConsumer returns all three and that none of them resolves afterwards. */ + public void testSingleConsumerForManyBucks() throws Exception { + assertGet("1", null); + + map.put("1", consumer1); + assertGet("1", consumer1); + map.put("2", consumer1); + assertGet("2", consumer1); + map.put("3", consumer1); + assertGet("3", consumer1); + + MessageGroupSet set = map.removeConsumer(consumer1); + assertContains(set, "1"); + assertContains(set, "2"); + assertContains(set, "3"); + assertGet("1", null); + assertGet("2", null); + assertGet("3", null); + } + + /** Assigns groups 1-3 to three consumers, removes consumer1, reassigns group 1 to consumer2 and checks that removing consumer2 then returns groups 1 and 2. */ + public void testManyConsumers() throws Exception { + assertGet("1", null); + + map.put("1", consumer1); + assertGet("1", consumer1); + map.put("2", consumer2); + assertGet("2", consumer2); + map.put("3", consumer3); + assertGet("3", consumer3); + + MessageGroupSet set = map.removeConsumer(consumer1); + assertContains(set, "1"); + + assertGet("1", null); + map.put("1", consumer2); + assertGet("1", consumer2); + + set = map.removeConsumer(consumer2); + assertContains(set, "1"); + assertContains(set, "2"); + } + + /** Creates the map under test and three consumer ids before each test. */ + protected void setUp() throws Exception { + super.setUp(); + map = createMessageGroupMap(); + consumer1 = createConsumerId(); + consumer2 = createConsumerId(); + consumer3 = createConsumerId(); + } + + /** Factory for the map under test; this base class returns a {@link SimpleMessageGroupMap}. */ + protected MessageGroupMap createMessageGroupMap() { + return new SimpleMessageGroupMap(); + } + + /** Builds a ConsumerId from a new ConnectionId and SessionId, incrementing idCounter once for each of the three parts. */ + protected ConsumerId createConsumerId() { + ConnectionId connectionId = new ConnectionId("" + ++idCounter); + SessionId sessionId = new SessionId(connectionId, ++idCounter); + ConsumerId answer = new ConsumerId(sessionId,
++idCounter); + return answer; + } + + /** Asserts that map.get(groupdId) equals expected; pass null to assert that get returns null. */ + protected void assertGet(String groupdId, ConsumerId expected) { + ConsumerId actual = map.get(groupdId); + assertEquals("Entry for groupId: " + groupdId, expected, actual); + } + + /** Asserts that the given set contains groupID, naming both in the failure message. */ + protected void assertContains(MessageGroupSet set, String groupID) { + assertTrue("MessageGroup set: " + set + " does not contain groupID: " + groupID, set.contains(groupID)); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/group/MessageGroupTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/group/MessageGroupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/group/MessageGroupTest.java new file mode 100644 index 0000000..86741c9 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/group/MessageGroupTest.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.activemq.broker.region.group; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.CombinationTestSupport; +import org.apache.activemq.JmsTestSupport; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests delivery of messages that share a JMSXGroupID on the TEST queue: which consumer receives them, + * the JMSXGroupFirstForConsumer flag, and closing a group with a JMSXGroupSeq of -1. + */ +public class MessageGroupTest extends JmsTestSupport { + + /** NOTE(review): the logger is created for CombinationTestSupport rather than this class - looks unintended, confirm before changing. */ + private static final Logger LOG = LoggerFactory.getLogger(CombinationTestSupport.class); + + /** Sends four messages in one group, receives three on the first consumer, closes it, and expects the fourth and nothing more on a consumer created from a second connection. */ + public void testGroupedMessagesDeliveredToOnlyOneConsumer() throws Exception { + + ActiveMQDestination destination = new ActiveMQQueue("TEST"); + + // Setup a first connection + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer1 = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + // Send the messages. + for (int i = 0; i < 4; i++) { + TextMessage message = session.createTextMessage("message " + i); + message.setStringProperty("JMSXGroupID", "TEST-GROUP"); + message.setIntProperty("JMSXGroupSeq", i + 1); + LOG.info("sending message: " + message); + producer.send(message); + } + + // All the messages should have been sent down connection 1.. just get + // the first 3 + for (int i = 0; i < 3; i++) { + TextMessage m1 = (TextMessage)consumer1.receive(500); + assertNotNull("m1 is null for index: " + i, m1); + assertEquals(m1.getIntProperty("JMSXGroupSeq"), i + 1); + } + + // Setup a second connection + Connection connection1 = factory.createConnection(userName, password); + connection1.start(); + Session session2 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer2 = session2.createConsumer(destination); + + // Close the first consumer.
+ consumer1.close(); + + // The last message should now go to the second consumer. + for (int i = 0; i < 1; i++) { + TextMessage m1 = (TextMessage)consumer2.receive(500); + assertNotNull("m1 is null for index: " + i, m1); + assertEquals(m1.getIntProperty("JMSXGroupSeq"), 4 + i); + } + + //assert that there are no other messages left for the consumer 2 + Message m = consumer2.receive(100); + assertNull("consumer 2 has some messages left", m); + } + + /** Sends one grouped message before any consumer exists, then checks that the message delivered to a newly created consumer has JMSXGroupFirstForConsumer set to true. NOTE(review): receive() is called without a timeout here. */ + public void testAddingConsumer() throws Exception { + ActiveMQDestination destination = new ActiveMQQueue("TEST"); + + // Setup a first connection + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(destination); + //MessageConsumer consumer = session.createConsumer(destination); + + TextMessage message = session.createTextMessage("message"); + message.setStringProperty("JMSXGroupID", "TEST-GROUP"); + + LOG.info("sending message: " + message); + producer.send(message); + + MessageConsumer consumer = session.createConsumer(destination); + + TextMessage msg = (TextMessage)consumer.receive(); + assertNotNull(msg); + boolean first = msg.getBooleanProperty("JMSXGroupFirstForConsumer"); + assertTrue(first); + } + + /** Checks that after a message with JMSXGroupSeq of -1 is sent, the first consumer still receives the outstanding and closing messages while the four messages sent afterwards arrive on the second consumer. */ + public void testClosingMessageGroup() throws Exception { + + ActiveMQDestination destination = new ActiveMQQueue("TEST"); + + // Setup a first connection + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer1 = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + // Send the messages. + for (int i = 0; i < 4; i++) { + TextMessage message = session.createTextMessage("message " + i); + message.setStringProperty("JMSXGroupID", "TEST-GROUP"); + LOG.info("sending message: " + message); + producer.send(message); + } + + + + // All the messages should have been sent down consumer1..
just get + // the first 3 + for (int i = 0; i < 3; i++) { + TextMessage m1 = (TextMessage)consumer1.receive(500); + assertNotNull("m1 is null for index: " + i, m1); + } + + // Setup a second consumer + Connection connection1 = factory.createConnection(userName, password); + connection1.start(); + Session session2 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer2 = session2.createConsumer(destination); + + //assert that there are no messages for the consumer 2 + Message m = consumer2.receive(100); + assertNull("consumer 2 has some messages", m); + + // Close the group + TextMessage message = session.createTextMessage("message " + 5); + message.setStringProperty("JMSXGroupID", "TEST-GROUP"); + message.setIntProperty("JMSXGroupSeq", -1); + LOG.info("sending message: " + message); + producer.send(message); + + //Send some more messages + for (int i = 0; i < 4; i++) { + message = session.createTextMessage("message " + i); + message.setStringProperty("JMSXGroupID", "TEST-GROUP"); + LOG.info("sending message: " + message); + producer.send(message); + } + + // Receive the fourth message + TextMessage m1 = (TextMessage)consumer1.receive(500); + assertNotNull("m1 is null for index: " + 4, m1); + + // Receive the closing message + m1 = (TextMessage)consumer1.receive(500); + assertNotNull("m1 is null for index: " + 5, m1); + + //assert that there are no messages for the consumer 1 + m = consumer1.receive(100); + assertNull("consumer 1 has some messages left: " + m, m); + + // The messages should now go to the second consumer.
+ for (int i = 0; i < 4; i++) { + m1 = (TextMessage)consumer2.receive(500); + assertNotNull("m1 is null for index: " + i, m1); + } + + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java new file mode 100644 index 0000000..44ef670 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.activemq.broker.scheduler; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.Date; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ScheduledMessage; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests for messages scheduled through the AMQ_SCHEDULED_CRON property, using the connection, destination + * and broker provided by JobSchedulerTestSupport. Both tests wait on real time and take minutes to run. + */ +public class JmsCronSchedulerTest extends JobSchedulerTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(JmsCronSchedulerTest.class); + + /** + * Sends COUNT cron-scheduled messages two seconds apart, then checks that the job scheduler lists COUNT jobs + * and that COUNT messages have been counted once the latch wait of up to two minutes ends. + * NOTE(review): the assertTrue and fail calls inside onMessage presumably run on the provider's delivery thread, + * so a failure there may not fail this test - confirm. The boolean result of latch.await is not checked. + */ + @Test + public void testSimulatenousCron() throws Exception { + + final int COUNT = 10; + final AtomicInteger count = new AtomicInteger(); + Connection connection = createConnection(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageConsumer consumer = session.createConsumer(destination); + + final CountDownLatch latch = new CountDownLatch(COUNT); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + count.incrementAndGet(); + latch.countDown(); + assertTrue(message instanceof TextMessage); + TextMessage tm = (TextMessage) message; + try { + LOG.info("Received [{}] count: {} ", tm.getText(), count.get()); + } catch (JMSException e) { + LOG.error("Unexpected exception in onMessage", e); + fail("Unexpected exception in onMessage: " + e.getMessage()); + } + } + }); + + connection.start(); + for (int i = 0; i < COUNT; i++) { + MessageProducer producer =
session.createProducer(destination); + TextMessage message = session.createTextMessage("test msg "+ i); + message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *"); + producer.send(message); + LOG.info("Message {} sent at {}", i, new Date().toString()); + producer.close(); + //wait a couple sec so cron start time is different for next message + Thread.sleep(2000); + } + SchedulerBroker sb = (SchedulerBroker) this.broker.getBroker().getAdaptor(SchedulerBroker.class); + JobScheduler js = sb.getJobScheduler(); + List<Job> list = js.getAllJobs(); + assertEquals(COUNT, list.size()); + latch.await(2, TimeUnit.MINUTES); + //All messages should have been received by now + assertEquals(COUNT, count.get()); + } + + /** Sends one cron-scheduled message from a producer whose time-to-live is one minute, sleeps for two minutes, then expects receiveNoWait to return exactly one message. */ + @Test + public void testCronScheduleWithTtlSet() throws Exception { + + Connection connection = createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(destination); + connection.start(); + + MessageProducer producer = session.createProducer(destination); + producer.setTimeToLive(TimeUnit.MINUTES.toMillis(1)); + TextMessage message = session.createTextMessage("test msg "); + message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *"); + + producer.send(message); + producer.close(); + + Thread.sleep(TimeUnit.MINUTES.toMillis(2)); + + assertNotNull(consumer.receiveNoWait()); + assertNull(consumer.receiveNoWait()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java new file mode 100644 index 0000000..0ce584d --- /dev/null +++
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.util.ProducerThread;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+
+/**
+ * Tests for JMS messages scheduled through the {@link ScheduledMessage} properties: delay, period, repeat
+ * and cron.
+ */
+public class JmsSchedulerTest extends JobSchedulerTestSupport {
+
+    /**
+     * Sends one message that combines a cron expression with a delay, a period and {@code COUNT - 1}
+     * repeats, checks that the scheduler holds exactly one job for it, and waits up to 240 seconds for
+     * {@code COUNT} deliveries.
+     */
+    @Test
+    public void testCron() throws Exception {
+        final int COUNT = 10;
+        final AtomicInteger count = new AtomicInteger();
+        Connection connection = createConnection();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                // Count before releasing the latch: the test thread reads the counter as soon as the
+                // latch opens, so the last increment must already be visible by then.
+                count.incrementAndGet();
+                latch.countDown();
+            }
+        });
+
+        connection.start();
+        MessageProducer producer = session.createProducer(destination);
+        TextMessage message = session.createTextMessage("test msg");
+        long time = 1000;
+        message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *");
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500);
+        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, COUNT - 1);
+
+        producer.send(message);
+        producer.close();
+
+        Thread.sleep(500);
+        SchedulerBroker sb = (SchedulerBroker) this.broker.getBroker().getAdaptor(SchedulerBroker.class);
+        JobScheduler js = sb.getJobScheduler();
+        List<Job> list = js.getAllJobs();
+        assertEquals(1, list.size());
+        latch.await(240, TimeUnit.SECONDS);
+        assertEquals(COUNT, count.get());
+    }
+
+    /**
+     * Sends one message with a five second delay and verifies that it has not been delivered after two
+     * seconds but has been delivered after waiting up to five seconds more.
+     */
+    @Test
+    public void testSchedule() throws Exception {
+        final int COUNT = 1;
+        Connection connection = createConnection();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                latch.countDown();
+            }
+        });
+
+        connection.start();
+        long time = 5000;
+        MessageProducer producer = session.createProducer(destination);
+        TextMessage message = session.createTextMessage("test msg");
+
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+
+        producer.send(message);
+        producer.close();
+        // make sure the message isn't delivered early
+        Thread.sleep(2000);
+        assertEquals("Message was delivered before its delay elapsed", COUNT, latch.getCount());
+        latch.await(5, TimeUnit.SECONDS);
+        assertEquals("Scheduled message was not delivered", 0, latch.getCount());
+    }
+
+    /**
+     * Same scenario as {@link #testSchedule()} but on a transacted session: the send is committed by the
+     * test thread and each delivery is committed by the listener.
+     */
+    @Test
+    public void testTransactedSchedule() throws Exception {
+        final int COUNT = 1;
+        // A commit failure inside the listener is recorded here and rethrown on the test thread;
+        // merely printing it would let the test pass.
+        final AtomicReference<JMSException> commitFailure = new AtomicReference<JMSException>();
+        Connection connection = createConnection();
+
+        final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                try {
+                    session.commit();
+                } catch (JMSException e) {
+                    commitFailure.compareAndSet(null, e);
+                }
+                latch.countDown();
+            }
+        });
+
+        connection.start();
+        long time = 5000;
+        MessageProducer producer = session.createProducer(destination);
+        TextMessage message = session.createTextMessage("test msg");
+
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+
+        producer.send(message);
+        session.commit();
+        producer.close();
+        // make sure the message isn't delivered early
+        Thread.sleep(2000);
+        assertEquals("Message was delivered before its delay elapsed", COUNT, latch.getCount());
+        latch.await(5, TimeUnit.SECONDS);
+        assertEquals("Scheduled message was not delivered", 0, latch.getCount());
+
+        JMSException failure = commitFailure.get();
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
+    /**
+     * Sends one message with a one second delay, a 500 ms period and {@code NUMBER - 1} repeats, and
+     * verifies that exactly {@code NUMBER} deliveries happen: all of them within ten seconds and no further
+     * one during an extra second of waiting.
+     */
+    @Test
+    public void testScheduleRepeated() throws Exception {
+        final int NUMBER = 10;
+        final AtomicInteger count = new AtomicInteger();
+        Connection connection = createConnection();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        final CountDownLatch latch = new CountDownLatch(NUMBER);
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                // Count before releasing the latch so the counter never lags behind it.
+                count.incrementAndGet();
+                latch.countDown();
+            }
+        });
+
+        connection.start();
+        MessageProducer producer = session.createProducer(destination);
+        TextMessage message = session.createTextMessage("test msg");
+        long time = 1000;
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500);
+        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, NUMBER - 1);
+        producer.send(message);
+        producer.close();
+        assertEquals("Message was delivered before its delay elapsed", NUMBER, latch.getCount());
+        latch.await(10, TimeUnit.SECONDS);
+        assertEquals(0, latch.getCount());
+        // wait a little longer - make sure we only get NUMBER of replays
+        Thread.sleep(1000);
+        assertEquals(NUMBER, count.get());
+    }
+
+    /**
+     * Schedules a message with a five second delay, stops the broker and starts a new one created with
+     * {@code createBroker(false)}, and verifies that the message is still delivered to a consumer on a
+     * fresh connection. A second scheduled message is then sent; its delivery is not checked.
+     */
+    @Test
+    public void testScheduleRestart() throws Exception {
+        // send a message
+        Connection connection = createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        connection.start();
+        MessageProducer producer = session.createProducer(destination);
+        TextMessage message = session.createTextMessage("test msg");
+        long time = 5000;
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+        producer.send(message);
+        producer.close();
+
+        // restart broker
+        broker.stop();
+        broker.waitUntilStopped();
+
+        broker = createBroker(false);
+        broker.start();
+        broker.waitUntilStarted();
+
+        // consume the message
+        connection = createConnection();
+        connection.start();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(destination);
+        Message msg = consumer.receive(5000);
+        assertNotNull("Didn't receive the message", msg);
+
+        // send another message
+        producer = session.createProducer(destination);
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+        producer.send(message);
+        producer.close();
+    }
+
+    /**
+     * Lowers the job scheduler store limit to 10 KiB, starts a producer of delayed messages, and expects
+     * no delivery while the limit is in place. After the limit is raised, all messages must be sent and
+     * received.
+     */
+    @Test
+    public void testJobSchedulerStoreUsage() throws Exception {
+        final int MESSAGE_COUNT = 100;
+
+        // Shrink the store limit down so we get the producer to block
+        broker.getSystemUsage().getJobSchedulerUsage().setLimit(10 * 1024);
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        Connection conn = factory.createConnection();
+        conn.start();
+        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final long time = 5000;
+        final ProducerThread producer = new ProducerThread(sess, destination) {
+            @Override
+            protected Message createMessage(int i) throws Exception {
+                Message message = super.createMessage(i);
+                message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+                return message;
+            }
+        };
+        producer.setMessageCount(MESSAGE_COUNT);
+        producer.start();
+
+        MessageConsumer consumer = sess.createConsumer(destination);
+        final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                latch.countDown();
+            }
+        });
+
+        // wait for the producer to block, which should happen immediately, and also wait long
+        // enough for the delay to elapse. We should see no deliveries as the send should block
+        // on the first message.
+        Thread.sleep(10000L);
+
+        assertEquals(MESSAGE_COUNT, latch.getCount());
+
+        // Increase the store limit so the producer unblocks. Everything should enqueue at this point.
+        broker.getSystemUsage().getJobSchedulerUsage().setLimit(1024 * 1024 * 33);
+
+        // Wait long enough that the messages are enqueued and the delivery delay has elapsed.
+        Thread.sleep(10000L);
+
+        // Make sure we sent all the messages we expected to send
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return producer.getSentCount() == producer.getMessageCount();
+            }
+        }, 20000L);
+
+        assertEquals("Producer didn't send all messages", producer.getMessageCount(), producer.getSentCount());
+
+        // Make sure we got all the messages we expected to get
+        latch.await(20000L, TimeUnit.MILLISECONDS);
+
+        assertEquals("Consumer did not receive all messages.", 0, latch.getCount());
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java
new file mode 100644
index 0000000..641ee97
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import java.io.File;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.ProducerThread;
+
+/**
+ * Runs a persistent embedded broker with scheduler support and a 512 byte store limit, and leaves a daemon
+ * producer of delayed messages running when the test method returns.
+ */
+public class JobSchedulerBrokerShutdownTest extends EmbeddedBrokerTestSupport {
+
+    @Override
+    protected boolean isPersistent() {
+        return true;
+    }
+
+    /**
+     * Builds the broker from the parent's defaults, then enables scheduler support, points the data and
+     * scheduler directories at {@code target}, caps the store usage at 512 and deletes all messages.
+     */
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        // Make sure the scheduler directory exists and has no children before the broker is built.
+        File schedulerDir = new File("target/scheduler");
+        IOHelper.mkdirs(schedulerDir);
+        IOHelper.deleteChildren(schedulerDir);
+
+        BrokerService service = super.createBroker();
+        service.setSchedulerSupport(true);
+        service.setDataDirectory("target");
+        service.setSchedulerDirectoryFile(schedulerDir);
+        service.getSystemUsage().getStoreUsage().setLimit(512);
+        service.deleteAllMessages();
+        return service;
+    }
+
+    /**
+     * Starts a daemon producer of 200 messages, each delayed by one second, and sleeps for five seconds.
+     * The method itself asserts nothing.
+     */
+    public void testSchedule() throws Exception {
+        Connection connection = createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        connection.start();
+
+        ProducerThread sender = newDelayedProducer(session, 1000);
+        sender.setMessageCount(200);
+        sender.setDaemon(true);
+        sender.start();
+
+        Thread.sleep(5000);
+    }
+
+    /** Creates a producer thread whose messages all carry the given scheduled delay in milliseconds. */
+    private ProducerThread newDelayedProducer(Session session, final long delayMillis) throws Exception {
+        return new ProducerThread(session, destination) {
+            @Override
+            protected Message createMessage(int index) throws Exception {
+                Message delayed = super.createMessage(index);
+                delayed.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delayMillis);
+                return delayed;
+            }
+        };
+    }
+}
