Repository: activemq
Updated Branches:
  refs/heads/activemq-5.15.x eabfa451d -> 8e9f80e7d


AMQ-6796 Avoid double Ack on commit in STOMP individual ack mode

During commit of a transaction, when the subscription mode is individual
ack, the messages can get double acked, leading to an error about receipt
of an invalid ack.
(cherry picked from commit 4c986d102cd5c862d88fd84eec1889b9786e9970)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8e9f80e7
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8e9f80e7
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8e9f80e7

Branch: refs/heads/activemq-5.15.x
Commit: 8e9f80e7d6fdee3e3ecd2293fa364c05a2e9b18b
Parents: eabfa45
Author: Timothy Bish <[email protected]>
Authored: Wed Aug 23 12:21:03 2017 -0400
Committer: Timothy Bish <[email protected]>
Committed: Wed Aug 23 12:21:30 2017 -0400

----------------------------------------------------------------------
 .../transport/stomp/StompSubscription.java      |  5 +-
 .../activemq/transport/stomp/Stomp11Test.java   | 69 ++++++++++++++++++++
 2 files changed, 73 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8e9f80e7/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
index 889b6f7..dbbe871 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
@@ -115,8 +115,11 @@ public class StompSubscription {
                 }
             }
 
-            if (!unconsumedMessage.isEmpty()) {
+            // For individual Ack we already sent an Ack that will be applied on commit
+            // we don't send a second standard Ack as that would produce an error.
+            if (!unconsumedMessage.isEmpty() && ackMode == CLIENT_ACK) {
                 ack = new MessageAck(unconsumedMessage.getLast(), 
MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
+                ack.setTransactionId(transactionId);
                 unconsumedMessage.clear();
             }
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8e9f80e7/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
index e12eaca..8679684 100644
--- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
@@ -1197,4 +1197,73 @@ public class Stomp11Test extends StompTestSupport {
         String receipt = stompConnection.receiveFrame();
         assertTrue(receipt.contains("RECEIPT"));
     }
+
+    @Test(timeout = 60000)
+    public void testAckMessagesInTransactionOutOfOrderWithTXClientAck() throws 
Exception {
+        doTestAckMessagesInTransactionOutOfOrderWithTXClientAck("client");
+    }
+
+    @Test(timeout = 60000)
+    public void 
testAckMessagesInTransactionOutOfOrderWithTXClientIndividualAck() throws 
Exception {
+        
doTestAckMessagesInTransactionOutOfOrderWithTXClientAck("client-individual");
+    }
+
+    public void doTestAckMessagesInTransactionOutOfOrderWithTXClientAck(String 
ackMode) throws Exception {
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Message 1"));
+        producer.send(session.createTextMessage("Message 2"));
+        producer.close();
+
+        String frame = "STOMP\n" + "login:system\n" + "passcode:manager\n" +
+            "accept-version:1.1\n" + "host:localhost\n" + "client-id:test\n" + 
"\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        String f = stompConnection.receiveFrame();
+        assertTrue(f.startsWith("CONNECTED"));
+
+        final QueueViewMBean queueView = getProxyToQueue(getQueueName());
+        assertEquals(2, queueView.getQueueSize());
+
+        frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+            "id:12345\n" + "ack:" + ackMode + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        StompFrame receivedFirst = stompConnection.receive();
+        assertTrue(receivedFirst.getAction().equals("MESSAGE"));
+        StompFrame receivedSecond = stompConnection.receive();
+        assertTrue(receivedSecond.getAction().equals("MESSAGE"));
+
+        // ack second, then first message
+        frame = "ACK\n" + "transaction: tx1\n" + "subscription:12345\n" + 
"message-id:" +
+                receivedSecond.getHeaders().get("message-id") + "\n\n" + 
Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        frame = "ACK\n" + "transaction: tx1\n" + "subscription:12345\n" + 
"message-id:" +
+                receivedFirst.getHeaders().get("message-id") + "\n\n" + 
Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        // commit transaction
+        frame = "COMMIT\n" + "receipt:1\n" + "transaction: tx1\n" + "\n\n" + 
Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        String receipt = stompConnection.receiveFrame();
+        LOG.debug("Receipt Frame = {}", receipt);
+        assertTrue(receipt.contains("RECEIPT"));
+
+        assertTrue("Message not ack'd", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return queueView.getQueueSize() == 0;
+            }
+        }));
+
+        String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + 
getQueueName() + "\n" +
+            "receipt:1\n" + "id:12345\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(unsub);
+
+        receipt = stompConnection.receiveFrame();
+        assertTrue(receipt.contains("RECEIPT"));
+    }
 }

Reply via email to