Repository: activemq Updated Branches: refs/heads/trunk e8818fafe -> 69c0d399f
make prepared xa transactions visible in kahadb persistenceadapter view mbean Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/69c0d399 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/69c0d399 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/69c0d399 Branch: refs/heads/trunk Commit: 69c0d399fb374700b1b7351671fcaf897a1d3e16 Parents: e8818fa Author: gtully <[email protected]> Authored: Mon Mar 3 13:55:27 2014 +0000 Committer: gtully <[email protected]> Committed: Mon Mar 3 13:55:27 2014 +0000 ---------------------------------------------------------------------- .../broker/jmx/PersistenceAdapterViewMBean.java | 2 +- .../activemq/store/kahadb/MessageDatabase.java | 27 ++++++++++++++++++-- .../activemq/broker/XARecoveryBrokerTest.java | 23 ++++++++++++++--- 3 files changed, 46 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/69c0d399/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterViewMBean.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterViewMBean.java index e99fef2..b860e9c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/PersistenceAdapterViewMBean.java @@ -21,7 +21,7 @@ public interface PersistenceAdapterViewMBean { @MBeanInfo("Name of this persistence adapter.") String getName(); - @MBeanInfo("Current inflight local transactions.") + @MBeanInfo("Inflight transactions.") String getTransactions(); @MBeanInfo("Current data.") 
http://git-wip-us.apache.org/repos/asf/activemq/blob/69c0d399/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 4775f1b..78e26a9 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -563,6 +563,18 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } } + synchronized (preparedTransactions) { + if (!preparedTransactions.isEmpty()) { + for (Entry<TransactionId, List<Operation>> entry : preparedTransactions.entrySet()) { + TranInfo info = new TranInfo(); + info.id = entry.getKey(); + for (Operation operation : entry.getValue()) { + info.track(operation); + } + infos.add(info); + } + } + } return infos.toString(); } @@ -2290,6 +2302,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe @SuppressWarnings("rawtypes") protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>(); protected final Set<String> ackedAndPrepared = new HashSet<String>(); + protected final Set<String> rolledBackAcks = new HashSet<String>(); // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback, // till then they are skipped by the store. 
@@ -2305,12 +2318,16 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } - public void forgetRecoveredAcks(ArrayList<MessageAck> acks) throws IOException { + public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException { if (acks != null) { this.indexLock.writeLock().lock(); try { for (MessageAck ack : acks) { - ackedAndPrepared.remove(ack.getLastMessageId().toProducerKey()); + final String id = ack.getLastMessageId().toProducerKey(); + ackedAndPrepared.remove(id); + if (rollback) { + rolledBackAcks.add(id); + } } } finally { this.indexLock.writeLock().unlock(); @@ -2933,6 +2950,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return lastGetPriority; } + public boolean alreadyDispatched(Long sequence) { + return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) || + (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) || + (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence); + } + class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{ Iterator<Entry<Long, MessageKeys>>currentIterator; final Iterator<Entry<Long, MessageKeys>>highIterator; http://git-wip-us.apache.org/repos/asf/activemq/blob/69c0d399/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java index edab489..fb570b2 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java @@ -25,10 +25,13 @@ import javax.management.InstanceNotFoundException; import 
javax.management.MalformedObjectNameException; import javax.management.ObjectName; import junit.framework.Test; +import org.apache.activemq.broker.jmx.BrokerMBeanSupport; import org.apache.activemq.broker.jmx.DestinationViewMBean; +import org.apache.activemq.broker.jmx.PersistenceAdapterViewMBean; import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.command.*; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.apache.activemq.util.JMXSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,6 +80,14 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { DataArrayResponse dar = (DataArrayResponse)response; assertEquals(4, dar.getData().length); + // view prepared in kahadb view + if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) { + PersistenceAdapterViewMBean kahadbView = getProxyToPersistenceAdapter(broker.getPersistenceAdapter().toString()); + String txFromView = kahadbView.getTransactions(); + LOG.info("Tx view fromm PA:" + txFromView); + assertTrue("xid with our dud format in transaction string " + txFromView, txFromView.contains("XID:[55,")); + } + // restart the broker. 
restartBroker(); @@ -125,6 +136,12 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { } } + private PersistenceAdapterViewMBean getProxyToPersistenceAdapter(String name) throws MalformedObjectNameException, JMSException { + return (PersistenceAdapterViewMBean)broker.getManagementContext().newProxyInstance( + BrokerMBeanSupport.createPersistenceAdapterName(broker.getBrokerObjectName().toString(), name), + PersistenceAdapterViewMBean.class, true); + } + private RecoveredXATransactionViewMBean getProxyToPreparedTransactionViewMBean(TransactionId xid) throws MalformedObjectNameException, JMSException { ObjectName objectName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,transactionType=RecoveredXaTransaction,Xid=" + @@ -216,7 +233,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { // Commit the prepared transactions. for (int i = 0; i < dar.getData().length; i++) { - connection.send(createCommitTransaction2Phase(connectionInfo, (TransactionId)dar.getData()[i])); + connection.request(createCommitTransaction2Phase(connectionInfo, (TransactionId) dar.getData()[i])); } // We should get the committed transactions. @@ -304,7 +321,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { // Commit the prepared transactions. for (int i = 0; i < dar.getData().length; i++) { - connection.send(createCommitTransaction2Phase(connectionInfo, (TransactionId) dar.getData()[i])); + connection.request(createCommitTransaction2Phase(connectionInfo, (TransactionId) dar.getData()[i])); } // We should get the committed transactions. @@ -1057,7 +1074,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { } MessageAck ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE); ack.setTransactionId(txid); - connection.send(ack); + connection.request(ack); } // Don't commit
