Author: tabish
Date: Mon Mar 19 20:23:51 2012
New Revision: 1302641
URL: http://svn.apache.org/viewvc?rev=1302641&view=rev
Log:
additional test for: https://issues.apache.org/jira/browse/AMQ-3775
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessMultiRestartTest.java
(with props)
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessMultiRestartTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessMultiRestartTest.java?rev=1302641&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessMultiRestartTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessMultiRestartTest.java
Mon Mar 19 20:23:51 2012
@@ -0,0 +1,401 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR ONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.usecases;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.util.Vector;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DurableSubProcessMultiRestartTest {
+ private static final Logger LOG =
LoggerFactory.getLogger(DurableSubProcessMultiRestartTest.class);
+
+ public static final long RUNTIME = 1 * 60 * 1000;
+
+ private BrokerService broker;
+ private ActiveMQTopic topic;
+
+ private final ReentrantReadWriteLock processLock = new
ReentrantReadWriteLock(true);
+
+ private int restartCount = 0;
+ private final int SUBSCRIPTION_ID = 1;
+
+ static final Vector<Throwable> exceptions = new Vector<Throwable>();
+
+ /**
+ * The test creates a durable subscriber and producer with a broker that is
+ * continually restarted.
+ *
+ * Producer creates a message every .5 seconds -creates a new connection
for
+ * each message
+ *
+ * durable subscriber - comes online for 10 seconds, - then goes offline
for
+ * a "moment" - repeats the cycle
+ *
+ * approx every 10 seconds the broker restarts. Subscriber and Producer
+ * connections will be closed BEFORE the restart.
+ *
+ * The Durable subscriber is "unsubscribed" before the the end of the test.
+ *
+ * checks for number of kahaDB files left on filesystem.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testProcess() throws Exception {
+
+ DurableSubscriber durableSubscriber = new
DurableSubscriber(SUBSCRIPTION_ID);
+ MsgProducer msgProducer = new MsgProducer();
+
+ try {
+ // register the subscription & start messages
+ durableSubscriber.start();
+ msgProducer.start();
+
+ long endTime = System.currentTimeMillis() + RUNTIME;
+
+ while (endTime > System.currentTimeMillis()) {
+ Thread.sleep(10000);
+ restartBroker();
+ }
+ } catch (Throwable e) {
+ exit("ProcessTest.testProcess failed.", e);
+ }
+
+ // wait for threads to finish
+ try {
+ msgProducer.join();
+ durableSubscriber.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace(System.out);
+ }
+
+ // restart broker one last time
+ restartBroker();
+
+ assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+
+ final KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter)
broker.getPersistenceAdapter();
+ assertTrue("only one journal file should be left: " +
pa.getStore().getJournal().getFileMap().size(),
+ Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return pa.getStore().getJournal().getFileMap().size() == 1;
+ }
+ }, TimeUnit.MINUTES.toMillis(3))
+ );
+
+ LOG.info("DONE.");
+ }
+
+ private void restartBroker() throws Exception {
+ LOG.info("Broker restart: waiting for components.");
+
+ processLock.writeLock().lock();
+ try {
+ destroyBroker();
+ startBroker(false);
+
+ restartCount++;
+ LOG.info("Broker restarted. count: " + restartCount);
+ } finally {
+ processLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Producers messages
+ *
+ */
+ final class MsgProducer extends Thread {
+
+ String url = "vm://" + DurableSubProcessMultiRestartTest.getName();
+
+ final ConnectionFactory cf = new ActiveMQConnectionFactory(url);
+
+ private long msgCount;
+ int messageRover = 0;
+
+ public MsgProducer() {
+ super("MsgProducer");
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+
+ long endTime = RUNTIME + System.currentTimeMillis();
+
+ try {
+ while (endTime > System.currentTimeMillis()) {
+
+ Thread.sleep(500);
+
+ processLock.readLock().lock();
+ try {
+ send();
+ } finally {
+ processLock.readLock().unlock();
+ }
+ LOG.info("MsgProducer msgCount=" + msgCount);
+ }
+ } catch (Throwable e) {
+ exit("Server.run failed", e);
+ }
+ }
+
+ public void send() throws JMSException {
+
+ LOG.info("Sending ... ");
+
+ Connection con = cf.createConnection();
+
+ Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(null);
+ Message message = sess.createMessage();
+ message.setIntProperty("ID", ++messageRover);
+ message.setBooleanProperty("COMMIT", true);
+ prod.send(topic, message);
+
+ msgCount++;
+ LOG.info("Message Sent.");
+
+ sess.close();
+ con.close();
+ }
+ }
+
+ /**
+ * Consumes massages from a durable subscription. Goes online/offline
+ * periodically.
+ */
+ private final class DurableSubscriber extends Thread {
+
+ String url = "tcp://localhost:61656";
+
+ final ConnectionFactory cf = new ActiveMQConnectionFactory(url);
+
+ public static final String SUBSCRIPTION_NAME = "subscription";
+
+ private final int id;
+ private final String conClientId;
+ private long msgCount;
+
+ public DurableSubscriber(int id) throws JMSException {
+ super("DurableSubscriber" + id);
+ setDaemon(true);
+
+ this.id = id;
+ conClientId = "cli" + id;
+
+ subscribe();
+ }
+
+ @Override
+ public void run() {
+
+ long end = System.currentTimeMillis() + RUNTIME;
+
+ try {
+
+ // while (true) {
+ while (end > System.currentTimeMillis()) {
+
+ processLock.readLock().lock();
+ try {
+ process(5000);
+ } finally {
+ processLock.readLock().unlock();
+ }
+ }
+
+ unsubscribe();
+
+ } catch (Throwable e) {
+ exit(toString() + " failed.", e);
+ }
+
+ LOG.info(toString() + " DONE. MsgCout=" + msgCount);
+ }
+
+ private void process(long duration) throws JMSException {
+ LOG.info(toString() + " ONLINE.");
+
+ Connection con = openConnection();
+ Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer consumer = sess.createDurableSubscriber(topic,
SUBSCRIPTION_NAME);
+
+ long end = System.currentTimeMillis() + duration;
+
+ try {
+ while (end > System.currentTimeMillis()) {
+ Message message = consumer.receive(100);
+ if (message != null) {
+ LOG.info(toString() + "received message...");
+ msgCount++;
+ }
+ }
+ } finally {
+ sess.close();
+ con.close();
+ LOG.info(toString() + " OFFLINE.");
+ }
+ }
+
+ private Connection openConnection() throws JMSException {
+ Connection con = cf.createConnection();
+ con.setClientID(conClientId);
+ con.start();
+ return con;
+ }
+
+ private void subscribe() throws JMSException {
+ Connection con = openConnection();
+ Session session = con.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ session.createDurableSubscriber(topic, SUBSCRIPTION_NAME);
+ LOG.info(toString() + " SUBSCRIBED");
+
+ session.close();
+ con.close();
+ }
+
+ private void unsubscribe() throws JMSException {
+ Connection con = openConnection();
+ Session session = con.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ session.unsubscribe(SUBSCRIPTION_NAME);
+ LOG.info(toString() + " UNSUBSCRIBED");
+
+ session.close();
+ con.close();
+ }
+
+ @Override
+ public String toString() {
+ return "DurableSubscriber[id=" + id + "]";
+ }
+ }
+
+ // -------- helper methods -----------
+
+ public static void exit(String message) {
+ exit(message, null);
+ }
+
+ public static void exit(String message, Throwable e) {
+ Throwable cause = new RuntimeException(message, e);
+ LOG.error(message, cause);
+ exceptions.add(cause);
+ fail(cause.toString());
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ topic = new ActiveMQTopic("TopicT");
+ startBroker();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ destroyBroker();
+ }
+
+ private void startBroker() throws Exception {
+ startBroker(true);
+ }
+
+ private void startBroker(boolean deleteAllMessages) throws Exception {
+ if (broker != null)
+ return;
+
+ broker = BrokerFactory.createBroker("broker:(vm://" + getName() + ")");
+ broker.setBrokerName(getName());
+ broker.setAdvisorySupport(false);
+ broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+
+ broker.setKeepDurableSubsActive(true);
+
+ File kahadbData = new File("activemq-data/" + getName() + "-kahadb");
+ if (deleteAllMessages)
+ delete(kahadbData);
+
+ broker.setPersistent(true);
+ KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
+ kahadb.setDirectory(kahadbData);
+ kahadb.setJournalMaxFileLength(20 * 1024);
+ broker.setPersistenceAdapter(kahadb);
+
+ broker.addConnector("tcp://localhost:61656");
+
+ broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024);
+ broker.getSystemUsage().getTempUsage().setLimit(256 * 1024 * 1024);
+ broker.getSystemUsage().getStoreUsage().setLimit(256 * 1024 * 1024);
+
+ broker.start();
+ }
+
+ protected static String getName() {
+ return "DurableSubProcessMultiRestartTest";
+ }
+
+ private static boolean delete(File path) {
+ if (path == null)
+ return true;
+
+ if (path.isDirectory()) {
+ for (File file : path.listFiles()) {
+ delete(file);
+ }
+ }
+ return path.delete();
+ }
+
+ private void destroyBroker() throws Exception {
+ if (broker == null)
+ return;
+
+ broker.stop();
+ broker = null;
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessMultiRestartTest.java
------------------------------------------------------------------------------
svn:eol-style = native