Updated Branches: refs/heads/trunk a64976a37 -> 57f5d49ae
http://git-wip-us.apache.org/repos/asf/activemq/blob/57f5d49a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java index 1473025..15c0627 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java @@ -16,50 +16,32 @@ */ package org.apache.activemq.usecases; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.disk.page.PageFile; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; -import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; -import javax.management.ObjectName; - -import junit.framework.Test; +import java.util.HashSet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean; -import org.apache.activemq.broker.jmx.TopicViewMBean; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.store.kahadb.disk.journal.Journal; -import org.apache.activemq.store.kahadb.disk.page.PageFile; -import org.apache.activemq.util.Wait; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static org.junit.Assert.*; -public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupport { +public class DurableSubscriptionOfflineTest extends DurableSubscriptionOfflineTestBase { private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineTest.class); - public boolean usePrioritySupport = Boolean.TRUE; - public int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; - public boolean keepDurableSubsActive = true; - private BrokerService broker; - private ActiveMQTopic topic; - private final List<Throwable> exceptions = new ArrayList<Throwable>(); @Override protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { @@ -68,84 +50,8 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp return connectionFactory; } - @Override - protected Connection createConnection() throws Exception { - return createConnection("cliName"); - } - - protected Connection createConnection(String name) throws Exception { - Connection con = super.createConnection(); - con.setClientID(name); - con.start(); - return con; - } - - public static Test suite() { - return suite(DurableSubscriptionOfflineTest.class); - } - - @Override - protected void setUp() throws Exception { - setAutoFail(true); - setMaxTestTime(2 * 60 * 1000); - exceptions.clear(); - topic = (ActiveMQTopic) createDestination(); - createBroker(); - super.setUp(); - } - - @Override - protected void tearDown() throws Exception { - super.tearDown(); - destroyBroker(); - } - - private void createBroker() throws Exception { - createBroker(true); - } - - private void createBroker(boolean deleteAllMessages) throws Exception { - broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) +")"); - broker.setBrokerName(getName(true)); - broker.setDeleteAllMessagesOnStartup(deleteAllMessages); - broker.getManagementContext().setCreateConnector(false); - broker.setAdvisorySupport(false); - broker.setKeepDurableSubsActive(keepDurableSubsActive); - broker.addConnector("tcp://0.0.0.0:0"); - - if (usePrioritySupport) { - PolicyEntry policy = new PolicyEntry(); - policy.setPrioritizedMessages(true); - PolicyMap policyMap = new PolicyMap(); - policyMap.setDefaultEntry(policy); - broker.setDestinationPolicy(policyMap); - } - - setDefaultPersistenceAdapter(broker); - if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) { - // ensure it kicks in during tests - ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setCleanupPeriod(2*1000); - } else if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) { - // have lots of journal files - ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setJournalMaxFileLength(journalMaxFileLength); - } - broker.start(); - broker.waitUntilStarted(); - } - - private void destroyBroker() throws Exception { - if (broker != null) - broker.stop(); - } - - public void initCombosForTestConsumeOnlyMatchedMessages() throws Exception { - this.addCombinationValues("defaultPersistenceAdapter", - new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC}); - this.addCombinationValues("usePrioritySupport", - new Object[]{ Boolean.TRUE, Boolean.FALSE}); - } - - public void testConsumeOnlyMatchedMessages() throws Exception { + @Test(timeout = 60 * 1000) + public void testConsumeAllMatchedMessages() throws Exception { // create durable subscription Connection con = createConnection(); Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -160,15 +66,14 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp int sent = 0; for (int i = 0; i < 10; i++) { - boolean filter = i % 2 == 1; - if (filter) - sent++; - + sent++; Message message = session.createMessage(); - message.setStringProperty("filter", filter ? "true" : "false"); + message.setStringProperty("filter", "true"); producer.send(topic, message); } + Thread.sleep(1 * 1000); + session.close(); con.close(); @@ -176,7 +81,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp con = createConnection(); session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - Listener listener = new Listener(); + DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener(); consumer.setMessageListener(listener); Thread.sleep(3 * 1000); @@ -187,110 +92,8 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp assertEquals(sent, listener.count); } - public void testConsumeAllMatchedMessages() throws Exception { - // create durable subscription - Connection con = createConnection(); - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - session.close(); - con.close(); - - // send messages - con = createConnection(); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(null); - - int sent = 0; - for (int i = 0; i < 10; i++) { - sent++; - Message message = session.createMessage(); - message.setStringProperty("filter", "true"); - producer.send(topic, message); - } - - Thread.sleep(1 * 1000); - - session.close(); - con.close(); - - // consume messages - con = createConnection(); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - Listener listener = new Listener(); - consumer.setMessageListener(listener); - - Thread.sleep(3 * 1000); - - session.close(); - con.close(); - - assertEquals(sent, listener.count); - } - - public void initCombosForTestVerifyAllConsumedAreAcked() throws Exception { - this.addCombinationValues("defaultPersistenceAdapter", - new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC}); - this.addCombinationValues("usePrioritySupport", - new Object[]{ Boolean.TRUE, Boolean.FALSE}); - } - - public void testVerifyAllConsumedAreAcked() throws Exception { - // create durable subscription - Connection con = createConnection(); - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - session.close(); - con.close(); - - // send messages - con = createConnection(); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(null); - - int sent = 0; - for (int i = 0; i < 10; i++) { - sent++; - Message message = session.createMessage(); - message.setStringProperty("filter", "true"); - producer.send(topic, message); - } - - Thread.sleep(1 * 1000); - - session.close(); - con.close(); - - // consume messages - con = createConnection(); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - Listener listener = new Listener(); - consumer.setMessageListener(listener); - - Thread.sleep(3 * 1000); - - session.close(); - con.close(); - - LOG.info("Consumed: " + listener.count); - assertEquals(sent, listener.count); - - // consume messages again, should not get any - con = createConnection(); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - listener = new Listener(); - consumer.setMessageListener(listener); - - Thread.sleep(3 * 1000); - - session.close(); - con.close(); - - assertEquals(0, listener.count); - } + @Test(timeout = 60 * 1000) public void testTwoOfflineSubscriptionCanConsume() throws Exception { // create durable subscription 1 Connection con = createConnection("cliId1"); @@ -303,7 +106,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp Connection con2 = createConnection("cliId2"); Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - Listener listener2 = new Listener(); + DurableSubscriptionOfflineTestListener listener2 = new DurableSubscriptionOfflineTestListener(); consumer2.setMessageListener(listener2); // send messages @@ -334,273 +137,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp con = createConnection("cliId1"); session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - Listener listener = new Listener(); - consumer.setMessageListener(listener); - - Thread.sleep(3 * 1000); - - session.close(); - con.close(); - - assertEquals("offline consumer got all", sent, listener.count); - } - - public void initCombosForTestJMXCountersWithOfflineSubs() throws Exception { - this.addCombinationValues("keepDurableSubsActive", - new Object[]{Boolean.TRUE, Boolean.FALSE}); - } - - public void testJMXCountersWithOfflineSubs() throws Exception { - // create durable subscription 1 - Connection con = createConnection("cliId1"); - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.createDurableSubscriber(topic, "SubsId", null, true); - session.close(); - con.close(); - - // restart broker - broker.stop(); - createBroker(false /*deleteAllMessages*/); - - // send messages - con = createConnection(); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(null); - - int sent = 0; - for (int i = 0; i < 10; i++) { - sent++; - Message message = session.createMessage(); - producer.send(topic, message); - } - session.close(); - con.close(); - - // consume some messages - con = createConnection("cliId1"); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true); - - for (int i=0; i<sent/2; i++) { - Message m = consumer.receive(4000); - assertNotNull("got message: " + i, m); - LOG.info("Got :" + i + ", " + m); - } - - // check some counters while active - ObjectName activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0]; - LOG.info("active durable sub name: " + activeDurableSubName); - final DurableSubscriptionViewMBean durableSubscriptionView = (DurableSubscriptionViewMBean) - broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true); - - assertTrue("is active", durableSubscriptionView.isActive()); - assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView.getEnqueueCounter()); - assertTrue("correct waiting acks", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return 5 == durableSubscriptionView.getMessageCountAwaitingAcknowledge(); - } - })); - assertEquals("correct dequeue", 5, durableSubscriptionView.getDequeueCounter()); - - - ObjectName destinationName = broker.getAdminView().getTopics()[0]; - TopicViewMBean topicView = (TopicViewMBean) broker.getManagementContext().newProxyInstance(destinationName, TopicViewMBean.class, true); - assertEquals("correct enqueue", 10, topicView.getEnqueueCount()); - assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount()); - assertEquals("inflight", 5, topicView.getInFlightCount()); - - session.close(); - con.close(); - - // check some counters when inactive - ObjectName inActiveDurableSubName = broker.getAdminView().getInactiveDurableTopicSubscribers()[0]; - LOG.info("inactive durable sub name: " + inActiveDurableSubName); - DurableSubscriptionViewMBean durableSubscriptionView1 = (DurableSubscriptionViewMBean) - broker.getManagementContext().newProxyInstance(inActiveDurableSubName, DurableSubscriptionViewMBean.class, true); - - assertTrue("is not active", !durableSubscriptionView1.isActive()); - assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView1.getEnqueueCounter()); - assertEquals("correct awaiting ack", 0, durableSubscriptionView1.getMessageCountAwaitingAcknowledge()); - assertEquals("correct dequeue", keepDurableSubsActive ? 5 : 0, durableSubscriptionView1.getDequeueCounter()); - - // destination view - assertEquals("correct enqueue", 10, topicView.getEnqueueCount()); - assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount()); - assertEquals("inflight back to 0 after deactivate", 0, topicView.getInFlightCount()); - - // consume the rest - con = createConnection("cliId1"); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - consumer = session.createDurableSubscriber(topic, "SubsId", null, true); - - for (int i=0; i<sent/2;i++) { - Message m = consumer.receive(30000); - assertNotNull("got message: " + i, m); - LOG.info("Got :" + i + ", " + m); - } - - activeDurableSubName = broker.getAdminView().getDurableTopicSubscribers()[0]; - LOG.info("durable sub name: " + activeDurableSubName); - final DurableSubscriptionViewMBean durableSubscriptionView2 = (DurableSubscriptionViewMBean) - broker.getManagementContext().newProxyInstance(activeDurableSubName, DurableSubscriptionViewMBean.class, true); - - assertTrue("is active", durableSubscriptionView2.isActive()); - assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, durableSubscriptionView2.getEnqueueCounter()); - assertTrue("correct dequeue", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - long val = durableSubscriptionView2.getDequeueCounter(); - LOG.info("dequeue count:" + val); - return 10 == val; - } - })); - } - - public void initCombosForTestOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception { - this.addCombinationValues("defaultPersistenceAdapter", - new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC}); - this.addCombinationValues("usePrioritySupport", - new Object[]{ Boolean.TRUE, Boolean.FALSE}); - } - - public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception { - Connection con = createConnection("offCli1"); - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - session.close(); - con.close(); - - con = createConnection("offCli2"); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - session.close(); - con.close(); - - Connection con2 = createConnection("onlineCli1"); - Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - Listener listener2 = new Listener(); - consumer2.setMessageListener(listener2); - - // send messages - con = createConnection(); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(null); - - int sent = 0; - for (int i = 0; i < 10; i++) { - sent++; - Message message = session.createMessage(); - message.setStringProperty("filter", "true"); - producer.send(topic, message); - } - - Thread.sleep(1 * 1000); - session.close(); - con.close(); - - // test online subs - Thread.sleep(3 * 1000); - session2.close(); - con2.close(); - assertEquals(sent, listener2.count); - - // restart broker - broker.stop(); - createBroker(false /*deleteAllMessages*/); - - // test offline - con = createConnection("offCli1"); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - - Connection con3 = createConnection("offCli2"); - Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - - Listener listener = new Listener(); - consumer.setMessageListener(listener); - Listener listener3 = new Listener(); - consumer3.setMessageListener(listener3); - - Thread.sleep(3 * 1000); - - session.close(); - con.close(); - session3.close(); - con3.close(); - - assertEquals(sent, listener.count); - assertEquals(sent, listener3.count); - } - - public void initCombosForTestInterleavedOfflineSubscriptionCanConsume() throws Exception { - this.addCombinationValues("defaultPersistenceAdapter", - new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC}); - } - - public void testInterleavedOfflineSubscriptionCanConsume() throws Exception { - // create durable subscription 1 - Connection con = createConnection("cliId1"); - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - session.close(); - con.close(); - - // send messages - con = createConnection(); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(null); - - int sent = 0; - for (int i = 0; i < 10; i++) { - sent++; - Message message = session.createMessage(); - message.setStringProperty("filter", "true"); - producer.send(topic, message); - } - - Thread.sleep(1 * 1000); - - // create durable subscription 2 - Connection con2 = createConnection("cliId2"); - Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - Listener listener2 = new Listener(); - consumer2.setMessageListener(listener2); - - assertEquals(0, listener2.count); - session2.close(); - con2.close(); - - // send some more - for (int i = 0; i < 10; i++) { - sent++; - Message message = session.createMessage(); - message.setStringProperty("filter", "true"); - producer.send(topic, message); - } - - Thread.sleep(1 * 1000); - session.close(); - con.close(); - - con2 = createConnection("cliId2"); - session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE); - consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - listener2 = new Listener("cliId2"); - consumer2.setMessageListener(listener2); - // test online subs - Thread.sleep(3 * 1000); - - assertEquals(10, listener2.count); - - // consume all messages - con = createConnection("cliId1"); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - Listener listener = new Listener("cliId1"); + DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener(); consumer.setMessageListener(listener); Thread.sleep(3 * 1000); @@ -611,117 +148,9 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp assertEquals("offline consumer got all", sent, listener.count); } - public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception { - this.addCombinationValues("defaultPersistenceAdapter", - new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC}); - } - - private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))"; - public void testMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception { - // create offline subs 1 - Connection con = createConnection("offCli1"); - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.createDurableSubscriber(topic, "SubsId", filter, true); - session.close(); - con.close(); - - // create offline subs 2 - con = createConnection("offCli2"); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.createDurableSubscriber(topic, "SubsId", filter, true); - session.close(); - con.close(); - - // create online subs - Connection con2 = createConnection("onlineCli1"); - Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", filter, true); - Listener listener2 = new Listener(); - consumer2.setMessageListener(listener2); - - // create non-durable consumer - Connection con4 = createConnection("nondurableCli"); - Session session4 = con4.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer4 = session4.createConsumer(topic, filter, true); - Listener listener4 = new Listener(); - consumer4.setMessageListener(listener4); - - // send messages - con = createConnection(); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(null); - - boolean hasRelevant = false; - int filtered = 0; - for (int i = 0; i < 100; i++) { - int postf = (int) (Math.random() * 9) + 1; - String d = "D" + postf; - - if ("D1".equals(d) || "D2".equals(d)) { - hasRelevant = true; - filtered++; - } - - Message message = session.createMessage(); - message.setStringProperty("$a", "A1"); - message.setStringProperty("$d", d); - producer.send(topic, message); - } - - Message message = session.createMessage(); - message.setStringProperty("$a", "A1"); - message.setBooleanProperty("$b", true); - message.setBooleanProperty("$c", hasRelevant); - producer.send(topic, message); - - if (hasRelevant) - filtered++; - - Thread.sleep(1 * 1000); - session.close(); - con.close(); - - Thread.sleep(3 * 1000); - - // test non-durable consumer - session4.close(); - con4.close(); - assertEquals(filtered, listener4.count); // succeeded! - - // test online subs - session2.close(); - con2.close(); - assertEquals(filtered, listener2.count); // succeeded! - - // test offline 1 - con = createConnection("offCli1"); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true); - Listener listener = new FilterCheckListener(); - consumer.setMessageListener(listener); - - Thread.sleep(3 * 1000); - session.close(); - con.close(); - - assertEquals(filtered, listener.count); - - // test offline 2 - Connection con3 = createConnection("offCli2"); - Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", filter, true); - Listener listener3 = new FilterCheckListener(); - consumer3.setMessageListener(listener3); - - Thread.sleep(3 * 1000); - session3.close(); - con3.close(); - - assertEquals(filtered, listener3.count); - assertTrue("no unexpected exceptions: " + exceptions, exceptions.isEmpty()); - } - + @Test(timeout = 60 * 1000) public void testRemovedDurableSubDeletes() throws Exception { + String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))"; // create durable subscription 1 Connection con = createConnection("cliId1"); Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -753,13 +182,14 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp con = createConnection("offCli2"); session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true); - Listener listener = new Listener(); + DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener(); consumer.setMessageListener(listener); session.close(); con.close(); assertEquals(0, listener.count); } + @Test(timeout = 60 * 1000) public void testRemovedDurableSubDeletesFromIndex() throws Exception { if (! (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter)) { @@ -810,169 +240,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp } } - public void initCombosForTestOfflineSubscriptionWithSelectorAfterRestart() throws Exception { - this.addCombinationValues("defaultPersistenceAdapter", - new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC}); - } - - public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception { - - if (PersistenceAdapterChoice.LevelDB == defaultPersistenceAdapter) { - // https://issues.apache.org/jira/browse/AMQ-4296 - return; - } - - // create offline subs 1 - Connection con = createConnection("offCli1"); - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - session.close(); - con.close(); - - // create offline subs 2 - con = createConnection("offCli2"); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - session.close(); - con.close(); - - // send messages - con = createConnection(); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(null); - - int filtered = 0; - for (int i = 0; i < 10; i++) { - boolean filter = (int) (Math.random() * 2) >= 1; - if (filter) - filtered++; - - Message message = session.createMessage(); - message.setStringProperty("filter", filter ? "true" : "false"); - producer.send(topic, message); - } - - LOG.info("sent: " + filtered); - Thread.sleep(1 * 1000); - session.close(); - con.close(); - - // restart broker - Thread.sleep(3 * 1000); - broker.stop(); - createBroker(false /*deleteAllMessages*/); - - // send more messages - con = createConnection(); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - producer = session.createProducer(null); - - for (int i = 0; i < 10; i++) { - boolean filter = (int) (Math.random() * 2) >= 1; - if (filter) - filtered++; - - Message message = session.createMessage(); - message.setStringProperty("filter", filter ? "true" : "false"); - producer.send(topic, message); - } - - LOG.info("after restart, total sent with filter='true': " + filtered); - Thread.sleep(1 * 1000); - session.close(); - con.close(); - - // test offline subs - con = createConnection("offCli1"); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - Listener listener = new Listener("1>"); - consumer.setMessageListener(listener); - - Connection con3 = createConnection("offCli2"); - Session session3 = con3.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - Listener listener3 = new Listener(); - consumer3.setMessageListener(listener3); - - Thread.sleep(3 * 1000); - - session.close(); - con.close(); - session3.close(); - con3.close(); - - assertEquals(filtered, listener.count); - assertEquals(filtered, listener3.count); - } - - public void initCombosForTestOfflineSubscriptionAfterRestart() throws Exception { - this.addCombinationValues("defaultPersistenceAdapter", - new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC}); - } - - public void testOfflineSubscriptionAfterRestart() throws Exception { - // create offline subs 1 - Connection con = createConnection("offCli1"); - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, false); - Listener listener = new Listener(); - consumer.setMessageListener(listener); - - // send messages - MessageProducer producer = session.createProducer(null); - - int sent = 0; - for (int i = 0; i < 10; i++) { - sent++; - Message message = session.createMessage(); - message.setStringProperty("filter", "false"); - producer.send(topic, message); - } - - LOG.info("sent: " + sent); - Thread.sleep(5 * 1000); - session.close(); - con.close(); - - assertEquals(sent, listener.count); - - // restart broker - Thread.sleep(3 * 1000); - broker.stop(); - createBroker(false /*deleteAllMessages*/); - - // send more messages - con = createConnection(); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - producer = session.createProducer(null); - - for (int i = 0; i < 10; i++) { - sent++; - Message message = session.createMessage(); - message.setStringProperty("filter", "false"); - producer.send(topic, message); - } - - LOG.info("after restart, sent: " + sent); - Thread.sleep(1 * 1000); - session.close(); - con.close(); - - // test offline subs - con = createConnection("offCli1"); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - consumer = session.createDurableSubscriber(topic, "SubsId", null, true); - consumer.setMessageListener(listener); - - Thread.sleep(3 * 1000); - - session.close(); - con.close(); - - assertEquals(sent, listener.count); - } - + @Test(timeout = 60 * 1000) public void testInterleavedOfflineSubscriptionCanConsumeAfterUnsub() throws Exception { // create offline subs 1 Connection con = createConnection("offCli1"); @@ -1016,7 +284,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp con = createConnection("offCli2"); session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true); - Listener listener = new Listener("SubsId"); + DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener("SubsId"); consumer.setMessageListener(listener); Thread.sleep(3 * 1000); @@ -1027,6 +295,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp assertEquals("offline consumer got all", sent, listener.count); } + @Test(timeout = 60 * 1000) public void testNoDuplicateOnConcurrentSendTranCommitAndActivate() throws Exception { final int messageCount = 1000; Connection con = null; @@ -1117,10 +386,9 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp } - /* - * Ignoring for now, see https://issues.apache.org/jira/browse/AMQ-4874 - */ - public void XXXtestOrderOnActivateDeactivate() throws Exception { + @Ignore("see https://issues.apache.org/jira/browse/AMQ-4874") + @Test(timeout = 60 * 1000) + public void testOrderOnActivateDeactivate() throws Exception { for (int i=0;i<10;i++) { LOG.info("Iteration: " + i); doTestOrderOnActivateDeactivate(); @@ -1145,13 +413,13 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp } final String url = "failover:(tcp://localhost:" - + (broker.getTransportConnectors().get(1).getConnectUri()).getPort() - + "?wireFormat.maxInactivityDuration=0)?" - + "jms.watchTopicAdvisories=false&" - + "jms.alwaysSyncSend=true&jms.dispatchAsync=true&" - + "jms.sendAcksAsync=true&" - + "initialReconnectDelay=100&maxReconnectDelay=30000&" - + "useExponentialBackOff=true"; + + (broker.getTransportConnectors().get(1).getConnectUri()).getPort() + + "?wireFormat.maxInactivityDuration=0)?" + + "jms.watchTopicAdvisories=false&" + + "jms.alwaysSyncSend=true&jms.dispatchAsync=true&" + + "jms.sendAcksAsync=true&" + + "initialReconnectDelay=100&maxReconnectDelay=30000&" + + "useExponentialBackOff=true"; final ActiveMQConnectionFactory clientFactory = new ActiveMQConnectionFactory(url); class CheckOrderClient implements Runnable { @@ -1235,6 +503,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); } + @Test(timeout = 60 * 1000) public void testUnmatchedSubUnsubscribeDeletesAll() throws Exception { // create offline subs 1 Connection con = createConnection("offCli1"); @@ -1274,7 +543,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp con = createConnection("offCli1"); session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); - Listener listener = new Listener(); + DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener(); consumer.setMessageListener(listener); Thread.sleep(3 * 1000); @@ -1285,6 +554,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp assertEquals(0, listener.count); } + @Test(timeout = 60 * 1000) public void testAllConsumed() throws Exception { final String filter = "filter = 'true'"; Connection con = createConnection("cli1"); @@ -1320,7 +590,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp con = createConnection("cli1"); session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true); - Listener listener = new Listener(); + DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener(); consumer.setMessageListener(listener); Thread.sleep(3 * 1000); session.close(); @@ -1358,7 +628,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp con = createConnection("cli1"); session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); consumer = session.createDurableSubscriber(topic, "SubsId", filter, true); - listener = new Listener(); + listener = new DurableSubscriptionOfflineTestListener(); consumer.setMessageListener(listener); Thread.sleep(3 * 1000); session.close(); @@ -1368,6 +638,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp } // https://issues.apache.org/jira/browse/AMQ-3190 + @Test(timeout = 60 * 1000) public void testNoMissOnMatchingSubAfterRestart() throws Exception { final String filter = "filter = 'true'"; @@ -1447,80 +718,6 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp con.close(); } - // use very small journal to get lots of files to cleanup - public void initCombosForTestCleanupDeletedSubAfterRestart() throws Exception { - this.addCombinationValues("journalMaxFileLength", - new Object[]{new Integer(64 * 1024)}); - this.addCombinationValues("keepDurableSubsActive", - new Object[]{Boolean.TRUE, Boolean.FALSE}); - } - - // https://issues.apache.org/jira/browse/AMQ-3206 - public void testCleanupDeletedSubAfterRestart() throws Exception { - Connection con = createConnection("cli1"); - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.createDurableSubscriber(topic, "SubsId", null, true); - session.close(); - con.close(); - - con = createConnection("cli2"); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.createDurableSubscriber(topic, "SubsId", null, true); - session.close(); - con.close(); - - con = createConnection(); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(null); - - final int toSend = 500; - final String payload = new byte[40*1024].toString(); - int sent = 0; - for (int i = sent; i < toSend; i++) { - Message message = session.createTextMessage(payload); - message.setStringProperty("filter", "false"); - message.setIntProperty("ID", i); - producer.send(topic, message); - sent++; - } - con.close(); - LOG.info("sent: " + sent); - - // kill off cli1 - con = createConnection("cli1"); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.unsubscribe("SubsId"); - - destroyBroker(); - createBroker(false); - - con = createConnection("cli2"); - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true); - final Listener listener = new Listener(); - consumer.setMessageListener(listener); - assertTrue("got all sent", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - LOG.info("Want: " + toSend + ", current: " + listener.count); - return listener.count == toSend; - } - })); - session.close(); - con.close(); - - destroyBroker(); - createBroker(false); - final KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); - assertTrue("Should have less than three journal files left but was: " + - pa.getStore().getJournal().getFileMap().size(), Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return pa.getStore().getJournal().getFileMap().size() <= 3; - } - })); - } // // https://issues.apache.org/jira/browse/AMQ-3768 // public void testPageReuse() throws Exception { @@ -1679,46 +876,4 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp // assertTrue("No exceptions expected, but was: " + exceptions, exceptions.isEmpty()); // } - public static class Listener implements MessageListener { - int count = 0; - String id = null; - - Listener() { - } - Listener(String id) { - this.id = id; - } - @Override - public void onMessage(Message message) { - count++; - if (id != null) { - try { - LOG.info(id + ", " + message.getJMSMessageID()); - } catch (Exception ignored) {} - } - } - } - - public class FilterCheckListener extends Listener { - - @Override - public void onMessage(Message message) { - count++; - - try { - Object b = message.getObjectProperty("$b"); - if (b != null) { - boolean c = message.getBooleanProperty("$c"); - assertTrue("", c); - } else { - String d = message.getStringProperty("$d"); - assertTrue("", "D1".equals(d) || "D2".equals(d)); - } - } - catch (JMSException e) { - e.printStackTrace(); - exceptions.add(e); - } - } - } } http://git-wip-us.apache.org/repos/asf/activemq/blob/57f5d49a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java new file mode 100644 index 0000000..3bffb4e --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTestBase.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.TestSupport.PersistenceAdapterChoice; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.disk.journal.Journal; +import org.apache.activemq.store.leveldb.LevelDBPersistenceAdapter; +import org.apache.activemq.store.memory.MemoryPersistenceAdapter; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.MessageListener; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertTrue; + +public abstract class DurableSubscriptionOfflineTestBase { + private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineTestBase.class); + public boolean usePrioritySupport = Boolean.TRUE; + public int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; + public boolean keepDurableSubsActive = true; + protected BrokerService broker; + protected ActiveMQTopic topic; + protected final List<Throwable> exceptions = new ArrayList<Throwable>(); + protected ActiveMQConnectionFactory connectionFactory; + protected boolean isTopic = true; + public PersistenceAdapterChoice defaultPersistenceAdapter = PersistenceAdapterChoice.KahaDB; + + @Rule + public TestName testName = new TestName(); + + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true)); + connectionFactory.setWatchTopicAdvisories(false); + return connectionFactory; + } + + protected Connection createConnection() throws Exception { + return createConnection("cliName"); + } + + protected Connection createConnection(String name) throws Exception { + ConnectionFactory connectionFactory1 = createConnectionFactory(); + Connection connection = connectionFactory1.createConnection(); + connection.setClientID(name); + connection.start(); + return connection; + } + + public ActiveMQConnectionFactory getConnectionFactory() throws Exception { + if (connectionFactory == null) { + connectionFactory = createConnectionFactory(); + assertTrue("Should have created a connection factory!", connectionFactory != null); + } + return connectionFactory; + } + + @Before + public void setUp() throws Exception { + exceptions.clear(); + topic = (ActiveMQTopic) createDestination(); + createBroker(); + } + + @After + public void tearDown() throws Exception { + destroyBroker(); + } + + protected void createBroker() throws Exception { + createBroker(true); + } + + protected void createBroker(boolean deleteAllMessages) throws Exception { + String currentTestName = getName(true); + broker = BrokerFactory.createBroker("broker:(vm://" + currentTestName +")"); + broker.setBrokerName(currentTestName); + broker.setDeleteAllMessagesOnStartup(deleteAllMessages); + broker.getManagementContext().setCreateConnector(false); + broker.setAdvisorySupport(false); + broker.setKeepDurableSubsActive(keepDurableSubsActive); + broker.addConnector("tcp://0.0.0.0:0"); + + if (usePrioritySupport) { + PolicyEntry policy = new PolicyEntry(); + policy.setPrioritizedMessages(true); + PolicyMap policyMap = new PolicyMap(); + policyMap.setDefaultEntry(policy); + broker.setDestinationPolicy(policyMap); + } + + setDefaultPersistenceAdapter(broker); + if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) { + // ensure it kicks in during tests + ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setCleanupPeriod(2*1000); + } else if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) { + // have lots of journal files + ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setJournalMaxFileLength(journalMaxFileLength); + } + broker.start(); + broker.waitUntilStarted(); + } + + protected void destroyBroker() throws Exception { + if (broker != null) + broker.stop(); + } + + protected Destination createDestination(String subject) { + if (isTopic) { + return new ActiveMQTopic(subject); + } else { + return new ActiveMQQueue(subject); + } + } + + protected Destination createDestination() { + return createDestination(getDestinationString()); + } + + /** + * Returns the name of the destination used in this test case + */ + protected String getDestinationString() { + return getClass().getName() + "." + getName(true); + } + + + public String getName() { + return getName(false); + } + + protected String getName(boolean original) { + String currentTestName = testName.getMethodName(); + currentTestName = currentTestName.replace("[",""); + currentTestName = currentTestName.replace("]",""); + return currentTestName; + } + + public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException { + return setPersistenceAdapter(broker, defaultPersistenceAdapter); + } + + public PersistenceAdapter setPersistenceAdapter(BrokerService broker, PersistenceAdapterChoice choice) throws IOException { + PersistenceAdapter adapter = null; + switch (choice) { + case JDBC: + LOG.debug(">>>> setPersistenceAdapter to JDBC "); + adapter = new JDBCPersistenceAdapter(); + break; + case KahaDB: + LOG.debug(">>>> setPersistenceAdapter to KahaDB "); + adapter = new KahaDBPersistenceAdapter(); + break; + case LevelDB: + LOG.debug(">>>> setPersistenceAdapter to LevelDB "); + adapter = new LevelDBPersistenceAdapter(); + break; + case MEM: + LOG.debug(">>>> setPersistenceAdapter to MEM "); + adapter = new MemoryPersistenceAdapter(); + break; + } + broker.setPersistenceAdapter(adapter); + return adapter; + } +} + +class DurableSubscriptionOfflineTestListener implements MessageListener { + private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionOfflineTestListener.class); + int count = 0; + String id = null; + + DurableSubscriptionOfflineTestListener() {} + + DurableSubscriptionOfflineTestListener(String id) { + this.id = id; + } + @Override + public void onMessage(javax.jms.Message message) { + count++; + if (id != null) { + try { + LOG.info(id + ", " + message.getJMSMessageID()); + } catch (Exception ignored) {} + } + } +} \ No newline at end of file
