Hi Rodrigo,
Is that JUnit test a patch you want to contribute to Apache? If so,
I'll add it to our test suite asap.
Regards,
Hiram
On 4/15/06, Rodrigo S de Castro <[EMAIL PROTECTED]> wrote:
>
> Hi,
>
> 1) I have a MessageListener implementation that rolls back the session if
> something goes wrong. I expected the message to be redelivered to this
> listener, but that does not happen. It is never redelivered.
>
> This code does NOT work:
>
> private class MessageListenerTest implements MessageListener {
> private Session session;
> public int counter = 0;
>
> public MessageListenerTest(ActiveMQMessageConsumer session) {
> this.session = session;
> }
>
> public void onMessage(Message message) {
> try {
> System.out.println("Message: " + message);
> counter++;
> if (counter <= 2) {
> System.out.println("ROLLBACK");
> session.rollback();
> } else {
> System.out.println("COMMIT");
> message.acknowledge();
> session.commit();
> }
> } catch(JMSException e) {
> System.err.println("Error when rolling back
> transaction");
> }
> }
> }
>
>
> 2) The only way I managed to make it redeliver is to pass a reference to
> MessageConsumer to the MessageListener implementation, cast it to
> ActiveMQMessageConsumer and call its rollback method.
>
> The code below DOES work:
>
> private class MessageListenerTest implements MessageListener {
> private ActiveMQMessageConsumer consumer;
> public int counter = 0;
>
> public MessageListenerTest(ActiveMQMessageConsumer consumer) {
> this.consumer = consumer;
> }
>
> public void onMessage(Message message) {
> try {
> System.out.println("Message: " + message);
> counter++;
> if (counter <= 2) {
> System.out.println("ROLLBACK");
> session.rollback();
> } else {
> System.out.println("COMMIT");
> message.acknowledge();
> session.commit();
> }
> } catch(JMSException e) {
> System.err.println("Error when rolling back
> transaction");
> }
> }
> }
>
> Is this right? I think that session.rollback() should work as well.
>
> Below is JUnit code that shows this problem:
>
>
> import javax.jms.Connection;
> import javax.jms.DeliveryMode;
> import javax.jms.Destination;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.MessageProducer;
> import javax.jms.Queue;
> import javax.jms.Session;
> import javax.jms.TextMessage;
>
> import junit.framework.TestCase;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.ActiveMQMessageConsumer;
> import org.apache.activemq.RedeliveryPolicy;
> import org.apache.activemq.command.ActiveMQMessage;
>
> public class MessageListenerRedeliveryTest extends TestCase {
>
> private Connection connection;
>
> protected void setUp() throws Exception {
> connection = createConnection();
> }
>
> /**
> * @see junit.framework.TestCase#tearDown()
> */
> protected void tearDown() throws Exception {
> if (connection != null) {
> connection.close();
> connection = null;
> }
> }
>
> protected RedeliveryPolicy getRedeliveryPolicy() {
> RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
> redeliveryPolicy.setInitialRedeliveryDelay(1000);
> redeliveryPolicy.setBackOffMultiplier((short) 5);
> redeliveryPolicy.setMaximumRedeliveries(10);
> redeliveryPolicy.setUseExponentialBackOff(true);
> return redeliveryPolicy;
> }
>
> protected Connection createConnection() throws Exception{
> ActiveMQConnectionFactory factory = new
> ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
> factory.setRedeliveryPolicy(getRedeliveryPolicy());
> return factory.createConnection();
> }
>
> private class ConsumerMessageListenerTest implements MessageListener {
> private ActiveMQMessageConsumer consumer;
> public int counter = 0;
>
> public ConsumerMessageListenerTest(ActiveMQMessageConsumer consumer) {
> this.consumer = consumer;
> }
>
> public void onMessage(Message message) {
> try {
> System.out.println("Message: " + message);
> counter++;
> if (counter <= 2) {
> System.out.println("ROLLBACK");
> consumer.rollback();
> } else {
> System.out.println("COMMIT");
> message.acknowledge();
> consumer.commit();
> }
> } catch(JMSException e) {
> System.err.println("Error when rolling back
> transaction");
> }
> }
> }
>
> private class SessionMessageListenerTest implements MessageListener {
> private Session session;
> public int counter = 0;
>
> public SessionMessageListenerTest(Session session) {
> this.session = session;
> }
>
> public void onMessage(Message message) {
> try {
> System.out.println("Message: " + message);
> counter++;
> if (counter <= 2) {
> System.out.println("ROLLBACK");
> session.rollback();
> } else {
> System.out.println("COMMIT");
> message.acknowledge();
> session.commit();
> }
> } catch(JMSException e) {
> System.err.println("Error when rolling back
> transaction");
> }
> }
> }
>
> public void testQueueRollbackMessageListener() throws JMSException {
> connection.start();
>
> Session session = connection.createSession(true,
> Session.CLIENT_ACKNOWLEDGE);
> Queue queue = session.createQueue("queue-"+getName());
> MessageProducer producer = createProducer(session, queue);
> Message message = createTextMessage(session);
> producer.send(message);
> session.commit();
>
> MessageConsumer consumer = session.createConsumer(queue);
>
> ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer) consumer;
> mc.setRedeliveryPolicy(getRedeliveryPolicy());
>
> SessionMessageListenerTest listener = new
> SessionMessageListenerTest(session);
> consumer.setMessageListener(listener);
>
> // redelivery works with the code below
> /*
> ConsumerMessageListenerTest listener = new
> ConsumerMessageListenerTest(session);
> consumer.setMessageListener(listener);
> */
>
> try {
> Thread.sleep(7000);
> } catch(InterruptedException e) {
>
> }
> assertEquals(2, listener.counter);
>
> producer.send(createTextMessage(session));
> session.commit();
>
> try {
> Thread.sleep(2000);
> } catch(InterruptedException e) {
> // ignore
> }
> assertEquals(3, listener.counter);
>
> session.close();
> }
>
> private TextMessage createTextMessage(Session session) throws
> JMSException {
> return session.createTextMessage("Hello");
> }
>
> private MessageProducer createProducer(Session session, Destination
> queue) throws JMSException {
> MessageProducer producer = session.createProducer(queue);
> producer.setDeliveryMode(getDeliveryMode());
> return producer;
> }
>
> protected int getDeliveryMode() {
> return DeliveryMode.PERSISTENT;
> }
> }
>
>
>
>
> --
> View this message in context:
> http://www.nabble.com/Redelivery-problem-with-MessageListener-and-Session-rollback-t1455413.html#a3933378
> Sent from the ActiveMQ - User forum at Nabble.com.
>
>
--
Regards,
Hiram