Repository: activemq
Updated Branches:
  refs/heads/master e3df09b9d -> 16bc0f0d7

https://issues.apache.org/jira/browse/AMQ-6089 - support TMNOFLAGS as a scan end to allow looping calls to recover to terminate

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/16bc0f0d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/16bc0f0d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/16bc0f0d

Branch: refs/heads/master
Commit: 16bc0f0d750530643333695ba20f6a736704271c
Parents: e3df09b
Author: gtully <[email protected]>
Authored: Wed Jan 6 12:54:20 2016 +0000
Committer: gtully <[email protected]>
Committed: Wed Jan 6 12:54:20 2016 +0000

----------------------------------------------------------------------
 .../org/apache/activemq/TransactionContext.java | 40 +++++++++++---------
 .../activemq/broker/XARecoveryBrokerTest.java   | 16 ++++++++
 2 files changed, 39 insertions(+), 17 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq/blob/16bc0f0d/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
index 6bd7402..77826b1 100755
--- a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
+++ b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
@@ -672,26 +672,32 @@ public class TransactionContext implements XAResource {
     @Override
     public Xid[] recover(int flag) throws XAException {
         LOG.debug("recover({})", flag);
+        XATransactionId[] answer;
 
-        TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
-        try {
-            this.connection.checkClosedOrFailed();
-            this.connection.ensureConnectionInfoSent();
-
-            DataArrayResponse receipt = (DataArrayResponse)this.connection.syncSendPacket(info);
-            DataStructure[] data = receipt.getData();
-            XATransactionId[] answer;
-            if (data instanceof XATransactionId[]) {
-                answer = (XATransactionId[])data;
-            } else {
-                answer = new XATransactionId[data.length];
-                System.arraycopy(data, 0, answer, 0, data.length);
+        if (XAResource.TMNOFLAGS == flag) {
+            // signal next in cursor scan, which for us is always the end b/c we don't maintain any cursor state
+            // allows looping scan to complete
+            answer = new XATransactionId[0];
+        } else {
+            TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
+            try {
+                this.connection.checkClosedOrFailed();
+                this.connection.ensureConnectionInfoSent();
+
+                DataArrayResponse receipt = (DataArrayResponse) this.connection.syncSendPacket(info);
+                DataStructure[] data = receipt.getData();
+                if (data instanceof XATransactionId[]) {
+                    answer = (XATransactionId[]) data;
+                } else {
+                    answer = new XATransactionId[data.length];
+                    System.arraycopy(data, 0, answer, 0, data.length);
+                }
+            } catch (JMSException e) {
+                throw toXAException(e);
             }
-            LOG.debug("recover({})={}", flag, answer);
-            return answer;
-        } catch (JMSException e) {
-            throw toXAException(e);
         }
+        LOG.debug("recover({})={}", flag, answer);
+        return answer;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/16bc0f0d/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
index 2c41673..9660ef0 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
@@ -16,7 +16,9 @@
  */
 package org.apache.activemq.broker;
 
+import java.util.Arrays;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -24,7 +26,11 @@ import javax.jms.JMSException;
 import javax.management.InstanceNotFoundException;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
 import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.TransactionContext;
 import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
 import org.apache.activemq.broker.jmx.DestinationViewMBean;
 import org.apache.activemq.broker.jmx.PersistenceAdapterViewMBean;
@@ -102,6 +108,16 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
         dar = (DataArrayResponse)response;
         assertEquals(4, dar.getData().length);
 
+        // verify XAResource scan loop
+        XAResource transactionContextXAResource = new TransactionContext(ActiveMQConnection.makeConnection(broker.getVmConnectorURI().toString()));
+        LinkedList<Xid> tracked = new LinkedList<Xid>();
+        Xid[] recoveryXids = transactionContextXAResource.recover(XAResource.TMSTARTRSCAN);
+        while (recoveryXids.length > 0) {
+            tracked.addAll(Arrays.asList(recoveryXids));
+            recoveryXids = transactionContextXAResource.recover(XAResource.TMNOFLAGS);
+        }
+        assertEquals("got 4 via scan loop", 4, tracked.size());
+
         // validate destination depth via jmx
         DestinationViewMBean destinationView = getProxyToDestination(destinationList(destination)[0]);
         assertEquals("enqueue count does not see prepared", 0, destinationView.getQueueSize());
