http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPTestSupport.java new file mode 100644 index 0000000..6145f3b --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPTestSupport.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.blob; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +import javax.jms.Connection; + +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.util.IOHelper; +import org.apache.ftpserver.FtpServer; +import org.apache.ftpserver.FtpServerFactory; +import org.apache.ftpserver.ftplet.Authority; +import org.apache.ftpserver.ftplet.UserManager; +import org.apache.ftpserver.listener.ListenerFactory; +import org.apache.ftpserver.usermanager.PropertiesUserManagerFactory; +import org.apache.ftpserver.usermanager.impl.BaseUser; +import org.apache.ftpserver.usermanager.impl.WritePermission; +import org.jmock.Mockery; + +public abstract class FTPTestSupport extends EmbeddedBrokerTestSupport { + + protected static final String ftpServerListenerName = "default"; + protected Connection connection; + protected FtpServer server; + String userNamePass = "activemq"; + + Mockery context = null; + String ftpUrl; + int ftpPort; + + final File ftpHomeDirFile = new File("target/FTPBlobTest/ftptest"); + + protected void setUp() throws Exception { + + if (ftpHomeDirFile.getParentFile().exists()) { + IOHelper.deleteFile(ftpHomeDirFile.getParentFile()); + } + ftpHomeDirFile.mkdirs(); + ftpHomeDirFile.getParentFile().deleteOnExit(); + + FtpServerFactory serverFactory = new FtpServerFactory(); + ListenerFactory factory = new ListenerFactory(); + + PropertiesUserManagerFactory userManagerFactory = new PropertiesUserManagerFactory(); + UserManager userManager = userManagerFactory.createUserManager(); + + BaseUser user = new BaseUser(); + user.setName("activemq"); + user.setPassword("activemq"); + user.setHomeDirectory(ftpHomeDirFile.getParent()); + + // authorize user + List<Authority> auths = new ArrayList<Authority>(); + Authority auth = new WritePermission(); + auths.add(auth); + user.setAuthorities(auths); + + userManager.save(user); + + BaseUser guest = new BaseUser(); + guest.setName("guest"); + guest.setPassword("guest"); + guest.setHomeDirectory(ftpHomeDirFile.getParent()); + + userManager.save(guest); + + serverFactory.setUserManager(userManager); + factory.setPort(0); + serverFactory.addListener(ftpServerListenerName, factory + .createListener()); + server = serverFactory.createServer(); + server.start(); + ftpPort = serverFactory.getListener(ftpServerListenerName) + .getPort(); + super.setUp(); + } + + public void setConnection() throws Exception { + ftpUrl = "ftp://" + + userNamePass + + ":" + + userNamePass + + "@localhost:" + + ftpPort + + "/ftptest/"; + bindAddress = "vm://localhost?jms.blobTransferPolicy.defaultUploadUrl=" + ftpUrl; + + connectionFactory = createConnectionFactory(); + + connection = createConnection(); + connection.start(); + } + + protected void tearDown() throws Exception { + if (connection != null) { + connection.stop(); + } + super.tearDown(); + if (server != null) { + server.stop(); + } + IOHelper.deleteFile(ftpHomeDirFile.getParentFile()); + } + + + +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FilesystemBlobTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FilesystemBlobTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FilesystemBlobTest.java new file mode 100644 index 0000000..1754689 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FilesystemBlobTest.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.blob; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.InputStream; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.BlobMessage; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.command.ActiveMQBlobMessage; +import org.apache.activemq.util.IOHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class FilesystemBlobTest extends EmbeddedBrokerTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(FilesystemBlobTest.class); + + private Connection connection; + private final String tmpDir = System.getProperty("user.dir") + "/target/FilesystemBlobTest"; + @Override + public void setUp() throws Exception { + super.setUp(); + // replace \ with / to let it work on windows too + String fileUrl = "file:///" +tmpDir.replaceAll("\\\\", "/"); + LOG.info("Using file: " + fileUrl); + bindAddress = "vm://localhost?jms.blobTransferPolicy.defaultUploadUrl=" + fileUrl; + + connectionFactory = createConnectionFactory(); + + connection = createConnection(); + connection.start(); + } + + public void testBlobFile() throws Exception { + // first create Message + File file = File.createTempFile("amq-data-file-", ".dat"); + // lets write some data + String content = "hello world " + System.currentTimeMillis(); + BufferedWriter writer = new BufferedWriter(new FileWriter(file)); + writer.append(content); + writer.close(); + + ActiveMQSession session = (ActiveMQSession) connection.createSession( + false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + MessageConsumer consumer = session.createConsumer(destination); + BlobMessage message = session.createBlobMessage(file); + + producer.send(message); + Thread.sleep(1000); + + // check message send + Message msg = consumer.receive(1000); + assertTrue(msg instanceof ActiveMQBlobMessage); + + InputStream input = ((ActiveMQBlobMessage) msg).getInputStream(); + StringBuilder b = new StringBuilder(); + int i = input.read(); + while (i != -1) { + b.append((char) i); + i = input.read(); + } + input.close(); + File uploaded = new File(tmpDir, msg.getJMSMessageID().toString().replace(":", "_")); + assertEquals(content, b.toString()); + assertTrue(uploaded.exists()); + ((ActiveMQBlobMessage)msg).deleteFile(); + assertFalse(uploaded.exists()); + } + + @Override + protected void tearDown() throws Exception { + if (connection != null) { + connection.stop(); + } + super.tearDown(); + + IOHelper.deleteFile(new File(tmpDir)); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java new file mode 100644 index 0000000..e810f92 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import junit.framework.Test; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; +import org.apache.derby.jdbc.EmbeddedDataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.*; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Implements the test case attached to: + * https://issues.apache.org/jira/browse/AMQ-4351 + * + * This version avoids the spring deps. + */ +public class AMQ4351Test extends BrokerTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ4351Test.class); + + public static Test suite() { + return suite(AMQ4351Test.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + + protected BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + + // Lets clean up often. + broker.setOfflineDurableSubscriberTaskSchedule(500); + broker.setOfflineDurableSubscriberTimeout(2000); // lets delete durable subs much faster. + + JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); + EmbeddedDataSource dataSource = new EmbeddedDataSource(); + dataSource.setDatabaseName("derbyDb"); + dataSource.setCreateDatabase("create"); + jdbc.setDataSource(dataSource); + + jdbc.deleteAllMessages(); + broker.setPersistenceAdapter(jdbc); + return broker; + } + + ActiveMQConnectionFactory connectionFactory; + ActiveMQTopic destination = new ActiveMQTopic("TEST"); + + @Override + protected void setUp() throws Exception { + super.setUp(); + connectionFactory = new ActiveMQConnectionFactory(broker.getVmConnectorURI()); + } + + class ProducingClient implements Runnable { + final AtomicLong size = new AtomicLong(); + final AtomicBoolean done = new AtomicBoolean(); + CountDownLatch doneLatch = new CountDownLatch(1); + + Connection connection; + Session session; + MessageProducer producer; + + ProducingClient() throws JMSException { + connection = connectionFactory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = session.createProducer(destination); + } + + private void sendMessage() { + try { + producer.send(session.createTextMessage("Test")); + long i = size.incrementAndGet(); + if( (i % 1000) == 0 ) { + LOG.info("produced " + i + "."); + } + } catch (JMSException e) { + e.printStackTrace(); + } + } + + public void start() { + new Thread(this, "ProducingClient").start(); + } + + public void stop() throws InterruptedException { + done.set(true); + if( !doneLatch.await(20, TimeUnit.MILLISECONDS) ) { + try { + connection.close(); + doneLatch.await(); + } catch (JMSException e) { + } + } + } + + @Override + public void run() { + try { + try { + while (!done.get()) { + sendMessage(); + Thread.sleep(10); + } + } finally { + connection.close(); + } + } catch (Exception e) { + e.printStackTrace(); + done.set(true); + } finally { + doneLatch.countDown(); + } + } + } + + class ConsumingClient implements Runnable { + final String name; + final AtomicLong size = new AtomicLong(); + final AtomicBoolean done = new AtomicBoolean(); + CountDownLatch doneLatch = new CountDownLatch(1); + CountDownLatch started; + CountDownLatch finished; + + + public ConsumingClient(String name, CountDownLatch started, CountDownLatch finished) { + this.name = name; + this.started = started; + this.finished = finished; + } + + public void start() { + LOG.info("Starting JMS listener " + name); + new Thread(this, "ConsumingClient: "+name).start(); + } + + public void stopAsync() { + finished.countDown(); + done.set(true); + } + + public void stop() throws InterruptedException { + stopAsync(); + doneLatch.await(); + } + + @Override + public void run() { + try { + Connection connection = connectionFactory.createConnection(); + connection.setClientID(name); + connection.start(); + try { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createDurableSubscriber(destination, name, null, false); + started.countDown(); + while( !done.get() ) { + Message msg = consumer.receive(100); + if(msg!=null ) { + size.incrementAndGet(); + session.commit(); + } + } + } finally { + connection.close(); + LOG.info("Stopped JMS listener " + name); + } + } catch (Exception e) { + e.printStackTrace(); + done.set(true); + } finally { + doneLatch.countDown(); + } + } + + } + + public void testAMQ4351() throws InterruptedException, JMSException { + LOG.info("Start test."); + int subs = 100; + CountDownLatch startedLatch = new CountDownLatch(subs - 1); + CountDownLatch shutdownLatch = new CountDownLatch(subs - 4); + + + ProducingClient producer = new ProducingClient(); + ConsumingClient listener1 = new ConsumingClient("subscriber-1", startedLatch, shutdownLatch); + ConsumingClient listener2 = new ConsumingClient("subscriber-2", startedLatch, shutdownLatch); + ConsumingClient listener3 = new ConsumingClient("subscriber-3", startedLatch, shutdownLatch); + try { + + listener1.start(); + listener2.start(); + listener3.start(); + + List<ConsumingClient> subscribers = new ArrayList<ConsumingClient>(subs); + for (int i = 4; i < subs; i++) { + ConsumingClient client = new ConsumingClient("subscriber-" + i, startedLatch, shutdownLatch); + subscribers.add(client); + client.start(); + } + startedLatch.await(10, TimeUnit.SECONDS); + + LOG.info("All subscribers started."); + producer.sendMessage(); + + LOG.info("Stopping 97 subscribers...."); + for (ConsumingClient client : subscribers) { + client.stopAsync(); + } + shutdownLatch.await(10, TimeUnit.SECONDS); + + // Start producing messages for 10 minutes, at high rate + LOG.info("Starting mass message producer..."); + producer.start(); + + long lastSize = listener1.size.get(); + for( int i=0 ; i < 10; i++ ) { + Thread.sleep(1000); + long size = listener1.size.get(); + LOG.info("Listener 1: consumed: "+(size - lastSize)); + assertTrue( size > lastSize ); + lastSize = size; + } + } finally { + LOG.info("Stopping clients"); + listener1.stop(); + listener2.stop(); + listener3.stop(); + producer.stop(); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerBenchmark.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerBenchmark.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerBenchmark.java new file mode 100644 index 0000000..3e154f9 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerBenchmark.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; + +import junit.framework.Test; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.SessionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * BrokerBenchmark is used to get an idea of the raw performance of a broker. + * Since the broker data structures using in message dispatching are under high + * contention from client requests, it's performance should be monitored closely + * since it typically is the biggest bottleneck in a high performance messaging + * fabric. The benchmarks are run under all the following combinations options: + * Queue vs. Topic, 1 vs. 10 producer threads, 1 vs. 10 consumer threads, and + * Persistent vs. Non-Persistent messages. Message Acking uses client ack style + * batch acking since that typically has the best ack performance. + * + * + */ +public class BrokerBenchmark extends BrokerTestSupport { + private static final transient Logger LOG = LoggerFactory.getLogger(BrokerBenchmark.class); + + public int produceCount = Integer.parseInt(System.getProperty("PRODUCE_COUNT", "10000")); + public ActiveMQDestination destination; + public int prodcuerCount; + public int consumerCount; + public boolean deliveryMode; + + public void initCombosForTestPerformance() { + addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST"), new ActiveMQTopic("TEST")}); + addCombinationValues("PRODUCER_COUNT", new Object[] {new Integer("1"), new Integer("10")}); + addCombinationValues("CONSUMER_COUNT", new Object[] {new Integer("1"), new Integer("10")}); + addCombinationValues("CONSUMER_COUNT", new Object[] {new Integer("1"), new Integer("10")}); + addCombinationValues("deliveryMode", new Object[] {Boolean.TRUE}); + } + + public void testPerformance() throws Exception { + + LOG.info("Running Benchmark for destination=" + destination + ", producers=" + prodcuerCount + ", consumers=" + consumerCount + ", deliveryMode=" + deliveryMode); + final int consumeCount = destination.isTopic() ? consumerCount * produceCount : produceCount; + + final Semaphore consumersStarted = new Semaphore(1 - consumerCount); + final Semaphore producersFinished = new Semaphore(1 - prodcuerCount); + final Semaphore consumersFinished = new Semaphore(1 - consumerCount); + final ProgressPrinter printer = new ProgressPrinter(produceCount + consumeCount, 10); + + // Start a producer and consumer + + profilerPause("Benchmark ready. Start profiler "); + + long start = System.currentTimeMillis(); + + final AtomicInteger receiveCounter = new AtomicInteger(0); + for (int i = 0; i < consumerCount; i++) { + new Thread() { + public void run() { + try { + + // Consume the messages + StubConnection connection = new StubConnection(broker); + ConnectionInfo connectionInfo = createConnectionInfo(); + connection.send(connectionInfo); + + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setPrefetchSize(1000); + connection.send(sessionInfo); + connection.send(consumerInfo); + + consumersStarted.release(); + + while (receiveCounter.get() < consumeCount) { + + int counter = 0; + // Get a least 1 message. + Message msg = receiveMessage(connection, 2000); + if (msg != null) { + printer.increment(); + receiveCounter.incrementAndGet(); + + counter++; + + // Try to piggy back a few extra message acks if + // they are ready. + Message extra = null; + while ((extra = receiveMessage(connection, 0)) != null) { + msg = extra; + printer.increment(); + receiveCounter.incrementAndGet(); + counter++; + } + } + + if (msg != null) { + connection.send(createAck(consumerInfo, msg, counter, MessageAck.STANDARD_ACK_TYPE)); + } else if (receiveCounter.get() < consumeCount) { + LOG.info("Consumer stall, waiting for message #" + receiveCounter.get() + 1); + } + } + + connection.send(closeConsumerInfo(consumerInfo)); + } catch (Throwable e) { + e.printStackTrace(); + } finally { + consumersFinished.release(); + } + } + + }.start(); + } + + // Make sure that the consumers are started first to avoid sending + // messages + // before a topic is subscribed so that those messages are not missed. + consumersStarted.acquire(); + + // Send the messages in an async thread. + for (int i = 0; i < prodcuerCount; i++) { + new Thread() { + public void run() { + try { + StubConnection connection = new StubConnection(broker); + ConnectionInfo connectionInfo = createConnectionInfo(); + connection.send(connectionInfo); + + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + for (int i = 0; i < produceCount / prodcuerCount; i++) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(deliveryMode); + message.setResponseRequired(false); + connection.send(message); + printer.increment(); + } + } catch (Throwable e) { + e.printStackTrace(); + } finally { + producersFinished.release(); + } + }; + }.start(); + } + + producersFinished.acquire(); + long end1 = System.currentTimeMillis(); + consumersFinished.acquire(); + long end2 = System.currentTimeMillis(); + + LOG.info("Results for destination=" + destination + ", producers=" + prodcuerCount + ", consumers=" + consumerCount + ", deliveryMode=" + deliveryMode); + LOG.info("Produced at messages/sec: " + (produceCount * 1000.0 / (end1 - start))); + LOG.info("Consumed at messages/sec: " + (consumeCount * 1000.0 / (end2 - start))); + profilerPause("Benchmark done. Stop profiler "); + } + + public static Test suite() { + return suite(BrokerBenchmark.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java new file mode 100644 index 0000000..1fc7a6a --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import java.util.concurrent.TimeUnit; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap; +import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; +import org.apache.activemq.broker.util.RedeliveryPlugin; +import org.apache.activemq.command.ActiveMQQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport { + + static final Logger LOG = LoggerFactory.getLogger(BrokerRedeliveryTest.class); + BrokerService broker = null; + + final ActiveMQQueue destination = new ActiveMQQueue("Redelivery"); + final String data = "hi"; + final long redeliveryDelayMillis = 2000; + long initialRedeliveryDelayMillis = 4000; + int maxBrokerRedeliveries = 2; + + public void testScheduledRedelivery() throws Exception { + doTestScheduledRedelivery(maxBrokerRedeliveries, true); + } + + public void testInfiniteRedelivery() throws Exception { + initialRedeliveryDelayMillis = redeliveryDelayMillis; + maxBrokerRedeliveries = RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES; + doTestScheduledRedelivery(RedeliveryPolicy.DEFAULT_MAXIMUM_REDELIVERIES + 1, false); + } + + public void doTestScheduledRedelivery(int maxBrokerRedeliveriesToValidate, boolean validateDLQ) throws Exception { + + startBroker(true); + sendMessage(0); + + ActiveMQConnection consumerConnection = (ActiveMQConnection) createConnection(); + RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); + redeliveryPolicy.setInitialRedeliveryDelay(0); + redeliveryPolicy.setMaximumRedeliveries(0); + consumerConnection.setRedeliveryPolicy(redeliveryPolicy); + consumerConnection.start(); + Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = consumerSession.createConsumer(destination); + Message message = consumer.receive(1000); + assertNotNull("got message", message); + LOG.info("got: " + message); + consumerSession.rollback(); + + for (int i=0;i<maxBrokerRedeliveriesToValidate;i++) { + Message shouldBeNull = consumer.receive(500); + assertNull("did not get message after redelivery count exceeded: " + shouldBeNull, shouldBeNull); + + TimeUnit.SECONDS.sleep(3); + + Message brokerRedeliveryMessage = consumer.receive(500); + LOG.info("got: " + brokerRedeliveryMessage); + assertNotNull("got message via broker redelivery after delay", brokerRedeliveryMessage); + assertEquals("message matches", message.getStringProperty("data"), brokerRedeliveryMessage.getStringProperty("data")); + assertEquals("has expiryDelay specified", i == 0 ? initialRedeliveryDelayMillis : redeliveryDelayMillis, brokerRedeliveryMessage.getLongProperty(RedeliveryPlugin.REDELIVERY_DELAY)); + + consumerSession.rollback(); + } + + if (validateDLQ) { + MessageConsumer dlqConsumer = consumerSession.createConsumer(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME)); + Message dlqMessage = dlqConsumer.receive(2000); + assertNotNull("Got message from dql", dlqMessage); + assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data")); + consumerSession.commit(); + } else { + // consume/commit ok + message = consumer.receive(3000); + assertNotNull("got message", message); + assertEquals("redeliveries accounted for", maxBrokerRedeliveriesToValidate + 2, message.getLongProperty("JMSXDeliveryCount")); + consumerSession.commit(); + } + + consumerConnection.close(); + } + + public void testNoScheduledRedeliveryOfExpired() throws Exception { + startBroker(true); + ActiveMQConnection consumerConnection = (ActiveMQConnection) createConnection(); + consumerConnection.start(); + Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(destination); + sendMessage(1500); + Message message = consumer.receive(1000); + assertNotNull("got message", message); + + // ensure there is another consumer to redispatch to + MessageConsumer redeliverConsumer = consumerSession.createConsumer(destination); + + // allow consumed to expire so it gets redelivered + TimeUnit.SECONDS.sleep(2); + consumer.close(); + + // should go to dlq as it has expired + // validate DLQ + MessageConsumer dlqConsumer = consumerSession.createConsumer(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME)); + Message dlqMessage = dlqConsumer.receive(2000); + assertNotNull("Got message from dql", dlqMessage); + assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data")); + } + + private void sendMessage(int timeToLive) throws Exception { + ActiveMQConnection producerConnection = (ActiveMQConnection) createConnection(); + producerConnection.start(); + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(destination); + if (timeToLive > 0) { + producer.setTimeToLive(timeToLive); + } + Message message = producerSession.createMessage(); + message.setStringProperty("data", data); + producer.send(message); + producerConnection.close(); + } + + private void startBroker(boolean deleteMessages) throws Exception { + broker = new BrokerService(); + broker.setSchedulerSupport(true); + + + RedeliveryPlugin redeliveryPlugin = new RedeliveryPlugin(); + + RedeliveryPolicy brokerRedeliveryPolicy = new RedeliveryPolicy(); + brokerRedeliveryPolicy.setRedeliveryDelay(redeliveryDelayMillis); + brokerRedeliveryPolicy.setInitialRedeliveryDelay(initialRedeliveryDelayMillis); + brokerRedeliveryPolicy.setMaximumRedeliveries(maxBrokerRedeliveries); + + RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap(); + redeliveryPolicyMap.setDefaultEntry(brokerRedeliveryPolicy); + redeliveryPlugin.setRedeliveryPolicyMap(redeliveryPolicyMap); + + broker.setPlugins(new BrokerPlugin[]{redeliveryPlugin}); + + if (deleteMessages) { + broker.setDeleteAllMessagesOnStartup(true); + } + broker.start(); + } + + + private void stopBroker() throws Exception { + if (broker != null) + broker.stop(); + broker = null; + } + + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory("vm://localhost"); + } + + @Override + protected void tearDown() throws Exception { + stopBroker(); + super.tearDown(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java new file mode 100644 index 0000000..c4e3848 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; + +import org.apache.activemq.util.IOHelper; + +public class BrokerRestartTestSupport extends BrokerTestSupport { + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + File dir = broker.getBrokerDataDirectory(); + if (dir != null) { + IOHelper.deleteChildren(dir); + } + broker.setDeleteAllMessagesOnStartup(true); + configureBroker(broker); + return broker; + } + + /** + * @return + * @throws Exception + */ + protected BrokerService createRestartedBroker() throws Exception { + BrokerService broker = new BrokerService(); + configureBroker(broker); + return broker; + } + + protected void configureBroker(BrokerService broker) throws Exception { + broker.setDestinationPolicy(policyMap); + } + + /** + * Simulates a broker restart. The memory based persistence adapter is + * reused so that it does not "loose" it's "persistent" messages. + * + * @throws IOException + * @throws URISyntaxException + */ + protected void restartBroker() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + broker = createRestartedBroker(); + broker.start(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerServiceTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerServiceTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerServiceTest.java new file mode 100644 index 0000000..9d55d04 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerServiceTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import junit.framework.TestCase; +import org.apache.activemq.network.NetworkConnector; + +/** + * Tests for the BrokerService class + * + * @author chirino + */ +public class BrokerServiceTest extends TestCase { + + public void testAddRemoveTransportsWithJMX() throws Exception { + BrokerService service = new BrokerService(); + service.setUseJmx(true); + service.setPersistent(false); + TransportConnector connector = service.addConnector("tcp://localhost:0"); + service.start(); + + service.removeConnector(connector); + connector.stop(); + service.stop(); + } + + public void testAddRemoveTransportsWithoutJMX() throws Exception { + BrokerService service = new BrokerService(); + service.setPersistent(false); + service.setUseJmx(false); + TransportConnector connector = service.addConnector("tcp://localhost:0"); + service.start(); + + service.removeConnector(connector); + connector.stop(); + service.stop(); + } + + public void testAddRemoveNetworkWithJMX() throws Exception { + BrokerService service = new BrokerService(); + service.setPersistent(false); + service.setUseJmx(true); + NetworkConnector connector = service.addNetworkConnector("multicast://default?group=group-"+System.currentTimeMillis()); + service.start(); + + service.removeNetworkConnector(connector); + connector.stop(); + service.stop(); + } + + public void testAddRemoveNetworkWithoutJMX() throws Exception { + BrokerService service = new BrokerService(); + service.setPersistent(false); + service.setUseJmx(false); + NetworkConnector connector = service.addNetworkConnector("multicast://default?group=group-"+System.currentTimeMillis()); + service.start(); + + service.removeNetworkConnector(connector); + connector.stop(); + service.stop(); + } + + public void testSystemUsage() + { + BrokerService service = new BrokerService(); + assertEquals( 1024 * 1024 * 1024, service.getSystemUsage().getMemoryUsage().getLimit() ); + assertEquals( 1024L * 1024 * 1024 * 50, service.getSystemUsage().getTempUsage().getLimit() ); + assertEquals( 1024L * 1024 * 1024 * 100, service.getSystemUsage().getStoreUsage().getLimit() ); + } +}
