Author: gtully Date: Wed Jan 28 18:25:01 2009 New Revision: 738577 URL: http://svn.apache.org/viewvc?rev=738577&view=rev Log: variant test case for https://issues.apache.org/activemq/browse/AMQ-2087
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java?rev=738577&r1=738576&r2=738577&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java Wed Jan 28 18:25:01 2009 @@ -24,6 +24,7 @@ import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; +import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; @@ -37,63 +38,163 @@ public class JmsRollbackRedeliveryTest extends TestCase { protected static final Log LOG = LogFactory.getLog(JmsRollbackRedeliveryTest.class); - private boolean consumerClose = true; - - public void testRedeliveryNoConsumerClose() throws Exception { - consumerClose = false; - testRedelivery(); + final int nbMessages = 10; + final String destinationName = "Destination"; + boolean consumerClose = true; + boolean rollback = true; + + public void setUp() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.start(); } + public void testRedelivery() throws Exception { final int nbMessages = 10; final String destinationName = "Destination"; - BrokerService broker = new BrokerService(); - broker.setPersistent(false); - broker.setUseJmx(false); - broker.start(); - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?jms.redeliveryPolicy.maximumRedeliveries=100"); Connection connection = connectionFactory.createConnection(); connection.start(); - // Enqueue nbMessages messages + populateDestination(nbMessages, destinationName, connection); + + // Consume messages and rollback transactions { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + AtomicInteger received = new AtomicInteger(); + Map<String, Boolean> rolledback = new ConcurrentHashMap<String, Boolean>(); + while (received.get() < nbMessages) { + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(destinationName); + MessageConsumer consumer = session.createConsumer(destination); + TextMessage msg = (TextMessage) consumer.receive(6000000); + if (msg != null) { + if (msg != null && rolledback.put(msg.getText(), Boolean.TRUE) != null) { + LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID()); + assertTrue(msg.getJMSRedelivered()); + session.commit(); + } else { + LOG.info("Rollback message " + msg.getText() + " id: " + msg.getJMSMessageID()); + session.rollback(); + } + } + consumer.close(); + session.close(); + } + } + } + + + public void testRedeliveryOnSingleConsumer() throws Exception { + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); + Connection connection = connectionFactory.createConnection(); + connection.start(); + + populateDestination(nbMessages, destinationName, connection); + + // Consume messages and rollback transactions + { + AtomicInteger received = new AtomicInteger(); + Map<String, Boolean> rolledback = new ConcurrentHashMap<String, Boolean>(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(destinationName); - MessageProducer producer = session.createProducer(destination); - for (int i = 1; i <= nbMessages; i++) { - producer.send(session.createTextMessage("<hello id='" + i + "'/>")); + MessageConsumer consumer = session.createConsumer(destination); + while (received.get() < nbMessages) { + TextMessage msg = (TextMessage) consumer.receive(6000000); + if (msg != null) { + if (msg != null && rolledback.put(msg.getText(), Boolean.TRUE) != null) { + LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID()); + assertTrue(msg.getJMSRedelivered()); + session.commit(); + } else { + LOG.info("Rollback message " + msg.getText() + " id: " + msg.getJMSMessageID()); + session.rollback(); + } + } } - producer.close(); + consumer.close(); session.close(); } + } + + public void testRedeliveryOnSingleSession() throws Exception { + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?jms.redeliveryPolicy.maximumRedeliveries=100"); + Connection connection = connectionFactory.createConnection(); + connection.start(); + + populateDestination(nbMessages, destinationName, connection); // Consume messages and rollback transactions { AtomicInteger received = new AtomicInteger(); Map<String, Boolean> rolledback = new ConcurrentHashMap<String, Boolean>(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(destinationName); while (received.get() < nbMessages) { - Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createQueue(destinationName); - MessageConsumer consumer = session.createConsumer(destination); + MessageConsumer consumer = session.createConsumer(destination); TextMessage msg = (TextMessage) consumer.receive(6000000); if (msg != null) { if (msg != null && rolledback.put(msg.getText(), Boolean.TRUE) != null) { LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID()); + assertTrue(msg.getJMSRedelivered()); session.commit(); } else { LOG.info("Rollback message " + msg.getText() + " id: " + msg.getJMSMessageID()); session.rollback(); } } - if (consumerClose ) { - consumer.close(); + consumer.close(); + } + session.close(); + } + } + + public void testRedeliveryOnSessionCloseWithNoRollback() throws Exception { + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?jms.redeliveryPolicy.maximumRedeliveries=100"); + Connection connection = connectionFactory.createConnection(); + connection.start(); + + populateDestination(nbMessages, destinationName, connection); + + { + AtomicInteger received = new AtomicInteger(); + Map<String, Boolean> rolledback = new ConcurrentHashMap<String, Boolean>(); + while (received.get() < nbMessages) { + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(destinationName); + + MessageConsumer consumer = session.createConsumer(destination); + TextMessage msg = (TextMessage) consumer.receive(1000); + if (msg != null) { + if (msg != null && rolledback.put(msg.getText(), Boolean.TRUE) != null) { + LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID()); + assertTrue(msg.getJMSRedelivered()); + session.commit(); + } } session.close(); } } } + + private void populateDestination(final int nbMessages, + final String destinationName, Connection connection) + throws JMSException { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(destinationName); + MessageProducer producer = session.createProducer(destination); + for (int i = 1; i <= nbMessages; i++) { + producer.send(session.createTextMessage("<hello id='" + i + "'/>")); + } + producer.close(); + session.close(); + } + }