[
https://issues.apache.org/jira/browse/AMQ-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13939935#comment-13939935
]
Arthur Naseef commented on AMQ-5107:
------------------------------------
I reproduced this. I also tried using the failover transport, shutting down
the broker, restarting all the clients, bringing the broker back up, and then
shutting it down again.
In this second test case, the message was redelivered only once. The multiple
deliveries happen only while the broker is shutting down: closing each
connection triggers redelivery to a consumer on another connection. In the
errant case, the second connection to shut down (out of 3) happens to be the
one that just received the redelivered message, so the last remaining
connection gets the message as well. Like this:
* Consumer 1 gets message
* Consumer 1 connection shuts down
* Consumer 2 gets message
* Consumer 2 connection shuts down
* Consumer 3 gets message
* Consumer 3 connection shuts down
Bottom line - the broker should not be redelivering (or even delivering) any
messages during shutdown. I am not sure this will be an easy fix, though, since
the broker itself sends messages (advisories) during shutdown to announce
events such as consumers being closed.
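
To make the sequence above concrete, here is a minimal toy model of it. It is
not ActiveMQ code: the class name, the method, and the
suppressDispatchWhileStopping flag are all hypothetical, and it assumes the
worst case described above, where the consumer that receives the redelivered
message is always on the next connection to be closed.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only. Nothing here is taken from the ActiveMQ code base;
// every name is hypothetical and exists just to illustrate the sequence.
class ShutdownRedeliverySim {

    /**
     * Simulates one in-flight, unacknowledged queue message while the broker
     * closes its consumer connections one at a time, in the order 1..consumerCount.
     * Returns the ids of the consumers that received the message, in order.
     */
    static List<Integer> simulate(int consumerCount, boolean suppressDispatchWhileStopping) {
        if (consumerCount < 1) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        List<Integer> open = new ArrayList<>();
        for (int id = 1; id <= consumerCount; id++) {
            open.add(id);
        }
        List<Integer> deliveries = new ArrayList<>();

        // Before shutdown: the first consumer receives the message and has not
        // acknowledged it yet (it is still inside its listener).
        int holder = open.get(0);
        deliveries.add(holder);

        // Shutdown: close the connections one by one.
        while (!open.isEmpty()) {
            int closed = open.remove(0);
            boolean holderWentAway = (closed == holder);
            boolean someoneLeft = !open.isEmpty();
            if (holderWentAway && someoneLeft && !suppressDispatchWhileStopping) {
                // The unacknowledged message is handed to the next open consumer,
                // which in this model is also the next connection to be closed.
                holder = open.get(0);
                deliveries.add(holder);
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        System.out.println("redispatch during shutdown: " + simulate(3, false));
        System.out.println("dispatch suppressed:        " + simulate(3, true));
    }
}
```

With three consumers and redispatch left on, the model returns [1, 2, 3],
matching the list above. With dispatch suppressed once shutdown has begun, it
returns [1]: only the original delivery happens. A real fix would also have to
let the advisories through, which this sketch does not model.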
> In-flight queue message redelivered to multiple listeners upon broker shutdown
> ------------------------------------------------------------------------------
>
> Key: AMQ-5107
> URL: https://issues.apache.org/jira/browse/AMQ-5107
> Project: ActiveMQ
> Issue Type: Bug
> Components: Transport
> Affects Versions: 5.9.0
> Environment: Windows 7 64Bit - Java "1.6.0_20"
> CentOS 6.0 - Java "1.7.0_09-icedtea"
> Reporter: Greg Garlak
> Assignee: Arthur Naseef
> Fix For: NEEDS_REVIEW
>
>
> To reproduce:
> 1) Start 3 or more listener processes (see listener code below)
> 2) Run the producer to push one message onto the queue (see producer code below)
> 3) One of the listeners will pick up the message and sleep for one minute
> before auto-acknowledging the message
> 4) Start a shutdown sequence of the broker within the 60-second window
> (Ctrl-C or issue Terminate jvm(int) command from Hawtio console)
> 5) All other idle listeners will then get the same message redelivered
> simultaneously, each one with deliveryCount incremented
> Listener code:
> --------------
> package com.test;
>
> import javax.jms.Connection;
> import javax.jms.Destination;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import org.apache.activemq.ActiveMQConnectionFactory;
>
> public class TestListener {
>     public static void main(String[] args) {
>         try {
>             ActiveMQConnectionFactory connectionFactory =
>                     new ActiveMQConnectionFactory("tcp://localhost:61616");
>             Connection connection = connectionFactory.createConnection();
>             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>             Destination destination = session.createQueue("TEST.QUEUE");
>             MessageConsumer consumer = session.createConsumer(destination);
>
>             consumer.setMessageListener(new MessageListener() {
>                 public void onMessage(Message message) {
>                     try {
>                         TextMessage textMessage = (TextMessage) message;
>                         System.out.print("\nReceived " + textMessage.getText());
>                         System.out.print(", Redelivery: " + message.getJMSRedelivered());
>                         System.out.print(", Count: "
>                                 + message.getLongProperty("JMSXDeliveryCount"));
>                         // Hold the message for 60 seconds; with AUTO_ACKNOWLEDGE it is
>                         // acknowledged only after onMessage() returns.
>                         Thread.sleep(60000);
>
>                         System.out.print("... finished after sleep");
>                     } catch (Exception e) {
>                         e.printStackTrace();
>                     }
>                 }
>             });
>
>             connection.start();
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
>
>     public TestListener() {
>         super();
>     }
> }
> Producer code:
> --------------
> package com.test;
>
> import java.util.Date;
> import javax.jms.Connection;
> import javax.jms.Destination;
> import javax.jms.MessageProducer;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import org.apache.activemq.ActiveMQConnectionFactory;
>
> public class TestProducer {
>     public static void main(String[] args) {
>         try {
>             thread(new HelloWorldProducer(), false);
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
>
>     public static class HelloWorldProducer implements Runnable {
>         public void run() {
>             try {
>                 ActiveMQConnectionFactory connectionFactory =
>                         new ActiveMQConnectionFactory("tcp://localhost:61616");
>                 Connection connection = connectionFactory.createConnection();
>                 connection.start();
>                 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>                 Destination destination = session.createQueue("TEST.QUEUE");
>                 MessageProducer producer = session.createProducer(destination);
>                 String text = "test message created on " + new Date();
>                 TextMessage message = session.createTextMessage(text);
>                 System.out.println("Sent " + text);
>                 producer.send(message);
>                 session.close();
>                 connection.close();
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>         }
>
>         public HelloWorldProducer() {}
>     }
>
>     public static void thread(Runnable runnable, boolean daemon) {
>         Thread brokerThread = new Thread(runnable);
>         brokerThread.setDaemon(daemon);
>         brokerThread.start();
>     }
>
>     public TestProducer() {
>         super();
>     }
> }