[ 
https://issues.apache.org/jira/browse/AMQ-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arthur Naseef reassigned AMQ-5107:
----------------------------------

    Assignee: Arthur Naseef

> In-flight queue message redelivered to multiple listeners upon broker shutdown
> ------------------------------------------------------------------------------
>
>                 Key: AMQ-5107
>                 URL: https://issues.apache.org/jira/browse/AMQ-5107
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Transport
>    Affects Versions: 5.9.0
>         Environment: Windows 7 64Bit - Java "1.6.0_20"
> CentOS 6.0 - Java "1.7.0_09-icedtea" 
>            Reporter: Greg Garlak
>            Assignee: Arthur Naseef
>             Fix For: NEEDS_REVIEW
>
>
> To reproduce: 
> 1) Start 3 or more listener processes (see listener code below)
> 2) Run producer to push one message on queue (see producer code below)
> 3) One of the listeners will pick-up the message and sleep for one minute 
> before auto acknowledging the message
> 4) Start a shutdown sequence of the broker within the 60 second window 
> (Ctrl-C or issue Terminate jvm(int) command from Hawtio console) 
> 5) All other idle listeners should get the same message redelivered 
> simultaneously, each one having deliveryCount incremented 
> Listener code:
> --------------
> package com.test;
> import javax.jms.Connection;
> import javax.jms.Destination;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import org.apache.activemq.ActiveMQConnectionFactory;
> public class TestListener {
>       public static void main(String[] args) {
>               try {   
>                       ActiveMQConnectionFactory connectionFactory = new 
> ActiveMQConnectionFactory("tcp://localhost:61616");
>                       Connection connection = 
> connectionFactory.createConnection();
>                       Session session = connection.createSession(false, 
> Session.AUTO_ACKNOWLEDGE);
>                       Destination destination = 
> session.createQueue("TEST.QUEUE");
>                       MessageConsumer consumer = 
> session.createConsumer(destination);
>                       
>                       consumer.setMessageListener(new MessageListener() {
>                               public void onMessage(Message message) {
>                                       try     {
>                                               TextMessage textMessage = 
> (TextMessage) message;
>                                               System.out.print("\nReceived " 
> + textMessage.getText());
>                                               System.out.print(", Redelivery: 
> " + message.getJMSRedelivered());
>                                               System.out.print(", Count: " + 
> message.getLongProperty("JMSXDeliveryCount"));
>                                               Thread.sleep(60000);            
>         
>                                               System.out.print("... finished 
> after sleep");
>                                       } catch (Exception e) {
>                                               e.printStackTrace();
>                                       }
>                               }
>                       });
>                       
>                       connection.start();
>               } catch (Exception e) {
>                       e.printStackTrace();
>               }
>       }
>       public TestListener() {
>               super();
>       }
> }
> Producer code:
> --------------
> package com.test;
> import java.util.Date;
> import javax.jms.Connection;
> import javax.jms.Destination;
> import javax.jms.MessageProducer;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import org.apache.activemq.ActiveMQConnectionFactory;
> public class TestProducer {
>       public static void main(String[] args) {
>               try {
>                       thread(new HelloWorldProducer(), false);
>               } catch (Exception e) {
>                       e.printStackTrace();
>               }
>       }
>  
>       public static class HelloWorldProducer implements Runnable {
>               public void run() {
>                       try {
>                               ActiveMQConnectionFactory connectionFactory = 
> new ActiveMQConnectionFactory("tcp://localhost:61616");
>                               Connection connection = 
> connectionFactory.createConnection();
>                               connection.start();
>                               Session session = 
> connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>                               Destination destination = 
> session.createQueue("TEST.QUEUE");
>                               MessageProducer producer = 
> session.createProducer(destination);
>                               String text = "test message created on " + new 
> Date();
>                               TextMessage message = 
> session.createTextMessage(text);
>                               System.out.println("Sent " + text);
>                               producer.send(message);
>                               session.close();
>                               connection.close();
>                       }
>                       catch (Exception e) {
>                               e.printStackTrace();
>                       }
>               }
>               public HelloWorldProducer() {}
>       }
>       public static void thread(Runnable runnable, boolean daemon) {
>               Thread brokerThread = new Thread(runnable);
>               brokerThread.setDaemon(daemon);
>               brokerThread.start();
>       }
>     
>       public TestProducer() {
>               super();
>       }
> }



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to