Author: ceposta
Date: Tue Mar 26 14:30:36 2013
New Revision: 1461153
URL: http://svn.apache.org/r1461153
Log:
Updated to take into account scenario from mailing list where durable sub would
connect/reconnect every second message
Modified:
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java
Modified:
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java?rev=1461153&r1=1461152&r2=1461153&view=diff
==============================================================================
---
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java
(original)
+++
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java
Tue Mar 26 14:30:36 2013
@@ -28,10 +28,15 @@ import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.test.TestSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
/**
*
*/
@@ -39,6 +44,8 @@ public class DurableConsumerCloseAndReco
protected static final long RECEIVE_TIMEOUT = 5000L;
private static final Logger LOG =
LoggerFactory.getLogger(DurableConsumerCloseAndReconnectTest.class);
+ BrokerService brokerService;
+
protected Connection connection;
private Session session;
private MessageConsumer consumer;
@@ -46,29 +53,64 @@ public class DurableConsumerCloseAndReco
private Destination destination;
private int messageCount;
+ private String vmConnectorURI;
+
@Override
protected void setUp() throws Exception {
+ createBroker();
super.setUp();
- deleteAllMessages();
}
@Override
protected void tearDown() throws Exception {
+ stopBroker();
super.tearDown();
- deleteAllMessages();
}
- private void deleteAllMessages() throws Exception {
- ActiveMQConnectionFactory fac = new
ActiveMQConnectionFactory("vm://localhost?broker.deleteAllMessagesOnStartup=true");
- Connection dummyConnection = fac.createConnection();
+ protected ActiveMQConnectionFactory createConnectionFactory() throws
Exception {
+ return new ActiveMQConnectionFactory(vmConnectorURI);
+ }
+
+ protected void createBroker() throws Exception {
+ brokerService = new BrokerService();
+ brokerService.setUseJmx(false);
+ brokerService.setPersistent(false);
+ KahaDBPersistenceAdapter store = new KahaDBPersistenceAdapter();
+ brokerService.setPersistenceAdapter(store);
+ brokerService.start();
+ brokerService.waitUntilStarted();
+ vmConnectorURI = brokerService.getVmConnectorURI().toString();
+ }
+
+ protected void stopBroker() throws Exception {
+ brokerService.stop();
+ brokerService.waitUntilStopped();
+ }
+
+ public void testDurableSubscriberReconnectMultipleTimes() throws Exception
{
+ Connection dummyConnection = createConnection();
dummyConnection.start();
+
+ makeConsumer(Session.AUTO_ACKNOWLEDGE);
+ closeConsumer();
+
+ publish(30);
+
+ int counter = 1;
+ for (int i = 0; i < 15; i++) {
+ makeConsumer(Session.AUTO_ACKNOWLEDGE);
+ Message message = consumer.receive(RECEIVE_TIMEOUT);
+ assertTrue("Should have received a message!", message != null);
+ LOG.info("Received message " + counter++);
+ message = consumer.receive(RECEIVE_TIMEOUT);
+ assertTrue("Should have received a message!", message != null);
+ LOG.info("Received message " + counter++);
+ closeConsumer();
+ }
+
dummyConnection.close();
}
-
- protected ActiveMQConnectionFactory createConnectionFactory() throws
Exception {
- return new
ActiveMQConnectionFactory("vm://localhost?broker.deleteAllMessagesOnStartup=false");
- }
public void testCreateDurableConsumerCloseThenReconnect() throws Exception
{
// force the server to stay up across both connection tests
@@ -84,10 +126,11 @@ public class DurableConsumerCloseAndReco
}
protected void consumeMessagesDeliveredWhileConsumerClosed() throws
Exception {
+ // default to client ack for consumer
makeConsumer();
closeConsumer();
- publish();
+ publish(1);
// wait a few moments for the close to really occur
Thread.sleep(1000);
@@ -117,7 +160,7 @@ public class DurableConsumerCloseAndReco
closeConsumer();
LOG.info("Lets publish one more message now");
- publish();
+ publish(1);
makeConsumer();
message = consumer.receive(RECEIVE_TIMEOUT);
@@ -127,7 +170,7 @@ public class DurableConsumerCloseAndReco
closeConsumer();
}
- protected void publish() throws Exception {
+ protected void publish(int numMessages) throws Exception {
connection = createConnection();
connection.start();
@@ -136,8 +179,10 @@ public class DurableConsumerCloseAndReco
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- TextMessage msg = session.createTextMessage("This is a test: " +
messageCount++);
- producer.send(msg);
+ for (int i = 0; i < numMessages; i++) {
+ TextMessage msg = session.createTextMessage("This is a test: " +
messageCount++);
+ producer.send(msg);
+ }
producer.close();
producer = null;
@@ -157,6 +202,7 @@ public class DurableConsumerCloseAndReco
}
protected void closeConsumer() throws JMSException {
+ LOG.info("Closing the consumer");
consumer.close();
consumer = null;
closeSession();
@@ -170,10 +216,14 @@ public class DurableConsumerCloseAndReco
}
protected void makeConsumer() throws Exception {
+ makeConsumer(Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ protected void makeConsumer(int ackMode) throws Exception {
String durableName = getName();
String clientID = getSubject();
- LOG.info("Creating a durable subscribe for clientID: " + clientID + "
and durable name: " + durableName);
- createSession(clientID);
+ LOG.info("Creating a durable subscriber for clientID: " + clientID + "
and durable name: " + durableName);
+ createSession(clientID, ackMode);
consumer = createConsumer(durableName);
}
@@ -185,12 +235,12 @@ public class DurableConsumerCloseAndReco
}
}
- protected void createSession(String clientID) throws Exception {
+ protected void createSession(String clientID, int ackMode) throws
Exception {
connection = createConnection();
connection.setClientID(clientID);
connection.start();
- session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ session = connection.createSession(false, ackMode);
destination = createDestination();
}
}