Repository: activemq
Updated Branches:
  refs/heads/master 1316b57ed -> e91f5c806


AMQ-6454 - ensure message.acknowledge throws if consumer has closed and message 
has been released broker side


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e91f5c80
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e91f5c80
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e91f5c80

Branch: refs/heads/master
Commit: e91f5c8062f81a76e6983c489bfd092ce4071480
Parents: 1316b57
Author: gtully <gary.tu...@gmail.com>
Authored: Fri Oct 7 09:57:14 2016 +0100
Committer: gtully <gary.tu...@gmail.com>
Committed: Fri Oct 7 09:57:28 2016 +0100

----------------------------------------------------------------------
 .../activemq/ActiveMQMessageConsumer.java       |  2 ++
 .../org/apache/activemq/JMSConsumerTest.java    | 28 ++++++++++++++++++++
 .../org/apache/activemq/JMSXAConsumerTest.java  |  4 +++
 3 files changed, 34 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e91f5c80/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index 83ce137..a52e2d4 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -602,6 +602,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
             m.setAcknowledgeCallback(new Callback() {
                 @Override
                 public void execute() throws Exception {
+                    checkClosed();
                     session.checkClosed();
                     session.acknowledge();
                 }
@@ -610,6 +611,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
             m.setAcknowledgeCallback(new Callback() {
                 @Override
                 public void execute() throws Exception {
+                    checkClosed();
                     session.checkClosed();
                     acknowledge(md);
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e91f5c80/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
index 4f02d47..8785acb 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.BytesMessage;
+import javax.jms.JMSException;
 import javax.jms.DeliveryMode;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -859,6 +860,33 @@ public class JMSConsumerTest extends JmsTestSupport {
         redispatchSession.close();
     }
 
+    public void testExceptionOnClientAckAfterConsumerClose() throws Exception {
+
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+
+        sendMessages(connection, destination, 1);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+        Message message = consumer.receive(1000);
+        assertNotNull(message);
+        consumer.close();
+
+        try {
+            message.acknowledge();
+            fail("Expect exception on ack after close - consumer gone so message available again");
+        } catch (JMSException expected) {}
+
+        Session redispatchSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination);
+        Message msg = redispatchConsumer.receive(1000);
+        assertNotNull(msg);
+
+        redispatchSession.close();
+    }
+
+
     public void initCombosForTestAckOfExpired() {
         addCombinationValues("destinationType",
             new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});

http://git-wip-us.apache.org/repos/asf/activemq/blob/e91f5c80/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java
index 7deff27..89fb25b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java
@@ -47,4 +47,8 @@ public class JMSXAConsumerTest extends JMSConsumerTest {
 
     public void testMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception {
     }
+
+    // needs client ack, xa is auto ack if no transaction
+    public void testExceptionOnClientAckAfterConsumerClose() throws Exception {
+    }
 }

Reply via email to