Repository: activemq Updated Branches: refs/heads/master 1316b57ed -> e91f5c806
AMQ-6454 - ensure message.acknowledge throws if consumer has closed and message has been released broker side Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e91f5c80 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e91f5c80 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e91f5c80 Branch: refs/heads/master Commit: e91f5c8062f81a76e6983c489bfd092ce4071480 Parents: 1316b57 Author: gtully <gary.tu...@gmail.com> Authored: Fri Oct 7 09:57:14 2016 +0100 Committer: gtully <gary.tu...@gmail.com> Committed: Fri Oct 7 09:57:28 2016 +0100 ---------------------------------------------------------------------- .../activemq/ActiveMQMessageConsumer.java | 2 ++ .../org/apache/activemq/JMSConsumerTest.java | 28 ++++++++++++++++++++ .../org/apache/activemq/JMSXAConsumerTest.java | 4 +++ 3 files changed, 34 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/e91f5c80/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 83ce137..a52e2d4 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -602,6 +602,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC m.setAcknowledgeCallback(new Callback() { @Override public void execute() throws Exception { + checkClosed(); session.checkClosed(); session.acknowledge(); } @@ -610,6 +611,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC m.setAcknowledgeCallback(new Callback() { @Override public void execute() throws Exception { + checkClosed(); session.checkClosed(); acknowledge(md); } http://git-wip-us.apache.org/repos/asf/activemq/blob/e91f5c80/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java index 4f02d47..8785acb 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.BytesMessage; +import javax.jms.JMSException; import javax.jms.DeliveryMode; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -859,6 +860,33 @@ public class JMSConsumerTest extends JmsTestSupport { redispatchSession.close(); } + public void testExceptionOnClientAckAfterConsumerClose() throws Exception { + + connection.start(); + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE); + + sendMessages(connection, destination, 1); + + MessageConsumer consumer = session.createConsumer(destination); + Message message = consumer.receive(1000); + assertNotNull(message); + consumer.close(); + + try { + message.acknowledge(); + fail("Expect exception on ack after close - consumer gone so message available again"); + } catch (JMSException expected) {} + + Session redispatchSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination); + Message msg = redispatchConsumer.receive(1000); + assertNotNull(msg); + + redispatchSession.close(); + } + + public void initCombosForTestAckOfExpired() { addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)}); http://git-wip-us.apache.org/repos/asf/activemq/blob/e91f5c80/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java index 7deff27..89fb25b 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java @@ -47,4 +47,8 @@ public class JMSXAConsumerTest extends JMSConsumerTest { public void testMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception { } + + // needs client ack, xa is auto ack if no transaction + public void testExceptionOnClientAckAfterConsumerClose() throws Exception { + } }