[ 
https://issues.apache.org/activemq/browse/AMQ-2233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dave Syer updated AMQ-2233:
---------------------------

    Attachment: RawRollbackSharedConsumerTests.java

Added shared consumer test case.  This passes in 5.3-SNAPSHOT even without the 
fetch size hint as predicted.  But it also fails in 5.2.0, which still seems 
like a bug to me, considering what you said above.  What do you think?

> After rollback received messages not re-presented
> -------------------------------------------------
>
>                 Key: AMQ-2233
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2233
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.2.0
>            Reporter: Dave Syer
>            Assignee: Gary Tully
>         Attachments: RawRollbackSharedConsumerTests.java, 
> RawRollbackTests.java
>
>
> After rollback received messages not re-presented.  If I receive in a 
> transaction and then roll back the messages should be re-presented in the 
> next transaction.  This used to work in 5.1.0, but is broken in 5.2.0.
> You can browse the Queue in JMX after the rollback and see that the messages 
> are still there, but they are not received by a consumer in the same process.
> Here's a test case (fails on the checkPostConditions()):
> {code}
> public class RawRollbackTests {
>       
>       private static ConnectionFactory connectionFactory;
>       private static Destination queue;
>       private static BrokerService broker;
>       @BeforeClass
>       public static void clean() throws Exception {
>               FileUtils.deleteDirectory(new File("activemq-data"));
>               broker = new BrokerService();
>               broker.setUseJmx(true);
>               broker.start();
>               ActiveMQConnectionFactory connectionFactory = new 
> ActiveMQConnectionFactory();
>               connectionFactory.setBrokerURL("vm://localhost?async=false");
>               RawRollbackTests.connectionFactory = connectionFactory;
>               queue = new ActiveMQQueue("queue");
>       }
>       @AfterClass
>       public static void close() throws Exception {
>               broker.stop();
>       }
>       @Before
>       public void clearData() throws Exception {
>               getMessages(false); // drain queue
>               convertAndSend("foo");
>               convertAndSend("bar");
>       }
>       @After
>       public void checkPostConditions() throws Exception {
>               Thread.sleep(1000L);
>               List<String> list = getMessages(false);
>               assertEquals(2, list.size());
>       }
>       @Test
>       public void testReceiveMessages() throws Exception {
>               List<String> list = getMessages(true);
>               assertEquals(2, list.size());
>               assertTrue(list.contains("foo"));
>       }
>       
>       private void convertAndSend(String msg) throws Exception {
>               Connection connection = connectionFactory.createConnection();
>               connection.start();
>               Session session = connection.createSession(true, 
> Session.AUTO_ACKNOWLEDGE);
>               MessageProducer producer = session.createProducer(queue);
>               producer.send(session.createTextMessage(msg));
>               producer.close();
>               session.commit();
>               session.close();
>               connection.close();
>       }
>       private List<String> getMessages(boolean rollback) throws Exception {
>               Connection connection = connectionFactory.createConnection();
>               connection.start();
>               Session session = connection.createSession(true, 
> Session.AUTO_ACKNOWLEDGE);
>               String next = "";
>               List<String> msgs = new ArrayList<String>();
>               while (next != null) {
>                       next = (String) receiveAndConvert(session);
>                       if (next != null)
>                               msgs.add(next);
>               }
>               if (rollback) {
>                       session.rollback();
>               } else {
>                       session.commit();
>               }
>               session.close();
>               connection.close();
>               return msgs;
>       }
>       private String receiveAndConvert(Session session) throws Exception {
>               MessageConsumer consumer = session.createConsumer(queue);
>               Message message = consumer.receive(100L);
>               consumer.close();
>               if (message==null) {
>                       return null;
>               }
>               return ((TextMessage)message).getText();
>       }
> }
> {code}

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to