http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerManagementTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerManagementTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerManagementTest.java new file mode 100644 index 0000000..e65d819 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerManagementTest.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.scheduler.memory; + +import org.apache.activemq.broker.scheduler.JobSchedulerManagementTest; + +/** + * Tests management of in memory scheduler via JMS client. + */ +public class InMemoryJobSchedulerManagementTest extends JobSchedulerManagementTest { + + @Override + protected boolean isPersistent() { + return false; + } +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerStoreTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerStoreTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerStoreTest.java new file mode 100644 index 0000000..ac90070 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerStoreTest.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.scheduler.memory; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq.broker.scheduler.Job; +import org.apache.activemq.broker.scheduler.JobScheduler; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.IOHelper; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class InMemoryJobSchedulerStoreTest { + + private static final Logger LOG = LoggerFactory.getLogger(InMemoryJobSchedulerStoreTest.class); + + @Test(timeout = 120 * 1000) + public void testRestart() throws Exception { + InMemoryJobSchedulerStore store = new InMemoryJobSchedulerStore(); + File directory = new File("target/test/ScheduledDB"); + IOHelper.mkdirs(directory); + IOHelper.deleteChildren(directory); + store.setDirectory(directory); + final int NUMBER = 1000; + store.start(); + List<ByteSequence> list = new ArrayList<ByteSequence>(); + for (int i = 0; i < NUMBER; i++) { + ByteSequence buff = new ByteSequence(new String("testjob" + i).getBytes()); + list.add(buff); + } + + JobScheduler js = store.getJobScheduler("test"); + js.startDispatching(); + int count = 0; + long startTime = 10 * 60 * 1000; + long period = startTime; + for (ByteSequence job : list) { + js.schedule("id:" + (count++), job, "", startTime, period, -1); + } + + List<Job> test = js.getAllJobs(); + LOG.debug("Found {} jobs in the store before restart", test.size()); + assertEquals(list.size(), test.size()); + store.stop(); + store.start(); + js = store.getJobScheduler("test"); + test = js.getAllJobs(); + LOG.debug("Found {} jobs in the store after restart", test.size()); + assertEquals(0, test.size()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java 
---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java new file mode 100644 index 0000000..36771b0 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.scheduler.memory; + +import org.apache.activemq.broker.scheduler.JobSchedulerStore; +import org.apache.activemq.broker.scheduler.JobSchedulerTest; + +/** + * In-Memory store based variation of the JobSchedulerTest + */ +public class InMemoryJobSchedulerTest extends JobSchedulerTest { + + @Override + public void testAddStopThenDeliver() throws Exception { + // In Memory store that's stopped doesn't retain the jobs. 
+ } + + @Override + protected JobSchedulerStore createJobSchedulerStore() throws Exception { + return new InMemoryJobSchedulerStore(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTxTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTxTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTxTest.java new file mode 100644 index 0000000..fb87905 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTxTest.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.scheduler.memory; + +import org.apache.activemq.broker.scheduler.JobSchedulerTxTest; + +/** + * In memory version of the TX test case + */ +public class InMemoryJobSchedulerTxTest extends JobSchedulerTxTest { + + @Override + protected boolean isPersistent() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/spring.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/spring.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/spring.xml new file mode 100644 index 0000000..7d37217 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/spring.xml @@ -0,0 +1,97 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<!-- START SNIPPET: spring --> +<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd"> +<beans> + <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/> + + <!-- an embedded broker --> + + <bean id="broker" class="org.apache.activemq.broker.BrokerService" + init-method="start"> + <property name="transportConnectorURIs"> + <list> + <value>tcp://localhost:61616</value> + <value>tcp://localhost:61636</value> + </list> + </property> + </bean> + + <!-- JMS ConnectionFactory to use --> + <bean id="jmsFactory" + class="org.apache.activemq.ActiveMQConnectionFactory"> + <property name="brokerURL" value="tcp://localhost:61636" /> + </bean> + + <!-- Spring JMS Template --> + <bean id="myJmsTemplate" + class="org.springframework.jms.core.JmsTemplate"> + <property name="connectionFactory"> + <!-- lets wrap in a pool to avoid creating a connection per send --> + <bean + class="org.springframework.jms.connection.SingleConnectionFactory"> + <property name="targetConnectionFactory"> + <ref local="jmsFactory" /> + </property> + </bean> + </property> + </bean> + + <!-- Spring JMS Template --> + <bean id="consumerJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> + <property name="connectionFactory" ref="jmsFactory"/> + </bean> + + <!-- a sample POJO which uses a Spring JmsTemplate --> + <bean id="producer" class="org.apache.activemq.spring.SpringProducer"> + <property name="template"> + <ref bean="myJmsTemplate"></ref> + </property> + + <property name="destination"> + <ref bean="destination" /> + </property> + + <property name="messageCount"> + <value>10</value> + </property> + </bean> + + + <!-- a sample POJO consumer --> + <bean id="consumer" class="org.apache.activemq.spring.SpringConsumer"> + <property name="template"> + <ref bean="consumerJmsTemplate"></ref> + </property> + + <property name="destination"> + <ref bean="destination" /> + </property> + </bean> + + <bean 
id="destination" class="org.apache.activemq.command.ActiveMQTopic" + autowire="constructor"> + <constructor-arg> + <value>org.apache.activemq.spring.Test.spring.topic</value> + </constructor-arg> + </bean> + +</beans> + +<!-- END SNIPPET: spring --> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/DefaultStoreBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/DefaultStoreBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/DefaultStoreBrokerTest.java new file mode 100644 index 0000000..1e9633a --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/DefaultStoreBrokerTest.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.store; + +import java.net.URI; + +import junit.framework.Test; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerTest; + +/** + * Once the wire format is completed we can test against real persistence storage. + * + * + */ +public class DefaultStoreBrokerTest extends BrokerTest { + + protected BrokerService createBroker() throws Exception { + return BrokerFactory.createBroker(new URI("broker://()/localhost?deleteAllMessagesOnStartup=true")); + } + + protected BrokerService createRestartedBroker() throws Exception { + return BrokerFactory.createBroker(new URI("broker://()/localhost")); + } + + public static Test suite() { + return suite(DefaultStoreBrokerTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/DefaultStoreRecoveryBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/DefaultStoreRecoveryBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/DefaultStoreRecoveryBrokerTest.java new file mode 100644 index 0000000..e89ca04 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/DefaultStoreRecoveryBrokerTest.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.store; + +import java.net.URI; + +import junit.framework.Test; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.RecoveryBrokerTest; + +/** + * Used to verify that recovery works correctly against + * + * + */ +public class DefaultStoreRecoveryBrokerTest extends RecoveryBrokerTest { + + protected BrokerService createBroker() throws Exception { + return BrokerFactory.createBroker(new URI("broker://()/localhost?deleteAllMessagesOnStartup=true")); + } + + protected BrokerService createRestartedBroker() throws Exception { + return BrokerFactory.createBroker(new URI("broker://()/localhost")); + } + + public static Test suite() { + return suite(DefaultStoreRecoveryBrokerTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/LoadTester.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/LoadTester.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/LoadTester.java new file mode 100644 index 0000000..a6d78b4 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/LoadTester.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one 
or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.store; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +import javax.jms.BytesMessage; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import junit.framework.Test; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.JmsTestSupport; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ProgressPrinter; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class LoadTester extends JmsTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(LoadTester.class); + + protected int messageSize = 1024 * 64; + protected int produceCount = 10000; + + @Override + protected BrokerService createBroker() throws Exception { + return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/broker/store/loadtester.xml")); + } + + @Override + protected ConnectionFactory 
createConnectionFactory() throws URISyntaxException, IOException { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getServer().getConnectURI()); + factory.setUseAsyncSend(true); + return factory; + } + + public void testQueueSendThenAddConsumer() throws Exception { + ProgressPrinter printer = new ProgressPrinter(produceCount, 20); + + ActiveMQDestination destination = new ActiveMQQueue("TEST"); + + connection.setUseCompression(false); + connection.getPrefetchPolicy().setAll(10); + connection.start(); + Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + LOG.info("Sending " + produceCount + " messages that are " + (messageSize / 1024.0) + "k large, for a total of " + (produceCount * messageSize / (1024.0 * 1024.0)) + + " megs of data."); + // Send a message to the broker. + long start = System.currentTimeMillis(); + for (int i = 0; i < produceCount; i++) { + printer.increment(); + BytesMessage msg = session.createBytesMessage(); + msg.writeBytes(new byte[messageSize]); + producer.send(msg); + } + long end1 = System.currentTimeMillis(); + + LOG.info("Produced messages/sec: " + (produceCount * 1000.0 / (end1 - start))); + + printer = new ProgressPrinter(produceCount, 10); + start = System.currentTimeMillis(); + MessageConsumer consumer = session.createConsumer(destination); + for (int i = 0; i < produceCount; i++) { + printer.increment(); + assertNotNull("Getting message: " + i, consumer.receive(20000)); + } + end1 = System.currentTimeMillis(); + LOG.info("Consumed messages/sec: " + (produceCount * 1000.0 / (end1 - start))); + + } + + public static Test suite() { + return suite(LoadTester.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + +} 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/RecoverExpiredMessagesTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/RecoverExpiredMessagesTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/RecoverExpiredMessagesTest.java new file mode 100644 index 0000000..fb0296c --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/RecoverExpiredMessagesTest.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.store; + +import java.io.File; +import java.util.ArrayList; +import java.util.concurrent.TimeUnit; +import junit.framework.Test; +import org.apache.activemq.broker.BrokerRestartTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.StubConnection; +import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.IOHelper; + +public class RecoverExpiredMessagesTest extends BrokerRestartTestSupport { + final ArrayList<String> expected = new ArrayList<String>(); + final ActiveMQDestination destination = new ActiveMQQueue("TEST"); + public PendingQueueMessageStoragePolicy queuePendingPolicy; + + @Override + protected void setUp() throws Exception { + setAutoFail(true); + super.setUp(); + } + + public void initCombosForTestRecovery() throws Exception { + addCombinationValues("queuePendingPolicy", new PendingQueueMessageStoragePolicy[] {new FilePendingQueueMessageStoragePolicy(), new VMPendingQueueMessageStoragePolicy()}); + PersistenceAdapter[] persistenceAdapters = new 
PersistenceAdapter[] { + new KahaDBPersistenceAdapter(), + new JDBCPersistenceAdapter(JDBCPersistenceAdapter.createDataSource(IOHelper.getDefaultDataDirectory()), new OpenWireFormat()) + }; + for (PersistenceAdapter adapter : persistenceAdapters) { + adapter.setDirectory(new File(IOHelper.getDefaultDataDirectory())); + } + addCombinationValues("persistenceAdapter", persistenceAdapters); + } + + public void testRecovery() throws Exception { + sendSomeMessagesThatWillExpireIn5AndThenOne(); + + broker.stop(); + broker.waitUntilStopped(); + TimeUnit.SECONDS.sleep(6); + broker = createRestartedBroker(); + broker.start(); + + consumeExpected(); + } + + private void consumeExpected() throws Exception { + // Setup the consumer and receive the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + Message m = receiveMessage(connection); + assertNotNull("Should have received message " + expected.get(0) + " by now!", m); + assertEquals(expected.get(0), m.getMessageId().toString()); + MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE); + connection.send(ack); + + assertNoMessagesLeft(connection); + connection.request(closeConnectionInfo(connectionInfo)); + } + + private void sendSomeMessagesThatWillExpireIn5AndThenOne() throws Exception { + + // Setup the producer and send the message. 
+ StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + + int MESSAGE_COUNT = 10; + for(int i=0; i < MESSAGE_COUNT; i++) { + Message message = createMessage(producerInfo, destination); + message.setExpiration(System.currentTimeMillis()+5000); + message.setPersistent(true); + connection.send(message); + } + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + connection.send(message); + expected.add(message.getMessageId().toString()); + + connection.request(closeConnectionInfo(connectionInfo)); + } + + @Override + protected PolicyEntry getDefaultPolicy() { + PolicyEntry policy = super.getDefaultPolicy(); + policy.setPendingQueuePolicy(queuePendingPolicy); + policy.setExpireMessagesPeriod(0); + return policy; + } + + @Override + protected void configureBroker(BrokerService broker) throws Exception { + super.configureBroker(broker); + broker.setPersistenceAdapter(persistenceAdapter); + } + + public static Test suite() { + return suite(RecoverExpiredMessagesTest.class); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/kahabroker.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/kahabroker.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/kahabroker.xml new file mode 100644 index 0000000..4c8254b --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/kahabroker.xml @@ -0,0 +1,38 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation 
(ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<beans + xmlns="http://www.springframework.org/schema/beans" + xmlns:amq="http://activemq.apache.org/schema/core" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd + http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> + + <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/> + + <broker brokerName="broker" persistent="false" useJmx="false" xmlns="http://activemq.apache.org/schema/core"> + <persistenceAdapter> + <kahaPersistenceAdapter dir = "${basedir}/target/activemq-data/kaha-broker.db" maxDataFileLength = "1024"/> + </persistenceAdapter> + + <transportConnectors> + <transportConnector uri="tcp://localhost:0"/> + </transportConnectors> + + </broker> + +</beans> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/loadtester.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/store/loadtester.xml 
<?xml version="1.0" encoding="UTF-8"?>
<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<!--
    Broker configuration for the store load tester: a JMX-enabled broker with a
    small memory limit, a journal persistence factory writing to "loadtest",
    and a single TCP transport connector.
-->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>

  <!-- Usage limits referenced by the broker below through systemUsage="#memory-manager". -->
  <amq:systemUsage id="memory-manager" >
    <amq:memoryUsage>
        <amq:memoryUsage limit = "1048576" /> <!-- 1 meg limit -->
    </amq:memoryUsage>
  </amq:systemUsage>

  <!-- deleteAllMessagesOnStartup="true": each run starts with an empty store. -->
  <broker useJmx="true" deleteAllMessagesOnStartup="true" systemUsage="#memory-manager" xmlns="http://activemq.apache.org/schema/core">

    <persistenceFactory>
      <journalPersistenceAdapterFactory
        useQuickJournal="false" journalLogFiles="2" dataDirectory="loadtest"/>
    </persistenceFactory>

    <transportConnectors>
      <!-- NOTE(review): port 0 presumably lets the OS pick a free port; confirm. -->
      <transportConnector uri="tcp://localhost:0"/>
    </transportConnectors>

  </broker>

  <!--
       Example PostgreSQL pooling DataSource, left commented out. Nothing in this
       file references it; the broker above uses the journal persistence factory.
  -->
  <!--
  <bean id="postgres-ds" class="org.postgresql.ds.PGPoolingDataSource" destroy-method="close">
    <property name="serverName" value="localhost"/>
    <property name="databaseName" value="activemq"/>
    <property name="portNumber" value="0"/>
    <property name="user" value="activemq"/>
    <property name="password" value="activemq"/>
    <property name="dataSourceName" value="postgres"/>
    <property name="initialConnections" value="1"/>
    <property name="maxConnections" value="10"/>
  </bean>
  -->

</beans>
<!-- END SNIPPET: xbean -->
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/DestinationsPluginTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/DestinationsPluginTest.java new file mode 100644 index 0000000..d08fc5e --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/DestinationsPluginTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.util; + + +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.BrokerView; +import org.apache.activemq.command.ActiveMQDestination; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + + +public class DestinationsPluginTest { + + BrokerService broker; + + @Before + public void setUp() throws Exception { + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void shutdown() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + } + + protected BrokerService createBroker() { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(true); + broker.setPlugins(new BrokerPlugin[]{new DestinationsPlugin()}); + broker.setDataDirectory("target/test"); + return broker; + } + + @Test + public void testDestinationSave() throws Exception { + + BrokerView brokerView = broker.getAdminView(); + brokerView.addQueue("test-queue"); + + broker.stop(); + broker.waitUntilStopped(); + + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + + + ActiveMQDestination[] destinations = broker.getRegionBroker().getDestinations(); + for (ActiveMQDestination destination : destinations) { + if (destination.isQueue()) { + assertEquals("test-queue", destination.getPhysicalName()); + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/PluginBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/PluginBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/PluginBrokerTest.java new file mode 100644 index 0000000..9361859 --- /dev/null +++ 
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.util;

import java.net.URI;

import javax.jms.JMSException;
import javax.jms.Message;

import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.test.JmsTopicSendReceiveTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs the topic send/receive test against a broker loaded from
 * {@code org/apache/activemq/util/plugin-broker.xml} and additionally checks
 * the {@code BrokerPath} property and the expiration of each received message.
 * Messages 7 and 9 are sent with explicit time-to-live values (2000 ms and
 * 200000 ms); all others use the inherited send path.
 */
public class PluginBrokerTest extends JmsTopicSendReceiveTest {
    private static final Logger LOG = LoggerFactory.getLogger(PluginBrokerTest.class);
    private BrokerService broker;

    @Override
    protected void setUp() throws Exception {
        broker = createBroker();
        super.setUp();
    }

    @Override
    protected void tearDown() throws Exception {
        // Stop the broker even when the inherited tear-down throws, so a
        // failing test cannot leave a broker running for later tests.
        try {
            super.tearDown();
        } finally {
            if (broker != null) {
                broker.stop();
            }
        }
    }

    protected BrokerService createBroker() throws Exception {
        return createBroker("org/apache/activemq/util/plugin-broker.xml");
    }

    /**
     * Creates a broker from an xbean configuration on the classpath.
     *
     * @param uri classpath location of the configuration, without the {@code xbean:} scheme
     * @return the broker built by {@link BrokerFactory}
     * @throws Exception if the configuration cannot be loaded
     */
    protected BrokerService createBroker(String uri) throws Exception {
        LOG.info("Loading broker configuration from the classpath with URI: {}", uri);
        return BrokerFactory.createBroker(new URI("xbean:" + uri));
    }

    protected void assertMessageValid(int index, Message message)
        throws JMSException {
        // check if broker path has been set
        assertEquals("localhost", message.getStringProperty("BrokerPath"));
        ActiveMQMessage amqMsg = (ActiveMQMessage)message;
        long lifetime = amqMsg.getExpiration() - amqMsg.getTimestamp();
        if (index == 7) {
            // check custom expiration (sent with a 2000 ms time-to-live)
            assertTrue("expiration is in range, depends on two distinct calls to System.currentTimeMillis", 1500 < lifetime);
        } else if (index == 9) {
            // check ceiling (sent with a 200000 ms time-to-live)
            assertTrue("expiration ceeling is in range, depends on two distinct calls to System.currentTimeMillis", 59500 < lifetime);
        } else {
            // check default expiration
            assertEquals(1000, lifetime);
        }
        super.assertMessageValid(index, message);
    }

    protected void sendMessage(int index, Message message) throws Exception {
        if (index == 7) {
            producer.send(producerDestination, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, 2000);
        } else if (index == 9) {
            producer.send(producerDestination, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, 200000);
        } else {
            super.sendMessage(index, message);
        }
    }

}
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.util; + +import junit.framework.TestCase; +import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ErrorBroker; +import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RedeliveryPluginTest extends TestCase { + private static final Logger LOG = LoggerFactory.getLogger(RedeliveryPluginTest.class); + RedeliveryPlugin underTest = new RedeliveryPlugin(); + + public void testInstallPluginValidation() throws Exception { + RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap(); + RedeliveryPolicy defaultEntry = new RedeliveryPolicy(); + defaultEntry.setInitialRedeliveryDelay(500); + redeliveryPolicyMap.setDefaultEntry(defaultEntry); + underTest.setRedeliveryPolicyMap(redeliveryPolicyMap); + + final BrokerService brokerService = new BrokerService(); + brokerService.setSchedulerSupport(false); + Broker broker = new ErrorBroker("hi") { + @Override + public BrokerService getBrokerService() { + return brokerService; + } + }; + + try { + 
underTest.installPlugin(broker); + fail("expect exception on no scheduler support"); + } catch (Exception expected) { + LOG.info("expected: " + expected); + } + + brokerService.setSchedulerSupport(true); + try { + underTest.installPlugin(broker); + fail("expect exception on small initial delay"); + } catch (Exception expected) { + LOG.info("expected: " + expected); + } + + defaultEntry.setInitialRedeliveryDelay(5000); + defaultEntry.setRedeliveryDelay(500); + brokerService.setSchedulerSupport(true); + try { + underTest.installPlugin(broker); + fail("expect exception on small redelivery delay"); + } catch (Exception expected) { + LOG.info("expected: " + expected); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/TimeStampingBrokerPluginTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/TimeStampingBrokerPluginTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/TimeStampingBrokerPluginTest.java new file mode 100644 index 0000000..1a91f88 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/TimeStampingBrokerPluginTest.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.util; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.policy.DeadLetterStrategy; +import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TimeStampingBrokerPluginTest extends TestCase { + + BrokerService broker; + TransportConnector tcpConnector; + MessageProducer producer; + MessageConsumer consumer; + Connection connection; + Session session; + Destination destination; + String queue = "TEST.FOO"; + long expiry = 500; + + @Before + public void setUp() throws Exception { + TimeStampingBrokerPlugin tsbp = new TimeStampingBrokerPlugin(); + tsbp.setZeroExpirationOverride(expiry); + tsbp.setTtlCeiling(expiry); + + broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(true); + broker.setPlugins(new BrokerPlugin[] {tsbp}); + tcpConnector = broker.addConnector("tcp://localhost:0"); + + // Add policy and individual 
DLQ strategy + PolicyEntry policy = new PolicyEntry(); + DeadLetterStrategy strategy = new IndividualDeadLetterStrategy(); + strategy.setProcessExpired(true); + ((IndividualDeadLetterStrategy)strategy).setUseQueueForQueueMessages(true); + ((IndividualDeadLetterStrategy)strategy).setQueuePrefix("DLQ."); + strategy.setProcessNonPersistent(true); + policy.setDeadLetterStrategy(strategy); + + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + + broker.setDestinationPolicy(pMap); + + broker.start(); + // Create a ConnectionFactory + ActiveMQConnectionFactory connectionFactory = + new ActiveMQConnectionFactory(tcpConnector.getConnectUri()); + + // Create a Connection + connection = connectionFactory.createConnection(); + connection.start(); + + // Create a Session + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create the destination Queue + destination = session.createQueue(queue); + + // Create a MessageProducer from the Session to the Topic or Queue + producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + } + + @After + public void tearDown() throws Exception { + // Clean up + producer.close(); + consumer.close(); + session.close(); + connection.close(); + broker.stop(); + } + @Test + public void testExpirationSet() throws Exception { + + // Create a messages + Message sentMessage = session.createMessage(); + + // Tell the producer to send the message + long beforeSend = System.currentTimeMillis(); + producer.send(sentMessage); + + // Create a MessageConsumer from the Session to the Topic or Queue + consumer = session.createConsumer(destination); + + // Wait for a message + Message receivedMessage = consumer.receive(1000); + + // assert we got the same message ID we sent + assertEquals(sentMessage.getJMSMessageID(), receivedMessage.getJMSMessageID()); + + // assert message timestamp is in window + assertTrue("Expiration should be not null" + 
receivedMessage.getJMSExpiration() + "\n", Long.valueOf(receivedMessage.getJMSExpiration()) != null); + + // assert message expiration is in window + assertTrue("Before send: " + beforeSend + " Msg ts: " + receivedMessage.getJMSTimestamp() + " Msg Expiry: " + receivedMessage.getJMSExpiration(), beforeSend <= receivedMessage.getJMSExpiration() && receivedMessage.getJMSExpiration() <= (receivedMessage.getJMSTimestamp() + expiry)); + } + @Test + public void testExpirationCelingSet() throws Exception { + + // Create a messages + Message sentMessage = session.createMessage(); + // Tell the producer to send the message + long beforeSend = System.currentTimeMillis(); + long sendExpiry = beforeSend + (expiry*22); + sentMessage.setJMSExpiration(sendExpiry); + + producer.send(sentMessage); + + // Create a MessageConsumer from the Session to the Topic or Queue + consumer = session.createConsumer(destination); + + // Wait for a message + Message receivedMessage = consumer.receive(1000); + + // assert we got the same message ID we sent + assertEquals(sentMessage.getJMSMessageID(), receivedMessage.getJMSMessageID()); + + // assert message timestamp is in window + assertTrue("Expiration should be not null" + receivedMessage.getJMSExpiration() + "\n", Long.valueOf(receivedMessage.getJMSExpiration()) != null); + + // assert message expiration is in window + assertTrue("Sent expiry: " + sendExpiry + " Recv ts: " + receivedMessage.getJMSTimestamp() + " Recv expiry: " + receivedMessage.getJMSExpiration(), beforeSend <= receivedMessage.getJMSExpiration() && receivedMessage.getJMSExpiration() <= (receivedMessage.getJMSTimestamp() + expiry)); + } + + @Test + public void testExpirationDLQ() throws Exception { + + // Create a messages + Message sentMessage = session.createMessage(); + // Tell the producer to send the message + long beforeSend = System.currentTimeMillis(); + long sendExpiry = beforeSend + expiry; + sentMessage.setJMSExpiration(sendExpiry); + + producer.send(sentMessage); + 
+ // Create a MessageConsumer from the Session to the Topic or Queue + consumer = session.createConsumer(destination); + + Thread.sleep(expiry+250); + + // Wait for a message + Message receivedMessage = consumer.receive(1000); + + // Message should roll to DLQ + assertNull(receivedMessage); + + // Close old consumer, setup DLQ listener + consumer.close(); + consumer = session.createConsumer(session.createQueue("DLQ."+queue)); + + // Get mesage from DLQ + receivedMessage = consumer.receive(1000); + + // assert we got the same message ID we sent + assertEquals(sentMessage.getJMSMessageID(), receivedMessage.getJMSMessageID()); + + // assert message timestamp is in window + //System.out.println("Recv: " + receivedMessage.getJMSExpiration()); + assertEquals("Expiration should be zero" + receivedMessage.getJMSExpiration() + "\n", receivedMessage.getJMSExpiration(), 0); + + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/TraceBrokerPathPluginTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/TraceBrokerPathPluginTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/TraceBrokerPathPluginTest.java new file mode 100644 index 0000000..35e9cdb --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/util/TraceBrokerPathPluginTest.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.activemq.broker.util; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests TraceBrokerPathPlugin by creating two brokers linked by a network connector, and checking to see if the consuming end receives the expected value in the trace property + * @author Raul Kripalani + * + */ +public class TraceBrokerPathPluginTest extends TestCase { + + BrokerService brokerA; + BrokerService brokerB; + TransportConnector tcpConnectorA; + TransportConnector tcpConnectorB; + MessageProducer producer; + MessageConsumer consumer; + Connection connectionA; + Connection connectionB; + Session sessionA; + Session sessionB; + String queue = "TEST.FOO"; + String traceProperty = "BROKER_PATH"; + + @Before + public void setUp() throws Exception { + TraceBrokerPathPlugin tbppA = new TraceBrokerPathPlugin(); + tbppA.setStampProperty(traceProperty); + + TraceBrokerPathPlugin tbppB = new TraceBrokerPathPlugin(); + tbppB.setStampProperty(traceProperty); + + brokerA = new BrokerService(); + brokerA.setBrokerName("brokerA"); + brokerA.setPersistent(false); + brokerA.setUseJmx(true); + 
brokerA.setPlugins(new BrokerPlugin[] {tbppA}); + tcpConnectorA = brokerA.addConnector("tcp://localhost:0"); + + brokerB = new BrokerService(); + brokerB.setBrokerName("brokerB"); + brokerB.setPersistent(false); + brokerB.setUseJmx(true); + brokerB.setPlugins(new BrokerPlugin[] {tbppB}); + tcpConnectorB = brokerB.addConnector("tcp://localhost:0"); + + brokerA.addNetworkConnector("static:(" + tcpConnectorB.getConnectUri().toString() + ")"); + + brokerB.start(); + brokerB.waitUntilStarted(); + brokerA.start(); + brokerA.waitUntilStarted(); + + // Initialise connection to A and MessageProducer + connectionA = new ActiveMQConnectionFactory(tcpConnectorA.getConnectUri()).createConnection(); + connectionA.start(); + sessionA = connectionA.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = sessionA.createProducer(sessionA.createQueue(queue)); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + // Initialise connection to B and MessageConsumer + connectionB = new ActiveMQConnectionFactory(tcpConnectorB.getConnectUri()).createConnection(); + connectionB.start(); + sessionB = connectionB.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumer = sessionB.createConsumer(sessionB.createQueue(queue)); + + } + + @After + public void tearDown() throws Exception { + // Clean up + producer.close(); + consumer.close(); + sessionA.close(); + sessionB.close(); + connectionA.close(); + connectionB.close(); + brokerA.stop(); + brokerB.stop(); + } + + @Test + public void testTraceBrokerPathPlugin() throws Exception { + Message sentMessage = sessionA.createMessage(); + producer.send(sentMessage); + Message receivedMessage = consumer.receive(1000); + + // assert we got the message + assertNotNull(receivedMessage); + + // assert we got the same message ID we sent + assertEquals(sentMessage.getJMSMessageID(), receivedMessage.getJMSMessageID()); + + assertEquals("brokerA,brokerB", receivedMessage.getStringProperty(traceProperty)); + + } +} 
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.virtual;

import java.net.URI;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.spring.ConsumerBean;
import org.apache.activemq.xbean.XBeanBrokerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sends {@link #total} messages to the producer destination and expects every
 * one of them to arrive at both consumer destinations. The broker is built
 * from the configuration named by {@link #getBrokerConfigUri()}; subclasses
 * override the destination and configuration hooks.
 */
public class CompositeQueueTest extends EmbeddedBrokerTestSupport {

    private static final Logger LOG = LoggerFactory.getLogger(CompositeQueueTest.class);

    protected int total = 10;
    protected Connection connection;
    public String messageSelector1;
    public String messageSelector2 = null;

    public void testVirtualTopicCreation() throws Exception {
        if (connection == null) {
            connection = createConnection();
        }
        connection.start();

        ConsumerBean firstListener = newVerboseListener();
        ConsumerBean secondListener = newVerboseListener();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Destination source = getProducerDestination();
        Destination firstTarget = getConsumer1Dsetination();
        Destination secondTarget = getConsumer2Dsetination();

        LOG.info("Sending to: " + source);
        LOG.info("Consuming from: " + firstTarget + " and " + secondTarget);

        MessageConsumer firstConsumer = session.createConsumer(firstTarget, messageSelector1);
        MessageConsumer secondConsumer = session.createConsumer(secondTarget, messageSelector2);

        firstConsumer.setMessageListener(firstListener);
        secondConsumer.setMessageListener(secondListener);

        // producer on the composite destination
        MessageProducer producer = session.createProducer(source);
        assertNotNull(producer);

        for (int seq = 0; seq < total; seq++) {
            producer.send(createMessage(session, seq));
        }

        assertMessagesArrived(firstListener, secondListener);
    }

    /** Creates a listener bean with verbose output switched on. */
    private static ConsumerBean newVerboseListener() {
        ConsumerBean listener = new ConsumerBean();
        listener.setVerbose(true);
        return listener;
    }

    /** Asserts that each listener received {@link #total} messages. */
    protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) {
        messageList1.assertMessagesArrived(total);
        messageList2.assertMessagesArrived(total);
    }

    /**
     * Builds the i-th text message, tagged with string property {@code odd}
     * ("yes" or "no") and int property {@code i}.
     */
    protected TextMessage createMessage(Session session, int i) throws JMSException {
        TextMessage message = session.createTextMessage("message: " + i);
        message.setStringProperty("odd", (i % 2 != 0) ? "yes" : "no");
        message.setIntProperty("i", i);
        return message;
    }

    protected Destination getConsumer1Dsetination() {
        return new ActiveMQQueue("FOO");
    }

    protected Destination getConsumer2Dsetination() {
        return new ActiveMQTopic("BAR");
    }

    protected Destination getProducerDestination() {
        return new ActiveMQQueue("MY.QUEUE");
    }

    @Override
    protected void tearDown() throws Exception {
        if (connection != null) {
            connection.close();
        }
        super.tearDown();
    }

    protected BrokerService createBroker() throws Exception {
        URI configLocation = new URI(getBrokerConfigUri());
        return new XBeanBrokerFactory().createBroker(configLocation);
    }

    protected String getBrokerConfigUri() {
        return "org/apache/activemq/broker/virtual/composite-queue.xml";
    }
}
0000000..991e3a5 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeTopicTest.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.virtual; + +import javax.jms.Destination; + +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; + +/** + * + * + */ +public class CompositeTopicTest extends CompositeQueueTest { + + protected Destination getConsumer1Dsetination() { + return new ActiveMQQueue("FOO"); + } + + protected Destination getConsumer2Dsetination() { + return new ActiveMQTopic("BAR"); + } + + protected Destination getProducerDestination() { + return new ActiveMQTopic("MY.TOPIC"); + } + + protected String getBrokerConfigUri() { + return "org/apache/activemq/broker/virtual/composite-topic.xml"; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java ---------------------------------------------------------------------- diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java new file mode 100644 index 0000000..b6ba22d --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.virtual; + +import java.io.IOException; +import java.net.URI; + +import javax.jms.Connection; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; +import javax.management.InstanceNotFoundException; +import javax.management.MBeanServerConnection; +import javax.management.ObjectInstance; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; + +import org.apache.activemq.EmbeddedBrokerTestSupport; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.DestinationFilter; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.Message; +import org.apache.activemq.xbean.XBeanBrokerFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Test for AMQ-4571. + * checks that durable subscription is fully unregistered + * when using nested destination interceptors. + */ +public class DestinationInterceptorDurableSubTest extends EmbeddedBrokerTestSupport { + + private static final transient Logger LOG = LoggerFactory.getLogger(DestinationInterceptorDurableSubTest.class); + private MBeanServerConnection mbsc = null; + public static final String JMX_CONTEXT_BASE_NAME = "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="; + + /** + * Tests AMQ-4571. 
+ * @throws Exception + */ + public void testVirtualTopicRemoval() throws Exception { + + LOG.debug("Running testVirtualTopicRemoval()"); + String clientId1 = "myId1"; + String clientId2 = "myId2"; + + Connection conn = null; + Session session = null; + + try { + assertTrue(broker.isStarted()); + + // create durable sub 1 + conn = createConnection(); + conn.setClientID(clientId1); + conn.start(); + session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + // Topic topic = session.createTopic(destination.getPhysicalName()); + TopicSubscriber sub1 = session.createDurableSubscriber((Topic) destination, clientId1); + + // create durable sub 2 + TopicSubscriber sub2 = session.createDurableSubscriber((Topic) destination, clientId2); + + // verify two subs registered in JMX + assertSubscriptionCount(destination.getPhysicalName(), 2); + assertTrue(isSubRegisteredInJmx(destination.getPhysicalName(), clientId1)); + assertTrue(isSubRegisteredInJmx(destination.getPhysicalName(), clientId2)); + + // delete sub 1 + sub1.close(); + session.unsubscribe(clientId1); + + // verify only one sub registered in JMX + assertSubscriptionCount(destination.getPhysicalName(), 1); + assertFalse(isSubRegisteredInJmx(destination.getPhysicalName(), clientId1)); + assertTrue(isSubRegisteredInJmx(destination.getPhysicalName(), clientId2)); + + // delete sub 2 + sub2.close(); + session.unsubscribe(clientId2); + + // verify no sub registered in JMX + assertSubscriptionCount(destination.getPhysicalName(), 0); + assertFalse(isSubRegisteredInJmx(destination.getPhysicalName(), clientId1)); + assertFalse(isSubRegisteredInJmx(destination.getPhysicalName(), clientId2)); + } finally { + session.close(); + conn.close(); + } + } + + + /** + * Connects to broker using JMX + * @return The JMX connection + * @throws IOException in case of any errors + */ + protected MBeanServerConnection connectJMXBroker() throws IOException { + // connect to broker via JMX + JMXServiceURL url = new 
JMXServiceURL("service:jmx:rmi:///jndi/rmi://:1299/jmxrmi"); + JMXConnector jmxc = JMXConnectorFactory.connect(url, null); + MBeanServerConnection mbsc = jmxc.getMBeanServerConnection(); + LOG.debug("JMX connection established"); + return mbsc; + } + + /** + * Asserts that the Subscriptions JMX attribute of a topic has the expected + * count. + * @param topicName name of the topic destination + * @param expectedCount expected number of subscriptions + * @return + */ + protected boolean assertSubscriptionCount(String topicName, int expectedCount) { + try { + if (mbsc == null) { + mbsc = connectJMXBroker(); + } + // query broker queue size + ObjectName[] tmp = (ObjectName[])mbsc.getAttribute(new ObjectName(JMX_CONTEXT_BASE_NAME + topicName), "Subscriptions"); + assertEquals(expectedCount, tmp.length); + } catch (Exception ex) { + LOG.error(ex.getMessage()); + return false; + } + return true; + } + + /** + * Checks if a subscriptions for topic topicName with subName is registered in JMX + * + * @param topicName physical name of topic destination (excluding prefix 'topic://') + * @param subName name of the durable subscription + * @return true if registered, false otherwise + */ + protected boolean isSubRegisteredInJmx(String topicName, String subName) { + + try { + if (mbsc == null) { + mbsc = connectJMXBroker(); + } + + // A durable sub is registered under the Subscriptions JMX attribute of the topic and + // as its own ObjectInstance under the topic's Consumer namespace. + // AMQ-4571 only removed the latter not the former on unsubscribe(), so we need + // to check against both. 
+ ObjectName[] names = (ObjectName[])mbsc.getAttribute(new ObjectName(JMX_CONTEXT_BASE_NAME + topicName), "Subscriptions"); + ObjectInstance instance = (ObjectInstance)mbsc.getObjectInstance( + new ObjectName(JMX_CONTEXT_BASE_NAME + + topicName + + ",endpoint=Consumer,clientId=myId1,consumerId=Durable(myId1_" + + subName + + ")") + ); + + if (instance == null) + return false; + + for (int i=0; i < names.length; i++) { + if (names[i].toString().contains(subName)) + return true; + } + } catch (InstanceNotFoundException ine) { + //this may be expected so log at info level + LOG.info(ine.toString()); + return false; + } + catch (Exception ex) { + LOG.error(ex.toString()); + return false; + } + return false; + } + + + protected void tearDown() throws Exception { + super.tearDown(); + } + + + protected BrokerService createBroker() throws Exception { + XBeanBrokerFactory factory = new XBeanBrokerFactory(); + BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri())); + + // lets disable persistence as we are a test + answer.setPersistent(false); + useTopic = true; + return answer; + } + + + protected String getBrokerConfigUri() { + return "org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml"; + } + + + /** + * Simple but custom topic interceptor. + * To be used for testing nested interceptors in conjunction with + * virtual topic interceptor. 
+ */ + public static class SimpleDestinationInterceptor implements DestinationInterceptor { + + private final Logger LOG = LoggerFactory.getLogger(SimpleDestinationInterceptor.class); + private BrokerService broker; + + public SimpleDestinationInterceptor() { + } + + /* (non-Javadoc) + * @see org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService) + */ + public void setBrokerService(BrokerService brokerService) { + LOG.info("setBrokerService()"); + this.broker = brokerService; + } + + /* (non-Javadoc) + * @see org.apache.activemq.broker.region.DestinationInterceptor#intercept(org.apache.activemq.broker.region.Destination) + */ + public Destination intercept(final Destination destination) { + LOG.info("intercept({})", destination.getName()); + + if (!destination.getActiveMQDestination().getPhysicalName().startsWith("ActiveMQ")) { + return new DestinationFilter(destination) { + public void send(ProducerBrokerExchange context, Message message) throws Exception { + // Send message to Destination + if (LOG.isDebugEnabled()) { + LOG.debug("SimpleDestinationInterceptor: Sending message to destination:" + + this.getActiveMQDestination().getPhysicalName()); + } + // message.setDestination(destination.getActiveMQDestination()); + super.send(context, message); + } + }; + } + return destination; + } + + + /* (non-Javadoc) + * @see org.apache.activemq.broker.region.DestinationInterceptor#remove(org.apache.activemq.broker.region.Destination) + */ + public void remove(Destination destination) { + LOG.info("remove({})", destination.getName()); + this.broker = null; + } + + + /* (non-Javadoc) + * @see org.apache.activemq.broker.region.DestinationInterceptor#create(org.apache.activemq.broker.Broker, org.apache.activemq.broker.ConnectionContext, org.apache.activemq.command.ActiveMQDestination) + */ + public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception { + LOG.info("create("+ 
broker.getBrokerName() + ", " + context.toString() + ", " + destination.getPhysicalName()); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java new file mode 100644 index 0000000..17a8706 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.virtual; + +import org.apache.activemq.spring.ConsumerBean; + +/** + * + */ +public class FilteredQueueTest extends CompositeQueueTest { + + @Override + protected String getBrokerConfigUri() { + return "org/apache/activemq/broker/virtual/filtered-queue.xml"; + } + + @Override + protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) { + messageList1.assertMessagesArrived(total / 2); + messageList2.assertMessagesArrived(1); + } +}
