Repository: activemq Updated Branches: refs/heads/trunk 5cd56e7fb -> c5f183548
https://issues.apache.org/jira/browse/AMQ-5456 Apply patch from gemmellr to ensure that TX messages accepted retain the TX state until commit. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c5f18354 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c5f18354 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c5f18354 Branch: refs/heads/trunk Commit: c5f183548eb486bac3061a25c5f108bf52d18636 Parents: 5cd56e7 Author: Timothy Bish <[email protected]> Authored: Tue Nov 25 09:15:36 2014 -0500 Committer: Timothy Bish <[email protected]> Committed: Tue Nov 25 09:15:36 2014 -0500 ---------------------------------------------------------------------- .../transport/amqp/AmqpProtocolConverter.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/c5f18354/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 35cc491..9a95725 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -72,6 +72,7 @@ import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.Modified; +import org.apache.qpid.proton.amqp.messaging.Outcome; import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.messaging.Released; import org.apache.qpid.proton.amqp.messaging.Target; @@ -1148,12 +1149,16 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { if (state instanceof TransactionalState) { TransactionalState txState = (TransactionalState) state; - if (txState.getOutcome() instanceof DeliveryState) { - LOG.trace("onDelivery: TX delivery state = {}", state); - state = (DeliveryState) txState.getOutcome(); - if (state instanceof Accepted) { + LOG.trace("onDelivery: TX delivery state = {}", state); + if (txState.getOutcome() != null) { + Outcome outcome = txState.getOutcome(); + if (outcome instanceof Accepted) { if (!delivery.remotelySettled()) { - delivery.disposition(new Accepted()); + TransactionalState txAccepted = new TransactionalState(); + txAccepted.setOutcome(Accepted.getInstance()); + txAccepted.setTxnId(((TransactionalState) state).getTxnId()); + + delivery.disposition(txAccepted); } settle(delivery, MessageAck.DELIVERED_ACK_TYPE); }
