Repository: activemq
Updated Branches:
  refs/heads/trunk 7d136de42 -> 52d95ee01
https://issues.apache.org/jira/browse/AMQ-5423 Ensure that pendingAcks map is cleaned up either on a single message ACK / NACK or on TX commit or rollback Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/52d95ee0 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/52d95ee0 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/52d95ee0 Branch: refs/heads/trunk Commit: 52d95ee01cf57ad5cba63a8d2c809948aa432990 Parents: 7d136de Author: Timothy Bish <[email protected]> Authored: Wed Nov 5 15:32:55 2014 -0500 Committer: Timothy Bish <[email protected]> Committed: Wed Nov 5 15:32:55 2014 -0500 ---------------------------------------------------------------------- .../activemq/transport/stomp/ProtocolConverter.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/52d95ee0/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index edefb15..d366962 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -369,7 +369,7 @@ public class ProtocolConverter { boolean nacked = false; if (ackId != null) { - AckEntry pendingAck = this.pedingAcks.get(ackId); + AckEntry pendingAck = this.pedingAcks.remove(ackId); if (pendingAck != null) { messageId = pendingAck.getMessageId(); MessageAck ack = pendingAck.onMessageNack(activemqTx); @@ -425,8 +425,7 @@ public class ProtocolConverter { boolean acked = false; if (ackId != null) { - - 
AckEntry pendingAck = this.pedingAcks.get(ackId); + AckEntry pendingAck = this.pedingAcks.remove(ackId); if (pendingAck != null) { messageId = pendingAck.getMessageId(); MessageAck ack = pendingAck.onMessageAck(activemqTx); @@ -437,7 +436,6 @@ public class ProtocolConverter { } } else if (subscriptionId != null) { - StompSubscription sub = this.subscriptions.get(subscriptionId); if (sub != null) { MessageAck ack = sub.onStompMessageAck(messageId, activemqTx); @@ -446,13 +444,10 @@ public class ProtocolConverter { acked = true; } } - } else { - // STOMP v1.0: acking with just a message id is very bogus since the same message id // could have been sent to 2 different subscriptions on the same Stomp connection. // For example, when 2 subs are created on the same topic. - for (StompSubscription sub : subscriptionsByConsumerId.values()) { MessageAck ack = sub.onStompMessageAck(messageId, activemqTx); if (ack != null) { @@ -513,6 +508,8 @@ public class ProtocolConverter { sub.onStompCommit(activemqTx); } + pedingAcks.clear(); + TransactionInfo tx = new TransactionInfo(); tx.setConnectionId(connectionId); tx.setTransactionId(activemqTx); @@ -542,6 +539,8 @@ public class ProtocolConverter { } } + pedingAcks.clear(); + TransactionInfo tx = new TransactionInfo(); tx.setConnectionId(connectionId); tx.setTransactionId(activemqTx);
