[
https://issues.apache.org/jira/browse/AMQ-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13940096#comment-13940096
]
Arthur Naseef commented on AMQ-5107:
------------------------------------
Looking at this more closely, I'm leaning toward this being a non-problem.
The JMS specification states this for AUTO_ACKNOWLEDGE:
{quote}
AUTO_ACKNOWLEDGE - With this option, the session automatically acknowledges a
client’s receipt of a message when it has either successfully returned from a
call to receive or the MessageListener it has called to process the message
successfully returns.
{quote}
This means that MessageListener *must* be prepared to handle duplicates coming
into the {{onMessage}} call because (a) the message does not get acknowledged
until the handler returns, and (b) there's always the chance that client
application will fail between the {{onMessage}} call and the message ACK
(leading to redeliveries).
Really, all client apps should be prepared to handle duplicates, although
transacted applications are best positioned to avoid them.
While it may be possible to improve this one case, such a change is not
trivial, and such a change would not prevent the same condition from requiring
handling in the client.
> In-flight queue message redelivered to multiple listeners upon broker shutdown
> ------------------------------------------------------------------------------
>
> Key: AMQ-5107
> URL: https://issues.apache.org/jira/browse/AMQ-5107
> Project: ActiveMQ
> Issue Type: Bug
> Components: Transport
> Affects Versions: 5.9.0
> Environment: Windows 7 64Bit - Java "1.6.0_20"
> CentOS 6.0 - Java "1.7.0_09-icedtea"
> Reporter: Greg Garlak
> Assignee: Arthur Naseef
> Fix For: NEEDS_REVIEW
>
>
> To reproduce:
> 1) Start 3 or more listener processes (see listener code below)
> 2) Run producer to push one message on queue (see producer code below)
> 3) One of the listeners will pick-up the message and sleep for one minute
> before auto acknowledging the message
> 4) Start a shutdown sequence of the broker within the 60 second window
> (Ctrl-C or issue Terminate jvm(int) command from Hawtio console)
> 5) All other idle listeners should get the same message redelivered
> simultaneously, each one having deliveryCount incremented
> Listener code:
> --------------
> package com.test;
> import javax.jms.Connection;
> import javax.jms.Destination;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import org.apache.activemq.ActiveMQConnectionFactory;
> public class TestListener {
> public static void main(String[] args) {
> try {
> ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory("tcp://localhost:61616");
> Connection connection =
> connectionFactory.createConnection();
> Session session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
> Destination destination =
> session.createQueue("TEST.QUEUE");
> MessageConsumer consumer =
> session.createConsumer(destination);
>
> consumer.setMessageListener(new MessageListener() {
> public void onMessage(Message message) {
> try {
> TextMessage textMessage =
> (TextMessage) message;
> System.out.print("\nReceived "
> + textMessage.getText());
> System.out.print(", Redelivery:
> " + message.getJMSRedelivered());
> System.out.print(", Count: " +
> message.getLongProperty("JMSXDeliveryCount"));
> Thread.sleep(60000);
>
> System.out.print("... finished
> after sleep");
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
> });
>
> connection.start();
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
> public TestListener() {
> super();
> }
> }
> Producer code:
> --------------
> package com.test;
> import java.util.Date;
> import javax.jms.Connection;
> import javax.jms.Destination;
> import javax.jms.MessageProducer;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import org.apache.activemq.ActiveMQConnectionFactory;
> public class TestProducer {
> public static void main(String[] args) {
> try {
> thread(new HelloWorldProducer(), false);
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
>
> public static class HelloWorldProducer implements Runnable {
> public void run() {
> try {
> ActiveMQConnectionFactory connectionFactory =
> new ActiveMQConnectionFactory("tcp://localhost:61616");
> Connection connection =
> connectionFactory.createConnection();
> connection.start();
> Session session =
> connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> Destination destination =
> session.createQueue("TEST.QUEUE");
> MessageProducer producer =
> session.createProducer(destination);
> String text = "test message created on " + new
> Date();
> TextMessage message =
> session.createTextMessage(text);
> System.out.println("Sent " + text);
> producer.send(message);
> session.close();
> connection.close();
> }
> catch (Exception e) {
> e.printStackTrace();
> }
> }
> public HelloWorldProducer() {}
> }
> public static void thread(Runnable runnable, boolean daemon) {
> Thread brokerThread = new Thread(runnable);
> brokerThread.setDaemon(daemon);
> brokerThread.start();
> }
>
> public TestProducer() {
> super();
> }
> }
--
This message was sent by Atlassian JIRA
(v6.2#6252)