http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageGroupConfigTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageGroupConfigTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageGroupConfigTest.java new file mode 100644 index 0000000..112cc46 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageGroupConfigTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import org.apache.activemq.TestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.broker.region.group.CachedMessageGroupMap; +import org.apache.activemq.broker.region.group.MessageGroupHashBucket; +import org.apache.activemq.broker.region.group.MessageGroupMap; +import org.apache.activemq.broker.region.group.SimpleMessageGroupMap; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; + +/** + * + */ +public class MessageGroupConfigTest extends TestSupport { + protected BrokerService broker; + + @Override + protected void setUp() throws Exception { + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + broker.stop(); + super.tearDown(); + } + + + + public void testCachedGroupConfiguration() throws Exception { + doTestGroupConfiguration("cached",CachedMessageGroupMap.class); + } + + public void testCachedGroupConfigurationWithCacheSize() throws Exception { + CachedMessageGroupMap result = (CachedMessageGroupMap) doTestGroupConfiguration("cached?cacheSize=10",CachedMessageGroupMap.class); + assertEquals(10,result.getMaximumCacheSize()); + + } + + public void testSimpleGroupConfiguration() throws Exception { + doTestGroupConfiguration("simple", SimpleMessageGroupMap.class); + } + + public void testBucketGroupConfiguration() throws Exception { + doTestGroupConfiguration("bucket", MessageGroupHashBucket.class); + } + + public void testBucketGroupConfigurationWithBucketCount() throws Exception { + MessageGroupHashBucket result = (MessageGroupHashBucket) doTestGroupConfiguration("bucket?bucketCount=2", MessageGroupHashBucket.class); + assertEquals(2,result.getBucketCount()); + } + + public MessageGroupMap doTestGroupConfiguration(String type, Class classType) throws Exception { + broker = new BrokerService(); + + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setMessageGroupMapFactoryType(type); + PolicyMap policyMap = new PolicyMap(); + policyMap.setDefaultEntry(defaultEntry); + broker.setDestinationPolicy(policyMap); + broker.start(); + super.topic = false; + ActiveMQDestination destination = (ActiveMQDestination) createDestination("org.apache.foo"); + Queue brokerDestination = (Queue) broker.getDestination(destination); + + assertNotNull(brokerDestination); + MessageGroupMap messageGroupMap = brokerDestination.getMessageGroupOwners(); + assertNotNull(messageGroupMap); + assertTrue(messageGroupMap.getClass().isAssignableFrom(classType)); + return messageGroupMap; + } + + +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageListenerDeadLetterTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageListenerDeadLetterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageListenerDeadLetterTest.java new file mode 100644 index 0000000..b91f5a8 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageListenerDeadLetterTest.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.command.ActiveMQQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MessageListenerDeadLetterTest extends DeadLetterTestSupport { + private static final Logger LOG = LoggerFactory + .getLogger(MessageListenerDeadLetterTest.class); + + private int rollbackCount; + + private Session dlqSession; + + private final Error[] error = new Error[1]; + + protected void doTest() throws Exception { + messageCount = 200; + connection.start(); + + ActiveMQConnection amqConnection = (ActiveMQConnection) connection; + rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1; + LOG.info("Will redeliver messages: " + rollbackCount + " times"); + + makeConsumer(); + makeDlqConsumer(); + + sendMessages(); + + // now lets receive and rollback N times + int maxRollbacks = messageCount * rollbackCount; + consumer.setMessageListener(new RollbackMessageListener(maxRollbacks, rollbackCount)); + + for (int i = 0; i < messageCount; i++) { + Message msg = dlqConsumer.receive(4000); + if (error[0] != null) { + // error from message listener + throw error[0]; + } + assertMessage(msg, i); + assertNotNull("Should be a DLQ message for loop: " + i, msg); + } + if (error[0] != null) { + throw error[0]; + } + } + + protected void makeDlqConsumer() throws JMSException { + dlqDestination = createDlqDestination(); + + LOG.info("Consuming from dead letter on: " + dlqDestination); + dlqConsumer = dlqSession.createConsumer(dlqDestination); + } + + @Override + protected void setUp() throws Exception { + transactedMode = true; + super.setUp(); + dlqSession = connection.createSession(transactedMode, acknowledgeMode); + } + + @Override + protected void tearDown() throws Exception { + dlqConsumer.close(); + dlqSession.close(); + session.close(); + super.tearDown(); + }; + + protected ActiveMQConnectionFactory createConnectionFactory() + throws Exception { + ActiveMQConnectionFactory answer = super.createConnectionFactory(); + RedeliveryPolicy policy = new RedeliveryPolicy(); + policy.setMaximumRedeliveries(3); + policy.setBackOffMultiplier((short) 1); + policy.setRedeliveryDelay(0); + policy.setInitialRedeliveryDelay(0); + policy.setUseExponentialBackOff(false); + answer.setRedeliveryPolicy(policy); + return answer; + } + + protected Destination createDlqDestination() { + return new ActiveMQQueue("ActiveMQ.DLQ"); + } + + class RollbackMessageListener implements MessageListener { + + final int maxRollbacks; + + final int deliveryCount; + + AtomicInteger rollbacks = new AtomicInteger(); + + RollbackMessageListener(int c, int delvery) { + maxRollbacks = c; + deliveryCount = delvery; + } + + public void onMessage(Message message) { + try { + int expectedMessageId = rollbacks.get() / deliveryCount; + LOG.info("expecting messageId: " + expectedMessageId); + assertMessage(message, expectedMessageId); + if (rollbacks.incrementAndGet() > maxRollbacks) { + fail("received too many messages, already done too many rollbacks: " + + rollbacks); + } + session.rollback(); + + } catch (Throwable e) { + LOG.error("unexpected exception:" + e, e); + // propagating assertError to execution task will cause a hang + // at shutdown + if (e instanceof Error) { + error[0] = (Error) e; + } else { + fail("unexpected exception: " + e); + } + + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java new file mode 100644 index 0000000..7c53730 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; + +/** + * + */ +public class NoConsumerDeadLetterTest extends DeadLetterTestSupport { + + // lets disable the inapplicable tests + public void testDurableQueueMessage() throws Exception { + } + + public void testDurableTopicMessage() throws Exception { + } + + protected void doTest() throws Exception { + makeDlqConsumer(); + sendMessages(); + + for (int i = 0; i < messageCount; i++) { + Message msg = dlqConsumer.receive(1000); + assertNotNull("Should be a message for loop: " + i, msg); + } + } + + public void testConsumerReceivesMessages() throws Exception { + this.topic = false; + ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory(); + connection = (ActiveMQConnection)factory.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(getDestination()); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + Topic advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(getDestination()); + MessageConsumer advisoryConsumer = session.createConsumer(advisoryTopic); + + TextMessage msg = session.createTextMessage("Message: x"); + producer.send(msg); + + Message advisoryMessage = advisoryConsumer.receive(1000); + assertNotNull("Advisory message not received", advisoryMessage); + + Thread.sleep(1000); + + factory = (ActiveMQConnectionFactory)createConnectionFactory(); + connection = (ActiveMQConnection)factory.createConnection(); + connection.start(); + + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageConsumer consumer = session.createConsumer(getDestination()); + Message received = consumer.receive(1000); + assertNotNull("Message not received", received); + } + + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + + PolicyEntry policy = new PolicyEntry(); + policy.setSendAdvisoryIfNoConsumers(true); + + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + + broker.setDestinationPolicy(pMap); + + return broker; + } + + protected Destination createDlqDestination() { + if (this.topic) { + return AdvisorySupport.getNoTopicConsumersAdvisoryTopic((ActiveMQDestination)getDestination()); + } else { + return AdvisorySupport.getNoQueueConsumersAdvisoryTopic((ActiveMQDestination)getDestination()); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/NoRetryDeadLetterTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/NoRetryDeadLetterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/NoRetryDeadLetterTest.java new file mode 100644 index 0000000..a14df74 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/NoRetryDeadLetterTest.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.RedeliveryPolicy; + +public class NoRetryDeadLetterTest extends DeadLetterTest { + + @Override + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + ActiveMQConnectionFactory connectionFactory = super.createConnectionFactory(); + RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); + redeliveryPolicy.setMaximumRedeliveries(0); + connectionFactory.setRedeliveryPolicy(redeliveryPolicy); + return connectionFactory; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/PerDurableConsumerDeadLetterTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/PerDurableConsumerDeadLetterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/PerDurableConsumerDeadLetterTest.java new file mode 100644 index 0000000..05be639 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/PerDurableConsumerDeadLetterTest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import javax.jms.Destination; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; + +/** + * for durable subs, allow a dlq per subscriber such that poison messages are not duplicates + * on the dlq and such that rejecting consumers can be identified + * https://issues.apache.org/jira/browse/AMQ-3003 + */ +public class PerDurableConsumerDeadLetterTest extends DeadLetterTest { + + private static final String CLIENT_ID = "george"; + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + + PolicyEntry policy = new PolicyEntry(); + IndividualDeadLetterStrategy strategy = new IndividualDeadLetterStrategy(); + strategy.setProcessNonPersistent(true); + strategy.setDestinationPerDurableSubscriber(true); + policy.setDeadLetterStrategy(strategy); + + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + + broker.setDestinationPolicy(pMap); + + return broker; + } + + @Override + protected String createClientId() { + return CLIENT_ID; + } + + @Override + protected Destination createDlqDestination() { + String prefix = topic ? "ActiveMQ.DLQ.Topic." : "ActiveMQ.DLQ.Queue."; + String destinationName = prefix + getClass().getName() + "." + getName(); + if (durableSubscriber) { + String subName = // connectionId:SubName + CLIENT_ID + ":" + getDestination().toString(); + destinationName += "." + subName ; + } + return new ActiveMQQueue(destinationName); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java new file mode 100644 index 0000000..448f47c --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.TopicSubscription; +import org.apache.activemq.broker.region.policy.PriorityNetworkDispatchPolicy; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.usage.SystemUsage; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class PriorityNetworkDispatchPolicyTest { + + PriorityNetworkDispatchPolicy underTest = new PriorityNetworkDispatchPolicy(); + SystemUsage usageManager = new SystemUsage(); + ConsumerInfo info = new ConsumerInfo(); + ActiveMQMessage node = new ActiveMQMessage(); + ConsumerId id = new ConsumerId(); + ConnectionContext context = new ConnectionContext(); + BrokerService brokerService = new BrokerService(); + + @Before + public void init() throws Exception { + info.setDestination(ActiveMQDestination.createDestination("test", ActiveMQDestination.TOPIC_TYPE)); + info.setConsumerId(id); + info.setNetworkSubscription(true); + info.setNetworkConsumerPath(new ConsumerId[]{id}); + node.setMessageId(new MessageId("test:1:1:1:1")); + } + + @After + public void stopBroker() throws Exception { + brokerService.stop(); + } + + @Test + public void testRemoveLowerPriorityDup() throws Exception { + + List<Subscription> consumers = new ArrayList<Subscription>(); + + for (int i=0; i<3; i++) { + ConsumerInfo instance = info.copy(); + instance.setPriority((byte)i); + consumers.add(new TopicSubscription(brokerService.getBroker(), context, instance, usageManager)); + } + underTest.dispatch(node, null, consumers); + + long count = 0; + for (Subscription consumer : consumers) { + count += consumer.getEnqueueCounter(); + } + assertEquals("only one sub got message", 1, count); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java new file mode 100644 index 0000000..221f603 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.QueueSubscriptionTest; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.BlockJUnit4ClassRunner; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +@RunWith(BlockJUnit4ClassRunner.class) +public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest { + + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + + PolicyEntry policy = new PolicyEntry(); + policy.setDispatchPolicy(new RoundRobinDispatchPolicy()); + + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + + broker.setDestinationPolicy(pMap); + + return broker; + } + + @Test(timeout = 60 * 1000) + public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception { + super.testOneProducerTwoConsumersSmallMessagesOnePrefetch(); + + // Ensure that each consumer should have received at least one message + // We cannot guarantee that messages will be equally divided, since + // prefetch is one + assertEachConsumerReceivedAtLeastXMessages(1); + } + + @Test(timeout = 60 * 1000) + public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception { + super.testOneProducerTwoConsumersSmallMessagesLargePrefetch(); + assertMessagesDividedAmongConsumers(); + } + + @Test(timeout = 60 * 1000) + public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception { + super.testOneProducerTwoConsumersLargeMessagesOnePrefetch(); + + // Ensure that each consumer should have received at least one message + // We cannot guarantee that messages will be equally divided, since + // prefetch is one + assertEachConsumerReceivedAtLeastXMessages(1); + } + + @Test(timeout = 60 * 1000) + public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception { + super.testOneProducerTwoConsumersLargeMessagesLargePrefetch(); + assertMessagesDividedAmongConsumers(); + } + + @Test(timeout = 60 * 1000) + public void testOneProducerManyConsumersFewMessages() throws Exception { + super.testOneProducerManyConsumersFewMessages(); + + // Since there are more consumers, each consumer should have received at + // most one message only + assertMessagesDividedAmongConsumers(); + } + + @Test(timeout = 60 * 1000) + public void testOneProducerManyConsumersManyMessages() throws Exception { + super.testOneProducerManyConsumersManyMessages(); + assertMessagesDividedAmongConsumers(); + } + + @Test(timeout = 60 * 1000) + public void testManyProducersManyConsumers() throws Exception { + super.testManyProducersManyConsumers(); + assertMessagesDividedAmongConsumers(); + } + + @Test(timeout = 60 * 1000) + public void testOneProducerTwoMatchingConsumersOneNotMatchingConsumer() throws Exception { + // Create consumer that won't consume any message + createMessageConsumer(createConnectionFactory().createConnection(), createDestination(), "JMSPriority<1"); + super.testOneProducerTwoConsumersSmallMessagesLargePrefetch(); + assertMessagesDividedAmongConsumers(); + } + + protected MessageConsumer createMessageConsumer(Connection conn, Destination dest, String selector) throws Exception { + connections.add(conn); + + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = sess.createConsumer(dest, selector); + conn.start(); + + return consumer; + } + + public void assertMessagesDividedAmongConsumers() { + assertEachConsumerReceivedAtLeastXMessages((messageCount * producerCount) / consumerCount); + assertEachConsumerReceivedAtMostXMessages(((messageCount * producerCount) / consumerCount) + 1); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/SecureDLQTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/SecureDLQTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/SecureDLQTest.java new file mode 100644 index 0000000..354031e --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/SecureDLQTest.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.filter.DestinationMap; +import org.apache.activemq.security.*; + +import javax.jms.*; + +import static org.apache.activemq.security.SimpleSecurityBrokerSystemTest.*; + +public class SecureDLQTest extends DeadLetterTestSupport { + + Connection dlqConnection; + Session dlqSession; + + public static AuthorizationMap createAuthorizationMap() { + DestinationMap readAccess = new DefaultAuthorizationMap(); + readAccess.put(new ActiveMQQueue("TEST"), ADMINS); + readAccess.put(new ActiveMQQueue("TEST"), USERS); + readAccess.put(new ActiveMQQueue("ActiveMQ.DLQ"), ADMINS); + + DestinationMap writeAccess = new DefaultAuthorizationMap(); + writeAccess.put(new ActiveMQQueue("TEST"), ADMINS); + writeAccess.put(new ActiveMQQueue("TEST"), USERS); + writeAccess.put(new ActiveMQQueue("ActiveMQ.DLQ"), ADMINS); + + readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD); + writeAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD); + + DestinationMap adminAccess = new DefaultAuthorizationMap(); + adminAccess.put(new ActiveMQQueue("TEST"), ADMINS); + adminAccess.put(new ActiveMQQueue("TEST"), USERS); + adminAccess.put(new ActiveMQQueue("ActiveMQ.DLQ"), ADMINS); + adminAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD); + + return new SimpleAuthorizationMap(writeAccess, readAccess, adminAccess); + } + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(createAuthorizationMap()); + + broker.setPlugins(new BrokerPlugin[] {authorizationPlugin, new SimpleSecurityBrokerSystemTest.SimpleAuthenticationFactory()}); + return broker; + } + + // lets disable the inapplicable tests + public void testTransientTopicMessage() throws Exception { + } + + public void testDurableTopicMessage() throws Exception { + } + + @Override + protected void doTest() throws Exception { + timeToLive = 1000; + acknowledgeMode = Session.CLIENT_ACKNOWLEDGE; + makeConsumer(); + sendMessages(); + Thread.sleep(1000); + consumer.close(); + + Thread.sleep(1000); + // this should try to send expired messages to dlq + makeConsumer(); + + makeDlqConsumer(); + for (int i = 0; i < messageCount; i++) { + Message msg = dlqConsumer.receive(1000); + assertMessage(msg, i); + assertNotNull("Should be a DLQ message for loop: " + i, msg); + } + + } + + @Override + public void tearDown() throws Exception { + if (dlqConnection != null) { + dlqConnection.close(); + } + super.tearDown(); + } + + @Override + protected Connection createConnection() throws Exception { + return getConnectionFactory().createConnection("user", "password"); + } + + @Override + protected void makeDlqConsumer() throws Exception { + dlqDestination = createDlqDestination(); + dlqConnection = getConnectionFactory().createConnection("system", "manager"); + dlqConnection.start(); + dlqSession = dlqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + dlqConsumer = dlqSession.createConsumer(dlqDestination); + } + + @Override + protected Destination createDlqDestination() { + return new ActiveMQQueue("ActiveMQ.DLQ"); + } + + @Override + protected String getDestinationString() { + return "TEST"; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java new file mode 100644 index 0000000..d6d6b08 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.QueueSubscriptionTest; +import org.apache.activemq.broker.region.policy.FixedCountSubscriptionRecoveryPolicy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; +import org.apache.activemq.util.MessageIdList; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.BlockJUnit4ClassRunner; + +import javax.jms.MessageConsumer; +import java.util.Iterator; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@RunWith(BlockJUnit4ClassRunner.class) +public class SimpleDispatchPolicyTest extends QueueSubscriptionTest { + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + + PolicyEntry policy = new PolicyEntry(); + policy.setDispatchPolicy(new SimpleDispatchPolicy()); + policy.setSubscriptionRecoveryPolicy(new FixedCountSubscriptionRecoveryPolicy()); + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + + broker.setDestinationPolicy(pMap); + + return broker; + } + + @Override + @Test(timeout = 60 * 1000) + public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception { + super.testOneProducerTwoConsumersSmallMessagesLargePrefetch(); + + // One consumer should have received all messages, and the rest none + // assertOneConsumerReceivedAllMessages(messageCount); + } + + @Override + @Test(timeout = 60 * 1000) + public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception { + super.testOneProducerTwoConsumersLargeMessagesLargePrefetch(); + + // One consumer should have received all messages, and the rest none + // assertOneConsumerReceivedAllMessages(messageCount); + } + + public void assertOneConsumerReceivedAllMessages(int messageCount) throws Exception { + boolean found = false; + for (Iterator<MessageConsumer> i = consumers.keySet().iterator(); i.hasNext();) { + MessageIdList messageIdList = consumers.get(i.next()); + int count = messageIdList.getMessageCount(); + if (count > 0) { + if (found) { + fail("No other consumers should have received any messages"); + } else { + assertEquals("Consumer should have received all messages.", messageCount, count); + found = true; + } + } + } + + if (!found) { + fail("At least one consumer should have received all messages"); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java new file mode 100644 index 0000000..dbeabfe --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import java.util.Iterator; + +import javax.jms.MessageConsumer; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TopicSubscriptionTest; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy; +import org.apache.activemq.util.MessageIdList; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.BlockJUnit4ClassRunner; + +import static org.junit.Assert.*; + +@RunWith(BlockJUnit4ClassRunner.class) +public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest { + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + + PolicyEntry policy = new PolicyEntry(); + policy.setDispatchPolicy(new StrictOrderDispatchPolicy()); + + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + + broker.setDestinationPolicy(pMap); + + return broker; + } + + @Test + @Override + public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception { + super.testOneProducerTwoConsumersLargeMessagesOnePrefetch(); + + assertReceivedMessagesAreOrdered(); + } + + @Test + @Override + public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception { + super.testOneProducerTwoConsumersSmallMessagesOnePrefetch(); + + assertReceivedMessagesAreOrdered(); + } + + @Test + @Override + public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception { + super.testOneProducerTwoConsumersSmallMessagesLargePrefetch(); + + assertReceivedMessagesAreOrdered(); + } + + @Test + @Override + public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception { + super.testOneProducerTwoConsumersLargeMessagesLargePrefetch(); + + assertReceivedMessagesAreOrdered(); + } + + @Test + @Override + public void testOneProducerManyConsumersFewMessages() throws Exception { + super.testOneProducerManyConsumersFewMessages(); + + assertReceivedMessagesAreOrdered(); + } + + + @Test + @Override + public void testOneProducerManyConsumersManyMessages() throws Exception { + super.testOneProducerManyConsumersManyMessages(); + + assertReceivedMessagesAreOrdered(); + } + + @Test + @Override + public void testManyProducersOneConsumer() throws Exception { + super.testManyProducersOneConsumer(); + + assertReceivedMessagesAreOrdered(); + } + + @Test + @Override + public void testManyProducersManyConsumers() throws Exception { + super.testManyProducersManyConsumers(); + + assertReceivedMessagesAreOrdered(); + } + + public void assertReceivedMessagesAreOrdered() throws Exception { + // If there is only one consumer, messages is definitely ordered + if (consumers.size() <= 1) { + return; + } + + // Get basis of order + Iterator<MessageConsumer> i = consumers.keySet().iterator(); + MessageIdList messageOrder = (MessageIdList)consumers.get(i.next()); + + for (; i.hasNext();) { + MessageIdList messageIdList = (MessageIdList)consumers.get(i.next()); + assertTrue("Messages are not ordered.", messageOrder.equals(messageIdList)); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/cursor.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/cursor.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/cursor.xml new file mode 100644 index 0000000..d067a2c --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/cursor.xml @@ -0,0 +1,67 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Copyright 2005-2006 The Apache Software Foundation + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- this file can only be parsed using the xbean-spring library --> +<!-- START SNIPPET: xbean --> +<beans + xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd + http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> + + <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/> + + <broker persistent="false" xmlns="http://activemq.apache.org/schema/core"> + + <!-- lets define the dispatch policy --> + <destinationPolicy> + <policyMap> + <policyEntries> + <policyEntry topic="org.apache.>" producerFlowControl="false" memoryLimit="1mb"> + + <deadLetterStrategy> + <individualDeadLetterStrategy topicPrefix="Test.DLQ." /> + </deadLetterStrategy> + <dispatchPolicy> + <strictOrderDispatchPolicy /> + </dispatchPolicy> + <pendingSubscriberPolicy> + <vmCursor /> + </pendingSubscriberPolicy> + </policyEntry> + + <policyEntry queue="org.apache.>"> + + <deadLetterStrategy> + <individualDeadLetterStrategy queuePrefix="Test.DLQ."/> + </deadLetterStrategy> + <dispatchPolicy> + <strictOrderDispatchPolicy /> + </dispatchPolicy> + <pendingQueuePolicy> + <vmQueueCursor /> + </pendingQueuePolicy> + </policyEntry> + + </policyEntries> + </policyMap> + </destinationPolicy> + </broker> + +</beans> +<!-- END SNIPPET: xbean --> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/individual-dlq.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/individual-dlq.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/individual-dlq.xml new file mode 100644 index 0000000..e79ea92 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/individual-dlq.xml @@ -0,0 +1,60 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- this file can only be parsed using the xbean-spring library --> +<!-- START SNIPPET: xbean --> +<beans + xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd + http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> + + <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/> + + <broker persistent="false" xmlns="http://activemq.apache.org/schema/core"> + + <!-- lets define the dispatch policy --> + <destinationPolicy> + <policyMap> + <policyEntries> + <policyEntry topic="org.apache.>"> + <deadLetterStrategy> + <individualDeadLetterStrategy topicPrefix="Test.DLQ." processNonPersistent="true" /> + </deadLetterStrategy> + <dispatchPolicy> + <strictOrderDispatchPolicy /> + </dispatchPolicy> + </policyEntry> + + <policyEntry queue="org.apache.>"> + <deadLetterStrategy> + <individualDeadLetterStrategy queuePrefix="Test.DLQ." processNonPersistent="true"/> + </deadLetterStrategy> + <dispatchPolicy> + <strictOrderDispatchPolicy /> + </dispatchPolicy> + </policyEntry> + + </policyEntries> + </policyMap> + </destinationPolicy> + </broker> + +</beans> +<!-- END SNIPPET: xbean --> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java new file mode 100644 index 0000000..17bf2b2 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region; + +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.util.Wait; +import org.apache.activemq.util.Wait.Condition; + +public class DestinationGCTest extends EmbeddedBrokerTestSupport { + + ActiveMQQueue queue = new ActiveMQQueue("TEST"); + ActiveMQQueue otherQueue = new ActiveMQQueue("TEST-OTHER"); + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + broker.setDestinations(new ActiveMQDestination[] {queue}); + broker.setSchedulePeriodForDestinationPurge(1000); + broker.setMaxPurgedDestinationsPerSweep(1); + PolicyEntry entry = new PolicyEntry(); + entry.setGcInactiveDestinations(true); + entry.setInactiveTimoutBeforeGC(3000); + PolicyMap map = new PolicyMap(); + map.setDefaultEntry(entry); + broker.setDestinationPolicy(map); + return broker; + } + + public void testDestinationGCWithActiveConsumers() throws Exception { + assertEquals(1, broker.getAdminView().getQueues().length); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?create=false"); + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createProducer(otherQueue).close(); + MessageConsumer consumer = session.createConsumer(queue); + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + } + }); + connection.start(); + + TimeUnit.SECONDS.sleep(5); + + assertTrue("After GC runs there should be one Queue.", Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return broker.getAdminView().getQueues().length == 1; + } + })); + + connection.close(); + } + + public void testDestinationGc() throws Exception { + assertEquals(1, broker.getAdminView().getQueues().length); + assertTrue("After GC runs the Queue should be empty.", Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return broker.getAdminView().getQueues().length == 0; + } + })); + } + + public void testDestinationGcLimit() throws Exception { + + broker.getAdminView().addQueue("TEST1"); + broker.getAdminView().addQueue("TEST2"); + broker.getAdminView().addQueue("TEST3"); + broker.getAdminView().addQueue("TEST4"); + + assertEquals(5, broker.getAdminView().getQueues().length); + Thread.sleep(7000); + int queues = broker.getAdminView().getQueues().length; + assertTrue(queues > 0 && queues < 5); + assertTrue("After GC runs the Queue should be empty.", Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return broker.getAdminView().getQueues().length == 0; + } + })); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationRemoveRestartTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationRemoveRestartTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationRemoveRestartTest.java new file mode 100644 index 0000000..06011fe --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationRemoveRestartTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region; + +import junit.framework.Test; +import org.apache.activemq.CombinationTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQDestination; + +// from https://issues.apache.org/activemq/browse/AMQ-2216 +public class DestinationRemoveRestartTest extends CombinationTestSupport { + private final static String destinationName = "TEST"; + public byte destinationType = ActiveMQDestination.QUEUE_TYPE; + BrokerService broker; + + @Override + protected void setUp() throws Exception { + broker = createBroker(); + } + + private BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setUseJmx(false); + broker.setPersistent(true); + broker.setDeleteAllMessagesOnStartup(true); + broker.start(); + return broker; + } + + @Override + protected void tearDown() throws Exception { + broker.stop(); + } + + public void initCombosForTestCheckDestinationRemoveActionAfterRestart() { + addCombinationValues("destinationType", new Object[]{Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), + Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)}); + } + + public void testCheckDestinationRemoveActionAfterRestart() throws Exception { + doAddDestination(); + doRemoveDestination(); + broker.stop(); + broker.waitUntilStopped(); + broker = createBroker(); + doCheckRemoveActionAfterRestart(); + } + + public void doAddDestination() throws Exception { + boolean res = false; + + ActiveMQDestination amqDestination = + ActiveMQDestination.createDestination(destinationName, destinationType); + broker.getRegionBroker().addDestination(broker.getAdminConnectionContext(), amqDestination,true); + + final ActiveMQDestination[] list = broker.getRegionBroker().getDestinations(); + for (final ActiveMQDestination element : list) { + final Destination destination = broker.getDestination(element); + if (destination.getActiveMQDestination().getPhysicalName().equals(destinationName)) { + res = true; + break; + } + } + + assertTrue("Adding destination Failed", res); + } + + public void doRemoveDestination() throws Exception { + boolean res = true; + + broker.removeDestination(ActiveMQDestination.createDestination(destinationName, destinationType)); + final ActiveMQDestination[] list = broker.getRegionBroker().getDestinations(); + for (final ActiveMQDestination element : list) { + final Destination destination = broker.getDestination(element); + if (destination.getActiveMQDestination().getPhysicalName().equals(destinationName)) { + res = false; + break; + } + } + + assertTrue("Removing destination Failed", res); + } + + + public void doCheckRemoveActionAfterRestart() throws Exception { + boolean res = true; + + final ActiveMQDestination[] list = broker.getRegionBroker().getDestinations(); + for (final ActiveMQDestination element : list) { + final Destination destination = broker.getDestination(element); + if (destination.getActiveMQDestination().getPhysicalName().equals(destinationName)) { + res = false; + break; + } + } + + assertTrue("The removed destination is reloaded after restart !", res); + } + + public static Test suite() { + return suite(DestinationRemoveRestartTest.class); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java new file mode 100644 index 0000000..f69c380 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java @@ -0,0 +1,407 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker.region; + +import java.io.IOException; +import java.util.List; +import java.util.Vector; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.InvalidSelectorException; +import javax.management.ObjectName; + +import junit.framework.TestCase; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatchNotification; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.Response; +import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.management.CountStatisticImpl; +import org.apache.activemq.state.ProducerState; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.PersistenceAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author gtully + * @see https://issues.apache.org/activemq/browse/AMQ-2020 + **/ +public class QueueDuplicatesFromStoreTest extends TestCase { + private static final Logger LOG = LoggerFactory + .getLogger(QueueDuplicatesFromStoreTest.class); + + ActiveMQQueue destination = new ActiveMQQueue("queue-" + + QueueDuplicatesFromStoreTest.class.getSimpleName()); + BrokerService brokerService; + + final static String mesageIdRoot = "11111:22222:"; + final int messageBytesSize = 256; + final String text = new String(new byte[messageBytesSize]); + + final int ackStartIndex = 100; + final int ackWindow = 50; + final int ackBatchSize = 50; + final int fullWindow = 200; + protected int count = 5000; + + @Override + public void setUp() throws Exception { + brokerService = createBroker(); + brokerService.setUseJmx(false); + brokerService.deleteAllMessages(); + brokerService.start(); + } + + protected BrokerService createBroker() throws Exception { + return new BrokerService(); + } + + @Override + public void tearDown() throws Exception { + brokerService.stop(); + } + + public void testNoDuplicateAfterCacheFullAndAckedWithLargeAuditDepth() throws Exception { + doTestNoDuplicateAfterCacheFullAndAcked(1024*10); + } + + public void testNoDuplicateAfterCacheFullAndAckedWithSmallAuditDepth() throws Exception { + doTestNoDuplicateAfterCacheFullAndAcked(512); + } + + public void doTestNoDuplicateAfterCacheFullAndAcked(final int auditDepth) throws Exception { + final PersistenceAdapter persistenceAdapter = brokerService.getPersistenceAdapter(); + final MessageStore queueMessageStore = + persistenceAdapter.createQueueMessageStore(destination); + final ConnectionContext contextNotInTx = new ConnectionContext(); + final ConsumerInfo consumerInfo = new ConsumerInfo(); + final DestinationStatistics destinationStatistics = new DestinationStatistics(); + consumerInfo.setExclusive(true); + final Queue queue = new Queue(brokerService, destination, + queueMessageStore, destinationStatistics, brokerService.getTaskRunnerFactory()); + + // a workaround for this issue + // queue.setUseCache(false); + queue.systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 10); + queue.setMaxAuditDepth(auditDepth); + queue.initialize(); + queue.start(); + + + ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); + ProducerInfo producerInfo = new ProducerInfo(); + ProducerState producerState = new ProducerState(producerInfo); + producerExchange.setProducerState(producerState); + producerExchange.setConnectionContext(contextNotInTx); + + final CountDownLatch receivedLatch = new CountDownLatch(count); + final AtomicLong ackedCount = new AtomicLong(0); + final AtomicLong enqueueCounter = new AtomicLong(0); + final Vector<String> errors = new Vector<String>(); + + // populate the queue store, exceed memory limit so that cache is disabled + for (int i = 0; i < count; i++) { + Message message = getMessage(i); + queue.send(producerExchange, message); + } + + assertEquals("store count is correct", count, queueMessageStore.getMessageCount()); + + // pull from store in small windows + Subscription subscription = new Subscription() { + + @Override + public void add(MessageReference node) throws Exception { + if (enqueueCounter.get() != node.getMessageId().getProducerSequenceId()) { + errors.add("Not in sequence at: " + enqueueCounter.get() + ", received: " + + node.getMessageId().getProducerSequenceId()); + } + assertEquals("is in order", enqueueCounter.get(), node + .getMessageId().getProducerSequenceId()); + receivedLatch.countDown(); + enqueueCounter.incrementAndGet(); + node.decrementReferenceCount(); + } + + @Override + public void add(ConnectionContext context, Destination destination) + throws Exception { + } + + @Override + public int countBeforeFull() { + if (isFull()) { + return 0; + } else { + return fullWindow - (int) (enqueueCounter.get() - ackedCount.get()); + } + } + + @Override + public void destroy() { + }; + + @Override + public void gc() { + } + + @Override + public ConsumerInfo getConsumerInfo() { + return consumerInfo; + } + + @Override + public ConnectionContext getContext() { + return null; + } + + @Override + public long getDequeueCounter() { + return 0; + } + + @Override + public long getDispatchedCounter() { + return 0; + } + + @Override + public int getDispatchedQueueSize() { + return 0; + } + + @Override + public long getEnqueueCounter() { + return 0; + } + + @Override + public int getInFlightSize() { + return 0; + } + + @Override + public int getInFlightUsage() { + return 0; + } + + @Override + public ObjectName getObjectName() { + return null; + } + + @Override + public int getPendingQueueSize() { + return 0; + } + + @Override + public int getPrefetchSize() { + return 0; + } + + @Override + public String getSelector() { + return null; + } + + @Override + public boolean isBrowser() { + return false; + } + + @Override + public boolean isFull() { + return (enqueueCounter.get() - ackedCount.get()) >= fullWindow; + } + + @Override + public boolean isHighWaterMark() { + return false; + } + + @Override + public boolean isLowWaterMark() { + return false; + } + + @Override + public boolean isRecoveryRequired() { + return false; + } + + @Override + public boolean matches(MessageReference node, + MessageEvaluationContext context) throws IOException { + return true; + } + + @Override + public boolean matches(ActiveMQDestination destination) { + return true; + } + + @Override + public void processMessageDispatchNotification( + MessageDispatchNotification mdn) throws Exception { + } + + @Override + public Response pullMessage(ConnectionContext context, + MessagePull pull) throws Exception { + return null; + } + + @Override + public boolean isWildcard() { + return false; + } + + @Override + public List<MessageReference> remove(ConnectionContext context, + Destination destination) throws Exception { + return null; + } + + @Override + public void setObjectName(ObjectName objectName) { + } + + @Override + public void setSelector(String selector) + throws InvalidSelectorException, + UnsupportedOperationException { + } + + @Override + public void updateConsumerPrefetch(int newPrefetch) { + } + + @Override + public boolean addRecoveredMessage(ConnectionContext context, + MessageReference message) throws Exception { + return false; + } + + @Override + public ActiveMQDestination getActiveMQDestination() { + return destination; + } + + @Override + public void acknowledge(ConnectionContext context, MessageAck ack) + throws Exception { + } + + @Override + public int getCursorMemoryHighWaterMark(){ + return 0; + } + + @Override + public void setCursorMemoryHighWaterMark( + int cursorMemoryHighWaterMark) { + } + + @Override + public boolean isSlowConsumer() { + return false; + } + + @Override + public void unmatched(MessageReference node) throws IOException { + } + + @Override + public long getTimeOfLastMessageAck() { + return 0; + } + + @Override + public long getConsumedCount() { + return 0; + } + + public void incrementConsumedCount(){ + + } + + public void resetConsumedCount(){ + + } + }; + + queue.addSubscription(contextNotInTx, subscription); + int removeIndex = 0; + do { + // Simulate periodic acks in small but recent windows + long receivedCount = enqueueCounter.get(); + if (receivedCount > ackStartIndex) { + if (receivedCount >= removeIndex + ackWindow) { + for (int j = 0; j < ackBatchSize; j++, removeIndex++) { + ackedCount.incrementAndGet(); + MessageAck ack = new MessageAck(); + ack.setLastMessageId(new MessageId(mesageIdRoot + + removeIndex)); + ack.setMessageCount(1); + queue.removeMessage(contextNotInTx, subscription, + new IndirectMessageReference( + getMessage(removeIndex)), ack); + queue.wakeup(); + + } + if (removeIndex % 1000 == 0) { + LOG.info("acked: " + removeIndex); + persistenceAdapter.checkpoint(true); + } + } + } + + } while (!receivedLatch.await(0, TimeUnit.MILLISECONDS) && errors.isEmpty()); + + assertTrue("There are no errors: " + errors, errors.isEmpty()); + assertEquals(count, enqueueCounter.get()); + assertEquals("store count is correct", count - removeIndex, + queueMessageStore.getMessageCount()); + } + + private Message getMessage(int i) throws Exception { + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setMessageId(new MessageId(mesageIdRoot + i)); + message.setDestination(destination); + message.setPersistent(true); + message.setResponseRequired(true); + message.setText("Msg:" + i + " " + text); + assertEquals(message.getMessageId().getProducerSequenceId(), i); + return message; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueOptimizedDispatchExceptionTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueOptimizedDispatchExceptionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueOptimizedDispatchExceptionTest.java new file mode 100644 index 0000000..b46169e --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueOptimizedDispatchExceptionTest.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region; + +import static org.junit.Assert.*; + +import java.io.IOException; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.Connection; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.Connector; +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.Command; +import org.apache.activemq.command.ConnectionControl; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.Response; +import org.apache.activemq.state.ProducerState; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.usage.MemoryUsage; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QueueOptimizedDispatchExceptionTest { + + private static final Logger LOG = LoggerFactory.getLogger(QueueOptimizedDispatchExceptionTest.class); + + private static final String brokerName = "testBroker"; + private static final String brokerUrl = "vm://" + brokerName; + private static final int count = 50; + + private final static String mesageIdRoot = "11111:22222:"; + private final ActiveMQQueue destination = new ActiveMQQueue("queue-" + + QueueOptimizedDispatchExceptionTest.class.getSimpleName()); + private final int messageBytesSize = 256; + private final String text = new String(new byte[messageBytesSize]); + + private BrokerService broker; + + @Before + public void setUp() throws Exception { + + // Setup and start the broker + broker = new BrokerService(); + broker.setBrokerName(brokerName); + broker.setPersistent(false); + broker.setSchedulerSupport(false); + broker.setUseJmx(false); + broker.setUseShutdownHook(false); + broker.addConnector(brokerUrl); + + // Start the broker + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + } + + private class MockMemoryUsage extends MemoryUsage { + + private boolean full = true; + + public void setFull(boolean full) { + this.full = full; + } + + @Override + public boolean isFull() { + return full; + } + } + + @Test + public void TestOptimizedDispatchCME() throws Exception { + final PersistenceAdapter persistenceAdapter = broker.getPersistenceAdapter(); + final MessageStore queueMessageStore = + persistenceAdapter.createQueueMessageStore(destination); + final ConnectionContext contextNotInTx = new ConnectionContext(); + contextNotInTx.setConnection(new Connection() { + + @Override + public void stop() throws Exception { + } + + @Override + public void start() throws Exception { + } + + @Override + public void updateClient(ConnectionControl control) { + } + + @Override + public void serviceExceptionAsync(IOException e) { + } + + @Override + public void serviceException(Throwable error) { + } + + @Override + public Response service(Command command) { + return null; + } + + @Override + public boolean isSlow() { + return false; + } + + @Override + public boolean isNetworkConnection() { + return false; + } + + @Override + public boolean isManageable() { + return false; + } + + @Override + public boolean isFaultTolerantConnection() { + return false; + } + + @Override + public boolean isConnected() { + return true; + } + + @Override + public boolean isBlocked() { + return false; + } + + @Override + public boolean isActive() { + return false; + } + + @Override + public ConnectionStatistics getStatistics() { + return null; + } + + @Override + public String getRemoteAddress() { + return null; + } + + @Override + public int getDispatchQueueSize() { + return 0; + } + + @Override + public Connector getConnector() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getConnectionId() { + return null; + } + + @Override + public void dispatchSync(Command message) { + } + + @Override + public void dispatchAsync(Command command) { + } + + @Override + public int getActiveTransactionCount() { + return 0; + } + + @Override + public Long getOldestActiveTransactionDuration() { + return null; + } + }); + + final DestinationStatistics destinationStatistics = new DestinationStatistics(); + final Queue queue = new Queue(broker, destination, + queueMessageStore, destinationStatistics, broker.getTaskRunnerFactory()); + + final MockMemoryUsage usage = new MockMemoryUsage(); + + queue.setOptimizedDispatch(true); + queue.initialize(); + queue.start(); + queue.memoryUsage = usage; + + ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); + ProducerInfo producerInfo = new ProducerInfo(); + ProducerState producerState = new ProducerState(producerInfo); + producerExchange.setProducerState(producerState); + producerExchange.setConnectionContext(contextNotInTx); + + // populate the queue store, exceed memory limit so that cache is disabled + for (int i = 0; i < count; i++) { + Message message = getMessage(i); + queue.send(producerExchange, message); + } + + usage.setFull(false); + + try { + queue.wakeup(); + } catch(Exception e) { + LOG.error("Queue threw an unexpected exception: " + e.toString()); + fail("Should not throw an exception."); + } + } + + private Message getMessage(int i) throws Exception { + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setMessageId(new MessageId(mesageIdRoot + i)); + message.setDestination(destination); + message.setPersistent(false); + message.setResponseRequired(true); + message.setText("Msg:" + i + " " + text); + assertEquals(message.getMessageId().getProducerSequenceId(), i); + return message; + } +}
