Author: gtully
Date: Wed Jan 28 18:25:01 2009
New Revision: 738577

URL: http://svn.apache.org/viewvc?rev=738577&view=rev
Log:
variant test case for https://issues.apache.org/activemq/browse/AMQ-2087

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java?rev=738577&r1=738576&r2=738577&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java Wed Jan 28 18:25:01 2009
@@ -24,6 +24,7 @@
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -37,63 +38,163 @@
 
 public class JmsRollbackRedeliveryTest extends TestCase {
     protected static final Log LOG = 
LogFactory.getLog(JmsRollbackRedeliveryTest.class);
-    private boolean consumerClose = true;
-
-    public void testRedeliveryNoConsumerClose() throws Exception {
-        consumerClose = false;
-        testRedelivery();
+    final int nbMessages = 10;
+    final String destinationName = "Destination";
+    boolean consumerClose = true;
+    boolean rollback = true;
+    
+    public void setUp() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.start();
     }
     
+    
     public void testRedelivery() throws Exception {
 
         final int nbMessages = 10;
         final String destinationName = "Destination";
 
-        BrokerService broker = new BrokerService();
-        broker.setPersistent(false);
-        broker.setUseJmx(false);
-        broker.start();
-
         ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory("vm://localhost?jms.redeliveryPolicy.maximumRedeliveries=100");
 
         Connection connection = connectionFactory.createConnection();
         connection.start();
 
-        // Enqueue nbMessages messages
+        populateDestination(nbMessages, destinationName, connection);
+
+        // Consume messages and rollback transactions
         {
-            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            AtomicInteger received = new AtomicInteger();
+            Map<String, Boolean> rolledback = new ConcurrentHashMap<String, 
Boolean>();
+            while (received.get() < nbMessages) {
+                Session session = connection.createSession(true, 
Session.AUTO_ACKNOWLEDGE);
+                Destination destination = session.createQueue(destinationName);
+                MessageConsumer consumer = session.createConsumer(destination);
+                TextMessage msg = (TextMessage) consumer.receive(6000000);
+                if (msg != null) {
+                    if (msg != null && rolledback.put(msg.getText(), 
Boolean.TRUE) != null) {
+                        LOG.info("Received message " + msg.getText() + " (" + 
received.getAndIncrement() + ")" + msg.getJMSMessageID());
+                        assertTrue(msg.getJMSRedelivered());
+                        session.commit();
+                    } else {
+                        LOG.info("Rollback message " + msg.getText() + " id: " 
+  msg.getJMSMessageID());
+                        session.rollback();
+                    }
+                }
+                consumer.close();
+                session.close();
+            }
+        }
+    }
+
+   
+    public void testRedeliveryOnSingleConsumer() throws Exception {
+
+        ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+
+        populateDestination(nbMessages, destinationName, connection);
+
+        // Consume messages and rollback transactions
+        {
+            AtomicInteger received = new AtomicInteger();
+            Map<String, Boolean> rolledback = new ConcurrentHashMap<String, 
Boolean>();
+            Session session = connection.createSession(true, 
Session.AUTO_ACKNOWLEDGE);
             Destination destination = session.createQueue(destinationName);
-            MessageProducer producer = session.createProducer(destination);
-            for (int i = 1; i <= nbMessages; i++) {
-                producer.send(session.createTextMessage("<hello id='" + i + 
"'/>"));
+            MessageConsumer consumer = session.createConsumer(destination);    
        
+            while (received.get() < nbMessages) {
+                TextMessage msg = (TextMessage) consumer.receive(6000000);
+                if (msg != null) {
+                    if (msg != null && rolledback.put(msg.getText(), 
Boolean.TRUE) != null) {
+                        LOG.info("Received message " + msg.getText() + " (" + 
received.getAndIncrement() + ")" + msg.getJMSMessageID());
+                        assertTrue(msg.getJMSRedelivered());
+                        session.commit();
+                    } else {
+                        LOG.info("Rollback message " + msg.getText() + " id: " 
+  msg.getJMSMessageID());
+                        session.rollback();
+                    }
+                }
             }
-            producer.close();
+            consumer.close();
             session.close();
         }
+    }
+    
+    public void testRedeliveryOnSingleSession() throws Exception {
+
+        ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory("vm://localhost?jms.redeliveryPolicy.maximumRedeliveries=100");
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+
+        populateDestination(nbMessages, destinationName, connection);
 
         // Consume messages and rollback transactions
         {
             AtomicInteger received = new AtomicInteger();
             Map<String, Boolean> rolledback = new ConcurrentHashMap<String, 
Boolean>();
+            Session session = connection.createSession(true, 
Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createQueue(destinationName);
             while (received.get() < nbMessages) {
-                Session session = connection.createSession(true, 
Session.AUTO_ACKNOWLEDGE);
-                Destination destination = session.createQueue(destinationName);
-                MessageConsumer consumer = session.createConsumer(destination);
+                MessageConsumer consumer = 
session.createConsumer(destination);            
                 TextMessage msg = (TextMessage) consumer.receive(6000000);
                 if (msg != null) {
                     if (msg != null && rolledback.put(msg.getText(), 
Boolean.TRUE) != null) {
                         LOG.info("Received message " + msg.getText() + " (" + 
received.getAndIncrement() + ")" + msg.getJMSMessageID());
+                        assertTrue(msg.getJMSRedelivered());
                         session.commit();
                     } else {
                         LOG.info("Rollback message " + msg.getText() + " id: " 
+  msg.getJMSMessageID());
                         session.rollback();
                     }
                 }
-                if (consumerClose ) {
-                    consumer.close();
+                consumer.close();
+            }
+            session.close();
+        }
+    }
+    
+    public void testRedeliveryOnSessionCloseWithNoRollback() throws Exception {
+
+        ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory("vm://localhost?jms.redeliveryPolicy.maximumRedeliveries=100");
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+
+        populateDestination(nbMessages, destinationName, connection);
+
+        {
+            AtomicInteger received = new AtomicInteger();
+            Map<String, Boolean> rolledback = new ConcurrentHashMap<String, 
Boolean>();
+            while (received.get() < nbMessages) {
+                Session session = connection.createSession(true, 
Session.AUTO_ACKNOWLEDGE);
+                Destination destination = session.createQueue(destinationName);
+
+                MessageConsumer consumer = 
session.createConsumer(destination);            
+                TextMessage msg = (TextMessage) consumer.receive(1000);
+                if (msg != null) {
+                    if (msg != null && rolledback.put(msg.getText(), 
Boolean.TRUE) != null) {
+                        LOG.info("Received message " + msg.getText() + " (" + 
received.getAndIncrement() + ")" + msg.getJMSMessageID());
+                        assertTrue(msg.getJMSRedelivered());
+                        session.commit();
+                    }
                 }
                 session.close();
             }
         }
     }
+    
+    private void populateDestination(final int nbMessages,
+            final String destinationName, Connection connection)
+            throws JMSException {
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue(destinationName);
+        MessageProducer producer = session.createProducer(destination);
+        for (int i = 1; i <= nbMessages; i++) {
+            producer.send(session.createTextMessage("<hello id='" + i + 
"'/>"));
+        }
+        producer.close();
+        session.close();
+    }
+    
 }


Reply via email to